当前位置: 首页 > article >正文

使用 Redis List 和 Pub/Sub 实现简单的消息队列

使用 Redis List 和 Pub/Sub 实现简单的消息队列

Redis 本身不是专门的消息队列系统,但它提供了多种数据结构(如 List、Pub/Sub、Stream)来实现消息队列功能。根据不同的业务需求,可以选择不同的方式:

在 Redis 中,可以使用 ListPub/Sub 模块实现简单的消息队列。两者的适用场景和实现方式有所不同:

  • List(列表):适用于任务队列(Task Queue),支持持久化存储,消费者可以消费历史消息,支持 多消费者竞争消费(类似于 Kafka)。
  • Pub/Sub(发布/订阅):适用于实时推送(Event Notification),不存储消息,消费者只能接收发布时刻的消息,适合 多消费者广播消费(类似于 RabbitMQ Fanout)。

方式一:使用 Redis List 实现简单的消息队列

Redis 的 LPUSHBRPOP 操作可以用来构建一个 基于拉取的消息队列

1. 生产者(Producer)

生产者将消息推送到 Redis List 的尾部:

LPUSH my_queue "message1"
LPUSH my_queue "message2"

或在 Python 中:

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
r.lpush('my_queue', 'message1')
r.lpush('my_queue', 'message2')
  • LPUSH my_queue "message":将新消息插入队列的 左侧(头部)
  • RPUSH my_queue "message" 也可以使用,它会将消息插入队列的 右侧(尾部)

2. 消费者(Consumer)

消费者使用 BRPOP(阻塞)或 RPOP(非阻塞)从队列的 右侧 弹出消息:

BRPOP my_queue 0

或在 Python 中:

while True:
    msg = r.brpop('my_queue', timeout=0)  # 阻塞模式
    if msg:
        print("Received:", msg[1])  # msg[1] 是消息内容
  • BRPOP my_queue 0:如果队列为空,则 阻塞 等待新的消息。
  • RPOP my_queue:如果队列为空,直接返回 None,不会阻塞。

3. 多消费者

多个消费者可以竞争消费消息,即每条消息只会被 其中一个 消费者消费。例如,有两个消费者在 BRPOP 同一个队列,Redis 只会把某个消息分配给其中一个。

4. 消息确认与持久化

由于 Redis List 只存储消息,不支持自动重试,因此可以配合 LPUSH+RPOPLPUSH 事务 实现持久化:

  • 先用 RPOPLPUSH my_queue processing_queue 把消息从 my_queue 转移到 processing_queue,然后再处理。
  • 处理完成后,从 processing_queue 中删除该消息。
msg = r.rpoplpush('my_queue', 'processing_queue')  # 转移到处理中队列
if msg:
    process_message(msg)  # 处理消息
    r.lrem('processing_queue', 1, msg)  # 处理完成后删除

方式二:使用 Redis Pub/Sub 实现消息队列

Pub/Sub 适用于实时消息推送,消息不会存储,适合事件广播。

1. 生产者(Publisher)

发布者向某个频道(channel)发送消息:

PUBLISH my_channel "message1"

或在 Python 中:

r.publish('my_channel', 'message1')

2. 消费者(Subscriber)

订阅者监听消息:

SUBSCRIBE my_channel

或在 Python:

pubsub = r.pubsub()
pubsub.subscribe('my_channel')

for message in pubsub.listen():
    if message['type'] == 'message':
        print("Received:", message['data'].decode())

3. Pub/Sub 适用场景

  • 实时消息推送(如 WebSocket、聊天室)。
  • 事件驱动系统(如日志收集、状态变更通知)。
  • 多消费者广播消费,所有订阅者都会收到相同的消息。

4. Pub/Sub 局限性

  • 消息 不会持久化,如果订阅者掉线,它不会收到丢失的消息。
  • 不能确保 消息按顺序消费
  • 无法回溯历史消息(相比 Kafka)。

总结:Redis List vs Pub/Sub

特性Redis ListRedis Pub/Sub
消息存储存储在 List,直到被消费不存储,实时传输
消费者模型多消费者竞争消费(类似任务队列)多消费者广播消费(类似事件通知)
可靠性支持重试和确认机制订阅者掉线会丢失消息
适用场景任务队列(如延迟任务、任务分发)实时推送(如聊天、事件通知)

如果需要 持久化队列,建议使用 Redis List;如果只是 实时推送,可以用 Pub/Sub



http://www.kler.cn/a/520044.html

相关文章:

  • 14-6-1C++STL的list
  • Synology 群辉NAS安装(4)docker-compose
  • 高频 SQL 50 题(基础版)_620. 有趣的电影
  • 每日一题 427. 建立四叉树
  • HTML常见文本标签解析:从基础到进阶的全面指南
  • 电力场效应晶体管(电力 MOSFET),全控型器件
  • 代码随想录训练营第五十八天| 拓扑排序精讲 dijkstra(朴素版)精讲
  • Vue3 provide/inject用法总结
  • 解锁.NET Standard库:从0到1的创建与打包秘籍
  • 使用递归函数求1~n之和
  • 基于SpringBoot的网上考试系统
  • 11.渲染管线——光栅化阶段
  • 低代码系统-产品架构案例介绍、简道云(七)
  • Linux编译安装Netgen/NGSolve
  • Kafka与ZooKeeper
  • RabbitMQ5-死信队列
  • 深度学习项目--基于LSTM的糖尿病预测探究(pytorch实现)
  • 4070s显卡部署Deepseek R1
  • 如何快速开发LabVIEW项目,成为LabVIEW开发的高手
  • Java实战项目-基于 springboot 的校园选课小程序(附源码,部署,文档)
  • 网工_PPP协议
  • Pyecharts之图表组合与布局优化
  • 从音频到 PDF:AI 全流程打造完美英文绘本教案
  • 自然语言处理——从原理、经典模型到应用
  • Alibaba Spring Cloud 六 Seata 的核心组件:RM
  • 【6】YOLOv8 训练自己的分割数据集