当前位置: 首页 > article >正文

ActiveMQ实战指南:实现发布/订阅(publish-subscribe)消息发送!

发布/订阅(publish-subscribe)消息发送

消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点消息发送的方式不同,发布到 topic 的消息会被所有订阅者消费。

发布/订阅的消息模型

984302f6bbf9ff6c58d44d7794729303.png

发布/订阅消息发送演示过程

1、消息生产者

在生产者工程 producer 里新建一个类 TopicSender 用于发布消息

package producer;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnectionFactory;


/**
 * 发布/订阅(publish-subscribe)消息发送
 * 
 * @author JPM
 */
public class TopicSender {
  public static void main(String[] args) {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
        "tcp://localhost:61616");
    Connection connection = null;
    try {
      connection = connectionFactory.createConnection();
      connection.start();
      Session session = connection.createSession(Boolean.TRUE,
          Session.AUTO_ACKNOWLEDGE);
      // 创建消息的目的地,createTopic表示创建的是主题消息
      Destination destination = session.createTopic("topic_01");
      MessageProducer producer = session.createProducer(destination);
      TextMessage textMessage = session
          .createTextMessage("hello,topic_01!");
      producer.send(textMessage);
      session.commit();
      session.close();
    } catch (JMSException e) {
      e.printStackTrace();
    } finally {
      if (connection != null) {
        try {
          connection.close();
        } catch (JMSException e) {
          e.printStackTrace();
        }
      }
    }
  }
}

2、运行消息生产者类,查看 ActiveMQ 管理界面

运行消息生产者类 TopicSender,向名称为  “topic_01”的 Topic里发送一条消息 “hello,topic_01!” ,打开 ActiveMQ 管理界面,如下:

74a416903ba209ddf4639841e1920273.png

3、消息消费者

在消费者工程 consumer 里新建二个消费者类 TopicReceiver1 和 TopicReceiver2 用于订阅消息,TopicReceiver2 的代码与 TopicReceiver1 的代码类似,下面给出 TopicReceiver1 的代码:

package consumer;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnectionFactory;


/**
 * 发布/订阅(publish-subscribe)消息接收者1
 * 
 * @author JPM
 */
public class TopicReceiver1 {
  public static void main(String[] args) {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
        "tcp://localhost:61616");
    Connection connection = null;
    try {
      connection = connectionFactory.createConnection();
      connection.start();
            // 设置客户端手动签收消息
      Session session = connection.createSession(Boolean.FALSE,
          Session.CLIENT_ACKNOWLEDGE);
      // 创建消息的目的地,createTopic表示创建的是主题消息
      Destination destination = session.createTopic("topic_01");
      MessageConsumer consumer = session.createConsumer(destination);
      TextMessage textMessage = (TextMessage) consumer.receive();
      textMessage.acknowledge();  // 确认消息
      System.out.println("TopicReceiver1--->" + textMessage.getText());
      session.close();
    } catch (JMSException e) {
      e.printStackTrace();
    } finally {
      if (connection != null) {
        try {
          connection.close();
        } catch (JMSException e) {
          e.printStackTrace();
        }
      }
    }
  }
}

4、运行2个消费者,查看 ActiveMQ 管理界面

注意,发布订阅模式默认需要首先运行消费者,然后开启生产者,这样才能消费者才能订阅到生产者发布的消息。

分别启动2个消费者 TopicReceiver1 和 TopicReceiver2,管理界面如下:

a2dcc4967d5f0c2c88ffb406ab6da18c.png

然后启动生产者 TopicSender,TopicReceiver1 和 TopicReceiver2 的控制台如下:

TopicReceiver1--->hello,topic_01!

TopicReceiver2--->hello,topic_01!

eclipse切换控制台的技巧

2ec14711f1a13fe7634d9446fd65648b.png

消息消费完成后,管理界面显示如下:

9b47c13b9bf75d7e654108e184d56f09.png

说明:

发布订阅的模式默认的情况下,消息的内容不存储在服务器上,当生产者发送了一个消息,如果消费者之前没有订阅,那么该消费者就获取不到这个消息。

点对点的模式默认的情况下,消息存储在服务器上,消费者随时来取,但是只能有一个消费者获取到这个消息。

ActiveMQ 管理界面术语解释

e50d8334b306ff87f0c3b401627ceff2.png


http://www.kler.cn/a/288648.html

相关文章:

  • 从零开始:使用VSCode搭建Python数据科学开发环境
  • 如何在 Hive SQL 中处理复杂的数据类型?
  • STM32-笔记37-吸烟室管控系统项目
  • 【设计模式-2】23 种设计模式的分类和功能
  • OpenAI CEO 奥特曼发长文《反思》
  • [离线数仓] 总结二、Hive数仓分层开发
  • Unity Android 进阶之 【Android 添加一个启动动画】在Unity场景加载完之前,避免 【Unity 启动界面慢 黑屏时间长】的情况
  • 青远生态为云南林业规划院定制开发的自然保护地规划智能编制系统顺利通过验收
  • Golang | Leetcode Golang题解之第385题迷你语法分析器
  • Java图形用户界面之Applet设计
  • python django 使用教程
  • 使用 streamlink 把 m3u8 转为 mp4
  • 代码随想录 刷题记录-24 图论 (1)理论基础 、深搜与广搜
  • 保姆级Maven安装、配置、版本查询教程(包含配置本地仓库、阿里云私服、环境变量)
  • 射频指纹特征提取:揭秘无线通信设备的身份标识
  • 网络准入管理系统是什么?网络准入很重要,2024年国内外网络准入控制系统有哪些?(靠谱儿~)
  • filezilla使用教程(window下filezilla使用教程)
  • TF | SD 卡出现无法删除的文件,乱码文件该如何处理 macOS
  • 太速科技-基于Kintex-7 XC7K160T 的CameraLink转四路光纤数据转发卡(Full Camera Link图像转万兆以太网适配器 )
  • PostgreSQL + PostGIS:空间数据存储及管理解决方案
  • 【java入门】JDK的下载安装与配置,最新最详细教程!
  • vue3+vite+ts如何使用路由
  • pyecharts可视化数据大屏
  • 【Python】3.基础语法(3)函数
  • Python实现BASE64 算法
  • 网络安全售前入门09安全服务——安全加固服务