当前位置: 首页 > article >正文

Redis 简单的消息队列

使用redis 进行简单的队列很容易,不需要使用较为复杂的MQ队列,直接使用redis 进行,不过唯一不足的需要自己构造生产者消费者,这里使用while True的方法进行消费者操作

目录

  • 介绍
  • 数据类型
    • String
    • Hash
  • 重要命令
  • 消息队列

介绍

key-value 存储系统,是跨平台的非关系型数据库。Redis 通常被称为数据结构服务器,因为值(value)可以是字符串(String)、哈希(Hash)、列表(list)、集合(sets)和有序集合(sorted sets)等类型。

常用于缓存、消息队列、会话存储等应用场景。

  • **性能极高:**Redis 以其极高的性能而著称,能够支持每秒数十万次的读写操作24。这使得Redis成为处理高并发请求的理想选择,尤其是在需要快速响应的场景中,如缓存、会话管理、排行榜等。
  • **丰富的数据类型:**Redis 不仅支持基本的键值存储,还提供了丰富的数据类型,包括字符串、列表、集合、哈希表、有序集合等。这些数据类型为开发者提供了灵活的数据操作能力,使得Redis可以适应各种不同的应用场景。
  • **原子性操作:**Redis 的所有操作都是原子性的,这意味着操作要么完全执行,要么完全不执行。这种特性对于确保数据的一致性和完整性至关重要,尤其是在高并发环境下处理事务时。
  • **持久化:**Redis 支持数据的持久化,可以将内存中的数据保存到磁盘中,以便在系统重启后恢复数据。这为 Redis 提供了数据安全性,确保数据不会因为系统故障而丢失。
  • **支持发布/订阅模式:**Redis 内置了发布/订阅模式(Pub/Sub),允许客户端之间通过消息传递进行通信。这使得 Redis 可以作为消息队列和实时数据传输的平台。
  • **单线程模型:**尽管 Redis 是单线程的,但它通过高效的事件驱动模型来处理并发请求,确保了高性能和低延迟。单线程模型也简化了并发控制的复杂性。
  • **主从复制:**Redis 支持主从复制,可以通过从节点来备份数据或分担读请求,提高数据的可用性和系统的伸缩性。
  • **应用场景广泛:**Redis 被广泛应用于各种场景,包括但不限于缓存系统、会话存储、排行榜、实时分析、地理空间数据索引等。
  • **社区支持:**Redis 拥有一个活跃的开发者社区,提供了大量的文档、教程和第三方库,这为开发者提供了强大的支持和丰富的资源。
  • **跨平台兼容性:**Redis 可以在多种操作系统上运行,包括 Linux、macOS 和 Windows,这使得它能够在不同的技术栈中灵活部署。
    • **支持 Lua 脚本:**Redis 支持使用 Lua 脚本来编写复杂的操作,这些脚本可以在服务器端执行,提供了更多的灵活性和强大的功能。

数据类型

Redis主要支持以下几种数据类型:

  • string(字符串):

    基本的数据存储单元,可以存储字符串、整数或者浮点数。

  • hash(哈希):

    一个键值对集合,可以存储多个字段。

  • list(列表):

    一个简单的列表,可以存储一系列的字符串元素。

  • set(集合):

    一个无序集合,可以存储不重复的字符串元素。

  • zset(sorted set:社群集合):

    相似集合,但是每个元素都有一个分数(score)关联。

  • 位图(Bitmaps):

    基于操作字符串类型,可以对每个位进行。

  • 超日志(HyperLogLogs):

    用于基本统计,可以提示集合中的唯一元素数量。

  • 地理空间(Geospatial):

    用于存储断层信息。

  • 发布/订阅(Pub/Sub):

    一种消息通信模式,允许客户端订阅消息通道,并接收发布到该通道的消息。

  • 流(Streams):

    用于消息队列和日志存储,支持消息的持久化和时间排序。

  • 模块(Modules):

    Redis支持动态加载模块,可以扩展Redis的功能。

String

string 是 redis 最基本的类型,string类型最大存储512MB

  • SET key value:设置键的值。
  • GET key:获取键的值。
  • INCR key:将键的值加 1。
  • DECR key:将键的值减 1。
  • APPEND key value:将值追加到键的值之后。
> set run "1"

Hash

Redis hash 是一个键值(key=>value)对集合,类似于一个小型的 NoSQL 数据库。

Redis hash 是一个 string 类型的 field 和 value 的映射表,hash 特别适合用于存储对象。

  • HSET key field value:设置哈希表中字段的值。
  • HGET key field:获取哈希表中字段的值。
  • HGETALL key:获取哈希表中所有字段和值。
  • HDEL key field:删除哈希表中的一个或多个字段。

最常用的还是string,其他用到了再说。

Redis支持多个数据库,并且每个数据库的数据是隔离的不能共享,并且基于单机才有,如果是集群就没有数据库的概念。

Redis是一个字典结构的存储服务器,而实际上一个Redis实例提供了多个用来存储数据的字典,客户端可以指定将数据存储在哪个字典中。这与我们熟知的在一个关系数据库实例中可以创建多个数据库类似,所以可以将其中的每个字典都理解成一个独立的数据库。

每个数据库对外都是一个从0开始的递增数字命名,Redis默认支持16个数据库(可以通过配置文件支持更多,无上限),可以通过配置databases来修改这一数字。客户端与Redis建立连接后会自动选择0号数据库,不过可以随时使用SELECT命令更换数据库

重要命令

命令执行后输出 (integer) 1,否则将输出 (integer) 0

set
del
get
keys * 
lrange queue 0 -1  从第一个元素 (0) 到最后一个元素 (-1)

BLPOP key1 [key2 ] timeout
移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

发布订阅 pub/sub 模式

在这里插入图片描述

需要开启两个redis-cli 客户端

第一个:

subscribe chat 
订阅该频道

第二个:

publish chat "redis-test"
往chat频道发消息

消息队列

使用redis 进行消息队列,关键就是有消费者、生产者的动作,这里以python一段代码进行启发:

#redis_aclient.py
from contextlib import asynccontextmanager

from django.conf import settings
from redis.asyncio import from_url as redis_from_url

class RedisClient:
    def __init__(self):
        self._client = None

    def connect(self):
        if self._client is None:
            # logger.info("Connecting to Redis...")
            self._client = redis_from_url(settings.REDIS_URL, decode_responses=True)

    @asynccontextmanager
    async def get_client(self):
        if self._client is None:
            self.connect()
        try:
            yield self._client
        finally:
            await self.close()

    async def close(self):
        if self._client:
            # logger.info("Shutting down Redis connection...")
            await self._client.aclose()
            # logger.info("Redis connection closed.")
            self._client = None

# 单例模式 - 实例化 RedisClient
redis_client_manager = RedisClient()

调用生产者消费者的逻辑

import asyncio
import json
import os

import django
from asgiref.sync import sync_to_async



async def generate_by_queue(answer_id, answer_data):
    """
    向 Redis 队列中推送任务
    """
    task_data = {
        "answer_id": answer_id,
        "answer_data": answer_data
    }
    try:
        async with redis_client_manager.get_client() as redis_client:
            logger.info(f"Pushing task to queue answer_id: {answer_id}")
            await redis_client.rpush("ai_report_task_queue", json.dumps(task_data))
    except Exception as e:
        logger.info(f"Error pushing task to queue: {e}")

async def process_tasks_by_ai_explain_answer():
    """
    从 Redis 队列中获取任务并进行处理
    """
    try:
        async with redis_client_manager.get_client() as redis_client:
            while True:
                try:
                    # 阻塞直到有任务出现
                    task = await redis_client.blpop("queue")
                    if task:
                        task_data = json.loads(task[1])  # 解析任务数据
                        answer_id = task_data.get("answer_id")
                        logger.info(f"Processing task answer_id {answer_id}")
                        answer_data = task_data.get("answer_data")

                        ai_explain = await generate_ai_report(answer_data)

                        await update_answer_record(answer_id, ai_explain)
                        logger.info(f"Task answer_id {answer_id} processed")
                except Exception as e:
                    logger.info(f"Error processing task {e}")
    except asyncio.CancelledError:
        logger.info("Task processing cancelled.")
    except Exception as e:
        logger.info(f"Unexpected error: {e}")


if __name__ == "__main__":
    try:
        asyncio.run(process_tasks_by_ai_explain_answer())
    except KeyboardInterrupt:
        asyncio.run(redis_client_manager.close())


http://www.kler.cn/news/326302.html

相关文章:

  • 并发、并行和异步设计
  • Linux 信号保存
  • 【菜菜的sklearn机器学习】(4)随机森林
  • 六,MyBatis-Plus 扩展功能(逻辑删除,通用枚举,字段类型处理,自动填充功能,防全表更新与删除插件,MybatisX快速开发插件)
  • 探索基于知识图谱和 ChatGPT 结合制造服务推荐前沿
  • 【Android 14源码分析】WMS-窗口显示-流程概览与应用端流程分析
  • C语言中的日志机制:打造全面强大的日志系统
  • 翻译:Recent Event Camera Innovations: A Survey
  • 30秒内交易股票,程序化交易的定义与特点
  • 【Windows】自定义显示器的分辨率
  • @Transactional的实现原理
  • openKylin--安装 .net6.0
  • 【linux】gdb
  • 鸿蒙HarmonyOS之封装Http请求工具类
  • Spring Boot 进阶-第一个程序HelloWorld
  • C语言 | Leetcode C语言题解之第447题回旋镖的数量
  • knowLedge-Vue I18n 是 Vue.js 的国际化插件
  • SpringMVC源码-AbstractHandlerMethodMapping处理器映射器将@Controller修饰类方法存储到处理器映射器
  • 关于开发板与虚拟机网络不通问题排查
  • 在线点餐新体验:Spring Boot 点餐系统
  • excel不经过后台实现解析和预览(vue)
  • YOLOv8 Flask整合问题
  • Git 使用方法
  • c++泛型编程
  • 【hot100-java】【二叉树的层序遍历】
  • Excel:常用函数
  • vue中异步批量删除列表数据
  • 常用的MySQL日期、时间函数
  • 视频集成与融合项目中需要视频编码,但是分辨率不兼容怎么办?
  • 使用 C++ 实现卷积运算:从理论到实践的详细指南