当前位置: 首页 > article >正文

Flinksql 模拟 视图 监听

工作中遇到这样的一个业务,业务方给的是一个视图,查了一下文档视图不能监听,这个时候想着要不要用datastream去自定义,然后发现flinksql也是可以实现
创建对应数据库和表

-- 创建班级表 tb_class
CREATE TABLE tb_class (
    class_id INT PRIMARY KEY AUTO_INCREMENT,  -- 主键
    class_name VARCHAR(50) NOT NULL           -- 班级名称
);

-- 创建学生表 tb_student
CREATE TABLE tb_student (
    student_id INT PRIMARY KEY AUTO_INCREMENT,  -- 主键
    student_name VARCHAR(50) NOT NULL,          -- 姓名
    class_id INT,                               -- 班级ID
    FOREIGN KEY (class_id) REFERENCES tb_class(class_id)  -- 外键关联到 tb_class
);
-- 向 tb_class 表中插入数据
INSERT INTO tb_class (class_name) VALUES ('Class A');
INSERT INTO tb_class (class_name) VALUES ('Class B');
INSERT INTO tb_class (class_name) VALUES ('Class C');

-- 向 tb_student 表中插入数据
INSERT INTO tb_student (student_name, class_id) VALUES ('Alice', 1);
INSERT INTO tb_student (student_name, class_id) VALUES ('Bob', 2);
INSERT INTO tb_student (student_name, class_id) VALUES ('Charlie', 3);
INSERT INTO tb_student (student_name, class_id) VALUES ('Diana', 1);

-- 创建视图 tb_student_view
CREATE VIEW tb_student_view AS
SELECT 
    s.student_id AS student_id,       -- 主键
    s.student_name AS student_name,   -- 学生姓名
    c.class_name AS class_name        -- 班级名称
FROM 
    tb_student s
JOIN 
    tb_class c ON s.class_id = c.class_id;
		
		select * from tb_student_view;

-- 创建 ads_student 表
CREATE TABLE ads_student (
    student_id INT PRIMARY KEY AUTO_INCREMENT,  -- 主键
    student_name VARCHAR(50) NOT NULL,          -- 学生姓名
    class_name VARCHAR(50) NOT NULL             -- 班级名称
);

编写对应的flinksql代码,这里没有flinksql客户端,只能在idea上完成了

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQLJoinExample {
    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建 Table 环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        env.setParallelism(2);  // 将并行度设置为2,根据需要调整
        // 定义 tb_student 表
        tableEnv.executeSql(
                "CREATE TABLE tb_student (" +
                        "   student_id INT, " +
                        "   student_name STRING, " +
                        "   class_id INT ," +
                        "   PRIMARY KEY (student_id) NOT ENFORCED" +
                        ") WITH (" +
                        "   'connector' = 'mysql-cdc', " +  // 使用 datagen 连接器模拟数据,开发测试用
                        "    'hostname' = 'localhost'," +
                        "  'port' = '3307', " +
                        "   'username' = 'root', " +
                        "   'password' = '123456'," +
                        "    'database-name' = 't2', " +
                        "    'scan.incremental.snapshot.enabled' = 'false', " +

                        "   'table-name' = 'tb_student' " +
                        ")"
        );

        // 定义 tb_class 表
        tableEnv.executeSql(
                "CREATE TABLE tb_class (" +
                        "   class_id INT, " +
                        "   class_name STRING, " +
                        "   PRIMARY KEY (class_id) NOT ENFORCED" +
                        ") WITH (" +
                        "  'connector' = 'mysql-cdc', " +  // 使用 datagen 连接器模拟数据,开发测试用
                        "    'hostname' = 'localhost'," +
                        "  'port' = '3307', " +
                        "   'username' = 'root', " +
                        "   'password' = '123456'," +
                        "    'database-name' = 't2', " +
                        "    'scan.incremental.snapshot.enabled' = 'false', " +

                        "   'table-name' = 'tb_class' " +
                        ")"
        );


        // 定义 ads_student 表,使用 jdbc 连接器作为目标表
        tableEnv.executeSql(
                "CREATE TABLE ads_student (" +
                        "   student_id INT, " +
                        "   student_name STRING, " +
                        "   class_name STRING, " +
                        "   PRIMARY KEY (student_id) NOT ENFORCED" +
                        ") WITH (" +
                        "   'connector' = 'jdbc', " +
                        "   'url' = 'jdbc:mysql://localhost:3307/t2', " +
                        "   'table-name' = 'ads_student', " +
                        "   'username' = 'root', " +
                        "   'password' = '123456' " +
                        ")"
        );

        // 执行 JOIN 查询并插入到 ads_student 表
        tableEnv.executeSql(
                "INSERT INTO ads_student " +
                        "SELECT s.student_id, s.student_name, c.class_name " +
                        "FROM tb_student AS s " +
                        "JOIN tb_class AS c ON s.class_id = c.class_id"
        );

        // 启动任务
        env.execute("Flink SQL Join Example");
    }
}

注意要加上额外的pom依赖

# sink 使用的是jdbc连接的方式
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>3.2.0-1.19</version>
    </dependency>


<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.15.0</version>
</dependency>

http://www.kler.cn/a/382422.html

相关文章:

  • 继承(7)
  • 大模型技术与应用:从幻觉到蒸馏,全面解析
  • Linux内核TTY子系统有什么(6)
  • 人工智能与物联网:智慧城市的未来
  • PLC实现HTTP协议JSON格式数据上报对接的参数配置说明
  • docker搭建atlassian-confluence:7.2.0
  • Python(PySimpleGUI 库)
  • gulp入门教程16:gulp插件gulp-uglify
  • 软件测试学习笔记丨Flask操作数据库-一对多
  • 电商行业企业员工培训的在线知识库构建
  • git常用操作指令
  • oasys系统代码审计
  • mmsegmentation训练自己的数据集
  • java语言基本编程原理
  • 5.Java 数组(一维数组、二维数组、数组实例实操)
  • ubuntu20安装opencv3.2记录
  • 洛谷P1090 [NOIP2004 提高组] 合并果子
  • Halcon 从XML中读取配置参数
  • 系统思考—深层结构
  • 《Ooga》进不去游戏解决方法
  • Java基础-组件及事件处理(下)
  • C语言程序的机器表示(逆向+函数调用栈详解版)
  • 情怀系列国际版棋牌完整源码具备强大的多语言扩展功能,涵盖了900多款子游戏,专为全球市场的游戏开发和运营设计。
  • 关于SwitchCase中变量定义及使用变量的一些注意事项参数传递参数时不能实现多态动态绑定的问题c++语法
  • 基于C#实现Windows后台窗口操作与图像处理技术分析
  • 【Linux】从零开始使用多路转接IO --- select