Using Sources and Sinks with Flink + Kafka
A key feature of Flink: the same code logic can run as either a batch job or a streaming job, and the same data source can be processed in streaming mode or in batch mode.
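As a minimal sketch of that claim (the class name and the sample pipeline below are made up for illustration), the same pipeline can be switched between the two execution modes with a single setting:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Demo0RuntimeMode {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Same pipeline code; only the runtime mode changes.
//BATCH only makes sense for bounded input (collections, files, ...).
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.fromSequence(1, 100)
.map(i -> i * 2)
.returns(Types.LONG)
.print();
env.execute();
}
}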
Sources: reading data
1. Converting a plain List into a Source
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
public class Demo1ListSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> list = new ArrayList<>();
list.add("java");
list.add("hadoop");
list.add("flink");
list.add("spark");
/*
* List source: a bounded stream
*/
DataStream<String> listDS = env.fromCollection(list);
listDS.print();
env.execute();
}
}
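If the test data is just a handful of fixed values, the list can be skipped entirely: env.fromElements("java", "hadoop", "flink", "spark") builds the same bounded stream in a single call.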
2. File Source
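A minimal sketch of a file source, which reads a file line by line as a bounded stream (it assumes the flink-connector-files dependency is on the classpath; the class name and the input path data/words.txt are placeholders):
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Demo2FileSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Read the file line by line as a bounded stream (the path is a placeholder)
FileSource<String> source = FileSource
.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/words.txt"))
.build();
DataStream<String> linesDS = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "File Source");
linesDS.print();
env.execute();
}
}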
3. MySQL Source
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class Demo4MysqlSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Student> studentsDS = env.addSource(new MySQLSource());
studentsDS.print();
env.execute();
}
}
class MySQLSource implements SourceFunction<Student> {
@Override
public void run(SourceContext<Student> ctx) throws Exception {
//Read the students table from MySQL via JDBC
Class.forName("com.mysql.jdbc.Driver");
//Create the database connection
Connection con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata31", "root", "123456");
//Query the data
PreparedStatement stat = con.prepareStatement("select id,name,age,gender,clazz from students");
ResultSet resultSet = stat.executeQuery();
while (resultSet.next()) {
long id = resultSet.getLong("id");
String name = resultSet.getString("name");
long age = resultSet.getLong("age");
String gender = resultSet.getString("gender");
String clazz = resultSet.getString("clazz");
//Emit each row downstream as a Student object
ctx.collect(new Student(id, name, age, gender, clazz));
}
//Release the JDBC resources once every row has been emitted
resultSet.close();
stat.close();
con.close();
}
@Override
public void cancel() {
//Nothing to stop here: run() makes a single pass over the table and returns on its own
}
}
@Data
@AllArgsConstructor
class Student {
private Long id;
private String name;
private Long age;
private String gender;
private String clazz;
}
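Two practical notes: the MySQL JDBC driver (the mysql-connector-java artifact) has to be on the job's classpath, and because run() makes a single pass over the students table and then returns, the source finishes as soon as the result set is exhausted; a continuously polling source would loop inside run() and use cancel() to break that loop.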
4. Kafka Source
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Demo5KafkaSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Create the Kafka source
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("master:9092,node1:9092,node2:9092")//broker list
.setTopics("bigdata")//topic to consume
.setGroupId("Demo5KafkaSource")//consumer group: a record is consumed at most once per group
.setStartingOffsets(OffsetsInitializer.earliest())//where to start consuming
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
//Use the Kafka source to create a stream (unbounded)
DataStreamSource<String> kafkaDS = env
.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
kafkaDS.print();
env.execute();
}
}
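Besides earliest(), OffsetsInitializer also provides latest() to start from the end of the topic and committedOffsets(...) to resume from the consumer group's committed offsets, which is usually what you want when restarting a job.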
Sinks: storing (writing) data
1. File Sink
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
public class Demo1FileSInk {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> linesDS = env.socketTextStream("master", 8888);
//Create the file sink
FileSink<String> sink = FileSink
//Output path and row (line-by-line) encoding for the data
.forRowFormat(new Path("data/socket_path"), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
//Roll a part file once it has been open for at least 10 seconds
.withRolloverInterval(Duration.ofSeconds(10))
//Roll a part file after 10 seconds without new records
.withInactivityInterval(Duration.ofSeconds(10))
//Roll a new part file once the current one reaches 1 MB
.withMaxPartSize(MemorySize.ofMebiBytes(1))
.build())
.build();
linesDS.sinkTo(sink);
env.execute();
}
}
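Note that in streaming mode the FileSink only moves part files from the in-progress/pending state to finished on a successful checkpoint, so checkpointing must be enabled (for example env.enableCheckpointing(5000)), otherwise the output files are never finalized.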
2. MySQL Sink
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class Demo2MySQLSink {
public static void main(String[] args) throws Exception {
//1. Create the Flink execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> linesDS = env.socketTextStream("master", 8888);
//Turn each line into multiple words
DataStream<String> wordsDS = linesDS
.flatMap((FlatMapFunction<String, String>) (line, out) -> {
for (String word : line.split(",")) {
//Send each word downstream
out.collect(word);
}
}).returns(Types.STRING);
//Convert to (word, 1) key-value tuples
DataStream<Tuple2<String, Integer>> kvDS = wordsDS
.map(word -> {
//Return a 2-tuple of (word, 1)
return Tuple2.of(word, 1);
}).returns(Types.TUPLE(Types.STRING, Types.INT));
//Group by word
KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS
.keyBy(kv -> kv.f0);
//Count occurrences per word
DataStream<Tuple2<String, Integer>> countDS = keyByDS
.reduce((kv1, kv2) -> {
int count = kv1.f1 + kv2.f1;
return Tuple2.of(kv1.f0, count);
});
//Write the result with the custom sink
countDS.addSink(new MySQLSink());
//3. Start the Flink job
env.execute("wc");
}
}
//RichSinkFunction: adds open and close lifecycle methods on top of SinkFunction
//SinkFunction: the plain sink interface with only invoke
class MySQLSink extends RichSinkFunction<Tuple2<String, Integer>> {
private Connection con;
private PreparedStatement stat;
//open runs when the task starts, once per parallel task instance
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("creating the database connection");
//Load the MySQL JDBC driver
Class.forName("com.mysql.jdbc.Driver");
//Create the database connection
con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata31", "root", "123456");
//replace into: update the row if the key already exists, otherwise insert; the table needs a primary key
stat = con.prepareStatement("replace into word_count values(?,?)");
}
//close runs when the task finishes or is cancelled
@Override
public void close() throws Exception {
stat.close();
con.close();
}
//invoke is called once for every record
@Override
public void invoke(Tuple2<String, Integer> kv, Context context) throws Exception {
String word = kv.f0;
Integer count = kv.f1;
stat.setString(1, word);
stat.setInt(2, count);
stat.execute();
}
}
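replace into only works against a table with a primary key on the word column; an assumed schema would be something like create table word_count(word varchar(200) primary key, cnt int).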
3. Kafka Sink
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Demo4KafkaSInk {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> studentDS = env.readTextFile("data/student_json");
//Create the Kafka sink
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("master:9092,node1:9092,node2:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("students")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//delivery guarantee (processing semantics)
.build();
//Write the stream with the Kafka sink
studentDS.sinkTo(sink);
env.execute();
}
}
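If the delivery guarantee is raised to DeliveryGuarantee.EXACTLY_ONCE, the KafkaSink additionally requires checkpointing to be enabled and a transactional id prefix set via setTransactionalIdPrefix(...), because it then writes with Kafka transactions.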