Flink转换算子——flatMap/map/filter/keyby/reduce综合案例
需求:对流数据中的单词进行计数统计,统计时排除敏感词 TMD【腾讯美团滴滴】。
此处用到了 Windows 版本的 netcat 软件。具体用法:先解压,然后在解压目录的地址栏中输入 cmd 并回车,打开命令行窗口(黑窗口)。
官网地址:netcat 1.11 for Win32/Win64
Netcat介绍及安装使用_netcat安装-CSDN博客
服务端的启动:在命令行窗口中执行 nc -l -p 8889(端口需与下面代码中 socketTextStream 使用的 8889 保持一致)。
客户端:直接双击 nc.exe 即可启动,在打开的窗口中只需输入参数,无需再写 nc 命令本身。
如果想在任意目录下都能使用 nc 命令,需要把 nc.exe 所在的目录添加到系统环境变量 Path 中。
代码演示:
package com.bigdata.day03;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.planner.expressions.In;
import org.apache.flink.util.Collector;
/**
 * Streaming word count over a socket text stream that leaves the sensitive word
 * {@code TMD} out of the result.
 *
 * <p>Lines are read from {@code localhost:8889}, split into words, the sensitive word is
 * dropped, and a running count per remaining word is printed.
 */
public class ZongHeDemo {
    /** Word that must never be counted. */
    private static final String SENSITIVE_WORD = "TMD";

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. env - prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source - read text lines from the socket
        DataStream<String> dataStreamSource = env.socketTextStream("localhost", 8889);

        // 3. transformation - tokenize first, then filter per word, so that a sensitive word
        //    only removes itself instead of discarding every other word on the same line
        dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                // "\\s+" treats runs of whitespace as one separator; a leading separator
                // still yields one empty token, which is skipped so it is not counted.
                for (String word : line.split("\\s+")) {
                    if (!word.isEmpty()) {
                        collector.collect(word);
                    }
                }
            }
        }).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String word) throws Exception {
                return !SENSITIVE_WORD.equals(word);
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        }).keyBy(v -> v.f0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> acc, Tuple2<String, Integer> next) throws Exception {
                // Both tuples carry the same key; add their counts.
                return Tuple2.of(acc.f0, acc.f1 + next.f1);
            }
        })
        // 4. sink - print the running counts
        .print();

        // 5. execute - submit the job
        env.execute();
    }
}
package com.bigdata.source;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Flink source that emits every value of the {@code word} column returned by
 * {@code select word from fuck_words} on the MySQL database {@code zuoye} at
 * {@code localhost:3306}.
 *
 * <p>The query is executed once in {@link #run}; the source finishes when the result set is
 * exhausted or when {@link #cancel()} is called.
 */
class JdbcSource extends RichSourceFunction<String> {
    // JDBC handles are created in open(); transient keeps them out of the serialized function.
    transient Connection connection;
    transient PreparedStatement statement;

    // Cleared by cancel(); volatile because cancel() may be invoked from another thread.
    private volatile boolean running = true;

    /**
     * Opens the JDBC connection and prepares the query.
     *
     * @param parameters unused
     * @throws Exception if the connection or the statement cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // No explicit driver loading here. If it is ever needed, the Connector/J 8 driver
        // class is "com.mysql.cj.jdbc.Driver" (NOTE(review): confirm against the driver
        // version on the classpath).
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/zuoye", "root", "123456");
        statement = connection.prepareStatement("select word from fuck_words");
    }

    /**
     * Releases the JDBC resources. Safe to call when {@link #open} failed part-way: handles
     * that were never created are skipped, and the connection is closed even if closing the
     * statement throws.
     *
     * @throws Exception if closing a resource fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (statement != null) {
                statement.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Runs the query and emits one record per row.
     *
     * @param ctx context used to emit records
     * @throws Exception if the query or reading a row fails
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // try-with-resources closes the result set on every exit path.
        try (ResultSet resultSet = statement.executeQuery()) {
            while (running && resultSet.next()) {
                ctx.collect(resultSet.getString("word"));
            }
        }
    }

    /** Asks {@link #run} to stop emitting rows. */
    @Override
    public void cancel() {
        running = false;
    }
}
/**
 * Word-count job over a socket text stream that drops the words "tmd" and "fuck".
 *
 * <p>It also reads words through {@link JdbcSource}, prints them, and demonstrates that
 * collecting stream elements into a driver-side list does not work.
 */
public class _07综合案例 {

    /** Adds every element to the given list and forwards the element unchanged. */
    private static final class RecordingMapper implements MapFunction<String, String> {
        private final List<String> seen;

        RecordingMapper(List<String> seen) {
            this.seen = seen;
        }

        @Override
        public String map(String value) throws Exception {
            seen.add(value);
            return value;
        }
    }

    /** Splits each line on runs of whitespace and emits every resulting token. */
    private static final class Tokenizer implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            String[] tokens = value.split("\\s+");
            // Unwanted words could be dropped right here; this job uses a separate filter.
            for (int i = 0; i < tokens.length; i++) {
                out.collect(tokens[i]);
            }
        }
    }

    /** Keeps only the words that are not in the given list. */
    private static final class NotListed implements FilterFunction<String> {
        private final List<String> banned;

        NotListed(List<String> banned) {
            this.banned = banned;
        }

        @Override
        public boolean filter(String value) throws Exception {
            if (banned.contains(value)) {
                return false;
            }
            return true;
        }
    }

    /** Pairs each word with an initial count of 1. */
    private static final class ToPair implements MapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String value) throws Exception {
            return Tuple2.of(value, 1);
        }
    }

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. env - prepare the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source - text lines from the socket
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8889);

        // 3. transformation - second source backed by JdbcSource, printed as-is
        DataStreamSource<String> jdbcSource = env.addSource(new JdbcSource());
        jdbcSource.print();

        ArrayList<String> words = new ArrayList<>();
        jdbcSource.map(new RecordingMapper(words)).print();
        // Dead end (per the original author): the program is not executed statement by
        // statement and the operators run in parallel, so `words` holds nothing here.
        // This println also runs before env.execute() is called.
        System.out.println(words);

        // Arrays.asList returns a fixed-size list: readable, but nothing can be added or removed.
        List<String> list = Arrays.asList("tmd", "fuck");

        dataStreamSource
                .flatMap(new Tokenizer())
                .filter(new NotListed(list))
                .map(new ToPair())
                .keyBy(pair -> pair.f0)
                .sum(1)
                // 4. sink - print the running counts
                .print();

        // 5. execute - submit the job
        env.execute();
    }
}