RabbitMQ

MQ

基础

同步调用

  • 拓展性差
  • 性能下降
  • 级联失败

同步调用的优势是什么?

  • 时效性强,等待结果后才返回

异步调用

异步调用方式其实就是基于消息通知的方式,一般包含三个角色

  • 消息发送者:投递消息的人,就是原来的调用方
  • 消息代理:管理、暂存、转发消息,可以把它理解成微信服务器
  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

异步调用的优势:

  • 耦合度低,拓展性强
  • 异步调用,无需等待,性能好
  • 故障隔离,下游服务故障不影响上游业务
  • 缓存消息,流量削峰填谷

异步调用的问题

  • 不能立即得到调用结果,时效性差
  • 不确定下游业务执行是否成功
  • 业务安全依赖于 Broker 的可靠性

MQ 技术选型

Message Queue,字面看就是存放消息的队列。也就是异步调用中的 Broker

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

安装

官网地址:https://www.rabbitmq.com/

基于Docker来安装RabbitMQ,使用下面的命令即可:

1
2
3
4
5
6
7
8
9
10
11
docker run \
-e RABBITMQ_DEFAULT_USER=hanyang \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hmall \
-d \
rabbitmq:3.8-management

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

整体结构

  • virtual-host:虚拟主机,起到数据隔离的作用

  • publisher:消息发送者

  • consumer:消息的消费者

  • queue:队列,存储消息

  • exchange:交换机,负责路由消息

SpringAMQP

官网地址:https://spring.io/projects/spring-amqp

消息发送

首先配置 MQ 地址,在 publisher 服务的 application.yml 中添加配置:

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码

然后在 publisher 服务中编写测试类 SpringAmqpTest,并利用 RabbitTemplate 实现消息发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class SpringAmqpTest {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}

消息接收

首先配置 MQ 地址,在 consumer 服务的 application.yml 中添加配置:

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码

然后在 consumer 服务的 com.hanyang.consumer.listener 包中新建一个类 SpringRabbitListener,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}

SpringAMQP 如何收发消息?

  1. 引入 spring-boot-starter-amqp 依赖
  2. 配置 rabbitmq 服务端信息
  3. 利用 RabbitTemplate 发送消息
  4. 利用 @RabbitListener 注解声明要监听的队列,监听消息

work 模型

Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

消息发送

在 publisher 服务中的 SpringAmqpTest 类中添加一个测试方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* workQueue
* 向队列中不停发送消息,模拟消息堆积。
*/
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
消息接收

要模拟多个消费者绑定同一个队列,在 consumer 服务的 SpringRabbitListener 中添加 2 个新的方法:

1
2
3
4
5
6
7
8
9
10
11
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}

@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}

注意到这两消费者,都设置了 Thead.sleep,模拟任务耗时:

  • 消费者1 sleep了 20 毫秒,相当于每秒钟处理 50 个消息
  • 消费者2 sleep了 200 毫秒,相当于每秒处理 5 个消息

在 spring 中有一个简单的配置,可以解决这个问题。修改 consumer 服务的 application.yml 文件,添加配置:

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

work 模型的使用

  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置 prefetch 来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

交换机类型

真正生产环境都会经过 exchange 来发送消息,而不是直接发送到队列,交换机的类型有以下三种:

  • Fanout:广播
  • Direct:定向
  • Topic:话题

Fanout 交换机

消息发送

在 publisher 服务的 SpringAmqpTest 类中添加测试方法:

1
2
3
4
5
6
7
8
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "hmall.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
消息接收

在 consumer 服务的 SpringRabbitListener 中添加两个方法,作为消费者:

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

交换机的作用是什么?

  • 接收 publisher 发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • FanoutExchange的会将消息路由到每个绑定的队列

Direct 交换机

Direct Exchange 会将接收到的消息根据规则路由到指定的 Queue,因此称为定向路由

  • 每一个 Queue 都与 Exchange 设置一个 BindingKey
  • 发布者发送消息时,指定消息的 RoutingKey
  • Exchange 将消息路由到 BindingKey 与消息 RoutingKey 一致的队列

消息接收

在 consumer 服务的 SpringRabbitListener 中添加方法:

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
消息发送

在 publisher 服务的 SpringAmqpTest 类中添加测试方法:

1
2
3
4
5
6
7
8
9
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "hmall.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

Topic 交换机

TopicExchange 与 DirectExchange 类似,区别于 routingKey 可以是多个单词的列表,并且以 . 分割

Queue 与 Exchange 指定 BindingKey 时可以使用通配符:

  • #:代指 0 个 或多个单词
  • *:代指一个单词

描述下Direct交换机与Fanout交换机的差异?

  • Topic 交换机接收的消息 RoutingKey 可以是多个单词,以 . 分割
  • Topic 交换机与队列绑定时的 bindingKey 可以指定通配符
  • #:代表 0 个或多个词
  • *:代表 1 个词

声明队列和交换机

SpringAMQP 提供了几个类,用来声明队列、交换机及其绑定关系

  • Queue:用于声明队列,可以用工厂类 QueueBuilder 构建
  • Exchange:用于声明交换机,可以用工厂类 ExchangeBuilder 构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类 BindingBuilder 构建
Fanout 示例

在 consumer 中创建一个类,声明队列和交换机:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hmall.fanout");
}

/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}

/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}

/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}

/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
direct 示例

direct 模式由于要绑定多个 Key,会非常麻烦,每一个 Key 都要编写一个 binding:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

/**
* 声明交换机
* @return Direct类型交换机
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("hmall.direct").build();
}

/**
* 第1个队列
*/
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}

/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}

/**
* 第2个队列
*/
@Bean
public Queue directQueue2(){
return new Queue("direct.queue2");
}

/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}
}

基于注解声明

基于 @Bean 的方式声明队列和交换机比较麻烦,Spring 还提供了基于注解方式来声明。

例如,我们同样声明 Direct 模式的交换机和队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

Topic 模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}

声明队列、交换机、绑定关系的 Bean 是什么?

  • Queue
  • FanoutExchange、DirectExchange、TopicExchange
  • Binding

基于 @RabbitListener 注解声明队列和交换机有哪些常见注解

  • @Queue
  • @Exchange

消息转换器

Spring 的消息发送代码接收的消息体是一个 Object,而在数据传输时,它会把你发送的消息序列化为字节发送给 MQ,接收消息的时候,还会把字节反序列化为 Java 对象。
只不过,默认情况下 Spring 采用的序列化方式是 JDK 序列化。众所周知,JDK 序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差
默认转换器

1)创建测试队列
首先,在 consumer 服务中声明一个新的配置类:

1
2
3
4
5
6
7
8
9
10
11
12
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConfig {

@Bean
public Queue objectQueue() {
return new Queue("object.queue");
}
}

2)发送消息
在 publisher 模块的 SpringAmqpTest 中新增一个消息发送的代码,发送一个 Map 对象:

1
2
3
4
5
6
7
8
9
@Test
public void testSendMap() throws InterruptedException {
// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "小明");
msg.put("age", 21);
// 发送消息
rabbitTemplate.convertAndSend("object.queue", msg);
}
配置 JSON 转换器

publisherconsumer 两个服务中都引入依赖:

1
2
3
4
5
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>

注意,如果项目中引入了 spring-boot-starter-web 依赖,则无需再次引入 Jackson 依赖。

配置消息转换器,在 publisherconsumer 两个服务的启动类中添加一个Bean即可:

1
2
3
4
5
6
7
8
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}

消息转换器中添加的 messageId 可以便于将来做幂等性判断。

消费者接收消息

在 consumer 服务中定义一个新的消费者,publisher 是用 Map 发送,那么消费者也一定要用 Map 接收,格式如下:

1
2
3
4
@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}

高级

生产者可靠性

生产者重连

有的时候由于网络波动,可能会出现客户端连接 MQ 失败的情况。通过配置可以开启连接失败后的重连机制

修改 publisher 模块的 application.yaml 文件,添加下面的内容:

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数

生产者确认

RabbitMQ有 Publisher ConfirmPublisher Return 两种确认机制。开启确认机制后,在 MQ 成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:

  • 消息投递到 MQ,但是路由失败。此时通过 Publisher Return 返回异常原因,然后返回 ACK,告知投递成功
  • 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知投递成功
  • 持久消息投递到了 MQ,并且入队完成持久化,返回 ACK ,告知投递成功
  • 其它情况都会返回 NACK,告知投递失败

如何处理生产者的确认消息?

  • 生产者确认需要额外的网络和系统资源开销,尽量不要使用
  • 如果一定要使用,无需开启 Publisher-Return 机制,因为一般路由失败是自己业务问题
  • 对于 nack 消息可以有限次数重试,依然失败则记录异常消息

MQ 可靠性

数据持久化

在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

  • 一旦 MQ 宕机,内存中的消息会丢失
  • 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发 MQ 阻塞

RabbitMQ 实现数据持久化包括三个方面:

  • 交换机持久化
  • 队列持久化
  • 消息持久化

Lazy Queue

惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

RabbitMQ 如何保证消息的可靠性

  • 首先通过配置可以让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘,MQ 重启消息依然存在

  • RabbitMQ 在 3.6 版本引入了 LazyQueue,并且在 3.12 版本后会称为队列的默认模式。LazyQueue 会将所有消息都持久化

  • 开启持久化和生产者确认时,RabbitMQ 只有在消息持久化完成后才会给生产者返回 ACK 回执

消费者可靠性

消费者确认

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP 已经实现了消息确认功能,并允许通过配置文件选择 ACK 处理方式,有三种方式:

  • none:不处理,即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack

    当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常,会自动返回 nack
    • 如果是消息处理或校验异常,自动返回 reject
    1
    2
    3
    4
    5
    6
    spring:
    rabbitmq:
    listener:
    simple:
    prefetch: 1
    acknowledge-mode: none # none, 关闭 ack; manual, 手动 ack; auto: 自动ack

消费失败处理

当消费者出现异常后,消息会不断 requeue (重新入队)到队列,再重新发送给消费者,然后再次异常,再次 requeue,无限循环,导致 mq 的消息处理飙升,带来不必要的压力

修改consumer服务的application.yml文件,添加内容:

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecoverer 接口来处理,它包含三个不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

消费者如何保证消息一定被消费?

  • 开启消费者确认机制为 auto,由 spring 确认消息处理成功后返回 ack,异常时返回 nack
  • 开启消费者失败重试机制,并设置 MessageRecover,多次重试失败后将消息投递到异常交换机,交由人工处理

业务幂等性

如何保证支付服务与交易服务之间的订单状态一致性?

  • 首先,支付服务会在用户支付成功以后利用 MQ 消息通知交易服务,完成订单状态同步
  • 其次,为了保证 MQ 消息的可靠性,采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了 MQ 的持久化,避免因服务宕机导致消息丢失
  • 最后,还在交易服务更新订单状态时做了业务幂等性判断,避免因消息重复消费导致订单状态异常

如果交易服务消息处理失败,有什么兜底方案?

  • 可以在交易服务设置定时任务,定期查询订单支付状态。这样即便 MQ 通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性

延迟消息

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后收到消息

延迟任务:设置在一定时间之后才执行的任务

死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

  • 消费者使用 basic.rejectbasic.nack 声明消费失败,并且消息的 requeue 参数设置为false
  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange,简称 DLX)

延迟消息插件

Scheduling Messages with RabbitMQ | RabbitMQ - Blog

下载地址:GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ

取消超时订单

设置 30 分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:

  • 如果并发较高,30 分钟可能堆积消息过多,对 MQ 压力很大
  • 大多数订单在下单后 1 分钟内就会支付,但是却需要在 MQ 内等待 30 分钟,浪费资源


RabbitMQ
https://www.renkelin.vip/2024/02/07/RabbitMQ/
Author
Kolin
Posted on
February 7, 2024
Licensed under