实现原理图


基本概念
- Channel(信道):多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道。
- Producer(消息的生产者):向消息队列发布消息的客户端应用程序。
- Consumer(消息的消费者):从消息队列取得消息的客户端应用程序。
- Message(消息):消息由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(消息优先权)、delivery-mode(是否持久性存储)等。
- Routing Key(路由键):消息头的一个属性,用于标记消息的路由规则,决定了交换机的转发路径。最大长度255 字节。
- Queue(消息队列):存储消息的一种数据结构,用来保存消息,直到消息发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将消息取走。需要注意,当多个消费者订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,每一条消息只能被一个订阅者接收。
- Exchange(交换器|路由器):提供Producer到Queue之间的匹配,接收生产者发送的消息并将这些消息按照路由规则转发到消息队列。交换器用于转发消息,它不会存储消息 ,如果没有 Queue绑定到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。交换器有四种消息调度策略(下面会介绍),分别是fanout, direct, topic, headers。
- Binding(绑定):用于建立Exchange和Queue之间的关联。一个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规则,所以可以将交换器理解成一个由Binding构成的路由表。
- Binding Key(绑定键):Exchange与Queue的绑定关系,用于匹配Routing Key。最大长度255 字节。
- Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker。
- Virtual host(虚拟主机):出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue 等。
Exchange类型
1. Fanout(订阅模式|广播模式)

Fanout交换器会把所有发送到该交换器的消息路由到所有与该交换器绑定的消息队列中。订阅模式与Binding Key和Routing Key无关,交换器将接受到的消息分发给有绑定关系的所有消息队列队列(不论Binding Key和Routing Key是什么)。类似于子网广播,子网内的每台主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
2. Direct(路由模式)

Direct交换器需要消息的Routing Key与 Exchange和Queue 之间的Binding Key完全匹配,如果匹配成功,将消息分发到该Queue。只有当Routing Key和Binding Key完全匹配的时候,消息队列才可以获取消息。Direct是Exchange的默认模式。RabbitMQ默认提供了一个Exchange,名字是空字符串,类型是Direct,绑定到所有的Queue(每一个Queue和这个无名Exchange之间的Binding Key是Queue的名字)。所以,有时候我们感觉不需要交换器也可以发送和接收消息,但是实际上是使用了RabbitMQ默认提供的Exchange。
3. Topic(通配符模式)

Topic交换器按照正则表达式模糊匹配:用消息的Routing Key与 Exchange和Queue 之间的Binding Key进行模糊匹配,如果匹配成功,将消息分发到该Queue。Routing Key是一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。Binding Key与Routing Key一样也是句点号“. ”分隔的字符串。Binding Key中可以存在两种特殊字符“ * ”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(也可以是零个或一个)。
RPC机制

MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败,甚至连有没有消费者来处理这条消息都不知道。但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。
RabbitMQ中实现RPC的机制是:1,生产者发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14个属性,这些属性会随着消息一起发送)中设置两个属性值replyTo(一个Queue名称,用于告诉消费者处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,消费者处理完成后需要将此属性返还,生产者将根据这个id了解哪条请求被成功执行了或执行失败)。2,消费者收到消息并处理。3,消费者处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性。4,生产者之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理。
示例:https://blog.csdn.net/SanfordLuo/article/details/122622920
消息确认机制
在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在Timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的问题,Queue中堆积的消息会越来越多,消费者重启后会重复消费这些消息并重复执行业务逻辑。如果我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ会立即把这个Message标记为完成,然后从queue中删除了。
消息持久化机制
如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。
事务
对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器接收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。
消息分发机制
我们在应用程序使用消息系统时,一般情况下生产者往队列里插入数据时速度是比较快的,但是消费者消费数据往往涉及到一些业务逻辑处理导致速度跟不上生产者生产数据。因此如果一个生产者对应一个消费者的话,很容易导致很多消息堆积在队列里。这时,就得使用工作队列了。一个队列有多个消费者同时消费数据。工作队列有两种分发数据的方式:轮询分发(Round-robin)和 公平分发(Fair dispatch)。轮询分发:队列给每一个消费者发送数量一样的数据。公平分发:消费者设置每次从队列中取一条数据,并且消费完后手动应答,继续从队列取下一个数据。
1. 轮询分发
如果工作队列中有两个消费者,两个消费者得到的数据量一样的,并不会因为两个消费者处理数据速度不一样使得两个消费者取得不一样数量的数据。但是这种分发方式存在着一些隐患,消费者虽然得到了消息,但是如果消费者没能成功处理业务逻辑,在RabbitMQ中也不存在这条消息。就会出现消息丢失并且业务逻辑没能成功处理的情况。
2. 公平分发
消费者设置每次从队列里取一条数据,并且关闭自动回复机制,每次取完一条数据后,手动回复并继续取下一条数据。与轮询分发不同的是,当每个消费都设置了每次只会从队列取一条数据时,并且关闭自动应答,在每次处理完数据后手动给队列发送确认收到数据。这样队列就会公平给每个消息费者发送数据,消费一条再发第二条,而且可以在管理界面中看到数据是一条条随着消费者消费完从而减少的,并不是一下子全部分发完了。采用公平分发方式就不会出现消息丢失并且业务逻辑没能成功处理的情况。
解决方案:
顺序性
RabbitMQ使用过程中,有些业务场景需要我们保证顺序消费,例如:业务上产生三条消息,分别是对数据的增加、修改、删除操作,如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了 。

RabbitMQ的消息顺序问题,需要分三个环节看待,发送消息的顺序、队列中消息的顺序、消费消息的顺序。
发送消息的顺序
先看一下是什么原因造成了发送消息时候的顺序错乱
- 消息生产者启用了发送确认(ack)机制,在发生中断时,需要 RabbitMQ 补偿发送时,那么此时消息在源头就已经出现顺序混乱了,导致消息被消费时也是乱序的。
- 另一种情况,如果消息发送时,设置了超时时间,并且采用了死信队列,模拟了延时队列的效果,那么此时消息的顺序也时不能保证的。
- 还有一种情况,如果消息设置了优先级,那么在高并发的情况下,消息的顺序也是得不到保证的,消息的消费顺序也就不能保证了。
- 发送消息的顺序性,一般来说不做要求,但是如果一定要求顺序,可以使用锁机制配合 ack机制 来保证消息的顺序到达。
队列中消息的顺序
消息队列中的消息是遵循FIFO(先进先出)原则,天然有序
消费消息的顺序
有这样一个订单操作,insert 、update、delete连续操作,并且消息已经顺序存在queue中,那么如何保证消费顺序是insert 、update、delete,而不是delete、insert 、update呢?
方案一:拆分多个queue,每个queue一个consumer,该条订单的相关操作全部放到这个queue中,由这一个consumer消费,这样做多了一些queue。
方案二:就一个queue但是对应一个consumer,然后这个consumer内部用内存队列做排队,该条订单相关的消息全部放到一个队列中,然后分发给底层不同的worker线程来处理。
可靠性
实际上消息队列是没法百分百保证不丢失的,我们只能尽量降低概率,然后在消息丢失后记录日志,再处理
有这样一个典型的订单场景

- MQ 挂了,消息没发出去。创建订单后面几个优惠券、积分的下游系统全都没有执行业务结算怎么办?
- MQ 是高可用的,消息发出去了,但是优惠券结算业务报错了怎么办?因为这个时候是异步的,也不好去回滚
- 消息正常发出去,消费者也接收到了,订单系统、优惠券系统都正常执行完了,积分业务报错了导致积分没结算,那这个订单的数据就不一致了
要解决上述问题,就是要保证消息一定要可靠地被消费,那么我们可以来分析下消息有哪些步骤会出问题
RabbitMQ 发送的消息是这样的 , 消息被生产者发到指定的交换机根据路由规则路由到绑定的队列,然后推送给消费者 。

在这个过程中,可能会出一下问题
- 生产者消息没到交换机,相当于生产者弄丢消息
- 交换机没有把消息路由到队列,相当于生产者弄丢消息
- RabbitMQ 宕机导致队列、队列中的消息丢失,相当于 RabbitMQ 弄丢消息
- 消费者消费出现异常,业务没执行,相当于消费者弄丢消息
单消费实例的解决方案:
- 生产者弄丢消息RabbitMQ 提供了确认和回退机制,有一个异步监听机制,每次发送消息,如果成功/未成功发送到交换机都可以触发一个监听ConfirmCallback(),从交换机路由到队列失败也会有一个监听ReturnsCallback()。只需要开启这两个监听机制,使用记录日志、发送邮件通知、落库定时任务扫描重发这些应对策略生产者弄丢数据其实及其罕见,落库定时任务扫描重发工作量大,一般记录日志后,发邮件给对应人员,补充数据库数据即可
- RabbitMQ弄丢消息宕机重启不开启持久化的情况下 RabbitMQ 重启之后所有队列和消息都会消失,所以我们创建队列时设置持久化
- 消费者弄丢消息RabbitMQ 给我们提供了消费者应答(ack)机制,默认情况下这个机制是自动应答,只要消息推送到消费者就会自动 ack ,然后 RabbitMQ 删除队列中的消息。启用手动应答之后我们在消费端调用 API 手动 ack 确认之后,RabbitMQ 才会从队列删除这条消息 。开启手动ack,在业务处理完成之后手动ack即可,如果在业务处理过程中出异常了,队列会给消费者重推,也要注意重推导致的循环异常,可以配置重试次数策略。
消息重复消费(幂等性)
这个也是生产环境业务中经常出现的场景,重复消费也要从两方面分析,为什么会出现重复消费
生产时消息重复
在网络波动的情况下,生产者给MQ服务器发送消息,由于网络原因导致生产者没有收到ACK确认消息,但是MQ服务器实际上已经接收到了消息,在这种情况下生产者就会重新发送一遍刚才的消息。
此时重发是MQ-client发起的,消息的处理是MQ-server,为了避免broker落地重复的消息,对每条消息,MQ系统内部必须生成一个inner-msg-id,作为去重和幂等的依据,这个内部消息ID的特性是:
全局唯一MQ生成,具备业务无关性,对消息发送方和消息接收方屏蔽有了这个inner-msg-id,就能保证即使重发,也只有1条消息落到MQ-server的DB中
消费时消息重复
在消费者方面如果出现网络问题,比如消费者对消息已经成功消费了,在向MQ服务器进行确认的时候网络异常了,这时候MQ服务器就没有接收到确认,MQ为了保证消息被消费,就会继续向消费者发送之前已经被消费了的消息,这种情况下消费者就会接收到两条一样的消息。
我们解决消息重复消费主要是保证消费的幂等性,有两种角度,第一种就是不让消费端执行两次,第二种是让它重复消费了,但是不会对我的业务数据造成影响就行。通常可以在发消息的时候携带业务唯一id,消费成功后保存到redis/db中,消费前再检查下有没有这个ID,有的话就表示已经消费过了,或者使用数据库唯一性主键约束,再或者使用cas,最后遇到重复消息丢弃消息即可。
消息堆积
- 对生产者发消息接口进行适当限流(不太推荐,影响用户体验)
- 多部署几台消费者实例(推荐)
- 适当增加 prefetch 的数量,让消费端一次多接受一些消息(推荐,可以和第二种方案一起用)
RabbitMQ的高级应用
1. 死信队列

死信队列(DLX,Dead-Letter-Exchange),利用DLX,当消息在一个队列中变成无法被消费的消息(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。
消息变成死信的几种情况:
- 消息被拒绝(channel.basicReject/channel.basicNack)并且request=false;
- 消息在队列的存活时间超过设置的生存时间(TTL)时间;
- 队列达到最大长度(队列满了,无法再添加数据到队列中)。
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
死信队列的设置:
- 首先,需要设置死信队列的Exchange和queue,然后进行绑定;
- 然后,我们进行正常声明交换机、队列、绑定,只不过我们需要在队列机上一个参数即可:arguments.put(“x-dead-letter-exchange”,”dlx.exchange”);这样消息在过期、被拒绝、队列在达到最大长度时,消息就可以直接路由到死信队列。
2. 延迟队列

基于rabbitmq_delayed_message_exchange插件,实现延迟队列效果。它是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。使用延迟队列,可以有效解决定时任务带来的系统压力以及业务处理时效性等问题。
应用场景:
- 订单在30分钟之内未支持,则自动取消订单
- 工单在60分钟之内仍未处理,则发送消息提醒
- 预定会议室后,在预定时间前10分钟,通知提醒各参会人员
3. 队列幂等性

幂等性的实质是:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复相同的请求而对该资源重复造成影响。注意关注的是请求操作对资源本身造成的影响,而不是请求资源返回的结果。
MQ消费者的幂等性一般使用全局ID或者写个唯一标识(比如流水号/时间戳/UUID/订单号)来判断该消息是否已消费过,也可以利用redis执行setnx命令,天然具有幂等性,从而实现不重复消费(推荐使用redis)。
4. 优先级队列
优先级队列,也就是具有高优先级的队列,优先级高的消息具备优先被消费的特权。通过队列的 x-max-priority 参数设置队列的最大优先级,之后在发送消息时通过 priority 属性再设置当前消息的优先级。优先级应在 0 和 255 之间,推荐1 ~ 10。
- 优先级默认最低为0,最高为队列设置的最大优先级;
- 对于单条消息来谈优先级是没有什么意义的。假如消费者的消费速度大于生产者的速度且Broker中没有消息堆积的情况下,对发送的消息设置优先级就没有什么意义,因为生产者刚发完一个消息就被消费者消费了,相当于Broker中至多只有一条消息。
5. 惰性队列

惰性队列会尽可能地将消息存入磁盘中,而在消费者消费消息时才会被加载到内存中,它支持更多的消息存储。
队列具备两种模式:default 和 lazy。默认的为 default 模式,在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。
保障消息 100% 投递成功方案
什么是生产端的可靠性投递?
- 保障消息的成功发出
- 保障MQ节点的成功接收
- 发送端收到MQ节点(Broker) 确认应答
- 完善的消息补偿机制
如果想保障消息百分百投递成功,只做到前三步不一定能够保障。有些时候或者说有些极端情况,比如生产端在投递消息时可能就失败了,或者说生产端投递了消息,MQ也收到了,MQ在返回确认应答时,由于网络闪断导致生产端没有收到应答,此时这条消息就不知道投递成功了还是失败了,所以针对这些情况我们需要做一些补偿机制。
互联网大厂的解决方案
- 消息落库,对消息状态进行打标
- 消息的延迟投递,做二次确认,回调检查
具体使用哪种要根据业务场景和并发量、数据量大小来决定
1. 消息信息落库,对消息状态进行打标

- 进行数据的入库比如我们要发送一条订单消息,首先把业务数据也就是订单信息进行入库,然后生成一条消息,把消息也进行入库,这条消息应该包含消息状态属性,并设置初始值比如为0,表示消息创建成功正在发送中,这种方式缺陷在于我们要对数据库进行持久化两次。
- 首先要保证第一步消息都存储成功了,没有出现任何异常情况,然后生产端再进行消息发送。如果失败了就进行快速失败机制。
- MQ把消息收到的结果应答(confirm)给生产端
- 生产端有一个Confirm Listener,去异步的监听Broker回送的响应,从而判断消息是否投递成功,如果成功,去数据库查询该消息,并将消息状态更新为1,表示消息投递成功。
假设第二步OK了,在第三步回送响应时,网络突然出现了闪断,导致生产端的Listener就永远收不到这条消息的confirm应答了,也就是说这条消息的状态就一直为0了。
- 此时我们需要设置一个规则,比如说消息在入库时候设置一个临界值timeout,5分钟之后如果还是0的状态那就需要把消息抽取出来。这里我们使用的是分布式定时任务,去定时抓取DB中距离消息创建时间超过5分钟的且状态为0的消息。
- 把抓取出来的消息进行重新投递(Retry Send),也就是从第二步开始继续往下走
- 当然有些消息可能就是由于一些实际的问题无法路由到Broker,比如routingKey设置不对,对应的队列被误删除了,那么这种消息即使重试多次也仍然无法投递成功,所以需要对重试次数做限制,比如限制3次,如果投递次数大于三次,那么就将消息状态更新为2,表示这个消息最终投递失败。
针对这种情况如何去做补偿呢,可以有一个补偿系统去查询这些最终失败的消息,然后给出失败的原因,当然这些可能都需要人工去操作。
第一种可靠性投递,在高并发的场景下是否适合?
对于第一种方案,我们需要做两次数据库的持久化操作,在高并发场景下显然数据库存在着性能瓶颈。其实在我们的核心链路中只需要对业务进行入库就可以了,消息就没必要先入库了,我们可以做消息的延迟投递,做二次确认,回调检查。
当然这种方案不一定能保障百分百投递成功,但是基本上可以保障大概99.9%的消息是OK的,有些特别极端的情况只能是人工去做补偿了,或者使用定时任务去做都可以。使用第二种方式主要目的是为了减少数据库操作,提高并发量。
2. 消息的延迟投递,做二次确认,回调检查

Upstream Service上游服务也就是生产端,Downstream service下游服务也就是消费端,Callback service就是回调服务。
- 先将业务消息进行入库,然后生产端将消息发送出去,注意一定是等数据库操作完成以后再去发送消息。
- 在发送消息之后,紧接着生产端再次发送一条消息(Second Send Delay Check),即延迟消息投递检查,这里需要设置一个延迟时间,比如5分钟之后进行投递。
- 消费端去监听指定队列,将收到的消息进行处理。
- 处理完成之后,发送一个confirm消息,也就是回送响应,但是这里响应不是正常的ACK,而是重新生成一条消息,投递到MQ中。
- 上面的Callback service是一个单独的服务,其实它扮演了第一种方案的存储消息的DB角色,它通过MQ去监听下游服务发送的confirm消息,如果Callback service收到confirm消息,那么就对消息做持久化存储,即将消息持久化到DB中。
- 5分钟之后延迟消息发送到MQ了,然后Callback service还是去监听延迟消息所对应的队列,收到Check消息后去检查DB中是否存在消息,如果存在,则不需要做任何处理,如果不存在或者消费失败了,那么Callback service就需要主动发起RPC通信给上游服务,告诉它延迟检查的这条消息我没有找到,你需要重新发送,生产端收到信息后就会重新查询业务消息然后将消息发送出去。
这么做的目的是少做了一次DB的存储,在高并发场景下,最关心的不是消息100%投递成功,而是一定要保证性能,保证能抗得住这么大的并发量。所以能节省数据库的操作就尽量节省,可以异步的进行补偿。
其实在主流程里面是没有这个Callback service的,它属于一个补偿的服务,整个核心链路就是生产端入库业务消息,发送消息到MQ,消费端监听队列,消费消息。其他的步骤都是一个补偿机制。
第二种方案也是互联网大厂更为经典和主流的解决方案。
注意
- 一定是先入库,再发RabbitMQ消息
- 互联网大厂,不加任何事务,事务会造成严重的性能瓶颈。只最后做业务补偿。
幂等性的主流解决方案
幂等性是什么?
简单来说就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的。我们可以借鉴数据库的乐观锁机制来举个例子
- 首先为表添加一个版本字段version
- 在执行更新操作前呢,会先去数据库查询这个version
- 然后执行更新语句,以version作为条件,例如:UPDATE T_REPS SET COUNT = COUNT -1,VERSION = VERSION + 1 WHERE VERSION = 1
- 如果执行更新时有其他人先更新了这张表的数据,那么这个条件就不生效了,也就不会执行操作了,通过这种乐观锁的机制来保障幂等性。
消费端-幂等性保障
什么情况下会出现重复消费?
当消费者消费完消息时,在给生产端返回ack时由于网络中断,导致生产端未收到确认信息,该条消息会重新发送并被消费者消费,但实际上该消费者已成功消费了该条消息,这就是重复消费问题。
如何避免消息的重复消费问题?
消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即使我们收到了多条一样的消息
业界主流的幂等性操作:
- 唯一ID + 指纹码机制,利用数据库主键去重
- 利用Redis的原子性去实现
唯一ID+指纹码机制
- 唯一ID + 指纹码机制,利用数据库主键去重
- SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID +指纹码
- 好处:实现简单
- 坏处:高并发下有数据库写入的性能瓶颈
- 解决方案:跟进ID进行分库分表进行算法路由
整个思路就是首先我们需要根据消息生成一个全局唯一的ID,然后还需要加上一个指纹码。这个指纹码它并不一定是系统去生成的,而是一些外部的规则或者内部的业务规则去拼接,它的目的就是为了保障这次操作是绝对唯一的。
将ID + 指纹码拼接好的值作为数据库主键,就可以进行去重了。即在消费消息前呢,先去数据库查询这条消息的指纹码标识是否存在,没有就执行insert操作,如果有就代表已经被消费了,就不需要管了。
对于高并发下的数据库性能瓶颈,可以跟进ID进行分库分表策略,采用一些路由算法去进行分压分流。应该保证ID通过这种算法,消息即使投递多次都落到同一个数据库分片上,这样就由单台数据库幂等变成多库的幂等。
利用Redis的原子性去实现
我们都知道redis是单线程的,并且性能也非常好,提供了很多原子性的命令。比如可以使用 setnx 命令。
在接收到消息后将消息ID作为key执行 setnx 命令,如果执行成功就表示没有处理过这条消息,可以进行消费了,执行失败表示消息已经被消费了。
使用 redis 的原子性去实现主要需要考虑两个点
- 第一:我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?
- 第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略(同步到关系型数据库)?缓存又如何做到数据可靠性保障呢
关于不落库,定时同步的策略,目前主流方案有两种,第一种为双缓存模式,异步写入到缓存中,也可以异步写到数据库,但是最终会有一个回调函数检查,这样能保障最终一致性,不能保证100%的实时性。第二种是定时同步,比如databus同步。
机制与策略
确认消息Confirm
理解Confirm消息确认机制
- 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。
- 生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障!
确认机制流程图
生产端发送消息到Broker,然后Broker接收到了消息后,进行回送响应,生产端有一个Confirm Listener,去监听应答,当然这个操作是异步进行的,生产端将消息发送出去就可以不用管了,让内部监听器去监听Broker给我们的响应。

Return消息机制
Return Listener 用于处理一些不可路由的消息!
我们的消息生产者,通过指定一个Exchange 和Routingkey,把消息送达到某一个队列中去, 然后我们的消费者监听队列,进行消费处理操作!
但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener

Return机制实现
- 添加return监听:ch.NotifyReturn(make(chan amqp.Return)),生产端去监听这些不可达的消息,做一些后续处理,比如说,记录下消息日志,或者及时去跟踪记录,有可能重新设置一下就好了
- 发送消息时,设置Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
消费端的限流策略
为什么需要消费端的限流?
- 假设一个场景,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这种情况:巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!此时很有可能导致服务器崩溃,严重的可能导致线上的故障。
- 除了这种场景,还有一些其他的场景,比如说单个生产者一分钟生产出了几百条数据,但是单个消费者一分钟可能只能处理60条数据,这个时候生产端和消费端肯定是不平衡的。通常生产端是没办法做限制的。所以消费端肯定需要做一些限流措施,否则如果超出最大负载,可能导致消费端性能下降,服务器卡顿甚至崩溃等一系列严重后果。
消费端限流机制
RabbitMQ提供了一种qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息 (通过基于consume或者channel设置Qos的值) 未被确认前,不进行消费新的消息。需要注意:1.不能设置自动签收功能(autoAck = false)2.如果消息没被确认,就不会到达消费端,目的就是给消费端减压
消费端ACK与重回队列机制
消费端的手工ACK与NACK
当我们设置 autoACK=false 时,就可以使用手工ACK方式了,那么其实手工方式包括了手工ACK与NACK。当我们手工 ACK 时,会发送给Broker一个应答,代表消息成功处理了,Broker就可以回送响应给生产端了。NACK 则表示消息处理失败了,如果设置重回队列,Broker端就会将没有成功处理的消息重新发送。
使用方式
消费端进行消费的时候,如果由于业务异常我们可以手工 NACK 并进行日志的记录,然后进行补偿!方法:func (d Delivery) Nack(multiple, requeue bool) error
如果由于服务器宕机等严重问题,那我们就需要手工进行 ACK 保障消费端消费成功!方法:func (d Delivery) Ack(multiple bool) error
消费端的重回队列
- 消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker!
- 重回队列,会把消费失败的消息重新添加到队列的首部(不是尾端),供消费者继续消费。
- 一般我们在实际应用中,都会关闭重回队列,也就是设置为false
TTL消息详解
TTL说明
- TTL是Time To Live的缩写,也就是生存时间
- RabbitMQ支持消息的过期时间,在消息发送时可以进行指定
- RabbitMQ支持为每个队列设置消息的超时时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除
包括:消息的过期时间设置,队列的过期时间设置
注意事项
- 两者的区别是设置队列的过期时间是对该队列的所有消息生效的。
- 为消息设置TTL有一个问题:RabbitMQ只对处于队头的消息判断是否过期(即不会扫描队列),所以,很可能队列中已存在死消息,但是队列并不知情。这会影响队列统计数据的正确性,妨碍队列及时释放资源。
死信队列详解
死信队列介绍
- 死信队列:DLX,dead-letter-exchange
- 利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX
消息变成死信有以下几种情况
- 消息被拒绝(reject / nack),并且requeue = false
- 消息TTL过期
- 队列达到最大长度
死信处理过程
- DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
- 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
- 可以监听这个队列中的消息做相应的处理。
死信队列设置
- 首先需要设置死信队列的exchange和queue,然后进行绑定:
Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: #表示只要有消息到达了Exchange,那么都会路由到这个queue上
- 然后需要有一个监听,去监听这个队列进行处理
- 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可:arguments["x-dead-letter-exchange"]="dlx.exchange",这样消息在过期、requeue、 队列在达到最大长度时,消息就可以直接路由到死信队列!
本文参考:
https://blog.csdn.net/weixin_43498985/article/details/119026198
https://blog.csdn.net/m0_37540696/article/details/128026687
https://blog.csdn.net/weixin_38854703/article/details/113856708
https://www.guaosi.com/2020/01/29/dive-into-the-advanced-features-of-rabbitmq-with-golang/