Write the Code. Change the World.

分类目录
6月 15

虽然 dify 支持创建多个 api-token,也修改了 api_tokens 表,加上了 account_id 进行关联。但是,通过 api-token 转换成 current_user 的时候,这个 user 总是 tenant_account_joins 表里 role 字段是 owner 的用户。既然想给每个用户都有单独的 token,就要 current_user 能对应到每个用户身上。

api-key 到 account 的转换

从 post("/v1/datasets") 接口里,往回找,就可以找到用户绑定的地方:api\controllers\service_api\dataset\dataset.py

这里就有当前用户的获取: assert isinstance(current_user, Account)

assert isinstance(current_user, Account)

api\controllers\service_api\wraps.py 里,有用户查询相关的逻辑。

        tenant_account_join = db.session.execute(
            select(Tenant, TenantAccountJoin)
            .where(Tenant.id == api_token.tenant_id)
            .where(TenantAccountJoin.tenant_id == Tenant.id)
            .where(TenantAccountJoin.role.in_(["owner"]))
            .where(Tenant.status == TenantStatus.NORMAL)
        ).one_or_none()  # TODO: only owner information is required, so only one is returned.
        if tenant_account_join:
            tenant, ta = tenant_account_join
            account = db.session.get(Account, ta.account_id)
            # Login admin
            if account:
                account.current_tenant = tenant
                current_app.login_manager._update_request_context_with_user(account)  # type: ignore
                user_logged_in.send(current_app._get_current_object(), user=current_user)  # type: ignore
            else:
                raise Unauthorized("Tenant owner account does not exist.")

既然之前在 api_tokens 表中加了 account_id 字段,那么优先就应该从 api_tokens 表查询。如果这个表列没查出对应的 account_id, 再走原来的逻辑。

def validate_dataset_token[R](view: Callable[..., R]) -> Callable[..., R]:
    positional_parameters = [
        parameter
        for parameter in inspect.signature(view).parameters.values()
        if parameter.kind
        in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    ]
    expects_bound_instance = bool(
        positional_parameters and positional_parameters[0].name in {"self", "cls"}
    )

    @wraps(view)
    def decorated(*args: object, **kwargs: object) -> R:
        api_token = validate_and_get_api_token("dataset")

        if api_token.account_id:
            account = db.session.get(Account, api_token.account_id)
            if not account:
                raise Unauthorized(
                    "By account_id. Token associated account does not exist."
                )

            tenant = db.session.get(Tenant, api_token.tenant_id)
            if not tenant or tenant.status == TenantStatus.ARCHIVE:
                raise Forbidden("Workspace is invalid or archived")

            account.current_tenant = tenant
            current_app.login_manager._update_request_context_with_user(account)  # type: ignore
            user_logged_in.send(current_app._get_current_object(), user=account)
        else:
            # Flask may pass URL path parameters positionally, so inspect both kwargs and args.
            dataset_id = kwargs.get("dataset_id")

            if not dataset_id and args:
                potential_id = args[0]
                try:
                    str_id = str(potential_id)
                    if len(str_id) == 36 and str_id.count("-") == 4:
                        dataset_id = str_id
                except Exception:
                    logger.exception("Failed to parse dataset_id from positional args")

            if dataset_id:
                dataset_id = str(dataset_id)
                dataset = db.session.scalar(
                    select(Dataset)
                    .where(
                        Dataset.id == dataset_id,
                        Dataset.tenant_id == api_token.tenant_id,
                    )
                    .limit(1)
                )
                if not dataset:
                    raise NotFound("Dataset not found.")
                if not dataset.enable_api:
                    raise Forbidden("Dataset api access is not enabled.")

            tenant_account_join = db.session.execute(
                select(Tenant, TenantAccountJoin)
                .where(Tenant.id == api_token.tenant_id)
                .where(TenantAccountJoin.tenant_id == Tenant.id)
                .where(TenantAccountJoin.role.in_(["owner"]))
                .where(Tenant.status == TenantStatus.NORMAL)
            ).one_or_none()  # TODO: only owner information is required, so only one is returned.
            if tenant_account_join:
                tenant, ta = tenant_account_join
                account = db.session.get(Account, ta.account_id)
                # Login admin
                if account:
                    account.current_tenant = tenant
                    current_app.login_manager._update_request_context_with_user(account)  # type: ignore
                    user_logged_in.send(current_app._get_current_object(), user=current_user)  # type: ignore
                else:
                    raise Unauthorized("Tenant owner account does not exist.")
            else:
                raise Unauthorized("Tenant does not exist.")

        if expects_bound_instance:
            if not args:
                raise TypeError(
                    "validate_dataset_token expected a bound resource instance."
                )
            return view(args[0], api_token.tenant_id, *args[1:], **kwargs)

        return view(api_token.tenant_id, *args, **kwargs)

    return decorated

只加了:

        if api_token.account_id:
            account = db.session.get(Account, api_token.account_id)
            if not account:
                raise Unauthorized(
                    "By account_id. Token associated account does not exist."
                )

            tenant = db.session.get(Tenant, api_token.tenant_id)
            if not tenant or tenant.status == TenantStatus.ARCHIVE:
                raise Forbidden("Workspace is invalid or archived")

            account.current_tenant = tenant
            current_app.login_manager._update_request_context_with_user(account)  # type: ignore
            user_logged_in.send(current_app._get_current_object(), user=account)

到这里看似是 ok 了。但是还是会报错。在 api_token = validate_and_get_api_token("dataset") 里,api_token 可能来自 ApiTokenCache。如果来自 ApiTokenCache 这里就会报错。在 ApiTokenCache 里根本就没有定要 account_id。如果不走缓存,之前在修改数据库表的时候,在模型中已经加入了 account_id 字段,自然不会报错。现在只需要在ApiTokenCache 中加入 account_id 即可。

# api\services\api_token_service.py

    id: str
    app_id: str | None
    tenant_id: str | None
    account_id: str | None
    type: str
    token: str
    last_used_at: datetime | None
    created_at: datetime | None

        cached = CachedApiToken(
            id=str(api_token.id),
            app_id=str(api_token.app_id) if api_token.app_id else None,
            tenant_id=str(api_token.tenant_id) if api_token.tenant_id else None,
            account_id=str(api_token.account_id) if api_token.account_id else None,
            type=api_token.type,
            token=api_token.token,
            last_used_at=api_token.last_used_at,
            created_at=api_token.created_at,
        )
6月 15

dify 默认的数据库表,不一定是自己想要的。这个时候就需要取修改数据库表。修改数据库表最爽的方式就是用迁移文件。在 dify 原本的迁移目录下新增迁移文件会出现迁移版本错位的问题。比如你当前建立了一个迁移文件,过段时间拉 dify 官网代码合并后,迁移有改动,这个时候就会出现迁移版本对不上。为了解决这个问题,自定义个人迁移目录,新建迁移文件就很有必要了。

现在想给 api_tokens 表新增一个 account_id 字段。因为之前的 api_tokens 表只和 tenant 表进行关联,满足不了需求。

新建自定义迁移文件夹【该思路放弃】

修改 docker-compose.yaml 的 api 的配置。主要修改 volumes,因为默认是没有挂载目录,就会出现在容器中生成的文件在宿主机上没有。

api:
    volumes:
    - ../api:/app/api
    # 匿名卷:让 .venv 目录不被宿主机覆盖,保留镜像内的内容
    - /app/api/.venv

然后启动服务,开始干活。(从尝试到放弃)

# 启动 docker 服务
docker compose up -d

# 进入 api 容器
docker exec -it docker-api-1 bash

# 创建自定义迁移目录
flask db init --directory migrations_custom

# 查看官方迁移当前版本
flask db current

# 给自定义迁移库打基线(7bad07dc267d 是上一步查询出来的)
flask db stamp --directory migrations_custom 7bad07dc267d --purge

从尝试到放弃

想法是很美好,可是执行起来,各种问题就来了。所以,还是放弃自建迁移文件的想法了。既然迁移文件不好使,那就调用 sql 来达到实现修改字段的目的了。

在 api/extensions 目录下,新建 ext_database_custom.py 文件。代码入下。

# ext_database_custom.py
from sqlalchemy import text as sql_text
from extensions.ext_database import db
from dify_app import DifyApp

def init_custom_db_fields():
    """初始化自定义数据库字段/索引,启动自动执行"""
    try:
        with db.engine.connect() as conn:
            # 新增 account_id 字段
            conn.execute(
                sql_text(
                    "ALTER TABLE api_tokens ADD COLUMN IF NOT EXISTS account_id UUID;"
                )
            )
            # 新增联合索引
            conn.execute(
                sql_text(
                    "CREATE INDEX IF NOT EXISTS api_token_account_idx ON api_tokens(account_id, type);"
                )
            )
            conn.commit()
        print("[CustomDB] 自定义字段&索引初始化完成")
    except Exception as e:
        print(f"[CustomDB] 初始化异常: {str(e)}")

# 关键:实现 Dify 扩展标准接口 init_app
def init_app(app: DifyApp):
    # 这里要在 app 上下文里执行,否则 db 可能没绑定
    with app.app_context():
        init_custom_db_fields()

然后在 api/app_factory.py 的 initialize_extensions 函数中,引入注册插件。放在 ext_database 后边即可。

最后执行 docker compose restart 就可以了。

运行起来,查看数据库,会看到 在 api_tokens 表中已经有 account_id 字段了。

虽然数据库表里字段有了。但是模型这里也要加。

# api\models\model.py

class ApiToken(Base):  
    account_id = mapped_column(StringUUID, nullable=True)
6月 15

拉取 dify 官方源代码构建服务。虽然能跑起来,正常运行。可是很多功能不是自己想要的,或者自己想要更多的功能。这个时候就需要修改源码。 dify 源码都提供了,修改后,重新构建镜像使用不就可以实现愿望了吗。

使用自定义镜像

修改 docker/docker-compose.yaml 文件,删掉官方的镜像,使用本地的镜像构建。前端 web 页面和 api 接口的配置修改。

  api:
    <<: *shared-api-worker-config
    image: user-dify-api:1.14.2
    user: root
    build:
      context: ..
      dockerfile: api/Dockerfile

  web:
    image: user-dify-web:1.14.2
    build:
      context: ..
      dockerfile: web/Dockerfile

这样就好了。如果需要增加环境变量,在 .env 和 .env.example 还有 docker-compose.yaml 中加入就好。 volumes 也看着办。

8月 04

mac 默认安装的是 python 2.7。即使你安装了 3.x 版本,终端中默认也不会使 调用这个。可以配置下就可以。

操作

处理之前

python --version
# 输出 Python  2.7

python3 --verison
# 输出 Python 3.9.6

pip list 
# 找不到

我自己的终端改成 item 了。对应的是 ~/.zshrc。默认是 ~/.bash_profile。
好了,我们改一波。

vim ~/.zshrc

# GG 移动到末尾 添加下边配置
alias python="/usr/local/bin/python3"
alias pip="/usr/local/bin/pip3"

# 然后 source 一下,要不不立马生效
source ~/.zshrc

好了,就这样了。然后再测试下版本以及pip。