Redis 简单的消息队列
使用redis 进行简单的队列很容易,不需要使用较为复杂的MQ队列,直接使用redis 进行,不过唯一不足的需要自己构造生产者消费者,这里使用while True的方法进行消费者操作
目录
- 介绍
- 数据类型
- String
- Hash
- 重要命令
- 消息队列
介绍
key-value 存储系统,是跨平台的非关系型数据库。Redis 通常被称为数据结构服务器,因为值(value)可以是字符串(String)、哈希(Hash)、列表(list)、集合(sets)和有序集合(sorted sets)等类型。
常用于缓存、消息队列、会话存储等应用场景。
- **性能极高:**Redis 以其极高的性能而著称,能够支持每秒数十万次的读写操作24。这使得Redis成为处理高并发请求的理想选择,尤其是在需要快速响应的场景中,如缓存、会话管理、排行榜等。
- **丰富的数据类型:**Redis 不仅支持基本的键值存储,还提供了丰富的数据类型,包括字符串、列表、集合、哈希表、有序集合等。这些数据类型为开发者提供了灵活的数据操作能力,使得Redis可以适应各种不同的应用场景。
- **原子性操作:**Redis 的所有操作都是原子性的,这意味着操作要么完全执行,要么完全不执行。这种特性对于确保数据的一致性和完整性至关重要,尤其是在高并发环境下处理事务时。
- **持久化:**Redis 支持数据的持久化,可以将内存中的数据保存到磁盘中,以便在系统重启后恢复数据。这为 Redis 提供了数据安全性,确保数据不会因为系统故障而丢失。
- **支持发布/订阅模式:**Redis 内置了发布/订阅模式(Pub/Sub),允许客户端之间通过消息传递进行通信。这使得 Redis 可以作为消息队列和实时数据传输的平台。
- **单线程模型:**尽管 Redis 是单线程的,但它通过高效的事件驱动模型来处理并发请求,确保了高性能和低延迟。单线程模型也简化了并发控制的复杂性。
- **主从复制:**Redis 支持主从复制,可以通过从节点来备份数据或分担读请求,提高数据的可用性和系统的伸缩性。
- **应用场景广泛:**Redis 被广泛应用于各种场景,包括但不限于缓存系统、会话存储、排行榜、实时分析、地理空间数据索引等。
- **社区支持:**Redis 拥有一个活跃的开发者社区,提供了大量的文档、教程和第三方库,这为开发者提供了强大的支持和丰富的资源。
- **跨平台兼容性:**Redis 可以在多种操作系统上运行,包括 Linux、macOS 和 Windows,这使得它能够在不同的技术栈中灵活部署。
• **支持 Lua 脚本:**Redis 支持使用 Lua 脚本来编写复杂的操作,这些脚本可以在服务器端执行,提供了更多的灵活性和强大的功能。
数据类型
Redis主要支持以下几种数据类型:
-
string(字符串):
基本的数据存储单元,可以存储字符串、整数或者浮点数。
-
hash(哈希):
一个键值对集合,可以存储多个字段。
-
list(列表):
一个简单的列表,可以存储一系列的字符串元素。
-
set(集合):
一个无序集合,可以存储不重复的字符串元素。
-
zset(sorted set:社群集合):
相似集合,但是每个元素都有一个分数(score)关联。
-
位图(Bitmaps):
基于操作字符串类型,可以对每个位进行。
-
超日志(HyperLogLogs):
用于基本统计,可以提示集合中的唯一元素数量。
-
地理空间(Geospatial):
用于存储断层信息。
-
发布/订阅(Pub/Sub):
一种消息通信模式,允许客户端订阅消息通道,并接收发布到该通道的消息。
-
流(Streams):
用于消息队列和日志存储,支持消息的持久化和时间排序。
-
模块(Modules):
Redis支持动态加载模块,可以扩展Redis的功能。
String
string 是 redis 最基本的类型,string类型最大存储512MB
SET key value
:设置键的值。GET key
:获取键的值。INCR key
:将键的值加 1。DECR key
:将键的值减 1。APPEND key value
:将值追加到键的值之后。
> set run "1"
Hash
Redis hash 是一个键值(key=>value)对集合,类似于一个小型的 NoSQL 数据库。
Redis hash 是一个 string 类型的 field 和 value 的映射表,hash 特别适合用于存储对象。
HSET key field value
:设置哈希表中字段的值。HGET key field
:获取哈希表中字段的值。HGETALL key
:获取哈希表中所有字段和值。HDEL key field
:删除哈希表中的一个或多个字段。
最常用的还是string,其他用到了再说。
Redis支持多个数据库,并且每个数据库的数据是隔离的不能共享,并且基于单机才有,如果是集群就没有数据库的概念。
Redis是一个字典结构的存储服务器,而实际上一个Redis实例提供了多个用来存储数据的字典,客户端可以指定将数据存储在哪个字典中。这与我们熟知的在一个关系数据库实例中可以创建多个数据库类似,所以可以将其中的每个字典都理解成一个独立的数据库。
每个数据库对外都是一个从0开始的递增数字命名,Redis默认支持16个数据库(可以通过配置文件支持更多,无上限),可以通过配置databases来修改这一数字。客户端与Redis建立连接后会自动选择0号数据库,不过可以随时使用SELECT命令更换数据库
重要命令
命令执行后输出 (integer) 1,否则将输出 (integer) 0
set
del
get
keys *
lrange queue 0 -1 从第一个元素 (0) 到最后一个元素 (-1)
BLPOP key1 [key2 ] timeout
移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
发布订阅 pub/sub 模式
需要开启两个redis-cli 客户端
第一个:
subscribe chat
订阅该频道
第二个:
publish chat "redis-test"
往chat频道发消息
消息队列
使用redis 进行消息队列,关键就是有消费者、生产者的动作,这里以python一段代码进行启发:
#redis_aclient.py
from contextlib import asynccontextmanager
from django.conf import settings
from redis.asyncio import from_url as redis_from_url
class RedisClient:
def __init__(self):
self._client = None
def connect(self):
if self._client is None:
# logger.info("Connecting to Redis...")
self._client = redis_from_url(settings.REDIS_URL, decode_responses=True)
@asynccontextmanager
async def get_client(self):
if self._client is None:
self.connect()
try:
yield self._client
finally:
await self.close()
async def close(self):
if self._client:
# logger.info("Shutting down Redis connection...")
await self._client.aclose()
# logger.info("Redis connection closed.")
self._client = None
# 单例模式 - 实例化 RedisClient
redis_client_manager = RedisClient()
调用生产者消费者的逻辑
import asyncio
import json
import os
import django
from asgiref.sync import sync_to_async
async def generate_by_queue(answer_id, answer_data):
"""
向 Redis 队列中推送任务
"""
task_data = {
"answer_id": answer_id,
"answer_data": answer_data
}
try:
async with redis_client_manager.get_client() as redis_client:
logger.info(f"Pushing task to queue answer_id: {answer_id}")
await redis_client.rpush("ai_report_task_queue", json.dumps(task_data))
except Exception as e:
logger.info(f"Error pushing task to queue: {e}")
async def process_tasks_by_ai_explain_answer():
"""
从 Redis 队列中获取任务并进行处理
"""
try:
async with redis_client_manager.get_client() as redis_client:
while True:
try:
# 阻塞直到有任务出现
task = await redis_client.blpop("queue")
if task:
task_data = json.loads(task[1]) # 解析任务数据
answer_id = task_data.get("answer_id")
logger.info(f"Processing task answer_id {answer_id}")
answer_data = task_data.get("answer_data")
ai_explain = await generate_ai_report(answer_data)
await update_answer_record(answer_id, ai_explain)
logger.info(f"Task answer_id {answer_id} processed")
except Exception as e:
logger.info(f"Error processing task {e}")
except asyncio.CancelledError:
logger.info("Task processing cancelled.")
except Exception as e:
logger.info(f"Unexpected error: {e}")
if __name__ == "__main__":
try:
asyncio.run(process_tasks_by_ai_explain_answer())
except KeyboardInterrupt:
asyncio.run(redis_client_manager.close())