当前位置: 首页 > article >正文

Docker:安装Apache Pulsar 消息队列的详细指南

请关注微信公众号:拾荒的小海螺
博客地址:http://lsk-ww.cn/

1、简述

Apache Pulsar 是 Apache 软件基金会顶级项目,是一个 pub-sub (发布-订阅)模型的消息队列系统,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。

在这里插入图片描述

官网地址:https://pulsar.apache.org/

2、安装

利用Docker来安装和管理Pulsar可以简化部署和运维过程,使开发人员能够专注于应用开发。本文将详细介绍如何在Docker中安装Pulsar。

2.1 安装Pulsar

创建数据映射的目录:

mkdir -p /data/pulsar/data
mkdir -p /data/pulsar/conf

首先,我们需要从Docker Hub上拉取Pulsar的官方镜像。

docker pull apachepulsar/pulsar:2.7.2

我们可以使用以下命令来启动一个单节点的Pulsar容器:

docker run -itd \
   --name pulsar-standalone \
   -p 6650:6650 \
   -p 8080:8080 \
   -d --restart=always \
   --mount source=pulsardata,target=/data/pulsar/data \
   --mount source=pulsarconf,target=/data/pulsar/conf \
   apachepulsar/pulsar:2.7.2 \
   sh -c  "bin/pulsar standalone > pulsar.log 2>&1 & \
           sleep 30 && bin/pulsar-admin clusters update standalone \
           --url http://pulsar-standalone:8080 \
           --broker-url pulsar://pulsar-standalone:6650 & \
           tail -F pulsar.log"

这将启动一个Pulsar standalone实例,并映射6650和8080端口到宿主机。6650端口用于Pulsar客户端连接,8080端口用于管理界面访问。

2.2 安装Pulsar Manager

Pulsar Manager 是一个基于 web 的 GUI 管理和监视工具,可帮助管理员和用户管理和监视租户、命名空间、主题、订阅、代理、集群等,并支持对多个环境进行动态配置。

首先,我们需要从Docker Hub上拉取Pulsar Manager 的官方镜像。

docker pull apachepulsar/pulsar-manager:v0.2.0

我们可以使用以下命令来启动Pulsar-Manager容器:

docker run -itd \
   --name pulsar-manager \
   -p 9527:9527 -p 7750:7750 \
   -d --restart=always \
   -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
   --link pulsar-standalone \
   --entrypoint="" \
   apachepulsar/pulsar-manager:v0.2.0 \
   sh -c  "sed -i '/^default.environment.name/ s|.*|default.environment.name=pulsar-standalone|' /pulsar-manager/pulsar-manager/application.properties & \
           sed -i '/^default.environment.service_url/ s|.*|default.environment.service_url=http://pulsar-standalone:8080|' /pulsar-manager/pulsar-manager/application.properties & \
           /pulsar-manager/entrypoint.sh & \
           tail -F /pulsar-manager/pulsar-manager/pulsar-manager.log" 

同时启动Pulsar Manager管理界面UI。通过创建sh 脚本来初始化超级管理员密码:

#!/bin/bash
# 初始化管理员账号

CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl \
    -H "X-XSRF-TOKEN: $CSRF_TOKEN" \
    -H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
    -H 'Content-Type: application/json' \
    -X PUT http://localhost:7750/pulsar-manager/users/superuser \
    -d '{"name": "admin", "password": "admin123", "description": "test", "email": "username@test.org"}'

访问localhost:9527来登录当前Pulsar管理平台:

账号:admin
密码:admin123

在这里插入图片描述

2.3 指令

通过Docker exec指令进入pulsar容器:

docker exec -it pulsar-standalone /bin/bash

查看系统有哪些租户(public 是系统默认的租户)

root@758c28190466:/pulsar# ./bin/pulsar-admin tenants list
"public"
"sample"

创建租户:

root@758c28190466:/pulsar# ./bin/pulsar-admin tenants create myshop

删除租户:

root@758c28190466:/pulsar# ./bin/pulsar-admin tenants delete myshop

查看指定租户下边的命名空间:

root@758c28190466:/pulsar# ./bin/pulsar-admin namespaces list myshop

创建指定租户命名空间:

root@758c28190466:/pulsar# ./bin/pulsar-admin namespaces create myshop/my-namespace

删除指定租户命名空间:

root@758c28190466:/pulsar# ./bin/pulsar-admin namespaces delete myshop/my-namespace

在这里插入图片描述

3、集成

Apache Pulsar 是一个高性能的分布式消息流平台,广泛用于消息传递和事件流处理。Spring Boot 提供了简化的开发方式,使得集成 Pulsar 变得更加容易。以下介绍如何在 Spring Boot 项目中集成 Pulsar,并提供实际使用的示例。

3.1 添加 Pulsar 依赖

在 pom.xml 中添加 Pulsar 的 Maven 依赖:

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.8.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
3.2 配置 Pulsar 客户端

在 application.properties 文件中添加 Pulsar 服务的相关配置:

pulsar.service-url=pulsar://localhost:6650
pulsar.topic=my-topic
3.3 创建 Pulsar 配置类

创建一个 PulsarConfig 类,用于配置 Pulsar 客户端:

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PulsarConfig {

    @Value("${pulsar.service-url}")
    private String serviceUrl;

    @Value("${pulsar.topic}")
    private String topic;

    @Bean
    public PulsarClient pulsarClient() throws Exception {
        return PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .build();
    }

    @Bean
    public Producer<String> pulsarProducer(PulsarClient pulsarClient) throws Exception {
        return pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .create();
    }
}

3.4 创建消息生产者服务

创建一个 PulsarProducerService 类,用于发送消息:

import org.apache.pulsar.client.api.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class PulsarProducerService {

    @Autowired
    private Producer<String> producer;

    public void sendMessage(String message) {
        try {
            producer.send(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3.4 创建消息消费者服务

创建一个 PulsarConsumerService 类,用于接收消息:

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

@Service
public class PulsarConsumerService {

    @Value("${pulsar.service-url}")
    private String serviceUrl;

    @Value("${pulsar.topic}")
    private String topic;

    @Value("${pulsar.subscription}")
    private String subscription;

    @PostConstruct
    public void startConsumer() throws Exception {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .build();

        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .topic(topic)
                .subscriptionName(subscription)
                .subscribe();

        new Thread(() -> {
            while (true) {
                try {
                    Message<String> msg = consumer.receive();
                    System.out.println("Received message: " + msg.getValue());
                    consumer.acknowledge(msg);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
3.5 创建 REST 控制器

创建一个 PulsarController 类,通过 REST 接口发送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/pulsar")
public class PulsarController {

    @Autowired
    private PulsarProducerService producerService;

    @PostMapping("/send")
    public void sendMessage(@RequestParam String message) {
        producerService.sendMessage(message);
    }
}
3.6 测试应用

使用 curl 或 Postman 测试发送消息的接口:

curl -X POST "http://localhost:8080/api/pulsar/send?message=Hello+Pulsar"

在控制台上,你应该能够看到消费者接收到的消息:

Received message: Hello Pulsar

4、优势

4.1 优点
  • 快速部署:使用Docker可以在几分钟内启动Pulsar实例,极大简化了安装流程。
  • 隔离性:Docker容器提供了良好的隔离性,避免了与宿主机环境的冲突。
  • 可移植性:Docker镜像可以在任何支持Docker的平台上运行,提高了应用的可移植性。
  • 易于管理:通过Docker Compose等工具,可以方便地管理和扩展Pulsar集群。
4.2 缺点
  • 资源开销:运行在Docker中的Pulsar实例会有一定的资源开销,可能会影响性能。
  • 网络性能:由于Docker的网络虚拟化,网络性能可能会有所下降。
  • 复杂性:对于需要集群部署的场景,Docker的配置和管理会变得更加复杂。
    在这里插入图片描述

5、总结

通过Docker安装Apache Pulsar可以极大简化部署和管理流程,使开发人员能够更专注于业务逻辑的实现。虽然Docker化的Pulsar实例在某些方面存在性能和资源开销的问题,但其快速部署和良好的隔离性仍然使其成为开发和测试环境中的理想选择。希望本文对你在Docker中安装和使用Pulsar有所帮助。

如果你有任何问题或建议,欢迎在评论区留言讨论。


http://www.kler.cn/a/316083.html

相关文章:

  • Ceph 中PG与PGP的概述
  • Git在版本控制中的应用
  • python——面向对象
  • Python——NumPy库的简单用法,超级详细教程使用
  • #include<string>和#include<string.h>有什么区别
  • Elasticsearch 8.16:适用于生产的混合对话搜索和创新的向量数据量化,其性能优于乘积量化 (PQ)
  • Python 课程16-Pygame
  • LabVIEW软件维护的内容是什么呢?
  • [2025]基于微信小程序慢性呼吸系统疾病的健康管理(源码+文档+解答)
  • 【数据结构与算法 | 灵神题单 | 栈基础篇】力扣155, 1472, 1381
  • 微信小程序03-页面交互
  • vue3中使用iframe不成功的问题
  • 逻辑回归 和 支持向量机(SVM)比较
  • 【深入理解SpringCloud微服务】了解微服务的熔断、限流、降级,手写实现一个微服务熔断限流器
  • 【spring】引入 Jackson 依赖 对java对象序列号和反序列化
  • 基于单片机的智能温控风扇系统的设计
  • C语言实现冒泡排序
  • 在泰国旅游不会口语怎么办?求推荐翻译软件!!!
  • 网安新声 | 黎巴嫩BP机爆炸事件带来的安全新挑战与反思
  • 计算机毕业设计选题推荐-基于python+Django的全屋家具定制服务平台
  • Vue3实现类ChatGPT聊天式流式输出(vue-sse实现)
  • torch.embedding 报错 IndexError: index out of range in self
  • 数据结构之二叉树遍历
  • 【Linux系统编程】第二十一弹---进程的地址空间
  • 《概率论与数理统计》学渣笔记
  • uni-app功能 1. 实现点击置顶,滚动吸顶2.swiper一个轮播显示一个半内容且实现无缝滚动3.穿透修改uni-ui的样式