记 ==> 首次使用rabbitMQ优化项目
昨天刚学习完了rabbitMQ,刚好我的项目有个模块挺符合使用rabbitMQ进行异步处理的。
这个模块大概功能是:用户发送的所有帖子都会添加到他的发件箱,当有个新用户关注了他,他发件箱内所有的博客都会被添加到关注他的用户的收件箱里
比如:A关注B,A的收件箱内添加B的所有帖子
后续B再发帖子,直接推送到A的收件箱
有点类似抖音,当我们关注了一个up主,我们可以在关注列表里刷到这个up主的视频,并且是按新发布时间降序排序的
接下来看看我的代码,(ps:本人小白勿喷)
当用户关注一个博主后,利用redis的Zset集合完成feed流TimeLine模式的读扩散(拉模式),从博主的收件箱内拉取所有帖子到用户收件箱。当博主后续再新发帖子,才用写扩散(推模式)推送到收件箱
当用户取关后,从用户收件箱内移除掉被取关博主的帖子
我的理解是,这个接口的功能就是单纯的实现用户关注和取关,并不关注帖子的拉取和删除,并不该把这两个功能冗杂在一个方法里。
于是我便打算用rabbitMQ的directExchange模式进行消息发送,关注或者取关后把消息发布出去,不关注对帖子的拉取的行为,让监听者去慢慢的拉。
修改代码~~
把当前用户和被关注取关的用户ID一起当做消息发送出去
引入AMQP依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
yml内添加rabbitmq配置
更改序列化规则,后续消息使用map发送
写一个config,配置一下交互机,队列,并完成绑定
然后写两个方法分别接听拉取帖子和取消帖子这两个queue
拉取队列
取关队列
我把源码贴出来,献丑啦
package com.brrbaii.mqListener;
import com.brrbaii.dto.Result;
import com.brrbaii.dto.UserDTO;
import com.brrbaii.utils.UserHolder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.Set;
import static com.brrbaii.common.MQConstants.*;
import static com.brrbaii.common.RedisConstants.*;
import static com.brrbaii.utils.SystemConstants.ALREADY_FOLLOW;
@Component
public class FollowListener {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 用户关注后,把被关注用户发件箱内的所有BLOG添加到当前用户的收件箱
* @param map:被关注的用户ID和当前用户ID
*/
@RabbitListener(queues = PULL_BLOG_QUEUE)
public void pullBlog2UserBox(Map<String,String> map){
if( map == null){
return;
}
String followerId = map.get("followedUserId");
String userId = map.get("userId");
//被关注的用户key
String followBox = FEED_USER_KEY + followerId;
//功能:关注后,把该用户的发件箱内数据拷贝到粉丝收件箱
//获取被关注用户所有博客--从0到当前时间就是获取全部,并按照降序排序
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(followBox, 0, System.currentTimeMillis());
//用户没有博客,不用获取
if (typedTuples==null || typedTuples.isEmpty()){
return;
}
//获取当前用户的key
String userBOx = FEED_PUSH_KEY + userId;
//把被关注用户的blog添加到“我”的收件箱
for(ZSetOperations.TypedTuple<String> tuple:typedTuples){
stringRedisTemplate.opsForZSet().add(
userBOx,
tuple.getValue(),
tuple.getScore()
);
}
}
/**
* 用户取关后,从用户的收件箱内清除被取关用户的BLOG
* @param map:被关注的用户ID和当前用户ID
*/
@RabbitListener(queues = PUSH_BLOG_QUEUE)
public void pushBlogFromUserBox(Map<String,String> map){
if( map == null){
return;
}
String followerId = map.get("followedUserId");
String userId = map.get("userId");
String cancelFollower = FEED_USER_KEY + followerId;
//获取被取关用户的所有blog,这里无需关注score
Set<String> cancelFollowerBox = stringRedisTemplate.opsForZSet().range(cancelFollower, 0, -1);
//没有BLOG,直接结束
if(cancelFollowerBox == null || cancelFollowerBox.isEmpty()){
return;
}
//获取当前用户收件箱
String userBox = FEED_PUSH_KEY + userId;
//从当前用户的收件箱里,把被取关用户的博客删除
for(String blogId : cancelFollowerBox){
stringRedisTemplate.opsForZSet().remove(userBox,blogId);
}
}
}
package com.brrbaii.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static com.brrbaii.common.MQConstants.*;
@Configuration
public class MQConfig {
//拉取blog队列
@Bean
public Queue pullBlogQueue(){
System.out.println("为什么没给我创建队列?");
return new Queue(PULL_BLOG_QUEUE);
}
//移出blog队列
@Bean
public Queue pushBlogQueue(){
return new Queue(PUSH_BLOG_QUEUE);
}
//声明交换机
@Bean
public DirectExchange blogDirectExchange(){
return new DirectExchange(DIRECT_EXCHANGE_BLOG);
}
@Bean
public Binding pullBlogBinding(Queue pullBlogQueue, DirectExchange blogDirectExchange){
return BindingBuilder
.bind(pullBlogQueue)
.to(blogDirectExchange)
.with(PULL_BLOG_ROUTING_KEY);
}
@Bean
public Binding pushBlogBinding(Queue pushBlogQueue, DirectExchange blogDirectExchange){
return BindingBuilder
.bind(pushBlogQueue)
.to(blogDirectExchange)
.with(PUSH_BLOG_ROUTING_KEY);
}
}