当前位置: 首页 > article >正文

Flink-WordCount源码解析

一、源码下载

下载地址:Apache Download Mirrors

二、idea构建

使用idea打开解压后的源码,并设置好jdk和maven配置,静等下载依赖即可。

三、WordCount源码探寻

WordCount示例是对文本文件上的单词进行计算,可以在流式和批处理执行模式下执行。

输入参数有4个:

  • input :要读取的输入文件或目录列表。如果没有提供输入,程序将使用WordCountData 中的默认数据运行
  • discovery-interval : 将文件读取器转换为连续源,该源将每隔一段时间监视提供的输入目录并读取任何新文件
  • output :作业将写入结果的输出目录。如果没有提供输出路径,则作业将结果打印到 stdout
  • execution-mode :执行模式,有三种(BATCH、STREAMING、AUTOMATIC)

1、执行模式

DataStream程序的执行模式还控制着任务调度、网络洗牌行为和时间语义。某些操作还将根据配置的执行模式更改其记录排放行为。

STREAMING

        使用流语义执行。所有任务都将在执行开始前部署,检查点将启用,并且将完全支持处理和事件时间。

BATCH

        使用批处理语义执行。任务将根据它们所属的调度区域逐步调度,区域之间的混洗将被阻塞,水印被假设为“完美”的,即没有延迟数据,处理时间被假设在执行过程中不会提前。

AUTOMATIC

        如果所有源都是有界的,Flink会将执行模式设置为BATCH

        或者如果至少有一个源是无界的,则将执行模式设为STREAMING

2、源码

public class WordCount {

    // *************************************************************************
    // PROGRAM  程序
    // *************************************************************************

    public static void main(String[] args) throws Exception {
        //对输出参数进行解析
        final CLI params = CLI.fromArgs(args);

        // 创建执行环境。这是构建Flink应用程序的主要入口点。
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Apache Flink对流和批处理的统一方法意味着,在有界输入上执行的DataStream应用程序将产生相同的最终结果,
        // 而不管配置的执行模式如何。重要的是要注意这里的final意味着什么:
        // 在STREAMING模式下执行的作业可能会产生增量更新(想想数据库中的异常),
        // 而在批处理模式下,它最终只会产生一个最终结果。
        // 如果解释正确,最终结果将是相同的,但到达那里可能会有所不同
        //
        // DataStream API的“经典”执行行为称为STREAMING执行模式。
        // 应用程序应将流式执行用于需要连续增量处理且预计将无限期保持在线的无限制作业。
        //
        // 通过启用BATCH执行,我们允许Flink应用额外的优化,只有当我们知道我们的输入是有界的时,我们才能这样做。
        // 例如,除了允许更有效的任务调度和故障恢复行为的不同shuffle实现外,还可以使用不同的连接/聚合策略。
        //
        // 通过将运行时模式设置为AUTOMATIC,如果所有源都有界,Flink将选择BATCH,否则选择STREAMING。

        //默认的 执行模式 为 : STREAMING
        env.setRuntimeMode(params.getExecutionMode());

        // 此可选步骤使输入参数在Flink UI中可用。
        env.getConfig().setGlobalJobParameters(params);

        DataStream<String> text;
        if (params.getInputs().isPresent()) {
            // 创建一个新的文件源,该文件源将从给定的目录集中读取文件。每个文件都将作为纯文本处理,并根据换行进行拆分。
            FileSource.FileSourceBuilder<String> builder =
                    FileSource.forRecordStreamFormat(
                            new TextLineInputFormat(), params.getInputs().get());

            // 如果提供了发现间隔,则源将持续监视给定目录中的新文件。
            params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);

            text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
        } else {
            text = env.fromData(WordCountData.WORDS).name("in-memory-input");
        }

        DataStream<Tuple2<String, Integer>> counts =
                // 使用用户定义的函数将从源读取的文本行拆分为单词。下面实现的标记器将每个单词输出为包含(word, 1)的(2元组)
                text.flatMap(new Tokenizer())
                        .name("tokenizer")
                        // keyBy根据单词“0”字段对元组进行分组。
                        // 使用keyBy允许在每个键的基础上对数据执行聚合和其他有状态转换。
                        // 这类似于SQL查询中的GROUP BY子句。
                        .keyBy(value -> value.f0)
                        // each time it sees a new instance of each word in the stream.
                        // 对于每个键,我们执行“1”字段的简单求和,即计数。
                        // 如果输入数据流有界,sum将输出每个单词的最终计数。
                        // 如果它是无界的,则每次看到流中每个单词的新实例时,它都会不断输出更新。
                        .sum(1)
                        .name("counter");

        if (params.getOutput().isPresent()) {
            // 给定一个输出目录,Flink将使用简单的字符串编码将结果写入文件。
            // 在生产环境中,这可能是更结构化的东西,如CSV、Avro、JSON或Parquet。
            counts.sinkTo(
                            FileSink.<Tuple2<String, Integer>>forRowFormat(
                                            params.getOutput().get(), new SimpleStringEncoder<>())
                                    .withRollingPolicy(
                                            DefaultRollingPolicy.builder()
                                                    .withMaxPartSize(MemorySize.ofMebiBytes(1))
                                                    .withRolloverInterval(Duration.ofSeconds(10))
                                                    .build())
                                    .build())
                    .name("file-sink");
        } else {
            counts.print().name("print-sink");
        }

        // Apache Flink应用程序是延迟编写的。调用execute提交作业并开始处理。
        env.execute("WordCount");
    }

    // *************************************************************************
    // USER FUNCTIONS 用户函数
    // *************************************************************************

    /**
     * 实现字符串标记器,将句子拆分为单词,作为用户定义的FlatMapFunction。
     * 该函数接收一行(String),并将其拆分为多对,形式为“(word,1)”({@code Tuple2<String,Integer>})。
     */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line  对一行数据进行归一化和拆分
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs 发出配对,将每个单词作为key value设置成1 并作为 Tuple2 输出
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

3、总结

1、解析输入参数

2、为Flink应用程序创建执行环境,该环境提供了控制作业执行(如设置并行性或容错/检查点参数)和与外部交互(数据访问)的方法。

3、为环境设置执行模式,默认为STREAMING

4、将Flink UI中的参数设置到环境中

5、根据输入文件创建DataStreamSource<String>(它表示DataStream的起点),如果输入参数没有设置则用事先在内存中准备好的数据,这是整个程序第一个DataStream,像Spark程序一样可以使用算子将其转化成其他DataStream

6、通过flatMap算子中的自定义函数将从源读取的文本行拆分为单词(格式Tuple2)

7、通过keyBy算子对元组进行按key分组,并在每个键的基础上对数据执行聚合和其他有状态转换。这类似于SQL查询中的GROUP BY子句

8、通过sum算子对每个key对应的value求和。如果输入数据流有界,sum将输出每个单词的最终计数。如果它是无界的,则每次看到流中每个单词的新实例时,它都会不断输出更新。

9、如果参数给了输出目录,则使用简单的字符串编码将结果写入文件,否则输出到日志

10、调用execute提交作业并开始处理(Apache Flink应用程序是延迟编写的)

4、画图


http://www.kler.cn/a/535354.html

相关文章:

  • 【Prometheus】如何通过golang生成prometheus格式数据
  • 基础篇05-直方图操作
  • GitHub 使用教程:从入门到进阶
  • app专项测试(网络测试流程)
  • k8s常见面试题2
  • MongoDB学习笔记-解析jsonCommand内容
  • OpenGL学习笔记(十):初级光照:材质 Materials
  • DEEPSEKK GPT等AI体的出现如何重构工厂数字化架构:从设备控制到ERP MES系统的全面优化
  • 新星杯进化史:个人发起到CSDN官方支持,创作活动的新篇章
  • 从零手写Spring IoC容器(一):传统对象管理的困境与基础容器的实现
  • 参数3说明
  • Windows 中学习Docker环境准备3、在Ubuntu中安装Docker
  • Linux之守护进程,应用层协议
  • 【数据结构】_队列的结构与实现
  • 【Leetcode 每日一题】90. 子集 II
  • 基于多重算法的医院增强型50G全光网络设计与实践:构建智慧医疗新基石(上)
  • MS SQL Server partition by 函数实战二 编排考场人员
  • vite共享配置之---css相关
  • 【玩转 Postman 接口测试与开发2_018】第14章:利用 Postman 初探 API 安全测试
  • MAC OS安装Homebrew
  • android用eclipse开发碰到65535问题的完美解决方案
  • DMZ区的作用和原则
  • 【Windows 开发NVIDIA相关组件】CUDA、cuDNN、TensorRT
  • 3.5 Go(特殊函数)
  • 计算机毕业设计hadoop+spark+hive民宿推荐系统 酒店推荐系统 民宿价格预测 酒店价预测 机器学习 深度学习 Python爬虫 HDFS集群
  • 嵌入式硬件篇---OpenMV基本使用自动增益\曝光\白平衡