当前位置: 首页 > article >正文

Redis 消息队列详解

Redis 消息队列详解

Redis 作为一个高性能的内存数据库,支持多种实现消息队列的方式,主要包括:

  1. Redis List(基于列表的队列)
  2. Redis Pub/Sub(发布/订阅)
  3. Redis Stream(消息流)
  4. Redis Sorted Set(延迟队列)

不同的方式适用于不同的场景,下面详细讲解各自的实现原理、适用场景以及示例代码。


1. 基于 Redis List 的消息队列

Redis 的 List 结构(链表)可以用来实现 任务队列,支持持久化存储,并允许消费者从队列中取出任务进行处理。

1.1. 基本原理

  • LPUSH queue_name message:将新任务加入队列 头部
  • RPUSH queue_name message:将新任务加入队列 尾部
  • LPOP queue_name / RPOP queue_name:从队列头部/尾部取出任务(非阻塞)。
  • BRPOP queue_name timeout:阻塞方式获取任务,如果队列为空,会等待新的任务到来。

1.2. 生产者代码

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# 添加任务到队列
r.lpush('task_queue', 'task1')
r.lpush('task_queue', 'task2')
print("Tasks added!")

1.3. 消费者代码

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

while True:
    task = r.brpop('task_queue', timeout=0)  # 阻塞模式
    if task:
        print(f"Processing task: {task[1]}")

1.4. 适用场景

  • 任务队列(Task Queue):适用于 任务分发异步处理
  • 多消费者竞争模式:多个消费者会竞争消费队列中的任务,每个任务只能被一个消费者取走。

1.5. 问题与优化

  • 消息丢失:如果消费者取出任务后崩溃,任务就会丢失。可以使用 RPOPLPUSH 实现任务持久化
    task = r.rpoplpush('task_queue', 'processing_queue')
    if task:
        process(task)
        r.lrem('processing_queue', 1, task)  # 任务完成后删除
    
  • 限流:可以使用 Redis 令牌桶滑动窗口算法 控制任务消费速率。

2. 基于 Redis Pub/Sub 的实时消息队列

Redis Pub/Sub 适用于实时广播通知,但不支持消息存储。

2.1. 基本原理

  • PUBLISH channel message:发布消息到频道。
  • SUBSCRIBE channel:订阅者监听频道消息。

2.2. 生产者代码

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
r.publish('news_channel', 'Breaking News: Redis is awesome!')

2.3. 消费者代码

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

pubsub = r.pubsub()
pubsub.subscribe('news_channel')

for message in pubsub.listen():
    if message['type'] == 'message':
        print(f"Received message: {message['data']}")

2.4. 适用场景

  • 实时消息推送(如聊天系统、WebSocket)。
  • 事件驱动架构(如监控报警、状态变更通知)。

2.5. 问题与优化

  • 消息不会持久化,订阅者掉线后无法收到历史消息。
  • 不能回放消息,消息是实时的,一旦发送就无法重新获取。
  • 可以使用 Redis Stream 代替 来解决持久化问题。

3. 基于 Redis Stream 的持久化消息队列

Redis Stream 是一种高效、持久化的消息队列,支持消息存储、消费分组等特性,类似 Kafka。

3.1. 基本原理

  • XADD stream_name * field1 value1 field2 value2:添加消息。
  • XREAD COUNT 1 STREAMS stream_name last_id:读取消息。
  • XGROUP CREATE stream_name group_name $ MKSTREAM:创建消费组。

3.2. 生产者代码

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
r.xadd('event_stream', {'event': 'user_signup', 'user_id': '12345'})
print("Event published!")

3.3. 消费者代码

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

while True:
    events = r.xread({'event_stream': '$'}, count=1, block=5000)
    for stream, messages in events:
        for message_id, message in messages:
            print(f"Received: {message}")

3.4. 适用场景

  • 大规模消息存储(Kafka 替代)。
  • 日志收集与分析
  • 事件溯源(能回放历史消息)。

4. 基于 Redis Sorted Set 的延迟队列

Sorted Set(有序集合)可以用于定时任务调度

4.1. 生产者代码

import time
import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
r.zadd('delay_queue', {'task1': time.time() + 10})  # 10秒后执行

4.2. 消费者代码

import time
import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

while True:
    now = time.time()
    tasks = r.zrangebyscore('delay_queue', 0, now)
    for task in tasks:
        print(f"Processing {task}")
        r.zrem('delay_queue', task)
    time.sleep(1)

4.3. 适用场景

  • 定时任务(如定时短信、延迟执行)。
  • 超时检测(如订单过期、缓存清理)。

5. 选择合适的 Redis 消息队列

方案持久化支持多消费者适用场景
List竞争消费任务队列、异步任务
Pub/Sub广播消费实时推送、通知
Stream组消费事件流、日志收集
Sorted Set竞争消费延迟任务

总结

  • 如果需要任务队列,使用 List(支持竞争消费,适合任务调度)。
  • 如果需要实时推送,使用 Pub/Sub(广播消息,不存储)。
  • 如果需要持久化消息流,使用 Stream(类似 Kafka,支持回放)。
  • 如果需要定时任务,使用 Sorted Set(适合延迟队列)。

http://www.kler.cn/a/519867.html

相关文章:

  • 用深度学习优化供应链管理:让算法成为商业决策的引擎
  • 汽车OEMs一般出于什么目的来自定义Autosar CP一些内容
  • BLE透传方案,IoT短距无线通信的“中坚力量”
  • 设计模式之工厂模式
  • 蓝桥杯例题三
  • Vue 引入及简单示例
  • 亚博microros小车-原生ubuntu支持系列:12 URDF 模型
  • mysql如何修改密码
  • Unity开发一个单人FPS游戏的教程总结
  • 美创科技获浙江省网络空间安全协会年度表彰
  • Linux 中的poll、select和epoll有什么区别?
  • 【学习笔记】计算机网络(二)
  • 第29章 xUnit框架下的测试模式详解
  • 1、云计算
  • 什么是区块链
  • 单链表算法实战:解锁数据结构核心谜题——链表的回文结构
  • Leecode刷题C语言之完成所有交易的初始最少钱数
  • Rust 中的结构体使用指南
  • 積分方程與簡單的泛函分析8.具連續對稱核的非齊次第II類弗雷德霍姆積分算子方程
  • 【矩阵二分】力扣378. 有序矩阵中第 K 小的元素
  • 10 Hyperledger Fabric 介绍
  • 个性化的语言模型构建思路
  • 洛谷 P5709:Apples Prologue / 苹果和虫子
  • 2025年前端技术革新趋势
  • Leetcode求职题目(21)
  • 适合 C# 开发者的 Semantic Kernel 入门:用 AI 赋能你的 .NET 应用