本文共 1603 字,大约阅读时间需要 5 分钟。
RocketMQ 消费逻辑深度解析
消费逻辑概述
RocketMQ 的消费逻辑主要负责将 Broker 提供的消息从 CommitLog 中读取,并通过 ConsumeQueue 存储到消费者可读的位置。整个过程分为几个关键环节:拉取消息、消费消息、持久化消费进度等。以下将从 ConsumeQueue 的结构和存储开始,逐步展开对消费逻辑的深入理解。
ConsumeQueue 结构
ConsumeQueue 是 RocketMQ 消费逻辑的核心数据结构,主要负责存储消息的位置信息。其结构与 MappedFileQueue 和 MappedFile 有直接关系:
- MappedFile:存储实际的消息数据,文件名格式为
fileName[n] = fileName[n-1] + mappedFileSize,默认文件大小为 6000000 字节。 - MappedFileQueue:对 MappedFile 的管理,提供文件的封装和统一的文件大小管理。
- ConsumeQueue:对 MappedFileQueue 的封装,主要用于存储消息的位置信息和前置空白占位(BLANK)。
ConsumeQueue 的数据存储分为两种类型:
MESSAGE_POSITION_INFO:存储消息的位置信息,包括 offset、消息长度、tagsCode 等。 BLANK:用于消息删除时填充的占位信息,表示对应位置的消息已被删除。
ConsumeQueue 存储
ConsumeQueue 的存储逻辑主要涉及两个服务:ReputMessageService 和 FlushConsumeQueueService。
ReputMessageService
- 功能:负责将 Broker 提供的消息位置信息写入 ConsumeQueue。
- 实现:
- 通过
doReput 方法循环读取 CommitLog 的数据。 - 将读取到的消息生成 DispatchRequest,并调用 Broker 的接收逻辑。
- 根据请求结果,将消息位置信息写入 ConsumeQueue 或 IndexFile。
FlushConsumeQueueService
- 功能:定期将 ConsumeQueue 中的消息持久化到存储检查点。
- 实现:
- 每隔一定时间(默认为 1000ms)执行一次 flush 操作。
- 遍历所有主题和队列,调用 MappedFile 的 flush 方法,将内容写入存储系统。
Broker 提供的接口
拉取消息接口
PullMessageRequestHeader 是拉取消息请求的核心 Header,包含以下字段:
- consumerGroup:消费者分组信息。
- topic:消息主题。
- queueId:消息队列编号。
- queueOffset:队列开始位置。
- maxMsgNums:最大拉取消息数量。
- sysFlag:系统标识。
- commitOffset:提交消费进度位置。
- suspendTimeoutMillis:挂起超时时间。
- subscription:订阅表达式。
- subVersion:订阅版本号。
更新消费进度接口
Broker 提供了更新消费进度的接口,主要用于通知消费者的消费进度。消费进度的持久化存储文件为 consumerOffset.json,每次更新会备份到 consumerOffset.json.bak。
发回消息接口
SendMessageProcessor 负责在消费失败时将消息发回 Broker。主要逻辑包括:
- 初始化响应。
- 检查 Broker 的写入权限。
- 处理重试队列。
- 计算新的队列编号和系统标识。
- 更新消息的延迟等级。
- 创建消息实体并添加到 Broker。
结尾
感谢大家对本文的阅读和收藏。如果有任何疑问或建议,欢迎在评论区交流。一起探索 RocketMQ 的精妙架构吧!
转载地址:http://liqfk.baihongyu.com/