当前位置: 首页 > article >正文

Dify 中的 Bearer Token 与 API-Key 鉴权方式

本文使用 Dify v0.10.2 版本,在 Dify 中包括 Bearer Token 与 API-Key 鉴权这 2 种方式。console(URL 前缀/console/api)和 web(URL 前缀/api)蓝图使用的是 Bearer Token 鉴权方式,而 service_api(URL 前缀/v1)蓝图使用的是 API-Key 鉴权方式。console 蓝图通过 login_required 装饰器[1]实现,web 蓝图通过继承 WebApiResource,即 validate_jwt_token 装饰器实现,而 service_api 通过 validate_app_token 装饰器实现。

一.service_api 蓝图与 API-Key 鉴权

API-Key 与 Bearer Token 鉴权区别,如下所示:

特性API-KeyBearer Token
定义一种用于识别调用者身份的密钥一种令牌,表示用户已经通过认证
格式通常为简单的字符串通常为 "Bearer <token>" 格式
存储位置可以在请求头、URL 查询参数或请求体中通常存储在请求头中
认证过程直接使用 API-Key需要通过认证流程获取令牌
有效期通常没有有效期或长期有效通常有有效期(如几分钟到几小时)
安全性相对较低,容易被截获更高,通过过期和刷新机制提高安全性
使用场景简单的 API 访问用户身份验证和授权
撤销机制需要手动更换或删除 API-Key可以通过令牌失效或刷新实现
适用性适用于无状态 API 调用适用于需要用户会话的应用

1.service_api 蓝图

简单理解,只有这一个地方的 API 使用的是 API-Key 鉴权方式,其它地方均使用 earer Token 鉴权方式。

2.validate_app_token 装饰器

该装饰器 validate_app_token 用于验证应用程序的 API 令牌,检查相关的应用和租户状态,并根据需要提取用户信息,为后续的视图函数提供必要的上下文。

(1)函数定义和参数

def validate_app_token(view: Optional[Callable] = None, *, fetch_user_arg: Optional[FetchUserArg] = None):
  • 定义一个装饰器 validate_app_token,接受一个可选的视图函数 view 和一个关键字参数 fetch_user_arg
  • fetch_user_arg 是一个可选参数,用于指定如何从请求中获取用户 ID。

(2)装饰器内部定义

def decorator(view_func):
  • 定义内部函数 decorator,它接受要装饰的视图函数 view_func

(3)装饰的视图函数

@wraps(view_func)
def decorated_view(*args, **kwargs):
  • 使用 functools.wraps 保留原始视图函数的元数据。
  • 定义内部函数 decorated_view,接受任意位置和关键字参数。

(4)获取和验证 API 令牌

api_token = validate_and_get_api_token("app")
  • 调用函数 validate_and_get_api_token 验证并获取 API 令牌,类型为 “app”。

(5)查询应用模型

app_model = db.session.query(App).filter(App.id == api_token.app_id).first()
if not app_model:
    raise Forbidden("The app no longer exists.")
  • 查询数据库,获取与 api_token 中的 app_id 关联的应用模型 app_model
  • 如果未找到该应用模型,抛出 Forbidden 异常,说明应用不存在。

(6)检查应用状态

if app_model.status != "normal":
    raise Forbidden("The app's status is abnormal.")
  • 检查应用的状态是否为正常。如果不是,则抛出 Forbidden 异常,说明应用状态异常。
if not app_model.enable_api:
    raise Forbidden("The app's API service has been disabled.")
  • 检查应用的 API 服务是否启用。如果禁用,则抛出 Forbidden 异常。

(7)查询租户模型

tenant = db.session.query(Tenant).filter(Tenant.id == app_model.tenant_id).first()
if tenant.status == TenantStatus.ARCHIVE:
    raise Forbidden("The workspace's status is archived.")
  • 查询与 app_model 相关联的租户模型 tenant,并检查租户的状态。
  • 如果租户状态为归档(ARCHIVE),则抛出 Forbidden 异常。

(8)将应用模型传递给视图函数

kwargs["app_model"] = app_model
  • app_model 添加到 kwargs 中,以便传递给被装饰的视图函数。

(9)获取用户 ID

if fetch_user_arg:
    if fetch_user_arg.fetch_from == WhereisUserArg.QUERY:
        user_id = request.args.get("user")
    elif fetch_user_arg.fetch_from == WhereisUserArg.JSON:
        user_id = request.get_json().get("user")
    elif fetch_user_arg.fetch_from == WhereisUserArg.FORM:
        user_id = request.form.get("user")
    else:
        # use default-user
        user_id = None
  • 如果 fetch_user_arg 被提供,根据其 fetch_from 属性从不同来源获取用户 ID:
    • 如果是查询参数(QUERY),从请求参数中获取。
    • 如果是 JSON 体(JSON),从请求 JSON 中获取。
    • 如果是表单数据(FORM),从请求表单中获取。
    • 如果没有指定来源,则 user_id 设置为 None

(10)检查用户 ID 的有效性

if not user_id and fetch_user_arg.required:
    raise ValueError("Arg user must be provided.")
  • 如果 user_id 不存在且 fetch_user_arg 指定了该参数是必需的,则抛出 ValueError
if user_id:
    user_id = str(user_id)

如果 user_id 存在,则将其转换为字符串。

(11)创建或更新用户信息

kwargs["end_user"] = create_or_update_end_user_for_user_id(app_model, user_id)
  • 调用函数 create_or_update_end_user_for_user_id,根据 app_modeluser_id 创建或更新用户信息,并将结果存入 kwargs 中。

(12)返回视图函数的结果

return view_func(*args, **kwargs)
  • 调用被装饰的视图函数,传递原始参数和 kwargs,并返回结果。

(13)装饰器的返回

return decorated_view
  • 返回装饰后的视图函数 decorated_view

(14)检查视图函数参数

if view is None:
    return decorator
else:
    return decorator(view)
  • 如果 view 参数为 None,则返回 decorator 函数;否则将视图函数 view 传递给 decorator 并返回。

二.console 蓝图与 Bearer Token 鉴权(用户登录操作)

源码位置:dify\api\controllers\console\auth\login.py

该接口返回结果示例,如下所示:

{
    "result": "success",
    "data": {
        "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiZWJiZjI1YmUtZWM4NS00ZDY4LTlkZDItNWJjMmVlMjg1NWU0IiwiZXhwIjoxNzMwNjg5Mjk4LCJpc3MiOiJTRUxGX0hPU1RFRCIsInN1YiI6IkNvbnNvbGUgQVBJIFBhc3Nwb3J0In0.EPfEaeoDY8NVeS7TDC95-B0rvFYjH9BVqiFY9IwYcVQ",
        "refresh_token": "32a914bb7d7dac9b1b7b5ba98d6584007acbcba4638374d8e0171877f5c9eee34eae400917aadc433d7801181367579dd72894ae8f512aaba00929ab98bf509f"
    }
}

1.AccountService.authenticate()方法

account = AccountService.authenticate(args[“email”], args[“password”])该方法实现了基于邮箱和密码的账户认证流程,包括账户查询、状态检查、密码哈希、首次登录处理和状态更新。如果所有条件都满足,最后将返回一个有效的 Account 对象。

@staticmethod
def authenticate(email: str, password: str, invite_token: Optional[str] = None) -> Account:
    <em>"""authenticate account with email and password"""</em>

<em>    </em>account = Account.query.filter_by(email=email).first()
    if not account:
        raise AccountNotFoundError()

    if account.status == AccountStatus.BANNED.value:
        raise AccountLoginError("Account is banned.")

    if password and invite_token and account.password is None:
        # if invite_token is valid, set password and password_salt
        salt = secrets.token_bytes(16)
        base64_salt = base64.b64encode(salt).decode()
        password_hashed = hash_password(password, salt)
        base64_password_hashed = base64.b64encode(password_hashed).decode()
        account.password = base64_password_hashed
        account.password_salt = base64_salt

    if account.password is None or not compare_password(password, account.password, account.password_salt):
        raise AccountPasswordError("Invalid email or password.")

    if account.status == AccountStatus.PENDING.value:
        account.status = AccountStatus.ACTIVE.value
        account.initialized_at = datetime.now(timezone.utc).replace(tzinfo=None)

    db.session.commit()

    return account

生成 16 字节的随机盐值,并将其编码为 Base64 字符串,以便后续用于密码哈希。对用户提供的密码进行哈希处理,使用生成的盐值,然后将哈希结果编码为 Base64 字符串。将哈希后的密码和盐值存储到账户对象中。

2. AccountService.login()方法

token_pair = AccountService.login(account=account, ip_address=extract_remote_ip(request))

@staticmethod
def login(account: Account, *, ip_address: Optional[str] = None) -> TokenPair:
    if ip_address:
        AccountService.update_login_info(account=account, ip_address=ip_address)

    if account.status == AccountStatus.PENDING.value:
        account.status = AccountStatus.ACTIVE.value
        db.session.commit()

    access_token = AccountService.get_account_jwt_token(account=account)
    refresh_token = _generate_refresh_token()

    AccountService._store_refresh_token(refresh_token, account.id)

    return TokenPair(access_token=access_token, refresh_token=refresh_token)

(1)AccountService.get_account_jwt_token(account=account)

该方法主要是生成 jwt token 的过程,如下所示:

@staticmethod
def get_account_jwt_token(account: Account) -> str:
    exp_dt = datetime.now(timezone.utc) + timedelta(minutes=dify_config.ACCESS_TOKEN_EXPIRE_MINUTES)
    exp = int(exp_dt.timestamp())
    payload = {
        "user_id": account.id,
        "exp": exp,
        "iss": dify_config.EDITION,
        "sub": "Console API Passport",
    }

    token = PassportService().issue(payload)  # jwt.encode(payload, self.sk, algorithm="HS256")
    return token

(2)_generate_refresh_token()

该方法主要是生成 refresh token 的过程,如下所示:

def _generate_refresh_token(length: int = 64):
    token = secrets.token_hex(length)  #  返回一个随机的文本字符串,使用十六进制表示。
    return token

(3)AccountService._store_refresh_token(refresh_token, account.id)

@staticmethod
def _store_refresh_token(refresh_token: str, account_id: str) -> None:
    redis_client.setex(AccountService._get_refresh_token_key(refresh_token), REFRESH_TOKEN_EXPIRY, account_id)
    redis_client.setex(
        AccountService._get_account_refresh_token_key(account_id), REFRESH_TOKEN_EXPIRY, refresh_token
    )

该方法的作用是将刷新令牌和账户 ID 以键值对的形式存储到 Redis 中,并设置过期时间,以便后续的身份验证和管理。当用户登录或需要刷新令牌时,可以快速查找这些信息。

3.AccountService.reset_login_error_rate_limit()方法

AccountService.reset_login_error_rate_limit(args[“email”])

@staticmethod
def reset_login_error_rate_limit(email: str):
    key = f"login_error_rate_limit:{email}"
    redis_client.delete(key)

该方法用于重置指定邮箱的登录错误次数限制,通过删除与该邮箱相关的 Redis 键。

三.console 蓝图与 Bearer Token 鉴权(用户登出操作)

1./console/api/logout 接口

http://localhost:5001/console/api/logout

Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiZWJiZjI1YmUtZWM4NS00ZDY4LTlkZDItNWJjMmVlMjg1NWU0IiwiZXhwIjoxNzMwNjg1NTcyLCJpc3MiOiJTRUxGX0hPU1RFRCIsInN1YiI6IkNvbnNvbGUgQVBJIFBhc3Nwb3J0In0.2e_5wPvf8GU9JARduOiIc06loBlQv0mgUrfkbMceeSg

2.AccountService.logout()

源码位置:dify\api\controllers\console\auth\login.py

AccountService.logout(account=account)该方法用于用户登出操作,首先尝试从 Redis 中获取用户的刷新令牌,如果存在,则删除该令牌,以实现用户登出的效果。

@staticmethod
def logout(*, account: Account) -> None:
    refresh_token = redis_client.get(AccountService._get_account_refresh_token_key(account.id))
    if refresh_token:
        AccountService._delete_refresh_token(refresh_token.decode("utf-8"), account.id)

使用 redis_client 从 Redis 数据库中获取与给定 account(账户)相关的刷新令牌(refresh token)。_get_account_refresh_token_key(account.id) 是一个静态方法,用于生成存储刷新令牌的键,account.id 是账户的唯一标识符。

如果获取到了刷新令牌,调用 AccountService_delete_refresh_token 方法来删除该令牌。refresh_token.decode("utf-8") 将字节串解码为字符串,传入方法以执行删除操作,同时传入 account.id 以确保删除的是对应账户的令牌。

四.Web 蓝图与 Bearer Token 鉴权

1.Web 蓝图

简单理解,将应用发布后的运行 Web 页面调用接口 URL 前缀为/api,如下所示:

2.validate_jwt_token 装饰器

这里面的类基本都继承自 WebApiResource 类,而 WebApiResource 类实现,如下所示:

class WebApiResource(Resource):
    method_decorators = [validate_jwt_token]

validate_jwt_token 装饰器用于在调用视图函数之前验证 JWT。它会解码 JWT,提取出相关的用户信息,并将其传递给视图函数。如下所示:

def validate_jwt_token(view=None):
    def decorator(view):
        @wraps(view)
        def decorated(*args, **kwargs):
            app_model, end_user = decode_jwt_token()

            return view(app_model, end_user, *args, **kwargs)

        return decorated

    if view:
        return decorator(view)
    return decorator

参考文献

[1] Dify 中的 @login_required 装饰器:https://z0yrmerhgi8.feishu.cn/wiki/GaSqwbQRUimJjrkQV6pcV7TKnMb




NLP工程化(星球号)
欢迎加入我的知识星球,长按或扫码下方二维码。

星球号主要针对公众号提供增值服务,内容包括但不限于:源码解析及答疑(Dify等),书籍|报告下载(电子资料),代码|数据下载(公众号付费资料)。


http://www.kler.cn/a/379112.html

相关文章:

  • Python酷库之旅-第三方库Pandas(193)
  • vue系列==vue路由
  • 智能网联汽车:人工智能与汽车行业的深度融合
  • aws(学习笔记第十课) 对AWS的EBS如何备份(snapshot)以及使用snapshot恢复数据,AWS实例存储
  • TDengine数据备份与恢复
  • SpringMVC笔记 一万字
  • 【postman】工具下载安装
  • UOS 安装usb wifi 网卡驱动
  • GBase与梧桐数据库数据加载与导出的差异
  • gin框架可以构建微服务吗?
  • ip报文头解析
  • 探索 ONLYOFFICE 8.2 版本:更高效、更安全的云端办公新体验
  • 51c自动驾驶~合集4
  • Autosar CP 内存抽象接口MemIf规范导读
  • 前端拖拽库方案之react-beautiful-dnd
  • 微服务day02
  • ChatGPT搜索引擎推出Chrome插件
  • React Router v6 中使用useRouteLoaderData,获取访问父路由或兄弟路由的由 loader 函数加载的数据
  • SAP ABAP开发学习——BADI增强操作步骤示例2
  • ## EPSANet论文阅读心得
  • Redis 分布式锁:原理、实现及最佳实践
  • c++ 贪心算法
  • GitGraphPro 图管理系统
  • DNS域名系统
  • c语言-8进制的表示方法
  • 【网络】网络层协议IP