当前位置: 首页 > article >正文

SpringBoot Redis list 消息队列

文章目录

  • 参考
  • 消息队列
  • list
    • 源码
  • pub/sub
    • 源码

参考

https://www.cnblogs.com/uniqueDong/p/15904837.html
https://www.cnblogs.com/wzh2010/p/17205390.html
https://blog.csdn.net/qq_16557637/article/details/121015736
https://developer.aliyun.com/article/1095035
https://blog.csdn.net/sco5282/article/details/132904956

消息队列

消息队列可以实现消息解耦、消息路由、异步处理、流量削峰填谷。主流消息队列有kafka, rabbitmq, rocketmq
Redis也可以实现消息队列。方式有

  1. list
  2. pub/sub
  3. stream

list

redis的list底层是链表,满足先进先出。
list实现队列比较方便。同时可以满足有序,消息去重。缺点是

  1. 没有订阅功能,消费者要主动查询队列。而为了避免频繁查询队列消耗CPU资源,可以采用阻塞式查询。redis中阻塞查询命令是brpop
  2. 无法保证可靠性。缺少消息确认机制,无法及时感知遗漏消息,导致数据不一致。

源码

完整项目在https://gitcode.com/zsss1/redis_mq/overview
pom.xml添加redisson依赖。

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-redis</artifactId>
 </dependency>
 <dependency>
     <groupId>org.redisson</groupId>
     <artifactId>redisson-spring-boot-starter</artifactId>
     <version>3.40.2</version>
 </dependency>

Redission封装依赖。

@SpringBootTest(classes = DemoApplication.class)
public class RedisListTest {
    @Autowired
    private RedissonClient client;

    private static final String REDIS_QUEUE = "list_queue";

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListTest.class);
    
    @Test
    public void test_redis_list_mq() throws Exception {
        RedissonBlockingDeque r;
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                producer("message" + i);
            }
        }).start();

        new Thread(() -> {
            consumer();
        }).start();

        Thread.currentThread().join();
    }

    // 消费者,阻塞
    public void consumer() {
        RBlockingDeque<String> deque = client.getBlockingDeque(REDIS_QUEUE);
        boolean isCheck = true;
        while (isCheck) {
            try {
                String message = deque.takeLast();
                System.out.println("consumer: " + message);
            } catch (InterruptedException e) {
                LOGGER.error("consumer failed, cause: {}", e.getMessage());
            }
        }
    }

    // 生产者
    public void producer(String message) {
        RBlockingDeque<String> deque = client.getBlockingDeque(REDIS_QUEUE);
        System.out.println(deque.getClass());
        try {
            deque.putFirst(message);
        } catch (InterruptedException e) {
            LOGGER.error("producer failed, msg: {}, cause: {}", message, e.getMessage());
        }
    }
}

pub/sub

发布订阅模式是一种消息传递模式。发送者将消息发送到频道,订阅者订阅频道即可及时收到消息。
它支持组生产者与消费者。但是它会丢失消息。
Redis在server端为每个消费者保留一块内存区域,存储该消费者订阅的数据。如果消费者处理速度慢,内存区域满了,那么Redis会断开消费者连接,这会导致消息丢失。

源码

  1. 定义频道。
public class TopicChannel {
    public static final String SEND_PHONE = "send_phone";
    public static final String SEND_EMAIL = "send_email";
}
  1. 定义监听频道的订阅者。分清org.springframework.data.redis.connection.MessageListenerorg.redisson.api.listener.MessageListener
public class MyMessageListener implements MessageListener {

    private static Map<String, Consumer<String>> RULE = new HashMap<>();

    static {
        RULE.put(TopicChannel.SEND_EMAIL, MyMessageListener::sendEmail);
        RULE.put(TopicChannel.SEND_PHONE, MyMessageListener::sendPhone);
    }

    public static void sendEmail(String msg) {
        System.out.println("listen email:" + msg);
    }

    public static void sendPhone(String msg) {
        System.out.println("listen phone:" + msg);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] byteChannel = message.getChannel();
        byte[] byteBody = message.getBody();
        try {
            String channel = new String(byteChannel);
            String body = new String(byteBody);
            System.out.println("channel: + " + channel + ", body: " + body);
            RULE.get(channel).accept(body);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
  1. 在redis注册订阅者。
@Component
public class RedisConfig {
    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        return new MessageListenerAdapter(new MyMessageListener());
    }

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // messageListenerAdapter 订阅 SEND_EMAIL 频道
        container.addMessageListener(messageListenerAdapter, new PatternTopic(TopicChannel.SEND_EMAIL));
        // messageListenerAdapter 订阅 SEND_PHONE 频道
        container.addMessageListener(messageListenerAdapter, new PatternTopic(TopicChannel.SEND_PHONE));
        return container;
    }
}
  1. 测试
@SpringBootTest(classes = DemoApplication.class)
public class MyListener {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Test
    public void test_pub() {
        redisTemplate.convertAndSend(TopicChannel.SEND_EMAIL, "pub email message");
        redisTemplate.convertAndSend(TopicChannel.SEND_PHONE, "pub phone message");
    }
}

测试结果

channel: + send_email, body: pub email message
listen email:pub email message
channel: + send_phone, body: pub phone message
listen phone:pub phone message

http://www.kler.cn/a/446290.html

相关文章:

  • 实现 WebSocket 接入文心一言
  • 【JavaEE进阶】关于Maven
  • 【机器人】ATM 用于策略学习的任意点轨迹建模 RSS 2024 | 论文精读
  • 开放词汇目标检测(Open-Vocabulary Object Detection, OVOD)综述
  • AlipayHK支付宝HK接入-商户收款(PHP)
  • 蓝桥杯练习生第四天
  • HTTP—03
  • Flutter组件————FloatingActionButton
  • 【优先算法】双指针 --(结合例题讲解解题思路)(C++)
  • 【java】全文索引,普通索引,以及ES搜索引擎组件的关系
  • MATLAB中cvx工具箱的使用
  • 三次翻转实现数组元素的旋转
  • 深入了解Python模拟负载均衡器:将请求高效分发至多个服务器
  • Emacs折腾日记(四)——elisp控制结构
  • Django 模板分割及多语言支持案例【需求文档】-->【实现方案】
  • springboot 3 websocket react 系统提示,选手实时数据更新监控
  • Flask内存马学习
  • (12)YOLOv10算法基本原理
  • 基于语义的NLP任务去重:大语言模型应用与实践
  • 数据结构-栈与队列
  • GaussDB数据库迁移方案介绍
  • 某医疗行业用户基于Apache SeaTunnel从调研选型到企业数据集成框架的落地实践
  • 智慧商城:购物车模块基本静态结构 + 构建vuex cart模块,获取数据存储(异步actions)
  • 图解HTTP-HTTP状态码
  • ECharts散点图-SymbolShapeMorph,附视频讲解与代码下载
  • Go 语言GC(垃圾回收)的工作原理