Python FastApi 实现签名验证
大家在写后台接口时,都想要设计一个安全的,稳定的架构来支持各种业务,此文章介绍的Token的机制,和签名的验证。Token作为鉴权,签名作防篡改
目录
1.Token
2.签名
3.接口中的实现
1.Token
此处介绍的实现方式较简单,用的缓存来记录用户对应的Token,想要更完善一些的机制,可以使用数据库做记录,运行时做恢复读取处理。看实际项目规模来定
此处缓存的实现参考我的另一篇文章,有详细的介绍:Python Cache 实现缓存管理类_python缓存之importlib.cache的使用详解-CSDN博客
# 创建数据库连接对象
dao = UserDAO()
# 根据登录名查询用户信息
use = dao.GetUserByLogin(loginName=Account)
if (use != None):
# 验证密码是否一致
if (use.Password == MD5(PassWord)):
# 创建Token
token = __createToken(dao, use)
# 将生成的Token 保存到缓存中
PyCache.set(use.UserCode, use)
res = MsgModel(True, MsgErrorType.Success, "", token)
else:
# 返回错误信息
res = MsgModel(False, MsgErrorType.Login_NameOrPwdError)
else:
# 返回错误信息
res = MsgModel(False, MsgErrorType.Login_NameOrPwdError)
当用户登录成功后,会有相应的Token返回,前端就自己做登录状态记录。
请求头参数 和 Token的数据实例 参考如下:
class HeaderModel(BaseModelPY):
UserCode :str #用户编码
Timestamp :str #时间戳
Nonce:str #随机数
Signature:str #签名
Permit:int #权限
#Token令牌
class TicketAuth(BaseModelPY):
UserCode:str #用户编码
Token:str #Token令牌
CreateTime:datetime=None #创建时间
ExpireTime:datetime=None #过期时间
RefreshToken:str #刷新Token令牌
RefreshTokenExpireTime:datetime=None #刷新Token令牌 过期时间
Token的机制:
Token 不当做请求参数,但会参与签名的生成(下面会介绍)
时间戳用来判断请求时间,超过范围的视作无效的请求,也会参与签名的生成
随机数:作为参数,也会参与签名的生成
权限:一般作为权限阈值的判断,要不要使用就看项目定义了
Token有过期时效 后台判断过期后,则视为Token无效
RefreshToken的过期时效比Token要长,当Token过期后,可以使用RefreshToken进行刷新Token,前端可以自行做逻辑刷新(可以返回特定的Code作判断),从而让客户无感
当Token和RefreshToken都过期后,则需要重新登录了
2.签名
先定一个验证签名的方法 verify_signature
从请求头里边获取相应的信息,如用户编码、时间戳、随机数和签名信息
继而做一些逻辑判断,数据为空,时间戳超出范围等
用 用户编码 从缓存对像中获取后台保存的Token,判断Token的时效性
最后 后台通过缓存的Token 去生成一次签名,跟请求中的签名进行判断是否一致
签名的组装逻辑可以自行定义,比如下面的举例:
# 签名 UserCode+Timestamp+Nonce+Token 进行MD5加密 大写
将用户编码、时间戳、随机数和Token进行字符拼装,再进行MD5加密,随后转成大写
前端也是这么操作,后台也是如此生成 ; 前端通过登录时获取的Token去进行拼装,后台用缓存对象中的Token进行拼装;
这里我检讨一下:生成签名的数据应该还要包含 请求参数 才能防篡改,我这里是因为项目不对外开放,就写简单了。
实现方式就大概这么做,可以自行扩展
def verify_signature(request: Request) -> MsgModel:
try:
# 从请求头中获取认证信息
headerModel = HeaderModel()
token: TicketAuth
msg = MsgModel.Success()
IsValid: bool = True
# 通过request获取Header里边的数据模型
headerModel.UserCode = request.headers.get("UserCode") # 用户编码
headerModel.Timestamp = request.headers.get("Timestamp") # 时间戳
headerModel.Nonce = request.headers.get("Nonce") # 随机数
headerModel.Signature = request.headers.get("Signature") # 签名
# Header数据不能为空
if (IsValid and headerModel.UserCode == None or not headerModel.UserCode):
IsValid = False
msg = MsgModel(False, MsgErrorType.Header_Empty, "Header is empty")
if (IsValid):
# 验证时间戳
# 将时间戳转换为日期时间对象
ft = float(headerModel.Timestamp)/1000
ts = datetime.fromtimestamp(timestamp=ft)
# 时间计算差
time_difference = datetime.now() - ts
# 时间戳一分钟内有效
if (time_difference.seconds > 300):
IsValid = False
msg = MsgModel(False, MsgErrorType.Request_TimeOut,
f"Request Time Out[{time_difference.seconds}]")
if (IsValid):
# 判断Token是否有效
# 从缓存中取数据
cache_key = headerModel.UserCode+"_Token"
token = PyCache.get(cache_key)
if (token is not None):
# 判断Token是否过期
if (token.ExpireTime < datetime.now()):
IsValid = False
msg = MsgModel(False, MsgErrorType.Token_Expired,
"Token Have Expired")
else:
IsValid = False
msg = MsgModel(False, MsgErrorType.Token_Expired,
"Token Have Expired")
if (IsValid):
# 验证用户请求是否合法
# 签名 UserCode+Timestamp+Nonce+Token 进行MD5加密 大写
strRaw = headerModel.UserCode+headerModel.Timestamp+headerModel.Nonce+token.Token
sign = MD5(strRaw).upper() # 变为大写
# 验证参数中的签名是否一致
if (headerModel.Signature != sign):
IsValid = False
msg = MsgModel(False, MsgErrorType.Sign_Invalid,
"The signature is invalid")
if (IsValid):
try:
#从缓存数据获取用户数据 得到用户类型
#获取用户类型相应的权限
#再接口中判断是否足够的权限来调用
user = PyCache.get(headerModel.UserCode)
if (user is not None):
#获取用户类型的权限
dictList = PyCache.get("DictData")
dict = next(
(x for x in dictList if x.DictCode == user.UserType), None)
if (dict is not None):
headerModel.Permit = int(dict.DictValue)
else:
headerModel.Permit = 0
else:
headerModel.Permit = 0
except Exception as ex:
headerModel.Permit = 0
LogOperate.error("获取用户权限发生异常", ex)
return MsgModel.Internal_Error()
# headerModel.Permit=100
msg.MsgContent = headerModel
return msg
except Exception as ex:
LogOperate.error("签名验证发生异常", ex)
return MsgModel.Internal_Error()
3.接口中的实现
最后再说下在每个接口中如何实现:
--- 无需签名验证的接口 : 比如登录接口
只需要定义入参 即可
@userLoginController.post(apiPrefix+'Login', name="登录-用户登录", response_model=None, summary="登录-用户登录")
async def user_login(Account: str = Form(...), PassWord: str = Form(...)):
res = MsgModel.Failure()
return res
--- 需要签名验证的接口:
一是定义入参,二是增加调用签名验证方法 verify_signature
下面的例子:
除了分页必要的参数 page、pageSize 等等,最后有个IsValid=Depends(verify_signature) 即是签名验证方法的调用
第一行就是作为判断签名验证是否通过了
如果签名验证失败了,记得要返回IsValid对象,否则接口会返回null
if (IsValid.MsgSuccessed):
else:
return IsValid
from fastapi import APIRouter, Depends
from datetime import datetime
from Modules.Models.MsgModel import MsgModel, MsgErrorType, MsgPageModel
from Modules.SysFrame.String import getApiPrefix, getVersion, IsNullOrEmpty, Guid, MD5
from Modules.SysFrame.GlobalData import verify_signature
@orderController.get(apiPrefix+'GetOrderPageList', name="Order-分页获取订单列表数据")
async def GetOrderPageList(page: int, pageSize: int, begin: datetime, end: datetime, orderId: str, customer: str, IsValid=Depends(verify_signature)):
if (IsValid.MsgSuccessed):
try:
orderDao = OrderDAO()
res = orderDao.GetOrderPageList(
page, pageSize, begin, end, orderId, customer)
if (res.IsSucessed):
mList = res.Tag
models = []
for m in mList:
model = _Order_Entity_To_Model(m)
models.append(model.tojson())
return MsgPageModel.Success(models, total=res.Total)
else:
return MsgPageModel.Failure(msgDes=res.Message)
except Exception as ex:
LogOperate.error("GetOrderPageList", ex)
return MsgPageModel.Internal_Error()
else:
return IsValid