当前位置: 首页 > article >正文

Flink有界流实现(1)

flink实现有界流需要使用StreamExecutionEnvironment类,并且最后需要使用env.execute() 方法,有界和无界的算子有时候会有不同的
复杂的写法

package org.example.test;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
/**
 * DataSet API使用
 */
public class WordCountDataStream {

    public static void main(String[] args) throws Exception {
        //TODO 1、获取流的类
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 2、读取文件
//        DataStreamSource<String> stringDataStreamSource = env.readTextFile("input/test.txt");

        DataStreamSource<String> stringDataStreamSource = env.socketTextStream("localhost", 9000, "\n");

        //TODO 3 ETL
        //TODO 3.1 转换成二元数组,简单ETL的过程
        SingleOutputStreamOperator<Tuple2<String, Integer>> process = stringDataStreamSource.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void processElement(String value, ProcessFunction<String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    Tuple2<String, Integer> tuple2 = Tuple2.of(word, 1);
                    out.collect(tuple2);
                }
            }
        });

        //TODO 3.1 分组
        KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = process.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {

            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //TODO 3.2 聚合计算
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = tuple2StringKeyedStream.sum(1);

        //TODO 4、打印
        sum.print();

        //TODO 5、无界流需要这个不断执行的方法
        env.execute();
    }
}


lamb表达式写法
在这里插入图片描述

无界流在输出的时候前面会带上一个数字,这个数字是线程数
在这里插入图片描述


http://www.kler.cn/news/306584.html

相关文章:

  • 【Python】谷歌浏览器总是自动更新,使用selenium跟chromedriver版本不匹配怎么办?
  • 01,大数据总结,zookeeper
  • 算法练习题27——疫情下的电影院(模拟)
  • AI辅助癌症诊断取得了进展
  • Angular面试题一
  • 大模型 LLM(Large Language Models)如今十分火爆,对于初入此领域的新人小白来说,应该如何入门 LLM 呢?是否有值得推荐的入门教程呢?
  • 深度学习自编码器 - 引言篇
  • java基于PDF底层内容流的解析对文本内容进行编辑
  • 象过河手机进销存,外出办公更方便,随时了解经营情况
  • C# 静态static
  • 基于HTML5的下拉刷新效果
  • 如何避免长距离遗忘问题
  • HarmonyOS NEXT 封装实现好用的网络模块(基于最新5.0的API12)
  • Android 12 Launcher3 去掉Hotseat
  • JVM 调优篇7 调优案例3- gc overhead limit exceed
  • ListBox显示最新数据、左移和右移操作
  • K8s中HPA自动扩缩容及hml
  • idea2024.2永久使用
  • MFC工控项目实例之十五定时刷新PC6325A模拟量输入
  • HTML添加文字
  • 【深度学习】Pytorch基础
  • 分享一些成功的 SQL 优化案例
  • 2024工业机器视觉产业现状
  • 多模态大语言模型综述(中)-算法实用指南
  • 如何在Django中创建新的模型实例
  • MFC工控项目实例之十六输入信号验证
  • app抓包 chrome://inspect/#devices
  • 2024.9.12(k8s环境搭建2)
  • WebSocket vs. Server-Sent Events:选择最适合你的实时数据流技术
  • VUE3中ref与reactive