应答-RocketMQ-信息沉积的综合指南 (应答器)
这篇文章,咱们聊聊如何应答RocketMQ信息沉积。
图片
1基础概念
消费者在消费的环节中,消费的速度跟不上服务端的发送速度,未处置的信息会越来越多,信息出现沉积进而会形成信息消费提早。
只管笔者经常讲:RocketMQ、Kafka具有沉积的才干,但是以下场景要求重点关注信息沉积和提早的疑问:
2消费原理
图片
客户端经常使用Push形式启动后,消费信息时,分为以下两个阶段:
经过以上客户端消费原理可以看出,信息沉积的关键瓶颈在于本地客户端的消费才干,即消费耗时和消费并发度。
想要防止和处置信息沉积疑问,必需正当的管理消费耗时和信息并发度,其中消费耗时的优先级高于消费并发度,必需先保证消费耗时的正当性,再思考消费并发度疑问。
3消费瓶颈
3.1消费耗时
影响消费耗时的消费逻辑关键分为CPU内存计算和外部I/O操作,通常状况下代码中假设没有复杂的递归和循环的话,外部计算耗时相对外部I/O操作来说简直可以疏忽。
外部I/O操作通常包含如下业务逻辑:
这类外部调用的逻辑和系统容量要求提早梳理,把握每个调用操作预期的耗时,这样才干判别消费逻辑中I/O操作的耗时能否正当。
通常消费沉积都是由于这些下游系统出现了服务意外、容量限度造成的消费耗时参与。
例如:某业务消费逻辑中要求调用下游Dubbo接口,单次消费耗时为20ms,往常信息量小未出现意外。业务侧启动大促优惠时,下游Dubbo服务未启动优化,消费单条信息的耗时参与到200ms,业务侧可以显著感遭到消费速度大幅上涨。此时,经过优化消费并行度并不能处置疑问,要求大幅提上下游Dubbo服务性能才行。
3.2消费并发度
绝大局部信息消费行为都属于IO密集型,即或许是操作数据库,或许调用RPC,这类消费行为的消费速度在于后端数据库或许外系统的吞吐量,经过参与消费并行度,可以提高总的消费吞吐量,但是并行度参与到必定水平,反而会降低。
所以,运行必要求设置正当的并行度。如下有几种修正消费并行度的方法:
4处置战略
当面对信息沉积疑问时,咱们要求明白究竟哪个环节出现疑问了,不要镇静,也不要贸然入手。
4.1确认信息的消费耗时能否正当
首先,咱们要求检查消费耗时,确认信息的消费耗时能否正当。检查消费耗时普通来讲有两种方式:
1、打印日志
publicConsumeConcurrentlyStatusconsumeMessage(List<MessageExt>msgs,ConsumeConcurrentlyContextcontext){try{for(MessageExtmessageExt:msgs){longstart=System.currentTimeMillis();//TODO业务逻辑logger.info("MessageId:"+messageExt.getMsgId()+"costTime:"+(System.currentTimeMillis()-start));}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;}catch(Exceptione){logger.error("consumeMessageerror:",e);returnConsumeConcurrentlyStatus.RECONSUME_LATER;}}
2、检查信息轨迹
图片
当确定好消费耗时后,可以依据耗时大小,采取不同的措施。
4.2检查客户端JVM的堆栈
假设消费耗时十分高,要求检查Consumer实例JVM的堆栈。
catstack.log|grepConsumeMessageThread-A10--color
经常出现的意外堆栈信息如下:
图片
图片
图片
5总结
客户端经常使用Push形式启动后,消费信息时,分为以下两个阶段:拉取信息和消费信息。
客户端消费原理可以看出,信息沉积的关键瓶颈在于本地客户端的消费才干,即消费耗时和消费并发度。
首先剖析消费耗时,而后依据耗时大小,采取不同的措施。
参考文档:
RocketMQ - 如何实现顺序消息
顺序消息的使用场景
日常项目中需要保证顺序的应用场景非常多,比如交易场景中的订单创建、支付、退款等流程,先创建订单才能支付,支付完成的订单才能退款,这需要保证先进先出。又例如数据库的BinLog消息,数据库执行新增语句、修改语句,BinLog消息得到顺序也必须保证是新增消息、修改消息。
如何发送和消费顺序消息
我们使用RocketMQ顺序消息来模拟一下订单的场景,顺序消息分为两部分:顺序发送、顺序消费。
1. 顺序发消息
上面代码模拟了按顺序依次发送创建、支付、退款消息到TopicTest中。在配置文件中指定=true,默认是异步发送,此处改为同步发送。
MessageBuilder设置Header信息头,表示这是一条顺序消息,将消息固定地发送到第0个消息队列。
2. 顺序收消息
程序运行后,可以在控制台看到日志输出,也是按照顺序打印出来的
顺序发送的技术原理
RocketMQ的顺序消息分为2种情况:局部有序和全局有序。前面的例子是局部有序场景。
RocketMQ中消息发送有三种方式:同步、异步、单项。
顺序消息发送的原理比较简单,同一类消息发送到相同的队列即可。为了保证先发送的消息先存储到消息队列,必须使用同步发送的方式,否则可能出现先发送的消息后到消息队列中,此时消息就乱序了。
RocketMQ的核心代码如下:
选择队列的过程由messageQueueSelector和hashKey在实现类SelectMessageQueueByHash中完成
在队列列表的获取过程中,由Producer从NameServer根据Topic查询Broker列表,缓存在本地内存中,以便下次从缓存中读取。
普通发送的技术原理
RocketMQ中除了顺序消息外,还支持事务消息和延迟消息,非这三种特殊的消息称为普通消息。日常开发中最常用的是普通消息,这是因为最常用的场景就是系统间的异步解耦和流量的削峰填谷,这些场景下尽量保证消息高性能收发即可。
从普通消息与顺序消息的对比来看,普通消息在发送时选择消息队列的策略不同。普通消息发送选择队列有两种机制:轮询机制和故障规避机制。默认使用轮询机制,一个Topic有多个队列,轮询选择其中一个队列。
轮询机制的原理是路由信息TopicPublishInfo中维护了一个计数器sendWhichQueue,每发送一次消息需要查询一次路由,计算器就进行“+1”,通过计数器的值index与队列的数量取模计算来实现轮询算法。
轮询算法简单好用,但是有个弊端,如果轮询选择的队列是在宕机的Broker上,会导致消息发送失败,即使消息发送重试的时候重新选择队列,也可能还是在宕机的Broker上,无法规避发送失败的情况,因此就有了故障规避机制。
顺序消费的技术原理
RocketMQ支持两种消费模式:集群消费和广播消费。两者的区别是,在广播消费模式下每条消息会被ConsumerGroup的每个Consumer消费,在集群消费模式下每条消息只会被ConsumerGroup的一个Consumer消费。
多数场景都使用集群消费,消息每次消费代表一次业务处理,集群消费表示每条消息由业务应用集群中任意一个服务实例来处理。少数场景使用广播消费,例如数据发生变化,更新业务应用集群中每个服务的本地缓存,这就需要一条消息被整个集群都消费一次,默认是集群消费。
顺序消费也叫做有序消费,原理是同一个消息队列只允许Consumer中的一个消费线程拉取消费,Consumer中有个消费线程池,多个线程会同时消费消息。在顺序消费的场景下消费线程请求到Broker时会先申请独占锁,获得锁的请求则允许消费。
消息消费成功后,会向Broker提交消费进度,更新消费位点信息,避免下次拉取到已消费的消息,顺序消费中如果消费线程在监听器中进行业务处理时抛出异常,则不会提交消费进度,消费进度会阻塞在当前这条消息,并不会继续消费该队列中的后续消息,从而保证顺序消费。
在顺序消费的场景下,特别需要注意对异常的处理,如果重试也失败,会一直阻塞在当前消息,直到超出最大重试次数,从而在很长一段时间内无法消费后续消息造成队列消息堆积。
并发消费的原理
RocketMQ支持两种消费方式:顺序消费和并发消费。并发消费是默认的消费方式,日常开发过程中最常用的方式,除了顺序消费就是并发消费。
并发消费也称为乱序消费,其原理是同一个消息队列提供给Consumer中的多个消费线程拉取消费。Consumer中会维护一个消费线程池,多个消费线程可以并发去同一个消息队列中拉取消息进行消费。如果某个消费线程在监听器中进行业务处理时抛出异常,当前线程会进行重试,不影响其它消费线程和消费队列的消费进度,消费成功的线程正常提交消费进度。
并发消费相比于顺序消费没有资源争抢上锁的过程,消费消息的速度比顺序消费要快很多。
消息的幂等性
说到消息消费不得不提到消息的幂等性,业务代码中通常收到一条消息进行一次业务逻辑处理,如果一条相同的消息被重复收到几次,是否会导致业务重复处理?Consumer能够不重复接收消息?
RocketMQ不保证消息不被重复消费,如果业务对消息重复消费非常敏感,必须要在业务层面进行幂等性处理,具体实现可以通过分布式锁来完成。
在所有消息系统中消费消息有三种模式:at-most-once(最多一次)、at-least-once(最少一次)和exactly-only-once(精确仅一次),分布式消息系统都是在三者间取平衡,前两者是可行的并且被广泛使用。
RocketMQ的事务消息
RocketMQ的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账,A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息,要和“从A银行账户扣除一万元”这个操作同时成功或者同时失败。 RocketMQ采用两阶段提交的方式实现事务消息,TransactionMQProducer处理上面情况的流程是,先发一个“准备从B银行账户增加一万元”的消息,发送成功后做从A银行账户扣除一万元的操作,根据操作结果是否成功,确定之前的“准备从B银行账户增加一万元”的消息是做commit还是rollback,具体流程如下: 1)发送方向RocketMQ发送“待确认”消息。 2)RocketMQ将收到的“待确认”消息持久化成功后,向发送方回复消息已经发送成功,此时第一阶段消息发送完成。 3)发送方开始执行本地事件逻辑。 4)发送方根据本地事件执行结果向RocketMQ发送二次确认(Commit或是Rollback)消息,RocketMQ收到Commit状态则将第一阶段消息标记为可投递,订阅方将能够收到该消息;收到Rollback状态则删除第一阶段的消息,订阅方接收不到该消息。 5)如果出现异常情况,步骤4)提交的二次确认最终未到达RocketMQ,服务器在经过固定时间段后将对“待确认”消息发起回查请求。 6)发送方收到消息回查请求后(如果发送一阶段消息的Producer不能工作,回查请求将被发送到和Producer在同一个Group里的其他Producer),通过检查对应消息的本地事件执行结果返回Commit或Roolback状态。 7)RocketMQ收到回查请求后,按照步骤4)的逻辑处理。
上面的逻辑似乎很好地实现了事务消息功能,它也是RocketMQ之前的版本实现事务消息的逻辑。 但是因为RocketMQ依赖将数据顺序写到磁盘这个特征来提高性能,步骤4)却需要更改第一阶段消息的状态,这样会造成磁盘Catch的脏页过多,降低系统的性能。所以RocketMQ在4.x的版本中将这部分功能去除。系统中的一些上层Class都还在,用户可以根据实际需求实现自己的事务功能。 客户端有三个类来支持用户实现事务消息, 第一个类是LocalTransaction-Executer,用来实例化步骤3)的逻辑,根据情况返回_MESSAGE或者 _MESSAGE状态。 第二个类是TransactionMQProducer,它的用法和DefaultMQProducer类似,要通过它启动一个Producer并发消息,但是比DefaultMQProducer多设置本地事务处理函数和回查状态函数。 第三个类是TransactionCheckListener,实现步骤5)中MQ服务器的回查请求,返回_MESSAGE或者_MESSAGE
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。 1.事务消息发送及提交: (1) 发送消息(half消息)。 (2) 服务端响应消息写入结果。 (3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。 (4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)。 2.补偿流程: (1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”。 (2) Producer收到回查消息,检查回查消息对应的本地事务的状态。 (3) 根据本地事务状态,重新Commit或者Rollback。 其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息。然后二阶段会显示执行提交或者回滚half消息(逻辑删除)。当然,为了防止二阶段操作失败,RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。 在RocketMQ中,消息在服务端的存储结构如下,每条消息都会有对应的索引信息,Consumer通 过ConsumeQueue这个二级索引来读取消息实体内容,其流程如下:
RocketMQ的具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费。其实改变消息主题是RocketMQ的常用“套路”,回想一下延时消息的实现机制。RMQ_SYS_TRANS_HALF_TOPIC
在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。
RocketMQ将Op消息写入到全局一个特定的Topic中通过源码中的方法— ();这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。
在执行二阶段Commit操作时,需要构建出Half消息的索引。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。
如果在RocketMQ事务消息的二阶段过程中失败了,例如在做Commit操作时,出现网络问题导致Commit失败,那么需要通过一定的策略使这条消息最终被Commit。RocketMQ采用了一种补偿机制,称为“回查”。Broker端对未确定状态的消息发起回查,将消息发送到对应的Producer端(同一个Group的Producer),由Producer根据消息来检查本地事务的状态,进而执行Commit或者Rollback。 Broker端通过对比Half消息和Op消息进行事务消息的回查并且推进CheckPoint(记录那些事务消息的状态是确定的)。 值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。
TxConsumer类实现
免责声明:本文转载或采集自网络,版权归原作者所有。本网站刊发此文旨在传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及版权、内容等问题,请联系本网,我们将在第一时间删除。同时,本网站不对所刊发内容的准确性、真实性、完整性、及时性、原创性等进行保证,请读者仅作参考,并请自行核实相关内容。对于因使用或依赖本文内容所产生的任何直接或间接损失,本网站不承担任何责任。