当前位置: 首页 > article >正文

如何基于Apache SeaTunnel 读取Oracle的数据

引言

在大数据时代,企业面临着数据的快速增长和多样化需求,如何高效地处理和整合来自不同数据源的数据成为了关键问题。

Apache SeaTunnel作为一款开源数据集成工具,提供了灵活的数据处理和实时数据同步能力,广泛应用于数据仓库、数据湖及实时分析场景中。与此同时,Oracle 数据库以其高性能和可靠性,成为许多企业数据存储的首选。结合 Apache SeaTunnel 与 Oracle 数据库,可以实现高效的数据迁移与转换。

Apache SeaTunnel

Apache SeaTunnel 是一款数据集成工具,支持批处理和流处理。它允许用户通过简单的配置实现数据的抽取、转换和加载(ETL)。SeaTunnel 提供了多种连接器,能够轻松集成不同的数据源和目标,包括关系型数据库、NoSQL 数据库、文件系统等。其灵活性和扩展性使其成为企业数据集成的重要选择。

Oracle 数据库

Oracle 数据库是一款广泛使用的关系型数据库管理系统,以其强大的事务处理能力和安全性而著称。它适用于处理大型企业级应用程序的数据存储需求。Oracle 支持多种数据类型和复杂查询,能够高效地管理和分析大量数据,适合用于金融、电信、医疗等行业。

Oracle JDBC 连接器概述

连接器描述

Oracle JDBC 连接器通过 JDBC 方式读取外部数据源的数据,支持 Apache SeaTunnel 的多种引擎,包括 Spark、Flink 和 SeaTunnel Zeta。

使用依赖

  1. 确保将 JDBC 驱动 jar 包 放置在 ${SEATUNNEL_HOME}/plugins/ 目录中。
  2. 为支持国际化字符集,复制 orai18n.jar${SEATUNNEL_HOME}/plugins/ 目录。

SeaTunnel Zeta 引擎

  1. 确保将 JDBC 驱动 jar 包 放置在 ${SEATUNNEL_HOME}/lib/ 目录中。
  2. 为支持国际化字符集,复制 orai18n.jar${SEATUNNEL_HOME}/lib/ 目录。

主要特性

  • 批处理支持
  • 精确一次(exactly-once)语义
  • 列投影(column projection)
  • 并行处理(parallelism)
  • 支持用户定义的分割(user-defined split)

支持的数据源信息

数据源支持版本驱动URLMaven
Oracle依赖版本不同有不同驱动类oracle.jdbc.OracleDriverjdbc:oracle:thin:@datasource01:1523:xehttps://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8

数据库依赖

请下载对应于 Maven 的支持列表,并将其复制到 ${SEATUNNEL_HOME}/plugins/jdbc/lib/ 工作目录中。

例如 Oracle 数据源:

cp ojdbc8-xxxxxx.jar $SEATUNNEL_HOME/lib/

为支持国际化字符集,复制 orai18n.jar${SEATUNNEL_HOME}/lib/ 目录中。

数据类型映射

Oracle 数据类型SeaTunnel 数据类型
INTEGERDECIMAL(38,0)
FLOATDECIMAL(38,18)
NUMBER(precision <= 9, scale == 0)INT
NUMBER(9 < precision <= 18, scale == 0)BIGINT
NUMBER(18 < precision, scale == 0)DECIMAL(38,0)
NUMBER(scale != 0)DECIMAL(38,18)
BINARY_DOUBLEDOUBLE
BINARY_FLOAT
REAL
FLOAT
CHAR
NCHAR
VARCHAR
NVARCHAR2
VARCHAR2
LONG
ROWID
NCLOB
CLOB
XML
STRING
DATETIMESTAMP
TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE
TIMESTAMP
BLOB
RAW
LONG RAW
BFILE
BYTES

源选项

名称类型是否必需默认值描述
urlString-JDBC 连接的 URL。例如:jdbc:oracle:thin:@datasource01:1523:xe
driverString-用于连接远程数据源的 JDBC 类名,例如 oracle.jdbc.OracleDriver
userString-连接实例的用户名
passwordString-连接实例的密码
queryString-查询语句
connection_check_timeout_secInt30验证连接时的超时时间(秒)
partition_columnString-用于并行处理的分区列名,仅支持数值类型主键
partition_lower_boundBigDecimal-扫描的 partition_column 最小值
partition_upper_boundBigDecimal-扫描的 partition_column 最大值
partition_numIntjob parallelism分区数量,仅支持正整数,默认值为作业并行性
fetch_sizeInt0查询时的行抓取大小,0 表示使用 JDBC 默认值
propertiesMap-其他连接配置参数
名称类型是否必需默认值描述
urlString-JDBC 连接的 URL,例如:jdbc:mysql://localhost:3306/test
driverString-用于连接远程数据源的 JDBC 类名,例如 MySQL 为 com.mysql.cj.jdbc.Driver
userString-连接实例的用户名
passwordString-连接实例的密码
queryString-查询语句
connection_check_timeout_secInt30验证连接时的超时时间(秒)
partition_columnString-用于并行处理的分区列名,仅支持数值类型主键,且只能配置一列。
partition_lower_boundBigDecimal-扫描的 partition_column 最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。
partition_upper_boundBigDecimal-扫描的 partition_column 最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。
partition_numIntjob parallelism分区数量,仅支持正整数,默认值为作业并行性。
fetch_sizeInt0查询时的行抓取大小,0 表示使用 JDBC 默认值。
propertiesMap-其他连接配置参数,当 properties 和 URL 有相同参数时,优先级由具体驱动的实现决定。
table_pathString-表的完整路径,可以使用该配置替代 query。例如: MySQL 为 testdb.table1,Oracle 为 test_schema.table1
table_listArray-要读取的表列表,可以使用该配置替代 table_path。例如: [{ table_path = "testdb.table1"}, {table_path = "testdb.table2", query = "select * id, name from testdb.table2"}]
where_conditionString-所有表/查询的通用行过滤条件,必须以 where 开头,例如 where id > 100
split.sizeInt8096表的拆分大小(行数),捕获的表在读取时被拆分为多个部分。
split.even-distribution.factor.lower-boundDouble0.05块键分布因子的下限,用于判断表数据是否均匀分布。
split.even-distribution.factor.upper-boundDouble100块键分布因子的上限,用于判断表数据是否均匀分布。
split.sample-sharding.thresholdInt10000触发样本分片策略的估计分片数量阈值。
split.inverse-sampling.rateInt1000样本分片策略中使用的采样率的反值。
common-options-源插件通用参数,详细信息请参考 Source Common Options。

并行读取

JDBC 源连接器支持从表中并行读取数据。SeaTunnel 将使用一定规则拆分表中的数据,并将其交给读取器进行读取。读取器的数量由 parallelism 选项决定。

拆分键规则

  1. 如果 partition_column 不为空,则使用该列进行计算。该列必须为 支持的拆分数据类型
  2. 如果 partition_column 为空,SeaTunnel 将读取表的模式并获取主键和唯一索引。如果主键和唯一索引中有多个列,则使用第一个支持的拆分数据类型列进行拆分。例如,表的主键为 (guid, name),因为 guid 不在 支持的拆分数据类型 中,因此使用 name 列进行拆分。

支持的拆分数据类型

  • String
  • Number (int, bigint, decimal, ...)
  • Date

拆分相关选项

split.size

每个拆分中包含的行数,捕获的表在读取时被拆分为多个部分。

split.even-distribution.factor.lower-bound

不推荐使用

块键分布因子的下限。用于判断表数据是否均匀分布。如果计算出的分布因子大于或等于该下限(即 (MAX(id) - MIN(id) + 1) / row count),则表块将被优化为均匀分布。否则,如果分布因子小于下限,则表将被视为不均匀分布,如果估计的分片数量超过 sample-sharding.threshold 指定的值,将使用基于采样的分片策略。默认值为 0.05。

split.even-distribution.factor.upper-bound

不推荐使用

块键分布因子的上限。用于判断表数据是否均匀分布。如果计算出的分布因子小于或等于该上限(即 (MAX(id) - MIN(id) + 1) / row count),则表块将被优化为均匀分布。否则,如果分布因子大于上限,则表将被视为不均匀分布,如果估计的分片数量超过 sample-sharding.threshold 指定的值,将使用基于采样的分片策略。默认值为 100.0。

split.sample-sharding.threshold

此配置指定估计分片数量的阈值,以触发样本分片策略。当分布因子超出指定的上下限时,且估计的分片数量(计算为近似行数 / 块大小)超过此阈值时,将使用样本分片策略。这可以帮助更高效地处理大型数据集。默认值为 1000 个分片。

split.inverse-sampling.rate

样本分片策略中使用的采样率的反值。例如,如果该值设置为 1000,则在采样过程中应用 1/1000 的采样率。此选项提供灵活性,以控制采样的粒度,从而影响最终的分片数量。尤其在处理非常大的数据集时,较低的采样率更为合适。默认值为 1000。

partition_column [string]

拆分数据的列名。

partition_upper_bound [BigDecimal]

扫描的 partition_column 最大值,如果未设置,SeaTunnel 将查询数据库获取最大值。

partition_lower_bound [BigDecimal]

扫描的 partition_column 最小值,如果未设置,SeaTunnel 将查询数据库获取最小值。

partition_num [int]

不推荐使用,正确的方法是通过 split.size 控制拆分的数量。

我们需要拆分成多少个块,仅支持正整数,默认值为作业并行性。

小贴士

如果表无法拆分(例如,表没有主键或唯一索引,且未设置 partition_column),则将以单一并发执行。

使用 table_path 替代 query 进行单表读取。如果需要读取多个表,使用 table_list

示例任务

以下是一些使用 JDBC 源连接器的任务示例:

简单示例

该示例在 TEST_TABLE 表中查询所有字段,您还可以指定要查询哪些字段以最终输出到控制台。

env {
  parallelism = 4
  job.mode = "BATCH"
}
source{
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        user = "root"
        password = "123456"
        query = "SELECT * FROM TEST_TABLE"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Console {}
}

partition_column 并行读取

使用配置的分片字段和分片数据并行读取查询表,如果想读取整个表,可以使用此方法。

env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        user = "root"
        password = "123456"
        partition_column = "ID"
        partition_num = 10
    }
}
sink {
  Console {}
}

按主键或唯一索引并行读取

通过配置 table_path 开启自动分片,可以配置 split.* 来调整分片策略。

env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        user = "root"
        password = "123456"
        table_path = "DA.SCHEMA1.TABLE1"
        query = "select * from SCHEMA1.TABLE1"
        split.size = 10000
    }
}
sink {
  Console {}
}

并行读取

根据配置的上下限更高效地读取数据。

source {
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        user = "root"
        password = "123456"
        partition_column = "ID"
        partition_lower_bound = 1
        partition_upper_bound = 500
        partition_num = 10
    }
}

多表读取

配置 table_list 将开启自动拆分,以配置split.来调整分割策略*。

env {
  job.mode = "BATCH"
  parallelism = 4
}
source {
  Jdbc {
    url = "jdbc:oracle:thin:@datasource01:1523:xe"
    driver = "oracle.jdbc.OracleDriver"
    connection_check_timeout_sec = 100
    user = "root"
    password = "123456"
    "table_list"=[
        {
            "table_path"="XE.TEST.USER_INFO"
        },
        {
            "table_path"="XE.TEST.YOURTABLENAME"
        }
    ]
    #where_condition= "where id > 100"
    split.size = 10000
    #split.even-distribution.factor.upper-bound = 100
    #split.even-distribution.factor.lower-bound = 0.05
    #split.sample-sharding.threshold = 1000
    #split.inverse-sampling.rate = 1000
  }
}

sink {
  Console {}
}

总结

Apache SeaTunnel为连接和集成Oracle数据库提供了灵活的解决方案。通过简单的配置,用户可以高效地从Oracle数据库读取和处理数据,满足不同的业务需求。

无论是在实时数据处理还是批量数据集成场景中,SeaTunnel都能为用户带来显著的便利和高效。

本文由 白鲸开源科技 提供发布支持!


http://www.kler.cn/a/374374.html

相关文章:

  • 使用 pyreqs 快速创建 requirements.txt PyCharm 中 UnicodeDecodeError 问题
  • json字符串或者json文件转换成相应的bean,报错“Unrecognized field xxx , not marked as ignorable”
  • Burp炮台实现(动态ip发包)
  • UE5 崩溃问题汇总!!!
  • B树的实现
  • 查看php已安装扩展命令
  • Metasploit(MSF)使用
  • elasticsearch7.x在k8s中的部署
  • 【Visual Studio】解决 CC++ 控制台程序 printf 函数输出中文和换行符显示异常
  • logback 替换日志中的类名
  • 【论文复现】以思维链为线索推理隐含情感
  • git commit应遵循的提交规范
  • 【设计模式】Java创建型设计模式之工厂模式魔法:打造灵活的冰箱工厂
  • 科研项目:利用AI大模型获得基金资助的10个原则
  • 家用储能用什么电表呢?
  • CentOS 9 Stream 上安装 WebStorm
  • 在浏览器和Node.js环境中使用Puppeteer的Rollup与Webpack打包指南
  • 【p2p、分布式,区块链笔记 分布式容错算法】: 拜占庭将军问题+实用拜占庭容错算法PBFT
  • 【客户端开发】electron 中无法使用 js-cookie 的问题
  • 基于单片机的家用电器电能测量仪设计
  • ElSelect 组件的 onChange 和 onInput 事件的区别
  • 三菱FX5UPLC 安全功能
  • EMQX MQTT消息服务器安装内网穿透配置WS公网地址远程连接
  • C# 编程基础:深入解析构造函数与析构函数
  • ClickHouse 5节点集群安装
  • node.js_npm : 无法加载文件 D:\Program Files\nodejs\npm.ps1