【Python3】异步操作 redis
aioredis 在高版本已经不支持了, 不要用
代码示例
- redis 连接池
- 异步操作redis以及接口
import asyncio
from sanic import Sanic
from sanic.response import json
import redis.asyncio as redis
from redis.asyncio import ConnectionPool
# 创建 Sanic 应用
app = Sanic("RedisSanicApp")
# 初始化 Redis 连接池
pool = ConnectionPool(host='localhost', port=6379, db=0)
redis_client = redis.StrictRedis(connection_pool=pool)
@app.route("/set_redis/<key>/<value>")
async def set_redis(request, key, value):
try:
# 使用连接池异步设置键值对
await redis_client.set(key, value)
return json({"status": "OK", "message": f"Data saved: {key} = {value}"})
except Exception as e:
return json({"status": "error", "message": str(e)})
@app.route("/get_redis/<key>")
async def get_redis(request, key):
try:
# 使用连接池异步获取数据
value = await redis_client.get(key)
if value:
return json({"status": "OK", "key": key, "value": value.decode()})
else:
return json({"status": "not_found", "message": f"Key '{key}' not found"})
except Exception as e:
return json({"status": "error", "message": str(e)})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000)
测试接口
存数据到 Redis:
curl http://localhost:8000/set_redis/my_key/my_value
这将会在 Redis 中存入键为 my_key,值为 my_value 的数据。
从 Redis 获取数据:
curl http://localhost:8000/get_redis/my_key
这将返回 Redis 中 my_key 对应的值,应该是 my_value。
异步redis pipeline
from sanic import Sanic
from sanic.response import json
import redis.asyncio as redis
# 创建 Sanic 应用实例
app = Sanic("async_redis_pipeline_example")
# 创建 Redis 客户端
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
@app.route("/pipeline_set/<key>/<value>")
async def pipeline_set(request, key, value):
# 使用 pipeline 批量执行多个 Redis 命令
async with redis_client.pipeline() as pipe:
# 将多个命令添加到 pipeline
await pipe.set(key, value)
await pipe.get(key)
# 执行所有命令
result = await pipe.execute()
# 获取执行结果
set_result = result[0] # set 命令的返回结果
get_result = result[1] # get 命令的返回结果
return json({"set_result": set_result, "key": key, "value": get_result.decode('utf-8')})
@app.route("/pipeline_incr/<key>")
async def pipeline_incr(request, key):
# 使用 pipeline 执行多个增量操作
async with redis_client.pipeline() as pipe:
# 预先增加多个值
await pipe.incr(key)
await pipe.incr(key)
await pipe.incr(key)
# 执行所有命令
result = await pipe.execute()
# 返回最后结果
return json({"key": key, "value": result[-1]})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000)
注意
虽然可以手动管理 pipeline,
# 全局定义的 pipeline
pipeline = redis_client.pipeline()
资源管理问题:全局的 pipeline 是共享的,如果不同的请求同时使用它,可能会导致并发问题,因为 Pipeline 并不是线程安全的。
如果你在一个请求期间同时修改和执行管道的命令,可能会导致错误。
但最好使用
async with redis_client.pipeline() as pipe
来确保管道在每次请求处理后都能正确关闭,避免因手动管理造成的资源泄漏或并发问题。这样做更安全、简洁。