
Flink SQL: Converting Between Streams and Tables

Stream → Table


Method 1: register the stream as a temporary view (so it can be queried with SQL)
DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
// view name, stream, field name(s)
tableEnv.createTemporaryView("t_1", source, $("word"));

Method 2: convert the stream to a Table (for use with the Table API / DSL)
DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
// stream, field name(s)
Table table = tableEnv.fromDataStream(source, $("word"));
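Both snippets above assume that a StreamExecutionEnvironment and a StreamTableEnvironment have already been created. A minimal setup sketch, following the same pattern as the full WordCount examples further down (which also show the required imports):

// create the streaming environment and derive a table environment from it
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);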

Table → Stream

Table table = tEnv.sqlQuery("select word,count(1) wordCount from t_1 group by word");

// Method 1: toAppendStream
DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);

// Error: toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, SUM(num) AS sumNum])

// toAppendStream cannot consume updating results produced by grouping/aggregation; if the query aggregates, use Method 2 to convert the table to a stream

// Method 2: toRetractStream
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
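Each element of the retract stream is a Tuple2 whose Boolean flag marks whether the row is an accumulate message (true) or a retract message (false). A short sketch of the filtering used in the WordCount examples below, keeping only the accumulate messages:

// keep only accumulate messages (f0 == true) and print them
retractStream.filter(value -> value.f0).print();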

WordCount example

Method 1: using SQL

package com.bigdata.day07;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @function:
 * @program: flinkProject
 * @author: 堇年
 * @create: 2024-11-28 14:42:27
 **/
public class _06_flink_wordcounnt {

    public static void main(String[] args) throws Exception {

        //1. env - set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // obtain a table environment from the stream environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
        SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split(",");
                for (String s : split) {
                    out.collect(s);
                }
            }
        });
        //2. register the stream as a temporary view
        tEnv.createTemporaryView("t_1",flatMap,$("word"));
        //3. write the SQL query
        Table table = tEnv.sqlQuery("select word,count(1) wordCount from t_1 group by word");
        //4. convert the Table back to a DataStream
        // toAppendStream would fail here because the query contains an aggregation
        //DataStream<Row> appendStream = tEnv.toAppendStream(table, Row.class);
        // toAppendStream doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, SUM(num) AS sumNum])
        // the result rows can be mapped to Row, or to a custom entity class
        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
        retractStream.filter(new FilterFunction<Tuple2<Boolean, Row>>() {
            @Override
            public boolean filter(Tuple2<Boolean, Row> value) throws Exception {
                return value.f0;
            }
        }).print();


        //5. execute the job
        env.execute();
    }
}
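In this example the FilterFunction drops the retract messages (f0 == false), so only accumulate messages reach the console; without the filter you would also see the retraction of the previous count each time a word's count changes.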

Method 2: using the DSL (Table API)

package com.bigdata.day07;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;

public class _06_flink_wordcounnt_dsl {

    public static void main(String[] args) throws Exception {

        //1. env - set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // obtain a table environment from the stream environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8881);
        SingleOutputStreamOperator<String> flatMap = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] split = value.split(",");
                for (String s : split) {
                    out.collect(s);
                }
            }
        });
        //2. create a Table from the stream
        Table table = tEnv.fromDataStream(flatMap,$("word"));

        //3. build the query with the Table API (DSL)
        Table rsTable = table.groupBy($("word")).select($("word"),$("word").count().as("wordcount"));
        rsTable.printSchema();

        //4. convert the Table back to a DataStream

        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(rsTable, Row.class);
        retractStream.filter(new FilterFunction<Tuple2<Boolean, Row>>() {
            @Override
            public boolean filter(Tuple2<Boolean, Row> value) throws Exception {
                return value.f0;
            }
        }).print();


        //5. execute the job
        env.execute();
    }
}
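To try either program, first start a local socket server on port 8881 (for example with nc -lk 8881, depending on your netcat variant), then type comma-separated words into it; the counts printed to the console update as new lines arrive.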

Results

+I means a new row was inserted
+U means an existing row received a new value and needs to be updated
-U appears right before a +U and retracts the old value before the updated one is emitted
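These markers are Flink's RowKind change flags. On Flink 1.13 and later, toChangelogStream is an alternative to toRetractStream that carries the change flag on each Row itself (as a RowKind) rather than in a Boolean tuple; a minimal sketch, assuming the rsTable from the DSL example above:

// each Row carries its RowKind (+I, -U, +U, -D); printing the row shows the flag as a prefix
DataStream<Row> changelog = tEnv.toChangelogStream(rsTable);
changelog.print();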

 

