当前位置: 首页 > article >正文

DataStream API

DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上都由以下几部分构成:

一、执行环境(Execution Environment)

Flink程序可以在各种上下文环境中运行:我们可以在本地JVM中执行程序,也可以提交到远程集群上运行。不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前Flink的运行环境,从而建立起与Flink框架之间的联系。

 1.1  创建执行环境

我们要获取的执行环境,是StreamExecutionEnvironment类的对象,这是所有Flink程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种。

1)getExecutionEnvironment

最简单的方式,就是直接调用getExecutionEnvironment方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

这种方式,用起来简单高效,是最常用的一种创建执行环境的方式。

2)createLocalEnvironment

这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。

StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

3)createRemoteEnvironment

这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment

   .createRemoteEnvironment(

     "host",                   // JobManager主机名

     1234,                     // JobManager进程端口号

    "path/to/jarFile.jar"  // 提交给JobManager的JAR包

);

在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。

1.2 执行模式(Execution Mode)

从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理。不建议使用DataSet API。

// 流处理环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream API执行模式包括:流执行模式、批执行模式和自动模式。

(1) 流执行模式(Streaming)

这是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下程序使用的就是Streaming执行模式。

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

(2) 批执行模式(Batch)

专门用于批处理的执行模式。

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

(3) 自动模式(AutoMatic)

在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

批执行模式的使用。主要有两种方式:

(1)通过命令行配置

bin/flink run -Dexecution.runtime-mode=BATCH ...

在提交作业时,增加execution.runtime-mode参数,指定值为BATCH。

(2)通过代码配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

在代码中,直接基于执行环境调用setRuntimeMode方法,传入BATCH模式。

实际应用中一般不会在代码中配置而是使用命令行,这样更加灵活

1.3 触发程序执行

需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”

所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。

env.execute();

二、源算子(Source)

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。

在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:

DataStream<String> stream = env.addSource(...);

方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。

从Flink1.12开始,主要使用流批统一的新Source架构:

DataStreamSource<String> stream = env.fromSource(…)

Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

2.1 准备工作

2.2 从集合中读取数据

2.3 从文件读取数据

2.4 从Socket读取数据

2.5 从Kafka读取数据

2.6 从数据生成器读取数据

2.7 Flink支持的数据类型

三、转换算子(Transformation)

3.1 基本转换算子(map/ filter/ flatMap)

3.1.1映射(map)

map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素

我们只需要基于DataStream调用map()方法就可以进行转换处理。方法需要传入的参数是接口MapFunction的实现;返回值类型还是DataStream,不过泛型(流中的元素类型)可能改变。

下面的代码用不同的方式,实现了提取WaterSensor中的id字段的功能。

 public class MapFunctionImpl implements MapFunction<WaterSensor,String> {
    @Override
    public String map(WaterSensor value) throws Exception {
        return value.getId();
    }
}
public class MapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );

        // TODO map算子: 一进一出

        // TODO 方式一: 匿名实现类
//        SingleOutputStreamOperator<String> map = sensorDS.map(new MapFunction<WaterSensor, String>() {
//            @Override
//            public String map(WaterSensor value) throws Exception {
//                return value.getId();
//            }
//        });

        // TODO 方式二: lambda表达式
//        SingleOutputStreamOperator<String> map = sensorDS.map(sensor -> sensor.getId());

        // TODO 方式三: 定义一个类来实现MapFunction
//        SingleOutputStreamOperator<String> map = sensorDS.map(new MyMapFunction());
        SingleOutputStreamOperator<String> map = sensorDS.map(new MapFunctionImpl());
        map.print();


        env.execute();
    }

    public static class MyMapFunction implements MapFunction<WaterSensor,String>{

        @Override
        public String map(WaterSensor value) throws Exception {
            return value.getId();
        }
    }
}

上面代码中,MapFunction实现类的泛型类型,与输入数据类型和输出数据的类型有关。在实现MapFunction接口的时候,需要指定两个泛型,分别是输入事件和输出事件的类型,还需要重写一个map()方法,定义从一个输入事件转换为另一个输出事件的具体逻辑。 

运行结果:

3.1.2 过滤(filter)

filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。

进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式。

案例需求:下面的代码会将数据流中传感器id为sensor_1的数据过滤出来。 

public class FilterFunctionImpl implements FilterFunction<WaterSensor> {

    public String id;

    public FilterFunctionImpl(String id) {
        this.id = id;
    }

    @Override
    public boolean filter(WaterSensor value) throws Exception {
        return this.id.equals(value.getId());
    }
}
public class FilterDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 11L, 11),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );

        // TODO filter: true保留,false过滤掉
//        SingleOutputStreamOperator<WaterSensor> filter = sensorDS.filter(new FilterFunction<WaterSensor>() {
//            @Override
//            public boolean filter(WaterSensor value) throws Exception {
//                return "s1".equals(value.getId());
//            }
//        });

        SingleOutputStreamOperator<WaterSensor> filter = sensorDS.filter(new FilterFunctionImpl("s1"));


        filter.print();


        env.execute();
    }


}

运行结果:

 

3.1.3  扁平映射(flatMap)

flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。

同map一样,flatMap也可以使用Lambda表达式或者FlatMapFunction接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。

案例需求:如果输入的数据是sensor_1,只打印vc;如果输入的数据是sensor_2,既打印ts又打印vc。

public class FlatmapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 11L, 11),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );


        /**
         * TODO flatmap: 一进多出(包含0出)
         *      对于s1的数据,一进一出
         *      对于s2的数据,一进2出
         *      对于s3的数据,一进0出(类似于过滤的效果)
         *
         *    map怎么控制一进一出:
         *      =》 使用 return
         *
         *    flatmap怎么控制的一进多出
         *      =》 通过 Collector来输出, 调用几次就输出几条
         *
         *
         */
        SingleOutputStreamOperator<String> flatmap = sensorDS.flatMap(new FlatMapFunction<WaterSensor, String>() {
            @Override
            public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
                if ("s1".equals(value.getId())) {
                    // 如果是 s1,输出 vc
                    out.collect(value.getVc().toString());
                } else if ("s2".equals(value.getId())) {
                    // 如果是 s2,分别输出ts和vc
                    out.collect(value.getTs().toString());
                    out.collect(value.getVc().toString());
                }
            }
        });

        flatmap.print();


        env.execute();
    }


}

运行结果:

3.2 聚合算子(Aggregation)

计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce中的reduce操作。

3.2.1 按键分区(keyBy)

对于Flink而言,DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。

keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。

基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区。

在内部,是通过计算key的哈希值(hash code)对分区数进行取模运算来实现的。所以这里key如果是POJO的话,必须要重写hashCode()方法。

keyBy()方法需要传入一个参数,这个参数指定了一个或一组key。有很多不同的方法来指定key:比如对于Tuple数据类型,可以指定字段的位置或者多个位置的组合;对于POJO类型,可以指定字段的名称(String);另外,还可以传入Lambda表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取key的逻辑。

我们可以以id作为key做一个分区操作,代码实现如下:

public class KeybyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);


        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 11L, 11),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );

        // 按照 id 分组
        /**
         * TODO keyby: 按照id分组
         * 要点:
         *  1、返回的是 一个 KeyedStream,键控流
         *  2、keyby不是 转换算子, 只是对数据进行重分区, 不能设置并行度
         *  3、分组 与 分区 的关系:
         *    1) keyby是对数据分组,保证 相同key的数据 在同一个分区(子任务)
         *    2) 分区: 一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)
         */
        KeyedStream<WaterSensor, String> sensorKS = sensorDS
                .keyBy(new KeySelector<WaterSensor, String>() {
                    @Override
                    public String getKey(WaterSensor value) throws Exception {
                        return value.getId();
                    }
                });

        sensorKS.print();


        env.execute();
    }


}

运行结果:

需要注意的是,keyBy得到的结果将不再是DataStream,而是会将DataStream转换为KeyedStream。KeyedStream可以认为是“分区流”或者“键控流”,它是对DataStream按照key的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定key的类型。

KeyedStream也继承自DataStream,所以基于它的操作也都归属于DataStream API。但它跟之前的转换操作得到的SingleOutputStreamOperator不同,只是一个流的分区操作,并不是一个转换算子。KeyedStream是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如sum,reduce)。

3.2.2 简单聚合(sum/min/max/minBy/maxBy

有了按键分区的数据流KeyedStream,我们就可以基于它进行聚合操作了。Flink为我们内置实现了一些最基本、最简单的聚合API,主要有以下几种:

  • sum():在输入流上,对指定的字段做叠加求和的操作。
  • min():在输入流上,对指定的字段求最小值。
  • max():在输入流上,对指定的字段求最大值。
  • minBy():与min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。
  • maxBy():与max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。

简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称

对于元组类型的数据,可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以f0、f1、f2、…来命名的。

如果数据流的类型是POJO类,那么就只能通过字段名称来指定,不能通过位置来指定了。

public class SimpleAggregateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 11L, 11),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );




        KeyedStream<WaterSensor, String> sensorKS = sensorDS
                .keyBy(new KeySelector<WaterSensor, String>() {
                    @Override
                    public String getKey(WaterSensor value) throws Exception {
                        return value.getId();
                    }
                });

        /**
         * TODO 简单聚合算子
         *  1、 keyby之后才能调用
         *  2、 分组内的聚合:对同一个key的数据进行聚合
         */
        // 传位置索引的,适用于 Tuple类型,POJO不行
//        SingleOutputStreamOperator<WaterSensor> result = sensorKS.sum(2);
//        SingleOutputStreamOperator<WaterSensor> result = sensorKS.sum("vc");


        /**
         *   max\maxby的区别: 同min
         *       max:只会取比较字段的最大值,非比较字段保留第一次的值
         *       maxby:取比较字段的最大值,同时非比较字段 取 最大值这条数据的值
         */
//        SingleOutputStreamOperator<WaterSensor> result = sensorKS.max("vc");
//        SingleOutputStreamOperator<WaterSensor> result = sensorKS.min("vc");
        SingleOutputStreamOperator<WaterSensor> result = sensorKS.maxBy("vc");
//        SingleOutputStreamOperator<WaterSensor> result = sensorKS.minby("vc");

        result.print();


        env.execute();
    }


}

简单聚合算子返回的,同样是一个SingleOutputStreamOperator,也就是从KeyedStream又转换成了常规的DataStream。所以可以这样理解:keyBy和聚合是成对出现的,先分区、后聚合,得到的依然是一个DataStream。而且经过简单聚合之后的数据流,元素的数据类型保持不变。

一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,应该只用在含有有限个key的数据流上。

运行结果:

注意:

  • max\maxby的区别: 同min
  • max:只会取比较字段的最大值,非比较字段保留第一次的值
  • maxby:取比较字段的最大值,同时非比较字段 取 最大值这条数据的值

3.2.3 归约聚合(reduce)

reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。

reduce操作也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。

调用KeyedStream的reduce方法时,需要传入一个参数,实现ReduceFunction接口。接口在源码中的定义如下:

public interface ReduceFunction<T> extends Function, Serializable {

    T reduce(T value1, T value2) throws Exception;

}

ReduceFunction接口里需要实现reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

我们可以单独定义一个函数类实现ReduceFunction接口,也可以直接传入一个匿名类。当然,同样也可以通过传入Lambda表达式实现类似的功能

为了方便后续使用,定义一个WaterSensorMapFunction:

public class WaterSensorMapFunction implements MapFunction<String,WaterSensor> {

    @Override

    public WaterSensor map(String value) throws Exception {

        String[] datas = value.split(",");

        return new WaterSensor(datas[0],Long.valueOf(datas[1]) ,Integer.valueOf(datas[2]) );

    }

}

案例:使用reduce实现max和maxBy的功能。

public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 11L, 11),
                new WaterSensor("s1", 21L, 21),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );


        KeyedStream<WaterSensor, String> sensorKS = sensorDS
                .keyBy(new KeySelector<WaterSensor, String>() {
                    @Override
                    public String getKey(WaterSensor value) throws Exception {
                        return value.getId();
                    }
                });

        /**
         * TODO reduce:
         * 1、keyby之后调用
         * 2、输入类型 = 输出类型,类型不能变
         * 3、每个key的第一条数据来的时候,不会执行reduce方法,存起来,直接输出
         * 4、reduce方法中的两个参数
         *     value1: 之前的计算结果,存状态
         *     value2: 现在来的数据
         */
        SingleOutputStreamOperator<WaterSensor> reduce = sensorKS.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                System.out.println("value1=" + value1);
                System.out.println("value2=" + value2);
                return new WaterSensor(value1.id, value2.ts, value1.vc + value2.vc);
            }
        });

        reduce.print();


        env.execute();
    }


}

reduce同简单聚合算子一样,也要针对每一个key保存状态。因为状态不会清空,所以我们需要将reduce算子作用在一个有限key的流上。 

 运行结果:

3.3 用户自定义函数(UDF)

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。用户自定义函数分为:函数类、匿名函数、富函数类。

3.3.1 函数类(Function Classes)

Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。

需求:用来从用户的点击数据中筛选包含“sensor_1”的内容:

方式一:实现FilterFunction接口

public class TransFunctionUDF {



    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
                

new WaterSensor("sensor_1", 1, 1),

new WaterSensor("sensor_1", 2, 2),

new WaterSensor("sensor_2", 2, 2),

new WaterSensor("sensor_3", 3, 3)

        );
     

        DataStream<String> filter = stream.filter(new UserFilter());
   
        filter.print();

        env.execute();

    }


    public static class UserFilter implements FilterFunction<WaterSensor> {

        @Override

        public boolean filter(WaterSensor e) throws Exception {

            return e.id.equals("sensor_1");

        }

    }

}

方式二:通过匿名类来实现FilterFunction接口:

DataStream<String> stream = stream.filter(new FilterFunction< WaterSensor>() {

    @Override

    public boolean filter(WaterSensor e) throws Exception {

        return e.id.equals("sensor_1");

    }

});

方式二的优化:为了类可以更加通用,我们还可以将用于过滤的关键字"home"抽象出来作为类的属性,调用构造方法时传进去。

DataStreamSource<WaterSensor> stream = env.fromElements(        

new WaterSensor("sensor_1", 1, 1),

new WaterSensor("sensor_1", 2, 2),

new WaterSensor("sensor_2", 2, 2),

new WaterSensor("sensor_3", 3, 3)

);


DataStream<String> stream = stream.filter(new FilterFunctionImpl("sensor_1"));

public static class FilterFunctionImpl implements FilterFunction<WaterSensor> {

    private String id;

    FilterFunctionImpl(String id) { this.id=id; }


    @Override

    public boolean filter(WaterSensor value) throws Exception {

        return thid.id.equals(value.id);

    }

}

方式三:采用匿名函数(Lambda)

public class TransFunctionUDF {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<WaterSensor> stream = env.fromElements(
             
new WaterSensor("sensor_1", 1, 1),

new WaterSensor("sensor_1", 2, 2),

new WaterSensor("sensor_2", 2, 2),

new WaterSensor("sensor_3", 3, 3)

        );    

        //map函数使用Lambda表达式,不需要进行类型声明

        SingleOutputStreamOperator<String> filter = stream.filter(sensor -> "sensor_1".equals(sensor.id));

        filter.print();

        env.execute();

    }

}

3.3.2 富函数类(Rich Function Classes)

“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。

与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

Rich Function有生命周期的概念。典型的生命周期方法有:

  • open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
  • close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。

需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用。

来看一个例子说明:

public class RichFunctionExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);

        env

                .fromElements(1,2,3,4)

                .map(new RichMapFunction<Integer, Integer>() {

                    @Override

                    public void open(Configuration parameters) throws Exception {

                        super.open(parameters);

                        System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");

                    }


                    @Override

                    public Integer map(Integer integer) throws Exception {

                        return integer + 1;

                    }

                    @Override

                    public void close() throws Exception {

                        super.close();

                        System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");

                    }

                })

                .print();

        env.execute();

    }

}

3.4 物理分区算子(Physical Partitioning)

常见的物理分区策略有:随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)。

public class PartitionDemo {
    public static void main(String[] args) throws Exception {
        // TODO 1. 创建执行环境
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(2);

        DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);

        // shuffle随机分区: random.nextInt(下游算子并行度)
//        socketDS.shuffle().print();

        // rebalance轮询:nextChannelToSendTo = (nextChannelToSendTo + 1) % 下游算子并行度
        // 如果是 数据源倾斜的场景, source后,调用rebalance,就可以解决 数据源的 数据倾斜
//        socketDS.rebalance().print();

        //rescale缩放: 实现轮询, 局部组队,比rebalance更高效
//        socketDS.rescale().print();


        // broadcast 广播:  发送给下游所有的子任务
//        socketDS.broadcast().print();

        // global 全局: 全部发往 第一个子任务
        // return 0;
        socketDS.global().print();

        // keyby: 按指定key去发送,相同key发往同一个子任务
        // one-to-one: Forward分区器


        // 总结: Flink提供了 7种分区器+ 1种自定义

        env.execute();
    }
}

3.4.1 随机分区(shuffle)

最简单的重分区方式就是直接“洗牌”通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。

随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。

stream.shuffle()

 

3.4.2  轮询分区(Round-Robin)

轮询,简单来说就是“发牌”,按照先后顺序将数据做依次分发。通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是Round-Robin负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。

stream.rebalance()

3.4.3 重缩放分区(rescale)

重缩放分区和轮询分区非常相似。当调用rescale()方法时,其实底层也是使用Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌

stream.rescale()

3.4.4  广播(broadcast)

这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

stream.broadcast()

3.4.5 全局分区(global)

全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。

stream.global()

3.4.6 自定义分区(Custom)

当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。

1)自定义分区器

public class MyPartitioner implements Partitioner<String> {

    @Override

    public int partition(String key, int numPartitions) {

        return Integer.parseInt(key) % numPartitions;

    }

}

2)使用自定义分区

public class PartitionCustomDemo {

    public static void main(String[] args) throws Exception {

//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(2);

        DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);

        DataStream<String> myDS = socketDS

                .partitionCustom(

                        new MyPartitioner(),

                        value -> value);              

        myDS.print();

        env.execute();

    }

}

3.5 分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

3.5.1 简单实现

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。

案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。

public class SplitByFilterDemo {
    public static void main(String[] args) throws Exception {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(1);

        DataStreamSource<String> socketDS = env.socketTextStream("hadoop1", 7777);

        /**
         * TODO 使用filter来实现分流效果
         * 缺点: 同一个数据,要被处理两遍(调用两次filter)
          */

        SingleOutputStreamOperator<String> even = socketDS.filter(value -> Integer.parseInt(value) % 2 == 0);
        SingleOutputStreamOperator<String> odd = socketDS.filter(value -> Integer.parseInt(value) % 2 == 1);


        even.print("偶数流");
        odd.print("奇数流");


        env.execute();
    }
}

这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流stream复制三份然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?

运行结果:

3.5.2 使用侧输出流

关于处理函数中侧输出流的用法,只需要调用上下文ctx的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的id和类型。

代码实现:将WaterSensor按照Id类型进行分流。

public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop1", 7777)
                .map(new WaterSensorMapFunction());


        /**
         * TODO 使用侧输出流 实现分流
         * 需求: watersensor的数据,s1、s2的数据分别分开
         *
         * TODO 总结步骤:
         *    1、使用 process算子
         *    2、定义 OutputTag对象
         *    3、调用 ctx.output
         *    4、通过主流 获取 测流
         */

        /**
         * 创建OutputTag对象
         * 第一个参数: 标签名
         * 第二个参数: 放入侧输出流中的 数据的 类型,Typeinformation
         */
        OutputTag<WaterSensor> s1Tag = new OutputTag<>("s1", Types.POJO(WaterSensor.class));
        OutputTag<WaterSensor> s2Tag = new OutputTag<>("s2", Types.POJO(WaterSensor.class));

        SingleOutputStreamOperator<WaterSensor> process = sensorDS
                .process(
                        new ProcessFunction<WaterSensor, WaterSensor>() {
                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
                                String id = value.getId();
                                if ("s1".equals(id)) {
                                    // 如果是 s1,放到侧输出流s1中
                                    /**
                                     * 上下文ctx 调用ouput,将数据放入侧输出流
                                     * 第一个参数: Tag对象
                                     * 第二个参数: 放入侧输出流中的 数据
                                     */
                                    ctx.output(s1Tag, value);
                                } else if ("s2".equals(id)) {
                                    // 如果是 s2,放到侧输出流s2中

                                    ctx.output(s2Tag, value);
                                } else {
                                    // 非s1、s2的数据,放到主流中
                                    out.collect(value);
                                }

                            }
                        }
                );
        // 从主流中,根据标签 获取 侧输出流
        SideOutputDataStream<WaterSensor> s1 = process.getSideOutput(s1Tag);
        SideOutputDataStream<WaterSensor> s2 = process.getSideOutput(s2Tag);


        // 打印主流
        process.print("主流-非s1、s2");

        //打印 侧输出流
        s1.printToErr("s1");
        s2.printToErr("s2");

        env.execute();
    }
}
  • 数据源:从 Socket 流中读取数据,并通过 WaterSensorMapFunction 转换为 WaterSensor 对象。
  • 使用侧输出流:通过 process 算子,根据 WaterSensorid 属性将数据分流到主流或不同的侧输出流(s1s2)。
  • 侧输出流标签:使用 OutputTag 为每个侧输出流指定唯一的标签。
  • 分流逻辑
    • ids1 的数据放入侧输出流 s1
    • ids2 的数据放入侧输出流 s2
    • 其他数据进入主流。
  • 输出结果:分别打印主流和两个侧输出流的内容。 

运行结果:

 

3.6 基本合流操作

在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以Flink中合流的操作会更加普遍,对应的API也更加丰富。

3.6.1 联合(Union)

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。

在代码中,我们只要基于DataStream直接调用.union()方法,传入其他DataStream作为参数,就可以实现流的联合了;得到的依然是一个DataStream:

stream1.union(stream2, stream3, ...)

注意:union()的参数可以是多个DataStream,所以联合操作可以实现多条流的合并。

代码实现:我们可以用下面的代码做一个简单测试:

public class UnionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
        DataStreamSource<Integer> source2 = env.fromElements(11, 22, 33);
        DataStreamSource<String> source3 = env.fromElements("111", "222", "333");

        /**
         * TODO union:合并数据流
         * 1、 流的数据类型必须一致
         * 2、 一次可以合并多条流
         */
//        DataStream<Integer> union = source1.union(source2).union(source3.map(r -> Integer.valueOf(r)));
        DataStream<Integer> union = source1.union(source2, source3.map(r -> Integer.valueOf(r)));
        union.print();


        env.execute();
    }
}

运行结果: 

 

3.6.2 连接(Connect)

流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink还提供了另外一种方便的合流操作——连接(connect)

代码实现:需要分为两步:首先基于一条DataStream调用.connect()方法,传入另外一条DataStream作为参数,将两条流连接起来,得到一个ConnectedStreams;然后再调用同处理方法得到DataStream。这里可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法。

public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

//        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
//        DataStreamSource<String> source2 = env.fromElements("a", "b", "c");

        SingleOutputStreamOperator<Integer> source1 = env
                .socketTextStream("hadoop1", 7777)
                .map(i -> Integer.parseInt(i));

        DataStreamSource<String> source2 = env.socketTextStream("hadoop2", 8888);


        /**
         * TODO 使用 connect 合流
         * 1、一次只能连接 2条流
         * 2、流的数据类型可以不一样
         * 3、 连接后可以调用 map、flatmap、process来处理,但是各处理各的
         */
        ConnectedStreams<Integer, String> connect = source1.connect(source2);

        SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) throws Exception {
                return "来源于数字流:" + value.toString();
            }

            @Override
            public String map2(String value) throws Exception {
                return "来源于字母流:" + value;
            }
        });

        result.print();

        env.execute();
    }
}

 上面的代码中,ConnectedStreams有两个类型参数,分别表示内部包含的两条流各自的数据类型;由于需要“一国两制”,因此调用.map()方法时传入的不再是一个简单的MapFunction,而是一个CoMapFunction,表示分别对两条流中的数据执行map操作。这个接口有三个类型参数,依次表示第一条流、第二条流,以及合并后的流中的数据类型。需要实现的方法也非常直白:.map1()就是对第一条流中数据的map操作,.map2()则是针对第二条流。

2)CoProcessFunction

与CoMapFunction类似,如果是调用.map()就需要传入一个CoMapFunction,需要实现map1()、map2()两个方法;而调用.process()时,传入的则是一个CoProcessFunction。它也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。

值得一提的是,ConnectedStreams也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个ConnectedStreams:

connectedStreams.keyBy(keySelector1, keySelector2);

这里传入两个参数keySelector1和keySelector2,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的keyBy用法完全一致。ConnectedStreams进行keyBy操作,其实就是把两条流中key相同的数据放到了一起,然后针对来源的流再做各自处理,这在一些场景下非常有用。

案例需求:连接两条流,输出能根据id匹配上的数据(类似inner join效果)

public class ConnectKeybyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
                Tuple2.of(1, "a1"),
                Tuple2.of(1, "a2"),
                Tuple2.of(2, "b"),
                Tuple2.of(3, "c")
        );
        DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(
                Tuple3.of(1, "aa1", 1),
                Tuple3.of(1, "aa2", 2),
                Tuple3.of(2, "bb", 1),
                Tuple3.of(3, "cc", 1)
        );

        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);

        // 多并行度下,需要根据 关联条件进行 keyby,才能保证 key相同的数据到一起去,才能匹配上
        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKeyby = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);

        /**
         * 实现互相匹配的效果:  两条流,,不一定谁的数据先来
         *  1、每条流,有数据来,存到一个变量中
         *      hashmap
         *      =》key=id,第一个字段值
         *      =》value=List<数据>
         *  2、每条流有数据来的时候,除了存变量中, 不知道对方是否有匹配的数据,要去另一条流存的变量中 查找是否有匹配上的
         */
        SingleOutputStreamOperator<String> process = connectKeyby.process(
                new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
                    // 每条流定义一个hashmap,用来存数据
                    Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
                    Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();


                    /**
                     * 第一条流的处理逻辑
                     * @param value 第一条流的数据
                     * @param ctx   上下文
                     * @param out   采集器
                     * @throws Exception
                     */
                    @Override
                    public void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {
                        Integer id = value.f0;
                        // TODO 1. s1的数据来了,就存到变量中
                        if (!s1Cache.containsKey(id)) {
                            // 1.1 如果key不存在,说明是该key的第一条数据,初始化,put进map中
                            List<Tuple2<Integer, String>> s1Values = new ArrayList<>();
                            s1Values.add(value);
                            s1Cache.put(id, s1Values);
                        } else {
                            // 1.2 key存在,不是该key的第一条数据,直接添加到 value的list中
                            s1Cache.get(id).add(value);
                        }

                        // TODO 2.去 s2Cache中查找是否有id能匹配上的,匹配上就输出,没有就不输出
                        if (s2Cache.containsKey(id)) {
                            for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {
                                out.collect("s1:" + value + "<========>" + "s2:" + s2Element);
                            }
                        }

                    }

                    /**
                     * 第二条流的处理逻辑
                     * @param value 第二条流的数据
                     * @param ctx   上下文
                     * @param out   采集器
                     * @throws Exception
                     */
                    @Override
                    public void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                        Integer id = value.f0;
                        // TODO 1. s2的数据来了,就存到变量中
                        if (!s2Cache.containsKey(id)) {
                            // 1.1 如果key不存在,说明是该key的第一条数据,初始化,put进map中
                            List<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();
                            s2Values.add(value);
                            s2Cache.put(id, s2Values);
                        } else {
                            // 1.2 key存在,不是该key的第一条数据,直接添加到 value的list中
                            s2Cache.get(id).add(value);
                        }

                        // TODO 2.去 s1Cache中查找是否有id能匹配上的,匹配上就输出,没有就不输出
                        if (s1Cache.containsKey(id)) {
                            for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {
                                out.collect("s1:" + s1Element + "<========>" + "s2:" + value);
                            }
                        }
                    }
                }
        );

        process.print();

        env.execute();
    }
}
  • 使用 keyBy 对两条流按 id(即元组的第一个字段)进行分组。通过这种方式,Flink 保证了相同 id 的数据能够被发送到相同的并行任务中,确保能够进行匹配。
  • CoProcessFunction 是 Flink 提供的一个处理两个流数据的函数。processElement1 用于处理第一条流的数据,processElement2 用于处理第二条流的数据。每个流都有一个 HashMap(缓存),用于存储该流的未匹配数据。当某个流接收到数据时,会先将数据存入相应的缓存中,然后检查另一个流的缓存,若找到匹配的 id,则输出匹配结果。
  • 该代码的核心功能是连接两条流,通过 keyByCoProcessFunction 来实现相同 id 的数据匹配,类似于 SQL 的 INNER JOIN 操作。
  • 每条流的数据首先被缓存,当一条流的数据到来时,会尝试与另一条流中已缓存的数据进行匹配。如果匹配成功,则输出连接结果。

运行结果:

 

四、输出算子(Sink) 

Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

4.1 连接到外部系统

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。

Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。

stream.addSink(new SinkFunction());

addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

Flink1.12开始,同样重构了Sink架构,

stream.sinkTo(…)

当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。

除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。

 除此以外,就需要用户自定义实现sink连接器了。

public class SinkCustom {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);


        sensorDS.addSink(new MySink());


        env.execute();
    }

    public static class MySink extends RichSinkFunction<String> {

        Connection conn = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // 在这里 创建连接
            // conn = new xxxx
        }

        @Override
        public void close() throws Exception {
            super.close();
            // 做一些清理、销毁连接
        }

        /**
         * sink的核心逻辑,写出的逻辑就写在这个方法里
         * @param value
         * @param context
         * @throws Exception
         */
        @Override
        public void invoke(String value, Context context) throws Exception {
            // 写出逻辑
            // 这个方法是 来一条数据,调用一次,所以不要在这里创建 连接对象

        }
    }
}

4.2 输出到文件

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。

FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

  • 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。
public class SinkFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 每个目录中,都有 并行度个数的 文件在写入
        env.setParallelism(2);

        // 必须开启checkpoint,否则一直都是 .inprogress
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                Long.MAX_VALUE,
                RateLimiterStrategy.perSecond(1000),
                Types.STRING
        );

        DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");

        // TODO 输出到文件系统
        FileSink<String> fieSink = FileSink
                // 输出行式存储的文件,指定路径、指定编码
                .<String>forRowFormat(new Path("你自己文件夹路径"), new SimpleStringEncoder<>("UTF-8"))
                // 输出文件的一些配置: 文件名的前缀、后缀
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("atguigu-")
                                .withPartSuffix(".log")
                                .build()
                )
                // 按照目录分桶:如下,就是每个小时一个目录
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
                // 文件滚动策略:  1分钟 或 1m
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(Duration.ofMinutes(1))
                                .withMaxPartSize(new MemorySize(1024*1024))
                                .build()
                )
                .build();


        dataGen.sinkTo(fieSink);

        env.execute();
    }
}

 代码解析:

  • forRowFormat:指定了文件的输出格式,这里使用的是行式存储格式,并指定了输出目录路径和编码方式(UTF-8)。
  • withOutputFileConfig:设置文件输出的配置,包括:
    • withPartPrefix("atguigu-"):设置文件名前缀为 atguigu-
    • withPartSuffix(".log"):设置文件的后缀为 .log
  • withBucketAssigner:设置分桶策略,这里按每小时(yyyy-MM-dd HH)创建一个文件夹(目录),每个文件夹存储一个小时内生成的数据。
  • withRollingPolicy:设置文件滚动策略,文件滚动的条件是:
    • 每 1 分钟滚动一次(withRolloverInterval(Duration.ofMinutes(1)))。
    • 每个文件的最大大小为 1MB(withMaxPartSize(new MemorySize(1024 * 1024)))。如果文件达到这个大小,也会触发滚动。

成功的入本地文件夹:

这段代码通过 Flink 实现了以下功能:

  • 从自定义的数据源生成数据。
  • 配置了 Flink 的文件输出操作,将数据按小时分区、每分钟滚动并写入文件。
  • 每个输出文件的命名规则是带有前缀 atguigu- 和后缀 .log,且文件最大为 1MB。
  • 启用了精准一次(EXACTLY_ONCE)的检查点策略,保证数据的一致性。

4.3 输出到Kafka

(1)添加Kafka 连接器依赖

由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。

(2)启动Kafka集群

(3)编写输出到Kafka的示例代码

4.3.1 输出无key的record

package com.atguigu.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * TODO
 *
 * @author cjp
 * @version 1.0
 */
public class SinkKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 如果是精准一次,必须开启checkpoint(后续章节介绍)
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop1", 7777);

        /**
         * Kafka Sink:
         * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
         * 1、开启checkpoint(后续介绍)
         * 2、设置事务前缀
         * 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟
         */
        // Kafka Sink:
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                // 设置 Kafka 集群的地址(bootstrap servers)
                .setBootstrapServers("ip:port")

                // 设置记录序列化模式
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<String>builder()
                                .setTopic("WJ-TEST")
                                // 设置消息的序列化方式(这里是简单的字符串序列化)
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )

                // 设置消息传输的一致性保证:精准一次(exactly once)
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)

                // 设置事务前缀,用于标识事务
                .setTransactionalIdPrefix("atguigu-")

                // 设置事务超时时间:应大于 checkpoint 间隔时间,并小于 15 分钟
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")

                // 设置用于安全连接的 SASL 配置
                .setProperty("security.protocol", "SASL_PLAINTEXT")
                .setProperty("sasl.mechanism", "SCRAM-SHA-512")
                .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"用户\" password=\"密码\";")

                // 构建 Kafka Sink 实例
                .build();


        sensorDS.sinkTo(kafkaSink);


        env.execute();
    }
}

总结: 

  • Flink 流式处理:从 Socket 中读取数据,并通过 Kafka Sink 写入 Kafka。
  • 精准一次(Exactly Once):开启了 Flink 的检查点机制,保证每条消息在 Kafka 中只会出现一次,且启用了 Kafka 事务支持。
  • 安全连接:使用 SASL 认证机制与 Kafka 集群进行安全通信。
  • 消息序列化:使用 SimpleStringSchema 对数据进行序列化,将字符串数据写入 Kafka。

成功写入Kafka:

4.3.2 自定义序列化器,实现带key的record

Key 和 Value 的含义:

  • Key:Kafka 中的 key 用于决定消息的分区(partition)。Kafka 会根据消息的 key 来将消息路由到不同的分区。这个 key 的值可以影响 Kafka 的负载均衡和数据的顺序性。如果 key 相同,消息将会发送到同一个分区。

  • Valuevalue 是 Kafka 消息的实际数据内容,它是我们希望存储到 Kafka 中的内容,通常是业务数据。

package com.atguigu.sink;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;

/**
 * TODO
 *
 * @author cjp
 * @version 1.0
 */
public class SinkKafkaWithKey {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());


        SingleOutputStreamOperator<String> sensorDS = env
                .socketTextStream("hadoop102", 7777);


        /**
         * 如果要指定写入kafka的key
         * 可以自定义序列器:
         * 1、实现 一个接口,重写 序列化 方法
         * 2、指定key,转成 字节数组
         * 3、指定value,转成 字节数组
         * 4、返回一个 ProducerRecord对象,把key、value放进去
         *
         */
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
                .setRecordSerializer(

                        new KafkaRecordSerializationSchema<String>() {

                            @Nullable
                            @Override
                            public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                                String[] datas = element.split(",");
                                byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                                byte[] value = element.getBytes(StandardCharsets.UTF_8);
                                return new ProducerRecord<>("ws", key, value);
                            }
                        }
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("atguigu-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                .build();


        sensorDS.sinkTo(kafkaSink);


        env.execute();
    }
}

Key 和 Value 的具体处理:

  • Key 的处理key 从输入的 String 数据中提取,代码使用 split(",") 方法将输入的数据分割成多个部分,并取第一个部分作为 key。例如,如果输入数据是 "123,Hello World",那么 key 就会是 "123",它将被转换为字节数组。

  • Value 的处理value 直接使用整个输入的字符串作为消息内容。比如,如果输入数据是 "123,Hello World",那么 value 就是整个 "123,Hello World" 字符串,也会被转换成字节数组。

ProducerRecord 对象的返回:

serialize() 方法中,ProducerRecord<byte[], byte[]> 被创建并返回。该 ProducerRecord 包含了:

  • key(字节数组形式):由输入数据的第一个部分(通过 split() 获得)转换而来。
  • value(字节数组形式):由整个输入数据字符串转换而来。

Kafka 将根据 key 来决定消息写入的分区,同时将 value 作为消息的内容发送到 Kafka 的指定主题(这里是 "ws")。

总结:

  • Key:表示消息的键,用于控制消息的分区。代码中通过分割字符串并取第一个部分来设置。
  • Value:表示消息的内容,代码中使用整个字符串作为值来发送到 Kafka。
  • 通过自定义 KafkaRecordSerializationSchema,我们能够灵活地处理 Kafka 消息的 keyvalue,并将它们以字节数组的形式发送到 Kafka 集群。

4.4 输出到MySQL(JDBC)

写入数据的MySQL的测试步骤如下。

(1)添加依赖

添加MySQL驱动:

<dependency>

    <groupId>mysql</groupId>

    <artifactId>mysql-connector-java</artifactId>

    <version>8.0.27</version>

</dependency>

(2)启动MySQL,在test库下建表ws

CREATE TABLE `ws` (

  `id` varchar(100) NOT NULL,

  `ts` bigint(20) DEFAULT NULL,

  `vc` int(11) DEFAULT NULL,

  PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8

(3)编写输出到MySQL的示例代码

package com.atguigu.sink;

import com.atguigu.bean.WaterSensor;
import com.atguigu.functions.WaterSensorMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * TODO
 *
 * @author cjp
 * @version 1.0
 */
public class SinkMySQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop1", 7777)
                .map(new WaterSensorMapFunction());


        /**
         * TODO 写入mysql
         * 1、只能用老的sink写法: addsink
         * 2、JDBCSink的4个参数:
         *    第一个参数: 执行的sql,一般就是 insert into
         *    第二个参数: 预编译sql, 对占位符填充值
         *    第三个参数: 执行选项 ---》 攒批、重试
         *    第四个参数: 连接选项 ---》 url、用户名、密码
         */
        SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
                "insert into ws values(?,?,?)",
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        //每收到一条WaterSensor,如何去填充占位符
                        preparedStatement.setString(1, waterSensor.getId());
                        preparedStatement.setLong(2, waterSensor.getTs());
                        preparedStatement.setInt(3, waterSensor.getVc());
                    }
                },
                JdbcExecutionOptions.builder()
                        // 重试次数
                        .withMaxRetries(3)
                        // 批次的大小:条数
                        .withBatchSize(100)
                        // 批次的时间
                        .withBatchIntervalMs(3000)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://ip:port/som?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("密码")
                        // 重试的超时时间
                        .withConnectionCheckTimeoutSeconds(60)
                        .build()
        );


        sensorDS.addSink(jdbcSink);


        env.execute();
    }
}

总结:

(a) 数据源:从 Socket 中读取数据,并将其转换为 WaterSensor 对象。

(b) JDBC Sink 配置:使用 JdbcSink 将数据写入 MySQL 数据库。通过配置 SQL 语句、JDBC 执行选项和数据库连接选项,Flink 可以将数据高效地写入数据库。

  • 第一个参数"insert into ws values(?,?,?)":这是 SQL 插入语句,用于将数据插入到 MySQL 表 ws 中。
  • 第二个参数JdbcStatementBuilder<WaterSensor>:实现了 JdbcStatementBuilder 接口,用于将每条 WaterSensor 数据填充到 SQL 的占位符中。具体地,它将 WaterSensor 对象的 idtsvc 设置为 SQL 语句中的三个占位符。
    • preparedStatement.setString(1, waterSensor.getId()):将 WaterSensor 对象的 id 设置到 SQL 语句的第一个占位符。
    • preparedStatement.setLong(2, waterSensor.getTs()):将 WaterSensor 对象的 ts 设置到 SQL 语句的第二个占位符。
    • preparedStatement.setInt(3, waterSensor.getVc()):将 WaterSensor 对象的 vc 设置到 SQL 语句的第三个占位符。
  • 第三个参数JdbcExecutionOptions:用于配置 JDBC 执行的选项。
    • .withMaxRetries(3):设置最大重试次数为 3,如果写入数据库失败,会尝试最多 3 次。
    • .withBatchSize(100):设置批次大小,每次批量写入 100 条数据。
    • .withBatchIntervalMs(3000):设置批次间隔时间为 3000 毫秒(即 3 秒)。每 3 秒会批量写入一次数据。
  • 第四个参数JdbcConnectionOptions:用于配置数据库连接的选项。
    • .withUrl("jdbc:mysql://ip:port/som?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"):设置 MySQL 数据库的连接 URL,包括数据库地址、端口、时区等。
    • .withUsername("root").withPassword("密码"):设置数据库的用户名和密码,用于连接 MySQL。
    • .withConnectionCheckTimeoutSeconds(60):设置连接检查超时时间为 60 秒,确保数据库连接在 60 秒内有效。

(c) 批量写入:通过设置批次大小和批次间隔时间,数据被批量写入 MySQL,从而提高了写入效率。

(d) 重试机制:设置了最大重试次数和超时时间,确保在数据库连接失败时能进行重试,避免数据丢失。

(4)运行代码,用客户端连接MySQL,查看是否成功写入数据。

4.5 自定义Sink输出

如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。

stream.addSink(new MySinkFunction<String>());

在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。

这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。


http://www.kler.cn/a/516601.html

相关文章:

  • Redis使用基础
  • Go学习:常量
  • Linux进度条实现
  • Odoo免费开源ERP最佳业务实践:生产管理
  • 一个基于Python+Appium的手机自动化项目~~
  • 联想电脑怎么用u盘装系统_联想电脑用u盘装win10系统教程
  • mysql直接在sql中将分组查询出来的多个属性的list,拼接成一个字符串,最后的结果只要一个大的字符串
  • AAAI2024论文合集解读|Cost Minimization for Equilibrium Transition-water-merged
  • 双足机器人开源项目
  • 《中国网络安全产业分析报告(2023年)》解读
  • MySQL性能分析的“秘密武器”,深度剖析SQL问题
  • 从前端视角看设计模式之行为型模式篇
  • Recaptcha2 图像怎么识别
  • Linux pgrep 命令详解
  • vben5 admin ant design vue如何使用时间范围组件RangePicker
  • kotlin内联函数——takeIf和takeUnless
  • java读取设置pdf属性信息
  • 二分查找题目:快照数组
  • Docker Hub 全面解析及应对策略
  • 2【选修】再探宝可梦、数码宝贝分类器
  • 组播IGMP协议报文介绍
  • QT6 + CMAKE编译OPENCV3.9
  • 1.23寒假作业
  • linux中关闭服务的开机自启动
  • “上门按摩” 小程序开发项目:基于 SOP 的全流程管理
  • C语言文件操作:标准库与系统调用实践