当前位置: 首页 > article >正文

大数据组件(二)快速入门数据集成平台SeaTunnel

大数据组件(二)快速入门数据集成平台SeaTunnel

  • SeaTunnel是一个超高性能的分布式数据集成平台,支持实时海量数据同步。 每天可稳定高效同步数百亿数据,已被近百家企业应用于生产。

  • SeaTunnel的运行流程如下图所示:

    • 工作流程为:Source Connector负责并行读取数据,将数据发送到下游Transform或直接发送到Sink,Sink将数据写入目的地;
    • 需要注意的是:Transform只能用于对数据进行一些简单的转换,例如将一列的数据转换为大写或小写;
    • 目前支持3种引擎:SeaTunnel引擎(Zeta,默认)、Flink以及Spark
    • SeaTunnel Work Flowchart
  • SeaTunnel支持两种作业开发方法:编码、可视化配置(画布设计,需要安装基于spring-boot开发的seatunnel-web项目)

  • 今天,我们用几个简单案例,快速了解下SeaTunnel

    • 更多信息可以参考官网:https://seatunnel.apache.org/zh-CN/
    • SeaTunnel github地址: https://github.com/apache/incubator-seatunnel
    • SeaTunnel-web github地址:https://github.com/apache/seatunnel-web

1 SeaTunnel的下载及安装

注意:

  • SeaTunnel Web和SeaTunnel Engine有严格的版本依赖关系,如果不按版本对应关系安装,会出现很多bug(类找不到异常),你可能需要修改seatunnel-web的源码,比较麻烦。

  • 下图是版本对应关系,我们这里安装版本:seatunnel(2.3.8)、seatunnel-web(1.0.2)

    在这里插入图片描述

1.1 SeaTunnel的下载及安装

# 1、下载seatunnel包、解压、重命名
export version="2.3.8"
wget "https://archive.apache.org/dist/seatunnel/${version}/apache-seatunnel-${version}-bin.tar.gz"

[root@centos01 apps]# tar -zxvf apache-seatunnel-2.3.8-bin.tar.gz
[root@centos01 apps]# mv apache-seatunnel-2.3.8/ seatunnel-2.3.8

# 2、下载连接器插件
# 在第一次使用时,可以执行命令来安装连接器
# 后面可以从Maven Repository手动下载连接器,然后将其移动至connectors目录下

# 注意:系统默认自动下载时会下载所有的连接器JAR,
# 下载脚本执行之前先在seatunnel-2.3.8/config/pulgun_config配置中注释掉不需要的连接器
[root@centos01 seatunnel-2.3.8]# sh bin/install-plugin.sh 2.3.8

# 耐心等待
......
[INFO] Resolving org.apache.seatunnel:connector-paimon:jar:2.3.8
Downloading from central: https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-paimon/2.3.8/connector-paimon-2.3.8.jar
Downloaded from central: https://repo.maven.apache.org/maven2/org/apache/seatunnel/connector-paimon/2.3.8/connector-paimon-2.3.8.jar (48 MB at 7.8 MB/s)
[WARNING] destination/dest parameter is deprecated: it will disappear in future version.
[INFO] Copying /root/.m2/repository/org/apache/seatunnel/connector-paimon/2.3.8/connector-paimon-2.3.8.jar to /opt/apps/seatunnel-2.3.8/connectors
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  7.187 s
[INFO] Finished at: 2025-01-02T10:57:06+08:00
[INFO] ------------------------------------------------------------------------


# 下载好的jar包就在connectors目录下
[root@centos01 seatunnel-2.3.8]# ll connectors/
total 669276
-rw-r--r--. 1 root root  31998536 Jan  2 10:55 connector-cdc-mysql-2.3.8.jar
-rw-r--r--. 1 root root  30970846 Jan  2 10:55 connector-clickhouse-2.3.8.jar
-rw-r--r--. 1 root root    196600 Nov  9  2023 connector-console-2.3.8.jar
-rw-r--r--. 1 root root  11585245 Jan  2 10:55 connector-doris-2.3.8.jar
-rw-r--r--. 1 root root 113521263 Jan  2 10:56 connector-druid-2.3.8.jar
-rw-r--r--. 1 root root    874386 Jan  2 10:55 connector-email-2.3.8.jar
-rw-r--r--. 1 root root    335483 Nov  9  2023 connector-fake-2.3.8.jar
-rw-r--r--. 1 root root  42442805 Jan  2 10:55 connector-file-hadoop-2.3.8.jar
-rw-r--r--. 1 root root  42439445 Jan  2 10:55 connector-file-local-2.3.8.jar
-rw-r--r--. 1 root root  42744405 Jan  2 10:55 connector-file-sftp-2.3.8.jar
-rw-r--r--. 1 root root  51064692 Jan  2 10:56 connector-hbase-2.3.8.jar
-rw-r--r--. 1 root root  54238906 Jan  2 10:56 connector-hive-2.3.8.jar
-rw-r--r--. 1 root root   5352823 Jan  2 10:56 connector-http-base-2.3.8.jar
-rw-r--r--. 1 root root 162286208 Jan  2 10:56 connector-hudi-2.3.8.jar
-rw-r--r--. 1 root root   1025577 Jan  2 10:56 connector-jdbc-2.3.8.jar
-rw-r--r--. 1 root root  32869689 Jan  2 10:56 connector-kafka-2.3.8.jar
-rw-r--r--. 1 root root  47606664 Jan  2 10:57 connector-paimon-2.3.8.jar
-rw-r--r--. 1 root root   1514183 Jan  2 10:56 connector-redis-2.3.8.jar
-rw-r--r--. 1 root root    304892 Jan  2 10:56 connector-socket-2.3.8.jar
-rw-r--r--. 1 root root      7114 Nov  9  2023 plugin-mapping.properties
-rw-r--r--. 1 root root  11921607 Nov  9  2023 seatunnel-transforms-v2-2.3.8.jar


# 3、修改FLINK_HOME
# config/seatunnel-env.sh脚本中声明了SPARK_HOME和FLINK_HOME两个路径,这里只修改flink
[root@centos01 seatunnel-2.3.8]# vim config/seatunnel-env.sh 
# Home directory of flink distribution.
FLINK_HOME=/opt/apps/flink-1.14.4


# 4、配置环境变量
[root@centos01 seatunnel-2.3.8]# vim /etc/profile
export  SEATUNNEL_HOME=/opt/apps/seatunnel-2.3.8
export  PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/sbin:$HADOOP_HOME/bin:$FLINK_HOME/bin:$HIVE_HOME/bin:$SEATUNNEL_HOME/bin
[root@centos01 seatunnel-2.3.8]# source /etc/profile
# 上传mysql的jar包
[root@centos01 seatunnel-2.3.8]# cp /opt/apps/hive-3.1.2/lib/mysql-connector-java-8.0.28.jar /opt/apps/seatunnel-2.3.8/lib/

# 5、本地测试
# 本地脚本
[root@centos01 seatunnel-2.3.8]# ./bin/seatunnel.sh --config ./config/v2.batch.config.template -m local


# 在seatunnel的控制台会输出结果:
2025-01-02 11:02:49,099 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1  rowIndex=1:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : vcDDj, 1693245180
2025-01-02 11:02:49,099 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : QCFuB, 15374559
2025-01-02 11:02:49,099 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1  rowIndex=2:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : ULydR, 1710108169
2025-01-02 11:02:49,099 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=2:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : mkCgx, 205555495
2025-01-02 11:02:49,099 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1  rowIndex=3:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : AiLvO, 1396632512
2025-01-02 11:02:49,099 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=0  rowIndex=3:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : yVNuc, 719969187
2025-01-02 11:02:49,099 INFO  [.a.s.c.s.c.s.ConsoleSinkWriter] [st-multi-table-sink-writer-1] - subtaskIndex=1  rowIndex=4:  SeaTunnelRow#tableId=fake SeaTunnelRow#kind=INSERT : KGsDQ, 616598528

......
# 还会输出下面统计信息
2025-01-02 11:02:50,192 INFO  [o.a.s.e.c.j.ClientJobProxy    ] [main] - Job (927034158943830017) end with state FINISHED
2025-01-02 11:02:50,225 INFO  [s.c.s.s.c.ClientExecuteCommand] [main] - 
***********************************************
           Job Statistic Information
***********************************************
Start Time                : 2025-01-02 11:02:46
End Time                  : 2025-01-02 11:02:50
Total Time(s)             :                   3
Total Read Count          :                  32
Total Write Count         :                  32
Total Failed Count        :                   0
***********************************************

完成本地测试后,我们再用Flink作为引擎,实现几个简单案例:

# 先启动flink集群
[root@centos01 flink-1.14.4]# bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host centos01.
Starting taskexecutor daemon on host centos01.

1.2 官方示例

# Flink版本1.12.x到1.14.x使用如下命令:
[root@centos01 seatunnel-2.3.8]# bin/start-seatunnel-flink-13-connector-v2.sh --config ./config/v2.streaming.conf.template


# Flink版本1.15.x到1.18.x使用如下命令:
# bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/v2.streaming.conf.template

在这里插入图片描述

在这里插入图片描述

env {
  # You can set SeaTunnel environment configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}

source {
  # 产生fake数据
  FakeSource {
    parallelism = 2
    result_table_name = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }

  # If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
  # please go to https://seatunnel.apache.org/docs/connector-v2/source
}

sink {
  # 打印到控制台
  Console {
  }

  # If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins,
  # please go to https://seatunnel.apache.org/docs/connector-v2/sink
}

1.3 socket2mysql任务

  • 在SeaTunnel中,最重要的事情就是配置文件,配置文件的主要格式是 hocon

  • 配置文件包括几个部分:env、source、transform和sink。不同的模块具有不同的功能:

    • env用于添加引擎可选的参数,不管是什么引擎(Zeta、Spark 或者 Flink),对应的可选参数应该在这里填写;
    • source用于定义SeaTunnel在哪儿检索数据,并将检索的数据用于下一步。 可以同时定义多个source。目前支持的source:Source of SeaTunnel。每种source都有自己特定的参数用来 定义如何检索数据,SeaTunnel也抽象了每种source所使用的参数,例如 result_table_name 参数,用于指定当前source生成的数据的名称, 方便后续其他模块使用;
    • 当我们有了数据源之后,我们可能需要对数据进行进一步的处理,所以我们就有了transform模块。当然,也可以直接将transform视为不存在,直接从source到sink。
    • SeaTunnel的作用是将数据从一个地方同步到其它地方,同样也可以定义多个sink。
    • 更多的source、transform以及sink示例可以参考官网:Source(V2) of SeaTunnel | Apache SeaTunnel
  • 启动socket2mysql任务

    [root@centos01 seatunnel-2.3.8]# bin/start-seatunnel-flink-13-connector-v2.sh --config ./seatunnel_jobs/_02_socket2mysql.conf
    
    # 另外一个窗口
    [root@centos01 ~]# nc -lk 9999
    p001#3600#u001
    p002#600#u001
    p003#100#u001
    p004#100#u001
    
  • 在TaskManager上可以看到控制台输出

在这里插入图片描述

  • 在Mysql数据库中可以看到数据已经插入

在这里插入图片描述

env {
  execution.parallelism = 1   # 设置Flink作业的全局并行度为1
  job.mode = "STREAMING"      # 定义作业模式为流模式(STREAMING)
  # job.mode = "BATCH"
}

source {
  # Socket输入源配置,用于从指定的主机和端口读取数据
  socket {
    host = "centos01"
    port = 9999
    result_table_name = "socket_input"
  }
}

transform {
  # 1、 Split转换操作配置,用于将来自'source_table_name'定义的输入表中的单个字段拆分成多个字段。
  Split {
	source_table_name = "socket_input"  
	result_table_name = "split_output"
    split_field = "value"
    output_fields = ["page_id", "page_duration", "user_id"]
    separator = "#"
  }
  # 2、SQL转换操作配置,允许使用SQL查询对数据进行处理
  Sql {
    source_table_name = "split_output"
    result_table_name = "sql_output"
    query = "select page_id, page_duration, user_id from split_output"
	schema = {
      fields {
        page_id = "string"
        page_duration = "string"
        user_id = "string"
      }
    }
  }
}


sink {
    # Console Sink配置,将数据打印到控制台
	Console {
		source_table_name = "sql_output"
	}
	# JDBC Sink配置,用于将数据写入关系型数据库
    jdbc {
        source_table_name = "sql_output"
        url = "jdbc:mysql://localhost:3306/my_database?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&useSSL=false"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "root"
        password = "123456"
        query = "insert into test_socket_table(page_id,page_duration,user_id) values(?,?,?)"
	    batch_size = "2"  # 每次插入数据的批次大小
    }
}

2 SeaTunnel-web的下载及安装

2.1 SeaTunnel-web的下载及安装

  • SeaTunnel-web下载地址:https://dlcdn.apache.org/seatunnel/seatunnel-web/
  • github地址:https://github.com/apache/seatunnel-web


# 1、seatunnel-web的下载、重命名
[root@centos01 apps]# wget https://dlcdn.apache.org/seatunnel/seatunnel-web/1.0.2/apache-seatunnel-web-1.0.2-bin.tar.gz
[root@centos01 apps]# tar -zxvf apache-seatunnel-web-1.0.2-bin.tar.gz
[root@centos01 apps]# mv apache-seatunnel-web-1.0.2-bin/ seatunnel-web-1.0.2

# 2、数据库初始化
# 将script/seatunnel_server_env.sh相关配置改为你的对应的数据库信息
[root@centos01 script]# cat seatunnel_server_env.sh 
export HOSTNAME="127.0.0.1"
export PORT="3306"
export USERNAME="root"
export PASSWORD="123456"

# 进入seatunnel-web的安装目录,然后执行命令sh init_sql.sh,无异常则执行成功
# 成功后,会在数据库中新增seatunnel库和相关表
[root@centos01 script]# sh ./init_sql.sh 
mysql: [Warning] Using a password on the command line interface can be insecure.


mysql> use seatunnel
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+---------------------------+
| Tables_in_seatunnel       |
+---------------------------+
| role                      |
| role_user_relation        |
| t_st_datasource           |
| t_st_job_definition       |
| t_st_job_instance         |
| t_st_job_instance_history |
| t_st_job_line             |
| t_st_job_metrics          |
| t_st_job_task             |
| t_st_job_version          |
| t_st_virtual_table        |
| user                      |
| user_login_log            |
+---------------------------+
13 rows in set (0.00 sec)

# 3、修改spring程序数据源连接信息
# 注意:如果web页面上显示时间不对,可以在连接mysql时候加上时区信息
[root@centos01 conf]# pwd
/opt/apps/seatunnel-web-1.0.2/conf
[root@centos01 conf]# vim application.yml 
server:
  port: 8801

spring:
  application:
    name: seatunnel
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
  # 1、修改数据源信息  
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/seatunnel?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true&serverTimezone=UTC
    username: root
    password: 123456
  mvc:
    pathmatch:
      matching-strategy: ant_path_matcher

jwt:
  expireTime: 86400
  # please add key when deploy
  # 2、设置jwt下的secretKey
  secretKey: https://dlcdn.apache.org/seatunnel
  algorithm: HS256

# 4、copy相关文件

# 将seatunnel引擎服务节点的安装目录下的config目录下的关于引擎客户端的配置文件拷贝到seatunnel-web安装目录下的conf目录下
# 注意:这里配置的是localhost:5801,如果在虚机上跑SeaTunnelServer服务,而在本地IDEA调试web项目,可以设置虚机上的IP
[root@centos01 conf]# cp /opt/apps/seatunnel-2.3.8/config/hazelcast-client.yaml /opt/apps/seatunnel-web-1.0.2/conf/

在这里插入图片描述

# 将seatunnel引擎服务节点的安装目录下的connectors目录下的plugin-mapping.properties配置文件拷贝到seatunnel-web安装目录下的conf目录下
[root@centos01 conf]# cp /opt/apps/seatunnel-2.3.8/connectors/plugin-mapping.properties /opt/apps/seatunnel-web-1.0.2/conf/

# 复制seatunnel-2.3.8下的connectors
[root@centos01 seatunnel-web-1.0.2]# cp /opt/apps/seatunnel-2.3.8/connectors/connector-*.jar /opt/apps/seatunnel-web-1.0.2/libs/

# 复制mysql-connector包
[root@centos01 seatunnel-web-1.0.2]# cp /opt/apps/hive-3.1.2/lib/mysql-connector-java-8.0.28.jar /opt/apps/seatunnel-web-1.0.2/libs/

# 5、下载配置数据源JAR包
# 默认下载时候,有几个会下载失败,这里直接注释了
# 可以看到,目前web项目支持的数据源还是有限的
[root@centos01 seatunnel-web-1.0.2]# vim  bin/download_datasource.sh 
datasource_list=(
  "datasource-plugins-api"
  "datasource-elasticsearch"
  "datasource-hive"
  "datasource-jdbc-clickhouse"
  "datasource-jdbc-hive"
  "datasource-jdbc-mysql"
  "datasource-jdbc-oracle"
  "datasource-jdbc-postgresql"
  "datasource-jdbc-redshift"
  "datasource-jdbc-sqlserver"
  "datasource-jdbc-starrocks"
  "datasource-jdbc-tidb"
  "datasource-kafka"
  "datasource-mysql-cdc"
  "datasource-s3"
  "datasource-sqlserver-cdc"
  "datasource-starrocks"
)
 # "datasource-mongodb"
 # "datasource-fakesource"
 # "datasource-console"


[root@centos01 seatunnel-web-1.0.2]# sh bin/download_datasource.sh 

......
[WARNING] Notice transitive dependencies won't be copied.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  0.622 s
[INFO] Finished at: 2024-12-31T12:50:46+08:00
[INFO] ------------------------------------------------------------------------


# 6、启动服务
# 启动server服务
[root@centos01 seatunnel-2.3.8]# nohup sh bin/seatunnel-cluster.sh 2>&1 &
# 启动web服务
[root@centos01 seatunnel-web-1.0.2]# sh bin/seatunnel-backend-daemon.sh start

[root@centos01 seatunnel-web-1.0.2]# jps
6499 Jps
6452 SeatunnelApplication
5150 SeaTunnelServer


# web页面(页面中可以设置显示为中文语言)
http://192.168.42.101:8801
默认用户名和密码:admin/admin

在这里插入图片描述

2.2 web端Mysql2mysql任务

1、创建数据源

在这里插入图片描述

2、创建同步任务

在这里插入图片描述

在这里插入图片描述

3、运行任务

在这里插入图片描述

可以看到已经插入到数据库中:

在这里插入图片描述


http://www.kler.cn/a/464827.html

相关文章:

  • 详解MySQL SQL删除(超详,7K,含实例与分析)
  • 使用LLM自回归与超级转义词表生成图像:超越传统扩散模型的新范式
  • Halcon 显示异常
  • Elasticsearch: 高级搜索
  • Leetcode 第426场周赛分析总结
  • node.js之---内置模块
  • 建造者模式 Builder Pattern
  • docker下载redis,zookeeper,kafka超时time out
  • 软件工程大复习(五) 需求工程与需求分析
  • Linux系统安装es详细教程
  • 【IEEE冠名会议】2025年IEEE第二届深度学习与计算机视觉国际会议(DLCV 2025)
  • Go recover的执行时机
  • 剪映--关键帧教程:制作视频文字说明,文字动态划线,透明文字,虚拟触控,画面旋转缩小退出
  • TCP IP 网络协议基础入门 1
  • 加速开发体验:为 Android Studio 设置国内镜像源
  • VSCode函数调用关系图插件开发(d3-graphviz)
  • Git核心概念总结
  • 2022浙江大学信号与系统笔记
  • 小程序租赁系统的优势与应用探索
  • Android笔试面试题AI答之Android基础(11)
  • BFS中的双向广搜和A-star
  • 深入理解 PHP 构造函数和析构函数:附示例代码
  • 【JVM】JVM自学笔记(类加载子系统、运行时数据区、执行引擎)
  • Python世界:高频小技巧总结
  • 低代码开发:开启企业数智化转型“快捷键”
  • Python 图像处理:生成美丽的书籍封面