当前位置: 首页 > article >正文

flink学习(4)——方法的使用—对流的处理(keyBy,Reduce)

keyBy案例

package com.bigdata.day02;


public class _04_keyBy {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStreamSource<String> dataStreamSource = env.readTextFile("datas/a.log");
        //2. source-加载数据
        dataStreamSource.map(new MapFunction<String, LogBean>() {
            @Override
            public LogBean map(String s) throws Exception {
                String[] line = s.split("\\s+");
                LogBean logBean = new LogBean();
                logBean.setIp(line[0]);
                logBean.setUserId(Integer.parseInt(line[1]));
                logBean.setMethod(line[3]);
                // 17/05/2015:10:05:30
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:hh:mm:ss");
                Date date = simpleDateFormat.parse(line[2]);
                // 另一种方法
                Date date1 = DateUtils.parseDate(line[2], "dd/MM/yyyy:hh:mm:ss");
                logBean.setTimestamp(date1.getTime());
                logBean.setPath(line[4]);

                return logBean;
            }
        }).map(new MapFunction<LogBean, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(LogBean logBean) throws Exception {

                return new Tuple2<>(logBean.getIp(), 1);
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        }).print();


        env.execute();
    }
}
可以认为是将key值一样的数据放在一个分区中
new KeySelector<Tuple2<String, Integer>, String>()
第一个参数表示传入的值为Tuple2 ,String表示key是该类型的
public String getKey(Tuple2<String, Integer> stringIntegerTuple2)
可以看出返回值是String  类型的,也就是说可以随意指定按照某个key进行group(有点类似)

元组类型

单个字段keyBy
//用字段位置(已经被废弃)
wordAndOne.keyBy(0)

//用字段表达式
同上
多个字段keyBy
//用字段位置
wordAndOne.keyBy(0, 1);

//用KeySelector
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {
        return Tuple2.of(value.f0, value.f1);
    }
});

//也就是说多个字段返回值就为Tuple类型

POJO

单个字段keyBy
source.keyBy(a -> a.getProvince());
多个字段keyBy
source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {
    @Override
    public Tuple2<String, String> getKey(PeopleCount value) throws Exception {
        return Tuple2.of(value.getProvince(), value.getCity());
    }
});
//也就是说多个字段返回值就为Tuple类型

Reduce案例

——sum的底层是reduce

package com.bigdata.day02;

public class _04_keyBy {

    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStreamSource<String> dataStreamSource = env.readTextFile("datas/a.log");
        KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = dataStreamSource.map(new MapFunction<String, LogBean>() {
            @Override
            public LogBean map(String s) throws Exception {
                String[] line = s.split("\\s+");
                LogBean logBean = new LogBean();
                logBean.setIp(line[0]);
                logBean.setUserId(Integer.parseInt(line[1]));
                logBean.setMethod(line[3]);
                // 17/05/2015:10:05:30
                Date date = DateUtils.parseDate(line[2], "dd/MM/yyyy:hh:mm:ss");
                logBean.setTimestamp(date.getTime());
                logBean.setPath(line[4]);   
                return logBean;
            }
        }).map(new MapFunction<LogBean, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(LogBean logBean) throws Exception {

                return new Tuple2<>(logBean.getIp(), 1);
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        });

//        tuple2StringKeyedStream.sum(1).print();
        tuple2StringKeyedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {

                return new Tuple2<>(t1.f0,t1.f1+t2.f1);
            }
        }).print();

        env.execute();
    }
}

new ReduceFunction<Tuple2<String, Integer>>()

public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2)

t1 用于表示每一条数据,t2 用于表示 结果的累计 ,

其中t1 和 t2 的第一个值是一样的,t2 的第二个值 一直再累加,而t1的为1 1 1 1 1 1


http://www.kler.cn/a/406949.html

相关文章:

  • 【Mysql】开窗聚合函数----SUM,AVG, MIN,MAX
  • 生产制造领域的多元化模式探索
  • MyBatis Plus 项目的创建和使用
  • PG的并行查询
  • 大模型在智能客服中心领域的应用思考
  • 【Flask+Gunicorn+Nginx】部署目标检测模型API完整解决方案
  • Vue3 源码解析(三):静态提升
  • css样式覆盖
  • vue3 uniapp 扫普通链接或二维码打开小程序并获取携带参数
  • 什么是C++中的模板特化和偏特化?
  • 嵌入式:Flash的分类以及Jlink/J-flash的编程支持
  • 使用itextpdf进行pdf模版填充中文文本时部分字不显示问题
  • 超详细:Redis分布式锁
  • Java三大特性:封装、继承、多态【详解】
  • Leetcode148. 排序链表(HOT100)
  • 云轴科技ZStack亮相2024 IDC中国生态峰会,共塑AI时代IT生态新格局
  • 递归算法专题一>Pow(x, n)
  • 计算机毕业设计Python+卷积神经网络CNN交通标志识别 机器学习 深度学习 爬虫 数据可视化 人工智能 模型训练
  • Node.js 和 Socket.IO 实现实时通信
  • 【在Linux世界中追寻伟大的One Piece】多线程(一)
  • ElasticSearch学习笔记四:基础操作(二)
  • Android 基于Camera2 API进行摄像机图像预览
  • Unity DOTS中的Entity
  • 每日计划-1122
  • Linux上安装单机版Kibana6.8.1
  • pytest框架实现一些前后置(固件,夹具)处理,常用三种