Dify 中的 Bearer Token 与 API-Key 鉴权方式
本文使用 Dify v0.10.2 版本,在 Dify 中包括 Bearer Token 与 API-Key 鉴权这 2 种方式。console(URL 前缀/console/api)和 web(URL 前缀/api)蓝图使用的是 Bearer Token 鉴权方式,而 service_api(URL 前缀/v1)蓝图使用的是 API-Key 鉴权方式。console 蓝图通过 login_required 装饰器[1]实现,web 蓝图通过继承 WebApiResource,即 validate_jwt_token 装饰器实现,而 service_api 通过 validate_app_token 装饰器实现。
一.service_api 蓝图与 API-Key 鉴权
API-Key 与 Bearer Token 鉴权区别,如下所示:
特性 | API-Key | Bearer Token |
---|---|---|
定义 | 一种用于识别调用者身份的密钥 | 一种令牌,表示用户已经通过认证 |
格式 | 通常为简单的字符串 | 通常为 "Bearer <token>" 格式 |
存储位置 | 可以在请求头、URL 查询参数或请求体中 | 通常存储在请求头中 |
认证过程 | 直接使用 API-Key | 需要通过认证流程获取令牌 |
有效期 | 通常没有有效期或长期有效 | 通常有有效期(如几分钟到几小时) |
安全性 | 相对较低,容易被截获 | 更高,通过过期和刷新机制提高安全性 |
使用场景 | 简单的 API 访问 | 用户身份验证和授权 |
撤销机制 | 需要手动更换或删除 API-Key | 可以通过令牌失效或刷新实现 |
适用性 | 适用于无状态 API 调用 | 适用于需要用户会话的应用 |
1.service_api 蓝图
简单理解,只有这一个地方的 API 使用的是 API-Key 鉴权方式,其它地方均使用 earer Token 鉴权方式。
2.validate_app_token 装饰器
该装饰器 validate_app_token
用于验证应用程序的 API 令牌,检查相关的应用和租户状态,并根据需要提取用户信息,为后续的视图函数提供必要的上下文。
(1)函数定义和参数
def validate_app_token(view: Optional[Callable] = None, *, fetch_user_arg: Optional[FetchUserArg] = None):
- 定义一个装饰器
validate_app_token
,接受一个可选的视图函数view
和一个关键字参数fetch_user_arg
。 fetch_user_arg
是一个可选参数,用于指定如何从请求中获取用户 ID。
(2)装饰器内部定义
def decorator(view_func):
- 定义内部函数
decorator
,它接受要装饰的视图函数view_func
。
(3)装饰的视图函数
@wraps(view_func)
def decorated_view(*args, **kwargs):
- 使用
functools.wraps
保留原始视图函数的元数据。 - 定义内部函数
decorated_view
,接受任意位置和关键字参数。
(4)获取和验证 API 令牌
api_token = validate_and_get_api_token("app")
- 调用函数
validate_and_get_api_token
验证并获取 API 令牌,类型为 “app”。
(5)查询应用模型
app_model = db.session.query(App).filter(App.id == api_token.app_id).first()
if not app_model:
raise Forbidden("The app no longer exists.")
- 查询数据库,获取与
api_token
中的app_id
关联的应用模型app_model
。 - 如果未找到该应用模型,抛出
Forbidden
异常,说明应用不存在。
(6)检查应用状态
if app_model.status != "normal":
raise Forbidden("The app's status is abnormal.")
- 检查应用的状态是否为正常。如果不是,则抛出
Forbidden
异常,说明应用状态异常。
if not app_model.enable_api:
raise Forbidden("The app's API service has been disabled.")
- 检查应用的 API 服务是否启用。如果禁用,则抛出
Forbidden
异常。
(7)查询租户模型
tenant = db.session.query(Tenant).filter(Tenant.id == app_model.tenant_id).first()
if tenant.status == TenantStatus.ARCHIVE:
raise Forbidden("The workspace's status is archived.")
- 查询与
app_model
相关联的租户模型tenant
,并检查租户的状态。 - 如果租户状态为归档(
ARCHIVE
),则抛出Forbidden
异常。
(8)将应用模型传递给视图函数
kwargs["app_model"] = app_model
- 将
app_model
添加到kwargs
中,以便传递给被装饰的视图函数。
(9)获取用户 ID
if fetch_user_arg:
if fetch_user_arg.fetch_from == WhereisUserArg.QUERY:
user_id = request.args.get("user")
elif fetch_user_arg.fetch_from == WhereisUserArg.JSON:
user_id = request.get_json().get("user")
elif fetch_user_arg.fetch_from == WhereisUserArg.FORM:
user_id = request.form.get("user")
else:
# use default-user
user_id = None
- 如果
fetch_user_arg
被提供,根据其fetch_from
属性从不同来源获取用户 ID:- 如果是查询参数(
QUERY
),从请求参数中获取。 - 如果是 JSON 体(
JSON
),从请求 JSON 中获取。 - 如果是表单数据(
FORM
),从请求表单中获取。 - 如果没有指定来源,则
user_id
设置为None
。
- 如果是查询参数(
(10)检查用户 ID 的有效性
if not user_id and fetch_user_arg.required:
raise ValueError("Arg user must be provided.")
- 如果
user_id
不存在且fetch_user_arg
指定了该参数是必需的,则抛出ValueError
。
if user_id:
user_id = str(user_id)
如果 user_id
存在,则将其转换为字符串。
(11)创建或更新用户信息
kwargs["end_user"] = create_or_update_end_user_for_user_id(app_model, user_id)
- 调用函数
create_or_update_end_user_for_user_id
,根据app_model
和user_id
创建或更新用户信息,并将结果存入kwargs
中。
(12)返回视图函数的结果
return view_func(*args, **kwargs)
- 调用被装饰的视图函数,传递原始参数和
kwargs
,并返回结果。
(13)装饰器的返回
return decorated_view
- 返回装饰后的视图函数
decorated_view
。
(14)检查视图函数参数
if view is None:
return decorator
else:
return decorator(view)
- 如果
view
参数为None
,则返回decorator
函数;否则将视图函数view
传递给decorator
并返回。
二.console 蓝图与 Bearer Token 鉴权(用户登录操作)
源码位置:dify\api\controllers\console\auth\login.py
该接口返回结果示例,如下所示:
{
"result": "success",
"data": {
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiZWJiZjI1YmUtZWM4NS00ZDY4LTlkZDItNWJjMmVlMjg1NWU0IiwiZXhwIjoxNzMwNjg5Mjk4LCJpc3MiOiJTRUxGX0hPU1RFRCIsInN1YiI6IkNvbnNvbGUgQVBJIFBhc3Nwb3J0In0.EPfEaeoDY8NVeS7TDC95-B0rvFYjH9BVqiFY9IwYcVQ",
"refresh_token": "32a914bb7d7dac9b1b7b5ba98d6584007acbcba4638374d8e0171877f5c9eee34eae400917aadc433d7801181367579dd72894ae8f512aaba00929ab98bf509f"
}
}
1.AccountService.authenticate()方法
account = AccountService.authenticate(args[“email”], args[“password”])该方法实现了基于邮箱和密码的账户认证流程,包括账户查询、状态检查、密码哈希、首次登录处理和状态更新。如果所有条件都满足,最后将返回一个有效的 Account
对象。
@staticmethod
def authenticate(email: str, password: str, invite_token: Optional[str] = None) -> Account:
<em>"""authenticate account with email and password"""</em>
<em> </em>account = Account.query.filter_by(email=email).first()
if not account:
raise AccountNotFoundError()
if account.status == AccountStatus.BANNED.value:
raise AccountLoginError("Account is banned.")
if password and invite_token and account.password is None:
# if invite_token is valid, set password and password_salt
salt = secrets.token_bytes(16)
base64_salt = base64.b64encode(salt).decode()
password_hashed = hash_password(password, salt)
base64_password_hashed = base64.b64encode(password_hashed).decode()
account.password = base64_password_hashed
account.password_salt = base64_salt
if account.password is None or not compare_password(password, account.password, account.password_salt):
raise AccountPasswordError("Invalid email or password.")
if account.status == AccountStatus.PENDING.value:
account.status = AccountStatus.ACTIVE.value
account.initialized_at = datetime.now(timezone.utc).replace(tzinfo=None)
db.session.commit()
return account
生成 16 字节的随机盐值,并将其编码为 Base64 字符串,以便后续用于密码哈希。对用户提供的密码进行哈希处理,使用生成的盐值,然后将哈希结果编码为 Base64 字符串。将哈希后的密码和盐值存储到账户对象中。
2. AccountService.login()方法
token_pair = AccountService.login(account=account, ip_address=extract_remote_ip(request))
@staticmethod
def login(account: Account, *, ip_address: Optional[str] = None) -> TokenPair:
if ip_address:
AccountService.update_login_info(account=account, ip_address=ip_address)
if account.status == AccountStatus.PENDING.value:
account.status = AccountStatus.ACTIVE.value
db.session.commit()
access_token = AccountService.get_account_jwt_token(account=account)
refresh_token = _generate_refresh_token()
AccountService._store_refresh_token(refresh_token, account.id)
return TokenPair(access_token=access_token, refresh_token=refresh_token)
(1)AccountService.get_account_jwt_token(account=account)
该方法主要是生成 jwt token 的过程,如下所示:
@staticmethod
def get_account_jwt_token(account: Account) -> str:
exp_dt = datetime.now(timezone.utc) + timedelta(minutes=dify_config.ACCESS_TOKEN_EXPIRE_MINUTES)
exp = int(exp_dt.timestamp())
payload = {
"user_id": account.id,
"exp": exp,
"iss": dify_config.EDITION,
"sub": "Console API Passport",
}
token = PassportService().issue(payload) # jwt.encode(payload, self.sk, algorithm="HS256")
return token
(2)_generate_refresh_token()
该方法主要是生成 refresh token 的过程,如下所示:
def _generate_refresh_token(length: int = 64):
token = secrets.token_hex(length) # 返回一个随机的文本字符串,使用十六进制表示。
return token
(3)AccountService._store_refresh_token(refresh_token, account.id)
@staticmethod
def _store_refresh_token(refresh_token: str, account_id: str) -> None:
redis_client.setex(AccountService._get_refresh_token_key(refresh_token), REFRESH_TOKEN_EXPIRY, account_id)
redis_client.setex(
AccountService._get_account_refresh_token_key(account_id), REFRESH_TOKEN_EXPIRY, refresh_token
)
该方法的作用是将刷新令牌和账户 ID 以键值对的形式存储到 Redis 中,并设置过期时间,以便后续的身份验证和管理。当用户登录或需要刷新令牌时,可以快速查找这些信息。
3.AccountService.reset_login_error_rate_limit()方法
AccountService.reset_login_error_rate_limit(args[“email”])
@staticmethod
def reset_login_error_rate_limit(email: str):
key = f"login_error_rate_limit:{email}"
redis_client.delete(key)
该方法用于重置指定邮箱的登录错误次数限制,通过删除与该邮箱相关的 Redis 键。
三.console 蓝图与 Bearer Token 鉴权(用户登出操作)
1./console/api/logout 接口
http://localhost:5001/console/api/logout
Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiZWJiZjI1YmUtZWM4NS00ZDY4LTlkZDItNWJjMmVlMjg1NWU0IiwiZXhwIjoxNzMwNjg1NTcyLCJpc3MiOiJTRUxGX0hPU1RFRCIsInN1YiI6IkNvbnNvbGUgQVBJIFBhc3Nwb3J0In0.2e_5wPvf8GU9JARduOiIc06loBlQv0mgUrfkbMceeSg
2.AccountService.logout()
源码位置:dify\api\controllers\console\auth\login.py
AccountService.logout(account=account)该方法用于用户登出操作,首先尝试从 Redis 中获取用户的刷新令牌,如果存在,则删除该令牌,以实现用户登出的效果。
@staticmethod
def logout(*, account: Account) -> None:
refresh_token = redis_client.get(AccountService._get_account_refresh_token_key(account.id))
if refresh_token:
AccountService._delete_refresh_token(refresh_token.decode("utf-8"), account.id)
使用 redis_client
从 Redis 数据库中获取与给定 account
(账户)相关的刷新令牌(refresh token)。_get_account_refresh_token_key(account.id)
是一个静态方法,用于生成存储刷新令牌的键,account.id
是账户的唯一标识符。
如果获取到了刷新令牌,调用 AccountService
的 _delete_refresh_token
方法来删除该令牌。refresh_token.decode("utf-8")
将字节串解码为字符串,传入方法以执行删除操作,同时传入 account.id
以确保删除的是对应账户的令牌。
四.Web 蓝图与 Bearer Token 鉴权
1.Web 蓝图
简单理解,将应用发布后的运行 Web 页面调用接口 URL 前缀为/api,如下所示:
2.validate_jwt_token 装饰器
这里面的类基本都继承自 WebApiResource 类,而 WebApiResource 类实现,如下所示:
class WebApiResource(Resource):
method_decorators = [validate_jwt_token]
validate_jwt_token
装饰器用于在调用视图函数之前验证 JWT。它会解码 JWT,提取出相关的用户信息,并将其传递给视图函数。如下所示:
def validate_jwt_token(view=None):
def decorator(view):
@wraps(view)
def decorated(*args, **kwargs):
app_model, end_user = decode_jwt_token()
return view(app_model, end_user, *args, **kwargs)
return decorated
if view:
return decorator(view)
return decorator
参考文献
[1] Dify 中的 @login_required 装饰器:https://z0yrmerhgi8.feishu.cn/wiki/GaSqwbQRUimJjrkQV6pcV7TKnMb
NLP工程化(星球号)
欢迎加入我的知识星球,长按或扫码下方二维码。
星球号主要针对公众号提供增值服务,内容包括但不限于:源码解析及答疑(Dify等),书籍|报告下载(电子资料),代码|数据下载(公众号付费资料)。