在上一篇手把手带你入门RabbitMQ中我们实现了程序从一个已经命名的队列里发送和接收消息,并且深入了解了分析RabbitMQ中创建队列的方法queueDeclare(queue, durable, exclusive, autoDelete, arguments),这篇文章我们将要创建工作队列把一些耗时的任务分配给多个worker
工作队列的主要作用途场景是在web应用程序中,对于一些在一个短的http请求里无法完成的复杂任务进行处理,避开立刻处理某个资源消耗较大的任务并且等待它执行完成所消耗的时间成本,取而代之是在请求之后将它加入计划列表后面执行,将任务分装成一个消息发送到任务队列,后台的工作队列接收到消息后会立刻执行,当运行多个任务执行器时,任务将会在它们之间共享。
创建一个工作队列并且发送消息至队列
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//指定队列持久化
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
for(int i=0;i<5;i++) {
String message = i+"."+"Hello RabbitMQ WorkQueue!";
//指定消息持久化
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println("发送:" + message + "");
}
}
}
}
运行生产方得到结果:
发送:0.Hello RabbitMQ WorkQueue!
发送:1.Hello RabbitMQ WorkQueue!
发送:2.Hello RabbitMQ WorkQueue!
发送:3.Hello RabbitMQ WorkQueue!
发送:4.Hello RabbitMQ WorkQueue!
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
//指定队列持久化
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//指定该消费者同时只接收一条消息
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("收到:" + message + "");
try {
doWork(message);
} finally {
//返回接收到消息的确认信息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
//打开消息应答机制
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
/**
* 模拟执行任务消耗时间
* @param task
*/
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(3000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
运行消费端1得到结果:
收到:0.Hello RabbitMQ WorkQueue!
收到:4.Hello RabbitMQ WorkQueue!
这里添加创建消费端2可以解决在线上环境后台队列里如果积压大量数据处理不过来的时候,就可以通过添加worker的方式也解决,两个消费端同时处理任务,默认情况下RabbitMQ会把每个消息以此轮询发到每个消费端。
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
//指定队列持久化
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//指定该消费者同时只接收一条消息
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("收到:" + message + "");
try {
doWork(message);
} finally {
//返回接收到消息的确认信息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
//打开消息应答机制
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
/**
* 模拟执行任务消耗时间
* @param task
*/
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
运行消费端2得到结果:
收到:1.Hello RabbitMQ WorkQueue!
收到:2.Hello RabbitMQ WorkQueue!
收到:3.Hello RabbitMQ WorkQueue!
在每个消费端我们均添加配置了消费应答模式,主要为了应对向消费端推送某条消息后RabbitMQ会立刻删除此条信息,这样如果我们kill掉某个worker的话,将会失去该worker正在处理的任务(还未完成),还会丢失已经发送到该消费端未被消费的任务,为了保证不丢失消息,RabbitMQ支持消费应答机制,当消费端接收消息并且完成任务后会向RabbitMQ发送一条确认消息,然后RabbitMQ才会将消息删除,消费端处理完任务之后没发送确认信息就挂了,RabbitMQ将会视为未完成,然后将此条消息发送给另一个消费端。
通过设置队列持久化,消息持久化可以将消息持久化存储到系统磁盘,生产方和消费方都使用queueDeclare方法来指定持久化属性,另外消息持久化不一定能保证消息一定不会丢失,因为RabbitMQ接收到消息时会先将消息写到内存缓冲区,并不是直接将单条信息存储于磁盘。消息的持久化不是健壮的,如果需要一套健壮的持久化方案,可以使用publisher confirms
channel.basicQos(1);
使用basicQos设置该消费端可以同时接收多少条消息,这里设置为1,表示RabbitMQ同一时间只能发送一条任务至该消费端,等到处理完这条之后接收处理下一条。
感谢博主,喝杯咖啡~
感谢博主,喝杯咖啡~
还没有人发表评论