当前位置: 首页 > article >正文

记 ==> 首次使用rabbitMQ优化项目

昨天刚学习完了rabbitMQ,刚好我的项目有个模块挺符合使用rabbitMQ进行异步处理的。

这个模块大概功能是:用户发送的所有帖子都会添加到他的发件箱,当有个新用户关注了他,他发件箱内所有的博客都会被添加到关注他的用户的收件箱里

比如:A关注B,A的收件箱内添加B的所有帖子

后续B再发帖子,直接推送到A的收件箱

有点类似抖音,当我们关注了一个up主,我们可以在关注列表里刷到这个up主的视频,并且是按新发布时间降序排序的

接下来看看我的代码,(ps:本人小白勿喷)

当用户关注一个博主后,利用redis的Zset集合完成feed流TimeLine模式的读扩散(拉模式),从博主的收件箱内拉取所有帖子到用户收件箱。当博主后续再新发帖子,才用写扩散(推模式)推送到收件箱

当用户取关后,从用户收件箱内移除掉被取关博主的帖子

我的理解是,这个接口的功能就是单纯的实现用户关注和取关,并不关注帖子的拉取和删除,并不该把这两个功能冗杂在一个方法里。

于是我便打算用rabbitMQ的directExchange模式进行消息发送,关注或者取关后把消息发布出去,不关注对帖子的拉取的行为,让监听者去慢慢的拉。

修改代码~~

把当前用户和被关注取关的用户ID一起当做消息发送出去

引入AMQP依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yml内添加rabbitmq配置

更改序列化规则,后续消息使用map发送

写一个config,配置一下交互机,队列,并完成绑定

然后写两个方法分别接听拉取帖子和取消帖子这两个queue

拉取队列

取关队列

我把源码贴出来,献丑啦

package com.brrbaii.mqListener;

import com.brrbaii.dto.Result;
import com.brrbaii.dto.UserDTO;
import com.brrbaii.utils.UserHolder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Set;

import static com.brrbaii.common.MQConstants.*;
import static com.brrbaii.common.RedisConstants.*;
import static com.brrbaii.utils.SystemConstants.ALREADY_FOLLOW;

@Component
public class FollowListener {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    /**
     * 用户关注后,把被关注用户发件箱内的所有BLOG添加到当前用户的收件箱
     * @param map:被关注的用户ID和当前用户ID
     */
    @RabbitListener(queues = PULL_BLOG_QUEUE)
    public void pullBlog2UserBox(Map<String,String> map){
        if( map == null){
            return;
        }
        String followerId = map.get("followedUserId");
        String userId = map.get("userId");
        //被关注的用户key
        String followBox = FEED_USER_KEY + followerId;

        //功能:关注后,把该用户的发件箱内数据拷贝到粉丝收件箱
        //获取被关注用户所有博客--从0到当前时间就是获取全部,并按照降序排序
        Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
                .reverseRangeByScoreWithScores(followBox, 0, System.currentTimeMillis());

        //用户没有博客,不用获取
        if (typedTuples==null || typedTuples.isEmpty()){
            return;
        }
        //获取当前用户的key
        String userBOx = FEED_PUSH_KEY + userId;

        //把被关注用户的blog添加到“我”的收件箱
        for(ZSetOperations.TypedTuple<String> tuple:typedTuples){
            stringRedisTemplate.opsForZSet().add(
                    userBOx,
                    tuple.getValue(),
                    tuple.getScore()
            );
        }
    }

    /**
     * 用户取关后,从用户的收件箱内清除被取关用户的BLOG
     * @param map:被关注的用户ID和当前用户ID
     */
    @RabbitListener(queues = PUSH_BLOG_QUEUE)
    public void pushBlogFromUserBox(Map<String,String> map){
        if( map == null){
            return;
        }
        String followerId = map.get("followedUserId");
        String userId = map.get("userId");

        String cancelFollower = FEED_USER_KEY + followerId;
        //获取被取关用户的所有blog,这里无需关注score
        Set<String> cancelFollowerBox = stringRedisTemplate.opsForZSet().range(cancelFollower, 0, -1);

        //没有BLOG,直接结束
        if(cancelFollowerBox == null || cancelFollowerBox.isEmpty()){
            return;
        }

        //获取当前用户收件箱
        String userBox = FEED_PUSH_KEY + userId;

        //从当前用户的收件箱里,把被取关用户的博客删除
        for(String blogId : cancelFollowerBox){
            stringRedisTemplate.opsForZSet().remove(userBox,blogId);
        }
    }

}
package com.brrbaii.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static com.brrbaii.common.MQConstants.*;

@Configuration
public class MQConfig {
    //拉取blog队列
    @Bean
    public Queue pullBlogQueue(){
        System.out.println("为什么没给我创建队列?");
        return new Queue(PULL_BLOG_QUEUE);
    }

    //移出blog队列
    @Bean
    public Queue pushBlogQueue(){
        return new Queue(PUSH_BLOG_QUEUE);
    }

    //声明交换机
    @Bean
    public DirectExchange blogDirectExchange(){
        return new DirectExchange(DIRECT_EXCHANGE_BLOG);
    }

    @Bean
    public Binding pullBlogBinding(Queue pullBlogQueue, DirectExchange blogDirectExchange){
        return BindingBuilder
                .bind(pullBlogQueue)
                .to(blogDirectExchange)
                .with(PULL_BLOG_ROUTING_KEY);
    }
    @Bean
    public Binding pushBlogBinding(Queue pushBlogQueue, DirectExchange blogDirectExchange){
        return BindingBuilder
                .bind(pushBlogQueue)
                .to(blogDirectExchange)
                .with(PUSH_BLOG_ROUTING_KEY);
    }

}


http://www.kler.cn/news/11872.html

相关文章:

  • 万字长文介绍R package “vegan”——入门学习与重复文献数据
  • 奥威BI数据可视化大屏分享|多场景、多风格
  • 300元左右的蓝牙耳机哪个好?300左右音质最好的蓝牙耳机
  • 全面解析反欺诈(羊毛盾)API,助你识别各类欺诈风险
  • 尚融宝14-集成redis缓存
  • 【C语言】位运算 {位运算的应用 :关闭位,判断位,打开位,转置位;位域}
  • 服装标签二维码,要如何制作?含品牌案例
  • SQL Server 数据操控,视图和索引
  • copilot使用教程
  • X509证书以及相关java常用接口
  • 如何采药才最值当——记忆化搜索
  • 锁子甲 bulid+sim
  • 【SpringBoot】使用JWT验证
  • Redis 6.0的多线程是如何工作的
  • 决策树算法介绍
  • MySQL索引及SQL优化
  • SpringBoot中对Bean的处理方法以及第三方Bean处理的方式
  • 快问快答--时间序列常用的数据增强方法有哪些?python有集成库吗?
  • pandas中df.groupby详解?
  • 车企跨界背后,智能手机进入新增长时代
  • BBR算法
  • C++——模板初阶与泛型编程
  • 【Linux】进程地址空间
  • 两层神经网络的参数求导过程
  • Redis分布式锁的实现方式及底层原理
  • 数字图像处理(第三版)绪论笔记
  • vue监视数据的原理、收集表单数据
  • 「SQL面试题库」 No_36 树节点
  • 【机器视觉------标定篇(二)】三点成圆算法(求相机旋转中心)
  • 【算法LearnNO.1】算法介绍以及算法的时间复杂度和空间复杂度