当前位置: 首页 > article >正文

【Python3】异步操作 redis

aioredis 在高版本已经不支持了, 不要用

代码示例

  • redis 连接池
  • 异步操作redis以及接口
import asyncio
from sanic import Sanic
from sanic.response import json
import redis.asyncio as redis
from redis.asyncio import ConnectionPool

# 创建 Sanic 应用
app = Sanic("RedisSanicApp")

# 初始化 Redis 连接池
pool = ConnectionPool(host='localhost', port=6379, db=0)
redis_client = redis.StrictRedis(connection_pool=pool)

@app.route("/set_redis/<key>/<value>")
async def set_redis(request, key, value):
    try:
        # 使用连接池异步设置键值对
        await redis_client.set(key, value)
        return json({"status": "OK", "message": f"Data saved: {key} = {value}"})
    except Exception as e:
        return json({"status": "error", "message": str(e)})

@app.route("/get_redis/<key>")
async def get_redis(request, key):
    try:
        # 使用连接池异步获取数据
        value = await redis_client.get(key)
        if value:
            return json({"status": "OK", "key": key, "value": value.decode()})
        else:
            return json({"status": "not_found", "message": f"Key '{key}' not found"})
    except Exception as e:
        return json({"status": "error", "message": str(e)})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

测试接口
存数据到 Redis:

curl http://localhost:8000/set_redis/my_key/my_value

这将会在 Redis 中存入键为 my_key,值为 my_value 的数据。

从 Redis 获取数据:

curl http://localhost:8000/get_redis/my_key

这将返回 Redis 中 my_key 对应的值,应该是 my_value。

异步redis pipeline

from sanic import Sanic
from sanic.response import json
import redis.asyncio as redis

# 创建 Sanic 应用实例
app = Sanic("async_redis_pipeline_example")

# 创建 Redis 客户端
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

@app.route("/pipeline_set/<key>/<value>")
async def pipeline_set(request, key, value):
    # 使用 pipeline 批量执行多个 Redis 命令
    async with redis_client.pipeline() as pipe:
        # 将多个命令添加到 pipeline
        await pipe.set(key, value)
        await pipe.get(key)
        # 执行所有命令
        result = await pipe.execute()
    
    # 获取执行结果
    set_result = result[0]  # set 命令的返回结果
    get_result = result[1]  # get 命令的返回结果
    return json({"set_result": set_result, "key": key, "value": get_result.decode('utf-8')})

@app.route("/pipeline_incr/<key>")
async def pipeline_incr(request, key):
    # 使用 pipeline 执行多个增量操作
    async with redis_client.pipeline() as pipe:
        # 预先增加多个值
        await pipe.incr(key)
        await pipe.incr(key)
        await pipe.incr(key)
        # 执行所有命令
        result = await pipe.execute()
    
    # 返回最后结果
    return json({"key": key, "value": result[-1]})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

注意

虽然可以手动管理 pipeline,

# 全局定义的 pipeline
pipeline = redis_client.pipeline()

资源管理问题:全局的 pipeline 是共享的,如果不同的请求同时使用它,可能会导致并发问题,因为 Pipeline 并不是线程安全的。

如果你在一个请求期间同时修改和执行管道的命令,可能会导致错误。

最好使用

async with redis_client.pipeline() as pipe 

来确保管道在每次请求处理后都能正确关闭,避免因手动管理造成的资源泄漏或并发问题。这样做更安全、简洁。


http://www.kler.cn/a/473344.html

相关文章:

  • java mail 535 Login Fail. Please enter your authorization code to login
  • 《解锁计算机视觉智慧:编程实现图片场景文字描述的开源宝藏》
  • 云安全博客阅读(三)
  • MATLAB深度学习实战文字识别
  • 用豆包MarsCode IDE打造精美数据大屏:从零开始的指南
  • 鸿蒙开发(29)弹性布局 (Flex)
  • 69.基于SpringBoot + Vue实现的前后端分离-家乡特色推荐系统(项目 + 论文PPT)
  • 通过可穿戴外骨骼,以更灵活的方式操作你的机器人。
  • 分布式主键ID生成方式-snowflake雪花算法
  • 迅为RK3568开发板篇OpenHarmony配置HDF驱动控制LED-修改HCS硬件配置
  • 电脑硬盘系统迁移及问题处理
  • linux相关conda操作
  • 深度学习中的卷积和反卷积(二)——反卷积的介绍
  • 智能化API接口:重塑电商数据交互的未来
  • 软件工程期末总结
  • 互联网全景消息(9)之Kafka深度剖析(上)
  • Agent | Dify中的两种可选模式
  • VUE3封装一个Hook
  • C#Struct堆栈
  • MYSQL重置密码
  • Rakuten 乐天积分系统从 Cassandra 到 TiDB 的选型与实战
  • Mysql连接报错排查解决记录
  • docker学习记录:创建mongodb副本集
  • RAG应用在得物开放平台的智能答疑的探索
  • linux 服务器清理
  • Go语言的数据库编程