1.4 RocketMQ

RocketMQ(消息队列)是一款开源的分布式消息系统,提供低时延、高可靠的消息发布与订阅服务,具有为分布式应用提供异步解耦和削峰填谷的功能,支持事务消息、顺序消息、延迟消息、批量消息、消息回溯等。关于RocketMQ的高频面试题目简述如下:RocketMQ整体架构、使用场景、消费者与消费者组之间的关系、消息发送与消费的流程、事务消息、延迟消息、顺序消息、消息过滤、Offset管理、Rebalance危害、怎么保证消息不丢失、持久化策略、底层文件存储设计、RocketMQ与Kafka的区别等。

1.4.1 RocketMQ的整体架构

面试官提问

● 消息队列的整体架构是怎样的?

RocketMQ的整体架构主要包含服务发现、生产者、Broker集群和消费者,如图1-111所示。

图1-111 RocketMQ的整体架构

● Name Server:Broker向NameServer注册路由,NameServer为生产者、消费者提供最新的路由信息。

● Broker:负责消息的持久化存储、消息的HA机制以及服务器端消息过滤等功能。一个Master Broker可以有多个Slave Broker,一个Slave Broker只能有一个Master Broker。Broker启动后将自己注册到Name Server,定期向Name Server上报Topic路由信息。

● Producer:生产者与Name Server集群中的某个节点(随机)建立长连接,定期从Name Server读取Topic路由信息,并与提供Topic服务的Master Broker建立长连接,且定时向Master Broker发送心跳。

● Consumer:消费者与Name Server集群中的某个节点(随机)建立长连接,定期从Name Server拉取Topic路由信息,并与提供Topic服务的Master Broker、Slave Broker建立长连接,且定时向Master Broker、Slave Broker发送心跳。Consumer既可以从Master Broker订阅消息,也可以从Slave Broker订阅消息。

1.4.2 消息队列典型的适用场景

面试官提问

● 请说明一下消息队列的应用场景。

消息队列典型的应用场景主要有以下两种:

1.系统解耦

场景说明:用户下单后,订单系统通知库存完成后续操作,如图1-112所示。

图1-112 消息队列用于系统解耦

订单系统:用户下单后,订单系统完成内部处理流程,将消息投递至MQ,返回用户下单成功。

库存系统:订阅MQ的消息,进行库存扣减等操作。即使在用户下单时库存系统宕机,也不影响用户完成下单流程,实现订单系统与库存系统的解耦。

2.削峰填谷

场景说明:秒杀活动,面对用户高并发同步请求,应用存在宕机风险,如图1-113所示。

图1-113 消息队列用于削峰填谷

秒杀业务系统收到用户请求后,首先将请求写入消息队列,然后根据自己的处理能力拉取消息进行处理。

1.4.3 消费者、消费者组、队列之间的关系

面试官提问

● 就消费者的消费模式来讲,什么是集群消费与广播消费?

● 消费者与消费者组的关系是什么?

当使用集群消费模式时,RocketMQ的一条消息只被集群内的任意一个消费者处理。

● 一个队列同一时间只允许被费者组下的某一个消费者消费。

● 消费者组下的某个消费者,可以同时消费同一个Topic下不同队列的消息。

● 不同消费者组下的消费者,可以同时消费同一个Topic下相同队列的消息。

当使用广播消费模式时,消息队列会将每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。

1.4.4 RocketMQ消息发送与消费流程

面试官提问

● 说明消息队列发送与消费一条消息的全流程。

● 说明消息队列底层设计。

● 消息是顺序写入的吗?

● 消息的消费是否存在随机读?

消息发送与消费流程如下:首先生产者发送的消息被顺序写入CommitLog,如图1-114所示。此处在写CommitLog时使用PageCache会非常高效。

图1-114 生产者顺序写入CommitLog

然后,CommitLog日志消息被异步转发到对应的逻辑队列(ConsumeQueue),如图1-115所示。

图1-115 消息从CommitLog转发到ConsumeQueue

ConsumeQueue与Paratiton的概念相对应,并且转发至ConsumeQueue中的消息也是顺序写入的,消费者可以从ConsumeQueue中拉取消息进行消费,如图1-116所示。从旧到新顺序拉取消息,这里也可以使用PageCache。

图1-116 消费者消费ConsumeQueue中的消息

ConsumeQueue消息结构包含三个部分:消息在CommitLog中的物理位置偏移量offset、消息实体内容的大小和Message Tag的hash值。我们可以根据offset定位消息在CommitLog文件中的具体位置,该过程是随机读,如图1-117所示。

图1-117 根据ConsumeQueue中的物理位置偏移量和消息实体大小随机读取CommitLog中的具体消息

不同队列共享同一个CommitLog,回读CommitLog的过程虽然是随机读,但是如果不同队列消息写入量和消费速度大致相同,那么不同消费者随机读取的CommitLog物理位置较为集中,该区间极可能还在PageCache的范围内。此场景下同样可以充分利用PageCache,如图1-118所示。

图1-118 CommitLog随机读,依然可以使用PageCache高效读

1.4.5 消息刷盘策略

面试官提问

● 消息队列刷盘策略是什么?

● 怎么保证Broker端的消息不丢失?

消息刷盘(持久化)策略有同步或异步刷盘;为了保证消息在Broker端不丢失,有同步双写或者异步复制的策略将消息存储到从节点。

1.刷盘策略分为同步和异步刷盘

消息队列刷盘策略如图1-119所示。

图1-119 消息队列刷盘策略

● 同步刷盘:等待消息持久化在硬盘上, Broker端才同步返回给Producer ACK。写入性能差但可靠性高。

● 异步刷盘:消息写入PageCache即将成功的ACK返回给Producer端。异步线程刷盘降低了读写延迟,提高了吞吐量,但会出现少量消息丢失的情况。

2.同步双写、异步复制

● 多Master多Slave模式,同步双写,主备都写成功才向Producer返回成功。同步双写避免单点问题,但影响写入性能,适合对消息可靠性要求极高的场合,比如支付业务。

● 多Master多Slave模式,异步复制,只要Master写成功即可向Producer返回成功。写入性能好,但一旦发现Master宕机、磁盘损坏的情况,就会丢失少量消息。

1.4.6 底层文件存储设计

面试官提问

● RocketMQ有哪些文件,作用分别是什么?

● RocketMQ消息存储与Kafka的区别?

RocketMQ消息的主要存储文件包括CommitLog、ConsumeQueue、IndexFile,具体结构如图1-120所示。

图1-120 消息对列底层存储设计

● CommitLog:消息持久化的物理文件,所有队列消息顺序写入同一个CommitLog文件,每个Broker上的CommitLog被当前服务器上的所有ConsumeQueue共享。

● ConsumeQueue:消息顺序写入CommitLog文件后将被异步转发到逻辑队列(ConsumeQueue)中,消费者可以消费逻辑队列中的消息。ConsumeQueue中的消息结构包含3个部分:该消息在CommitLog中的物理位置偏移量offset、消息实体内容的大小和Message Tag的hash值。

● IndexFile:Index索引文件提供了对CommitLog进行数据检索的能力,可以通过key或者时间区间来查找CommitLog中的消息。IndexFile在面试中基本不会出现,了解即可。

1.4.7 事务消息

面试官提问

● 解释一下发送事务消息的流程。

● 什么是半消息?

● 由于超时、重启等原因,Commit消息没有被Broker持久化,该场景下怎样保证事务的一致性?

RocketMQ事务消息保证数据的最终一致性,其发送事消息的流程如图1-121所示。

图1-121 RMQ事务消息发送流程

步骤01 发送消息。

步骤02 MQ Server将消息持久化后返回发送方ACK,确认消息发送成功,此时消息为半消息。

步骤03 发送方执行本地事务。

步骤04 发送方根据本地事务执行结果向MQ Server提交Commit或Rollback,若MQ Server收到Commit则将半消息标记为可投递,订阅方可消费到该消息;若MQ Server收到Rollback则删除半消息,订阅方将不会消费该消息。

步骤05 在网络中断、服务重启等特殊情况下,步骤04提交的Commit或Rollback未能到达MQ Server,经过固定时间后MQ Server将对该消息发起回查。

步骤06 发送方收到消息回查后,检查对应消息的本地事务执行的最终结果。

步骤07 发送方根据本地事务执行的最终结果再次Commit或Rollback,MQ Server仍按照步骤04对半消息进行处理。

1.4.8 延迟消息

面试官提问

● 延迟消息的实现原理是什么?发送延迟消息的一般流程是怎样的?

● 延迟消息的应用场景有哪些?

● 发送延迟消息时,ConsumeQueue消息结构中Tag字段的作用是什么?

● 延迟消息支持任意时间的延迟吗?

延迟消息是指生产者发送消息后,需要等待指定的时间才可以被消费。

延迟消息典型应用场景举例:用户下单后需要在30分钟内付款,到期前发送消息提醒用户支付。

RocketMQ延迟消息在Broker内部的处理流程如图1-122所示。

图1-122 RocketMQ延迟消息处理流程

(1)消息发送时修改Topic名称和队列信息。消息一旦由CommitLog转发到ConsumeQueue就会被立即消费,为了避免延迟消息被立即消费,发送消息时将主题的名称修改为特定Topic (SCHEDULE_TOPIC_XXXX),并根据延迟级别确定投递的队列。同时消息相关属性里保存了要投递的目标Topic和队列信息。

(2)转发消息到延迟主题的CosumeQueue中。消息写入CommitLog后会转发到CosumeQueue,计算延迟消息投递时间,投递时间=消息存储时间+延迟级别对应的时间,将它作为消息Tag的哈希值存储到CosumeQueue中。

(3)延迟服务消费SCHEDULE_TOPIC_XXXX消息。ScheduleMessageService消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。ScheduleMessageService根据延迟级别的个数启动对应数量的TimerTask,每个TimerTask负责一个延迟级别的消息消费与投递,根据Tag值判断对应队列的第一个消息是否到期,若到期则进行投递,并继续检查之后的消息,若当前消息未到期则不再检查后续消息。

(4)到期消息重新写入CommitLog。消息到期后,需要投递到目标Topic,由于第一步记录了目标Topic和队列信息,因此重新设置后直接消息存储到CommitLog即可。

(5)CommitLog消息转发至目标Topic下的CosumeQueue,被消费者消费。消息的Topic被重置后写入CommitLog,转发至CosumeQueue后会被消费者直接消费。

若要支持任意时间精度的延迟消息,则需在Broker端做消息排序,开销较大,因此RocketMQ仅支持特定级别的延迟消息。

1.4.9 如何保证消息不丢失

面试官提问

● 消息队列怎么保证消息不丢失?

● 解决消息不丢失问题,你觉得下面两种方案哪种更靠谱?方案一:完全依赖消息队列保证消息不丢失。方案二:生产者发送消息失败后重试,确保成功与消费者幂等消费,既能保证消息队列的高吞吐又能保证业务消息的不丢失。

从Producer端来说,如果消息未能持久化在Broker端,那么消息是丢失的,消息的发送需要做好失败重试(消费端需要幂等)。

从Broker端来说,保证消息不丢失需要选择合适的刷盘策略(同步刷盘、异步刷盘)、主从复制策略(同步双写或者异步复制)。

从Consumer端来说,要确保消费者拉取到的消息被成功消费。

● 如果Consumer消费消息失败,返回Broker对应失败状态,那么消息会再次被拉取并进行消费

● 如果Consumer消费消息失败,返回Broker对应失败状态时Broker宕机,那么Consumer会定时重试。

● 如果Consumer和Broker一起宕机,那么由于Consumer消费的offset是定时异步持久化的,因此Consumer和Broker重启后,消息不会丢失但可能产生重复消费。

从全局来看,保证消息不丢失可能存在重复消费的情况,需要业务方幂等重试。

1.4.10 消息过滤

面试官提问

● 消息是在服务器端还是消费端过滤?

● 消息过滤的实现方式是怎样的?

Consumer在订阅消息时除了指定Topic外,还可以指定Tag,如果一个消息有多个Tag,可以用||分隔。ConsumeQueue里面包含消息在CommitLog中的物理位置偏移量offset、消息实体内容的大小和Message Tag的hash值,消费者拉取Broker端消息时,首先会用Tag hash值与ConsumeQueue消息中Tag hash值进行比对过滤,但使用hash值无法精确地对Tag原始字符串进行过滤,所以在消费端拉取到消息后,还需要对消息的原始Tag字符串进行二次比对过滤,若比较过滤的结果不同,则丢弃该消息,不进行消费。综上所述,消息是在服务器端和消费端两侧过滤的。

1.4.11 顺序消息

面试官提问

● 消息队列如何保证消息的顺序性?

● 顺序消息消费失败会阻塞后续消息消费吗?

● 全局顺序与分区顺序怎么实现?

顺序消息分为全局顺序消息与分区顺序消息。

1.全局顺序消息

全局顺序消息如图1-123所示,对于一个指定的Topic,所有消息严格按照先入先出(FIFO)的顺序进行发布和消费,此时使用一个队列保证全局顺序会存在严重的性能瓶颈。

图1-123 全局顺序消息

2.分区顺序消息

一般场景下,不要求消息的全局顺序,例如一个订单产生了3条消息,分别是订单创建、付款、完成,消费时同一个订单要按照这个顺序消费才有意义,但是不同订单之间是可以并行消费的。分区顺序消息如图1-124所示,对于一个指定的Topic,所有消息根据sharding key进行分区(比如按照订单id)。同一个分区内的消息严格按照FIFO顺序进行发布和消费(由于是根据订单id分区的,因此同一个订单的创建、付款、完成消息会出现在同一个队列里并保持顺序性)。图1-124中的sharding key是顺序消息中用来区分不同分区的关键字段。

图1-124 分区消息

为了保证消费端的顺序性,某条消息消费失败会阻塞后续消息的消费。

1.4.12 消费者offset管理

面试官提问

● offset维护是在消费端还是Broker端?

● offset提交是异步还是同步的?

● 在消费者组增加新的消费者会引起消息重复消费吗?

● 增加新的消费者组,offset从哪里开始消费?

广播模式下,RocketMQ对消费者组下的各个Consumer实例都投递一次消息,消费者之间没有交集,所以offset在本地维护即可。

集群模式下,一条消息只会投递到消费者组下面的一个实例去消费,offset维护在Broker端,以键值对形式存储,key为消费者组+队列,value为offset。

● 如果在旧的消费者组下新增消费者(Rebalance),那么会读取Broke端存储的消费进度,由于offset是异步提交的,因此可能出现重复消费的现象。

● 如果有新的消费者组加入,而Broker端没有存储相关信息,此时将根据配置的默认策略开始消费,比如从最新的offset开始消费或消费者启动的时间戳对应的offset处开始消费等。

1.4.13 Rebalance的危害

面试官提问

● 谈谈消息队列的Rebalance机制。

● 谈谈Rebalance的危害,Rebalance为什么会引起消息的重复消费?

Rebalance是指将一个Topic下的多个队列在同一个消费者组内的多个消费者实例之间进行重新分配。比如一个Topic下有三个队列、一个消费者,为了提升消息的并行处理能力,可以新增一个消费者,使其中一个消费者处理两个队列中的消息,另一个消费者处理一个队列中的消息。

Rebalance的危害如下:

● 消费暂停:新增消费者触发Rebalance,分配给新消费者的队列消息会被原来的消费者暂停消费,直到该队列成功分配给新消费者后才能继续被消费。

● 重复消费:新消费者在消费分配给自己的队列时,需要从原来消费者已消费到的offset处继续开始消费,然而offset是消费者异步提交的,可能出现重复消费的情况。举例来说,consumer1当前消费到offset等于7的地方,但是异步提交给Broker的offset为3,若此时发生rebalance,则consumer2从offset等于3处开始消费,那么就会重复消费4条消息。

1.4.14 RocketMQ与Kafka的对比

面试官提问

● RocketMQ与Kafka的区别是什么?

● 消息队列技术选型,选Kafka还是RocketMQ?

RocketMQ与Kafka的区别主要有以下几点:

1.存储形式

Kafka每个partition对应一个文件,若Broker的partition过多,则消息顺序写入将退化为随机写,写入性能下降。

RocketMQ所有队列消息顺序写入同一个CommitLog文件,然后被转发至ConsumeQueue才被消费者消费。

2.延时消息

RocketMQ支持固定级别的延时消息,但kfaka不支持延时消息。

3.消息重复

RocketMQ仅支持At Least Once,Kafka支持At Least Once、Exactly Once

4.消息过滤

RocketMQ根据Tag hash值在Broker端进行消息过滤,消费者拉取到消息后对消息的原始Tag字符串进行二次比对过滤。Kafka不支持Broker端的消息过滤,需要在消费端自定义实现。

5.消费失败重试

RocketMQ支持失败重试。Kafka不支持消费失败重试,也没有死信队列。

6.服务发现

对于服务发现,RocketMQ使用NameServer,Kafka使用ZooKeeper。

7.高可用

RocketMQ高可用的粒度是Broker,刷盘策略支持异步/同步刷盘,HA策略支持同步双写、异步复制。Kafka高可用的粒度是分区,Producer向Broker发送消息时会根据ACK配置来确定需要等待几个副本同步了消息才相应成功。