
Using Flink Sinks

After a series of Transformation operations, you must end with a Sink operation to form a complete DataFlow topology. Only once a Sink is invoked does the job produce its final results; the data can be written to files, sent to a network port, pushed to message middleware or an external file system, or printed to the console.

Common Flink sinks in batch processing:

  1. print: print to the console
  2. writeAsText: write in text format
  3. writeAsCsv: write in CSV format
  4. writeUsingOutputFormat: write with a specified OutputFormat (see the sketch after the CSV example below)
  5. writeToSocket: write to a network port (see the sketch right after the docs link)
  6. custom connectors via addSink

See the official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/overview/#data-sinks
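
Item 5, writeToSocket, does not get its own demo later in this article, so here is a minimal, hedged sketch; the host and port are placeholders, and something must already be listening on that port (for example, nc -lk 9999):

package com.bigdata.day03;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkSocketSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> data = env.fromElements("hello", "world");

        // serialize each element with SimpleStringSchema and push it to the socket;
        // "localhost"/9999 are placeholders, not values used elsewhere in this article
        data.writeToSocket("localhost", 9999, new SimpleStringSchema());

        env.execute();
    }
}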

1. print

print is the simplest sink and is usually used for experiments and tests. To print a DataStream, simply call print() on it. The method also has an overload that takes a string naming the sink; when there are several print sinks, the name tells you which one produced a given line of output.

The following demonstrates the built-in print as well as a hand-rolled equivalent.

package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;


public class SinkPrintDemo {

    public static void main(String[] args) throws Exception {
        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8888);
        // plain print: each line carries a "subtask index + 1" prefix, e.g. "6> hello world"
        //dataStreamSource.print();

        // the custom sink below reimplements that print behavior by hand
        dataStreamSource.addSink(new MySink());

        env.execute();
    }

    static class MySink extends RichSinkFunction<String> {

        @Override
        public void invoke(String value, Context context) throws Exception {
            // subtask index + 1, to mimic print's partition prefix
            int partitionId = getRuntimeContext().getIndexOfThisSubtask() + 1;
            String msg = partitionId + "> " + value;
            System.out.println(msg);
        }
    }

}

 

package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;


public class Demo01 {
    static class MyPrint extends RichSinkFunction<String>{
        private String msg;
        public MyPrint(){

        }
        public MyPrint(String msg){
            this.msg = msg;
        }
        @Override
        public void invoke(String value, Context context) throws Exception {
            int partition = getRuntimeContext().getIndexOfThisSubtask();
            if(msg == null){
                System.out.println(partition+"> "+value);
            }else{
                System.out.println(msg+">>>:"+partition+"> "+value);
            }

        }
    }

    public static void main(String[] args) throws Exception {

        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source - load data
        DataStream<String> data = env.fromElements("hello", "world", "baotianman", "laoyan");
        //3. transformation - process the data
        //4. sink - emit the data
        //data.print();
        //data.print("plain print>>>");
        data.addSink(new MyPrint());
        data.addSink(new MyPrint("mimic:"));


        //5. execute - run the job
        env.execute();
    }
}

 

The output below is what the WordCount example's print sink writes to the console. Careful readers will notice a numeric prefix before each word and its count; here it ranges from 1 to 4. That number is the index of the subtask the sink runs in, plus 1. Some readers will see prefixes from 1 to 8 instead, because the prefix depends on the job's parallelism: in local mode the default parallelism is the number of logical cores (hardware threads) of the machine. My laptop has a 2-core/4-thread CPU, so the subtask indexes run from 0 to 3 and the displayed prefixes from 1 to 4.

Look a little closer and you will also see that the same word always carries the same prefix: after keyBy, identical words are shuffled to the same subtask and aggregated within the same group inside that subtask. A subtask may hold zero or more groups; when it holds several, each group is independent and their running totals do not interfere.

After sum:

1> hello 3

2> world 4

Before aggregation (after keyBy):

1> hello 1

1> hello 1

1> hello 1
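
For reference, here is a minimal WordCount sketch that produces output of this shape; the class name and the sample input are invented for illustration, so the counts will not match the numbers above exactly:

package com.bigdata.day03;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountPrintSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("hello world", "hello hello world world")
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT)) // lambdas lose generic types
                .keyBy(t -> t.f0) // identical words are shuffled to the same subtask
                .sum(1)           // running count per word
                .print();         // prefix = subtask index + 1

        env.execute();
    }
}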

2. writeAsText: write in text format

This method writes the data, in real time and in text format, to the specified directory; under the hood it uses TextOutputFormat. Every element is written out followed by a newline, and the files inside the directory are named after the sink subtask's index + 1. There is also an overload that takes an extra enum parameter, writeMode. The default, WriteMode.NO_OVERWRITE, throws an exception if a file of the same name already exists in the output directory; WriteMode.OVERWRITE replaces the existing file.

package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class SinkTextDemo {

    public static void main(String[] args) throws Exception {
        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(2);

        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8880);
        // OVERWRITE means any previous result is replaced
        // with parallelism 1 the output is a single file; with parallelism > 1 it is a
        // directory whose files are named after the subtask (task) numbers

        dataStreamSource.writeAsText("F:\\BD230801\\FlinkDemo\\datas\\result", FileSystem.WriteMode.OVERWRITE);

        env.execute();
    }
}

package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class Demo02 {

    public static void main(String[] args) throws Exception {

        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source - load data
        //3. transformation - process the data
        //4. sink - emit the data
        //DataStreamSource<String> streamSource = env.socketTextStream("localhost", 8899);
        //streamSource.writeAsText("datas/socket", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromElements(
                Tuple2.of("篮球", 1),
                Tuple2.of("篮球", 2),
                Tuple2.of("篮球", 3),
                Tuple2.of("足球", 3),
                Tuple2.of("足球", 2),
                Tuple2.of("足球", 3)
        );
        // writeAsCsv only works on DataStreams of tuples: with a single column there would be nothing for a delimiter to separate
        streamSource.writeAsCsv("datas/csv", FileSystem.WriteMode.OVERWRITE).setParallelism(1);


        //5. execute - run the job
        env.execute();
    }
}
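
Item 4 of the sink list, writeUsingOutputFormat, also never gets its own demo. The sketch below shows the idea by reusing the TextOutputFormat that writeAsText wraps internally; the output path is an illustrative placeholder. (In recent Flink versions these write* methods are deprecated in favor of the file sink connectors.)

package com.bigdata.day03;

import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SinkOutputFormatSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> data = env.fromElements("hello", "world");

        // writeAsText is essentially writeUsingOutputFormat with a TextOutputFormat;
        // "datas/outputformat" is an illustrative path
        data.writeUsingOutputFormat(new TextOutputFormat<>(new Path("datas/outputformat")))
                .setParallelism(1);

        env.execute();
    }
}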

3. Connectors

JDBC Connector

This connector writes data to a JDBC database.

JDBC | Apache Flink

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- if you are connecting to an older MySQL (5.x), use driver version 5.1.49 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.25</version>
</dependency>

Note that the driver class name differs between major versions: mysql-connector-java 5.x uses com.mysql.jdbc.Driver, while 8.x uses com.mysql.cj.jdbc.Driver (the old name still works under 8.x but logs a deprecation warning). Both spellings appear in the demos below.

Example:

Read the data and write the results to MySQL.

package com.bigdata.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;

@Data
@AllArgsConstructor
@NoArgsConstructor
class Student{
    private int id;
    private String name;
    private int age;
}
public class JdbcSinkDemo {


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Student> studentStream = env.fromElements(new Student(1, "jack", 54));


        JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/test1")
                .withDriverName("com.mysql.jdbc.Driver")
                .withUsername("root")
                .withPassword("123456")
                .build();
        studentStream.addSink(JdbcSink.sink(
                "insert into student values(null,?,?)",
                new JdbcStatementBuilder<Student>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, Student student) throws SQLException {

                        preparedStatement.setString(1,student.getName());
                        preparedStatement.setInt(2,student.getAge());
                    }
                    // with a streaming source you can batch writes, e.g. flush every 2 records
                }, JdbcExecutionOptions.builder().withBatchSize(2).build(), jdbcConnectionOptions

        ));

        env.execute();
    }
}
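
The withBatchSize(2) call above is one of several batching knobs on JdbcExecutionOptions. A hedged sketch of the builder, with illustrative values only:

package com.bigdata.day03;

import org.apache.flink.connector.jdbc.JdbcExecutionOptions;

public class JdbcBatchOptionsSketch {

    // flush after 2 buffered records, or after 200 ms, whichever comes first,
    // and retry transient failures up to 3 times (values are illustrative)
    static JdbcExecutionOptions batchedOptions() {
        return JdbcExecutionOptions.builder()
                .withBatchSize(2)
                .withBatchIntervalMs(200)
                .withMaxRetries(3)
                .build();
    }
}

One caveat about the next demo: Demo03 declares another top-level Student class in the same com.bigdata.day03 package as JdbcSinkDemo above, so the two files cannot coexist in one project; keep only one definition.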

package com.bigdata.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;


@Data
@AllArgsConstructor
@NoArgsConstructor
class Student{
    private int id;
    private String name;
    private int age;
}
public class Demo03 {

    public static void main(String[] args) throws Exception {

        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStreamSource<Student> studentDataStreamSource = env.fromElements(
                new Student(1, "张三", 19),
                new Student(2, "lisi", 20),
                new Student(3, "wangwu", 19)
        );

        JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/kettle")
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("root")
                .withPassword("root")
                .build();

        studentDataStreamSource.addSink(JdbcSink.sink(
                "insert into student values(null,?,?)",
                new JdbcStatementBuilder<Student>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, Student student) throws SQLException {
                        preparedStatement.setString(1,student.getName());
                        preparedStatement.setInt(2,student.getAge());
                    }
                },jdbcConnectionOptions
        ));

        //5. execute - run the job
        env.execute();
    }
}

The job runs and the rows show up in MySQL as expected.

Kafka Connector

Kafka | Apache Flink

Consume log data from Kafka topic topic1, do real-time ETL on it, and write the records whose status is success to Kafka topic topic2.

 

kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic1
kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic2

Use the console as a Kafka producer to send messages to topic1:
kafka-console-producer.sh  --bootstrap-server bigdata01:9092 --topic topic1

Consume the data in topic2:
kafka-console-consumer.sh  --bootstrap-server bigdata01:9092 --topic topic2

To test:
Send messages to topic1 from the console producer; messages containing the word "success" will appear in topic2.

package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;


public class KafkaSinkDemo {

    // read from topic1 and write to topic2: exercises both the source and the sink
    public static void main(String[] args) throws Exception {

        //1. env - prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source - load data
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);
        DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);
        //3. transformation - keep only records containing "success"
        SingleOutputStreamOperator<String> filterStream = dataStreamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.contains("success");
            }
        });
        //4. sink - emit the data
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("topic2", new SimpleStringSchema(), properties);
        filterStream.addSink(kafkaProducer);


        //5. execute - run the job
        env.execute();
    }
}

The Flink Kafka Consumer needs to know how to turn Kafka's binary data into Java or Scala objects. KafkaDeserializationSchema lets the user supply such a schema; its T deserialize(ConsumerRecord<byte[], byte[]> record) method is called once for every Kafka message.

For convenience, Flink ships several ready-made schemas:

SimpleStringSchema: serializes and deserializes messages as plain strings.

Others include TypeInformationSerializationSchema, JsonDeserializationSchema, AvroDeserializationSchema, and so on.
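
If none of the shipped schemas fit, you can implement KafkaDeserializationSchema yourself. Below is a minimal sketch that keeps each record's origin together with its value; the class name and the output format are invented for illustration:

package com.bigdata.day03;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

public class TopicAwareStringSchema implements KafkaDeserializationSchema<String> {

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // an unbounded stream never signals end-of-stream
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) {
        String value = record.value() == null
                ? "" : new String(record.value(), StandardCharsets.UTF_8);
        // prepend topic/partition/offset so downstream operators can see where the record came from
        return record.topic() + "-" + record.partition() + "-" + record.offset() + "> " + value;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}

It plugs in exactly where SimpleStringSchema was used on the consumer side: new FlinkKafkaConsumer<>("topic1", new TopicAwareStringSchema(), properties).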

Custom Sink: imitating the JdbcSink implementation

Flink already ships an official JdbcSink; here we merely imitate its implementation in order to learn how to write a custom sink.

package com.bigdata.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;



public class CustomJdbcSinkDemo {

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class Student{
        private int id;
        private String name;
        private int age;
    }

    static class MyJdbcSink  extends RichSinkFunction<Student> {

        Connection conn = null;
        PreparedStatement ps = null;
        @Override
        public void open(Configuration parameters) throws Exception {
            // open the database connection here (open() runs once per subtask)
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1", "root", "123456");
            ps = conn.prepareStatement("INSERT INTO `student` (`id`, `name`, `age`) VALUES (null, ?, ?)");
        }

        @Override
        public void close() throws Exception {
            // release the database resources
            ps.close();
            conn.close();
        }

        @Override
        public void invoke(Student student, Context context) throws Exception {
            // insert one row into the database per element
            ps.setString(1,student.getName());
            ps.setInt(2,student.getAge());
            ps.execute();

        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Student> studentStream = env.fromElements(new Student(1, "马斯克", 51));

        studentStream.addSink(new MyJdbcSink());

        env.execute();
    }
}


