【flink】之新版本kafka到kafka
前言:
通过sinkTo()的优点:更简洁、类型安全,适用于使用 Flink 提供的预定义 sink 或简单的自定义 sink
准备:
引入Flink 1.12版本即可
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.2.0-1.19</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.19.1</version>
</dependency>
创建任务:
package com.iterge.flink.job;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author iterge
* @version 1.0
* @date 2024/10/30 16:20
* @description kafka to kafka
*/
@Slf4j
public class KafkaToKafkaDemo {
static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
public static void main(String[] args) throws Exception {
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("it.erge.test.topic")
.setGroupId("it.erge.test.topic.6")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> msg = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
//添加sink
KafkaSink<String> build = KafkaSink.<String>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("it.lph.test.topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
msg.sinkTo(build);
env.execute();
}
}