RabbitMQ之队列优先级

RabbitMQ之队列优先级

优先级队列,顾名思义,具有更高优先级的队列具有较高的优先权,优先级高的消息具备优先被消费的特权。

本文主要讲解如何使用RabbitMQ实现队列优先级。

可以通过RabbitMQ管理界面配置队列的优先级属性,如下图的x-max-priority.

也可以通过代码去实现,比如:

Map<String,Object> args = new HashMap<String,Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("queue_priority", true, false, false, args);

配置了队列优先级的属性之后,可以在管理页面看到Pri的标记:

上面配置的是一个队列queue的最大优先级。之后要在发送的消息中设置消息本身的优先级,如下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange_priority","rk_priority",properties,("messages").getBytes());

下面演示一段生产-消费的代码。首先producer端先生产10个消息,第奇数个消息具备优先级,第偶数个消息就是默认的优先级(最低:0)。

生产端:

package com.vms.test.zzh.rabbitmq.priority;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
* Created by hidden on 2017/2/14.
*/
public class PriorityProducer {
public static final String ip = "xx.xx.xx.73";
public static final int port = 5672;
public static final String username = "root";
public static final String password = "root";

public static void main(String[] arggs) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setPassword(password);
connectionFactory.setUsername(username);
connectionFactory.setPort(port);
connectionFactory.setHost(ip);

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

//create exchange
channel.exchangeDeclare("exchange_priority","direct",true);

//create queue with priority
Map<String,Object> args = new HashMap<String,Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("queue_priority", true, false, false, args);
channel.queueBind("queue_priority", "exchange_priority", "rk_priority");

//send message with priority
for(int i=0;i<10;i++) {
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
if(i%2!=0)
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchange_priority","rk_priority",properties,("messages-"+i).getBytes());
}

channel.close();
connection.close();
}
}

消费端:

package com.vms.test.zzh.rabbitmq.priority;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* Created by hidden on 2017/2/14.
*/
public class PriorityConsumer {
public static final String ip = "xx.xx.xx.73";
public static final int port = 5672;
public static final String username = "root";
public static final String password = "root";

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setPassword(password);
connectionFactory.setUsername(username);
connectionFactory.setPort(port);
connectionFactory.setHost(ip);

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("queue_priority", false, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println(msg);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}

消费端运行结果:

messages-1
messages-3
messages-5
messages-7
messages-9
messages-0
messages-2
messages-4
messages-6
messages-8

查看运行结果可以验证优先级队列的作用。

当然,在消费端速度大于生产端速度,且broker中没有消息堆积的话,对发送的消息设置优先级也没什么实际意义,因为发送端刚发送完一条消息就被消费端消费了,那么就相当于broker至多只有一条消息,那么对于单条消息来说优先级是没有什么意义的。


欢迎支持笔者的作品《深入理解Kafka: 核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客(ID: hiddenkafka)。
本文作者: 朱小厮

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×