当前位置: 首页 > article >正文

Flink 实现无界流

Flink 实现无界流

package org.example.test;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * DataSet API使用
 */
public class WordCount2 {

    public static void main(String[] args) throws Exception {
        //该类主要是用于进行批处理
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //读取文本
        DataSource<String> stringDataStreamSource = env.readTextFile("input/test.txt");
        //进行ETL处理,Tuple2 是二元数组的意思
        FlatMapOperator<String, Tuple2<String, Integer>> stringTuple2FlatMapOperator = stringDataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    Tuple2<String, Integer> oneTuple2 = Tuple2.of(word, 1);
                    out.collect(oneTuple2);
                }
            }
        });
        //进行分组,分组字段取下标第0个
        UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping =
                stringTuple2FlatMapOperator.groupBy(0);
        //进行sum操作
        AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);
        sum.print();
    }
}

ExecutionEnvironment 是批处理的方式,DataSource会慢慢被淘汰


http://www.kler.cn/news/310853.html

相关文章:

  • 十七,Spring Boot 整合 MyBatis 的详细步骤(两种方式)
  • 四、JVM原理-4.2、内存管理
  • 计算机视觉(CV)技术是指计算机系统通过模拟人类视觉系统来识别、理解和解释图像和视频的能力。它可以在各种领域中发挥巨大作用,但也面临一些挑战。
  • tasklist命令的应用实例
  • 力扣150题——位运算
  • 小程序开发设计-第一个小程序:创建小程序项目④
  • Java学习路线指南
  • PowerShell install 一键部署Oracle23ai
  • 基于安卓的音乐app设计与实现(全套)
  • Android 开发高频面试题之——Flutter
  • 【JVM】类加载过程|双亲委派模型
  • RTX 4090 系列即将停产,RTX 5090 系列蓄势待发
  • 【系统架构设计】系统的可靠性分析与设计
  • 接口自动化框架入门(requests+pytest)
  • 最好用的翻译器:什么是DeepL?如何订阅支付DeepL,订阅DeepL Pro以及申请DeepL API?
  • 蓝桥杯—STM32G431RBT6按键的多方式使用(包含软件消抖方法精讲)从原理层面到实际应用(一)
  • TS - tsconfig.json 和 tsconfig.node.json 的关系,如何在TS 中使用 JS 不报错
  • 产品经理注意!11月NPDP考试预报名已开启
  • Oracle 11gR2打PSU补丁详细教程
  • 监控系列之-Grafana面板展示及制作
  • Flutter 优化技巧分享
  • Linux 内存分析工具 —— heaptrack
  • 代理IP批理检测工具,支持socks5,socks4,http和https代理批量检测是否可用
  • MySQL基础作业五
  • Text2SQL论文笔记 A Survey on Employing Large Language Models for Text-to-SQL Tasks
  • linux-L3-linux 复制文件
  • HBase在大数据实时处理中的角色
  • 【IEEEACM Fellow、CCF组委】第三届人工智能与智能信息处理国际学术会议(AIIIP 2024)
  • Python | Leetcode Python题解之第405题数字转换为十六进制数
  • java泛型的概念与好处