2.Flink的项目初始化和Hello-world
目录
1.Flink项目初始化
2.Hello-world
1.Flink项目初始化
新建maven项目或者gradle项目,这里使用maven项目为例。
在项目的pom.xml文件中添加Flink依赖,如下所示,为Hello-World例子的最小依赖:
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- 指定Flink版本 -->
<flink.version>1.12.2</flink.version> <!-- 请根据需要调整版本号 -->
<scala.binary.version>2.12</scala.binary.version> <!-- 如果使用Scala,请指定Scala版本 -->
<java.version>1.8</java.version> <!-- 或者更高的Java版本 -->
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 如果你打算使用Table API或SQL,还需要添加以下依赖 -->
<!-- 其他可能需要的依赖,如连接器、格式等 -->
<!-- 测试依赖 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
为了加速依赖的下载,尤其是在国内,可以配置maven的国内镜像,配置方法为设置maven的settings.xml文件。如果是mac用户,该文件位于家目录下.m2/settings.xml。不存在可以vim一个,早文件中添加以下内容。
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
<mirrors>
<!-- 配置阿里云Maven镜像 -->
<mirror>
<id>aliyunmaven</id>
<mirrorOf>central</mirrorOf>
<name>Aliyun Maven</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>
<!-- 如果还需要其他镜像,可以继续添加 -->
</mirrors>
</settings>
2.Hello-world
还是一如既往的大数据经典案例:Hello World
代码如下所示:
package com.wlh.p1;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class HelloWorld {
public static void main(String[] args) throws Exception {
//
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.socketTextStream("localhost", 7777)
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] split = s.split(" ");
for (String word : split) {
collector.collect(Tuple2.of(word, 1));
}
}
})
.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.f0;
}
})
.sum(1)
.print();
//
env.execute();
}
}
代码不是很复杂,Flink的api相对而言和Spark比较类似,先创建一个上下文的执行环境,在项目结束的位置,使用execute运行,Spark中也有collect操作。上面的代码就是一个计算Word-Count的例子。
当我们输入:
hello world
hello rust
hello spark
程序输出:
4> (hello,1)
6> (world,1)
7> (rust,1)
4> (hello,2)
4> (hello,3)
1> (spark,1)
对于输出的一些解释,对于输出我们做一些解释,可以看出流计算和批计算的差异
输入hello world后,输出(hello,1) (world,1)
输入hello rust后,输出(hello,2) (rust,1)
输入hello spark后,输出(hello,3) (spark,1)
通过上面的计算输出,可以看出流计算的特点
a.立即计算,收到一条,计算一次
b.有状态,通过对hello的追踪,可以看出计算时是可以读写状态的