分布式的消息流平台之Pulsar
Pulsar 流处理详解
Apache Pulsar 是一个分布式的消息流平台,集成了**消息队列(MQ)和流处理(Stream Processing)**能力。Pulsar 不仅提供低延迟、高吞吐的消息传输能力,还支持基于 Pulsar Functions、Flink、Spark Streaming 的流式处理能力。
本篇详细介绍 Pulsar 的流处理能力,涵盖 核心概念、流处理模式、编程模型、集成生态、应用场景 等方面。
1. Pulsar 流处理概述
(1)Pulsar 的流处理能力
Pulsar 主要通过以下方式实现流处理:
- Pulsar Functions:轻量级流处理框架,适用于简单的 ETL、数据转换、事件处理等任务。
- Flink & Spark Streaming 集成:Pulsar 提供 Flink 和 Spark Streaming 连接器,支持复杂流处理任务,如窗口计算、数据聚合、模式匹配等。
- Pulsar IO:内置的 Source/Sink 连接器,支持数据流的输入输出,如 Kafka、Elasticsearch、JDBC、HDFS 等。
(2)Pulsar 流处理 VS 传统流处理
特性 | Pulsar Functions | Flink on Pulsar | Kafka Streams |
---|---|---|---|
复杂度 | 低(适合轻量任务) | 高(适合复杂任务) | 中等(偏向事件流处理) |
集成性 | 内置在 Pulsar 中 | 需集成 Flink/Spark | 依赖 Kafka |
扩展性 | 高(自动扩展) | 高(分布式计算) | 中等(依赖 Kafka 集群) |
窗口计算 | 支持基本窗口计算 | 强大,支持滚动、滑动、会话窗口 | 支持窗口操作 |
2. Pulsar 流处理核心概念
(1)Pulsar Functions
Pulsar Functions 是一种轻量级计算框架,专为 Pulsar 设计,允许开发者编写无状态(Stateless)或有状态(Stateful)的流处理逻辑,并直接运行在 Pulsar 集群中,而无需额外的计算框架(如 Flink 或 Spark)。
Pulsar Functions 关键特性
- 轻量级:无需外部计算框架,适用于简单任务。
- 原生集成:与 Pulsar 主题(Topic)无缝对接,延迟低。
- 内置管理:支持负载均衡、故障恢复。
- 支持多种语言:可用 Java、Python、Go 编写。
Pulsar Functions 编程模型
Pulsar Functions 的计算逻辑类似于 map-reduce,用户编写 Function(函数) 处理输入数据,并将结果写入另一个 Pulsar 主题。
示例:Java 版 Pulsar Function
public class MyFunction implements Function<String, String> {
@Override
public String process(String input, Context context) {
return input.toUpperCase(); // 处理逻辑:转换为大写
}
}
注册 Pulsar Function:
pulsar-admin functions create \
--tenant public --namespace default \
--name my-function \
--inputs persistent://public/default/input-topic \
--output persistent://public/default/output-topic \
--classname MyFunction \
--jar my-function.jar
(2)Pulsar IO
Pulsar IO 提供了开箱即用的 Source(数据源)和 Sink(数据输出)连接器,允许 Pulsar 作为数据流的中心,连接各种外部存储和计算系统。
常见 Source/Sink 连接器
类型 | 连接器示例 |
---|---|
数据库 | MySQL、PostgreSQL、MongoDB |
消息系统 | Kafka、RabbitMQ |
存储系统 | HDFS、S3、Elasticsearch |
计算引擎 | Flink、Spark |
示例:启动一个 Kafka Source 连接器
pulsar-admin sources create \
--name kafka-source \
--tenant public --namespace default \
--source-type kafka \
--destination-topic-name persistent://public/default/kafka-topic \
--source-config '{
"bootstrapServers": "kafka-broker:9092",
"topic": "source-topic"
}'
(3)Pulsar + Flink/Spark Streaming
Pulsar 也可作为 Flink / Spark Streaming 的流式数据源,支持复杂计算,如:
- 窗口计算(Tumbling, Sliding, Session Window)
- 聚合计算(sum, avg, count)
- 状态管理(Stateful Processing)
- 事件模式检测(CEP)
示例:Flink Pulsar 读取流数据
PulsarSource<String> source = PulsarSource.builder()
.setServiceUrl("pulsar://localhost:6650")
.setTopics("persistent://public/default/input-topic")
.setDeserializationSchema(SimpleStringSchema.class)
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
stream.map(value -> value.toUpperCase()).print();
env.execute();
3. Pulsar 流处理的运行模式
Pulsar Functions 支持三种运行模式:
运行模式 | 说明 |
---|---|
本地模式(LocalRun) | 在本地测试和运行 Functions |
进程模式(Process) | 在 Pulsar Worker 进程中独立运行 |
Kubernetes 模式(K8s) | 在 Kubernetes 集群中运行 Pulsar Functions |
示例:在 Kubernetes 上运行 Pulsar Function
pulsar-admin functions create \
--name my-k8s-function \
--runtime JAVA \
--inputs persistent://public/default/input-topic \
--output persistent://public/default/output-topic \
--parallelism 3 \
--jar my-function.jar \
--kubernetes-namespace pulsar
4. Pulsar 流处理应用场景
(1)实时数据流处理
- 实时 ETL:流式数据清洗、转换,存入数据湖或数据仓库(Iceberg、Doris)。
- 用户行为分析:分析用户操作日志,计算热点数据。
(2)事件驱动架构(EDA)
- 金融风控:实时监控交易流,检测欺诈行为。
- IoT 监控:处理物联网传感器数据,异常报警。
(3)数据同步 & 数据管道
- CDC 数据同步:从 MySQL/PostgreSQL 读取变更数据,实时写入 Pulsar 供下游消费。
- 消息系统桥接:Kafka → Pulsar → Flink,实现高效流数据处理。
5. 总结
Pulsar 提供强大的流处理能力,主要包括:
- Pulsar Functions(轻量级流处理)
- Pulsar IO(数据连接器)
- Flink / Spark Streaming(复杂流计算)
- 多种运行模式(Local、Process、K8s)
Pulsar 适用于高吞吐、低延迟的流式数据处理场景,可用于数据管道、事件驱动架构、实时分析等领域。
如果你的应用场景需要 流处理 + 消息队列,Pulsar 是一个值得考虑的方案!🚀