在 Sanic 应用中使用内存缓存管理 IP 黑名单
[外链图片转存中…(img-Pm0K9mzd-1734859380698)]
在现代 web 应用中,保护 API 接口免受恶意请求的攻击至关重要。IP 黑名单是一种常见的安全措施,可以有效阻止某些 IP 地址的访问。本文将介绍如何在 Python 的 Sanic 框架中实现 IP 黑名单功能,并结合内存缓存提升性能。
1. 环境准备
首先,确保你已经安装了 Sanic 和必要的数据库驱动程序。我们将使用 aiomysql
作为 MySQL 的异步驱动。可以使用以下命令进行安装:
pip install sanic aiomysql
2. 创建数据库表
我们需要一个数据库表来存储黑名单 IP。可以使用以下 SQL 语句创建表:
CREATE TABLE api_ip_blacklist (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '唯一标识',
ip_address VARCHAR(45) NOT NULL COMMENT '被拦截的 IP 地址,支持 IPv4 和 IPv6',
status TINYINT DEFAULT 1 COMMENT '状态,1 表示有效(黑名单),0 表示无效(可用)',
reason VARCHAR(255) DEFAULT NULL COMMENT '记录添加到黑名单的原因',
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '记录创建时间',
update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录最后更新时间',
UNIQUE KEY (ip_address) COMMENT '确保 IP 地址唯一'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='API 接口黑名单 IP 表';
3. 实现内存缓存
我们可以创建一个 CacheObject
类来管理内存中的黑名单 IP 缓存。以下是该类的实现:
class CacheObject(object):
def __init__(self):
self.data = dict()
def put(self, key, value):
"""将 IP 地址放入缓存"""
self.data[key] = value
def get(self, key):
"""从缓存中获取 IP 地址的状态"""
return self.data.get(key)
def load_blacklist(self, ip_list):
"""从数据库加载黑名单 IP 到缓存"""
for ip in ip_list:
self.put(ip, 1) # 1 表示有效(黑名单)
def clear(self):
"""清空缓存"""
self.data.clear()
4. 集成 Sanic 应用
接下来,我们将 CacheObject
集成到 Sanic 应用中,实现动态加载和检查黑名单功能。
from sanic import Sanic, response
from sanic.exceptions import Forbidden
import aiomysql
import asyncio
app = Sanic("IPBlacklistApp")
# 创建 CacheObject 实例
blacklist_cache = CacheObject()
# 数据库配置
DB_CONFIG = {
'host': 'localhost',
'port': 3306,
'user': 'your_username',
'password': 'your_password',
'db': 'your_database'
}
async def load_blacklist():
async with aiomysql.connect(**DB_CONFIG) as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT ip_address FROM api_ip_blacklist WHERE status = 1")
rows = await cursor.fetchall()
blacklist_cache.load_blacklist([row[0] for row in rows]) # 加载黑名单到缓存
async def update_blacklist():
while True:
await load_blacklist()
await asyncio.sleep(60) # 每60秒更新一次黑名单
@app.listener('before_server_start')
async def setup_db(app, loop):
await load_blacklist() # 启动时加载黑名单到缓存
app.add_task(update_blacklist()) # 定时更新黑名单
@app.middleware("request")
async def block_blacklisted_ips(request):
client_ip = request.ip
if blacklist_cache.get(client_ip) == 1: # 检查 IP 是否在黑名单中
raise Forbidden("Your IP address is blocked.")
@app.route("/")
async def index(request):
return response.json({"message": "Welcome to the API!"})
@app.route("/data")
async def data(request):
return response.json({"data": "Here is your data!"})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000)
5. 代码解析
- 数据库连接:使用
aiomysql
连接到 MySQL 数据库并查询黑名单 IP。 - 内存缓存:通过
CacheObject
类将黑名单 IP 缓存到内存中,提高查询效率。 - 中间件:在请求处理中检查客户端 IP 是否在黑名单中,若在则抛出 Forbidden 异常。
6. 测试应用
启动应用后,您可以通过 Postman 或浏览器访问接口。确保数据库中有有效的黑名单 IP 数据。访问黑名单 IP 时,您将收到 403 Forbidden 响应,表示该 IP 被阻止。
7. 总结
通过结合 Sanic 框架和内存缓存,我们成功实现了一个高效的 IP 黑名单管理系统。这种方法有效地提高了查询速度,减少了对数据库的频繁访问,为 API 接口提供了更好的安全保障。