MQ
基础
同步调用

同步调用的优势是什么?
异步调用
异步调用方式其实就是基于消息通知的方式,一般包含三个角色
- 消息发送者:投递消息的人,就是原来的调用方
- 消息代理:管理、暂存、转发消息,可以把它理解成微信服务器
- 消息接收者:接收和处理消息的人,就是原来的服务提供方

异步调用的优势:
- 耦合度低,拓展性强
- 异步调用,无需等待,性能好
- 故障隔离,下游服务故障不影响上游业务
- 缓存消息,流量削峰填谷
异步调用的问题
- 不能立即得到调用结果,时效性差
- 不确定下游业务执行是否成功
- 业务安全依赖于 Broker 的可靠性
MQ 技术选型
Message Queue,字面看就是存放消息的队列。也就是异步调用中的 Broker
|
RabbitMQ |
ActiveMQ |
RocketMQ |
Kafka |
公司/社区 |
Rabbit |
Apache |
阿里 |
Apache |
开发语言 |
Erlang |
Java |
Java |
Scala&Java |
协议支持 |
AMQP,XMPP,SMTP,STOMP |
OpenWire,STOMP,REST,XMPP,AMQP |
自定义协议 |
自定义协议 |
可用性 |
高 |
一般 |
高 |
高 |
单机吞吐量 |
一般 |
差 |
高 |
非常高 |
消息延迟 |
微秒级 |
毫秒级 |
毫秒级 |
毫秒以内 |
消息可靠性 |
高 |
一般 |
高 |
一般 |
安装
官网地址:https://www.rabbitmq.com/
基于Docker来安装RabbitMQ,使用下面的命令即可:
1 2 3 4 5 6 7 8 9 10 11
| docker run \ -e RABBITMQ_DEFAULT_USER=hanyang \ -e RABBITMQ_DEFAULT_PASS=123321 \ -v mq-plugins:/plugins \ --name mq \ --hostname mq \ -p 15672:15672 \ -p 5672:5672 \ --network hmall \ -d \ rabbitmq:3.8-management
|
可以看到在安装命令中有两个映射的端口:
- 15672:RabbitMQ提供的管理控制台的端口
- 5672:RabbitMQ的消息发送处理接口
整体结构

SpringAMQP
官网地址:https://spring.io/projects/spring-amqp
消息发送
首先配置 MQ 地址,在 publisher
服务的 application.yml
中添加配置:
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.150.101 port: 5672 virtual-host: /hmall username: hmall password: 123
|
然后在 publisher
服务中编写测试类 SpringAmqpTest
,并利用 RabbitTemplate
实现消息发送:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest public class SpringAmqpTest {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void testSimpleQueue() { String queueName = "simple.queue"; String message = "hello, spring amqp!"; rabbitTemplate.convertAndSend(queueName, message); } }
|
消息接收
首先配置 MQ 地址,在 consumer
服务的 application.yml
中添加配置:
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.150.101 port: 5672 virtual-host: /hmall username: hmall password: 123
|
然后在 consumer
服务的 com.hanyang.consumer.listener
包中新建一个类 SpringRabbitListener
,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
| import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class SpringRabbitListener { @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消费者接收到消息:【" + msg + "】"); } }
|

SpringAMQP 如何收发消息?
- 引入 spring-boot-starter-amqp 依赖
- 配置 rabbitmq 服务端信息
- 利用 RabbitTemplate 发送消息
- 利用 @RabbitListener 注解声明要监听的队列,监听消息
work 模型
Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
消息发送
在 publisher 服务中的 SpringAmqpTest 类中添加一个测试方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
@Test public void testWorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello, message_"; for (int i = 0; i < 50; i++) { rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
|
消息接收
要模拟多个消费者绑定同一个队列,在 consumer 服务的 SpringRabbitListener 中添加 2 个新的方法:
1 2 3 4 5 6 7 8 9 10 11
| @RabbitListener(queues = "work.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20); }
@RabbitListener(queues = "work.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(200); }
|
注意到这两消费者,都设置了 Thead.sleep
,模拟任务耗时:
- 消费者1 sleep了 20 毫秒,相当于每秒钟处理 50 个消息
- 消费者2 sleep了 200 毫秒,相当于每秒处理 5 个消息
在 spring 中有一个简单的配置,可以解决这个问题。修改 consumer 服务的 application.yml 文件,添加配置:
1 2 3 4 5
| spring: rabbitmq: listener: simple: prefetch: 1
|
work 模型的使用
- 多个消费者绑定到一个队列,可以加快消息处理速度
- 同一条消息只会被一个消费者处理
- 通过设置 prefetch 来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳
交换机类型
真正生产环境都会经过 exchange 来发送消息,而不是直接发送到队列,交换机的类型有以下三种:
- Fanout:广播
- Direct:定向
- Topic:话题

Fanout 交换机

消息发送
在 publisher 服务的 SpringAmqpTest 类中添加测试方法:
1 2 3 4 5 6 7 8
| @Test public void testFanoutExchange() { String exchangeName = "hmall.fanout"; String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message); }
|
消息接收
在 consumer 服务的 SpringRabbitListener 中添加两个方法,作为消费者:
1 2 3 4 5 6 7 8 9
| @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到Fanout消息:【" + msg + "】"); }
@RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到Fanout消息:【" + msg + "】"); }
|
交换机的作用是什么?
- 接收 publisher 发送的消息
- 将消息按照规则路由到与之绑定的队列
- FanoutExchange的会将消息路由到每个绑定的队列
Direct 交换机
Direct Exchange 会将接收到的消息根据规则路由到指定的 Queue,因此称为定向路由
- 每一个 Queue 都与 Exchange 设置一个 BindingKey
- 发布者发送消息时,指定消息的 RoutingKey
- Exchange 将消息路由到 BindingKey 与消息 RoutingKey 一致的队列

消息接收
在 consumer 服务的 SpringRabbitListener 中添加方法:
1 2 3 4 5 6 7 8 9
| @RabbitListener(queues = "direct.queue1") public void listenDirectQueue1(String msg) { System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); }
@RabbitListener(queues = "direct.queue2") public void listenDirectQueue2(String msg) { System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); }
|
消息发送
在 publisher 服务的 SpringAmqpTest 类中添加测试方法:
1 2 3 4 5 6 7 8 9
| @Test public void testSendDirectExchange() { String exchangeName = "hmall.direct"; String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!"; rabbitTemplate.convertAndSend(exchangeName, "red", message); }
|
Topic 交换机
TopicExchange 与 DirectExchange 类似,区别于 routingKey 可以是多个单词的列表,并且以 . 分割
Queue 与 Exchange 指定 BindingKey 时可以使用通配符:

描述下Direct交换机与Fanout交换机的差异?
- Topic 交换机接收的消息 RoutingKey 可以是多个单词,以 . 分割
- Topic 交换机与队列绑定时的 bindingKey 可以指定通配符
- #:代表 0 个或多个词
- *:代表 1 个词
声明队列和交换机
SpringAMQP 提供了几个类,用来声明队列、交换机及其绑定关系
- Queue:用于声明队列,可以用工厂类 QueueBuilder 构建
- Exchange:用于声明交换机,可以用工厂类 ExchangeBuilder 构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类 BindingBuilder 构建
Fanout 示例
在 consumer 中创建一个类,声明队列和交换机:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class FanoutConfig {
@Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("hmall.fanout"); }
@Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); }
@Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); }
@Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); }
@Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
|
direct 示例
direct 模式由于要绑定多个 Key,会非常麻烦,每一个 Key 都要编写一个 binding:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class DirectConfig {
@Bean public DirectExchange directExchange(){ return ExchangeBuilder.directExchange("hmall.direct").build(); }
@Bean public Queue directQueue1(){ return new Queue("direct.queue1"); }
@Bean public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){ return BindingBuilder.bind(directQueue1).to(directExchange).with("red"); }
@Bean public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){ return BindingBuilder.bind(directQueue1).to(directExchange).with("blue"); }
@Bean public Queue directQueue2(){ return new Queue("direct.queue2"); }
@Bean public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){ return BindingBuilder.bind(directQueue2).to(directExchange).with("red"); }
@Bean public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){ return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow"); } }
|
基于注解声明
基于 @Bean 的方式声明队列和交换机比较麻烦,Spring 还提供了基于注解方式来声明。
例如,我们同样声明 Direct 模式的交换机和队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】"); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】"); }
|
Topic 模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】"); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){ System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】"); }
|
声明队列、交换机、绑定关系的 Bean 是什么?
- Queue
- FanoutExchange、DirectExchange、TopicExchange
- Binding
基于 @RabbitListener 注解声明队列和交换机有哪些常见注解
消息转换器
Spring 的消息发送代码接收的消息体是一个 Object,而在数据传输时,它会把你发送的消息序列化为字节发送给 MQ,接收消息的时候,还会把字节反序列化为 Java 对象。
只不过,默认情况下 Spring 采用的序列化方式是 JDK 序列化。众所周知,JDK 序列化存在下列问题:
默认转换器
1)创建测试队列
首先,在 consumer 服务中声明一个新的配置类:
1 2 3 4 5 6 7 8 9 10 11 12
| import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class MessageConfig {
@Bean public Queue objectQueue() { return new Queue("object.queue"); } }
|
2)发送消息
在 publisher 模块的 SpringAmqpTest 中新增一个消息发送的代码,发送一个 Map 对象:
1 2 3 4 5 6 7 8 9
| @Test public void testSendMap() throws InterruptedException { Map<String,Object> msg = new HashMap<>(); msg.put("name", "小明"); msg.put("age", 21); rabbitTemplate.convertAndSend("object.queue", msg); }
|
配置 JSON 转换器
在 publisher
和 consumer
两个服务中都引入依赖:
1 2 3 4 5
| <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
|
注意,如果项目中引入了 spring-boot-starter-web
依赖,则无需再次引入 Jackson
依赖。
配置消息转换器,在 publisher
和 consumer
两个服务的启动类中添加一个Bean即可:
1 2 3 4 5 6 7 8
| @Bean public MessageConverter messageConverter(){ Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); jackson2JsonMessageConverter.setCreateMessageIds(true); return jackson2JsonMessageConverter; }
|
消息转换器中添加的 messageId 可以便于将来做幂等性判断。
消费者接收消息
在 consumer 服务中定义一个新的消费者,publisher 是用 Map 发送,那么消费者也一定要用 Map 接收,格式如下:
1 2 3 4
| @RabbitListener(queues = "object.queue") public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException { System.out.println("消费者接收到object.queue消息:【" + msg + "】"); }
|
高级
生产者可靠性
生产者重连
有的时候由于网络波动,可能会出现客户端连接 MQ 失败的情况。通过配置可以开启连接失败后的重连机制
修改 publisher
模块的 application.yaml
文件,添加下面的内容:
1 2 3 4 5 6 7 8 9
| spring: rabbitmq: connection-timeout: 1s template: retry: enabled: true initial-interval: 1000ms multiplier: 1 max-attempts: 3
|
生产者确认
RabbitMQ有 Publisher Confirm 和 Publisher Return 两种确认机制。开启确认机制后,在 MQ 成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:
- 消息投递到 MQ,但是路由失败。此时通过 Publisher Return 返回异常原因,然后返回 ACK,告知投递成功
- 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知投递成功
- 持久消息投递到了 MQ,并且入队完成持久化,返回 ACK ,告知投递成功
- 其它情况都会返回 NACK,告知投递失败

如何处理生产者的确认消息?
- 生产者确认需要额外的网络和系统资源开销,尽量不要使用
- 如果一定要使用,无需开启 Publisher-Return 机制,因为一般路由失败是自己业务问题
- 对于 nack 消息可以有限次数重试,依然失败则记录异常消息
MQ 可靠性
数据持久化
在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:
- 一旦 MQ 宕机,内存中的消息会丢失
- 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发 MQ 阻塞
RabbitMQ 实现数据持久化包括三个方面:
Lazy Queue
惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
RabbitMQ 如何保证消息的可靠性
-
首先通过配置可以让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘,MQ 重启消息依然存在
-
RabbitMQ 在 3.6 版本引入了 LazyQueue,并且在 3.12 版本后会称为队列的默认模式。LazyQueue 会将所有消息都持久化
-
开启持久化和生产者确认时,RabbitMQ 只有在消息持久化完成后才会给生产者返回 ACK 回执
消费者可靠性
消费者确认
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
- ack:成功处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP 已经实现了消息确认功能,并允许通过配置文件选择 ACK 处理方式,有三种方式:
-
none:不处理,即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。非常不安全,不建议使用
-
manual:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject,存在业务入侵,但更灵活
-
auto:自动模式。SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack
当业务出现异常时,根据异常判断返回不同结果:
- 如果是业务异常,会自动返回 nack
- 如果是消息处理或校验异常,自动返回 reject
1 2 3 4 5 6
| spring: rabbitmq: listener: simple: prefetch: 1 acknowledge-mode: none # none, 关闭 ack; manual, 手动 ack; auto: 自动ack
|
消费失败处理
当消费者出现异常后,消息会不断 requeue (重新入队)到队列,再重新发送给消费者,然后再次异常,再次 requeue,无限循环,导致 mq 的消息处理飙升,带来不必要的压力
修改consumer服务的application.yml文件,添加内容:
1 2 3 4 5 6 7 8 9 10
| spring: rabbitmq: listener: simple: retry: enabled: true initial-interval: 1000ms multiplier: 1 max-attempts: 3 stateless: true
|
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecoverer 接口来处理,它包含三个不同的实现:
RejectAndDontRequeueRecoverer
:重试耗尽后,直接 reject
,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer
:重试耗尽后,返回 nack
,消息重新入队
RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
消费者如何保证消息一定被消费?
- 开启消费者确认机制为 auto,由 spring 确认消息处理成功后返回 ack,异常时返回 nack
- 开启消费者失败重试机制,并设置 MessageRecover,多次重试失败后将消息投递到异常交换机,交由人工处理
业务幂等性
如何保证支付服务与交易服务之间的订单状态一致性?
- 首先,支付服务会在用户支付成功以后利用 MQ 消息通知交易服务,完成订单状态同步
- 其次,为了保证 MQ 消息的可靠性,采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了 MQ 的持久化,避免因服务宕机导致消息丢失
- 最后,还在交易服务更新订单状态时做了业务幂等性判断,避免因消息重复消费导致订单状态异常
如果交易服务消息处理失败,有什么兜底方案?
- 可以在交易服务设置定时任务,定期查询订单支付状态。这样即便 MQ 通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性
延迟消息
延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后收到消息
延迟任务:设置在一定时间之后才执行的任务

死信交换机
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
- 消费者使用
basic.reject
或 basic.nack
声明消费失败,并且消息的 requeue
参数设置为false
- 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过 dead-letter-exchange
属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange,简称 DLX)
延迟消息插件
Scheduling Messages with RabbitMQ | RabbitMQ - Blog
下载地址:GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
取消超时订单
设置 30 分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:
- 如果并发较高,30 分钟可能堆积消息过多,对 MQ 压力很大
- 大多数订单在下单后 1 分钟内就会支付,但是却需要在 MQ 内等待 30 分钟,浪费资源
