当前位置: 首页 > article >正文

RocketMQ: 消息过滤,通信组件,服务发现

消息过滤


1 ) 简单消息过滤

/**
 * 订阅指定topic下tags分别等于 TagA 或 TagC 或 TagD
*/

consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
  • 如以上代码所示,简单消息过滤通过指定多个 Tag 来过滤消息,过滤的动作在服务器进行

2 ) 高级消息过滤

    1. Broker 所在的机器会启劢多个 FilterServer 过滤迕程
    1. Consumer 启劢后,会吐 FilterServer 上传一个过滤的 Java 类
    1. Consumer 从 FilterServer 拉消息,FilterServer 将请求转发给 Broker,FilterServer 从 Broker 收到消息后,按照 Consumer 上传的 Java 过滤程序做过滤,过滤完成后返回给 Consumer
  • 总结
    • 使用 CPU 资源来换取网卡流量资源
    • FilterServer 与 Broker 部署在同一台机器,数据通过本地回环通信,不走网卡
    • 一台 Broker 部署多个 FilterServer,充分利用 CPU 资源,因为单个 JVM 难以全面利用高配的物理机 CPU 资源
    • 因为过滤代码使用 Java 语言来编写,应用几乎可以做任意形式的服务器端消息过滤,例如通过 Message Header 进行过滤,甚至可以按照 Message Body 进行过滤
    • 使用 Java 语言进行作为过滤表达式是一个双刃剑,方便了应用的过滤操作,但是带来了服务器端的安全风险。需要应用来保证过滤代码安全,例如在过滤程序里尽可能不做申请大内存,创建线程等操作。避免 Broker 服务器发生资源泄漏

通信组件

  • RocketMQ 通信组件使用了 Netty-4.0.9.Final,在之上做了简单的协议封装

1 )网络协议

在这里插入图片描述

  1. 大端 4 个字节整数,等于 2、3、4 长度总和
  2. 大端 4 个字节整数,等于 3 的长度
  3. 使用 json 序列化数据
  4. 应用自定义二进制序列化数据
  • Header 格式
    {
    	"code": 0,
    	 "language": "JAVA",
    	 "version": 0,
    	 "opaque": 0,
    	 "flag": 1,
    	 "remark": "hello, I am respponse /127.0.0.1:27603",
    	 "extFields": {
    		 "count": "0",
    		 "messageTitle": "HelloMessageTitle"
    	 }
    }
    
Header 字段名类型RequestResponse
code整数请求操作代码,请求接收方
根据不同的代码做不同的操作
应答结果代码,0 表示成功,
非 0 表示各种错误代码
language字符串请求发起方实现语言,默认JAVA应答接收方实现语言
version整数请求发起方程序版本应答接收方程序版本
opaque整数请求发起方在同一连接上不同的请求标识代码,多线程连接复用使用应答方不做修改,直接返回
flag整数通信局的标志位通信局的标志位
remark字符串传输自定义文本信息错误详细描述信息
extFieldsHashMap<String,String>请求自定义字段应答自定义字段

2 )心跳处理

  • 通信组件本身不处理心跳,由上局进行心跳处理

3 )连接复用

  • 同一个网络连接,客户端多个线程可以同时发送请求,应答响应通过 header 中的 opaque 字段来标识

4 )超时连接

  • 如果某个连接超过特定时间没有活动(无读写事件),则自动关闭此连接
  • 并通知上层业务,清除连接对应的注册信息

RocketMQ 服务发现(Name Server)

  • Name Server 是专为 RocketMQ 设计的轻量级名称服务,代码小于 1000 行
  • 具有简单、可集群横向扩展、无状态等特点
  • 将要支持的主备自动切换功能会强依赖 Name Server

http://www.kler.cn/a/412977.html

相关文章:

  • 带有悬浮窗功能的Android应用
  • 从零开始:NetBox 4.1 Docker 部署和升级
  • LangGraph中的State管理
  • CSDN 博客自动发布脚本(Python 含自动登录、定时发布)
  • 数据库(总结自小林coding)|索引失效的场景、慢查询、原因及如何优化?undo log、redo log、binlog 作用、MySQL和Redis的区别
  • 【Python爬虫实战】深入解析 Scrapy:从阻塞与非阻塞到高效爬取的实战指南
  • 探索Python WebSocket新境界:picows库揭秘
  • 哈希表理解与底层模拟实现
  • Python的排序算法
  • 深度学习创新点不足?试试贝叶斯神经网络!
  • Python中的DrissionPage详解
  • Rust eyre 错误处理实战教程
  • 针对静态交通停车诱导系统解决方案及停车开源框架实现
  • 目录遍历漏洞-CVE-2021-41773
  • C#基础31-35
  • 极狐GitLab 17.6 正式发布几十项与 DevSecOps 相关的功能【一】
  • 『VUE』elementUI dialog的子组件created生命周期不刷新(详细图文注释)
  • 【go】查询某个依赖是否存在于这个代理
  • 【Python TensorFlow】进阶指南(续篇四)
  • 写一个流程,前面的圆点和线,第一个圆上面没有线,最后一个圆下面没有线
  • 初识java(3)
  • 深入理解 MySQL 锁机制:分类、实现与优化
  • 【AIGC】大模型面试高频考点-多模态RAG
  • 除了混合搜索,RAG 还需要哪些基础设施能力
  • 【小白学机器学习37】用numpy计算协方差cov(x,y) 和 皮尔逊相关系数 r(x,y)
  • 微信小程序蓝牙writeBLECharacteristicValue写入数据返回成功后,实际硬件内信息查询未存储?