Big Data Components (2): Quick Start with the Data Integration Platform SeaTunnel

SeaTunnel is a high-performance, distributed data integration platform that supports real-time synchronization of massive data volumes. It can stably and efficiently synchronize tens of billions of records per day and is already used in production by nearly a hundred companies.
SeaTunnel's runtime flow is shown in the figure below:

- Workflow: a Source connector reads data in parallel and sends it either to a downstream Transform or directly to a Sink, which writes the data to its destination.
- Note that a Transform can only apply simple conversions to the data, such as converting a column to uppercase or lowercase.
- Three engines are currently supported: the SeaTunnel engine (Zeta, the default), Flink, and Spark.

SeaTunnel supports two ways of developing jobs: writing config files by hand, or visual configuration (canvas design, which requires installing the Spring Boot-based seatunnel-web project).

In this post, we will get to know SeaTunnel through a few simple examples.

- For more information, see the official site: https://seatunnel.apache.org/zh-CN/
- SeaTunnel on GitHub: https://github.com/apache/incubator-seatunnel
- SeaTunnel-web on GitHub: https://github.com/apache/seatunnel-web
1 Downloading and Installing SeaTunnel
Note:

SeaTunnel Web and SeaTunnel Engine have a strict version dependency. If you install versions that do not match, you will hit many bugs (ClassNotFoundException and the like) and may even need to modify the seatunnel-web source code, which is a hassle.

The figure below shows the version compatibility matrix. Here we install:
seatunnel (2.3.8) and seatunnel-web (1.0.2)
1.1 Downloading and installing SeaTunnel
# 1. Download the seatunnel package, extract it, and rename the directory
export version="2.3.8"
wget "https://archive.apache.org/dist/seatunnel/${version}/apache-seatunnel-${version}-bin.tar.gz"
[root@centos01 apps]# tar -zxvf apache-seatunnel-2.3.8-bin.tar.gz
[root@centos01 apps]# mv apache-seatunnel-2.3.8/ seatunnel-2.3.8
# 2. Download the connector plugins
# On first use, run the install script to fetch the connectors.
# Later you can download connectors manually from the Maven Repository and move them into the connectors directory.
# Note: by default the script downloads ALL connector JARs.
# Before running it, comment out the connectors you don't need in seatunnel-2.3.8/config/plugin_config.
[root@centos01 seatunnel-2.3.8]# sh bin/install-plugin.sh 2.3.8
# Be patient, this takes a while
......
[INFO] Resolving org.apache.seatunnel:connector-paimon:jar:2.3.8
Downloading from central: https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-paimon/2.3.8/connector-paimon-2.3.8.jar
Downloaded from central: https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-paimon/2.3.8/connector-paimon-2.3.8.jar (48 MB at 7.8 MB/s)
[WARNING] destination/dest parameter is deprecated: it will disappear in future version.
[INFO] Copying /root/.m2/repository/org/apache/seatunnel/connector-paimon/2.3.8/connector-paimon-2.3.8.jar to /opt/apps/seatunnel-2.3.8/connectors
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.187 s
[INFO] Finished at: 2025-01-02T10:57:06+08:00
[INFO] ------------------------------------------------------------------------
# The downloaded JARs end up in the connectors directory
[root@centos01 seatunnel-2.3.8]# ll connectors/
total 669276
-rw-r--r--. 1 root root 31998536 Jan 2 10:55 connector-cdc-mysql-2.3.8.jar
-rw-r--r--. 1 root root 30970846 Jan 2 10:55 connector-clickhouse-2.3.8.jar
-rw-r--r--. 1 root root 196600 Nov 9 2023 connector-console-2.3.8.jar
-rw-r--r--. 1 root root 11585245 Jan 2 10:55 connector-doris-2.3.8.jar
-rw-r--r--. 1 root root 113521263 Jan 2 10:56 connector-druid-2.3.8.jar
-rw-r--r--. 1 root root 874386 Jan 2 10:55 connector-email-2.3.8.jar
-rw-r--r--. 1 root root 335483 Nov 9 2023 connector-fake-2.3.8.jar
-rw-r--r--. 1 root root 42442805 Jan 2 10:55 connector-file-hadoop-2.3.8.jar
-rw-r--r--. 1 root root 42439445 Jan 2 10:55 connector-file-local-2.3.8.jar
-rw-r--r--. 1 root root 42744405 Jan 2 10:55 connector-file-sftp-2.3.8.jar
-rw-r--r--. 1 root root 51064692 Jan 2 10:56 connector-hbase-2.3.8.jar
-rw-r--r--. 1 root root 54238906 Jan 2 10:56 connector-hive-2.3.8.jar
-rw-r--r--. 1 root root 5352823 Jan 2 10:56 connector-http-base-2.3.8.jar
-rw-r--r--. 1 root root 162286208 Jan 2 10:56 connector-hudi-2.3.8.jar
-rw-r--r--. 1 root root 1025577 Jan 2 10:56 connector-jdbc-2.3.8.jar
-rw-r--r--. 1 root root 32869689 Jan 2 10:56 connector-kafka-2.3.8.jar
-rw-r--r--. 1 root root 47606664 Jan 2 10:57 connector-paimon-2.3.8.jar
-rw-r--r--. 1 root root 1514183 Jan 2 10:56 connector-redis-2.3.8.jar
-rw-r--r--. 1 root root 304892 Jan 2 10:56 connector-socket-2.3.8.jar
-rw-r--r--. 1 root root 7114 Nov 9 2023 plugin-mapping.properties
-rw-r--r--. 1 root root 11921607 Nov 9 2023 seatunnel-transforms-v2-2.3.8.jar
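The config/plugin_config file mentioned above is just a plain list of connector names between two markers; commenting a line out skips that connector. A rough sketch of what it looks like (the entry list here is illustrative, matching the JARs above — check your local copy):

```text
--connectors-v2--
connector-console
connector-fake
connector-jdbc
connector-kafka
# connector-hudi    # comment out connectors you don't need before running install-plugin.sh
--end--
```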
# 3. Set FLINK_HOME
# config/seatunnel-env.sh declares both SPARK_HOME and FLINK_HOME; here we only set the Flink path
[root@centos01 seatunnel-2.3.8]# vim config/seatunnel-env.sh
# Home directory of flink distribution.
FLINK_HOME=/opt/apps/flink-1.14.4
# 4. Configure environment variables
[root@centos01 seatunnel-2.3.8]# vim /etc/profile
export SEATUNNEL_HOME=/opt/apps/seatunnel-2.3.8
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/sbin:$HADOOP_HOME/bin:$FLINK_HOME/bin:$HIVE_HOME/bin:$SEATUNNEL_HOME/bin
[root@centos01 seatunnel-2.3.8]# source /etc/profile
# Copy the MySQL JDBC driver JAR into lib
[root@centos01 seatunnel-2.3.8]# cp /opt/apps/hive-3.1.2/lib/mysql-connector-java-8.0.28.jar /opt/apps/seatunnel-2.3.8/lib/
# 5. Local test
# Run the bundled batch template locally
[root@centos01 seatunnel-2.3.8]# ./bin/seatunnel.sh --config ./config/v2.batch.config.template -m local
# SeaTunnel prints the results to the console:
2025-01-02 11:02:49,099 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1 rowIndex=1: SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : vcDDj, 1693245180
2025-01-02 11:02:49,099 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : QCFuB, 15374559
2025-01-02 11:02:49,099 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1 rowIndex=2: SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : ULydR, 1710108169
2025-01-02 11:02:49,099 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : mkCgx, 205555495
2025-01-02 11:02:49,099 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1 rowIndex=3: SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : AiLvO, 1396632512
2025-01-02 11:02:49,099 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : yVNuc, 719969187
2025-01-02 11:02:49,099 INFO [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1 rowIndex=4: SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : KGsDQ, 616598528
......
# It also prints the following job statistics:
2025-01-02 11:02:50,192 INFO [o.a.s.e.c.j.ClientJobProxy ] [main] - Job (927034158943830017) end with state FINISHED
2025-01-02 11:02:50,225 INFO [s.c.s.s.c.ClientExecuteCommand] [main] -
***********************************************
Job Statistic Information
***********************************************
Start Time : 2025-01-02 11:02:46
End Time : 2025-01-02 11:02:50
Total Time(s) : 3
Total Read Count : 32
Total Write Count : 32
Total Failed Count : 0
***********************************************
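The v2.batch.config.template used in the local test above is roughly the following FakeSource-to-Console batch job — a sketch reconstructed from the output (32 rows = parallelism 2 × row.num 16, with a string and an int field); check your local copy for the exact contents:

```hocon
env {
  parallelism = 2
  job.mode = "BATCH"
}
source {
  FakeSource {
    parallelism = 2
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}
sink {
  Console {
  }
}
```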
With the local test done, let's use Flink as the engine to run a few simple examples:
# First start the Flink cluster
[root@centos01 flink-1.14.4]# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host centos01.
Starting taskexecutor daemon on host centos01.
1.2 The official example
# For Flink 1.12.x to 1.14.x, use:
[root@centos01 seatunnel-2.3.8]# bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/v2.streaming.conf.template
# For Flink 1.15.x to 1.18.x, use:
# bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/v2.streaming.conf.template
env {
# You can set SeaTunnel environment configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 2000
}
source {
# Generate fake data
FakeSource {
parallelism = 2
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
# If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source
}
sink {
# Print to the console
Console {
}
# If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink
}
1.3 A socket2mysql job

In SeaTunnel, the most important artifact is the config file, whose primary format is HOCON.

A config file consists of several sections: env, source, transform, and sink. Each module plays a different role:

- env holds the engine's optional parameters; whichever engine you use (Zeta, Spark, or Flink), its options go here.
- source defines where SeaTunnel retrieves data from, handing the retrieved data to the next step. Multiple sources can be defined at once; the currently supported sources are listed under "Source of SeaTunnel". Every source has its own parameters describing how to fetch data, and SeaTunnel also abstracts parameters common to all sources, such as result_table_name, which names the data produced by the current source so that later modules can refer to it.
- Once we have a data source, we may want to process the data further, which is what the transform module is for. Of course, the transform can also be omitted entirely, going straight from source to sink.
- SeaTunnel's purpose is to synchronize data from one place to another, and likewise multiple sinks can be defined.
- More source, transform, and sink examples can be found in the official docs: Source(V2) of SeaTunnel | Apache SeaTunnel
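The sections chain together through table names: a source's result_table_name is consumed by a transform's source_table_name, and so on down to the sink. Schematically (all plugin and table names here are illustrative placeholders):

```hocon
env { job.mode = "BATCH" }
source {
  SomeSource { result_table_name = "t1" }    # produces table t1
}
transform {
  SomeTransform {
    source_table_name = "t1"                 # consumes t1
    result_table_name = "t2"                 # produces t2
  }
}
sink {
  SomeSink { source_table_name = "t2" }      # consumes t2
}
```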
Start the socket2mysql job:

[root@centos01 seatunnel-2.3.8]# bin/start-seatunnel-flink-13-connector-v2.sh --config ./seatunnel_jobs/_02_socket2mysql.conf
# In another terminal
[root@centos01 ~]# nc -lk 9999
p001#3600#u001
p002#600#u001
p003#100#u001
p004#100#u001

- The console output can be seen on the TaskManager.
- In the MySQL database you can see that the rows have been inserted.
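The JDBC sink in the config below writes into test_socket_table, which must already exist in MySQL. A plausible DDL is (column types assumed — the Split transform emits strings, so plain VARCHARs suffice):

```sql
CREATE TABLE test_socket_table (
    page_id       VARCHAR(64),
    page_duration VARCHAR(64),
    user_id       VARCHAR(64)
);
```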
env {
execution.parallelism = 1  # Set the Flink job's global parallelism to 1
job.mode = "STREAMING"  # Run the job in streaming mode
# job.mode = "BATCH"
}
source {
# Socket source: reads data from the given host and port
socket {
host = "centos01"
port = 9999
result_table_name = "socket_input"
}
}
transform {
# 1. Split transform: splits a single field of the input table named by 'source_table_name' into multiple fields
Split {
source_table_name = "socket_input"
result_table_name = "split_output"
split_field = "value"
output_fields = ["page_id", "page_duration", "user_id"]
separator = "#"
}
# 2. SQL transform: processes the data with a SQL query
Sql {
source_table_name = "split_output"
result_table_name = "sql_output"
query = "select page_id, page_duration, user_id from split_output"
schema = {
fields {
page_id = "string"
page_duration = "string"
user_id = "string"
}
}
}
}
sink {
# Console sink: prints the data to the console
Console {
source_table_name = "sql_output"
}
# JDBC sink: writes the data to a relational database
jdbc {
source_table_name = "sql_output"
url = "jdbc:mysql://localhost:3306/my_database?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&useSSL=false"
driver = "com.mysql.cj.jdbc.Driver"
user = "root"
password = "123456"
query = "insert into test_socket_table(page_id,page_duration,user_id) values(?,?,?)"
batch_size = "2" # Batch size for each insert
}
}
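To make the Split transform's behavior concrete: each socket line such as p001#3600#u001 is broken on the "#" separator into the three output fields. As a rough shell analogy (for illustration only, not how SeaTunnel implements it):

```shell
# Split one input line on '#' into page_id, page_duration, user_id,
# mirroring the Split transform's separator/output_fields settings.
line='p001#3600#u001'
IFS='#' read -r page_id page_duration user_id <<< "$line"
echo "page_id=$page_id page_duration=$page_duration user_id=$user_id"
```
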
2 Downloading and Installing SeaTunnel-web
2.1 Downloading and installing SeaTunnel-web
- SeaTunnel-web download: https://dlcdn.apache.org/seatunnel/seatunnel-web/
- GitHub: https://github.com/apache/seatunnel-web
# 1. Download seatunnel-web, extract, and rename
[root@centos01 apps]# wget https://dlcdn.apache.org/seatunnel/seatunnel-web/1.0.2/apache-seatunnel-web-1.0.2-bin.tar.gz
[root@centos01 apps]# tar -zxvf apache-seatunnel-web-1.0.2-bin.tar.gz
[root@centos01 apps]# mv apache-seatunnel-web-1.0.2-bin/ seatunnel-web-1.0.2
# 2. Initialize the database
# Edit script/seatunnel_server_env.sh to match your database connection info
[root@centos01 script]# cat seatunnel_server_env.sh
export HOSTNAME="127.0.0.1"
export PORT="3306"
export USERNAME="root"
export PASSWORD="123456"
# From the seatunnel-web install directory, run sh init_sql.sh; if no exception is thrown, it succeeded
# On success, a seatunnel database and its tables are created
[root@centos01 script]# sh ./init_sql.sh
mysql: [Warning] Using a password on the command line interface can be insecure.
mysql> use seatunnel
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> show tables;
+---------------------------+
| Tables_in_seatunnel |
+---------------------------+
| role |
| role_user_relation |
| t_st_datasource |
| t_st_job_definition |
| t_st_job_instance |
| t_st_job_instance_history |
| t_st_job_line |
| t_st_job_metrics |
| t_st_job_task |
| t_st_job_version |
| t_st_virtual_table |
| user |
| user_login_log |
+---------------------------+
13 rows in set (0.00 sec)
# 3. Update the Spring app's datasource connection info
# Note: if timestamps shown on the web UI are wrong, add time-zone info to the MySQL connection URL
[root@centos01 conf]# pwd
/opt/apps/seatunnel-web-1.0.2/conf
[root@centos01 conf]# vim application.yml
server:
port: 8801
spring:
application:
name: seatunnel
jackson:
date-format: yyyy-MM-dd HH:mm:ss
# 1. Update the datasource info
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/seatunnel?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true&serverTimezone=UTC
username: root
password: 123456
mvc:
pathmatch:
matching-strategy: ant_path_matcher
jwt:
expireTime: 86400
# please add key when deploy
# 2. Set the secretKey under jwt
secretKey: https://dlcdn.apache.org/seatunnel
algorithm: HS256
# 4. Copy the relevant files
# Copy the engine client config file from the config directory of the SeaTunnel engine node into seatunnel-web's conf directory
# Note: it points at localhost:5801; if SeaTunnelServer runs in a VM while you debug the web project locally in IDEA, set the VM's IP here
[root@centos01 conf]# cp /opt/apps/seatunnel-2.3.8/config/hazelcast-client.yaml /opt/apps/seatunnel-web-1.0.2/conf/
# Copy plugin-mapping.properties from the engine's connectors directory into seatunnel-web's conf directory
[root@centos01 conf]# cp /opt/apps/seatunnel-2.3.8/connectors/plugin-mapping.properties /opt/apps/seatunnel-web-1.0.2/conf/
# Copy the connector JARs from seatunnel-2.3.8
[root@centos01 seatunnel-web-1.0.2]# cp /opt/apps/seatunnel-2.3.8/connectors/connector-*.jar /opt/apps/seatunnel-web-1.0.2/libs/
# Copy the mysql-connector JAR
[root@centos01 seatunnel-web-1.0.2]# cp /opt/apps/hive-3.1.2/lib/mysql-connector-java-8.0.28.jar /opt/apps/seatunnel-web-1.0.2/libs/
# 5. Download the datasource JARs
# A few of them fail to download by default, so they are simply commented out here
# As you can see, the datasources the web project supports are still limited
[root@centos01 seatunnel-web-1.0.2]# vim bin/download_datasource.sh
datasource_list=(
"datasource-plugins-api"
"datasource-elasticsearch"
"datasource-hive"
"datasource-jdbc-clickhouse"
"datasource-jdbc-hive"
"datasource-jdbc-mysql"
"datasource-jdbc-oracle"
"datasource-jdbc-postgresql"
"datasource-jdbc-redshift"
"datasource-jdbc-sqlserver"
"datasource-jdbc-starrocks"
"datasource-jdbc-tidb"
"datasource-kafka"
"datasource-mysql-cdc"
"datasource-s3"
"datasource-sqlserver-cdc"
"datasource-starrocks"
)
# "datasource-mongodb"
# "datasource-fakesource"
# "datasource-console"
[root@centos01 seatunnel-web-1.0.2]# sh bin/download_datasource.sh
......
[WARNING] Notice transitive dependencies won't be copied.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 0.622 s
[INFO] Finished at: 2024-12-31T12:50:46+08:00
[INFO] ------------------------------------------------------------------------
# 6. Start the services
# Start the engine server
[root@centos01 seatunnel-2.3.8]# nohup sh bin/seatunnel-cluster.sh 2>&1 &
# Start the web service
[root@centos01 seatunnel-web-1.0.2]# sh bin/seatunnel-backend-daemon.sh start
[root@centos01 seatunnel-web-1.0.2]# jps
6499 Jps
6452 SeatunnelApplication
5150 SeaTunnelServer
# Web UI (the display language can be switched to Chinese in the page settings)
http://192.168.42.101:8801
Default username and password: admin/admin
2.2 A mysql2mysql job in the web UI
1. Create the datasources
2. Create the sync job
3. Run the job
You can see that the data has been inserted into the target database: