当前位置: 首页 > article >正文

Kafka 幂等性与事务

文章目录

  • 幂等性
    • 实现机制
    • 配置使用
    • 局限性
  • 事务
    • 使用场景
    • 配置使用
    • 实现机制
    • 事务过程
      • 事务初始化
      • 事务开始
      • 事务提交
      • 事务取消
      • 事务消费

幂等性

Producer 无论向 Broker 发送多少次重复的数据,Broker 端只会持久化一条,保证数据不丢失且不重复。

实现机制

通过引入ProducerID和SequenceNumber来实现Broker对于每条接收的消息都会验证PID,同时会检查SeqNumber是否比Broker维护的SeqNumber值严格+1,只有符合要求的才是合法的,其他情况都会丢弃。

  • ProducerID:Producer初始化时由Broker分配,作为每个Producer会话的唯一标识
  • SequenceNumber:Producer发送的每条消息的标识(更准确地说是每一个消息批次,即ProducerBatch),从0开始单调递增。Broker根据它来判断写入的消息是否可接受。

配置使用

Producer设置

  • enable.idempotence=true:表示使用幂等性生产者。当enable.idempotence配置为true时,acks必须配置为all。并且建议max.in.flight.requests.per.connection的值小于5。
  • acks=all

局限性

  • 只能保证 Producer 在单个会话内不丟不重 ,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);
  • 幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性 ,当涉及多个 Topic-Partition 时,这中间的状态无法同步。

事务

Kafka 事务基于幂等性实现,通过事务机制,Kafka 可以实现对多个 Topic 、多个 Partition 的原子性的写入,即处于同一个事务内的所有消息,最终结果是要么全部写成功,要么全部写失败。

使用场景

  • 对多个 Topic 、多个 Partition 的原子性的写入
  • Consumer-Transform-Producer模式下,将消费者提交偏移量操作和生成者一系列生成消息的操作封装成一个原子操作。避免重复消费

配置使用

Producer设置

  • transactional.id:事务id,类型为string,客户端自定义

Consumer设置

  • isolation.level:read_committed。事务隔离级别,默认为空。

实现机制

引入以下组件:

  • Transactional Coordinator‌:负责管理和协调事务。每个Kafka broker上都会运行一个Transactional Coordinator实例。
  • Transaction Log‌:这是一个内部Topic(__transaction_state),用于存储事务的元数据信息,包括事务的状态、参与的分区等。
  • Control Messages:由Transactional Coordinator‌写入topic的一种特殊消息,但对于Consumer来说不可见。是用来让Broker告知consumer拉取的消息是否已被原子性提交。
  • TransactionId:事务ID,类型为String字符串,由Producer客户端自定义。提供稳定不变的ID意义在于可以在异常后重启从断点进行恢复。
  • Epoch:单调递增的事务Id标识,可以保证具有相同TransactionId的Producer,旧的无法写入。
  • ProducerID、SequenceNumber:标记生产者、消息的唯一标识

事务过程

事务初始化

所有的事务操作都需要Transactional Coordinator‌管理和协调
1.获取Transactional Coordinator‌地址
Producer发送携带Transactionid的请求到任意一个Broker,Broker对获取到Transactionid做hashcode后对topic(__transaction_state)默认分区(50)取模,所得分区主副本所在的Broker作为TransactionalCoordinator‌
2.获取ProducerID和Epoch
Producer对TransactionalCoordinator‌发送请求,此时会分配ProducerId及Epoch,并将信息持久化。最后向Producer返回ProducerId+Epoch。之后的每次请求都会携带ProducerId和Epoch。
(__transaction_state中信息格式为key-value,key为Transactionid,value包含ProducerID、Epoch、事务和分区信息等)

事务开始

3.消息写入
Producer开始事务写入,先将本地事务状态更改为IN_TRANSACTION,然后发送消息之前,Producer会将topic-partition相关的信息发送给TransactionalCoordinator‌,由它完成持久化(更新__transaction_state)。之后Producer开始对相关topic-partition发送消息

事务提交

4.Producer触发事务提交
Producer首先发送请求给TransactionalCoordinator‌,由它更新__transaction_state将事务状态更改为PrepareCommit,之后返回成功响应给Producer。TransactionalCoordinator‌发送Control Messages(会持续重试,直到成功)给涉及此次事务的topic-partition,写入成功之后,再次更新__transaction_state,将事务状态更新为CompleteCommit。

事务取消

5.Producer或Coordinator触发事务取消
事物取消可以由Producer发起取消或者TransactionalCoordinator‌检测到事务超时而取消,此时均会更新__transaction_state更改为PrepareAbort,之后返回成功响应给Producer。TransactionalCoordinator‌发送Control Messages给涉及此次事务的topic-partition,写入成功之后,再次更新__transaction_state,将事务状态更新为CompleteAbort。

取消的事务会记录在.txnindex文件中,主要包含以下信息:currentVersion、producerId、firstOffset(当前事务的开始offset)、lastOffset(当前事务的结束offset)、lastStableOffset(存储时的LSO)

事务消费

正常消费时
读隔离级别为 read-committed, 在内部会使用存储在topic-partition中的Control messgae,来过滤掉没有提交的消息。(回滚的消息也没有删除,只是在读数据时过滤该数据)

对于Consumer-Transform-Producer下,会通过groupId算出__consumer_offsets topic中对应的partition,然后加该partition的信息也加入到Transaction Log‌中,最终在统一取消或提交。同样也会将Control message写入__consumer_offsets对应的分区。

  • 需要将enable.auto.commit设置为false
  • 使用producer.sendOffsetsToTransaction()来提交offset

在这里插入图片描述

参考
https://z.itpub.net/article/detail/F86DD78AECAC4DEC92468DEFFEB4ED0D
https://www.cnblogs.com/hongdada/p/16945086.html
学习笔记之Kafka幂等和事务_transaction.state.log.replication.factor-CSDN博客


http://www.kler.cn/a/459828.html

相关文章:

  • php 静态变量
  • 设计模式 创建型 建造者模式(Builder Pattern)与 常见技术框架应用 解析
  • Colyseus 与 HTTP API 的集成
  • 人脑处理信息的速度与效率:超越计算机的直观判断能力
  • SpringAOP之日志和身份验证
  • 【SpringBoot教程】搭建SpringBoot项目之编写pom.xml
  • VIM: Vision Mamba基于双向状态空间模型的高效视觉表示学习
  • STM32完全学习——FLASH上FATFS文件管理系统
  • OpenHarmony源码编译后烧录镜像教程,RK3566鸿蒙开发板演示
  • 本地创建了一个 Git 仓库推送到GitHub中
  • Android笔试面试题AI答之非技术问题(2)
  • OPPO手机如何正确使用金融理财计算器
  • vue3学习笔记(11)-组件通信
  • XL系列433芯片、2.4G收发芯片 通讯对码说明
  • 【LeetCode 面试经典150题】详细题解之矩阵篇
  • 人民医院网络安全规划与设计
  • 正大的资产配置理念解析
  • day-102 二进制矩阵中的最短路径
  • STM32 高级 WIFi案例1:测试AT指令
  • 数据库自增 id 过大导致前端时数据丢失
  • 活动预告 |【Part1】Microsoft Azure 在线技术公开课:数据基础知识
  • Debian12使用RKE2离线部署3master2node三主两从的k8s集群详细教程
  • 通过iptables限制docker 容器的运行端口
  • Spring Boot 项目 与 其他依赖版本兼容对应表
  • K8S网络流量路径
  • 数据库系列之分布式数据库下误删表怎么恢复?