RabbitMQ知识点梳理

RabbitMQ是一个开源的消息代理程序,实现了高级消息队列协议(AMQP)并且可以通过插件的方式进行扩展,使其支持STOMP和MQTT等协议,是目前应用最为广泛的面向消息的中间件之一。RabbitMQ支持各种主流的编程语言,比如Python、Java、C#、Go、Swift等,并且可以在各种操作系统平台上运行,通过扩展的MQTT协议可以很轻量的在移动设备上实现消息推送,早期的Android推送服务就是采用MQTT的方式进行消息订阅和推送的。

在开始之前,先对消息队列(MQ)的各个名词和定义等基本概念进行一下基本的认识。

基本概念

producer

producer(生产者),发送消息的程序代表生产者,生产者只进行消息的发送,可以用首字母P来表示。

queue

queue(队列),这里的队列指的是消息队列,消息队列的本质是一个消息缓冲区,消息队列把消息存储在主机的内存和磁盘上。多个生产者可以往一个队列发送数据,多个消费者可以从一个队列接收数据,每个队列都有一个名称。

consumer

consumer(消费者),是一个等待接收消息的程序,从消息队列接收数据的程序代表消费者,用首字母C来表示。

生产者、消费者与MQ不必运行在同一个主机上,可以分布在不同的主机上。由于RabbitMQ支持各种编程语言,所以生产者和消费者可以使用不同的编程语言进行实现,消息中间件使异构系统的实现变得更加简化,并且一个程序可以同时既是生产者又是消费者。

简单使用

创建一个生产者项目用来向消息队列发送数据,创建一个消费者项目用来从消息队列里接收数据,消费者需要注册到指定到MQ队列中,如下图所示:

在开始之前,需要先从RabbitMQ到官网下载RabbitMQ服务进行安装并启动服务,并且根据自己所用到开发语言下载操作RabbitMQ所需要的驱动程序,由于我使用Gradle构建项目,所以不必自己下载和管理jar包,只需要引入以下依赖即可:

1
compile group: 'com.rabbitmq', name: 'amqp-client', version: '5.6.0'

生产者

导入需要用到的类:

1
2
3
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
  1. ConnectionFactory 连接工厂类,提供了各种属性用来设置连接到MQ的参数,从工厂类获取连接对象。
  2. Connection 连接对象接口类,通过connectionFactory.newConnection()获取到一个连接对象。
  3. Channel 通道,通过connection.createChannel()创建通道对象,Channel提供了操作MQ的各种API。

定义队列名称

1
private static final String QUEUE_NAME = "hello";

要将消息发送到具体的队列上,需要为每个队列指定名称用来区分。

创建连接发送数据

1
2
3
4
5
6
7
8
9
10
11
12
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
try(Connection connection = connectionFactory.newConnection()){
Channel channel = connection.createChannel();
// 声明队列(队列名称,非持久化,非独占,不自动删除队列,空参数)
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 定义消息
String message = "Hello,World!";
// 发送消息(exchange,routingKey,其他属性,消息正文)
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println(" [x] Send '" + message + "'");
}

由于RabbitMQ服务安装在本机,所以代码里只需要设置connectionFactory.setHost("localhost");,其他属性会使用默认值,RabbitMQ默认通信端口5672,用户guest,密码guest,如果安装了RabbitMQ管理插件,可通过15672端口进入后台管理页面。
如果修改了RabbitMQ服务的配置参数值,就需要在程序中设置对应的属性值,比如:

1
2
3
4
5
connectionFactory.setPort(56723);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("********");
connectionFactory.setVirtualHost("app_message");
// ...

在使用Channel发送数据前,需要通过channel.queueDeclare()方法为该Channel定义队列声明。queueDeclare()方法定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 声明队列
*
* @param queue 队列名称
* @param durable true表示持久队列,队列将会在MQ服务重启后继续存在
* @param exclusive true表示队列独占(仅限于该链接)
* @param autoDelete true表示自动删除队列(服务器不再使用时删除该队列)
* @param arguments 队列的其他属性
* @return 队列成功声明返回 Queue.DeclareOk
* @throws java.io.IOException 如果有错误抛出IOException
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;

注意:队列不允许重复定义
一个队列用队列名称进行标识,一旦队列初始化声明后就不允许定义成其他属性值,比如:

1
2
3
4
5
// 程序A:声明队列(队列名称,非持久化,非独占,不自动删除队列,空参数)
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

// 程序B:声明队列(队列名称,持久化,独占,自动删除队列,空参数)
channel.queueDeclare(QUEUE_NAME,true,true,true,null);

由于程序A已经定义并且初始化了队列,程序B重新初始化不一样的属性值会导致异常。

如果要重新声明队列属性,需要将已经声明的队列删除,可以进入管理后台进行删除操作。

在使用Channel发送数据时,通过channel.basicPublish()方法发送数据。根据队列的不同性质,传入的参数也不同。basicPublish()方法定义如下:

1
2
3
4
5
6
7
8
9
10
/**
* 发布消息
*
* @param exchange 消息要发送到哪个exchange上
* @param routingKey 路由key
* @param props 其他属性
* @param body 消息正文
* @throws java.io.IOException 如果有错误抛出IOException
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

消费者

导入需要用到的类:

1
2
3
4
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

定义队列名称

1
private static final String QUEUE_NAME = "hello";

队列名称要实际存在,要和对应的生产者发布数据的队列名称一致,否则无法接受生产者所发送的数据。

创建连接接收数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
try(Connection connection = connectionFactory.newConnection()){
Channel channel = connection.createChannel();
// 声明队列(队列名称,非持久化,非独占,不自动删除队列,空参数)
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 消费者接收到队列投递的回调(consumerTag 服务端生成的消费者标识,delivery投递)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 获取消息内容并输出
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
// 开启消费者监听
channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag -> {});
}

消费者和生产者一样,需要声明队列。因为消费者程序的启动可能早于生产者,声明队列可以初始化队列信息,确保队列存在。消费者通过监听指定队列进行数据消费。通过basicConsume()方法开启一个消费者监听,监听回调使用DeliverCallback进行处理。
DeliverCallback()方法定义如下:

1
2
3
4
5
6
7
8
9
10
/**
* 服务端生成的消费者标识,启动非本地、非独占的消费者
* @param queue 队列名称
* @param autoAck 设置为true消费者接收消息后会自动发送一个ACK消息给服务器确认,false不会自动发送确认消息给服务器
* @param deliverCallback 消息传递时回调
* @param cancelCallback 取消消费者时的回调
* @return 服务端生成的消费者标识
* @throws IOException 如果有错误抛出IOException
*/
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

无论是生产者还是消费者,使用完消息队列后,都需要先关闭通道channel.close(),然后再关闭连接对象connection.close()。上面的代码示例使用了try(){}代码块对资源进行自动回收。

Work queues

Work queues 工作队列也称为任务队列,主要思想是避免立即执行资源密集型的任务。将需要执行的任务放到消息队列中,等待资源空闲时消费者从消息队列中取出消息并逐个执行。

工作队列适用于很多场景,一般的使用方式也都是采用任务队列。
一个生产者对应多个消费者,并且消费者要处理耗时的任务,所以这里的消息机制需要做一些修改

自动确认消息改为手动确认

1
2
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,consumerTag -> {});

一个生产者对多个消费者,生产者必须公平的保证一条消息发送给每一个消费者,所以要把消费者接收自动确认 autoAck改成false。

手动确认消息可以防止某个生产者发送Ack导致MQ中的消息被删除,但是不能保证MQ不丢消息,比如突然断电MQ重启,
为了让MQ能够持久化消息,在初始化队列的时候需要指定队列durable属性值为true,开启消息持久化。

1
2
boolean durable = true;
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);

队列开启持久化后仍然不一定能够保证在极端情况下不丢失数据,整个MQ从消息的发送,中转,再到接收都需要严格把控。所以如果要彻底防止丢失消息,需要在生产者发送消息时做发布确认。

Publish/Subscribe

Publish/Subscribe 发布订阅模式,将消息广播发送给所有消费者。
这里以日志系统为例,假设生产者程序将消息发送给两个消费者,其中一个消费者负责将日志输出到控制台,另外一个消费者负责将日志写入到磁盘。在发布订阅模式中有个很重要的概念 Exchanges

Exchanges

生产者时发生消息的用户应用程序。
消息队列是存储消息的缓存区。
消费者是接收消息的用户应用程序。
在RabbitMQ的消息传递模型中,他的核心思想是生产者永远不会将任何消息直接发送到队列上,甚至不知道消息是否被传递到任何队列。

生产者向Exchanges发送消息。 Exchanges负责生产者消息的接收,将消息推送到队列。
Exchanges 通过exchange type指定的类型明确要如何处理消息,比如附加到特定队列或者所有队列,或者将消息丢弃。

命令行执行如下命令,查看目前RabbitMQ中的Exchanges

1
rabbitmqctl list_exchanges

也可以通过访问RabbitMQ的网页管理后台,默认地址http://localhost:15672/#/exchanges进行查看

消息发布者通过 exchange发送消息示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class PublishSample {

// 自定义 exchange
private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) {

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");

try(Connection connection = connectionFactory.newConnection()){

Channel channel = connection.createChannel();

String message = "Hello,World!";

// 指定exchange的类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

// 向指定exchange发送消息
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));

System.out.println(" [x] Send '" + message + "'");

} catch (TimeoutException | IOException e) {
e.printStackTrace();
}

}
}

通过对比简单使用MQ的代码发现,生成者不直接发送消息到队列,所以不需要声明队列语句:

1
2
// 声明队列(队列名称,非持久化,非独占,不自动删除队列,空参数)
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

使用exchange的方式发送消息到消息队列,需要指定exchange和对应的类型

1
2
// 指定exchange的类型为fanout
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);

通过查看枚举类可知,exchange可指定的类型有:

1
DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");

当类型为fanout时,将忽略routingKey参数
向exchange发送消息:

1
2
// 向指定exchange发送消息,由于是fanout类型,所以自动忽略第二个参数 routingKey的值
channel.basicPublish(EXCHANGE,"",null,message.getBytes());

对比本章的基础使用部分:

1
2
// 发送消息(exchange,路由key,路由其他属性,消息正文)
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

基础使用部分的代码示例中,直接向MQ队列名称发送消息,其中exchange的参数为一个空字符串""表示默认或者匿名交换,消息将通过第二个参数routingKey指定的队列名称路由到队列(如果队列存在的话)。

订阅者代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class SubscribeSample {

// 自定义 exchange
private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) {

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");

try(Connection connection = connectionFactory.newConnection()){

Channel channel = connection.createChannel();

// 指定exchange的类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

// 获取队列,得到一个随机名称(非持久化的自动删除队列)
String queueName = channel.queueDeclare().getQueue();
System.out.println(queueName);

// 绑定消息队列和exchange
channel.queueBind(queueName,EXCHANGE_NAME,"");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, message) -> {
String messages = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + messages +"'");
};

channel.basicConsume(queueName,true,deliverCallback, consumerTag -> {});

} catch (TimeoutException | IOException e) {
e.printStackTrace();
}

}
}

生产者通过发送消息到交换机exchange
消费者从channel中获取一个随机的非持久化自动删除队列:

1
2
// 获取队列,得到一个随机名称(非持久化的自动删除队列)
String queueName = channel.queueDeclare().getQueue();

并且将该队列绑定到指定的exchange

1
2
// 绑定消息队列和exchange
channel.queueBind(queueName,EXCHANGE_NAME,"");

关系图如下:

注意:消费者必须先绑定到exchange上,然后生产者再发送消息,否则exchange无法将消息路由到任何队列。
消费者接收消息后自动确认消息并且自动删除队列。

Routing

Routing 路由,进行有选择的接收消息,可以订阅某个消息队列的子集。

生产者代码:

1
2
3
4
5
6
7
8
9
10
11
// 指定exchange的类型为direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

// 向指定exchange发送消息,路由key为 info
channel.basicPublish(EXCHANGE_NAME,"info",null,"info message".getBytes(StandardCharsets.UTF_8));

// 向指定exchange发送消息,路由key为 warning
channel.basicPublish(EXCHANGE_NAME,"warning",null,"warning message".getBytes(StandardCharsets.UTF_8));

// 向指定exchange发送消息,路由key为 error
channel.basicPublish(EXCHANGE_NAME,"info",null,"error message".getBytes(StandardCharsets.UTF_8));

消费者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 指定exchange的类型为direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

// 获取队列,得到一个随机名称(非持久化的自动删除队列)
String queueName = channel.queueDeclare().getQueue();

// 绑定消息队列、exchange、路由key为error
channel.queueBind(queueName,EXCHANGE_NAME,"error");

// 绑定消息队列、exchange、路由key为info
// channel.queueBind(queueName,EXCHANGE_NAME,"info");
// ...

// 接收指定exchange中routingKey为error的消息并处理

生产者可以绑定多个routingKey,发送到类型为direct的exchange
消费者可以为一个队列绑定多个routingKey,关系图如下所示:

Topics

Topics 主题,基于主题交换的策略接收消息。

基于Topics的方式可以让消息队列的使用更加灵活,为消息的发送和订阅提供更加细粒度的控制
首先需要指定exchange的类型为topic,在生产者发送消息时设置 routingKey为一个符号表达式。
routingKey的格式必须是由点.分割的单词列表,单词可以是任何内容,通常用与消息相关的单词来表示
比如:sys.info.log*.info.loglazy.#*.*.info.#
* 一个字符占位符
# 0个或者多个字符的占位符
生产者代码:

1
2
3
4
5
6
7
8
// 指定exchange的类型为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

// 向指定exchange发送消息, routingKey 为 s.info.l, 订阅的消费者需要指定主题routingKey为 *.info.* 或者 #.info.#
channel.basicPublish(EXCHANGE_NAME,"s.info.l",null,"info message".getBytes(StandardCharsets.UTF_8));

// 向指定exchange发送消息, routingKey 为 lazy.test.one, 订阅的消费者需要指定主题routingKey为 lazy.#
channel.basicPublish(EXCHANGE_NAME,"lazy.test.one",null,"lazy message".getBytes(StandardCharsets.UTF_8));

消费者代码:

1
2
3
4
5
6
7
8
9
// 指定exchange的类型为topic
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

// 获取队列,得到一个随机名称(非持久化的自动删除队列)
String queueName = channel.queueDeclare().getQueue();

// 绑定消息队列和exchange
channel.queueBind(queueName,EXCHANGE_NAME,"*.info.*");
// 消费者将会收到生产者发送的 info message 消息内容

RPC

RPC 远程过程调用,RabbitMQ也支持这种同步调用的特性,调用之后等待调用结果返回。

客户端通过RabbitMQ的RPC调用服务端,等待服务端返回结果,示例程序如下:

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class RPCClient implements AutoCloseable {

private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";

public RPCClient() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connection = connectionFactory.newConnection();
channel = connection.createChannel();
}

public static void main(String[] args) {
try(RPCClient fibonacciRpc = new RPCClient()){
for (int i = 0; i < 32; i++){
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str +")");
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (Exception e) {
e.printStackTrace();
}
}

public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();

String replayQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replayQueueName)
.build();
channel.basicPublish("",requestQueueName,basicProperties,message.getBytes(StandardCharsets.UTF_8));

final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

String cTag = channel.basicConsume(replayQueueName,true,(consumeTag, delivery)->{

if (delivery.getProperties().getCorrelationId().equals(corrId)){
response.offer(new String(delivery.getBody(),StandardCharsets.UTF_8));
}
},consumerTag -> {

});
String result = response.take();
channel.basicCancel(cTag);

return result;
}

@Override
public void close() throws Exception {
connection.close();
}
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class RPCServer {

private static final String RPC_QUEUE_NAME = "rpc_queue";

private static int fib(int n){
if (n == 0) return n;
if (n == 1) return n;
return fib(n-1) + fib(n-2);
}

public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");

try(Connection connection = connectionFactory.newConnection()){
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
channel.queuePurge(RPC_QUEUE_NAME);

channel.basicQos(1);

System.out.println(" [x] Awaiting PRC request");

Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, message) -> {
AMQP.BasicProperties replyProperties =new AMQP.BasicProperties
.Builder()
.correlationId(message.getProperties().getCorrelationId())
.build();

String response = "";

try{
String theMessage = new String(message.getBody(), StandardCharsets.UTF_8);
int n = Integer.parseInt(theMessage);
System.out.println(" [.] fib(" + n + ")");
response += fib(n);
}catch (Exception e){
System.out.println(" [.] " + e.toString());
}finally {
channel.basicPublish("",message.getProperties().getReplyTo(),replyProperties,response.getBytes(StandardCharsets.UTF_8));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
synchronized (monitor){
monitor.notify();
}
}
};
channel.basicConsume(RPC_QUEUE_NAME,false,deliverCallback,(consumeTag->{ }));
while (true){
synchronized (monitor){
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (TimeoutException | IOException e) {
e.printStackTrace();
}
}
}

注:本文章为RabbitMQ的学习笔记,文章中的图片和部分代码都来自RabbitMQ官网,官方文档地址:
https://www.rabbitmq.com/getstarted.html

0%