RabbitMQ快速上手以及核心概念详解

推荐:

bilibili-京东架构师诸葛

04-RabbitMQ 高级教程

MQ介绍

什么是MQ,有什么用

MQ即MessageQueue,消息对列。我们这次要学习的RabbitMQ就是一种典型的MQ产品。

那么到底什么是MQ呢?可以分两个部分来理解:

  • 消息Message:在不同应用程序之间传递的数据。
  • 队列Queue:一种FIFO先进先出的数据结构。将消息以队列的形式存储起来,并且在不同的应用程序之间进行传递,这就成了MessageQueue。

**MQ产品最直接的作用,是将同步的事件驱动改为异步的消息驱动。**这话什么意思?我们从一个最常见的SpringBoot应用开始说起。

首先搭建一个普通的Maven项目在pom.xml中引入SpringBoot的依赖

1
2
3
4
5
6
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
</dependencies>

然后添加一个监听器类

1
2
3
4
5
6
7
8
9
10
11
public class MyApplicationListener implements ApplicationListener<ApplicationEvent> {
@Override
public void onApplicationEvent(ApplicationEvent event) {
System.out.println("=====> MyApplicationListener: " + event);
}

@Override
public boolean supportsAsyncExecution() {
return ApplicationListener.super.supportsAsyncExecution();
}
}

接下来,添加一个springboot启动类,在启动类中加入自己的监听器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SpringBootApplication
public class ServiceDemoApplication implements CommandLineRunner {

@Resource
private ApplicationContext applicationContext;

public static void main(String[] args) {
SpringApplication application = new SpringApplication(ServiceDemoApplication.class);
application.addListeners(new MyApplicationListener());
application.run(args);
}

@Override
public void run(String... args) throws Exception {
applicationContext.publishEvent(new ApplicationEvent("myEvent") {
});
}
}

好了,不用添加配置文件,直接启动,你会看到:

image-20241118105439872

从这个示例看到,SpringBoot框架其实在启动时,就会尝试发布各种Applicationevent事件,表示自己启动到了哪个步骤。这时,SpringBoot框架就可以称为消息生产者Producer。同样的,只要有ApplicationEvent事件发布了,就会触发MyApplicationListener监听器,处理这些事件。MyApplicationListener就可以成为消息消费者Consumer。

Producer和Consumer他们的运行状况互不干涉,不管有没有Consumer,Producer一样会发布消息。反过来,不管Producer有没有发布消息,Consumer也一样会监听这些事件。这种方式,实际上就是通过事件中包含的消息在驱动Producer和Consumer工作,这种工作方式也就称为消息驱动。

与消息驱动形成对比的是常见的事件驱动。比如经常写的Controller,只有通过一个事件主动触发,才会调用。

从这个简单的例子可以看到,SpringBoot内部就集成了这种消息驱动的机制。但是,这些Producer和Consumer都只能在一个进程中使用。如果需要跨进程调用呢?这就需要独立一个中间服务,才能发布和接受这些消息。而这个中间服务,就是MQ中间件。

比如在一个大型电商项目中,订单服务完成下单,就可以发布下单事件,而下游的消费者就可以消费这个下单事件,进行一些补充的业务。

image-20241118105946620

优势

在这个业务过程中,MQ中间件应该要起到什么作用呢?

  • **应用解耦:**Producer和Consumer都只跟中间件进行交互,而不需要互相进行交互。这意味着,在Producer生产消息时,不需要考虑有没有Consumer或者有多少个Consumer。反之亦然。甚至,即便Producer和Consumer是用不同语言开发的,只要都能够与MQ中间件正常交互,那么他们就可以通过MQ中间件进行消息传递。

  • **异步提速:**消息并不是从Producer生产出来就立即交由Consumer消费,而是在MQ中暂存下来。等Consumer启动后,再去MQ消费。也就是说,错开了Producer生产消息和Consumer消费消息的时间,提升了接口响应速度。

  • **削峰填谷:**有了MQ做消息暂存,那么当Producer生产消息的速度与Consumer消费消息的速度不一致时,MQ就能起到削峰填谷的作用。

    场景说明:秒杀活动是流量削峰的一种应用场景,由于服务器处理资源能力有限,因此出现峰值时很容易造成服务器岩机、用户无法访问的情况。为了解决这个问题,通常会采用消息队列缓冲瞬时高峰流量,对请求进行分层过滤,从而过滤掉一些请求。

    针对上述秒杀业务的场景需求,如果专门增设服务器来应对秒杀活动期间的请求瞬时高峰的话,在非秒杀活动期间,这些多余的服务器和配置显得有些浪费:如果不进行有效处理的话,秒杀活动瞬时高峰流量请求有可能压跨服务,因此,在秒杀活动中加入消息服务是较为理想的解决方案。通过在应用前端加入消息服务,先将所有请求写入到消息队列,并限定一定的阀值,多余的请求直接返回秒杀失败,秒杀服务会根据秒杀规则从消息队列中读取并处理有限的秒杀请求。

  • 分布式事务管理:

    场景说明:在分布式系统中,分布式事务是开发中必须要面对的技术难题,怎样保证分布式系统的请求业务处理的数据一致性通常是要重点考虑的问题。针对这种分布式事务管理的情况,自前较为可靠的处理方式是基于消息队列的二次提交,在失败的情况可以进行多次尝试,或者基于队列数据进行回滚操作。因此,在分布式系统中加入消息服务是一个既能保证性能不变,又能保证业务一致性的方案。

劣势

  • **系统可用性降低:**系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。
  • **系统复杂度提高:**MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。
  • **一致性问题:**A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败,则会造成数据处理的不一致,但可实现最终一致性。

主流MQ产品对比

在MQ长期发展过程中,诞生了很多MQ产品,但是有很多MQ产品都已经逐渐被淘汰了。比如早期的ZeroMQ、ActiveMQ等。目前最常用的MQ产品包括Kafka、RabbitMQ和RocketMQ。我们对这三个产品做下简单的比较,重点需要理解他们的适用场景。

特性 RabbitMQ RocktMQ Kafka
单机吞吐量 万级,比RocketMQ、Kafka低一个数量级 10万级,支撑高吞吐 10万级,高吞吐,技术生态完整,一般配合大数据系统来进行实时数据计算、日志采集等场景
topic数量对吞吐量的影响 topic可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic topic从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka尽量保证topic数量不要过多,如果要支撑大规模的topic,需要增加更多的机器资源
时效性 微秒级,这是RabbitMQ的一大特点,延迟最低 ms级 延迟在ms级以内
可用性 高,基于主从架构实现高可用 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 基本不丢 经过参数优化配置,可以做到0丢失 经过参数优化配置,可以做到0丢失
优劣势总结 并发能力强性能极好(erlang天生优势),延时很低,消息可靠性高,界面客户端丰富,功能全面。
吞吐量较低。消息积压会影响性能。erlang语言比较小众。
基于Java,高吞吐,高性能,高可用,可伸缩,高级功能非常全面。
技术生态相对没那么完整。
吞吐量大,性能非常好,技术生态完整。
功能比较单一,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用。
适用场景 需要可靠消息传递的业务场景,例如金融系统的支付、订单处理等。
需要高度灵活性的消息模型,例如消息路由、动态队列等。
需要与其他应用集成的场景,RabbitMQ提供了丰富的客户端库和协议支持。

企业内部系统调用
高性能、高可用性的消息传递场景,例如实时数据分析、电商秒杀等。
需要强大的消息过滤和消息追踪功能的场景,例如广告投放、用户推送等
需要分布式事务支持的场景,RocketMQ提供了分布式事务消息特性。

几乎全场景,尤其适合金融场景。
需要高吞吐量和低延迟的实时数据处理场景,例如用户行为日志分析、实时监控等。
需要保留大量历史数据并支持数据回溯的场景,例如大数据分析、数据仓库等
需要构建事件驱动架构的场景,Kafka可以作为事件源和消息总线

分布式日志收集,大数据采集。

好的产品都是在不断演进的,所以对这些产品的理解也需要与时俱进。比如现在还有个MQ产品Pulsar,非常适合于大型企业内部海量的系统调用,也体现了非常强大的竞争力。

RabbitMQ安装

https://www.rabbitmq.com/docs

RabbitMQ是基于Erlang语言开发的,所以安装RabbitMQ之前需要安装Erlang语言环境。需要注意下的是RabbitMQ与Erlang语言之间是有版本对应关系的。https://www.rabbitmq.com/docs/which-erlang

在基于 RPM 的 Linux上安装

注意:不同 Centos 版本的 rpm 包不同:

  • 下载对应 centos 版本的 erlang 安装包
  • 下载对应 erlang 版本的 rabbitmq 安装包
1
2
3
4
5
6
7
8
9
10
11
12
13
dnf install esl-erlang_26.2.5_1_centos_8_x86_64.rpm
dnf install rabbitmq-server-3.13.7-1.el8.noarch.rpm

dnf list installed | grep rabbit
dnf list installed | grep erlang
dnf remove rabbitmq-server.noarch
dnf remove esl-erlang.x86_64

# 单元文件位置 /usr/lib/systemd/system/rabbitmq-server.service
systemctl status rabbitmq-server
systemctl start rabbitmq-server
systemctl stop rabbitmq-server
systemctl enable rabbitmq-server

通用二进制构建

源码安装erlang

下载地址:https://www.erlang.org/downloads

所用版本:26.2.5.5

安装指南:https://www.erlang.org/docs/26/installation_guide/install

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 安装前依赖解决
yum install \
m4 \
vim \
wget \
gcc \
gcc-c++ \
make \
cmake \
automake \
autoconf \
readline \
kernel-devel \
ncurses-devel \
openssl-devel \
readline-devel \
-y

dnf remove --oldinstallonly # 一键删除旧内核
dnf autoremove # 从系统中删除所有最初作为用户安装软件包的依赖项安装的软件包,但此类包不再需要它们。

cd /opt/local/
tar -zxvf otp_src_26.2.5.5.tar.gz
cd /opt/local/otp_src_26.2.5.5/
export ERL_TOP=`pwd` # 设置指向当前路径环境变量,仅当前shell有效
./configure --prefix=/opt/local/erlang --without-javac # --without-javac 具体含义见erlang官方安装指南
make && make install
rm -rf /opt/local/otp_src_26.2.5.5

添加环境变量(也可以创建 erlang 的软链接,见下文 开机自启systemctl ):

1
2
3
echo "export PATH=/opt/local/erlang/bin:$PATH" >> /etc/profile
echo "RABBITMQ_HOME=/opt/local/rabbitmq" >> /etc/profile
source /etc/profile

测试

1
2
$ erl -version
Erlang (SMP,ASYNC_THREADS) (BEAM) emulator version 14.2.5.4

源码安装RabbitMQ

下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.13.7

所用版本:rabbitmq-server-generic-unix-3.13.7.tar.xz

1
2
3
4
5
cd /opt/local/

xz -d rabbitmq-server-generic-unix-3.13.7.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.13.7.tar
mv /opt/local/rabbitmq_server-3.13.7 /opt/local/rabbitmq

添加环境变量

1
2
echo "export PATH=/opt/local/rabbitmq/sbin:$PATH" >> /etc/profile
source /etc/profile

示例配置文件

RabbitMQ 配置文件位于 $RABBITMQ_HOME/etc/rabbitmq/rabbitmq.conf,是配置节点的主要方式。

可以使用环境变量来控制某些设置。建议使用 $RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf 文件。

安装后这两个文件都不存在,因此必须先创建它们。

请参阅 RabbitMQ 配置指南 以了解更多信息。

示例配置文件:https://www.rabbitmq.com/docs/configure#example-config

修改配置以支持guest远程登录:

1
echo "loopback_users = none" >> /opt/local/rabbitmq/etc/rabbitmq/rabbitmq.conf

启动、停止

1
2
/opt/local/rabbitmq/sbin/rabbitmq-server -detached # 后台运行
/opt/local/rabbitmq/sbin/rabbitmqctl shutdown

开启 web 管理界面

1
2
3
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl change_password guest 123456789 # 更改guest用户密码
# RabbitMQ默认只有一个guest帐号,guest帐号只能在RabbitMQ安装服务器上登录,解决办法是添加一个新的帐号,或修改配置

访问地址:http://192.168.0.7:15672

开机自启service–似乎不支持

vim /etc/init.d/rabbitmq

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/bin/bash

source /etc/profile

case "$1" in
'start' )
echo "Starting RabbitMQ ..."
rabbitmq-server -detached
;;
'stop' )
echo "Stopping RabbitMQ ..."
rabbitmqctl shutdown
;;
'status')
echo "Status RabbitMQ ..."
rabbitmqctl status
;;
*)
echo "Usage: $0 {start|stop|status}" >&2
;;
esac

echo "Usage: $0 {start|stop}" >&2 # $0:shell脚本的名字;$1:向shell脚本传的第一个参数;>&2:将“错误”输出到STDERR(标准错误)

启动与停止

1
2
3
4
5
6
7
8
9
10
11
chmod 777 /etc/init.d/rabbitmq

service rabbitmq start # 启动失败,为什么???????????????
service rabbitmq status
service rabbitmq stop
# 等价于
/etc/init.d/rabbitmq start # 启动成功
/etc/init.d/rabbitmq status
/etc/init.d/rabbitmq stop

chkconfig --level 2345 rabbitmq on # 自启动设置失败,提示 【服务 rabbitmq 不支持 chkconfig】

开机自启systemctl–使用中

vim /usr/lib/systemd/system/rabbitmq.service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# rabbitmq.service
[Unit]
Description=rabbitmq
Documentation= https://zh.wikipedia.org/wiki/Systemd
After=syslog.target network.target network-online.target

[Service]
#Type= notify
User=root
Group=root
#UMask=0027
#NotifyAccess=all
TimeoutStartSec=600

# To override LimitNOFILE, create the following file:
#
# /etc/systemd/system/rabbitmq-server.service.d/limits.conf
#
# with the following content:
#
# [Service]
# LimitNOFILE=65536
LimitNOFILE=65536

# Note: systemd on CentOS 7 complains about in-line comments,
# so only append them here
#
# Restart:
# The following setting will automatically restart RabbitMQ
# in the event of a failure. systemd service restarts are not a
# replacement for service monitoring. Please see
# https://www.rabbitmq.com/monitoring.html
Restart=on-failure
RestartSec=10
WorkingDirectory=/opt/local/rabbitmq
# ExecStart 中不能使用 "rabbitmq-server -detached" 方式启动
ExecStart=/opt/local/rabbitmq/sbin/rabbitmq-server
ExecStop=/opt/local/rabbitmq/sbin/rabbitmqctl shutdown
SuccessExitStatus=69
PrivateTmp=true

[Install]
WantedBy=multi-user.target

重新加载配置文件:systemctl daemon-reload

创建 erlang 的软链接(因为 /etc/profile 中配置的环境变量不会生效),如果使用rpm包安装的 erlang 请跳过此步骤

1
2
ln -s /opt/local/erlang/bin/erl /usr/local/bin/erl
ln -s /opt/local/erlang/bin/escript /usr/local/bin/escript

/usr/local/bin 和 /usr/bin 区别

首先注意 usr 指 Unix System Resource,而不是User。

/usr/bin 下面的都是系统预装的可执行程序,会随着系统升级而改变。

/usr/local/bin 目录是给用户放置自己的可执行程序的地方,推荐放在这里,不会被系统升级而覆盖同名文件。

如果两个目录下有相同的可执行程序,由PATH环境变量决定优先级,比如一台服务器的PATH变量为:

1
2
echo $PATH
/root/.local/bin:/root/bin:/usr/local/bin:/usr/local/sbin:/usr/bin:/usr/sbin

这里 /usr/local/bin 优先于 /usr/bin,一般都是如此。

启动与停止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
systemctl start rabbitmq
systemctl stop rabbitmq

systemctl enable rabbitmq
systemctl disable rabbitmq

systemctl status rabbitmq
journalctl -xeu rabbitmq.service # journalctl

# 报以下错误,是因为 /etc/init.d/ 目录下存在相同的服务名,删除即可 rm -rf /etc/init.d/rabbitmq
$ systemctl disable rabbitmq
Synchronizing state of rabbitmq.service with SysV service script with /usr/lib/systemd/systemd-sysv-install.
Executing: /usr/lib/systemd/systemd-sysv-install disable rabbitmq
服务 rabbitmq 不支持 chkconfig

添加用户

可以在管理界面添加或使用如下命令:

1
2
3
4
5
6
7
[root@localhost /]# rabbitmqctl add_user admin 123456789
Adding user "admin" ...
Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more.
[root@localhost /]# rabbitmqctl set_permissions -p / admin "." "." ".*"
Setting permissions for user "admin" in vhost "/" ...
[root@localhost /]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...

RabbitMQ基础使用

登录控制台后上方就能看到 RabbitMQ 的主要功能。

Overview:概述,主要展示 RabbitMQ 服务的一些整体运行情况。

Conections、Channels、Exchanges、Queues and Streams:RabbitMQ 的核心功能。

Admin:一些管理功能,例如:用户及访问权限管理、虚拟机管理(virtual host)等。

image-20241206180703465

在RabbitMQ中,不同虚拟机之间的资源是完全隔离的。在资源充足的情况下,每个虚拟机可以当成一个独立的RabbitMQ服务来使用。

image-20241206181353060

接下来我们来上手使用一下RabbitMQ的核心功能。

RabbitMQ高级教程

RabbitMQ的消息流转模型

image-20241206190446895

四大核心概念

生产者

产生数据发送消息的程序是生产者。

交换机

交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定 。

队列

队列是RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式 。

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

各个名词介绍

**Broker:**接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。

**Virtual host:**出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。

**Connection:**publisher/consumer 和 broker 之间的 TCP 连接。

**Channel:**如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。

**Exchange:**message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast) 。

**Queue:**消息最终被送到这里等待 consumer 取走。

**Binding:**exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保

存到 exchange 中的查询表中,用于 message 的分发依据。

交换机和交换机类型

服务器发送消息不会直接发送到队列中(Queue),只能将消息发送给交换机(Exchange),然后根据确定的规则,RabbitMQ将会决定消息该投递到哪个队列。这些规则称为路由键(routing key),队列通过路由键绑定到交换机上。消息发送到服务器端(broker),消息也有自己的路由键(也可以是空),RabbitMQ也会将消息和消息指定发送的交换机的绑定(binding,就是队列和交互机的根据路由键映射的关系)的路由键进行匹配。

如果匹配的话,就会将消息投递到相应的队列。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。AMQP 0-9-1的代理提供了四种交换机。

Name(交换机类型) Default pre-declared names(预声明的默认名称)
Direct exchange(直连交换机) (Empty string) and amq.direct
Fanout exchange(扇型交换机) amq.fanout
Topic exchange(主题交换机) amq.topic
Headers exchange(头交换机) amq.match (and amq.headers in RabbitMQ)

除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:

  • Name
  • Durability (消息代理重启后,交换机是否还存在)
  • Auto-delete (当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它)
  • Arguments(依赖代理本身)

交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。然而并不是所有的应用场景都需要持久化的交换机。

Direct Exchange

将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果相等,则发送到该Binding对应的Queue中。

image-20211230172801166

Topic Exchange

将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行对比,如果匹配上了,则发送到该Binding对应的Queue中。

image-20211230172811483

topic要求

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,**它必须是一个单词列表,以点号分隔开。**这些单词可以是任意单词,比如说:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。

规则

在这个规则列表中,其中有两个替换符是大家需要注意的。

*(星号)可以代替一个单词

#(井号)可以替代零个或多个单词

注意

  • 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
  • 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

Fanout Exchange

直接将消息转发到所有binding的对应queue中,这种exchange在路由转发的时候,忽略Routing key

image-20211230172824713

Headers Exchange

将消息中的headers与该Exchange相关联的所有Binging中的参数进行匹配,如果匹配上了,则发送到该Binding对应的Queue中。

六大模式

简单模式

image-20211230172833658

工作模式

image-20211230172845711

发布订阅模式

image-20211230172957861

路由模式

image-20211230173014717

主题模式

image-20211230173025614

RPC模式

image-20211230173036138


绑定

binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个 queue 进行了绑定关系。如果要指示交换机“E”将消息路由给队列“Q”,那么“Q”就需要与“E”进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。

image-20211230173046913

如果AMQP的消息无法路由到队列(例如,发送到的交换机没有绑定队列),消息会被就地销毁或者返还给发布者。如何处理取决于发布者设置的消息属性。

消息分发

轮训分发

RabbitMQ 默认分发消息采用的轮训分发的,如果同一队列有多个消费节点,则会按照消息顺序进行轮训消费。

不公平分发

RabbitMQ 默认分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是RabbitMQ 并不知道这种情况它依然很公平的进行分发。

为了避免这种情况,我们可以设置参数 channel.basicQos(1); 意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。

限流

通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。


消息确认

发布确认

原理

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在*该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始)*,一旦**消息被投递到所有匹配的队列**之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

开启发布确认

发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法。

1
2
3
4
5
6
7
//创建 channel实例
channel = connection.createChannel();

// 开启发布确认
channel.confirmSelect();

JAVA

单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布**,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。一个最大的缺点就是:*发布速度特别的慢*,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供**每秒不超过数百条发布消息的吞吐量。

1
2
3
4
5
6
// 默认0L
channel.waitForConfirmsOrDie();
// 时间内
channel.waitForConfirmsOrDie(1000L);

JAVA

批量确认发布

与单个等待确认消息相比,先发布一批消息然后一起确认waitForConfirms()*可以*极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的**,也一样阻塞消息的发布。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}

//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}

JAVA

异步确认发布

利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//开启发布确认
channel.confirmSelect();
/**
* 线程安全有序的一个跳表,适用于高并发的情况
* 1.轻松的将序号与消息进行关联
* 2.轻松批量删除条目 只要给到序列号
* 3.支持并发访问
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

/**
* 确认监听器
* 1. 消息序列号
* 2. true 可以确认小于等于当前序列号的消息
* 3. false 确认当前序列号消息
*/
ConfirmListener confirmListener = new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 是否批量
if (multiple) {
//返回的是小于等于当前序列号的未确认消息 是一个 map
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
//清除该部分未确认消息
confirmed.clear();
}else{
//只清除当前序列号的消息
outstandingConfirms.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
String message = outstandingConfirms.get(deliveryTag);
logger.error("发布的消息" + message + "未被确认,序列号" + sequenceNumber);
}
};


/**
* 添加一个异步确认的监听器
* 1.确认收到消息的回调
* 2.未收到消息的回调
*/
channel.addConfirmListener(confirmListener);

for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
/**
* channel.getNextPublishSeqNo()获取下一个消息的序列号
* 通过序列号与消息体进行一个关联
* 全部都是未确认的消息体
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}

JAVA

回退消息

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

1
2
3
4
5
6
7
8
9
10
11
12
13
// mandatory true 强制推送到一个队列中 
public abstract void publish(String topic,boolean mandatory, boolean immediate,boolean durable,Object data) throws Exception;

ReturnListener returnListener = new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, BasicProperties properties, byte[] body) throws IOException {

}
};
// 添加回退监听器
channel.addReturnListener(returnListener);

JAVA

消息应答

为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,**这种模式需要在高吞吐量和数据传输安全性方面做权衡,*因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式*仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答

  • Channel.basicAck(用于肯定确认)RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
  • Channel.basicNack(用于否定确认)
  • Channel.basicReject(用于否定确认) 与 Channel.basicNack 相比少一个参数不处理该消息了直接拒绝,可以将其丢弃了

批量应答

在手动应答时,**指定 multiple 为 true,**可以进行批量应答,从而减少网络拥堵。

1
2
3
channel.basicAck(envelope.getDeliveryTag(), true);

JAVA

重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。在手动应答时,指定 requeue 为 true,可以进行重新入队。

1
2
3
4
// requeue ? 重新入队 : 丢弃
channel.basicReject(envelope.getDeliveryTag(), true);

JAVA

持久化

我们需要将队列和消息都标记为持久化。来保障当 RabbitMQ 服务停掉或奔溃以后消息生产者发送过来的消息不丢失。

队列持久化

默认我们创建的队列都是非持久化的。rabbitmq 如果重启的话,该队列就会被删除掉,如果要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化。

1
2
3
4
boolean queue_durable = true;
channel.queueDeclare(queue_name, queue_durable, false, false, getQueueArgs(queueLength,queueByteLength));

JAVA

注意:如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误

消息持久化

要想让消息实现持久化,需要在生产者发布消息时 指定 deliveryMode 为 2。

1
2
3
4
5
6
7
8
public abstract void publish(String topic,boolean mandatory, boolean immediate,boolean durable,Object data) throws Exception;

BasicProperties.Builder propsBuilder = new BasicProperties.Builder();

//是否持久化
propsBuilder.deliveryMode(durable ? 2 : 1);

JAVA

注意:将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没

有真正写入磁盘。持久性保证并不强,。如果需要更强有力的持久化策略,可以增加发布确认


队列

死信队列

死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

死信来源

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

设置死信队列

image-20211230173628371

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

//声明死信队列
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);

// 死信队列绑定死信交换机与 routingkey
channel.queueBind(deadQueue, DEAD_EXCHANGE, "dead_routingkey");

// 正常队列绑定死信队列信息
Map<String, Object> params = new HashMap<>();
// 正常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "dead_routingkey");
String normalQueue = "normal-queue";
// 声明正常队列
channel.queueDeclare(normalQueue, false, false, false, params);
// 正常队列绑定
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "normal_routingkey");

JAVA

延迟队列

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列

实现方式

  • 基于消息TTL和队列TTL转换入死信队列,消费死信队列中的消息
  • 基于延迟插件,该类型消息支持延迟投递机制 消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中
  • 用 Java 的 DelayQueue
  • 利用 Redis 的 zset

优先队列

要让队列实现优先级需要做的事情有如下事情:队列需要设置为优先级队列消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序。

1
2
3
4
5
6
7
8
9
10
// 队列
Map args = MapUtils.toMap(new Object[][]{{"x-max-priority", 10}});
channel.queueDeclare(queue_name, queue_durable, false, false, args);

// 消息
BasicProperties.Builder propsBuilder = new BasicProperties.Builder();
propsBuilder.priority(5);
channel.basicPublish(exchange_name, topic, null, str_msg.getBytes("UTF-8"));

JAVA

惰性队列

RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。

队列声明

1
2
3
4
5
// 队列
Map args = MapUtils.toMap(new Object[][]{{"x-queue-mode", "lazy"}});
channel.queueDeclare(queue_name, queue_durable, false, false, args);

JAVA

内存开销对比

image-20211230173648687

在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB。

TTL

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为”死信”。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用

队列TTL

1
2
3
4
Map args = MapUtils.toMap(new Object[][]{{"x-message-ttl", 2000}});
channel.queueDeclare(queue_name, queue_durable, false, false, args);

JAVA

消息TTL

1
2
3
4
BasicProperties.Builder basicProperties = new BasicProperties.Builder();
basicProperties.expiration("1000");

JAVA

幂等性

消息重复消费

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

解决思路

MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。

幂等性保障

唯一 ID+指纹码机制

指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。

Redis 原子性

利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费。