当前位置: 首页 > article >正文

Flink+Kafka中Source和Sink的使用

Flink的特点:同一套代码逻辑既可以做批处理,也可以做流处理;同一个数据源既可以按流的方式处理,也可以按批的方式处理。

Source读取数据

1、将普通List转成Source

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * Demo: turns an in-memory list into a stream and prints every element.
 */
public class Demo1ListSource {
    public static void main(String[] args) throws Exception {
        // Fill the backing collection first; the job is wired up afterwards.
        ArrayList<String> words = new ArrayList<>();
        words.add("java");
        words.add("hadoop");
        words.add("flink");
        words.add("spark");

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // A list-backed source yields a bounded stream.
        DataStream<String> wordStream = environment.fromCollection(words);
        wordStream.print();

        environment.execute();
    }
}

2、文件的Source(注意:下面的示例代码 Demo1FileSInk 实际演示的是使用 FileSink 把 socket 数据写入文件,并不是从文件读取数据的 Source)

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

/**
 * Demo: reads text lines from a socket and writes them to rolling row-format files
 * through a {@code FileSink}.
 */
public class Demo1FileSInk {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In STREAMING mode the FileSink only finalizes part files on successful checkpoints.
        // Without checkpointing the files would stay in the in-progress/pending state forever,
        // so enable it here (every 10 seconds, matching the rolling intervals below).
        env.enableCheckpointing(10_000L);

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        // Create the file sink.
        FileSink<String> sink = FileSink
                // Output directory and the encoder used to write each record as a UTF-8 line.
                .forRowFormat(new Path("data/socket_path"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // Roll once the part file contains at least 10 seconds of data.
                                .withRolloverInterval(Duration.ofSeconds(10))
                                // Roll when no new record has arrived for 10 seconds.
                                .withInactivityInterval(Duration.ofSeconds(10))
                                // Roll when the part file reaches 1 MiB.
                                .withMaxPartSize(MemorySize.ofMebiBytes(1))
                                .build())
                .build();

        linesDS.sinkTo(sink);

        env.execute();
    }
}


3、Mysql的Source

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Demo: streams {@code Student} records produced by the custom {@code MySQLSource}
 * and prints them.
 */
public class Demo4MysqlSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Attach the custom JDBC-backed source to the job.
        MySQLSource studentSource = new MySQLSource();
        DataStream<Student> students = environment.addSource(studentSource);
        students.print();

        environment.execute();
    }
}

/**
 * Custom source that reads the {@code students} table over JDBC and emits one
 * {@code Student} per row. The stream ends once the result set is exhausted or the
 * source is cancelled.
 */
class MySQLSource implements SourceFunction<Student> {
    /** Set to {@code false} by {@link #cancel()} so the read loop stops emitting rows. */
    private volatile boolean running = true;

    /**
     * Queries MySQL and forwards every row downstream.
     *
     * @param ctx context used to emit records
     * @throws Exception if the driver cannot be loaded or any JDBC call fails
     */
    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        // Load the MySQL JDBC driver.
        Class.forName("com.mysql.jdbc.Driver");
        // try-with-resources closes the result set, statement and connection (in that order)
        // even when reading or emitting fails, so no database resources are leaked.
        try (Connection con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata31", "root", "123456");
             PreparedStatement stat = con.prepareStatement("select id,name,age,gender,clazz from students");
             ResultSet resultSet = stat.executeQuery()) {
            while (running && resultSet.next()) {
                long id = resultSet.getLong("id");
                String name = resultSet.getString("name");
                long age = resultSet.getLong("age");
                String gender = resultSet.getString("gender");
                String clazz = resultSet.getString("clazz");
                // Send the record downstream.
                ctx.collect(new Student(id, name, age, gender, clazz));
            }
        }
    }

    /** Requests that {@code run} stop before reading the next row. */
    @Override
    public void cancel() {
        running = false;
    }
}


/**
 * Record type emitted by {@code MySQLSource}: one instance is built per row from the
 * selected columns {@code id, name, age, gender, clazz}.
 *
 * <p>The field order below must stay in sync with the
 * {@code new Student(id, name, age, gender, clazz)} call in {@code MySQLSource}, because
 * the all-args constructor is generated by Lombok from the declared field order.
 */
@Data
@AllArgsConstructor
class Student {
    // Value of the "id" column.
    private Long id;
    // Value of the "name" column.
    private String name;
    // Value of the "age" column.
    private Long age;
    // Value of the "gender" column.
    private String gender;
    // Value of the "clazz" column.
    private String clazz;
}

4、Kafka的Source

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demo: consumes string records from the {@code bigdata} Kafka topic and prints them.
 */
public class Demo5KafkaSource {
    public static void main(String[] args) throws Exception {
        // Describe the Kafka source first; it does not depend on the environment.
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                // Broker list.
                .setBootstrapServers("master:9092,node1:9092,node2:9092")
                // Topic to read from.
                .setTopics("bigdata")
                // Consumer group: a record is consumed only once within one group.
                .setGroupId("Demo5KafkaSource")
                // Position from which consumption starts.
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build an unbounded stream on top of the Kafka source.
        DataStreamSource<String> records =
                environment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");
        records.print();

        environment.execute();
    }
}

Sink存储(写)数据

1、文件的Sink

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

/**
 * Demo: reads text lines from a socket and writes them to rolling row-format files
 * through a {@code FileSink}.
 */
public class Demo1FileSInk {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In STREAMING mode the FileSink only finalizes part files on successful checkpoints.
        // Without checkpointing the files would stay in the in-progress/pending state forever,
        // so enable it here (every 10 seconds, matching the rolling intervals below).
        env.enableCheckpointing(10_000L);

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        // Create the file sink.
        FileSink<String> sink = FileSink
                // Output directory and the encoder used to write each record as a UTF-8 line.
                .forRowFormat(new Path("data/socket_path"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // Roll once the part file contains at least 10 seconds of data.
                                .withRolloverInterval(Duration.ofSeconds(10))
                                // Roll when no new record has arrived for 10 seconds.
                                .withInactivityInterval(Duration.ofSeconds(10))
                                // Roll when the part file reaches 1 MiB.
                                .withMaxPartSize(MemorySize.ofMebiBytes(1))
                                .build())
                .build();

        linesDS.sinkTo(sink);

        env.execute();
    }
}

2、Mysql的Sink

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Demo: word count over comma-separated lines read from a socket, with the running
 * counts written out through the custom {@code MySQLSink}.
 */
public class Demo2MySQLSink {

    public static void main(String[] args) throws Exception {
        // Step 1: obtain the Flink execution environment.
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> lines = environment.socketTextStream("master", 8888);

        // Explode every line into its comma-separated words (one record per word).
        DataStream<String> words = lines
                .flatMap((FlatMapFunction<String, String>) (text, collector) -> {
                    String[] tokens = text.split(",");
                    for (int i = 0; i < tokens.length; i++) {
                        // Emit the word downstream.
                        collector.collect(tokens[i]);
                    }
                })
                .returns(Types.STRING);

        // Pair every word with an initial count of 1.
        DataStream<Tuple2<String, Integer>> pairs = words
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // Group by the word itself.
        KeyedStream<Tuple2<String, Integer>, String> byWord = pairs.keyBy(pair -> pair.f0);

        // Sum the counts per word.
        DataStream<Tuple2<String, Integer>> counts = byWord
                .reduce((left, right) -> Tuple2.of(left.f0, left.f1 + right.f1));

        // Write the results with the custom sink.
        counts.addSink(new MySQLSink());

        // Step 3: start the Flink job.
        environment.execute("wc");
    }
}

// RichSinkFunction: a sink function that additionally offers the open and close methods.
// SinkFunction: the plain variant without those methods.
/**
 * Custom sink that upserts {@code (word, count)} pairs into the {@code word_count}
 * table over JDBC.
 */
class MySQLSink extends RichSinkFunction<Tuple2<String, Integer>> {

    private Connection con;
    private PreparedStatement stat;

    /**
     * Runs when the task starts, once per task: opens the database connection and
     * prepares the upsert statement that {@code invoke} reuses for every record.
     *
     * @param parameters configuration passed in by the runtime (not used here)
     * @throws Exception if the driver cannot be loaded or the connection/statement fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("创建数据库链接");
        // Load the MySQL JDBC driver.
        Class.forName("com.mysql.jdbc.Driver");
        // Create the database connection.
        con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata31", "root", "123456");
        // "replace into": update the row when the key exists, insert it otherwise.
        // The table needs a primary key for this to work.
        stat = con.prepareStatement("replace into word_count values(?,?)");
    }

    /**
     * Runs when the task is cancelled: releases the statement and the connection.
     *
     * <p>Both fields may still be {@code null} if {@code open} failed part-way, so each is
     * checked first; the {@code finally} block makes sure the connection is closed even
     * when closing the statement throws.
     *
     * @throws Exception if closing a JDBC resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (stat != null) {
                stat.close();
            }
        } finally {
            if (con != null) {
                con.close();
            }
        }
    }

    /**
     * Runs once per record: binds the word and its count and executes the upsert.
     *
     * @param kv      tuple of (word, count)
     * @param context sink context supplied by the runtime (not used here)
     * @throws Exception if the statement fails
     */
    @Override
    public void invoke(Tuple2<String, Integer> kv, Context context) throws Exception {
        String word = kv.f0;
        Integer count = kv.f1;
        stat.setString(1, word);
        stat.setInt(2, count);
        stat.execute();
    }
}

3、Kafka的Sink

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demo: reads lines from the {@code data/student_json} text file and writes each line as
 * a string record to the {@code students} Kafka topic.
 */
public class Demo4KafkaSInk {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> studentDS = env.readTextFile("data/student_json");

        // Create the Kafka sink.
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("students")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                // Delivery semantics. setDeliveryGuarantee replaces the deprecated,
                // misspelled setDeliverGuarantee builder method.
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // Attach the Kafka sink to the stream.
        studentDS.sinkTo(sink);

        env.execute();

    }
}


http://www.kler.cn/a/393566.html

相关文章:

  • 软考 高级 架构师 第十 章软件工程3
  • 网络安全的学习与实践经验(附资料合集)
  • JavaScript性能
  • Halcon 显示异常
  • node内置模块之---path 模块
  • VBA 64位API声明语句第005讲
  • ONLYOFFICE8.2版本测评,团队协作的办公软件
  • 新160个crackme - 096-xtFusion-k1
  • 免费送源码:Java+springboot+MySQL 物流车辆管理系统的设计与实现 计算机毕业设计原创定制
  • kafka可视化管理平台-kafka-console-ui
  • vue el-date-picker 日期选择器禁用失效问题
  • 蓝队基础(详见B站泷羽sec)
  • 计算机网络之表示层
  • 【ReactPress】React + antd + NestJS + NextJS + MySQL 的简洁兼时尚的博客网站
  • FFmpeg 怎么裁剪m4a的音频,从一个时间点开始,裁剪15秒钟的视频
  • 如何在 IntelliJ IDEA 中去掉 Java 方法注释后的空行
  • CUDA error: device-side assert triggered 报错解决
  • androidstudio下载gradle慢
  • openai Realtime API (实时语音)
  • 鸿蒙版APP-图书购物商城案例
  • 2023年MathorCup数学建模A题量子计算机在信用评分卡组合优化中的应用解题全过程文档加程序
  • ip addr show
  • 建筑施工特种作业人员安全生产知识试题
  • docker 镜像索引和用法
  • c++学习:封装继承多态
  • 「QT」几何数据类 之 QVector4D 四维向量类