博客
关于我
RocketMQ 源码分析 —— Message 拉取与消费(上)
阅读量:796 次
发布时间:2023-03-22

本文共 1603 字,大约阅读时间需要 5 分钟。

RocketMQ 消费逻辑深度解析

消费逻辑概述

RocketMQ 的消费逻辑主要负责将 Broker 提供的消息从 CommitLog 中读取,并通过 ConsumeQueue 存储到消费者可读的位置。整个过程分为几个关键环节:拉取消息、消费消息、持久化消费进度等。以下将从 ConsumeQueue 的结构和存储开始,逐步展开对消费逻辑的深入理解。


ConsumeQueue 结构

ConsumeQueue 是 RocketMQ 消费逻辑的核心数据结构,主要负责存储消息的位置信息。其结构与 MappedFileQueue 和 MappedFile 有直接关系:

  • MappedFile:存储实际的消息数据,文件名格式为 fileName[n] = fileName[n-1] + mappedFileSize,默认文件大小为 6000000 字节。
  • MappedFileQueue:对 MappedFile 的管理,提供文件的封装和统一的文件大小管理。
  • ConsumeQueue:对 MappedFileQueue 的封装,主要用于存储消息的位置信息和前置空白占位(BLANK)。

ConsumeQueue 的数据存储分为两种类型:

  • MESSAGE_POSITION_INFO:存储消息的位置信息,包括 offset、消息长度、tagsCode 等。
  • BLANK:用于消息删除时填充的占位信息,表示对应位置的消息已被删除。

  • ConsumeQueue 存储

    ConsumeQueue 的存储逻辑主要涉及两个服务:ReputMessageService 和 FlushConsumeQueueService。

    ReputMessageService

    • 功能:负责将 Broker 提供的消息位置信息写入 ConsumeQueue。
    • 实现
      • 通过 doReput 方法循环读取 CommitLog 的数据。
      • 将读取到的消息生成 DispatchRequest,并调用 Broker 的接收逻辑。
      • 根据请求结果,将消息位置信息写入 ConsumeQueue 或 IndexFile。

    FlushConsumeQueueService

    • 功能:定期将 ConsumeQueue 中的消息持久化到存储检查点。
    • 实现
      • 每隔一定时间(默认为 1000ms)执行一次 flush 操作。
      • 遍历所有主题和队列,调用 MappedFile 的 flush 方法,将内容写入存储系统。

    Broker 提供的接口

    拉取消息接口

    PullMessageRequestHeader 是拉取消息请求的核心 Header,包含以下字段:

    • consumerGroup:消费者分组信息。
    • topic:消息主题。
    • queueId:消息队列编号。
    • queueOffset:队列开始位置。
    • maxMsgNums:最大拉取消息数量。
    • sysFlag:系统标识。
    • commitOffset:提交消费进度位置。
    • suspendTimeoutMillis:挂起超时时间。
    • subscription:订阅表达式。
    • subVersion:订阅版本号。

    更新消费进度接口

    Broker 提供了更新消费进度的接口,主要用于通知消费者的消费进度。消费进度的持久化存储文件为 consumerOffset.json,每次更新会备份到 consumerOffset.json.bak


    发回消息接口

    SendMessageProcessor 负责在消费失败时将消息发回 Broker。主要逻辑包括:

    • 初始化响应。
    • 检查 Broker 的写入权限。
    • 处理重试队列。
    • 计算新的队列编号和系统标识。
    • 更新消息的延迟等级。
    • 创建消息实体并添加到 Broker。

    结尾

    感谢大家对本文的阅读和收藏。如果有任何疑问或建议,欢迎在评论区交流。一起探索 RocketMQ 的精妙架构吧!

    转载地址:http://liqfk.baihongyu.com/

    你可能感兴趣的文章
    Objective-C实现卡尔曼滤波(附完整源码)
    查看>>
    Objective-C实现卡尔曼滤波(附完整源码)
    查看>>
    Objective-C实现压缩文件夹(附完整源码)
    查看>>
    Objective-C实现原型模式(附完整源码)
    查看>>
    Objective-C实现双向A*算法(附完整源码)
    查看>>
    Objective-C实现双向广度优先搜索算法(附完整源码)
    查看>>
    Objective-C实现双向循环链表(附完整源码)
    查看>>
    Objective-C实现双向链表(附完整源码)
    查看>>
    Objective-C实现双端队列算法(附完整源码)
    查看>>
    Objective-C实现双线性插值(附完整源码)
    查看>>
    Objective-C实现双重链表(附完整源码)
    查看>>
    Objective-C实现反向传播神经网络算法(附完整源码)
    查看>>
    Objective-C实现反转位算法(附完整源码)
    查看>>
    Objective-C实现反转字符串算法(附完整源码)
    查看>>
    Objective-C实现合并两棵二叉树算法(附完整源码)
    查看>>
    Objective-C实现后缀表达式(附完整源码)
    查看>>
    Objective-C实现向量叉乘(附完整源码)
    查看>>
    Objective-C实现哈希查找(附完整源码)
    查看>>
    Objective-C实现哈希表算法(附完整源码)
    查看>>
    Objective-C实现哥德巴赫猜想(附完整源码)
    查看>>