Flink学习连载第二篇-使用flink编写WordCount(多种情况演示)
使用Flink编写代码,步骤非常固定,大概分为以下几步,只要牢牢抓住步骤,基本轻松拿下:
1. env-准备环境
2. source-加载数据
3. transformation-数据处理转换
4. sink-数据输出
5. execute-执行
DataStream API开发
//nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/overview/
0. 添加依赖
<properties>
<flink.version>1.13.6</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-ssh</artifactId>
<version>2.8</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>wagon-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<!--上传的本地jar的位置-->
<fromFile>target/${project.build.finalName}.jar</fromFile>
<!--远程拷贝的地址-->
<url>scp://root:root@bigdata01:/opt/app</url>
</configuration>
</plugin>
</plugins>
</build>
-
编写代码
package com.bigdata.day01;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount01 {
/**
* 1. env-准备环境
* 2. source-加载数据
* 3. transformation-数据处理转换
* 4. sink-数据输出
* 5. execute-执行
*/
public static void main(String[] args) throws Exception {
// 导入常用类时要注意 不管是在本地开发运行还是在集群上运行,都这么写,非常方便
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 这个是 自动 ,根据流的性质,决定是批处理还是流处理
//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 批处理流, 一口气把数据算出来
// env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 流处理,默认是这个 可以通过打印批和流的处理结果,体会流和批的含义
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// 获取数据 多态的写法 DataStreamSource 它是 DataStream 的子类
DataStream<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");
DataStream<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> collector) throws Exception {
String[] arr = line.split(" ");
for (String word : arr) {
// 循环遍历每一个切割完的数据,放入到收集器中,就可以形成一个新的DataStream
collector.collect(word);
}
}
});
//flatMapStream.print();
// Tuple2 指的是2元组
DataStream<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1); // ("hello",1)
}
});
DataStream<Tuple2<String, Integer>> sumResult = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
return tuple2.f0;
}
// 此处的1 指的是元组的第二个元素,进行相加的意思
}).sum(1);
sumResult.print();
// 执行
env.execute();
}
}
查看本机的CPU的逻辑处理器的数量,逻辑处理器的数量就是你的分区数量。
12> spark
13> kakfa
11> spark
11> flink
11> kafka
13> hadoop
12> sqoop
13> flink
12> flink
前面的数字是分区数,默认跟逻辑处理器的数量有关系。
对结果进行解释:
什么是批,什么是流?
批处理结果:前面的序号代表分区
流处理结果:
也可以通过如下方式修改分区数量:
env.setParallelism(2);
关于并行度的代码演示:
系统以及算子都可以设置并行度,或者获取并行度
package com.bigdata.day01;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount01 {
/**
* 1. env-准备环境
* 2. source-加载数据
* 3. transformation-数据处理转换
* 4. sink-数据输出
* 5. execute-执行
*/
public static void main(String[] args) throws Exception {
// 导入常用类时要注意 不管是在本地开发运行还是在集群上运行,都这么写,非常方便
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 这个是 自动 ,根据流的性质,决定是批处理还是流处理
//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 批处理流, 一口气把数据算出来
// env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 流处理,默认是这个 可以通过打印批和流的处理结果,体会流和批的含义
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// 将任务的并行度设置为2
// env.setParallelism(2);
// 通过这个获取系统的并行度
int parallelism = env.getParallelism();
System.out.println(parallelism);
// 获取数据 多态的写法 DataStreamSource 它是 DataStream 的子类
DataStream<String> dataStream01 = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");
DataStream<String> flatMapStream = dataStream01.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> collector) throws Exception {
String[] arr = line.split(" ");
for (String word : arr) {
// 循环遍历每一个切割完的数据,放入到收集器中,就可以形成一个新的DataStream
collector.collect(word);
}
}
});
// 每一个算子也有自己的并行度,一般跟系统保持一致
System.out.println("flatMap的并行度:"+flatMapStream.getParallelism());
//flatMapStream.print();
// Tuple2 指的是2元组
DataStream<Tuple2<String, Integer>> mapStream = flatMapStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1); // ("hello",1)
}
});
DataStream<Tuple2<String, Integer>> sumResult = mapStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
return tuple2.f0;
}
// 此处的1 指的是元组的第二个元组,进行相加的意思
}).sum(1);
sumResult.print();
// 执行
env.execute();
}
}
- 打包、上传
文件夹不需要提前准备好,它可以帮我创建
- 提交我们自己开发打包的任务
flink run -c com.bigdata.day01.WordCount01 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar
去界面中查看运行结果:
因为你这个是集群运行的,所以标准输出流中查看,假如第一台没有,去第二台查看,一直点。
获取主函数参数工具类
可以通过外部传参的方式给定一个路径
以下代码可以做到,假如给定路径,就获取路径的数据,假如没给,就读取默认数据:
package com.bigdata.day01;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount02 {
/**
* 1. env-准备环境
* 2. source-加载数据
* 3. transformation-数据处理转换
* 4. sink-数据输出
* 5. execute-执行
*/
public static void main(String[] args) throws Exception {
// 导入常用类时要注意 不管是在本地开发运行还是在集群上运行,都这么写,非常方便
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 这个是 自动 ,根据流的性质,决定是批处理还是流处理
//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 批处理流, 一口气把数据算出来
// env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 流处理,默认是这个 可以通过打印批和流的处理结果,体会流和批的含义
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// 将任务的并行度设置为2
// env.setParallelism(2);
// 通过这个获取系统的并行度
int parallelism = env.getParallelism();
System.out.println(parallelism);
// 获取数据 多态的写法 DataStreamSource 它是 DataStream 的子类
// 连着写的本质就是 因为每一个算子的返回值都是DataStream的子类,所以可以这么写
// 以下代码中路径是写死的,能不能通过外部传参进来,当然可以! agrs
DataStream<String> dataStream = null;
System.out.println(args.length);
if(args.length !=0){
String path = args[0];
dataStream = env.readTextFile(path);
}else{
dataStream = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");
}
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> collector) throws Exception {
String[] arr = line.split(" ");
for (String word : arr) {
// 循环遍历每一个切割完的数据,放入到收集器中,就可以形成一个新的DataStream
collector.collect(word);
}
}
}).map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1); // ("hello",1)
}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
return tuple2.f0;
}
// 此处的1 指的是元组的第二个元组,进行相加的意思
}).sum(1).print();
// 执行
env.execute();
}
}
flink run -c com.bigdata.day01.Demo02 FlinkDemo-1.0-SNAPSHOT.jar /home/wc.txt
这样做,跟我们以前的做法还是不一样。以前的运行方式是这样的
flink run /opt/installs/flink/examples/batch/WordCount.jar --input /home/wc.txt
这个写法,传递参数的时候,带有--字样,而我们的没有。
以上代码进行升级,我想将参数前面追加一个 --input 这样,怎么写?
ParameterTool parameterTool = ParameterTool.fromArgs(args);
if(parameterTool.has("output")){
path = parameterTool.get("output");
}
在代码中的使用:
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String output = "";
if (parameterTool.has("output")) {
output = parameterTool.get("output");
System.out.println("指定了输出路径使用:" + output);
} else {
output = "hdfs://node01:9820/wordcount/output47_";
System.out.println("可以指定输出路径使用 --output ,没有指定使用默认的:" + output);
}
升级过的代码:
package com.bigdata.day01;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount02 {
/**
* 1. env-准备环境
* 2. source-加载数据
* 3. transformation-数据处理转换
* 4. sink-数据输出
* 5. execute-执行
*/
public static void main(String[] args) throws Exception {
// 导入常用类时要注意 不管是在本地开发运行还是在集群上运行,都这么写,非常方便
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 这个是 自动 ,根据流的性质,决定是批处理还是流处理
//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 批处理流, 一口气把数据算出来
// env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 流处理,默认是这个 可以通过打印批和流的处理结果,体会流和批的含义
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// 将任务的并行度设置为2
// env.setParallelism(2);
// 通过这个获取系统的并行度
int parallelism = env.getParallelism();
System.out.println(parallelism);
// 获取数据 多态的写法 DataStreamSource 它是 DataStream 的子类
// 连着写的本质就是 因为每一个算子的返回值都是DataStream的子类,所以可以这么写
// 以下代码中路径是写死的,能不能通过外部传参进来,当然可以! agrs
DataStream<String> dataStream = null;
System.out.println(args.length);
if(args.length !=0){
String path ;
ParameterTool parameterTool = ParameterTool.fromArgs(args);
if(parameterTool.has("input")){
path = parameterTool.get("input");
}else{
path = args[0];
}
dataStream = env.readTextFile(path);
}else{
dataStream = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");
}
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> collector) throws Exception {
String[] arr = line.split(" ");
for (String word : arr) {
// 循环遍历每一个切割完的数据,放入到收集器中,就可以形成一个新的DataStream
collector.collect(word);
}
}
}).map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1); // ("hello",1)
}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
return tuple2.f0;
}
// 此处的1 指的是元组的第二个元组,进行相加的意思
}).sum(1).print();
// 执行
env.execute();
}
}
DataStream (Lambda表达式-扩展 了解)
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
* Desc 演示Flink-DataStream-流批一体API完成批处理WordCount
* 使用Java8的lambda表示完成函数式风格的WordCount
*/
public class WordCount02 {
public static void main(String[] args) throws Exception {
//TODO 1.env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);//指定计算模式为流
//env.setRuntimeMode(RuntimeExecutionMode.BATCH);//指定计算模式为批
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//自动
//不设置的话默认是流模式defaultValue(RuntimeExecutionMode.STREAMING)
//TODO 2.source-加载数据
DataStream<String> dataStream = env.fromElements("flink hadoop spark", "flink hadoop spark", "flink hadoop", "flink");
//TODO 3.transformation-数据转换处理
//3.1对每一行数据进行分割并压扁
/*
public interface FlatMapFunction<T, O> extends Function, Serializable {
void flatMap(T value, Collector<O> out) throws Exception;
}
*/
/*DataStream<String> wordsDS = dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);
}
}
});*/
//注意:Java8的函数的语法/lambda表达式的语法: (参数)->{函数体}
DataStream<String> wordsDS = dataStream.flatMap(
(String value, Collector<String> out) -> {
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);
}
}
).returns(Types.STRING);
//3.2 每个单词记为<单词,1>
/*
public interface MapFunction<T, O> extends Function, Serializable {
O map(T value) throws Exception;
}
*/
/*DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});*/
DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(
(String value) -> Tuple2.of(value, 1)
).returns(Types.TUPLE(Types.STRING, Types.INT));
//3.3分组
//注意:DataSet中分组用groupBy,DataStream中分组用keyBy
//KeyedStream<Tuple2<String, Integer>, Tuple> keyedDS = wordAndOneDS.keyBy(0);
/*
public interface KeySelector<IN, KEY> extends Function, Serializable {
KEY getKey(IN value) throws Exception;
}
*/
/*KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});*/
KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy((Tuple2<String, Integer> value) -> value.f0);
//3.4聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);
//TODO 4.sink-数据输出
result.print();
//TODO 5.execute-执行
env.execute();
}
}
此处有一个大坑,就是使用完lambda表达式以后,需要添加一个returns(Types.STRING); 否则报错,这样的话,使用lambda也不是特别快了。
连着写的版本如下:
package com.bigdata.day01;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount03 {
/**
* 1. env-准备环境
* 2. source-加载数据
* 3. transformation-数据处理转换
* 4. sink-数据输出
* 5. execute-执行
*/
public static void main(String[] args) throws Exception {
// 导入常用类时要注意 不管是在本地开发运行还是在集群上运行,都这么写,非常方便
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 这个是 自动 ,根据流的性质,决定是批处理还是流处理
//env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 批处理流, 一口气把数据算出来
// env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 流处理,默认是这个 可以通过打印批和流的处理结果,体会流和批的含义
//env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// 将任务的并行度设置为2
// env.setParallelism(2);
// 通过这个获取系统的并行度
int parallelism = env.getParallelism();
System.out.println(parallelism);
// 获取数据 多态的写法 DataStreamSource 它是 DataStream 的子类
// 连着写的本质就是 因为每一个算子的返回值都是DataStream的子类,所以可以这么写
// 以下代码中路径是写死的,能不能通过外部传参进来,当然可以! agrs
DataStream<String> dataStream = null;
System.out.println(args.length);
if(args.length !=0){
String path ;
ParameterTool parameterTool = ParameterTool.fromArgs(args);
if(parameterTool.has("input")){
path = parameterTool.get("input");
}else{
path = args[0];
}
dataStream = env.readTextFile(path);
}else{
dataStream = env.fromElements("spark flink kafka", "spark sqoop flink", "kakfa hadoop flink");
}
dataStream.flatMap((String line, Collector<String> collector) -> {
String[] arr = line.split(" ");
for (String word : arr) {
// 循环遍历每一个切割完的数据,放入到收集器中,就可以形成一个新的DataStream
collector.collect(word);
}
}).returns(Types.STRING).map((String word)-> {
return Tuple2.of(word, 1); // ("hello",1)
}).returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy((Tuple2<String, Integer> tuple2)-> {
return tuple2.f0;
}).sum(1).print();
// 执行
env.execute();
}
}