当前位置: 首页 > article >正文

实时数仓:Flink 任务实现、Hudi 表设计细节或治理工具的具体配置

1. Flink 任务实现

1.1 数据流架构

在数据流中,Flink任务通常完成如下功能:

  1. 数据接入:从 Kafka 消费实时数据流。
  2. 数据清洗:进行数据过滤、字段映射和格式转换。
  3. 数据聚合:基于时间窗口或事件触发计算聚合指标。
  4. 数据写入:实时将结果写入 Hudi 表或 Kafka/Doris
1.2 核心代码示例

假设我们需要实现一个电商平台订单事件处理任务:

  • 数据源:Kafka。
  • 输出目标:Hudi表(如dwd_order_fact)或数据湖分层。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.hudi.sink.HudiSink;

import java.util.Properties;

public class FlinkHudiExample {

    public static void main(String[] args) throws Exception {
        // 1. 创建Flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000); // 每60秒做一次检查点

        // 2. Kafka源配置
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka:9092");
        kafkaProps.setProperty("group.id", "order-group");

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
                "order_topic",
                new SimpleStringSchema(),
                kafkaProps
        );
        kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());

        // 3. 数据流逻辑
        env
            .addSource(kafkaSource)
            .map(raw -> parseOrder(raw)) // 数据解析
            .filter(order -> order != null && order.isValid()) // 数据清洗
            .keyBy(order -> order.getUserId()) // 基于用户ID分组
            .window(TumblingEventTimeWindows.of(Time.minutes(1))) // 按1分钟窗口聚合
            .reduce((order1, order2) -> mergeOrders(order1, order2)) // 聚合逻辑
            .addSink(HudiSink.forTable("hdfs://path/to/hudi/dwd_order_fact")); // 写入Hudi表

        // 4. 执行任务
        env.execute("Flink Hudi Real-time ETL");
    }

    private static Order parseOrder(String raw) {
        // 假设订单数据是JSON格式,解析为Order对象
        return JsonUtils.fromJson(raw, Order.class);
    }

    private static Order mergeOrders(Order order1, Order order2) {
        // 合并订单逻辑:汇总订单金额、更新状态等
        order1.setAmount(order1.getAmount() + order2.getAmount());
        return order1;
    }
}
1.3 关键优化点
  1. Watermark配置

    • 如果事件时间不严格递增,使用 BoundedOutOfOrdernessWatermarks 设置最大延迟。
    • 示例:
      WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5));
      
  2. 状态管理

    • 使用 Flink 的 RocksDB 作为状态后端,保证大规模状态数据的可靠性。
  3. Checkpoint设置

    • 开启增量检查点,避免全量状态存储耗时:
      env.getCheckpointConfig().enableIncrementalCheckpoints(true);
      

2. Hudi 表设计细节

2.1 表结构设计
  • 表类型

    • Copy-on-Write (COW):适合查询频繁的场景(如ADS层)。
    • Merge-on-Read (MOR):适合实时写入场景(如ODS/DWD层)。
  • 分区字段

    1. 按业务字段分区:如order_date(按天分区)或order_status(按状态分区)。
    2. 路径设计{主题域}/{年}/{月}/{日}
  • 主键选择

    • 例如:order_id,保证数据去重。
2.2 Hudi 配置

配置 Hudi Sink 的关键参数:

hoodie.datasource.write.recordkey.field=order_id
hoodie.datasource.write.precombine.field=update_time
hoodie.datasource.write.partitionpath.field=order_date
hoodie.datasource.write.hive.sync.enable=true
hoodie.datasource.write.hive.database=real_time_dw
hoodie.datasource.write.hive.table=dwd_order_fact
hoodie.datasource.write.hive.partition_fields=order_date
hoodie.table.type=COPY_ON_WRITE
2.3 查询优化
  • Bloom索引:加速主键匹配,适用于去重。
  • 列式存储:存储格式选择 Parquet,压缩优化查询性能。
  • 数据更新策略:通过 precombine.field 确保按最新记录更新。

3. 治理工具配置

3.1 数据质量(Great Expectations)

规则配置示例:校验字段非空、唯一性、值域

expectations:
  - expectation_type: expect_column_values_to_not_be_null
    kwargs:
      column: order_id
  - expectation_type: expect_column_values_to_be_unique
    kwargs:
      column: order_id
  - expectation_type: expect_column_values_to_be_in_set
    kwargs:
      column: order_status
      value_set: ["CREATED", "PAID", "SHIPPED", "CANCELLED"]
3.2 数据权限(Apache Ranger)

策略配置示例:按主题域控制权限

用户组数据库表名权限
BI_Groupreal_time_dwads_*查询(SELECT)
ETL_Teamreal_time_dwdwd_*查询、写入
Adminsreal_time_dw*全部权限
3.3 数据血缘(Apache Atlas)
  1. 启用Flink血缘

    • 将 Flink 作业与 Hudi Sink 的血缘关系记录入 Atlas。
  2. 血缘展示

    • 支持可视化展示:
      • 从 Kafka -> Flink -> Hudi 表 -> BI工具。


http://www.kler.cn/a/474276.html

相关文章:

  • 2025新春烟花代码(二)HTML5实现孔明灯和烟花效果
  • Openwrt @ rk3568平台 固件编译实践(二)- ledeWRT版本
  • 掌握正则表达式:从入门到精通的实战指南
  • NRC优先级中比较特殊的—NRC0x13和NRC0x31
  • 继承(6)
  • 关于Mac使用VSCode连接虚拟机
  • DC/AC并网逆变器模型与仿真MATLAB
  • 计算机网络——网络层—IP数据报与分片
  • 高光谱相机的特点
  • JDK 17 模块化系统:构建可维护大型项目的基石
  • 关机重启后,GitLab服务异常
  • Yolo11改进:注意力改进|Block改进|ESSAformer,用于高光谱图像超分辨率的高效Transformer|即插即用
  • 本地LLM部署--Open WebUI(多媒体工具FFMPEG作用)
  • uniapp使用chooseLocation安卓篇
  • Ruby语言的数据类型
  • 给Kkfileview加请求头鉴权接入
  • docker 常用命令实践DEMO
  • 大风车excel:怎么把题库导入excel?题库导入excel
  • patchwork++地面分割学习笔记
  • 深度学习:原理、应用与前沿进展
  • MySQL Binlog 监听方案
  • 将文件上传至hdfs(SpringBoot)
  • 阿里云 AI 搜索方案解读:大模型驱动下的智能搜索,助力企业数字化转型
  • 2024大模型安全研究方向总结(附实践资料)
  • ZYNQ初识8(zynq_7010)FIFO_IP核
  • 【银河麒麟高级服务器操作系统】服务器异常重启故障分析及处理建议