当前位置: 首页 > article >正文

操作018:Stream Queue

文章目录

  • 操作018:Stream Queue
  • 一、启用插件
  • 二、负载均衡
  • 三、Java代码
    • 1、引入依赖
    • 2、创建Stream
      • ①代码方式创建
      • ②ManagementUI创建
    • 3、生产者端程序
      • ①内部机制说明
        • [1]官方文档
        • [2]解析
        • [3]配置
      • ②示例代码
    • 4、消费端程序
  • 四、指定偏移量消费
    • 1、偏移量
    • 2、官方文档说明
    • 3、指定Offset消费
    • 4、对比

操作018:Stream Queue

一、启用插件

说明:只有启用了Stream插件,才能使用流式队列的完整功能

在集群每个节点中依次执行如下操作:

# 启用Stream插件
rabbitmq-plugins enable rabbitmq_stream

# 重启rabbit应用
rabbitmqctl stop_app
rabbitmqctl start_app

# 查看插件状态
rabbitmq-plugins list

在这里插入图片描述

二、负载均衡

在文件/etc/haproxy/haproxy.cfg末尾追加:

frontend rabbitmq_stream_frontend
bind 192.168.200.100:33333
mode tcp
default_backend rabbitmq_stream_backend

backend rabbitmq_stream_backend
mode tcp
balance roundrobin
server rabbitmq1 192.168.200.100:5552 check
server rabbitmq2 192.168.200.150:5552 check
server rabbitmq3 192.168.200.200:5552 check

三、Java代码

1、引入依赖

Stream 专属 Java 客户端官方网址:https://github.com/rabbitmq/rabbitmq-stream-java-client


Stream 专属 Java 客户端官方文档网址:https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>stream-client</artifactId>
        <version>0.15.0</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
    </dependency>
</dependencies>

2、创建Stream

说明:不需要创建交换机

①代码方式创建

Environment environment = Environment.builder()
        .host("192.168.200.100")
        .port(33333)
        .username("atguigu")
        .password("123456")
        .build();

environment.streamCreator().stream("stream.atguigu.test2").create();

environment.close();

②ManagementUI创建

在这里插入图片描述

3、生产者端程序

①内部机制说明

[1]官方文档

Internally, the Environment will query the broker to find out about the topology of the stream and will create or re-use a connection to publish to the leader node of the stream.

翻译:

在内部,Environment将查询broker以了解流的拓扑结构,并将创建或重用连接以发布到流的 leader 节点。

[2]解析
  • 在 Environment 中封装的连接信息仅负责连接到 broker
  • Producer 在构建对象时会访问 broker 拉取集群中 Leader 的连接信息
  • 将来实际访问的是集群中的 Leader 节点
  • Leader 的连接信息格式是:节点名称:端口号

在这里插入图片描述

[3]配置

为了让本机的应用程序知道 Leader 节点名称对应的 IP 地址,我们需要在本地配置 hosts 文件,建立从节点名称到 IP 地址的映射关系

在这里插入图片描述

②示例代码

Environment environment = Environment.builder()
        .host("192.168.200.100")
        .port(33333)
        .username("atguigu")
        .password("123456")
        .build();

Producer producer = environment.producerBuilder()
        .stream("stream.atguigu.test")
        .build();

byte[] messagePayload = "hello rabbit stream".getBytes(StandardCharsets.UTF_8);

CountDownLatch countDownLatch = new CountDownLatch(1);

producer.send(
        producer.messageBuilder().addData(messagePayload).build(),
        confirmationStatus -> {
            if (confirmationStatus.isConfirmed()) {
                System.out.println("[生产者端]the message made it to the broker");
            } else {
                System.out.println("[生产者端]the message did not make it to the broker");
            }

            countDownLatch.countDown();
        });

countDownLatch.await();

producer.close();

environment.close();

4、消费端程序

Environment environment = Environment.builder()
        .host("192.168.200.100")
        .port(33333)
        .username("atguigu")
        .password("123456")
        .build();

environment.consumerBuilder()
        .stream("stream.atguigu.test")
        .name("stream.atguigu.test.consumer")
        .autoTrackingStrategy()
        .builder()
        .messageHandler((offset, message) -> {
            byte[] bodyAsBinary = message.getBodyAsBinary();
            String messageContent = new String(bodyAsBinary);
            System.out.println("[消费者端]messageContent = " + messageContent + " Offset=" + offset.offset());
        })
        .build();

四、指定偏移量消费

1、偏移量

在这里插入图片描述

2、官方文档说明

The offset is the place in the stream where the consumer starts consuming from. The possible values for the offset parameter are the following:

  • OffsetSpecification.first(): starting from the first available offset. If the stream has not been truncated, this means the beginning of the stream (offset 0).
  • OffsetSpecification.last(): starting from the end of the stream and returning the last chunk of messages immediately (if the stream is not empty).
  • OffsetSpecification.next(): starting from the next offset to be written. Contrary to OffsetSpecification.last(), consuming with OffsetSpecification.next() will not return anything if no-one is publishing to the stream. The broker will start sending messages to the consumer when messages are published to the stream.
  • OffsetSpecification.offset(offset): starting from the specified offset. 0 means consuming from the beginning of the stream (first messages). The client can also specify any number, for example the offset where it left off in a previous incarnation of the application.
  • OffsetSpecification.timestamp(timestamp): starting from the messages stored after the specified timestamp. Note consumers can receive messages published a bit before the specified timestamp. Application code can filter out those messages if necessary.

3、指定Offset消费

Environment environment = Environment.builder()
        .host("192.168.200.100")
        .port(33333)
        .username("atguigu")
        .password("123456")
        .build();

CountDownLatch countDownLatch = new CountDownLatch(1);

Consumer consumer = environment.consumerBuilder()
        .stream("stream.atguigu.test")
        .offset(OffsetSpecification.first())
        .messageHandler((offset, message) -> {
            byte[] bodyAsBinary = message.getBodyAsBinary();
            String messageContent = new String(bodyAsBinary);
            System.out.println("[消费者端]messageContent = " + messageContent);
            countDownLatch.countDown();
        })
        .build();

countDownLatch.await();

consumer.close();

4、对比

  • autoTrackingStrategy 方式:始终监听Stream中的新消息(狗狗看家,忠于职守)
  • 指定偏移量方式:针对指定偏移量的消息消费之后就停止(狗狗叼飞盘,叼回来就完)

http://www.kler.cn/a/464392.html

相关文章:

  • ES6中定义私有属性详解
  • IEEE PDF eXpress遇到Font TimesNewRomanPSMT is not embedded的解决方案
  • 我的桌面 1.9.75 | 个性化定制手机桌面,丰富的小组件和主题
  • 17爬虫:关于DrissionPage相关内容的学习01
  • 深入浅出 Beam Search:自然语言处理中的高效搜索利器
  • Unity UGUI使用技巧与经验总结(不定期更新)
  • 【2025优质学术推荐】征稿控制科学、仪器、智能系统、通信、计算机、电子信息、人工智能、大数据、机器学习、软件工程、网络安全方向
  • Leetcode打卡:分割数组
  • 使用 Python结合ffmpeg 实现单线程和多线程推流
  • 婚庆摄影小程序ssm+论文源码调试讲解
  • UE5.3 虚幻引擎 Windows插件开发打包(带源码插件打包、无源码插件打包)
  • 神经网络入门实战:(二十三)使用本地数据集进行训练和验证
  • Qt使用CMake编译项目时报错:#undefined reference to `vtable for MainView‘
  • 网络安全 | 量子计算与网络安全:未来的威胁与机遇
  • 量子计算:定义、使用方法和示例
  • UE4.27 Android环境下获取手机电量
  • Tushare提示本接口即将停止更新,请尽快使用Pro版接口:https://tushare.pro/document/2
  • 从0入门自主空中机器人-4-【PX4与Gazebo入门】
  • 【大模型系列 02】LLM大模型基础知识
  • 第12关:博客系统之删除评论
  • 汇编语言:从键盘输入数字字符,(计算阶乘),以无符号十进制形式输出(分支、循环程序)
  • Wend看源码-Java.util 工具类学习(上)
  • CertiK《Hack3d:2024年度安全报告》(附报告全文链接)
  • 【Java 学习】Comparable接口 和 Comparator接口,掌控排序逻辑解析,深入 Comparable 和 Comparator 的优雅切换
  • linux进阶
  • Kafka优势剖析-分布式架构