当前位置: 首页 > article >正文

Kafka - 图解生产者消息发送流程

文章目录

  • 发送原理
    • 1. 主线程 (main thread):
    • 2. Sender 线程:
    • 3. RecordAccumulator:
  • 发送原理小结
  • 重要参数

在这里插入图片描述


发送原理

在这里插入图片描述


Kafka的Producer发送消息采用的是异步发送的方式。

在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAccumulator

  • ①main线程中创建了一个双端队列RecordAccumulator,将消息发送给RecordAccumulator。

  • ②Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。


Kafka的Producer发送消息采用了异步发送的方式,这个过程确实涉及到多个线程以及共享变量。下面详细展开说明这个过程:

1. 主线程 (main thread):

主线程是生产者应用的线程,它负责创建消息并将这些消息发送给Kafka Producer API。主要的操作包括:

  • 创建消息:主线程创建消息,将它们封装成ProducerRecord对象。ProducerRecord通常包括消息的主题(topic)、分区(partition)、键(key)和值(value)等信息。

  • 发送消息到RecordAccumulator:主线程将创建的消息发送到一个双端队列(deque)叫做RecordAccumulator。这个队列用于缓冲消息,允许Producer线程将消息异步发送到Kafka集群,而不需要等待每条消息都被立刻发送。

2. Sender 线程:

Sender线程是Kafka Producer内部的一个后台线程,它负责从RecordAccumulator中拉取消息并发送到Kafka broker。Sender线程的主要工作如下:

  • 从RecordAccumulator拉取消息:Sender线程定期轮询(poll)RecordAccumulator,检查是否有新消息需要发送。这个轮询是异步的,因此主线程不需要等待消息被发送。

  • 构建请求:当Sender线程发现有消息需要发送,它会构建一个或多个ProducerRequest,每个请求包含多个消息,以便进行有效的批量发送。

  • 发送消息到Kafka broker:Sender线程将构建的请求发送到Kafka broker,等待来自broker的响应。一旦消息被成功接收并记录在Kafka broker中,Sender线程会通知RecordAccumulator,以便它可以更新消息的状态。

3. RecordAccumulator:

RecordAccumulator是Producer内部的一个共享变量,用于暂存即将发送到Kafka broker的消息。主要功能包括:

  • 暂存消息:主线程将消息发送到RecordAccumulator中,使其在等待Sender线程处理。

  • 管理消息的状态:RecordAccumulator跟踪每条消息的发送状态,以确保消息被成功发送到Kafka broker。一旦消息被成功写入到Kafka broker的日志中,RecordAccumulator会将消息的状态标记为已发送。

  • 负责消息批量化:RecordAccumulator也有助于消息的批量发送,以减少网络开销和提高性能。

发送原理小结

总结一下,Kafka的Producer采用异步发送消息的方式,

  • 主线程负责创建和发送消息到RecordAccumulator,

  • 而Sender线程负责从RecordAccumulator中拉取消息并将其发送到Kafka broker。

  • RecordAccumulator充当缓冲区,用于管理消息的状态以及批量发送,以提高性能和降低延迟。

这个架构充分利用了多线程和异步操作,使得Producer能够高效地发送消息到Kafka集群。


重要参数

参数名称描述
bootstrap.servers生产者连接集群所需的broker地址清单。可以设置1个或者多个,中间用逗号隔开。生产者从给定的broker里查找到其他broker信息。
key.serializer, value.serializer指定发送消息的key和value的序列化类型。要写全类名。(反射获取)
buffer.memoryRecordAccumulator缓冲区总大小,默认32m。
batch.size缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。
acks0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader数据落盘后应答.
-1(all):生产者发送过来的数据,Leader和isr队列里面的所有节点数据都落盘后应答。默认值是-1
max.in.flight.requests.per.connection允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。
Retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是100ms。
enable.idempotence是否开启幂等性,默认true,开启幂等性。
compression.type生产者发送的所有数据的压缩方式。默认是none,不压缩。支持压缩类型:none、gzip、snappy、lz4和zstd。

在这里插入图片描述


http://www.kler.cn/a/105345.html

相关文章:

  • UE5 Blueprint发送http请求
  • MarkDown详细入门笔记
  • 软考系列(系统架构师)- 2014年系统架构师软考案例分析考点
  • server error: Preprocessor dependency “less“ not found. Did you install it?
  • Java Boolean类,Java Character类,Java Number类
  • 【Java-框架-SpringMVC】(01) SpringMVC框架的简单创建与使用,快速上手 - 简易版
  • Tomcat+nginx负载均衡和动静分离
  • 【Tailwind CSS】当页面内容过少,怎样让footer保持在屏幕底部?
  • 2023 10月最新Vmd 下载安装教程,WindowsLinux
  • 【单片机学习笔记】Windows+Vscode+STM32F4+freeRTOS+FatFs gcc环境搭建
  • yyds,Elasticsearch Template自动化管理新索引创建
  • mysql创建自定义函数报错
  • 论文-分布式-分布式计算|容错-分布式控制下的自稳定系统
  • C#实现数据导出任一Word图表的通用呈现方法及一些体会
  • Nginx学习笔记01
  • PaddleX场景实战:PP-TS在电压预测场景上的应用
  • 【计算机网络】文件传输协议FTP和SFTP
  • DBeaver连接数据库报错:Public Key Retrieval is not allowed 的解决方案
  • Elasticsearch基础篇(六):es映射和常用的字段类型
  • python---闭包