当前位置: 首页 > article >正文

2.Flink的项目初始化和Hello-world

目录

1.Flink项目初始化

2.Hello-world


1.Flink项目初始化

新建maven项目或者gradle项目,这里使用maven项目为例。

在项目的pom.xml文件中添加Flink依赖,如下所示,为Hello-World例子的最小依赖:

 <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- 指定Flink版本 -->
        <flink.version>1.12.2</flink.version> <!-- 请根据需要调整版本号 -->
        <scala.binary.version>2.12</scala.binary.version> <!-- 如果使用Scala,请指定Scala版本 -->
        <java.version>1.8</java.version> <!-- 或者更高的Java版本 -->
    </properties>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- 如果你打算使用Table API或SQL,还需要添加以下依赖 -->

        <!-- 其他可能需要的依赖,如连接器、格式等 -->

        <!-- 测试依赖 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

为了加速依赖的下载,尤其是在国内,可以配置maven的国内镜像,配置方法为设置maven的settings.xml文件。如果是mac用户,该文件位于家目录下.m2/settings.xml。不存在可以vim一个,早文件中添加以下内容。

<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
    <mirrors>
        <!-- 配置阿里云Maven镜像 -->
        <mirror>
            <id>aliyunmaven</id>
            <mirrorOf>central</mirrorOf>
            <name>Aliyun Maven</name>
            <url>https://maven.aliyun.com/repository/public</url>
        </mirror>
        <!-- 如果还需要其他镜像,可以继续添加 -->
    </mirrors>
</settings>

2.Hello-world

还是一如既往的大数据经典案例:Hello World

代码如下所示:

package com.wlh.p1;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        //
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 7777)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] split = s.split(" ");
                        for (String word : split) {
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return stringIntegerTuple2.f0;
                    }
                })
                .sum(1)
                .print();

        //
        env.execute();

    }
}

代码不是很复杂,Flink的api相对而言和Spark比较类似,先创建一个上下文的执行环境,在项目结束的位置,使用execute运行,Spark中也有collect操作。上面的代码就是一个计算Word-Count的例子。

当我们输入:

hello world
hello rust
hello spark

程序输出:

4> (hello,1)
6> (world,1)
7> (rust,1)
4> (hello,2)
4> (hello,3)
1> (spark,1)

对于输出的一些解释,对于输出我们做一些解释,可以看出流计算和批计算的差异

输入hello world后,输出(hello,1) (world,1)

输入hello rust后,输出(hello,2) (rust,1)

输入hello spark后,输出(hello,3) (spark,1)

通过上面的计算输出,可以看出流计算的特点

a.立即计算,收到一条,计算一次

b.有状态,通过对hello的追踪,可以看出计算时是可以读写状态的


http://www.kler.cn/a/429080.html

相关文章:

  • 移动端布局 ---- 学习分享
  • 3D 视觉语言推理中的态势感知
  • 【PyCharm】连接Jupyter Notebook
  • 深度学习项目--基于LSTM的火灾预测研究(pytorch实现)
  • 玩转大语言模型——使用graphRAG+Ollama构建知识图谱
  • Redis 性能优化:多维度技术解析与实战策略
  • 「Mac玩转仓颉内测版47」小学奥数篇10 - 数列求和
  • 电脑无法识别usb设备怎么办?电脑无法识别usb解决方法
  • 基于STM32的DS18B20温度报警器_可调上下限Proteus仿真设计(仿真+程序+设计报告+讲解视频)
  • 鸿蒙ZRouter动态路由框架—服务路由
  • java+springboot+mysql游乐园管理系统
  • 重生之我在学Vue-- Vue3 学习路径总览
  • 生成SSH秘钥文件
  • Python实现ARIMA-LSTM回归模型预测股票价格项目实战
  • 深入了解架构中常见的4种缓存模式及其实现
  • Linux —— 管理文件
  • yolov10 生成json 自动标注
  • 《Python数据分析:活用pandas库》学习笔记Day1:Panda DataFrame基础知识
  • k8s集群环境时间同步
  • Ethernet 系列(10)-- 基础学习::UDP
  • 突破空间限制!从2D到3D:北大等开源Lift3D,助力精准具身智能操作!
  • 【Linux系统】Ubuntu 缓冲区机制
  • 【在Linux世界中追寻伟大的One Piece】HTTP cookie
  • NDK编译(使用Android.mk)C/C++程序和库
  • pytest(三)json数据驱动
  • flutter实现如何 检测键盘的显示和隐藏状态