使用Redis防止重复发送RabbitMQ消息
问题
今天遇到一个问题,发送MQ
消息的时候需要保证不会重复发送,注意不是可靠到达(可靠到达可以通过消息确认机制和回调接口保证),这里保证的是不会生产多条一样的消息。
方法
综合讨论下来决定使用Redis
缓存来解决,因为相比于将记录插入数据库Redis
更为高效和便捷。
检验是否已经发送
在发送消息之前根据相关信息组合成key
去Redis
中查找,找到后检测值是否为存在并且是否为设定的值,若存在且与设定的值一样,则返回false
,说明该消息已经发送过了。
public boolean isSend(String messageType, Long bizId, int hashCode) {
// 根据消息类型、业务id和哈希值组合成key
String key = this.genKey(messageType, bizId, hashCode);
Long value = super.get(key);
if (value != null && value.equals(DEFAULT_VALUE)) {
return false;
}
return true;
}
/**get方法*/
public V get(K key) {
if (key == null) {
return null;
} else {
try {
// 在key前添加前缀和名字,并将原来的key进行json序列化
String realKey = this.genRealKey(key);
String content = (String)this.redisTemplate.opsForValue().get(realKey);
// 若get到的值不为null则进行json反序列化
return content == null ? null : this.valueSerializer.deserialize(content);
} catch (Exception e) {
CACHE.error("", key.toString(), "", "0", e);
return null;
}
}
}
以上就是检验消息是否重复的方法,需要注意的是JSON
序列化,因为Redis默认使用的是JDK
序列化,这种序列化后的内容不仅多而且不易于阅读,因此将其改为Json序列化。
发送后添加缓存
在发送消息的时候会先在Redis
中put
一个以相关信息组合为key
,value
为默认值的记录,过期时间为5min
。
public void sendMessage(String messageType, Long bizId, int hashCode) {
super.put(genKey(messageType, bizId, hashCode), DEFAULT_VALUE);
}
/**put方法*/
public void put(K key, V value) {
try {
if (key != null && null != value) {
// 进行json序列化
String content = this.valueSerializer.serialize(value);
this.redisTemplate.opsForValue().set(this.genRealKey(key), content, this.expires, this.timeUnit);
}
} catch (Throwable e) {
e.printStackTrace();
}
}
发送消息方法
最后的发送消息方法大致代码如下:
public void sendMQMessage(Long bizId, String messageTemplateCode, String msg, int msgHashCode, String exchange, String routingKey) {
//加入缓存
boolean send = true;
//String messageType = MessageTypeUtil.getMessageType(messageTemplateCode);
if (bizId != null) {
// 检测是否已经发送
send = sendMessageCache.isSend(messageTemplateCode, bizId, msgHashCode);
}
//发送mq消息
if (send) {
if (bizId != null) {
// 加入缓存
sendMessageCache.sendMessage(messageTemplateCode, bizId, msgHashCode);
}
// 发送消息
messageSender.send(exchange, routingKey, msg);
}
}