Docker:安装Apache Pulsar 消息队列的详细指南
请关注微信公众号:拾荒的小海螺
博客地址:http://lsk-ww.cn/
1、简述
Apache Pulsar 是 Apache 软件基金会顶级项目,是一个 pub-sub (发布-订阅)模型的消息队列系统,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。
官网地址:https://pulsar.apache.org/
2、安装
利用Docker来安装和管理Pulsar可以简化部署和运维过程,使开发人员能够专注于应用开发。本文将详细介绍如何在Docker中安装Pulsar。
2.1 安装Pulsar
创建数据映射的目录:
mkdir -p /data/pulsar/data
mkdir -p /data/pulsar/conf
首先,我们需要从Docker Hub上拉取Pulsar的官方镜像。
docker pull apachepulsar/pulsar:2.7.2
我们可以使用以下命令来启动一个单节点的Pulsar容器:
docker run -itd \
--name pulsar-standalone \
-p 6650:6650 \
-p 8080:8080 \
-d --restart=always \
--mount source=pulsardata,target=/data/pulsar/data \
--mount source=pulsarconf,target=/data/pulsar/conf \
apachepulsar/pulsar:2.7.2 \
sh -c "bin/pulsar standalone > pulsar.log 2>&1 & \
sleep 30 && bin/pulsar-admin clusters update standalone \
--url http://pulsar-standalone:8080 \
--broker-url pulsar://pulsar-standalone:6650 & \
tail -F pulsar.log"
这将启动一个Pulsar standalone实例,并映射6650和8080端口到宿主机。6650端口用于Pulsar客户端连接,8080端口用于管理界面访问。
2.2 安装Pulsar Manager
Pulsar Manager 是一个基于 web 的 GUI 管理和监视工具,可帮助管理员和用户管理和监视租户、命名空间、主题、订阅、代理、集群等,并支持对多个环境进行动态配置。
首先,我们需要从Docker Hub上拉取Pulsar Manager 的官方镜像。
docker pull apachepulsar/pulsar-manager:v0.2.0
我们可以使用以下命令来启动Pulsar-Manager容器:
docker run -itd \
--name pulsar-manager \
-p 9527:9527 -p 7750:7750 \
-d --restart=always \
-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
--link pulsar-standalone \
--entrypoint="" \
apachepulsar/pulsar-manager:v0.2.0 \
sh -c "sed -i '/^default.environment.name/ s|.*|default.environment.name=pulsar-standalone|' /pulsar-manager/pulsar-manager/application.properties & \
sed -i '/^default.environment.service_url/ s|.*|default.environment.service_url=http://pulsar-standalone:8080|' /pulsar-manager/pulsar-manager/application.properties & \
/pulsar-manager/entrypoint.sh & \
tail -F /pulsar-manager/pulsar-manager/pulsar-manager.log"
同时启动Pulsar Manager管理界面UI。通过创建sh 脚本来初始化超级管理员密码:
#!/bin/bash
# 初始化管理员账号
CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl \
-H "X-XSRF-TOKEN: $CSRF_TOKEN" \
-H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
-H 'Content-Type: application/json' \
-X PUT http://localhost:7750/pulsar-manager/users/superuser \
-d '{"name": "admin", "password": "admin123", "description": "test", "email": "username@test.org"}'
访问localhost:9527来登录当前Pulsar管理平台:
账号:admin
密码:admin123
2.3 指令
通过Docker exec指令进入pulsar容器:
docker exec -it pulsar-standalone /bin/bash
查看系统有哪些租户(public 是系统默认的租户)
root@758c28190466:/pulsar# ./bin/pulsar-admin tenants list
"public"
"sample"
创建租户:
root@758c28190466:/pulsar# ./bin/pulsar-admin tenants create myshop
删除租户:
root@758c28190466:/pulsar# ./bin/pulsar-admin tenants delete myshop
查看指定租户下边的命名空间:
root@758c28190466:/pulsar# ./bin/pulsar-admin namespaces list myshop
创建指定租户命名空间:
root@758c28190466:/pulsar# ./bin/pulsar-admin namespaces create myshop/my-namespace
删除指定租户命名空间:
root@758c28190466:/pulsar# ./bin/pulsar-admin namespaces delete myshop/my-namespace
3、集成
Apache Pulsar 是一个高性能的分布式消息流平台,广泛用于消息传递和事件流处理。Spring Boot 提供了简化的开发方式,使得集成 Pulsar 变得更加容易。以下介绍如何在 Spring Boot 项目中集成 Pulsar,并提供实际使用的示例。
3.1 添加 Pulsar 依赖
在 pom.xml 中添加 Pulsar 的 Maven 依赖:
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
3.2 配置 Pulsar 客户端
在 application.properties 文件中添加 Pulsar 服务的相关配置:
pulsar.service-url=pulsar://localhost:6650
pulsar.topic=my-topic
3.3 创建 Pulsar 配置类
创建一个 PulsarConfig 类,用于配置 Pulsar 客户端:
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class PulsarConfig {
@Value("${pulsar.service-url}")
private String serviceUrl;
@Value("${pulsar.topic}")
private String topic;
@Bean
public PulsarClient pulsarClient() throws Exception {
return PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();
}
@Bean
public Producer<String> pulsarProducer(PulsarClient pulsarClient) throws Exception {
return pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();
}
}
3.4 创建消息生产者服务
创建一个 PulsarProducerService 类,用于发送消息:
import org.apache.pulsar.client.api.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class PulsarProducerService {
@Autowired
private Producer<String> producer;
public void sendMessage(String message) {
try {
producer.send(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.4 创建消息消费者服务
创建一个 PulsarConsumerService 类,用于接收消息:
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service
public class PulsarConsumerService {
@Value("${pulsar.service-url}")
private String serviceUrl;
@Value("${pulsar.topic}")
private String topic;
@Value("${pulsar.subscription}")
private String subscription;
@PostConstruct
public void startConsumer() throws Exception {
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subscription)
.subscribe();
new Thread(() -> {
while (true) {
try {
Message<String> msg = consumer.receive();
System.out.println("Received message: " + msg.getValue());
consumer.acknowledge(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
3.5 创建 REST 控制器
创建一个 PulsarController 类,通过 REST 接口发送消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/pulsar")
public class PulsarController {
@Autowired
private PulsarProducerService producerService;
@PostMapping("/send")
public void sendMessage(@RequestParam String message) {
producerService.sendMessage(message);
}
}
3.6 测试应用
使用 curl 或 Postman 测试发送消息的接口:
curl -X POST "http://localhost:8080/api/pulsar/send?message=Hello+Pulsar"
在控制台上,你应该能够看到消费者接收到的消息:
Received message: Hello Pulsar
4、优势
4.1 优点
- 快速部署:使用Docker可以在几分钟内启动Pulsar实例,极大简化了安装流程。
- 隔离性:Docker容器提供了良好的隔离性,避免了与宿主机环境的冲突。
- 可移植性:Docker镜像可以在任何支持Docker的平台上运行,提高了应用的可移植性。
- 易于管理:通过Docker Compose等工具,可以方便地管理和扩展Pulsar集群。
4.2 缺点
- 资源开销:运行在Docker中的Pulsar实例会有一定的资源开销,可能会影响性能。
- 网络性能:由于Docker的网络虚拟化,网络性能可能会有所下降。
- 复杂性:对于需要集群部署的场景,Docker的配置和管理会变得更加复杂。
5、总结
通过Docker安装Apache Pulsar可以极大简化部署和管理流程,使开发人员能够更专注于业务逻辑的实现。虽然Docker化的Pulsar实例在某些方面存在性能和资源开销的问题,但其快速部署和良好的隔离性仍然使其成为开发和测试环境中的理想选择。希望本文对你在Docker中安装和使用Pulsar有所帮助。
如果你有任何问题或建议,欢迎在评论区留言讨论。