当前位置: 首页 > article >正文

Flink有界流实现(1)

flink实现有界流需要使用StreamExecutionEnvironment类,并且最后需要使用env.execute() 方法,有界和无界的算子有时候会有不同的
复杂的写法

package org.example.test;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
/**
 * DataSet API使用
 */
public class WordCountDataStream {

    public static void main(String[] args) throws Exception {
        //TODO 1、获取流的类
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 2、读取文件
//        DataStreamSource<String> stringDataStreamSource = env.readTextFile("input/test.txt");

        DataStreamSource<String> stringDataStreamSource = env.socketTextStream("localhost", 9000, "\n");

        //TODO 3 ETL
        //TODO 3.1 转换成二元数组,简单ETL的过程
        SingleOutputStreamOperator<Tuple2<String, Integer>> process = stringDataStreamSource.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void processElement(String value, ProcessFunction<String, Tuple2<String, Integer>>.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    Tuple2<String, Integer> tuple2 = Tuple2.of(word, 1);
                    out.collect(tuple2);
                }
            }
        });

        //TODO 3.1 分组
        KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = process.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {

            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //TODO 3.2 聚合计算
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = tuple2StringKeyedStream.sum(1);

        //TODO 4、打印
        sum.print();

        //TODO 5、无界流需要这个不断执行的方法
        env.execute();
    }
}


lamb表达式写法
在这里插入图片描述

无界流在输出的时候前面会带上一个数字,这个数字是线程数
在这里插入图片描述


http://www.kler.cn/a/306584.html

相关文章:

  • Python——NumPy库的简单用法,超级详细教程使用
  • 力扣662:二叉树的最大宽度
  • 基于海思soc的智能产品开发(两个图像处理来源)
  • 量化交易系统开发-实时行情自动化交易-3.4.2.2.Okex交易数据
  • 提取神经网络数学表达式
  • 【云计算解决方案面试整理】1-2云计算基础概念及云计算技术原理
  • 【Python】谷歌浏览器总是自动更新,使用selenium跟chromedriver版本不匹配怎么办?
  • 01,大数据总结,zookeeper
  • 算法练习题27——疫情下的电影院(模拟)
  • AI辅助癌症诊断取得了进展
  • Angular面试题一
  • 大模型 LLM(Large Language Models)如今十分火爆,对于初入此领域的新人小白来说,应该如何入门 LLM 呢?是否有值得推荐的入门教程呢?
  • 深度学习自编码器 - 引言篇
  • java基于PDF底层内容流的解析对文本内容进行编辑
  • 象过河手机进销存,外出办公更方便,随时了解经营情况
  • C# 静态static
  • 基于HTML5的下拉刷新效果
  • 如何避免长距离遗忘问题
  • HarmonyOS NEXT 封装实现好用的网络模块(基于最新5.0的API12)
  • Android 12 Launcher3 去掉Hotseat
  • JVM 调优篇7 调优案例3- gc overhead limit exceed
  • ListBox显示最新数据、左移和右移操作
  • K8s中HPA自动扩缩容及hml
  • idea2024.2永久使用
  • MFC工控项目实例之十五定时刷新PC6325A模拟量输入
  • HTML添加文字