Redis 消息队列详解
Redis 消息队列详解
Redis 作为一个高性能的内存数据库,支持多种实现消息队列的方式,主要包括:
- Redis List(基于列表的队列)
- Redis Pub/Sub(发布/订阅)
- Redis Stream(消息流)
- Redis Sorted Set(延迟队列)
不同的方式适用于不同的场景,下面详细讲解各自的实现原理、适用场景以及示例代码。
1. 基于 Redis List 的消息队列
Redis 的 List
结构(链表)可以用来实现 任务队列,支持持久化存储,并允许消费者从队列中取出任务进行处理。
1.1. 基本原理
LPUSH queue_name message
:将新任务加入队列 头部。RPUSH queue_name message
:将新任务加入队列 尾部。LPOP queue_name
/RPOP queue_name
:从队列头部/尾部取出任务(非阻塞)。BRPOP queue_name timeout
:阻塞方式获取任务,如果队列为空,会等待新的任务到来。
1.2. 生产者代码
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# 添加任务到队列
r.lpush('task_queue', 'task1')
r.lpush('task_queue', 'task2')
print("Tasks added!")
1.3. 消费者代码
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
while True:
task = r.brpop('task_queue', timeout=0) # 阻塞模式
if task:
print(f"Processing task: {task[1]}")
1.4. 适用场景
- 任务队列(Task Queue):适用于 任务分发 和 异步处理。
- 多消费者竞争模式:多个消费者会竞争消费队列中的任务,每个任务只能被一个消费者取走。
1.5. 问题与优化
- 消息丢失:如果消费者取出任务后崩溃,任务就会丢失。可以使用
RPOPLPUSH
实现任务持久化:task = r.rpoplpush('task_queue', 'processing_queue') if task: process(task) r.lrem('processing_queue', 1, task) # 任务完成后删除
- 限流:可以使用 Redis 令牌桶 或 滑动窗口算法 控制任务消费速率。
2. 基于 Redis Pub/Sub 的实时消息队列
Redis Pub/Sub
适用于实时广播通知,但不支持消息存储。
2.1. 基本原理
PUBLISH channel message
:发布消息到频道。SUBSCRIBE channel
:订阅者监听频道消息。
2.2. 生产者代码
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
r.publish('news_channel', 'Breaking News: Redis is awesome!')
2.3. 消费者代码
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
pubsub = r.pubsub()
pubsub.subscribe('news_channel')
for message in pubsub.listen():
if message['type'] == 'message':
print(f"Received message: {message['data']}")
2.4. 适用场景
- 实时消息推送(如聊天系统、WebSocket)。
- 事件驱动架构(如监控报警、状态变更通知)。
2.5. 问题与优化
- 消息不会持久化,订阅者掉线后无法收到历史消息。
- 不能回放消息,消息是实时的,一旦发送就无法重新获取。
- 可以使用 Redis Stream 代替 来解决持久化问题。
3. 基于 Redis Stream 的持久化消息队列
Redis Stream 是一种高效、持久化的消息队列,支持消息存储、消费分组等特性,类似 Kafka。
3.1. 基本原理
XADD stream_name * field1 value1 field2 value2
:添加消息。XREAD COUNT 1 STREAMS stream_name last_id
:读取消息。XGROUP CREATE stream_name group_name $ MKSTREAM
:创建消费组。
3.2. 生产者代码
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
r.xadd('event_stream', {'event': 'user_signup', 'user_id': '12345'})
print("Event published!")
3.3. 消费者代码
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
while True:
events = r.xread({'event_stream': '$'}, count=1, block=5000)
for stream, messages in events:
for message_id, message in messages:
print(f"Received: {message}")
3.4. 适用场景
- 大规模消息存储(Kafka 替代)。
- 日志收集与分析。
- 事件溯源(能回放历史消息)。
4. 基于 Redis Sorted Set 的延迟队列
Sorted Set(有序集合)可以用于定时任务调度。
4.1. 生产者代码
import time
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
r.zadd('delay_queue', {'task1': time.time() + 10}) # 10秒后执行
4.2. 消费者代码
import time
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
while True:
now = time.time()
tasks = r.zrangebyscore('delay_queue', 0, now)
for task in tasks:
print(f"Processing {task}")
r.zrem('delay_queue', task)
time.sleep(1)
4.3. 适用场景
- 定时任务(如定时短信、延迟执行)。
- 超时检测(如订单过期、缓存清理)。
5. 选择合适的 Redis 消息队列
方案 | 持久化 | 支持多消费者 | 适用场景 |
---|---|---|---|
List | ✅ | 竞争消费 | 任务队列、异步任务 |
Pub/Sub | ❌ | 广播消费 | 实时推送、通知 |
Stream | ✅ | 组消费 | 事件流、日志收集 |
Sorted Set | ✅ | 竞争消费 | 延迟任务 |
总结
- 如果需要任务队列,使用
List
(支持竞争消费,适合任务调度)。 - 如果需要实时推送,使用
Pub/Sub
(广播消息,不存储)。 - 如果需要持久化消息流,使用
Stream
(类似 Kafka,支持回放)。 - 如果需要定时任务,使用
Sorted Set
(适合延迟队列)。