RabbitMQ消息顺序性解密-确保消息的正确顺序 (rabbit中文翻译)
引言
RabbitMQ 是一个流行的消息队列系统,但它本身并不提供严格的消息顺序保证。这对于一些需要严格按照顺序处理消息的应用场景来说是一个挑战。本文将探讨如何在使用 RabbitMQ 时实现消息的正确顺序,并介绍一些常见的解决方案和注意事项。为什么消息顺序很重要?
在某些场景下,消息的顺序性是非常重要的,比如:- 订单处理:在电商平台中,订单的处理必须按照用户提交的顺序进行,否则可能导致错误的交易或者库存混乱。
- 日志记录:在日志系统中,需要确保日志按照生成的时间顺序进行记录,以保证后续的分析和审计的准确性。
- 事务处理:在金融领域等事务处理中,消息的处理顺序必须按照特定的逻辑进行,以确保交易的一致性和正确性。
常见的解决方案
在使用 RabbitMQ 时,要实现消息的正确顺序传递,可以采取以下解决方案:- 单一队列顺序消费:将所有需要按序处理的消息发送到同一个队列中,然后只使用一个消费者来消费队列中的消息。这样可以确保消息按照发送的顺序被消费,但会造成系统的可扩展性和性能瓶颈。
- 多个队列顺序消费:根据业务逻辑将消息分发到多个队列中,每个队列对应一个消费者。消费者按照队列的顺序依次消费消息,并在消费完成后发送确认消息,这样可以达到大部分情况下的顺序处理要求。
- 消息标识和重排序:在消息的属性中添加一个消息标识,消费者在处理消息时,先根据标识进行排序,然后再进行处理。这种方式可以实现基于消息标识的顺序处理,但会增加一定的处理开销。
- 基于时间窗口的顺序处理:在生产者端根据时间戳将消息分发到不同的队列中,消费者按照队列的顺序依次消费消息。这种方式可以实现基于时间窗口的顺序处理,但对于消息的时间戳要求比较高。
注意事项和挑战
在实现消息的正确顺序时,需要注意以下事项和挑战:- 性能和可扩展性:某些解决方案可能会对系统的性能和可扩展性产生一定的影响,需要权衡好顺序性和系统性能之间的平衡。
- 消息丢失和重复:在使用多个队列顺序消费的解决方案中,如果某个队列出现故障或者消息丢失,可能会引发消息顺序的错乱或者重复消费问题,需要考虑如何处理这种情况。
- 消费者负载均衡:在使用多个队列顺序消费的解决方案中,需要确保各个队列上的消费者负载均衡,避免因为某个队列的消费者处理速度较慢导致整体性能下降。
- 数据库一致性:如果消息需要写入数据库进行持久化,需要保证数据库的一致性,以避免因为消息顺序问题导致数据库状态异常或者数据不一致的情况。
结论
通过合理选择解决方案和注意事项的考虑,我们可以在使用 RabbitMQ 时实现消息的正确顺序。根据具体的业务需求和系统架构,选择合适的解决方案,权衡好顺序性和性能之间的平衡。同时,要注意处理消息丢失、重复消费、消费者负载均衡和数据库一致性等问题,以确保消息的顺序性和系统的稳定性。理解 RabbitMQ 工作流程
RabbitMQ 是基于 AMQP(Advanced Message Queue Protocol 高级消息队列协议)协议实现的消息队列中间件,协议的基本模型结构如下图: 从图中可以看出 AMQP 协议主要包含如下几个部分: 结合上边的内容,我们可以大致的描述出 RabbitMQ 的工作流程:生产者(Producer)与消费者(Consumer)和 RabbitMQ 服务(Broker)建立连接, 然后生产者发布消息(Message)同时需要携带交换机(Exchange) 名称以及路由规则(Routing Key),这样消息会到达指定的交换机,然后交换机根据路由规则匹配对应的 Binding,最终将消息发送到匹配的消息队列(Quene),最后 RabbitMQ 服务将队列中的消息投递给订阅了该队列的消费者(消费者也可以主动拉取消息)。 前边我们已经了解到,一个 Exchange 可以绑定多个 Queue, Exchange 接收生产者发送的消息,然后将消息按照路由规则投放到指定的 Queue 中。 接下来我们需要了解这个路由规则具体是什么样的,它是如何决定 Exchange 将消息发送到那个 Queue。 Binding 代表一个 Exchange 和一个 Queue 的绑定关系,这个绑定关系上可以附件一个参数 Routing Key,Exchange 会根据发送消息时携带的 Routing Key 去和该 Exchange 所有相关的 Binding 匹配,如果匹配到了,就将消息发送给 Binding 中绑定的 Queue。 上边是一个相对通用的流程,可以帮助我们理解交换机的工作原理,当然也存在一些特殊的情况后边会提到。 在 RabbitMQ 中,Exchange 主要分为:Fanout Excahnge 、 Direct Exchange 、 Topic Exchange 、 Default Exchange 、 Headers Exchange ,同时 Routing Key 结合不同类型的 Exchange 使用时,用法也有所不同。 使用 Fanout Exchange 时,会忽略 Routing Key,所以我们也就没必要在绑定 Exchange 和 Queue,以及发送消息时去设置 Routing Key 了,Exchange 直接将消息发送到和它绑定的所有 Queue 中,省略了 Routing Key 匹配的环节。 在 RabbitMQ 中默认使用的是 Direct Exchange,Exchange 和 Queue 绑定时需要指定 Routing Key,发送消息时也需要携带 Routing Key,这样 Exchange 收到消息后就可以根据 Routing Key 匹配到对应的 Binding,进而将消息发送到目标队列里。 注意这里的 Routing Key 是需要精确匹配的。 Topic Exchange 和 Direct Exchange 的用法很类似,区别在于 Topic Exchange 中的 Routing Key 是支持模糊匹配的。 提供了两个通配符: 这样绑定 Exchange 绑定 Queue1、Queue2、Queue3 时指定 Routing Key 分别为 *.# 、 #.# 、 green.* ,这样发送消息是如果携带的 Routing Key 为 ,最终可以匹配到 Queue1、Queue3。 Default Exchange 是一种特殊的 Direct Exchange,使用它时,我们只需要创建一个 Queue 即可,RabbitMQ 服务会自动将 Queue 和一个名称为空的默认的 Exchange 绑定,同时将 Routing Key 指定为 Queue 的名称。 这种 Exchange 简化了使用步骤,但也不够灵活,不好做实际的业务划分,可以用来完成一些简单的需求。 Headers Exchange 不使用 Routing Key 来匹配目标 Queue,而是绑定 Exchange 和 Queue 时需要指定额外的 Arguments 参数,发送消息时携带上对应的参数来实现匹配。 这种 Exchange 用的比较少,了解即可。 一般情况下使用Fanout Excahnge 、 Direct Exchange 就能满足业务需求了。
RabbitMQ消费者注意点
消费者客户端可以通过推模式和拉模式来进行消息消费。 当rabbitmq队列有多个消费者时,队列收到的消息将以轮询(round-robin)的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。 很多时候轮询的分发机制也不是那么优雅。默认情况下,如果有n个消费者,那么RabbitMQ会将第m条消息分发给第m%n(取余的方式)个消费者,RabbitMQ不管消费者是否消费并已经确认()了消息。 试想一下,如果某些消费者任务繁重,来不及消费那么多的消息,而某些其他消费者由于某些原因(比如业务逻辑简单、机器性能卓越等)很快地处理完了所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。
比如在订阅消费队列之前,消费端程序调用了(5),之后订阅了某个队列进行消费。RabbitMQ 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。直到消费者确认了某条消息之后 RabbitMQ 将相应的计数减1 ,之后消费者可以继续接收消息,直到再次到达计数上限。
对于还有要特别注意的一点是它的重载方法
global为true的时候,是该信道上所有的消费者未确认的消息数上限总和 为false的时候是针对单个消费者 比如:一个信道设置了
那么这里每个消费者最多只能收到3个未确认的消息,两个消费者能收到的未确认的消息个数之和的上限为5。 如无特殊需要,最好只使用global=false 设置,这也是默认的设置。
消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。 如果生产者发布的消息分别为 msg1 msg2 msg3 ,那么消费者必然也是按照 msg1 msg2 msg3 的顺序进行消费的。
在以下情况下消息的顺序性会被打破:
1.如果生产者使用了事务机制,在发送消息之后遇到异常进行了事务回滚,那么需要重新补偿发送这条消息,如果补偿发送是在另一个线程实现的,那么消息在生产者这个源头就出现了错序。 2.同样,如果启用 publisher confirm时,在发生超时、中断,又或者是收到RabbitMQ的 命令时,那么同样需要补偿发送,结果与事务机制一样会错序。或者这种说法有些牵强,我们可以固执地认为消息的顺序性保障是从存入队列之后开始的,而不是在发送的时候。 3.考虑另一种情形,如果生产者发送的消息设置了不同的超时时间,并且也设置了死信队列,整体上来说相当于一个延迟队列,那么消费者在消费这个延迟队列的时候,消息的顺序必然不会和生产者发送消息的顺序一致。 4.如果消息设置了优先级,那么消费者消费到的消息也必然不是顺序性的 5.使用/将消息拒绝,比如: 如果一个队列按照前后顺序分有msg1、msg2、msg3、msg4这4个消息,同时有ConsumerA和 ConsumerB 这两个消费者同时订阅了这个队列。队列中的消息轮询分发到各个消费者之中, ConsumerA 中的消息为 msg1和msg3,ConsumerB中的消息为msg2、msg4。ConsumerA收到消息 msg1 之后并不想处理而调用了/ 将消息拒绝,与此同时将 requeue 设置为 true,这样这条消息就可以重新存入队列中。消息 msg1之后被发送到了 ConsumerB中,此时 ConsumerB已经消费了msg2、msg4,之后再消费 msg1,这样消息顺序性也就错乱了。或者消息 msg1 又重新发往 ConsumerA中,此时 ConsumerA已经消费了msg3,那么再消费 msg1,消息顺序性也无法得到保障。
QueueingConsumer 还包含(但不仅限于)以下一些缺陷∶
一般消息中间件的消息传输保障分为三个层级。
rabbitmq支持最少一次和最多一次。 最少一次: (1)消息生产者需要开启事务机制或者 publisher confim 机制,以确保消息可以可靠地传输到RabbitMQ中。 (2)消息生产者需要配合使用 mandatory参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。 (3)消息和队列都需要进行持久化处理,以确保 RabbitMQ 服务器在遇到异常情况时不会造成消息丢失 (4)消费者在消费消息的同时需要将 autoAck 设置为 false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失。
最多一次: 就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这样很难确保消息不会丢失
恰好一次: 恰好一次是RabbitMQ目前无法保障的。
免责声明:本文转载或采集自网络,版权归原作者所有。本网站刊发此文旨在传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及版权、内容等问题,请联系本网,我们将在第一时间删除。同时,本网站不对所刊发内容的准确性、真实性、完整性、及时性、原创性等进行保证,请读者仅作参考,并请自行核实相关内容。对于因使用或依赖本文内容所产生的任何直接或间接损失,本网站不承担任何责任。