当前位置: 首页 > article >正文

Flink和Kafka连接时的精确一次保证

Flink写入Kafka两阶段提交

端到端的 exactly-once(精准一次)

kafka -> Flink -> kafka

1)输入端

输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)

2)Flink内部

Flink 内部可以通过检查点机制保证状态和处理结果的 exactly-once 语义

3)输出端

两阶段提交(2PC)

写入 Kafka 的过程实际上是一个两段式的提交:处理完毕得到结果,写入 Kafka 时是基于事务的“预提交”;等到检查点保存完毕,才会提交事务进行“正式提交”

如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。

必须的配置

1)必须启用检查点

2)指定 KafkaSink 的发送级别为 DeliveryGuarantee.EXACTLY_ONCE

3)配置 Kafka 读取数据的消费者的隔离级别【默认kafka消费者隔离级别是读未提交,2PC第一阶段预提交数据也会被读到,下游消费者需要设置为读已提交

4)事务超时配置

【配置的事务超时时间 transaction.timeout.ms 默认是1小时,而Kafka 集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是15 分钟。在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。因此checkpoint 间隔 < 事务超时时间 < max的15分钟

代码实战

kafka -> Flink -> kafka【Flink处理kafka来源数据再输出到kafka】

public class KafkaEOSDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 【1】、启用检查点,设置为精准一次
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 2.读取 kafka
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092")
                .setGroupId("default")
                .setTopics("topic_1")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();
        DataStreamSource<String> kafkasource = env
                .fromSource(kafkaSource,
                        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource");

        /*
         3.写出到 Kafka
          精准一次 写入 Kafka,需要满足以下条件,【缺一不可】
          1、开启 checkpoint
          2、sink 设置保证级别为 精准一次
          3、sink 设置事务前缀
          4、sink 设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max的15分钟
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口
                .setBootstrapServers("hadoop102:9092")
                // 指定序列化器:指定 Topic 名称、具体的序列化
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("ws")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // 【3.1】 精准一次,开启 2pc
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)

                // 【3.2】 精准一次,必须设置 事务的前缀
                .setTransactionalIdPrefix("li-")
                // 【3.3】 设置事务超时时间
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();
        kafkasource.sinkTo(kafkaSink);
        env.execute();
    }
}

后续读取“ws”这个 topic 的消费者,要设置事务的隔离级别为“读已提交”

public class KafkaEOSConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 消费 在前面使用【两阶段提交】写入的 Topic
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092")
                .setGroupId("default")
                .setTopics("ws")
                .setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())
                // 作为 下游的消费者,要设置事务的隔离级别为 【读已提交】
                .setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
                .build();
        env
                .fromSource(kafkaSource,
                        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource")
                .print();
        env.execute();
    }
}

处理程序以及消费程序如上设置才能真正实现端到端精准一次的保证。


http://www.kler.cn/a/131943.html

相关文章:

  • 竞赛 题目:基于机器视觉opencv的手势检测 手势识别 算法 - 深度学习 卷积神经网络 opencv python
  • 如何打包成一个可安装的Android应用程序包
  • 理解 R-CNN:目标检测的一场革命
  • 优化收益与用户体验:游戏APP需接入的广告类型
  • CAPL如何对以太网报文的长度字段和校验和字段设置错误值
  • 【数据结构】快速排序算法你会写几种?
  • OpenCV快速入门:像素操作和图像变换
  • The import xxx.xxx.xxxx is never used
  • hash 哈希表
  • 【洛谷 P2678】[NOIP2015 提高组] 跳石头 题解(二分答案+循环)
  • C# String转DateTime
  • 云课五分钟-08安装Opera成功-仓库中查找对应版本
  • 分发糖果(贪心算法)
  • python pip
  • 系列一、JVM概述
  • vue将base64编码转为pdf方法
  • 学习指南:如何快速上手媒体生态一致体验开发
  • vue3使用西瓜播放器播放flv、hls、mp4视频
  • C#,数值计算——插值和外推,双线性插值(Bilin_interp)的计算方法与源程序
  • 助力水泥基建裂痕自动化巡检,基于yolov5融合ASPP开发构建多尺度融合目标检测识别系统