当前位置: 首页 > article >正文

3. kafka事务消息

一. kafka事务提交流程

1.Transaction ID
为了实现跨分区跨跨会话的事务,需要引入一个全局唯一的Transaction ID,并将生产者获得的id和Transaction ID绑定。这样当生产者重启后可以通过正在进行的Transaction ID获取原来的id

2.Transaction Coordinator
transaction coordinator(事务协调者)是运行在每个 kafka broker 上的一个模块,是 kafka broker 进程承载的新功能之一。transaction coordinator负责分配PID和管理事务以及读写Transaction log。

3.Transaction log
transaction log 是 kafka 的一个内部 topic,transaction log 有多个分区,每个分区都有一个 leader,该 leade对应哪个 kafka broker,哪个 broker 上的 transaction coordinator 就负责对这些分区的写操作。transaction log 存储事务的最新状态和其相关元数据信息。

在这里插入图片描述

  1. 生产者向任意kafka服务器发起请求获取相应事务协调者的地址。
  2. 生产者通过指定的TID向事务协调者请求PID,若TID存在返回PID,不存在新建一个PID。每次请求TID,TID会加上一个Epoch值,防止旧的生产者因为宕机重启后重试事务,造成事务重复。
  3. 生产者将消息存储的分区信息发给事务协调者,事务协调者将分区信息持久化。
  4. 生产者向对应分区发送消息。
  5. 生产者发起提交(commit)或者回滚请求(abort),事务协调者持久化该请求,标记为准备状态。
  6. 事务协调者向分区发送提交或者回滚请求,分区执行完成后返回结果。
  7. 事务协调者将处理结果持久化。
二. 代码演示

Kafka 的事务一共有涉及以下5个API

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

下面以生产者为例演示事务消息

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaTransactionDemo {

    private final static String BOOTSTRAP_SERVERS = "192.168.47.128:9092,192.168.47.129:9092,192.168.47.130:9092";

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 设置事务id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        // 初始化事务
        producer.initTransactions();
        // 开启事务
        producer.beginTransaction();
        try {
            for (int i = 0; i < 2; i++) {
                producer.send(new ProducerRecord<String, String>("test","message: hello,world"));
            }
            
            // 提交事务
            producer.commitTransaction();
        } catch (Exception e) {
            // 事务回滚
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}

运行结果:

# 启动一个console消费者, 当生产者事务提交时, 消息成功发送
[root@hadoop1 kafka-3.6.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.47.128:9092 --topic test
message: hello,world
message: hello,world

以上演示了消息正常提交的情况,如果发生异常呢?

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaTransactionDemo {

    private final static String BOOTSTRAP_SERVERS = "192.168.47.128:9092,192.168.47.129:9092,192.168.47.130:9092";

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 设置事务id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        // 初始化事务
        producer.initTransactions();
        // 开启事务
        producer.beginTransaction();
        try {
            for (int i = 0; i < 2; i++) {
                producer.send(new ProducerRecord<String, String>("test","message: hello,world"));
            }
            // 会发生异常
            int i = 1 / 0;
            // 提交事务
            producer.commitTransaction();
        } catch (Exception e) {
            // 事务回滚
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}

当发生异常,事务将会回滚,终端消费者将不会收到消息

[root@hadoop1 kafka-3.6.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.47.128:9092 --topic test

http://www.kler.cn/a/535990.html

相关文章:

  • Node.js 环境配置
  • 大语言模型极速部署:Ollama 、 One-API、OpenWebUi 完美搭建教程
  • 网络工程师 (21)网络的性能
  • 第二次连接k8s平台注意事项
  • 在python中Scrapy 和 requests 哪个更适合处理动态网页?
  • 2.Mkdocs配置说明(mkdocs.yml)【最新版】
  • 分布式事务实战 ——Seata 与最终一致性方案
  • Cables Finance发布 V1.1 白皮书:开创RWA敞口新范式
  • 第二篇:前端VSCode常用快捷键-以及常用技巧
  • ORACLE 数据库的启动和关闭
  • LLM的Deep Research功能:重构人类认知与创新的新范式
  • SQL Server中RANK()函数:处理并列排名与自然跳号
  • tomcat如何配置保存7天滚动日志
  • NLP知识点
  • 7-1 什么是机器学习
  • C语言数据结构编程练习-排序算法
  • 2.1-STL库中string类的模拟实现
  • DIY Shell:探秘进程构建与命令解析的核心原理
  • 蓝桥杯小白打卡第二天
  • 【大模型LLM面试合集】大语言模型架构_Transformer架构细节
  • java高级工程师面试题_java高级工程师面试题及答案解析
  • 【原子工具】快速幂 快速乘
  • 基于Flask的商城应用系统的设计与实现
  • vscode中的编辑器、终端、输出、调试控制台(转载)
  • 大数据系统文件格式ORC与Parquet
  • 算法设计-四后问题(C++)