当前位置: 首页 > article >正文

实时数仓:Apache Iceberg 的表管理与实时数仓架构设计

Apache Iceberg 的表管理与实时数仓架构设计

Apache Iceberg 是一种面向大规模数据湖的开源表格式,适用于批处理和流处理场景,具有 ACID 事务支持、高效的元数据管理、时间旅行等特点。以下内容涵盖 Iceberg 的表设计、实时数据处理(Flink 或 Spark)、性能优化及数据治理细节。


1. Iceberg 表设计

1.1 表结构设计

Iceberg 表可以分层组织,适应 ODS、DWD、DWS 和 ADS 层的不同需求。

表定义示例
以订单数据为例:

CREATE TABLE real_time_dw.dwd_order_fact (
    order_id STRING COMMENT '订单ID',
    user_id STRING COMMENT '用户ID',
    product_id STRING COMMENT '产品ID',
    amount DOUBLE COMMENT '订单金额',
    order_date DATE COMMENT '订单日期',
    update_time TIMESTAMP COMMENT '更新时间'
)
PARTITIONED BY (order_date)
STORED AS ICEBERG;
1.2 分区设计

Iceberg 支持灵活的分区策略,避免小文件问题。

  • 分区字段

    • 按日期分区:order_date
    • 按小时分区(实时场景):order_date, HOUR(update_time)
  • 隐藏分区(Partition Transformations)
    Iceberg 支持复杂的分区表达式,简化查询逻辑。

    PARTITIONED BY (years(order_date), months(order_date), days(order_date))
    
1.3 数据压缩与存储格式
  • 存储格式:Iceberg 支持 ParquetORC,推荐使用列式存储的 Parquet。
  • 压缩策略:可选 SnappyZSTD,综合性能优选 ZSTD

配置示例:

write.format.default=parquet
write.compress.default=zstd

2. 实时数据处理

2.1 Flink 实现

Flink 与 Iceberg 集成,通过流式写入实现实时 ETL。

实时消费 Kafka 并写入 Iceberg

import org.apache.flink.table.api.*;
import org.apache.flink.types.Row;

public class FlinkIcebergExample {
    public static void main(String[] args) throws Exception {
        // 创建表环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // 定义 Kafka 源表
        tableEnv.executeSql(
            "CREATE TABLE kafka_orders ( " +
            "  order_id STRING, " +
            "  user_id STRING, " +
            "  product_id STRING, " +
            "  amount DOUBLE, " +
            "  order_date DATE, " +
            "  update_time TIMESTAMP(3), " +
            "  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND " +
            ") WITH ( " +
            "  'connector' = 'kafka', " +
            "  'topic' = 'order_topic', " +
            "  'properties.bootstrap.servers' = 'localhost:9092', " +
            "  'format' = 'json' " +
            ")"
        );

        // 定义 Iceberg 目标表
        tableEnv.executeSql(
            "CREATE TABLE iceberg_orders ( " +
            "  order_id STRING, " +
            "  user_id STRING, " +
            "  product_id STRING, " +
            "  amount DOUBLE, " +
            "  order_date DATE, " +
            "  update_time TIMESTAMP(3) " +
            ") PARTITIONED BY (order_date) " +
            "WITH ( " +
            "  'connector' = 'iceberg', " +
            "  'catalog-name' = 'hadoop_catalog', " +
            "  'catalog-type' = 'hadoop', " +
            "  'warehouse' = 'hdfs://path/to/warehouse' " +
            ")"
        );

        // 实现流式写入
        tableEnv.executeSql(
            "INSERT INTO iceberg_orders " +
            "SELECT order_id, user_id, product_id, amount, order_date, update_time FROM kafka_orders"
        );
    }
}
2.2 Spark 实现

Spark 提供批量和流式处理的支持,以下为流式写入 Iceberg 的示例。

实时流写入 Iceberg

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType, DateType

# 创建 SparkSession
spark = SparkSession.builder \
    .appName("Spark Iceberg Streaming") \
    .config("spark.sql.catalog.hadoop_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.hadoop_catalog.type", "hadoop") \
    .config("spark.sql.catalog.hadoop_catalog.warehouse", "hdfs://path/to/warehouse") \
    .getOrCreate()

# 定义 Kafka 数据 Schema
schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("order_date", DateType(), True),
    StructField("update_time", TimestampType(), True)
])

# 从 Kafka 消费数据
kafka_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "order_topic") \
    .load()

parsed_df = kafka_df.selectExpr("CAST(value AS STRING)").select(from_json(col("value"), schema).alias("data")).select("data.*")

# 写入 Iceberg 表
parsed_df.writeStream \
    .format("iceberg") \
    .outputMode("append") \
    .option("path", "hadoop_catalog.real_time_dw.dwd_order_fact") \
    .start()

3. Iceberg 表治理

3.1 清理无效数据
  • 使用 ExpireSnapshots 清理旧的快照:
    CALL hadoop_catalog.system.expire_snapshots(
        table => 'real_time_dw.dwd_order_fact',
        older_than => TIMESTAMP '2025-01-01 00:00:00'
    );
    
3.2 合并小文件
  • 合并小文件以优化查询性能:
    CALL hadoop_catalog.system.rewrite_data_files(
        table => 'real_time_dw.dwd_order_fact'
    );
    
3.3 表分层管理
  • 为不同层(如 ODS、DWD、ADS)设置不同的优化策略。例如:
    • ODS 层:多小文件快速写入,延迟优化。
    • DWD 层:定期合并小文件,确保查询效率。

4. 性能优化

4.1 小文件问题

Iceberg 内置合并策略,减少小文件的生成:

write.distribution-mode=hash
write.target-file-size-bytes=134217728  # 128 MB
4.2 查询优化
  • 启用 vectorized reads 提高读取性能:

    read.parquet.vectorization.enabled=true
    
  • 利用分区裁剪(Partition Pruning)减少扫描数据量:

    SELECT * FROM dwd_order_fact WHERE order_date = '2025-01-01';
    
4.3 动态分区写入

Iceberg 支持动态分区生成:

write.partition.default-name=my_partition

5. 数据治理工具

5.1 数据血缘

通过 Apache Atlas 集成 Iceberg 的表和元数据,实现血缘追踪。

5.2 数据质量校验

使用工具(如 Great Expectations)校验 Iceberg 数据表:

validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_in_set("order_status", ["CREATED", "PAID", "CANCELLED"])
5.3 权限控制

结合 Apache Ranger 实现 Iceberg 表级权限控制。



http://www.kler.cn/a/473033.html

相关文章:

  • React Native 项目 Error: EMFILE: too many open files, watch
  • vue2日历组件
  • 单片机实现模式转换
  • 力扣刷题:数组OJ篇(下)
  • 路由器的转发表
  • FastAPI 的依赖注入与生命周期管理深度解析
  • [读书日志]从零开始学习Chisel 第八篇:Scala的集合(敏捷硬件开发语言Chisel与数字系统设计)
  • MySQL error [Warning] InnoDB: Table mysql/innodb_index_stats has length
  • win32汇编环境,怎么进行乘法运算的
  • (一)使用 WebGL 绘制一个简单的点和原理解析
  • 《计算机视觉解锁图像理解密码:编程实现图片场景文字描述生成》
  • Ungoogled Chromium127 编译指南 MacOS篇(七)- 安装依赖包
  • 【郑大主办、ACM出版、EI稳定检索】第四届密码学、网络安全与通信技术国际会议 (CNSCT 2025)
  • 瑞芯微rk3568平台 openwrt系统适配ffmpeg硬件解码(rkmpp)
  • spring cloud alibaba-dubbo3 rpc运行原理
  • 最新版Edge浏览器加载ActiveX控件之Adobe PDF阅读器控件
  • 【MySQL高可用】什么时候分库、分表?怎样进行分库分表?
  • 网络安全 | 网络安全的未来趋势
  • 数智大咖谈 | 志邦家居:智慧家居与智慧管理并驾齐驱
  • select下拉框,首次进入页面没有显示value的情况
  • 如何构建多层决策树
  • c#编写基于ffmpeg的视频裁剪
  • 【VBA】【EXCEL】将某列内容横向粘贴到指定行
  • 点击底部的 tabBar 属于 wx.switchTab 跳转方式,目标页面的 onLoad 不会触发(除非是第一次加载)
  • P1909 [NOIP2016 普及组] 买铅笔 题解
  • python初体验: 处理excel数据