011 rocketmq过滤消息
文章目录
- 过滤消息
- TAG模式过滤
- FilterByTagProducer.java
- FilterByTagConsumer.java
- SQL表达式过滤
- FilterBySQLProducer.java
- FilterBySQLConsumer.java
- 类过滤模式(基于4.2.0版本)
过滤消息
消息过滤包括基于表达式过滤与基于类模式两种过滤模式。其中表达式过滤⼜分为TAG和SQL92模式
TAG模式过滤
发送消息时我们会为每⼀条消息设置TAG标签,同⼀⼤类中的消息放在⼀个主题TOPIC下,但是如果
进⾏分类我们则可以根据TAG进⾏分类,每⼀类消费者可能不是关系某个主题下的所有消息,我们就可
以通过TAG进⾏过滤,订阅关注的某⼀类数据。
FilterByTagProducer.java
package com.example.rocketmq.demo.filter;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
//通过TAG 实现 过滤消息
public class FilterByTagProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
String[] tags = {"TAGA","TAGB","TAGC"};
for (int i = 0; i < 10; i++) {
String tag = tags[i%tags.length];
//每个消息设置一个tag,tag 二级分类
Message msg = new Message("TopicTest",tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
FilterByTagConsumer.java
package com.example.rocketmq.demo.filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class FilterByTagConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
//订阅Topic,只订阅标签为A或B的消息
consumer.subscribe("TopicTest", "TAGA || TAGB");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
SQL表达式过滤
SQL92表达式消息过滤,是通过消息的属性运⾏SQL过滤表达式进⾏条件匹配,消息发送时需要设置⽤户的属性putUserProperty⽅法设置属性。
支持的语法:
- 数值⽐较, 如 > , >= , < , <= , BETWEEN , = ;
- 字符⽐较, 如 = , <> , IN ;
- IS NULL or IS NOT NULL ;
- 逻辑连接符 AND , OR , NOT ;
支持的类型:
- 数值型, 如123, 3.1415;
- 字符型, 如 ‘abc’, 必须⽤单引号;
- NULL , 特殊常数;
- 布尔值, TRUE or FALSE ;
FilterBySQLProducer.java
package com.example.rocketmq.demo.filter;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class FilterBySQLProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
producer.start();
String[] tags = {"TagA","TagB","TagC","TagD"};
for (int i = 0; i < 10; i++) {
try {
String tag = tags[i % tags.length];
//构建消息
Message msg = new Message("TopicTest" /* Topic */,
tag /* Tag */,
("RocketMQ消息测试,消息的TAG="+tag+ ", 属性 age = " + i + ", == " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//每个消息设置属性为age,age值为0-9
msg.putUserProperty("age", i+"");
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
// Thread.sleep(1000);
}
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
FilterBySQLConsumer.java
package com.example.rocketmq.demo.filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
public class FilterBySQLConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
//订阅Topic
consumer.subscribe("TopicTest", MessageSelector.bySql("age between 0 and 6"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer===启动成功!");
}
}
类过滤模式(基于4.2.0版本)
RocketMQ通过定义消息过滤类的接⼝实现消息过滤