当前位置: 首页 > article >正文

Flink算子

文章目录

    • map
    • filter
    • flatMap
    • keyBy
    • aggregations
    • reduce
    • 物理分区算子
    • 富函数
    • split
    • side output
    • union(联合)
    • connect(连接)

map

Map 算子会遍历数据流的每一个元素产生一个新的元素。

 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        SingleOutputStreamOperator<Integer> source = env.socketTextStream("192.168.235.130", 8888).map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String s) throws Exception {
                return Integer.valueOf(s)*10;
            }
        });
        source.print();
        env.execute();
    }

在这里插入图片描述

filter

filter算子通过一个布尔表达式对数据流的元素进行过滤,若为true则正常输出该元素,若为false则过滤掉该元素。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        SingleOutputStreamOperator<String> filter = env.socketTextStream("192.168.235.130", 8888).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                String[] data = s.split(",");
                return "10".equals(data[1]);
            }
        });
        filter.print();
        env.execute();
    }

在这里插入图片描述

flatMap

flatMap遍历数据流中的每一个元素产生N(N >= 0)个元素。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        SingleOutputStreamOperator<String> flatMap = env.socketTextStream("192.168.235.130", 8888).flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] data = s.split(",");
                for (String str : data) {
                    collector.collect(str);
                }
            }
        });
        flatMap.print();
        env.execute();
    }

在这里插入图片描述

keyBy

在使用聚合算子之前通常要经过keyBy分组,keyBy通过指定的key将数据流中的数据划分到不同的分区,那么具有相同key的数据都被发送到同一个分区,但一个分区中可能存在不同key的数据,底层原理是通过计算key的哈希值对分区数取模来实现的,如果key是POJO类型必须重写hashCode()方法。

 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);
        KeyedStream<String, String> keyedStream = env.socketTextStream("192.168.235.130", 8888).keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String s) throws Exception {
                String[] data = s.split(",");
                return data[1];
            }
        });
        keyedStream.print();
        env.execute();
    }

在这里插入图片描述

aggregations

aggregations包含以下聚合算子,在数据流中,sum()用于对指定的字段求和,min()对指定的字段求最小值,max()对指定的字段求最大值,maxby()取比较字段的最大值,同时非比较字段 取 最大值这条数据的值,minBy()同理,取比较字段的最小值,同时非比较字段 取 最小值这条数据的值。

public class WaterSensor {
    public String id;
    public Long ts;
    public Integer vc;

    // 要提供一个空参的构造器
    public WaterSensor() {
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }
    }
  public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 10L, 10),
                new WaterSensor("s1", 20L, 11),
                new WaterSensor("s1", 30L, 10),
                new WaterSensor("s2", 40L, 2),
                new WaterSensor("s3", 50L, 3)
        );
        
        KeyedStream<WaterSensor, String> sensorKS = sensorDS
                .keyBy(new KeySelector<WaterSensor, String>() {
                    @Override
                    public String getKey(WaterSensor value) throws Exception {
                        return value.getId();
                    }
                });
        
        SingleOutputStreamOperator<WaterSensor> result = sensorKS.maxBy("vc");

//        SingleOutputStreamOperator<WaterSensor> result = sensorKS.max("vc");
//        SingleOutputStreamOperator<WaterSensor> result = sensorKS.min("vc");
      //  SingleOutputStreamOperator<WaterSensor> result = sensorKS.maxBy("vc");
//        SingleOutputStreamOperator<WaterSensor> result = sensorKS.minby("vc");

        result.print();
        env.execute();
    }

在这里插入图片描述

reduce

reduce用于对分组完的数据流进行聚合处理,把新输入的数据和当前已经归约出来的数据进行聚合计算,因此每组的第一个元素不会执行reduce操作,需要等待同组的下一个元素到来后再进行计算。

  public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        
        DataStreamSource<WaterSensor> sensorDS = env.fromElements(
                new WaterSensor("s1", 10L, 1),
                new WaterSensor("s1", 20L, 11),
                new WaterSensor("s1", 30L, 21),
                new WaterSensor("s2", 40L, 2),
                new WaterSensor("s3", 50L, 3)
        );


        KeyedStream<WaterSensor, String> sensorKS = sensorDS
                .keyBy(new KeySelector<WaterSensor, String>() {
                    @Override
                    public String getKey(WaterSensor value) throws Exception {
                        return value.getId();
                    }
                });

        SingleOutputStreamOperator<WaterSensor> reduce = sensorKS.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                System.out.println("value1=" + value1);
                System.out.println("value2=" + value2);
                return new WaterSensor(value1.id, value2.ts, value1.vc + value2.vc);
            }
        });

        reduce.print();
        env.execute();
    }

在这里插入图片描述

物理分区算子

常见的物理分区策略包含以下几种:随机分区、轮询分区、重缩放,广播,全局分区和自定义分区。

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(8);
        DataStreamSource<String> socketDS = env.socketTextStream("192.168.235.130", 8888);
        // shuffle随机分区
        socketDS.shuffle().print();

        // rebalance轮询
        // 如果是数据源倾斜的场景,调用rebalance,就可以解决数据源的数据倾斜
      //  socketDS.rebalance().print();

        //rescale缩放:实现轮询,比rebalance更高效
       // socketDS.rescale().print();
        
        // broadcast广播:发送给下游所有的子任务
     //   socketDS.broadcast().print();

        // global全局:全部发往第一个子任务
      // socketDS.global().print();

        // keyby: 按指定key去发送,相同key发往同一个子任务
        // one-to-one: Forward分区器
        
        env.execute();
    }

在这里插入图片描述

富函数

Flink函数类都有对应的Rich版本,例如RichMapFunction、RichFilterFunction、RichReduceFunction等,富函数类与常规函数类的主要区别在于,富函数类可以获取运行环境的上下文,并且拥有生命周期的方法,所以富函数类能够实现更复杂的功能。

   public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(3);

        DataStreamSource<String> source = env.socketTextStream("192.168.235.130", 8888);
        SingleOutputStreamOperator<Integer> map = source.map(new RichMapFunction<String, Integer>() {

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                System.out.println(
                        "子任务编号=" + getRuntimeContext().getIndexOfThisSubtask()
                                + ",子任务名称=" + getRuntimeContext().getTaskNameWithSubtasks()
                                + ",调用open()");
            }

            @Override
            public void close() throws Exception {
                super.close();
                System.out.println(
                        "子任务编号=" + getRuntimeContext().getIndexOfThisSubtask()
                                + ",子任务名称=" + getRuntimeContext().getTaskNameWithSubtasks()
                                + ",调用close()");
            }

            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value) + 1;
            }
        });

        map.print();

        env.execute();
    }

注: 富函数在启动时,open()调用一次,结束时,close()调用一次。

在这里插入图片描述

split

split与side output都是分流算子,分流就是定义一些筛选条件,将一条数据流拆分成多条数据流。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<String> source = env.socketTextStream("192.168.235.130", 8888);
        SingleOutputStreamOperator<String> even = source.filter(value -> Integer.valueOf(value) % 2 == 0);
        SingleOutputStreamOperator<String> odd = source.filter(value -> Integer.valueOf(value) % 2 == 1);
        even.print("偶数流");
        odd.print("奇数流");
        env.execute();

    }

在这里插入图片描述

split的缺点:每一个数据都要调用两次filter处理,效率低,一般不用。

side output

side output在处理数据流时,可以将数据流中的元素根据条件发送到额外的输出流中,而不需要复制整个数据流。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<String> source = env.socketTextStream("192.168.235.130", 8888);
        SingleOutputStreamOperator<WaterSensor> map = source.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] data = s.split(",");
                return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[1]));
            }
        });

        OutputTag<WaterSensor> tag1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class));
        OutputTag<WaterSensor> tag2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class));
        SingleOutputStreamOperator<WaterSensor> process = map.process(new ProcessFunction<WaterSensor, WaterSensor>() {
            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
                String id = value.getId();
                if ("s1".equals(id)) {
                    ctx.output(tag1, value);
                } else if ("s2".equals(id)) {
                    ctx.output(tag2, value);
                } else {
                    out.collect(value);
                }
            }
        });

        SideOutputDataStream<WaterSensor> sideOutput1 = process.getSideOutput(tag1);
        SideOutputDataStream<WaterSensor> sideOutput2 = process.getSideOutput(tag2);
        process.print("主流");
        sideOutput1.printToErr("s1");
        sideOutput2.printToErr("s2");
        env.execute();
    }

在这里插入图片描述

union(联合)

union是最简单的合流操作,可以直接将多条数据流合在一起,但要求流中的数据类型必须相同,

  public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<Integer> source1 = env.fromElements(10, 20, 30, 40);
        DataStreamSource<Integer> source2 = env.fromElements(5, 6, 7, 8);
        DataStreamSource<String> source3 = env.fromElements("100", "200", "300", "400");
      //  DataStream<Integer> union1 = source1.union(source2, source3.map(value -> Integer.valueOf(value)));
       DataStream<Integer> union2 = source1.union(source3.map(value -> Integer.valueOf(value)));
     //   union1.print("union1");
        union2.print("union2");
        env.execute();
    }

在这里插入图片描述
union的缺点:要求数据类型必须相同,不能改变,缺少灵活性,所以很少用。

connect(连接)

connect每次能连接2条流,流的数据类型可以不一样,两条流连接后可以各自调用函数map、flatmap、process等处理。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(2);
        SingleOutputStreamOperator<Integer> source1 = environment.socketTextStream("192.168.235.130", 9999).map(value -> Integer.valueOf(value));
        DataStreamSource<String> source2 = environment.socketTextStream("192.168.235.130", 8888);
        ConnectedStreams<Integer, String> connect = source1.connect(source2);
        SingleOutputStreamOperator<Object> map = connect.map(new CoMapFunction<Integer, String, Object>() {
            @Override
            public Object map1(Integer value) throws Exception {
                value *= 10;
                return "来源于数字流"+value.toString();
            }

            @Override
            public Object map2(String value) throws Exception {
                return "来源于字母流"+value;
            }
        });
        map.print();
        environment.execute();
    }

在这里插入图片描述


http://www.kler.cn/a/400422.html

相关文章:

  • 通过华为鲲鹏认证发行上市的集成平台产品推荐
  • 鸿蒙HarmonyOS 地图定位到当前位置 site查询等操作
  • 模式:每个服务一个数据库
  • 【蓝桥杯备赛】深秋的苹果
  • 鸿蒙网络编程系列48-仓颉版UDP回声服务器示例
  • 【Nginx】反向代理Https时相关参数:
  • Vue 3 中 ref 属性详解:操作 DOM 元素的利器
  • Python的3D可视化库 - vedo (1)简介和模块功能概览
  • ThinkPHP6的ORM模型
  • hive-内部表外部表-详细介绍
  • Java 网络编程:Socket 与网络通信
  • Jtti:服务器总是自动重启怎么办?
  • 如何保存python文件
  • 最新6.7分非肿瘤纯生信,使用机器学习筛选慢阻肺中的关键基因。机器学习在非肿瘤生信文章中正火,可重复!
  • Python自动化DevOps任务入门
  • stm32学习笔记----51单片机和stm32单片机的区别
  • w043基于springboot的“衣依”服装销售平台的设计与实现
  • postgresql(功能最强大的开源数据库)继承特性和分区实现
  • STM32 ADC --- DMA乒乓缓存
  • Spark:大数据处理的强大引擎
  • Elasticsearch的查询语法——DSL 查询
  • 网页作业9
  • esp32c3开发板通过micropython的mqtt库连MQTT物联网消息服务器
  • @JsonSerialize修复前端精度问题
  • 【Nginx从入门到精通】03 、安装部署-让虚拟机可以联网
  • executed_gtid_set 与 purged_gtid_set