当前位置: 首页 > article >正文

Kafka·Producer

Producer发送原理

  1. 拦截器进行拦截
    在这里插入图片描述

  2. 对key和value进行序列化
    org.apache.kafka.clients.producer.KafkaProducer#doSend
    在这里插入图片描述

  3. 分区选择
    计算消息要发送到topic的哪个分区上
    在这里插入图片描述
    在这里插入图片描述

    • 若指定了分区,则使用指定的值
    • 没有指定的话则使用分区器计算得到
    • 或者使用hash取余的方式
  4. 暂存消息到累加器
    Producer并不会立刻发送消息到Broker中,而是把消息暂存到累加器中,当消息达到一定量后,再异步批量发送
    在这里插入图片描述
    在这里插入图片描述在这里插入图片描述
    消息被暂存到这样一个结构中,可以看到每个topic中,按照分区将消息暂存到一个Deque中
    在这里插入图片描述在这里插入图片描述

  5. acks
    acks,消息发送后的确认机制

    • acks = 0:发送即确认
    • acks = 1:消息在leader落盘后确认
    • acks = all/-1:默认值,leader等待其他follower(ISR中的follower)都同步后回复确认
  6. 重试

  7. 幂等性
    开启幂等性配置(enable.idempotence默认true)后,生产者发送消息时,除了key-value,还会发送PID和Sequence Number。

    • PID:用来标识生产者
    • Sequence Number:表示消息的序列号,每次发送消息都会+1产生新的序列号(连续)

    也就是说PID和Sequence Number会跟随key-value一并先暂存于累加器中,最后发送到Broker中,当broker应答异常,Producer会进行重试发送

    消息发送到Broker后,虽然由于某些原因造成了生产者重试发送消息,但前一条消息可能已经成功落盘到Broker了,当重试发送的消息到达broker后,会通过PID和Sequence Number来判断

    • 若本次消息的Sequence Number 小于或等于 上一次消息的Sequence Number,说明消息重复,丢弃本次消息
    • 若本次消息的Sequence Number 大于 (上一次消息Sequence Number+1),说明存在消息丢失现象,然后抛异常

    虽然是否开启幂等性是在Producer配置的,但是幂等性是在broker中解决的(在Broker中判断PID和Sequence Number)


http://www.kler.cn/a/282204.html

相关文章:

  • 【3D Slicer】的小白入门使用指南九
  • Linux下多线程
  • 直接映射4条 cacheline,每条cacheline32位数据(混乱版)
  • 在 Node.js 中解决极验验证码:使用 Puppeteer 自动化
  • PETR/PETRv2/StreamPETR论文阅读
  • ISP是什么?
  • 通过历史进行数据预测的算法
  • 云计算实训39——部署Harbor、部署Docker-compose、YAML文件
  • Win11 操作(七)声音降噪
  • 【JVM】亿级流量调优(一)
  • Git Submodule 常用命令详解
  • 前端实现截图(Vue)
  • uniapp + Vue3自定义封装弹窗组件
  • Kubectl常用命令
  • 深度强化学习算法(三)(附带MATLAB程序)
  • Python酷库之旅-第三方库Pandas(105)
  • 2012-2022年各省新质生产力匹配数字经济数据
  • 通过 pnpm 安装依赖包会发生什么
  • 【Java】Java 设计模式之工厂模式与策略模式
  • 本地Docker部署Navidrome音乐服务器与远程访问听歌详细教程
  • 力扣题/回溯/子集
  • java.time包时间类浅谈
  • C语言中的int argc, char *argv[]是什么?
  • 设计模式 10 外观模式
  • mysql集群从零开始搭建
  • Qt详解QUrl