Flink SQL 模拟视图监听
工作中遇到这样一个需求:业务方提供的是一个视图,而查阅文档后发现视图无法被 CDC 监听(视图本身不存储数据,变更只会记录在它背后的基表上)。起初考虑用 DataStream API 自定义实现,后来发现用 Flink SQL 也可以实现:分别监听视图背后的基表,再在 Flink 中执行与视图相同的 JOIN 即可。
在 MySQL 中创建对应的表、视图和测试数据(下文 Flink 代码连接的数据库为 t2):
-- Class table: one row per class.
CREATE TABLE tb_class (
    class_id   INT         NOT NULL AUTO_INCREMENT, -- primary key
    class_name VARCHAR(50) NOT NULL,                -- class name
    PRIMARY KEY (class_id)
);
-- Student table: one row per student, optionally linked to a class.
CREATE TABLE tb_student (
    student_id   INT         NOT NULL AUTO_INCREMENT, -- primary key
    student_name VARCHAR(50) NOT NULL,                -- student name
    class_id     INT,                                 -- id of the student's class
    PRIMARY KEY (student_id),
    FOREIGN KEY (class_id) REFERENCES tb_class (class_id) -- must point at an existing class
);
-- Seed tb_class.
-- class_id is supplied explicitly because the tb_student rows below reference
-- the ids 1, 2 and 3; relying on AUTO_INCREMENT to generate exactly these
-- values only works on an empty table with the default increment settings.
INSERT INTO tb_class (class_id, class_name)
VALUES
    (1, 'Class A'),
    (2, 'Class B'),
    (3, 'Class C');

-- Seed tb_student; class_id refers to the tb_class rows inserted above.
INSERT INTO tb_student (student_name, class_id)
VALUES
    ('Alice', 1),
    ('Bob', 2),
    ('Charlie', 3),
    ('Diana', 1);
-- View that pairs every student with the name of the class it belongs to.
CREATE VIEW tb_student_view AS
SELECT
    s.student_id,   -- primary key of the student
    s.student_name, -- student name
    c.class_name    -- class name
FROM tb_student AS s
INNER JOIN tb_class AS c
    ON s.class_id = c.class_id;

-- Inspect the rows exposed by the view.
SELECT
    student_id,
    student_name,
    class_name
FROM tb_student_view;
-- ads_student: table with the same three columns the view exposes.
CREATE TABLE ads_student (
    student_id   INT         NOT NULL AUTO_INCREMENT, -- primary key
    student_name VARCHAR(50) NOT NULL,                -- student name
    class_name   VARCHAR(50) NOT NULL,                -- class name
    PRIMARY KEY (student_id)
);
编写对应的 Flink SQL 代码。由于手头没有 Flink SQL 客户端,这里在 IDEA 中通过 Table API 来执行:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Emulates change capture on the MySQL view {@code tb_student_view}.
 *
 * <p>Both base tables ({@code tb_student}, {@code tb_class}) are registered as
 * {@code mysql-cdc} sources, joined in Flink SQL on {@code class_id}, and the
 * result is continuously written to the MySQL table {@code ads_student}
 * through the JDBC connector.
 */
public class FlinkSQLJoinExample {

    /**
     * Builds the {@code WITH (...)} clause shared by the two mysql-cdc source tables.
     *
     * @param tableName name of the MySQL table to capture; only called with
     *                  compile-time constants, never with external input
     * @return the connector options for that table
     */
    private static String mysqlCdcOptions(String tableName) {
        // NOTE(review): host, port and credentials are hard-coded for local testing;
        // move them to configuration before using this outside a dev environment.
        return "WITH (" +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'localhost'," +
                " 'port' = '3307', " +
                " 'username' = 'root', " +
                " 'password' = '123456'," +
                " 'database-name' = 't2', " +
                // Incremental snapshot reading is switched off for these sources.
                " 'scan.incremental.snapshot.enabled' = 'false', " +
                " 'table-name' = '" + tableName + "' " +
                ")";
    }

    /**
     * Registers the source and sink tables, then runs the continuous
     * INSERT ... SELECT join until the job terminates.
     *
     * @param args unused
     * @throws Exception if a statement cannot be executed or the job fails
     */
    public static void main(String[] args) throws Exception {
        // Streaming execution environment; parallelism 2, adjust as needed.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Table environment in streaming mode on top of the stream environment.
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Source: tb_student, captured via mysql-cdc.
        tableEnv.executeSql(
                "CREATE TABLE tb_student (" +
                        " student_id INT, " +
                        " student_name STRING, " +
                        " class_id INT ," +
                        " PRIMARY KEY (student_id) NOT ENFORCED" +
                        ") " + mysqlCdcOptions("tb_student")
        );

        // Source: tb_class, captured via mysql-cdc.
        tableEnv.executeSql(
                "CREATE TABLE tb_class (" +
                        " class_id INT, " +
                        " class_name STRING, " +
                        " PRIMARY KEY (class_id) NOT ENFORCED" +
                        ") " + mysqlCdcOptions("tb_class")
        );

        // Sink: ads_student, written through the jdbc connector.
        tableEnv.executeSql(
                "CREATE TABLE ads_student (" +
                        " student_id INT, " +
                        " student_name STRING, " +
                        " class_name STRING, " +
                        " PRIMARY KEY (student_id) NOT ENFORCED" +
                        ") WITH (" +
                        " 'connector' = 'jdbc', " +
                        " 'url' = 'jdbc:mysql://localhost:3307/t2', " +
                        " 'table-name' = 'ads_student', " +
                        " 'username' = 'root', " +
                        " 'password' = '123456' " +
                        ")"
        );

        // executeSql() on an INSERT statement submits the job by itself, so no
        // env.execute() call follows: the stream environment holds no DataStream
        // operators and execute() would fail with "No operators defined in
        // streaming topology". await() blocks the main thread so the job keeps
        // running when started from the IDE.
        tableEnv.executeSql(
                "INSERT INTO ads_student " +
                        "SELECT s.student_id, s.student_name, c.class_name " +
                        "FROM tb_student AS s " +
                        "JOIN tb_class AS c ON s.class_id = c.class_id"
        ).await();
    }
}
注意:还需要在 pom.xml 中添加以下额外依赖:
<!-- JDBC connector used by the ads_student sink; the "-1.19" suffix marks the build for Flink 1.19. -->
<!-- NOTE(review): the JDBC sink also needs a MySQL JDBC driver on the classpath; confirm it is declared elsewhere in the pom. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.2.0-1.19</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<!-- Aligned with the Flink 1.19 line that the JDBC connector above is built for; keep it equal to the project's Flink version. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.19.0</version>
</dependency>