当前位置: 首页 > article >正文

seatunnel数据集成(二)数据同步

seatunnel数据集成(一)简介与安装
seatunnel数据集成(二)数据同步
seatunnel数据集成(三)多表同步
seatunnel数据集成(四)连接器使用


1、Connector类型

seatunnel连接器类型丰富,支持以下类型:

Source

Sink

Clickhouse

Clickhouse

Elasticsearch

Elasticsearch

FakeSource

FakeSource

Ftp

Ftp

Github/Gitlab

Github/Gitlab

Greenplum

Greenplum

Hdfs file

Hdfs file

Hive

Hive

Http

Http

Hudi/Iceberg

Hudi/Iceberg

JDBC

JDBC

Kudu

Kudu

MongoDB

MongoDB

Mysql / MySQL CDC

Mysql / MySQL CDC

Redis

Redis

Kafka

Kafka

StarRocks

StarRocks

Phoenix

Phoenix

...

...

2、mysql to mysql

参数(必备):

mysql source

  • url
  • driver
  • query

mysql sink

  • url
  • driver

样例:

env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
}
source{
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        query = "select * from base_region limit 4"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    query = "insert into base_region(id,region_name) values(?,?)"
  }
}

脚本执行:

./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf

3、mysql to hive

如果用的是Spark/Flink引擎,需要Spark/Flink已经集成好了Hive

如果用SeaTunnel Zeta引擎,需要将

  • seatunnel-hadoop3-3.1.4-uber.jar
  • hive-exec-2.3.9.jar

放到 $SEATUNNEL_HOME/lib/ 目录下。

样例:

env {
  job.mode = "BATCH"
}

source {
    Jdbc {
        url = "jdbc:mysql:///127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "password"
        query = "select * from source_user"
    }
}

transform {
}

sink {
  Hive {
    table_name = "ods.sink_user"
    metastore_uri = "thrift://bigdata101:9083"
  }

}

 执行脚本: 

./bin/seatunnel.sh --config ./config/mysql2hive.conf

4、增量同步(参数)

需求:根据创建时间,每天增量抽取

表结构:

-- db 
CREATE TABLE `t_order_detail` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '编号',
  `order_id` bigint DEFAULT NULL COMMENT '订单编号',
  `sku_id` bigint DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=863 DEFAULT CHARSET=utf8mb3 COMMENT='订单明细表';

-- dw
CREATE TABLE `ods_t_order_detail_di` (
  `id` bigint NOT NULL COMMENT '编号',
  `order_id` bigint DEFAULT NULL COMMENT '订单编号',
  `sku_id` bigint DEFAULT NULL COMMENT 'sku_id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称',
  `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间'
) ENGINE=InnoDB AUTO_INCREMENT=863 DEFAULT CHARSET=utf8mb3 COMMENT='ODS订单明细表';


SELECT id,order_id,sku_id,sku_name,img_url,order_price,sku_num,create_time 
FROM t_order_detail 
WHERE create_time >= '2024-02-05' 
and create_time < DATE_ADD('2024-02-05',interval 1 day) 

样例:

env {
  execution.parallelism = 2
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
        query = "select * from t_order_detail where create_time >= REPLACE('"${etl_dt}"', 'T', ' ') and create_time < date_add(REPLACE('"${etl_dt}"', 'T', ' '),interval 1 day);"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
  jdbc {
        url = "jdbc:mysql://127.0.0.1:3306/test"
        driver = "com.mysql.cj.jdbc.Driver"
        connection_check_timeout_sec = 100
        user = "user"
        password = "password"
    query = "insert into ods_t_order_detail_di (id,order_id,sku_id,sku_name,img_url,order_price,sku_num,create_time) values(?,?,?,?,?,?,?,?)"
  }
}

脚本执行:

./bin/seatunnel.sh --config ./config/mysql2mysql_ods_t_order_detail_di.conf -i etl_dt='2024-02-05'

5、实时

指定作业模式为:STREAMING

job.mode = "STREAMING"

基于mysql cdc:

env {
  execution.parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 10000
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  MySQL-CDC {
    username = "user"
    password = "password"
    table-names = ["test.source_user"]
    base-url = "jdbc:mysql://127.0.0.1:3306/test"
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    username = "user"
    password = "password"
    generate_sink_sql = true
    database = "dw"
    table = "source_user_01"
    primary_keys = ["userid"]
  }
}

执行脚本: 

./bin/seatunnel.sh --config ./config/mysql2mysql_rt.conf


http://www.kler.cn/a/230073.html

相关文章:

  • 【MySQL系列文章】Linux环境下安装部署MySQL
  • Git最便捷的迁移方式
  • 有关Redis的相关概述
  • Linux pget 下载命令详解
  • 嵌入式技术之Linux(Ubuntu) 一
  • Springboot SAP Docker 镜像打包问题
  • 简单指针运算c语言
  • JAVA中的main方法
  • 如何使用Docker部署DashDot服务器仪表盘并结合cpolar实现公网访问
  • Django连接Mysql
  • CSS是一门需要单独学习的技术吗?
  • STM32之USART
  • 敏捷开发的INVEST原则
  • Python 中的 os 模块常见方法
  • Leetcode 518 零钱兑换 II
  • GPTs保姆级教程之实践
  • 一周学会Django5 Python Web开发-Django5介绍及安装
  • npm 上传一个自己的应用(3) 在项目中导入及使用自己上传到NPM的工具
  • 【Vitis】HLS高层次综合的优势
  • 【Linux系统化学习】进程替换
  • 阿里云OSS对象存储
  • 大型装备制造企业案例分享——通过CRM系统管理全球业务
  • Ubuntu下anaconda的常用操作
  • JAVA中的抽象类
  • Oracle systemstate、gdb、dbx介绍
  • docker 部署springboot项目详细步骤