当前位置: 首页 > article >正文

【消息队列】聊一下生产者消息发送流程

消息发送流程

在这里插入图片描述
1.生产者main线程调用send发送消息,先走拦截器,然后会将消息进行序列化,然后选择对应的分区器,将消息发送到RecordAccumulator中,默认是32m
2.Sender线程会异步读取,要不数据达到batch的大小 进行数据拉取,要么数据达到linger的时间,读取数据之后通过网络进行将数据发送到Kafka集群。如果出现失败在进行重试,直到Broker返回ACK确认信息。
其中设计到的有main线程、Sender线程、拦截器、序列化器、分区器,以及一个双端内存队列。

生产者重要参数

  • bootstrap.servers 生产者连接集群所需的broker地址清单
  • key.serializer和value.serializer 指定发送消息的key和value的序列化类型。一定要写全类名。
  • buffer.memory RecordAccumulator缓冲区总大小,默认32m
  • batch.size 缓冲区一批数据最大值,默认16k
  • linger.ms 如果数据未达到batch.size ,sender等待linger.ms 之后发送数据,默认是0
  • acks 0 不需要数据落盘应答,1leader收到数据应答, -1(ALL)leader+isr队列所有阶段收到数据应答,默认值-1
  • max.in.flight.requests.per.connection 允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
  • retries:消息发送出现错误的时候,重新发送,默认是int最大值,如果有重试,还想保证消息顺序性,MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
  • retry.backoff.ms 两次重试之间的时间间隔,默认100ms
  • enable.idempotence 是否开启幂等,默认true,开启幂等
  • compression.type 压缩方式,默认值none,支持gzip,snappy等格式,主要通过压缩来提高生产者发送消息的性能,空间换时间。

http://www.kler.cn/a/9868.html

相关文章:

  • vue面试题7|[2024-11-14]
  • Webkit 滚动条样式属性
  • 用MVVM设计模式提升WPF开发体验:分层架构与绑定实例解析
  • 〔 MySQL 〕数据类型
  • git配置远程仓库的认证信息
  • 如何保护 Microsoft 网络免受中间人攻击
  • 7nm舱泊一体SoC的新玩家
  • 三大前端框架Vue, Angular, React
  • 月薪过 3w 的 软件测试工程师 都是怎么做到的?
  • Spring Security 6.0系列【16】授权篇之访问控制
  • 国网B接口调阅实时视频规范解读和代码示例分析
  • 代码随想录_二叉树_leetcode236
  • Java阶段一Day19
  • 值栈的概念和作用是什么?
  • 关于iptables封禁国外IP方法
  • 多模态模型技术综述
  • 第一章 webpack与构建发展简史
  • 【Spring】@ConfigurationProperties 注解的简单使用和介绍
  • Hive概论、架构和基本操作
  • ios逆向工具有那些
  • 2022国赛32:NFS服务
  • 【数据库】面试题合集
  • 【JS运算】分组求和/平均值(reduce函数)
  • 2023 年 MQTT 协议的 7 个技术趋势|描绘物联网的未来
  • 使用Nginx反向代理OpenAI API
  • python内置方法的使用方法及示例