RocketMQ原理

论坛 期权论坛 编程之家     
选择匿名的用户   2021-5-30 09:12   11   0

01 Topic 的路由机制

介绍路由注册机制之前,先简单看下 RocketMQ 的整体架构:

  • Producer:消息生产者,用于向消息服务器发送消息;
  • NameServer:路由注册中心;
  • Broker:消息存储服务器;
  • Consumer:消息消费者,该流程图中未涉及。

1. 联通性

  1. NameServer 之间互不通信,无法感知对方的存在。
  2. Producer 生产者与 NameServer 集群中的一台服务器建立长连接,并持有整个 NameServer 集群的列表。
  3. Broker 服务会与每台 NameServer 保持长连接。

2. Topic路由注册与剔除流程

  1. Broker 每30s向 NameServer 发送心跳包,心跳包中包含主题的路由信息(主题的读写队列数、操作权限等),NameServer 会通过 HashMap 更新 Topic 的路由信息,并记录最后一次收到 Broker 的时间戳。
  2. NameServer 以每10s的频率清除已宕机的 Broker,NameServer 认为 Broker 宕机的依据是如果当前系统时间戳减去最后一次收到 Broker 心跳包的时间戳大于120s。
  3. 消息生产者以每30s的频率去拉取主题的路由信息,即消息生产者并不会立即感知 Broker 服务器的新增与删除。

3. 该部分涉及到的编程技巧

  1. 基于长连接的编程模型、心跳包。
  2. 多线程编程,读写锁经典使用场景。

思考:由于消息生产者无法实时感知 Broker 服务器的宕机,那消息发送的高可用性如何保证呢?

02 消息发送高可用设计

消息发送队列负载默认采用轮询机制,消息发送时默认选择重试机制来保证消息发送的高可用。

当 Broker 宕机后,虽然消息发送者无法第一时间感知 Broker 宕机,但是当消息发送者向 Broker 发送消息返回异常后,生产者会在接下来一定时间内,例如5分钟内不会再次选择该 Broker上的队列,这样就规避了发生故障的 Broker,结合重试机制,巧妙实现消息发送的高可用。

03 消息存储文件设计

RocketMQ 存储设计主要包含 CommitLog 文件、ConsumeQueue 文件和 IndexFile 文件。

1. CommitLog 文件

消息存储文件,所有主题的消息随着到达 Broker 的顺序写入 CommitLog 文件,每个文件默认为1G,文件的命名也及其巧妙,使用该存储在消息文件中的第一个全局偏移量来命名文件,这样的设计主要是方便根据消息的物理偏移量,快速定位到消息所在的物理文件。RocketMQ CommitLog 文件使用顺序写,极大提高了文件的写性能。

2. ConsumeQueue 文件

消息消费队列文件,是 CommitLog 文件的基于 Topic 的索引文件,主要用于消费者根据 Topic消费消息,其组织方式为 /topic/queue,同一个队列中存在多个文件,ConsumeQueue 设计极具技巧性,其每个条目使用固定长度(8字节 CommitLog 物理偏移量、4字节消息长度、8字节 Tag HashCode)。

这里不是存储 tag 的原始字符串,而是存储 HashCode,目的就是确保每个条目的长度固定,可以使用访问类似数组下标的方式来快速定位条目,极大的提高了 ConsumeQueue文件的读取性能。

试想一下,消息消费者根据 Topic、消息消费进度(ConsumeQueue 逻辑偏移量),即第几个 ConsumeQueue 条目,这样根据消费进度去访问消息的方法为使用逻辑偏移量logicOffset* 20即可找到该条目的起始偏移量( ConsumeQueue 文件中的偏移量),然后读取该偏移量后20个字节即得到了一个条目,无需遍历 ConsumeQueue 文件。

3. IndexFile 文件

基于物理磁盘文件实现 Hash 索引。其文件由40字节的文件头、500W个 Hash 槽,每个 Hash 槽为4个字节,最后由2000万个 Index 条目,每个条目由20个字节构成,分别为4字节的索引key的 HashCode、8字节消息物理偏移量、4字节时间戳、4字节的前一个Index条目( Hash 冲突的链表结构)。

4. 存储文件部分的编程技巧

  1. 内存映射文件编程技巧。
  2. 内存锁定技术。
  3. 剹消息后,将消息写入本地commitlog文件中,然后向Master汇报拉取进度,并更新下一次待拉取偏移量;
  4. 然后重复第3步。

06 事务消息

RocketMQ事务消息的实现原理是类似基于二阶段提交与事务状态回查来实现的。事务消息的发送只支持同步方式,其实现的关键点包括:

A. 在应用程序端,在一个本地事务中,通过发送消息API向Broker发送Prepare状态的消息,收到消息服务器返回成功后执行事件回调函数,在事件函数的职责就是记录该消息的事务状态,通常采用消息发送本地事务表,即往本地事务表中插入一条记录,如果业务处理成功,则消息本地事务中会存在相关记录;如果本地事务执行失败而导致事务回滚,此时本地事务状态中不存在该消息的事务状态。

B.消息服务端收到Prepare的消息时,如何保证消息不会被消费端立即处理呢?原来消息服务端收到Prepare状态的消息,会先备份原消息的主题与队列,然后变更主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC,队列为0。

C. 消息服务端会开启一个专门的线程,以每60s的频率从RMQ_SYS_TRANS_OP_HALF_TOPIC中拉取一批消息,进行事务状态的回查,其实现原理是根据消息所属的消息生产者组名随机获取一个生产者,向其询问该消息对应的本地事务是否成功,如果本地事务成功(该部分是由业务提供的事务回查监听器来实现),则消息服务端执行提交动作;如果事务状态返回失败,则消息服务端执行回滚动作;如果事务状态未知,则不做处理,待下一次定时任务触发再检查。默认如果连续5次回查都无法得到确切的事务状态,则执行回滚动作。

以上只是 RocketMQ 所有核心的一部分,在文章的结尾处,我想再分享一下我学习 RocketMQ的一些心得:

  1. 通读 RocketMQ 官方文档,从全局上了解 RocketMQ。
  2. 在IDE工具中搭建 RocketMQ 调试环境,启动 NameServer、Broker 服务器,并重点关注源码的 example 包,运行一个快速入门示例。
  3. 根据功能模块进行学习,例如消息发送、消息存储、消息消费,同时注意不要发散,例如在学习消息发送相关的流程时,遇到消息存储后,可暂时不去理会消息存储相关的细节,先一笔带过,待学完消息发送后,再去重点学习其他分支,例如存储、刷盘,主从同步等。
分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:3875789
帖子:775174
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP