当前位置: 首页 > article >正文

【flink】之新版本kafka到kafka

前言: 

通过sinkTo()的优点:更简洁、类型安全,适用于使用 Flink 提供的预定义 sink 或简单的自定义 sink

准备:

 引入Flink 1.12版本即可

    <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-kafka</artifactId>
         <version>3.2.0-1.19</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>1.19.1</version>
    </dependency>

创建任务:

package com.iterge.flink.job;


import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author iterge
 * @version 1.0
 * @date 2024/10/30 16:20
 * @description kafka to kafka
 */

@Slf4j
public class KafkaToKafkaDemo {

    static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static void main(String[] args) throws Exception {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("it.erge.test.topic")
                .setGroupId("it.erge.test.topic.6")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStreamSource<String> msg = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        //添加sink
        KafkaSink<String> build = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("it.lph.test.topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        msg.sinkTo(build);
        env.execute();
    }
}


http://www.kler.cn/a/378603.html

相关文章:

  • 【二叉树的深搜】计算布尔二叉树的值 求根节点到叶节点数字之和
  • springboot基于安卓的智启教育服务平台app
  • 在 Kubernetes 上快速安装 KubeSphere v4.1.2
  • 美特CRM mcc_login.jsp存在SQL注入漏洞
  • 金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
  • C 语言雏启:擘画代码乾坤,谛观编程奥宇之初瞰
  • 五、Go语言快速入门值条件控制
  • strcat,strncat,strstr
  • VScode调试
  • nodejs爬虫系统
  • Excel:vba实现批量插入图片批注
  • AI助力医疗:未来的医生会是机器人吗?
  • 使用opencv调用TV_L1算法提取光流
  • MySQL表的增删改查(CRUD1)
  • rk3568 适配 CAN
  • 浏览器、性能优化、前端安全重难点面试题
  • LeetCode 0685.冗余连接 II:并查集(和I有何不同分析)——详细题解(附图)
  • ReactNative Fabric渲染器和组件(5)
  • 【NLP自然语言处理】深入解析Encoder与Decoder模块:结构、作用与深度学习应用
  • 简单题:Base32 编码和解码问题| 豆包MarsCode AI刷题
  • 【多线程奇妙屋】收藏多年的线程安全问题大全笔记(下篇) { 死锁问题 },笔记一生一起走,那些日子不再有
  • STM32 第22章 常用存储器介绍
  • JavaScript 判断数据类型有哪些方法?
  • 1、DevEco Studio 鸿蒙仓颉应用创建
  • Gradient descent algorithm
  • express搭建ts(TypeScript)运行环境