当前位置: 首页 > article >正文

使用 Flink CDC 构建 Streaming ETL

安装并配置 Flink

1.下载 Flink 1.20.0
curl -k -O https://archive.apache.org/dist/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
2.解压文件
tar -xzvf flink-1.20.0-bin-scala_2.12.tgz
3.移动到目标目录(可选)

将解压后的 Flink 目录移动到 /opt 或其他目标位置(可选):

sudo mv flink-1.20.0 /opt/flink
4. 配置环境变量

为了方便使用,可以将 Flink 的 bin 目录添加到系统的 PATH 环境变量中。编辑 ~/.bashrc 文件:

vi ~/.bashrc

添加以下内容:

export FLINK_HOME=/opt/flink
export PATH=$FLINK_HOME/bin:$PATH

保存并退出文件后,运行以下命令使修改生效:

source ~/.bashrc
5. 配置 Flink

Flink 默认已经配置好了一些基本设置,不需要集群配置,可以跳过 masters 和 workers 文件的配置。只需要调整一些参数,比如内存配置,或者其他作业配置。

可以根据需要修改 Flink 配置文件 config.yaml,文件位于 /opt/flink/conf 目录下:

目前配置中的 bind-host 设置为 localhost,这意味着 Flink 只能绑定到本地接口,无法接收来自其他机器的请求。需要将其改为 0.0.0.0,使 Flink 能够绑定所有网络接口。

cd /opt/flink/conf
vi config.yaml
jobmanager:
  bind-host:0.0.0.0
rpc:
  address:0.0.0.0
  port:6123
  memory:
    process:
      size:1600m
  execution:
    failover-strategy:region

taskmanager:
  bind-host:0.0.0.0
  host:0.0.0.0
  numberOfTaskSlots:1
  memory:
    process:
      size:1728m

parallelism:
  default:1

rest:
  address:0.0.0.0
  bind-address:0.0.0.0
6. 启动 Flink
cd /opt/flink
./bin/start-cluster.sh

关闭Flink

./bin/stop-cluster.sh
7. 访问 Flink Web UI

Flink 会启动一个 Web UI,默认地址为http://<your_server_ip>:8081 例如:http://192.168.173.67:8081,可以在浏览器中访问它来查看 Flink 集群的状态、提交作业等。

830acf27fac8b1665693da6fe478734b.png

使用 Flink CDC 构建 Streaming ETL

假设,商品和订单的数据存储在 MySQL 中,订单对应的物流信息存储在 Postgres 中。 对于订单表,为了方便进行分析,我们希望让它关联上其对应的商品和物流信息,构成一张宽表,并且实时把它写到 ElasticSearch 中。

接下来的内容将介绍如何使用 Flink Mysql/Postgres CDC 来实现这个需求,系统的整体架构如下图所示:

92bd35a864146054a10317d951bf05f5.png

下载 Flink 和所需要的依赖包

下载下面列出的依赖包,并将它们放到目录 lib/ 下:

  • • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar

  • • flink-sql-connector-mysql-cdc-3.2.1.jar

  • • flink-sql-connector-postgres-cdc-3.2.1.jar

准备数据

在 MySQL 数据库中准备数据

创建数据库和表productsorders 并插入数据

-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATETABLE products (
  id INTEGERNOTNULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255)NOTNULL,
  description VARCHAR(512)
);
ALTERTABLE products AUTO_INCREMENT =101;

INSERTINTO products
VALUES(default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");

CREATETABLE orders (
  order_id INTEGERNOTNULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOTNULL,
  customer_name VARCHAR(255)NOTNULL,
  price DECIMAL(10,5)NOTNULL,
  product_id INTEGERNOTNULL,
  order_status BOOLEANNOTNULL-- Whether order has been placed
) AUTO_INCREMENT =10001;

INSERTINTO orders
VALUES(default,'2020-07-30 10:08:22','Jark',50.50,102,false),
(default,'2020-07-30 10:11:09','Sally',15.00,105,false),
(default,'2020-07-30 12:00:30','Edward',25.25,106,false);
在 Postgres 数据库中准备数据

创建表shipments,并插入数据

-- PG
CREATETABLE shipments (
  shipment_id SERIAL NOTNULLPRIMARY KEY,
  order_id SERIAL NOTNULL,
  origin VARCHAR(255)NOTNULL,
  destination VARCHAR(255)NOTNULL,
  is_arrived BOOLEANNOTNULL
);
ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH1001;
ALTERTABLEpublic.shipments REPLICA IDENTITYFULL;
INSERTINTO shipments
VALUES(default,10001,'Beijing','Shanghai',false),
(default,10002,'Hangzhou','Shanghai',false),
(default,10003,'Shanghai','Hangzhou',false);
启动 Flink SQL CLI

启动 Flink SQL CLI

./bin/sql-client.sh

启动成功后,可以看到如下的页面:

5805911356640be994448a17b5530c00.png

在 Flink SQL CLI 中使用 Flink DDL 创建表

首先,开启 checkpoint,每隔3秒做一次 checkpoint

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

然后, 对于数据库中的表 productsordersshipments, 使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据

-- Flink SQL
FlinkSQL>CREATETABLE products (
    id INT,
    name STRING,
    description STRING,
PRIMARY KEY (id)NOT ENFORCED
)WITH(
'connector'='mysql-cdc',
'hostname'='localhost',
'port'='3306',
'username'='root',
'password'='123456',
'database-name'='mydb',
'table-name'='products'
);

FlinkSQL>CREATETABLE orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10,5),
   product_id INT,
   order_status BOOLEAN,
PRIMARY KEY (order_id)NOT ENFORCED
)WITH(
'connector'='mysql-cdc',
'hostname'='localhost',
'port'='3306',
'username'='root',
'password'='123456',
'database-name'='mydb',
'table-name'='orders'
);

FlinkSQL>CREATETABLE shipments (
   shipment_id INT,
   order_id INT,
   origin STRING,
   destination STRING,
   is_arrived BOOLEAN,
PRIMARY KEY (shipment_id)NOT ENFORCED
)WITH(
'connector'='postgres-cdc',
'hostname'='localhost',
'port'='5432',
'username'='postgres',
'password'='postgres',
'database-name'='postgres',
'schema-name'='public',
'table-name'='shipments',
'slot.name'='flink'
 );

最后,创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中

-- Flink SQL
FlinkSQL>CREATETABLE enriched_orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10,5),
   product_id INT,
   order_status BOOLEAN,
   product_name STRING,
   product_description STRING,
   shipment_id INT,
   origin STRING,
   destination STRING,
   is_arrived BOOLEAN,
PRIMARY KEY (order_id)NOT ENFORCED
)WITH(
'connector'='elasticsearch-7',
'hosts'='http://localhost:9200',
'index'='enriched_orders'
 );
关联订单数据并且将其写入 Elasticsearch 中

使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中

-- Flink SQL
Flink SQL> INSERT INTO enriched_orders
 SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
 FROM orders AS o
 LEFT JOIN products AS p ON o.product_id = p.id
 LEFT JOIN shipments AS s ON o.order_id = s.order_id;

现在,就可以在 Kibana 中看到包含商品和物流信息的订单数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders.

c2a9028e83da08197fd02c0d3795ae02.png

然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了.

5e926aeab3f02b870b20c3c4c43112df.png

接下来,修改 MySQL 和 Postgres 数据库中表的数据,Kibana中显示的订单数据也将实时更新:

  1. 1. 在 MySQL 的orders表中插入一条数据--MySQL
    INSERT INTO orders
    VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

  2. 2. 在 Postgres 的shipment表中插入一条数据--PG
    INSERT INTO shipments
    VALUES (default,10004,'Shanghai','Beijing',false);

  3. 3. 在 MySQL 的orders表中更新订单的状态--MySQL
    UPDATE orders SET order_status = true WHERE order_id = 10004;

  4. 4. 在 Postgres 的shipment表中更新物流的状态--PG
    UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;

  5. 5. 在 MYSQL 的orders表中删除一条数据--MySQL
    DELETE FROM orders WHERE order_id = 10004;
    每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新,如下所示:

78d0a0e8758fd6e4fb58ff5ce7cdd84f.gif


http://www.kler.cn/a/471256.html

相关文章:

  • 虹软人脸识别
  • 对话|全年HUD前装将超330万台,疆程技术瞄准人机交互“第一屏”
  • PyCharm简单调试
  • 深度评测uni-app x:开启跨平台开发新篇章
  • 使用WebSocket 获取实时数据
  • 安徽省乡镇界面图层+arcgis数据shp格式-乡镇名称和编码2020年+wgs84坐标内容测评
  • C++ 提升编译速度的利器:前向声明
  • 【ABAP开发环境】(三)ABAP GIT
  • 根据python代码自动生成类图的实现方法[附带python源码]
  • Python实现应用决策树的实例程序
  • model_selection.cross_val_score函数介绍
  • CES 2025:ROG打造极致游戏体验
  • python-leetcode-加油站
  • VLMs之Agent之CogAgent:《CogAgent: A Visual Language Model for GUI Agents》翻译与解读
  • 《医院项目驻场半月记:从憧憬到反思的旅程》
  • AWS re:Invent 2024 现场实录 - It‘s all about Scale
  • Mac 安装psycopg2出错:Error:pg_config executable not found的解决
  • 黄仁勋演讲总结(2种显卡,1个开源大模型,1个数据采集平台)
  • 决策树模型与随机森林一文入门,原理、R语言示例
  • Kubernetes Ingress:流量管理的利器
  • 人工智能 前馈神经网络练习题
  • 文献阅读分享:跨域顺序推荐中的用户检索与大语言模型集成
  • L28.【LeetCode笔记】移动零(三种解法)
  • 基于Spring Boot的高校门禁管理系统
  • 【VUE】a链接下载跨域文件直接打开而非下载(解决办法)
  • Win11 上使用 Qume 搭建银河麒麟V10 arm版虚拟机