当前位置: 首页 > article >正文

Flink转换算子——flatMap/map/filter/keyby/reduce综合案例

需求: 对流数据中的单词进行统计,排除敏感词TMD【腾讯美团滴滴】
此处用到了 Windows 版本的 netcat 软件。具体用法:先解压,然后在解压目录的地址栏中输入 cmd 并回车,打开命令行窗口(黑窗口)。

 官网地址:netcat 1.11 for Win32/Win64
Netcat介绍及安装使用_netcat安装-CSDN博客

服务端的启动(需要监听下文代码中连接的 8889 端口,例如执行 nc -l -p 8889):

image.png


客户端就是双击 nc.exe 即可,里面无需写 nc 命令。

image.png

假如你想随时随地使用nc这个命令,需要配置环境变量。
代码演示:

package com.bigdata.day03;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.planner.expressions.In;
import org.apache.flink.util.Collector;


/**
 * Word count over a socket text stream that excludes the sensitive word "TMD".
 *
 * <p>Pipeline: split every line into words (flatMap), drop the sensitive word (filter),
 * pair each remaining word with the count 1 (map), group by word (keyBy), add the
 * counts together (reduce) and print the result.
 */
public class ZongHeDemo {

    /** The sensitive word that must not be counted. */
    private static final String SENSITIVE_WORD = "TMD";

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {

        //1. env - set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source - read text lines from the socket
        DataStream<String> dataStreamSource = env.socketTextStream("localhost", 8889);

        //3. transformation - split, filter, count
        dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                // Split on any run of whitespace so repeated spaces do not create extra tokens.
                for (String word : line.split("\\s+")) {
                    // An empty line or leading whitespace still yields one empty token; skip it
                    // so that "" is never counted as a word.
                    if (!word.isEmpty()) {
                        collector.collect(word);
                    }
                }
            }
        }).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String word) throws Exception {
                // Filter per word, after splitting: only the sensitive word itself is removed,
                // the other words of the same line are still counted.
                return !SENSITIVE_WORD.equals(word);
            }
        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        }).keyBy(v -> v.f0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> left, Tuple2<String, Integer> right) throws Exception {
                // Both tuples carry the same key; keep it and add the counts.
                return Tuple2.of(left.f0, left.f1 + right.f1);
            }
        }).print(); //4. sink - print the counts

        //5. execute - submit the job
        env.execute();
    }
}
package com.bigdata.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Source that runs {@code select word from fuck_words} over JDBC and emits the
 * {@code word} column of every row as one stream element.
 */
class JdbcSource extends RichSourceFunction<String> {

    Connection connection;
    PreparedStatement statement;

    /** Cleared by {@link #cancel()} so that {@link #run} stops emitting rows. */
    private volatile boolean running = true;

    /**
     * Opens the database connection and prepares the query.
     *
     * @param parameters configuration passed in by the runtime (not read here)
     * @throws Exception if the connection cannot be opened or the statement cannot be prepared
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // No Class.forName(...) call: DriverManager locates JDBC 4+ drivers on the classpath itself.
        // NOTE(review): URL and credentials are hard-coded; move them to configuration before real use.
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/zuoye", "root", "123456");
        statement = connection.prepareStatement("select word from fuck_words");
    }

    /**
     * Releases the statement and the connection.
     *
     * @throws Exception if closing either resource fails
     */
    @Override
    public void close() throws Exception {
        // Either field may still be null when open() failed part-way, and the connection
        // must be closed even if closing the statement throws.
        try {
            if (statement != null) {
                statement.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Executes the query once and emits every returned word.
     *
     * @param ctx context used to emit elements
     * @throws Exception if the query or reading a row fails
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // try-with-resources closes the ResultSet on every exit path.
        try (ResultSet resultSet = statement.executeQuery()) {
            while (running && resultSet.next()) {
                ctx.collect(resultSet.getString("word"));
            }
        }
    }

    /** Asks {@link #run} to stop after the row it is currently emitting. */
    @Override
    public void cancel() {
        running = false;
    }
}

/**
 * Word count over a socket text stream that drops banned words.
 *
 * <p>The class also keeps an experiment that tries to load the banned words from
 * MySQL into a local list; that attempt does not work (see the comment in
 * {@code main}), so the filter uses a hard-coded word list instead.
 */
public class _07综合案例 {

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {

        //1. env - set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //2. source - read text lines from the socket
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8889);

        //3. transformation - process the data

        // Experiment: read the banned words from MySQL and copy them into a local list.
        DataStreamSource<String> jdbcSource = env.addSource(new JdbcSource());
        jdbcSource.print();
        ArrayList<String> words = new ArrayList<>();
        jdbcSource.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                words.add(value);
                return value;
            }
        }).print();
        // Dead end: the pipeline above is only declared here and does not run until
        // env.execute() below, and the operators run in parallel rather than in statement
        // order, so this prints an empty list.
        System.out.println(words);

        String[] arr = {"tmd","fuck"};
        // Arrays.asList returns a fixed-size list: elements cannot be added or removed.
        List<String> list = Arrays.asList(arr);

        dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String word : value.split("\\s+")) {
                    // An empty line or leading whitespace makes split() return an empty
                    // first token; skip it so that "" is never counted as a word.
                    // Banned words could also be dropped right here instead of in filter().
                    if (!word.isEmpty()) {
                        out.collect(word);
                    }
                }
            }
        }).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                // Keep only the words that are not in the banned list (exact, case-sensitive match).
                return !list.contains(value);
            }
        }).map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return new Tuple2<>(value,1);
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        }).sum(1).print(); //4. sink - sum tuple field 1 (the count) per word and print it

        //5. execute - submit the job
        env.execute();
    }
}


http://www.kler.cn/a/408752.html

相关文章:

  • 基于Java Springboot公园管理系统
  • python如何解压缩文件或文件夹
  • Swift 实现查找链表入环点:快慢指针法
  • 数仓工具—Hive语法之窗口函数中的 case when
  • 基于阿里云服务器部署静态的website
  • JDBC 详解:从基础到高级完全指南
  • meterpreter常用命令 上
  • Python爬虫:如何优雅地获取1688商品详情接口
  • 使用windows窗口展示go-echarts图表
  • Stable Diffusion中的自注意力替换技术与Diffusers实现
  • React中Ant Design组件日期编辑回显
  • 【FPGA开发】Vivado自定义封装IP核,绑定总线
  • ajax (一)
  • timm库加载的模型可视化
  • 【Python-办公自动化】实现自动化输出模板表格报告
  • MongoDB 中设置登录账号密码可以通过以下步骤实现
  • 基于SSM的婚庆管理系统+LW示例参考
  • 了解rk3588单片机
  • 大模型工程化部署:使用FastChat部署基于OpenAI API兼容大模型服务
  • 应用案例 | 西门子能源选用ASPION G-Log 2冲击记录仪,揭秘高压开关设备运输背后的安全保障
  • UG NX二次开发(C++)-UIStyler-指定平面的对象和参数获取
  • 零基础学指针(上)
  • Python爬取豆瓣电影全部分类数据并存入数据库
  • 【大数据学习 | Spark-Core】关于distinct算子
  • STM32完全学习——使用标准库完成PWM输出
  • Spring Cloud Consul实现选举机制