当前位置: 首页 > article >正文

Flink流批一体计算(23):Flink SQL之多流kafka写入多个mysql sink

目录

1. 准备工作

生成数据

创建数据表

2. 创建数据表

创建数据源表

创建数据目标表

3. 计算

WITH子句


1. 准备工作

生成数据

source kafka json 数据格式 :

topic  case_kafka_mysql:

{"ts": "20201011","id": 8,"price_amt":211}

topic  flink_test_2:

{"id": 8,"coupon_price_amt":100}

注意:针对双流中的每条记录都发触发

topic: case_kafka_mysql

docker exec -it 192d1369463a bash

bash-5.1# cd /opt/kafka_2.12-2.5.0/bin

bash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic case_kafka_mysql

>{"ts": "20201011","id": 8,"price_amt":211}

topic: flink_test_2

docker exec -it 192d1369463a bash

bash-5.1# cd /opt/kafka_2.12-2.5.0/bin

bash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic flink_test_2

>{"id": 8,"coupon_price_amt":100}

创建数据表

mysql 建表语句

CREATE TABLE `sync_test_2` (

  `id` bigint(11) NOT NULL AUTO_INCREMENT,

  `ts` varchar(64) DEFAULT NULL,

  `total_gmv` bigint(11) DEFAULT NULL,

  PRIMARY KEY (`id`),

  UNIQUE KEY `uidx` (`ts`) USING BTREE

) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;



CREATE TABLE `sync_test_22` (

  `id` bigint(11) NOT NULL AUTO_INCREMENT,

  `ts` varchar(64) DEFAULT NULL,

  `coupon_ratio` double DEFAULT NULL,

  PRIMARY KEY (`id`),

  UNIQUE KEY `uidx` (`ts`) USING BTREE

) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

2. 创建数据表

创建数据源表
create table flink_test_2_1 (
  id BIGINT,
  ts VARCHAR,
  price_amt BIGINT,
  proctime AS PROCTIME ()
)
 with (
   'connector' = 'kafka',
   'topic' = 'case_kafka_mysql',
   'properties.bootstrap.servers' = '127.0.0.1:9092',
   'properties.group.id' = 'flink_gp_test2-1',
   'scan.startup.mode' = 'earliest-offset',
   'format' = 'json',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true',
   'properties.zookeeper.connect' = '127.0.0.1:2181/kafka'
 );

create table flink_test_2_2 (
  id BIGINT,
  coupon_price_amt BIGINT,
  proctime AS PROCTIME ()
)
 with (
   'connector' = 'kafka',
   'topic' = 'flink_test_2',
   'properties.bootstrap.servers' = '127.0.0.1:9092',
   'properties.group.id' = 'flink_gp_test2-2',
   'scan.startup.mode' = 'earliest-offset',
   'format' = 'json',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true',
   'properties.zookeeper.connect' = '127.0.0.1:2181/kafka'
 );

关键配置的说明:

json.fail-on-missing-field:在json缺失字段时是否报错

json.ignore-parse-errors:在解析json失败时是否报错

一般无法保证json格式,所以以上两个配置是比较重要的。

创建数据目标表
CREATE TABLE sync_test_2 (
                   ts string,
                   total_gmv bigint,
                   PRIMARY KEY (ts) NOT ENFORCED

 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
   'table-name' = 'sync_test_2',
   'username' = 'root',
   'password' = 'Admin'
 );

CREATE TABLE sync_test_22 (
                   ts string,
                   coupon_ration bigint,
                   PRIMARY KEY (ts) NOT ENFORCED
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
   'table-name' = 'sync_test_2',
   'username' = 'root',
   'password' = 'Admin'
 );

3. 计算

一个作业中写入一个Sink或多个Sink

说明 写入多个Sink语句时,需要以BEGIN STATEMENT SET;开头,以END;结尾。

BEGIN STATEMENT SET;      --写入多个Sink时,必填。
INSERT INTO sync_test_2
SELECT
  ts,
  SUM(price_amt - coupon_price_amt) AS total_gmv
FROM
  (
    SELECT
      a.ts as ts,
      a.price_amt as price_amt,
      b.coupon_price_amt as coupon_price_amt
    FROM
      flink_test_2_1 as a
      LEFT JOIN flink_test_2_2 b on b.id = a.id
  )
GROUP BY ts;

INSERT INTO sync_test_22
SELECT
  ts,
  sum(coupon_price_amt)/sum(amount) AS coupon_ration
FROM
  (
    SELECT
      a.ts as ts,
      a.price_amt as price_amt,
      b.coupon_price_amt as coupon_price_amt
    FROM
      flink_test_2_1 as a
      LEFT JOIN flink_test_2_2 b on b.id = a.id
  )
GROUP BY ts;;
END;      --写入多个Sink时,必填。

WITH子句

WITH提供了一种编写辅助语句以用于更大的查询的方法。这些语句通常被称为公共表表达式(CTE),可以被视为定义仅针对一个查询存在的临时视图。

改写上述查询:

BEGIN STATEMENT SET;      --写入多个Sink时,必填。
with orders_with_coupon AS (
    SELECT
      a.ts as ts,
      a.price_amt as price_amt,
      b.coupon_price_amt as coupon_price_amt
    FROM
      flink_test_2_1 as a
      LEFT JOIN flink_test_2_2 b on b.id = a.id
)

INSERT INTO sync_test_2
SELECT
  ts,
  SUM(price_amt - coupon_price_amt) AS total_gmv
FROM orders_with_coupon
GROUP BY ts;

INSERT INTO sync_test_22
SELECT
  ts,
  coupon_price_amt/price_amt AS coupon_ration
FROM orders_with_coupon
GROUP BY ts;;
END;      --写入多个Sink时,必填。


http://www.kler.cn/a/163297.html

相关文章:

  • Python 小高考篇(2)字符串
  • 使用Matlab神经网络工具箱
  • 由于找不到mfc120u.dll, 无法继续执行代码。重新安装程序可能解决引问题。
  • 基于STM32的智能充电桩:集成RTOS、MQTT与SQLite的先进管理系统设计思路
  • js-将JavaScript对象或值转换为JSON字符串 JSON.stringify(this.SelectDataListCourse)
  • AIGC专栏17——EasyAnimate V5版本详解 应用MMDIT结构,拓展模型规模到12B 支持不同控制输入的控制模型
  • 达梦数据库dm8守护集群部署手册
  • 浅谈Elastic Stack组件集成和应用
  • (时域和频域)控制系统响应速度和稳定性分析
  • 三种定时任务总结
  • C# .NET平台提取PDF表格数据,并转换为txt、CSV和Excel表格文件
  • 【51单片机系列】74HC595实现对LED点阵的控制
  • JS中的闭包
  • 做数据分析为何要学统计学(5)——什么问题适合使用t检验?
  • C语言 - 字符函数和字符串函数
  • 【vSphere | VM】虚拟机自定义规范Ⅲ —— 创建 Linux 虚拟机自定义规范
  • mongdb配置ssl
  • 用Python实现十大经典排序算法(附动图)
  • 周赛374(枚举、思维题、分组循环+枚举、组合数学)
  • Docker网络原理及Cgroup硬件资源占用控制
  • Apollo新版本Beta自动驾驶技术沙龙参会体验有感—百度自动驾驶开源框架
  • 电脑软件:TileIconifier开始菜单美化工具介绍
  • 大电流H桥电机驱动电路的设计与解析(包括自举电路的讲解,以IR2104+LR7843为例)
  • linux常用命令-yum命令详解(超详细)
  • 视频处理关键知识
  • 用C++实现队列顺序结构的基本操作