当前位置: 首页 > article >正文

RocketMQ: 客户端使用指南

客户端如何寻址

  • RocketMQ 有多种配置方式可以令客户端找到 Name Server, 然后通过 Name Server 再找到 Broker,分别如下,优先级由高到低,高优先级会覆盖低优先级

一、代码中指定 Name Server 地址

producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
或
consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");

二、Java 启劢参数中指定 Name Server 地址

  • Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876

三、环境发量挃定 Name Server 地址

export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876

四、HTTP 静态服务器寻址(默认)

  • 客户端启动后,会定时访问一个静态 HTTP 服务器,地址如下:

    • http://jmenv.tbsite.net:8080/rocketmq/nsaddr
    • 这个 URL 的返回内容如下:192.168.0.1:9876;192.168.0.2:9876
  • 客户端默认每隔 2 分钟访问一次返个 HTTP 服务器,并更新本地的 Name Server 地址

  • URL 已经在代码中写死,可通过修改/etc/hosts 文件来改发要访问的服务器

    • 例如在/etc/hosts 增加如下配置
    • 10.232.22.67 jmenv.taobao.net
  • 推荐使用 HTTP 静态服务器寻址方式,好处是客户端部署简单,且 Name Server 集群可以热升级

自定义客户端行为


1 ) 客户端 API 形式

  • DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer 都继承于 ClientConfig 类,ClientConfig 为客户端的公共配置类
  • 客户端的配置都是 get、set 形式,每个参数都可以用 spring 来配置,也可以在代码中配置,例如 namesrvAddr 这个参数可以这样配置,其他参数同理
    • producer.setNamesrvAddr("192.168.0.1:9876");

2 )客户端的公共配置

参数名默认值说明
namesrvAddrName Server 地址列表,多个 NameServer 地址用分号隔开
clientIP本机 IP客户端本机 IP 地址,某些机器会发生无法识别客户端,
IP 地址情况,需要应用在代码中强制指定
instanceNameDEFAULT客户端实例名称,客户端创建的多个 Producer、Consumer 实际是共用一个内部实例
(这个实例包含网络连接、线程资源等)
clientCallbackExecutorThreads4通信层异步回调线程数
pollNameServerInteval30000轮询 Name Server 间隔时间,单位毫秒
heartbeatBrokerInterval30000向 Broker 发送心跳间隔时间,单位毫秒
persistConsumerOffsetInterval5000持久化 Consumer 消费进度间隔时间,单位毫秒

3 )Producer 配置

参数名默认值说明
producerGroupDEFAULT_PRODUCERProducer 组名,多个 Producer 如果属于一个应用,发送同样的消息,则应该将它们归为同一组
createTopicKeyTBW102
导管$1在发送消息时,自动创建服务器不存在的 topic,需要指定 Key
defaultTopicQueueNums4在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
sendMsgTimeout10000发送消息超时时间,单位毫秒
compressMsgBodyOverHowmuch4096消息 Body 超过多大开始压缩(Consumer 收到消息会自动解压缩),单位字节
retryAnotherBrokerWhenNotStoreOKFALSE如果发送消息返回 sendResult,但是sendStatus!=SEND_OK,是否重试发送
maxMessageSize131072客户端限制的消息大小,超过报错,同时服务端也会限制
transactionCheckListener事务消息回查监听器,如果发送事务消息,必须设置
checkThreadPoolMinSize1Broker 回查 Producer 事务状态时,线程池大小
checkThreadPoolMaxSize1Broker 回查 Producer 事务状态时,线程池大小
checkRequestHoldMax2000Broker 回查 Producer 事务状态时,Producer 本地缓冲请求队列大小

4 ) PushConsumer 配置

参数名默认值说明
consumerGroupDEFAULT_CONSUMERConsumer 组名,多个 Consumer 如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
messageModelCLUSTERING消息模型,支持以下两种
1、集群消费
2、广播消费
consumeFromWhereCONSUME_FROM_LAST_OFFSETConsumer 启动后,默认从什么位置开始消费
allocateMessageQueueStrategyAllocateMessageQueueAveragelyRebalance 算法实现策略
subscription{}订阅关系
messageListener消息监听器
offsetStore消费进度存储
consumeThreadMin10消费线程池数量
consumeThreadMax20消费线程池数量
consumeConcurrentlyMaxSpan2000单队列并行消费允许的最大跨度
pullThresholdForQueue1000拉消息本地队列缓存消息最大数
pullInterval0拉消息间隔,由于是长轮询,所以为 0,但是如果应用为了流控,也可以设置大于 0 的值,单位毫秒
consumeMessageBatchMaxSize1批量消费,一次消费多少条消息
pullBatchSize32批量拉消息,一次最多拉多少条

5 ) PullConsumer 配置

参数名默认值说明
consumerGroupDEFAULT_CONSUMERConsumer 组名,多个Consumer 如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
brokerSuspendMaxTimeMillis20000长轮询,Consumer 拉消息请求在 Broker 挂起最长时间,单位毫秒
consumerTimeoutMillisWhenSuspend30000长轮询,Consumer 拉消息请求在 Broker 挂起超过指定时间,客户端认为超时,单位毫秒
consumerPullTimeoutMillis10000非长轮询,拉消息超时时间,单位毫秒
messageModelBROADCASTING消息模型,支持以下两种
1、集群消费
2、广播消费
messageQueueListener监听队列变化
offsetStore消费进度存储
registerTopics[]注册的 topic 集合
allocateMessageQueueStrategyAllocateMessageQueueAveragelyRebalance 算法实现策略

Message 数据结构


1 ) 针对 Producer

字段名默认值说明
Topicnull必填,线下环境不需要申请,线上环境需要申请后才能使用
Bodynull必填,二进制形式,序列化由应用决定,Producer 与 Consumer 要协商好序列化形式
Tagsnull选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。
目前只支持每个消息设置一个 tag,所以也可以类比为 Notify 的 MessageType 概念
Keysnull选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置后,
可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能保证 key 唯一,例如订单号,商品 Id 等
Flag0选填,完全由应用来设置,RocketMQ 不做干预
DelayTimeLevel0选填,消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费
WaitStoreMsgOKTRUE选填,表示消息是否在服务器落盘后才返回应答
  • Message 数据结构各个字段都可以通过 get、set 方式访问,例如访问 topic
    • msg.getTopic();
    • msg.setTopic("TopicTest");
    • 其他字段访问方式类似

2 ) 针对 Consumer

  • 在Producer端,使用 com.alibaba.rocketmq.common.message.Message 则个数据结构
  • 由于 Broker 会为Message 增加数据结构,所以消息到达 Consumer 后
  • 会在 Message 基础之上增加多个字段,Consumer 看到的是
  • com.alibaba.rocketmq.common.message.MessageExt 返个数据结构
  • MessageExt 继承于 Message,MessageExt

http://www.kler.cn/a/407793.html

相关文章:

  • Spring框架深度剖析:特性、安全与优化
  • C 语言复习总结记录二
  • 性能测试的宏观分析:全面提升系统表现的关键
  • Robot | 用 RDK 做一个小型机器人(更新中)
  • React (三)
  • ssm面向品牌会员的在线商城小程序
  • Canvas 前端艺术家
  • Ubuntu20.04 rk3588交叉编译opencv4.10
  • MySQL面试题补
  • DAY1 网络编程(TCP客户端服务器)
  • Spring Boot入门——Spring Boot项目的创建
  • SpringBoot 集成 html2Pdf
  • Qt不同类之间参数的传递
  • ubuntu 配置 多个 git 客户端 账户
  • Modern Effective C++:Item 6 auto推导若非己愿,使用显式类型初始化惯用法
  • PostgreSQL技术内幕19:逻辑备份工具pg_dump、pg_dumpall
  • 【AI系统】GPU 架构回顾(从2010年-2017年)
  • Linux(1)
  • SparkSQL的执行过程:从源码角度解析逻辑计划、优化计划和物理计划
  • 前端:春节倒计时的简单实现
  • 在linux下用二进制方式安装mysql8
  • 5G NR:调制与编码策略(MCS)简介
  • C# 超链接控件LinkLabel无法触发Alt快捷键
  • 智能文档处理百宝箱,文档处理的必备利器
  • Java 对象头、Mark Word、monitor与synchronized关联关系以及synchronized锁优化
  • 学习嵩山版《Java 开发手册》:编程规约 - 命名风格(P3 ~ P4)