手把手带你入门RabbitMQ

mq / 264人浏览 / 0人评论
什么是MQ?
消息总线(Message Queue)是一种跨进程、异步的通信机制,用于上下游传递消息,由消息系统来确保消息的可靠传递。
MQ的作用是什么?
应用解耦、异步、流量削锋、数据分发、错峰流控、日志收集等等...
什么是RabbitMQ?
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ是使用Erlang语言编写,并且基于AMQP协议实现。

安装RabbitMQ

下载安装Erlang和RabbitMQ 百度云下载,提取码:tiok
推荐百度云下载,官网太慢且容易因为版本问题不能打开管理控制台一步一步按操作先安装好Erlang再安装完RabbitMQ,安装完成后以管理员的方式运行CMD,进入RabbitMQ安装目录下sbin,输入

rabbitmq-plugins enable rabbitmq_management

访问:http://localhost:15672 默认账号:guest 默认密码:guest

输入图片说明
安装完成

创建生产方:rabbitmq-provider

创建SpringBoot项目,此处忽略

配置RabbitMQ


这里以Maven方式引入到项目

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

推送消息至RabbitMQ


send.java

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] arg) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送 '" + message + "'");
        }
    }
}

分析queueDeclare(queue, durable, exclusive, autoDelete, arguments)方法

输入图片说明

queue:队列名称
durable:是否持久化默认为false,队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库。设置消息持久化必须先设置队列持久化,要不然队列不持久化,消息持久化,队列都不存在了,消息存在还有什么意义。消息持久化需要将交换机持久化、队列持久化、消息持久化,才能最终达到持久化的目的。

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
 
String message = "Hello RabbitMQ: ";
// 设置消息持久化
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
properties.deliveryMode(2);  // 设置消息是否持久化,1: 非持久化 2:持久化
 
channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes("UTF-8"));

exclusive:是否排外的,有两个作用
1.当连接关闭时connection.close()该队列是否会自动删除;
2.该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如     果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:

com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)

一般等于true的话用于一个队列只能有一个消费者来消费的场景。
autoDelete是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers=0时队列就会自动删除。
arguments:

Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL,设置队列里所有消息过期时间为10秒,10秒后这个队列消息清零。

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-message-ttl", 10000);
 
// 声明队列时指定队列中的消息过期时间
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

Auto Expire(x-expires): 用于当多长时间没有消费者访问该队列的时候,该队列会自动删除,可以设置一个延迟时间,如仅启动一个生产者,10秒之后该队列会删除,或者启动一个生产者,再启动一个消费者,消费者运行结束后10秒,队列也会被删除。

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-expires", 10000);
 
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

Max Length(x-max-length):用于指定队列的长度,如果不指定,可以认为是无限长,例如指定队列的长度是4,当超过4条消息,前面的消息将被删除,给后面的消息腾位。

Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-max-length", 4);
 
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
for(int i = 1; i <= 5; i++) {
     channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes("UTF-8"));
}

Max Length Bytes(x-max-length-bytes):用于指定队列存储消息的占用空间大小,当达到最大值是会删除之前的数据腾出空间。

Map<String, Object> arguments = new HashMap<String, Object>();
rguments.put("x-max-length-bytes", 1024);
 
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

Dead letter exchange(x-dead-letter-exchange):当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX。
Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK。
Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费。
Lazy mode(x-queue-mode=lazy):Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中。
Master locator(x-queue-master-locator)
运行结果

发送 'Hello World!'

打开后台查看

输入图片说明

输入图片说明
已经可以看到我们已经创建一个队列hello并且发送了一条消息

创建消费方:rabbitmq-consumer

创建SpringBoot项目及配置RabbitMQ此处忽略

从RabbitMQ拉取消息消费


Recv.java

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] arg) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("收到 '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

运行结果:

收到 'Hello World!'

启动后发现收到了我们刚才发送的消息,此时一个最简单的消息队列就搭建好了。

0 条评论

还没有人发表评论

发表评论 取消回复

记住我的信息,方便下次评论
有人回复时邮件通知我