SeaTunnel Data Integration (2): Data Synchronization
1. Connector types
SeaTunnel ships with a wide range of connectors, including the following:
Source | Sink |
Clickhouse | Clickhouse |
Elasticsearch | Elasticsearch |
FakeSource | (source only) |
Ftp | Ftp |
Github/Gitlab | (source only) |
Greenplum | Greenplum |
Hdfs file | Hdfs file |
Hive | Hive |
Http | Http |
Hudi/Iceberg | Hudi/Iceberg |
JDBC | JDBC |
Kudu | Kudu |
MongoDB | MongoDB |
MySQL / MySQL CDC | MySQL (MySQL CDC is source only) |
Redis | Redis |
Kafka | Kafka |
StarRocks | StarRocks |
Phoenix | Phoenix |
... | ... |
2. MySQL to MySQL
Required parameters:
MySQL source
- url
- driver
- query
MySQL sink
- url
- driver
Example:
env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    query = "select * from base_region limit 4"
  }
}
transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform/sql
}
sink {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    query = "insert into base_region(id,region_name) values(?,?)"
  }
}
Run the job:
./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf
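The sink query in this example uses JDBC `?` placeholders, which are filled in order from the fields of each source row, so the placeholder count should match the columns being written. As a rough illustration only (the helper name `build_insert_sql` is hypothetical and not part of SeaTunnel), a statement of that shape could be generated from a column list like this:

```python
def build_insert_sql(table, columns):
    """Return a parameterized INSERT with one '?' placeholder per column."""
    if not columns:
        raise ValueError("at least one column is required")
    column_list = ",".join(columns)
    placeholders = ",".join("?" for _ in columns)
    return f"insert into {table}({column_list}) values({placeholders})"


if __name__ == "__main__":
    # Prints the same statement used in the sink block of the example.
    print(build_insert_sql("base_region", ["id", "region_name"]))
```

Generating the statement from one column list keeps the column names and the placeholders from drifting apart when a table gains columns.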
3. MySQL to Hive
With the Spark or Flink engine, Spark/Flink must already be integrated with Hive.
With the SeaTunnel Zeta engine, place the following jars in the $SEATUNNEL_HOME/lib/ directory:
- seatunnel-hadoop3-3.1.4-uber.jar
- hive-exec-2.3.9.jar
Example:
env {
  job.mode = "BATCH"
}
source {
  Jdbc {
    # Note: the JDBC URL takes two slashes after "jdbc:mysql:"
    url = "jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    query = "select * from source_user"
  }
}
transform {
}
sink {
  Hive {
    table_name = "ods.sink_user"
    metastore_uri = "thrift://bigdata101:9083"
  }
}
Run the job:
./bin/seatunnel.sh --config ./config/mysql2hive.conf
4. Incremental sync (with parameters)
Requirement: extract each day's new rows based on their creation time.
Table schemas:
-- db
CREATE TABLE `t_order_detail` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `order_id` bigint DEFAULT NULL COMMENT 'order ID',
  `sku_id` bigint DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'SKU name',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'image name',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT 'purchase price (SKU price at order time)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT 'quantity purchased',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=863 DEFAULT CHARSET=utf8mb3 COMMENT='order detail table';
-- dw
CREATE TABLE `ods_t_order_detail_di` (
  `id` bigint NOT NULL COMMENT 'ID',
  `order_id` bigint DEFAULT NULL COMMENT 'order ID',
  `sku_id` bigint DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'SKU name',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'image name',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT 'purchase price (SKU price at order time)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT 'quantity purchased',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='ODS order detail table';
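The rows for a single day (here 2024-02-05) are selected with a half-open time window, so a row created exactly at midnight belongs to one day only:
SELECT id,order_id,sku_id,sku_name,img_url,order_price,sku_num,create_time
FROM t_order_detail
WHERE create_time >= '2024-02-05'
  AND create_time < DATE_ADD('2024-02-05', INTERVAL 1 DAY);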
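To make the window arithmetic in that query concrete, here is a minimal Python sketch of the same half-open interval. The function names `daily_window` and `in_window` are hypothetical helpers for illustration; they are not part of SeaTunnel or MySQL.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"


def daily_window(etl_dt):
    """Return (start, end) strings for the window [etl_dt, etl_dt + 1 day)."""
    # fromisoformat accepts "2024-02-05" as well as "2024-02-05T00:00:00".
    start = datetime.fromisoformat(etl_dt)
    end = start + timedelta(days=1)
    return start.strftime(FMT), end.strftime(FMT)


def in_window(create_time, etl_dt):
    """Mirror the WHERE clause: start <= create_time < end."""
    start, end = daily_window(etl_dt)
    ts = datetime.fromisoformat(create_time).strftime(FMT)
    # Fixed-width timestamps compare correctly as strings.
    return start <= ts < end


if __name__ == "__main__":
    print(daily_window("2024-02-05"))
    print(in_window("2024-02-05 23:59:59", "2024-02-05"))
    print(in_window("2024-02-06 00:00:00", "2024-02-05"))
```

Because the upper bound is exclusive, consecutive daily runs neither overlap nor leave a gap at midnight.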
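Example (the date is passed in as the variable etl_dt; the REPLACE call turns an ISO-style 'T' separator into a space):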
env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    # ${etl_dt} is substituted at submit time; no trailing semicolon inside the query string
    query = "select * from t_order_detail where create_time >= REPLACE('"${etl_dt}"', 'T', ' ') and create_time < date_add(REPLACE('"${etl_dt}"', 'T', ' '),interval 1 day)"
  }
}
transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform/sql
}
sink {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    query = "insert into ods_t_order_detail_di (id,order_id,sku_id,sku_name,img_url,order_price,sku_num,create_time) values(?,?,?,?,?,?,?,?)"
  }
}
Run the job, passing the date with -i:
./bin/seatunnel.sh --config ./config/mysql2mysql_ods_t_order_detail_di.conf -i etl_dt='2024-02-05'
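For a daily schedule, the date usually has to be computed rather than typed in. The sketch below assumes the job should load the previous calendar day and that it is started from the SeaTunnel home directory. The helper names `previous_day` and `build_command` are hypothetical; only the `--config` and `-i` flags are taken from the command above.

```python
import subprocess
from datetime import date, timedelta

CONFIG = "./config/mysql2mysql_ods_t_order_detail_di.conf"


def previous_day(today):
    """Return the day before `today` as YYYY-MM-DD."""
    return (today - timedelta(days=1)).isoformat()


def build_command(etl_dt, config=CONFIG):
    """Build the argument list for one incremental run."""
    # Passing a list avoids shell quoting; the value reaches SeaTunnel
    # the same way as -i etl_dt='2024-02-05' does on the command line.
    return ["./bin/seatunnel.sh", "--config", config, "-i", f"etl_dt={etl_dt}"]


def run_daily(today=None, dry_run=True):
    """Print the command, and launch it only when dry_run is False."""
    cmd = build_command(previous_day(today or date.today()))
    print(" ".join(cmd))
    if not dry_run:
        subprocess.run(cmd, check=True)
    return cmd


if __name__ == "__main__":
    run_daily()  # dry run: only prints the command
```

Calling `run_daily(dry_run=False)` from cron or another scheduler would submit the job; `check=True` makes a failed run raise an error instead of passing silently.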
5. Real-time sync
Set the job mode to STREAMING:
job.mode = "STREAMING"
Example based on MySQL CDC:
env {
  execution.parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 10000
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
  MySQL-CDC {
    username = "user"
    password = "password"
    table-names = ["test.source_user"]
    base-url = "jdbc:mysql://127.0.0.1:3306/test"
  }
}
sink {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    # The Jdbc sink takes "user" (the MySQL-CDC source takes "username")
    user = "user"
    password = "password"
    generate_sink_sql = true
    database = "dw"
    table = "source_user_01"
    primary_keys = ["userid"]
  }
}
Run the job:
./bin/seatunnel.sh --config ./config/mysql2mysql_rt.conf