实时数仓:Flink 任务实现、Hudi 表设计细节或治理工具的具体配置
1. Flink 任务实现
1.1 数据流架构
在数据流中,Flink任务通常完成如下功能:
- 数据接入:从 Kafka 消费实时数据流。
- 数据清洗:进行数据过滤、字段映射和格式转换。
- 数据聚合:基于时间窗口或事件触发计算聚合指标。
- 数据写入:实时将结果写入 Hudi 表或 Kafka/Doris。
1.2 核心代码示例
假设我们需要实现一个电商平台订单事件处理任务:
- 数据源:Kafka。
- 输出目标:Hudi表(如
dwd_order_fact
)或数据湖分层。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.hudi.sink.HudiSink;
import java.util.Properties;
public class FlinkHudiExample {
public static void main(String[] args) throws Exception {
// 1. 创建Flink环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // 每60秒做一次检查点
// 2. Kafka源配置
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "kafka:9092");
kafkaProps.setProperty("group.id", "order-group");
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
"order_topic",
new SimpleStringSchema(),
kafkaProps
);
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
// 3. 数据流逻辑
env
.addSource(kafkaSource)
.map(raw -> parseOrder(raw)) // 数据解析
.filter(order -> order != null && order.isValid()) // 数据清洗
.keyBy(order -> order.getUserId()) // 基于用户ID分组
.window(TumblingEventTimeWindows.of(Time.minutes(1))) // 按1分钟窗口聚合
.reduce((order1, order2) -> mergeOrders(order1, order2)) // 聚合逻辑
.addSink(HudiSink.forTable("hdfs://path/to/hudi/dwd_order_fact")); // 写入Hudi表
// 4. 执行任务
env.execute("Flink Hudi Real-time ETL");
}
private static Order parseOrder(String raw) {
// 假设订单数据是JSON格式,解析为Order对象
return JsonUtils.fromJson(raw, Order.class);
}
private static Order mergeOrders(Order order1, Order order2) {
// 合并订单逻辑:汇总订单金额、更新状态等
order1.setAmount(order1.getAmount() + order2.getAmount());
return order1;
}
}
1.3 关键优化点
-
Watermark配置:
- 如果事件时间不严格递增,使用
BoundedOutOfOrdernessWatermarks
设置最大延迟。 - 示例:
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5));
- 如果事件时间不严格递增,使用
-
状态管理:
- 使用 Flink 的 RocksDB 作为状态后端,保证大规模状态数据的可靠性。
-
Checkpoint设置:
- 开启增量检查点,避免全量状态存储耗时:
env.getCheckpointConfig().enableIncrementalCheckpoints(true);
- 开启增量检查点,避免全量状态存储耗时:
2. Hudi 表设计细节
2.1 表结构设计
-
表类型:
- Copy-on-Write (COW):适合查询频繁的场景(如ADS层)。
- Merge-on-Read (MOR):适合实时写入场景(如ODS/DWD层)。
-
分区字段:
- 按业务字段分区:如
order_date
(按天分区)或order_status
(按状态分区)。 - 路径设计:
{主题域}/{年}/{月}/{日}
。
- 按业务字段分区:如
-
主键选择:
- 例如:
order_id
,保证数据去重。
- 例如:
2.2 Hudi 配置
配置 Hudi Sink 的关键参数:
hoodie.datasource.write.recordkey.field=order_id
hoodie.datasource.write.precombine.field=update_time
hoodie.datasource.write.partitionpath.field=order_date
hoodie.datasource.write.hive.sync.enable=true
hoodie.datasource.write.hive.database=real_time_dw
hoodie.datasource.write.hive.table=dwd_order_fact
hoodie.datasource.write.hive.partition_fields=order_date
hoodie.table.type=COPY_ON_WRITE
2.3 查询优化
- Bloom索引:加速主键匹配,适用于去重。
- 列式存储:存储格式选择 Parquet,压缩优化查询性能。
- 数据更新策略:通过
precombine.field
确保按最新记录更新。
3. 治理工具配置
3.1 数据质量(Great Expectations)
规则配置示例:校验字段非空、唯一性、值域
expectations:
- expectation_type: expect_column_values_to_not_be_null
kwargs:
column: order_id
- expectation_type: expect_column_values_to_be_unique
kwargs:
column: order_id
- expectation_type: expect_column_values_to_be_in_set
kwargs:
column: order_status
value_set: ["CREATED", "PAID", "SHIPPED", "CANCELLED"]
3.2 数据权限(Apache Ranger)
策略配置示例:按主题域控制权限
用户组 | 数据库 | 表名 | 权限 |
---|---|---|---|
BI_Group | real_time_dw | ads_* | 查询(SELECT) |
ETL_Team | real_time_dw | dwd_* | 查询、写入 |
Admins | real_time_dw | * | 全部权限 |
3.3 数据血缘(Apache Atlas)
-
启用Flink血缘:
- 将 Flink 作业与 Hudi Sink 的血缘关系记录入 Atlas。
-
血缘展示:
- 支持可视化展示:
- 从 Kafka -> Flink -> Hudi 表 -> BI工具。
- 支持可视化展示: