使用 Flink CDC 构建 Streaming ETL
安装并配置 Flink
1.下载 Flink 1.20.0
curl -k -O https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
2.解压文件
tar -xzvf flink-1.20.0-bin-scala_2.12.tgz
3.移动到目标目录(可选)
将解压后的 Flink 目录移动到 /opt
或其他目标位置(可选):
sudo mv flink-1.20.0 /opt/flink
4. 配置环境变量
为了方便使用,可以将 Flink 的 bin
目录添加到系统的 PATH
环境变量中。编辑 ~/.bashrc
文件:
vi ~/.bashrc
添加以下内容:
export FLINK_HOME=/opt/flink
export PATH=$FLINK_HOME/bin:$PATH
保存并退出文件后,运行以下命令使修改生效:
source ~/.bashrc
5. 配置 Flink
Flink 默认已经配置好了一些基本设置,不需要集群配置,可以跳过 masters
和 workers
文件的配置。只需要调整一些参数,比如内存配置,或者其他作业配置。
可以根据需要修改 Flink 配置文件 config.yaml
,文件位于 /opt/flink/conf
目录下:
目前配置中的 bind-host
设置为 localhost
,这意味着 Flink 只能绑定到本地接口,无法接收来自其他机器的请求。需要将其改为 0.0.0.0
,使 Flink 能够绑定所有网络接口。
cd /opt/flink/conf
vi config.yaml
jobmanager:
bind-host:0.0.0.0
rpc:
address:0.0.0.0
port:6123
memory:
process:
size:1600m
execution:
failover-strategy:region
taskmanager:
bind-host:0.0.0.0
host:0.0.0.0
numberOfTaskSlots:1
memory:
process:
size:1728m
parallelism:
default:1
rest:
address:0.0.0.0
bind-address:0.0.0.0
6. 启动 Flink
cd /opt/flink
./bin/start-cluster.sh
关闭Flink
./bin/stop-cluster.sh
7. 访问 Flink Web UI
Flink 会启动一个 Web UI,默认地址为http://<your_server_ip>:8081
例如:http://192.168.173.67:8081
,可以在浏览器中访问它来查看 Flink 集群的状态、提交作业等。
使用 Flink CDC 构建 Streaming ETL
假设,商品和订单的数据存储在 MySQL 中,订单对应的物流信息存储在 Postgres 中。 对于订单表,为了方便进行分析,我们希望让它关联上其对应的商品和物流信息,构成一张宽表,并且实时把它写到 ElasticSearch 中。
接下来的内容将介绍如何使用 Flink Mysql/Postgres CDC 来实现这个需求,系统的整体架构如下图所示:
下载 Flink 和所需要的依赖包
下载下面列出的依赖包,并将它们放到目录 lib/
下:
• flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
• flink-sql-connector-mysql-cdc-3.2.1.jar
• flink-sql-connector-postgres-cdc-3.2.1.jar
准备数据
在 MySQL 数据库中准备数据
创建数据库和表products
,orders
并插入数据
-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATETABLE products (
id INTEGERNOTNULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255)NOTNULL,
description VARCHAR(512)
);
ALTERTABLE products AUTO_INCREMENT =101;
INSERTINTO products
VALUES(default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATETABLE orders (
order_id INTEGERNOTNULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOTNULL,
customer_name VARCHAR(255)NOTNULL,
price DECIMAL(10,5)NOTNULL,
product_id INTEGERNOTNULL,
order_status BOOLEANNOTNULL-- Whether order has been placed
) AUTO_INCREMENT =10001;
INSERTINTO orders
VALUES(default,'2020-07-30 10:08:22','Jark',50.50,102,false),
(default,'2020-07-30 10:11:09','Sally',15.00,105,false),
(default,'2020-07-30 12:00:30','Edward',25.25,106,false);
在 Postgres 数据库中准备数据
创建表shipments
,并插入数据
-- PG
CREATETABLE shipments (
shipment_id SERIAL NOTNULLPRIMARY KEY,
order_id SERIAL NOTNULL,
origin VARCHAR(255)NOTNULL,
destination VARCHAR(255)NOTNULL,
is_arrived BOOLEANNOTNULL
);
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH1001;
ALTERTABLEpublic.shipments REPLICA IDENTITYFULL;
INSERTINTO shipments
VALUES(default,10001,'Beijing','Shanghai',false),
(default,10002,'Hangzhou','Shanghai',false),
(default,10003,'Shanghai','Hangzhou',false);
启动 Flink SQL CLI
启动 Flink SQL CLI
./bin/sql-client.sh
启动成功后,可以看到如下的页面:
在 Flink SQL CLI 中使用 Flink DDL 创建表
首先,开启 checkpoint,每隔3秒做一次 checkpoint
-- Flink SQL
Flink SQL> SET execution.checkpointing.interval = 3s;
然后, 对于数据库中的表 products
, orders
, shipments
, 使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据
-- Flink SQL
FlinkSQL>CREATETABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id)NOT ENFORCED
)WITH(
'connector'='mysql-cdc',
'hostname'='localhost',
'port'='3306',
'username'='root',
'password'='123456',
'database-name'='mydb',
'table-name'='products'
);
FlinkSQL>CREATETABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10,5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id)NOT ENFORCED
)WITH(
'connector'='mysql-cdc',
'hostname'='localhost',
'port'='3306',
'username'='root',
'password'='123456',
'database-name'='mydb',
'table-name'='orders'
);
FlinkSQL>CREATETABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (shipment_id)NOT ENFORCED
)WITH(
'connector'='postgres-cdc',
'hostname'='localhost',
'port'='5432',
'username'='postgres',
'password'='postgres',
'database-name'='postgres',
'schema-name'='public',
'table-name'='shipments',
'slot.name'='flink'
);
最后,创建 enriched_orders
表, 用来将关联后的订单数据写入 Elasticsearch 中
-- Flink SQL
FlinkSQL>CREATETABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10,5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
shipment_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (order_id)NOT ENFORCED
)WITH(
'connector'='elasticsearch-7',
'hosts'='http://localhost:9200',
'index'='enriched_orders'
);
关联订单数据并且将其写入 Elasticsearch 中
使用 Flink SQL 将订单表 order
与 商品表 products
,物流信息表 shipments
关联,并将关联后的订单信息写入 Elasticsearch 中
-- Flink SQL
Flink SQL> INSERT INTO enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;
现在,就可以在 Kibana 中看到包含商品和物流信息的订单数据。
首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders
.
然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了.
接下来,修改 MySQL 和 Postgres 数据库中表的数据,Kibana中显示的订单数据也将实时更新:
1. 在 MySQL 的
orders
表中插入一条数据--MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);2. 在 Postgres 的
shipment
表中插入一条数据--PG
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);3. 在 MySQL 的
orders
表中更新订单的状态--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;4. 在 Postgres 的
shipment
表中更新物流的状态--PG
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;5. 在 MYSQL 的
orders
表中删除一条数据--MySQL
每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新,如下所示:
DELETE FROM orders WHERE order_id = 10004;