ActiveMQ实战指南:实现发布/订阅(publish-subscribe)消息发送!
发布/订阅(publish-subscribe)消息发送
消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点消息发送的方式不同,发布到 topic 的消息会被所有订阅者消费。
发布/订阅的消息模型
发布/订阅消息发送演示过程
1、消息生产者
在生产者工程 producer 里新建一个类 TopicSender 用于发布消息
package producer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 发布/订阅(publish-subscribe)消息发送
*
* @author JPM
*/
public class TopicSender {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616");
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 创建消息的目的地,createTopic表示创建的是主题消息
Destination destination = session.createTopic("topic_01");
MessageProducer producer = session.createProducer(destination);
TextMessage textMessage = session
.createTextMessage("hello,topic_01!");
producer.send(textMessage);
session.commit();
session.close();
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
2、运行消息生产者类,查看 ActiveMQ 管理界面
运行消息生产者类 TopicSender,向名称为 “topic_01”的 Topic里发送一条消息 “hello,topic_01!” ,打开 ActiveMQ 管理界面,如下:
3、消息消费者
在消费者工程 consumer 里新建二个消费者类 TopicReceiver1 和 TopicReceiver2 用于订阅消息,TopicReceiver2 的代码与 TopicReceiver1 的代码类似,下面给出 TopicReceiver1 的代码:
package consumer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 发布/订阅(publish-subscribe)消息接收者1
*
* @author JPM
*/
public class TopicReceiver1 {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616");
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
// 设置客户端手动签收消息
Session session = connection.createSession(Boolean.FALSE,
Session.CLIENT_ACKNOWLEDGE);
// 创建消息的目的地,createTopic表示创建的是主题消息
Destination destination = session.createTopic("topic_01");
MessageConsumer consumer = session.createConsumer(destination);
TextMessage textMessage = (TextMessage) consumer.receive();
textMessage.acknowledge(); // 确认消息
System.out.println("TopicReceiver1--->" + textMessage.getText());
session.close();
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
4、运行2个消费者,查看 ActiveMQ 管理界面
注意,发布订阅模式默认需要首先运行消费者,然后开启生产者,这样才能消费者才能订阅到生产者发布的消息。
分别启动2个消费者 TopicReceiver1 和 TopicReceiver2,管理界面如下:
然后启动生产者 TopicSender,TopicReceiver1 和 TopicReceiver2 的控制台如下:
TopicReceiver1--->hello,topic_01!
TopicReceiver2--->hello,topic_01!
eclipse切换控制台的技巧
消息消费完成后,管理界面显示如下:
说明:
发布订阅的模式默认的情况下,消息的内容不存储在服务器上,当生产者发送了一个消息,如果消费者之前没有订阅,那么该消费者就获取不到这个消息。
点对点的模式默认的情况下,消息存储在服务器上,消费者随时来取,但是只能有一个消费者获取到这个消息。