当前位置: 首页 > article >正文

Apache SeaTunnel脚本升级及参数调优实战

最近作者针对实时数仓的Apache SeaTunnel同步链路,完成了双引擎架构升级与全链路参数深度调优,希望本文能够给大家有所启发,欢迎批评指正!

在这里插入图片描述

Apache SeaTunnel 版本 :2.3.9

Doris版本:2.0.6

MySQL JDBC Connector : 8.0.28

架构升级

  • 批处理链路:JDBC并行度进行提升,基于ID分区实现分片读取,结合批量参数(fetch_size=10000+batch_size=5000)使全量同步吞吐量大幅增加

  • 实时增量链路:引入MySQL-CDC组件,通过initial快照模式+chunk.size.rows=8096实现全量/增量平滑切换,事件延迟压降至500ms内

稳定性增强

  • 资源管控:JDBC连接池动态扩容(max_size=20)+ CDC限流策略(rows_per_second=1000),源库CPU峰值负载下降40%

  • 容错机制:Doris两阶段提交(enable-2pc=true)配合检查点(checkpoint.interval=10s),故障恢复时间缩短80%

写入优化

  • 缓冲区三级联控(buffer-size=10000+buffer-count=3+flush.interval=5s)提升Doris写入批次质量

  • Tablet粒度控制(request_tablet_size=5)使BE节点负载均衡度提升

实战演示

同步之前创建Doris表

在这里插入图片描述

-- DROP TABLE IF EXISTS ods.ods_activity_info_full;
CREATE TABLE ods.ods_activity_info_full
(
    `id`            VARCHAR(255) COMMENT '活动id',
    `k1`            DATE NOT NULL   COMMENT '分区字段',
    `activity_name` STRING COMMENT '活动名称',
    `activity_type` STRING COMMENT '活动类型',
    `activity_desc` STRING COMMENT '活动描述',
    `start_time`    STRING COMMENT '开始时间',
    `end_time`      STRING COMMENT '结束时间',
    `create_time`   STRING COMMENT '创建时间'
)
    ENGINE=OLAP  -- 使用Doris的OLAP引擎,适用于高并发分析场景
    UNIQUE KEY(`id`,`k1`)  -- 唯一键约束,保证(id, k1)组合的唯一性(Doris聚合模型特性)
COMMENT '活动信息全量表'
PARTITION BY RANGE(`k1`) ()  -- 按日期范围分区(具体分区规则由动态分区配置决定)
DISTRIBUTED BY HASH(`id`)  -- 按id哈希分桶,保证相同id的数据分布在同一节点
PROPERTIES
(
    "replication_allocation" = "tag.location.default: 1",  -- 副本分配策略:默认标签分配1个副本
    "is_being_synced" = "false",          -- 是否处于同步状态(通常保持false)
    "storage_format" = "V2",             -- 存储格式版本(V2支持更高效压缩和索引)
    "light_schema_change" = "true",      -- 启用轻量级schema变更(仅修改元数据,无需数据重写)
    "disable_auto_compaction" = "false", -- 启用自动压缩(合并小文件提升查询性能)
    "enable_single_replica_compaction" = "false", -- 禁用单副本压缩(多副本时保持数据一致性)

    "dynamic_partition.enable" = "true",            -- 启用动态分区
    "dynamic_partition.time_unit" = "DAY",          -- 按天创建分区
    "dynamic_partition.start" = "-60",             -- 保留最近60天的历史分区
    "dynamic_partition.end" = "3",                 -- 预先创建未来3天的分区
    "dynamic_partition.prefix" = "p",              -- 分区名前缀(如p20240101)
    "dynamic_partition.buckets" = "32",            -- 每个分区的分桶数(影响并行度)
    "dynamic_partition.create_history_partition" = "true", -- 自动创建缺失的历史分区

    "bloom_filter_columns" = "id,activity_name",  -- 为高频过滤字段(id/名称)创建布隆过滤器,加速WHERE查询
    "compaction_policy" = "time_series",          -- 按时间序合并策略优化时序数据(适合活动时间字段)
    "enable_unique_key_merge_on_write" = "true",  -- 唯一键写时合并(实时更新场景减少读放大)
    "in_memory" = "false"                        -- 关闭全内存存储(仅小表可开启)
);
配置SeaTunnel JDBC同步脚本

在这里插入图片描述

env {
  # 环境配置
  parallelism = 8                     # 增加并行度以提高吞吐量
  job.mode = "STREAMING"              # 使用流式处理模式进行实时同步
  checkpoint.interval = 10000         # 检查点间隔,单位毫秒

  # 限流配置 - 避免对源数据库造成过大压力
  read_limit.bytes_per_second = 10000000  # 每秒读取字节数限制,约10MB/s
  read_limit.rows_per_second = 1000       # 每秒读取行数限制

  # 本地检查点配置
  execution.checkpoint.data-uri = "file:///opt/seatunnel/checkpoints"
  execution.checkpoint.max-concurrent = 1  # 最大并发检查点数

  # 性能优化参数
  execution.buffer-timeout = 5000          # 缓冲超时时间(毫秒)
  execution.jvm-options = "-Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"
}

source {
  MySQL-CDC {
    # 基本连接配置
    # server-id = 5652-5657             # MySQL复制客户端的唯一ID范围
    username = "root"                # 数据库用户名
    password = ""                # 数据库密码
    table-names = ["gmall.activity_info"]  # 要同步的表
    base-url = "jdbc:mysql://192.168.241.128:3306/gmall?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"

    # CDC 特有配置
    schema-changes.enabled = true     # 启用架构变更捕获
    server-time-zone = "Asia/Shanghai"  # 服务器时区

    # 性能优化配置
    snapshot.mode = "initial"         # 初始快照模式
    snapshot.fetch.size = 10000       # 快照获取大小
    chunk.size.rows = 8096            # 分块大小,用于并行快照
    connection.pool.size = 10         # 连接池大小

    # 高级配置
    include.schema.changes = true     # 包含架构变更事件
    scan.startup.mode = "initial"     # 启动模式:initial(全量+增量)
    scan.incremental.snapshot.chunk.size = 8096  # 增量快照分块大小
    debezium.min.row.count.to.stream.results = 1000  # 流式结果的最小行数

    # 容错配置
    connect.timeout = 30000           # 连接超时时间(毫秒)
    connect.max-retries = 3           # 最大重试次数

    # 输出表名
    result_table_name = "mysql_cdc_source"
  }
}

# 可选的转换逻辑,如果需要对数据进行处理
transform {
  Sql {
    source_table_name = "mysql_cdc_source"
    result_table_name = "doris_sink_data"

    # 根据需要转换字段,这里添加了一个分区字段k1
    query = """
      select
        id,
        formatdatetime(create_time,'yyyy-MM-dd') as k1,
        activity_name,
        activity_type,
        activity_desc,
        start_time,
        end_time,
        create_time
      from mysql_cdc_source
    """
  }
}

sink {
  Doris {
    # 基本连接配置
    source_table_name = "doris_sink_data"  # 或直接使用 "mysql_cdc_source"
    fenodes = "192.168.241.128:8030"
    username = "root"
    password = ""
    table.identifier = "ods.ods_activity_info_full"  # Doris目标表

    # 事务和标签配置
    sink.enable-2pc = "true"          # 启用两阶段提交,确保一致性
    sink.label-prefix = "cdc_sync"    # 导入标签前缀

    # 写入模式配置
    sink.properties {
      format = "json"
      read_json_by_line = "true"
      column_separator = "\t"         # 列分隔符
      line_delimiter = "\n"           # 行分隔符
      max_filter_ratio = "0.1"        # 允许的最大错误率

      # CDC特有配置 - 处理不同操作类型
      # 使用Doris的UPSERT模式处理CDC事件
      merge_type = "MERGE"            # 合并类型:APPEND或MERGE
      delete_enable = "true"          # 启用删除操作
    }

    # 性能优化配置
    sink.buffer-size = 10000          # 缓冲区大小
    sink.buffer-count = 3             # 缓冲区数量
    sink.flush.interval-ms = 5000     # 刷新间隔
    sink.max-retries = 3              # 最大重试次数
    sink.parallelism = 8              # 写入并行度

    # Doris连接优化
    doris.config = {
      format = "json"
      read_json_by_line = "true"
      request_connect_timeout_ms = "5000"  # 连接超时
      request_timeout_ms = "30000"         # 请求超时
      request_tablet_size = "5"            # 每个请求的tablet数量
    }
  }
}
配置SeaTunnel MySQLCDC 同步脚本

在这里插入图片描述

env {
  parallelism = 8
  job.mode = "BATCH"
  checkpoint.interval = 30000

  # 本地文件系统检查点
  execution.checkpoint.data-uri = "file:///opt/seatunnel/checkpoints"
  execution.buffer-timeout = 5000

  # JVM 参数优化
  execution.jvm-options = "-Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"
}

source {
  Jdbc {
    result_table_name = "mysql_seatunnel"
    url = "jdbc:mysql://192.168.241.128:3306/gmall?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 30
    user = "gmall"
    password = "gmall"

    # 使用分区并行读取
    query = "select id, activity_name, activity_type, activity_desc, start_time, end_time, create_time from gmall.activity_info"
    partition_column = "id"
    partition_num = 8

    # 连接池配置
    connection_pool {
      max_size = 20
      min_idle = 5
      max_idle_ms = 60000
    }

    # 批处理配置
    fetch_size = 10000
    batch_size = 5000
    is_exactly_once = true
  }
}

transform {
  Sql {
    source_table_name = "mysql_seatunnel"
    result_table_name = "seatunnel_doris"

    query = """
      select 
        id, 
        formatdatetime(create_time,'yyyy-MM-dd') as k1,  
        activity_name, 
        activity_type, 
        activity_desc, 
        start_time, 
        end_time, 
        create_time 
      from mysql_seatunnel
    """
  }
}

sink {
  Doris {
    source_table_name = "seatunnel_doris"
    fenodes = "192.168.241.128:8030"
    username = "root"
    password = ""
    table.identifier = "ods.ods_activity_info_full"
    sink.enable-2pc = "true"
    sink.label-prefix = "test_json"

    # 优化Doris写入配置
    sink.properties {
      format = "json"
      read_json_by_line = "true"
      column_separator = "\t"
      line_delimiter = "\n"
      max_filter_ratio = "0.1"
    }

    # 批量写入配置
    sink.buffer-size = 10000
    sink.buffer-count = 3
    sink.flush.interval-ms = 5000
    sink.max-retries = 3
    sink.parallelism = 8

    doris.config = {
      format = "json"
      read_json_by_line = "true"
      request_connect_timeout_ms = "5000"
      request_timeout_ms = "30000"
      request_tablet_size = "5"
    }
  }
}

最终Apache Doris数据:
在这里插入图片描述

本文完!


http://www.kler.cn/a/594452.html

相关文章:

  • 智能护栏报警系统提升高速公路安全
  • 无人机智能控制系统未来技术发展分析
  • Docker DockerFile和Django最佳实践
  • VSTO(C#)Excel开发11:自定义任务窗格与多个工作簿
  • uniapp中的路由、本地存储与网络请求
  • C# 获取Type对象的方式
  • 从设计到量产:MHO5000如何实现电源EMIEMC测试全流程自动化?
  • 奇瑞汽车智能化战略发布,开启“四大平权”新时代
  • Cool Request:可以统计任意方法耗时
  • Android AI ChatBot-v1.6.3-28-开心版[免登录使用GPT-4o和DeepSeek]
  • vue3中如何缓存路由组件
  • LiteratureReading:[2016] Enriching Word Vectors with Subword Information
  • 分布式 IO 模块:助力实现智慧仓储
  • 如何通过Odoo 18创建与配置服务器操作
  • 从需求到智能管理:用 AntSK 和 DeepSeek-R1 打造企业级知识库与测试用例生成系统
  • PyTorch 深度学习实战(19):离线强化学习与 Conservative Q-Learning (CQL) 算法
  • 210、【图论】课程表(Python)
  • 高可用环境下Nginx服务管理脚本优化实践
  • VUE中使用路由router跳转页面
  • 传统金融和分布式金融