Pulsar消息服务之Java工具类
Pulsar介绍
Pulsar是一种多租户、高性能的服务器到服务器消息传递解决方案。Pulsar最初由雅虎开发,目前由Apache软件基金会管理。
官方网站
https://pulsar.apache.org/
目的
基于Pulsar客户端jar包,用Java开发MQ消息发送与接收工具类;
JDK
17+
pom.xml
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-all</artifactId>
<version>3.3.2</version>
</dependency>
PulsarMqUtils.java
package com.example;
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* @Description Pulsar MQ服务生产与消费客户端工具类
* @Version V1.0
*/
public class PulsarMqUtils {
private static Logger logger = LoggerFactory.getLogger(PulsarMqUtils.class);
/**
* pulsar服务地址
*/
private static final String SERVER_URL = "pulsar://%s:%s";
/**
* topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,示例:persistent://public/space/topic
* 注意:
* persistent 表示持久化主题,无消费ack确认的消息会被broker持久化存储在节点磁盘中;
* non-persistent 表示非持久化主题,消息将在生产者确认后存储在内存中,一旦服务broker关闭或重启、或主题无任何订阅者(消费者)则消息丢失。
*/
private static final String TOPIC_NAME = "persistent://%s/%s/%s";
private PulsarClient client;
private PulsarMqUtils() {}
/**
* 创建Pulsar工具类对象
* @param host 主机
* @param port 端口
* @return
* @throws PulsarClientException
*/
public static PulsarMqUtils buildServer(String host, int port) throws PulsarClientException {
return new PulsarMqUtils().createPulsarClient(host, port);
}
/**
* 创建Pulsar工具类对象
* @param serviceUrl 示例:pulsar://localhost:6650,多个服务示列:pulsar://localhost:6550,localhost:6651
* @return
* @throws PulsarClientException
*/
public static PulsarMqUtils buildServer(String serviceUrl) throws PulsarClientException {
return new PulsarMqUtils().createPulsarClient(serviceUrl);
}
/**
* 创建Pulsar客户端连接
* @param host 主机
* @param port 端口
* @return
* @throws PulsarClientException
*/
private PulsarMqUtils createPulsarClient(String host, int port) throws PulsarClientException {
return createPulsarClient(String.format(SERVER_URL, host, port));
}
/**
* 创建Pulsar客户端连接
* @param serviceUrl
* @return
* @throws PulsarClientException
*/
private PulsarMqUtils createPulsarClient(String serviceUrl) throws PulsarClientException {
client = PulsarClient.builder()
//pulsar服务地址
.serviceUrl(serviceUrl)
// 如果超过60秒未使用链接,则释放连接
.connectionMaxIdleSeconds(60)
// 超时连接5秒
.connectionTimeout(5, TimeUnit.SECONDS)
// 每个代理连接最大并发请求数(默认5000),按需调整以防占用pulsar资源过高
.maxConcurrentLookupRequests(1000)
.build();
return this;
}
/**
* 创建Pulsar客户端指定(租户/命名空间/主题)生产者对象
* @param tenant
* @param namespace
* @param topic
* @param enableBatching
* @return
* @throws PulsarClientException
*/
public Producer<byte[]> newProducer(String tenant, String namespace, String topic, boolean enableBatching) throws PulsarClientException {
return newProducer(String.format(TOPIC_NAME, tenant, namespace, topic), enableBatching);
}
/**
* 创建Pulsar客户端指定(租户/命名空间/主题)生产者对象
* @param topicName
* @param enableBatching
* @return
* @throws PulsarClientException
*/
private Producer<byte[]> newProducer(String topicName, boolean enableBatching) throws PulsarClientException {
return client.newProducer(Schema.BYTES)
.topic(topicName)
// 消息发送超时时间3s(默认值:30秒);设置为零将使超时设置为无穷大,对重复数据删除功能时非常有用
.sendTimeout(0, TimeUnit.SECONDS)
// 启用了批处理消息,一次性产生多个消息
.enableBatching(enableBatching)
// 批次模式DEFAULT | KEY_BASED
.batcherBuilder(BatcherBuilder.DEFAULT)
// 默认值为1ms,如果设置为非零值,消息将排队,直到此时间间隔到期
.batchingMaxPublishDelay(5, TimeUnit.MILLISECONDS)
// 批处理中的最大消息数(默认值为1000)
.batchingMaxMessages(1000)
.create();
}
/**
* 创建Pulsar客户端指定(租户/命名空间/主题)消者者对象
* @param topicNames
* @param subscriptionName
* @param properties
* @param messageListener
* @return
* @throws PulsarClientException
*/
private Consumer<byte[]> newConsumer(String [] topicNames, String subscriptionName, Map<String, String> properties, MessageListener<byte[]> messageListener) throws PulsarClientException {
return client.newConsumer(Schema.BYTES)
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic
.topic(topicNames)
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName(subscriptionName)
// 声明消费模式为共享模式
// Exclusive:独占模式(默认),此subscriptionName只能被一个客户端消费,有其它客户端订阅同名则会抛错
// Shared: 多用户共享模式,客户端之间的循环分配,不保证消费顺序
// Failover: 故障切换模,多个客户端连接,只能其中一个进行消费,待客户端产生故障后自动分配一个可用的新消费者
// Key_Shared: 多个用户将能够使用相同的订阅,消息发给指定密钥的消费者
.subscriptionType(SubscriptionType.Shared)
// 订阅相关参数,tag订阅等
.subscriptionProperties(properties)
// Earliest 配置从最早开始消费,否则可能会消费不到历史消息
// Latest 配置从最新开始消费,会丢失已到达的历史消息
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
// 监听消息
.messageListener(messageListener)
// ack确认超时
.ackTimeout(5, TimeUnit.SECONDS)
// 检查ack确认超时的间隔
.ackTimeoutTickTime(2, TimeUnit.SECONDS)
// 消费消息ack否定确认后,延迟指定时间后,重新投递回broker,再重新分配消费
.negativeAckRedeliveryDelay(5, TimeUnit.SECONDS)
// 设置批量接收消息策略
.batchReceivePolicy(BatchReceivePolicy.builder()
// 接收消息数量
.maxNumMessages(100)
// 接收消息大小
.maxNumBytes(10 * 1024 * 1024)
// 等待超时
.timeout(200, TimeUnit.MILLISECONDS)
.build())
.subscribe();
}
/**
* 生产者发送批量消息(发送完即关闭对象)
* @param tenant
* @param namespace
* @param topic
* @param messages
* @param properties
* @throws PulsarClientException
*/
public void sendBatchMq(String tenant, String namespace, String topic, List<String> messages, Map<String, String> properties) throws PulsarClientException {
Producer<byte[]> producer = newProducer(tenant, namespace, topic, true);
for (String message : messages) {
send(producer, message, properties, 0);
}
closeProducer(producer);
}
/**
* 生产者发送消息(发送完即关闭对象)
* @param tenant
* @param namespace
* @param topic
* @param message
* @param properties
* @param delayTime
* @throws PulsarClientException
*/
public void sendMq(String tenant, String namespace, String topic, String message, Map<String, String> properties, long delayTime) throws PulsarClientException {
Producer<byte[]> producer = newProducer(tenant, namespace, topic, false);
sendMq(producer, message, properties, delayTime);
closeProducer(producer);
}
/**
* 生产者发送消息(需手动关闭生产者对象)
* @param producer
* @param message
* @param properties
* @param delayTime
* @throws PulsarClientException
*/
public void sendMq(Producer<byte[]> producer, String message, Map<String, String> properties, long delayTime) throws PulsarClientException {
send(producer, message, properties, delayTime);
}
/**
* 生产者发送消息核心方法
* @param producer
* @param message
* @param properties
* @param delayTime
* @throws PulsarClientException
*/
private void send(Producer<byte[]> producer, String message, Map<String, String> properties, long delayTime) throws PulsarClientException {
MessageId messageId = producer.newMessage()
.properties(properties)
// 为消息设置密钥
//.key("")
.value(message.getBytes(StandardCharsets.UTF_8))
// deliverAt : 定时方法, deliverAfter:延时方法
// 延时消息的时长取值范围为0 - 864000秒(0秒 - 10天)。如10月1日12:00开始,最长可以设置864000秒。定时和延时消息在精度上会有1秒左右的偏差。
// 延时消息的消费模式仅支持使用 Shared 模式进行消费,否则会失去延时效果(Key-shared 也不支持)。
// 设定定时时间后,TTL 的时间依旧会从发送消息的时间点开始算消息的最长保留时间;要确保 TTL 的时间要大于延时的时间,否则 TTL 到期时,消息会被删除。
.deliverAfter(delayTime, TimeUnit.SECONDS)
.send();
logger.info("pulsar client sened message! id : {}", messageId);
}
/**
* 消费者(订阅者)接收Pulsar服务主题下的消息
* @param tenant
* @param namespace
* @param topic
* @param subscriptionName
* @param properties
* @param headerInterface
* @throws IOException
*/
public void receiveMq(String tenant, String namespace, String topic, String subscriptionName, Map<String, String> properties, HeaderInterface headerInterface) throws IOException {
receiveMq(new String[] {String.format(TOPIC_NAME, tenant, namespace, topic)}, subscriptionName, properties, headerInterface);
}
/**
* 消费者(订阅者)接收Pulsar服务主题下的消息
* @param topics 多个主题:"persistent://public/space/topic1,persistent://public/space/topic2,..."
* @param subscriptionName
* @param properties
* @param headerInterface
* @throws IOException
*/
public void receiveMq(String topics, String subscriptionName, Map<String, String> properties, HeaderInterface headerInterface) throws IOException {
receiveMq(topics.split(","), subscriptionName, properties, headerInterface);
}
/**
* 消费者(订阅者)接收Pulsar服务主题下的消息
* @param topicNames 多个主题:["persistent://public/space/topic1","persistent://public/space/topic2",...]
* @param subscriptionName
* @param properties
* @param headerInterface
* @throws IOException
*/
public void receiveMq(String [] topicNames, String subscriptionName, Map<String, String> properties, HeaderInterface headerInterface) throws IOException {
MessageListener<byte[]> messageListener = (consumer, message) -> {
try {
// 回调逻辑方法
headerInterface.execute(message);
//ack应答消息已消费成功
consumer.acknowledge(message);
} catch (IOException ioe) {
logger.error("pulsar consumer receive message error! " + ioe.getMessage(), ioe);
// ack应答消息消费失败
consumer.negativeAcknowledge(message);
}
};
Consumer<byte[]> consumer = newConsumer(topicNames, subscriptionName, properties, messageListener);
logger.info("pulsar consumer start listener message!");
addShutdownHook(consumer);
}
/**
* 关闭生产者对象
* @param producer
*/
public void closeProducer(Producer<byte[]> producer) {
if (producer != null && producer.isConnected()) {
try {
producer.flush();
producer.close();
logger.info("pulsar producer is close!");
} catch(PulsarClientException e) {
logger.error("pulsar producer close error!", e);
}
}
}
/**
* 关闭消费者对象
* @param consumer
*/
private void closeConsumer(Consumer<byte[]> consumer) {
if (consumer != null && consumer.isConnected()) {
try {
consumer.close();
logger.info("pulsar consumer is close!");
} catch(PulsarClientException e) {
logger.error("pulsar consumer close error!", e);
}
}
}
/**
* 关闭客户端连接
*/
public void close() {
if (client != null && !client.isClosed()) {
try {
client.close();
logger.info("pulsar client is close!");
} catch(PulsarClientException e) {
logger.error("pulsar client close error!", e);
}
}
}
/**
* 函数式回调接口
*/
@FunctionalInterface
interface HeaderInterface{
void execute(Message<byte[]> message) throws IOException;
}
/**
* 注册虚拟机器关机钩子事件
* @param consumer
*/
private void addShutdownHook(Consumer<byte[]> consumer){
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("jvm shutdown hook: close pulsar consumer server ...");
closeConsumer(consumer);
close();
}));
}
/**
* 示例
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
// 配置生产者/消费者相关参数
HashMap<String, String> properties = new HashMap<>();
properties.put("tag1","1");
// PulsarMqUtils pulsarMqUtils = PulsarMqUtils.buildServer("localhost", 6650);
PulsarMqUtils pulsarMqUtils = PulsarMqUtils.buildServer("pulsar://localhost:6650");
//生产消息(延迟10s)
pulsarMqUtils.sendMq("public", "default", "topic1", "test Message1, time:" + System.currentTimeMillis(), properties, 10L);
//生产消息
pulsarMqUtils.sendMq("public", "default", "topic2", "test Message2, time:" + System.currentTimeMillis(), properties, 0L);
//生产批量消息
// List<String> messages = new ArrayList<>();
// for (int i=0;i< 1000; i++) {
// messages.add("test Message, time:" + System.currentTimeMillis() + "," + i);
// }
// pulsarMqUtils.sendBatchMq("public", "default", "topic1", messages, properties);
//消费消息
// pulsarMqUtils.receiveMq("public", "default", "topic1", "topic_sub1", properties, (message) -> {
// logger.info("message:{}" , new String(message.getValue(), StandardCharsets.UTF_8));
// });
//多个主题订阅消费消息
String topics = "persistent://public/default/topic1,persistent://public/default/topic2";
pulsarMqUtils.receiveMq(topics, "topic_sub1", properties, (message) -> {
logger.info("message:{}" , new String(message.getValue(), StandardCharsets.UTF_8));
});
//关闭客户端
// pulsarMqUtils.close();
}
}
基于Pulsar客户端jar实现Java创建生产者与消费者操作工具类,支持发送mq消息、监听消费mq消息,简单易用引入到项目代码中使用;
此Java工具类有经过实战与测试,但难免有不足之处,欢迎交流与指正;