Write the Code. Change the World.

9月 10
精选文章

该后台使用 vite + ts + pnpm + vue3 + element-plus + tailwindcss 等技术栈构成。没有添加任意可视化图标等插件。以最小功能,最基础功能展现。用户可以额外添加可使用的插件逻辑。

该后台后端使用 php8.2 + laravel 10 + mysql

该后台后端 go 语言版本开发中。将使用 gframe2.5.2

源码: https://github.com/vini123/simpleAdmin

在线体验: https://www.zeipan.com/admin

权限以及密码一键复位: https://v3test.yuepaibao.com/admin/api/reset

测试账号以及密码: zhoulin@xiangrong.pro、 111111 (如果发现登录不了,可一键复位谢谢)

阅读全文 >>

6月 15

运行起 dify 后,在控制台里可以看到登录调用的接口文件定义在这里:api\controllers\console\auth\login.py 现在想扩展新的接口,可以在当前目录下新建一个 register.py 文件,加入以下代码。

import os
import hashlib
import functools
import time
import logging
import secrets
import base64

from flask import make_response, request
from flask_restx import Resource
from pydantic import BaseModel, Field, field_validator, ValidationError
from sqlalchemy import select, update
from typing import Literal, Optional

from controllers.common.fields import SimpleResultOptionalDataResponse
from controllers.common.schema import (
    register_response_schema_models,
    register_schema_models,
)
from controllers.console import console_ns
from libs.helper import EmailStr
from libs.helper import timezone as validate_timezone_string
from libs.password import hash_password
from constants.languages import get_valid_language
from models import Account, Tenant, TenantAccountJoin
from extensions.ext_database import db
from services.errors.account import AccountRegisterError
from controllers.console.wraps import (
    decrypt_password_field,
    setup_required,
)

from sqlalchemy import func
from models import ApiToken

logger = logging.getLogger(__name__)

# 签名校验装饰器
def private_api_auth_required(f):
    @functools.wraps(f)
    def decorated(*args, **kwargs):
        private_secret = os.environ.get("CONSOLE_PRIVATE_API_SECRET")
        if not private_secret or len(private_secret) != 12:
            return {"result": "fail", "message": "Private API service unavailable"}, 503

        req_time = request.headers.get("X-API-TIME", "")
        req_sign = request.headers.get("X-API-SIGN", "")

        if not req_time or not req_sign:
            return {"result": "fail", "message": "Missing required headers"}, 400

        try:
            timestamp = int(req_time)
            if timestamp > 9999999999:
                timestamp = timestamp // 1000
        except (ValueError, TypeError):
            return {"result": "fail", "message": "Invalid timestamp"}, 400

        if abs(int(time.time()) - timestamp) > 300:
            return {"result": "fail", "message": "Request expired"}, 401

        calc_sign = (
            hashlib.md5(f"{private_secret}{req_time}".encode()).hexdigest().lower()
        )
        if calc_sign != req_sign.lower():
            return {"result": "fail", "message": "Invalid signature"}, 401

        return f(*args, **kwargs)

    return decorated

# 请求模型:字段改为 name,timezone 默认 Asia/Shanghai
class RegisterPayload(BaseModel):
    email: EmailStr = Field(..., description="邮箱")
    name: str = Field(..., min_length=1, max_length=50, description="用户名")
    password: str = Field(..., description="明文密码")
    language: str | None = Field(default=None, description="界面语言")
    timezone: str = Field(
        default="Asia/Shanghai", description="时区,默认 Asia/Shanghai"
    )

    @field_validator("timezone")
    @classmethod
    def validate_timezone(cls, value):
        return validate_timezone_string(value) if value else "Asia/Shanghai"

# 获取用户详情
class UserInfoPayload(BaseModel):
    account_id: str = Field(..., description="用户ID")

class GenerateApiTokenPayload(BaseModel):
    account_id: str = Field(..., description="用户ID")
    tenant_id: str = Field(..., description="租户ID")
    # 限定只能是 dataset / app
    type: Literal["dataset", "app"] = Field(
        ..., description="令牌类型,仅支持 dataset、app"
    )
    # 非必填,默认 None
    app_id: Optional[str] = Field(None, description="应用ID,type 为 app 时必填")

    @field_validator("app_id")
    def check_app_id_required(cls, v, values):
        # values.data 获取模型已解析的其他字段
        token_type = values.data.get("type")
        if token_type == "app" and not v:
            raise ValueError("当令牌类型为 app 时,app_id 不能为空")
        if token_type == "dataset" and v is not None:
            raise ValueError("当令牌类型为 dataset 时,无需传入 app_id")
        return v

register_schema_models(
    console_ns,
    RegisterPayload,
    UserInfoPayload,
    GenerateApiTokenPayload,
)

register_response_schema_models(console_ns, SimpleResultOptionalDataResponse)

def get_first_tenant():
    stmt = select(Tenant).order_by(Tenant.created_at)
    return db.session.scalar(stmt)

# 对齐 Dify 原生密码哈希逻辑
def create_password_hash(raw_pwd: str) -> tuple[str, str]:
    salt = secrets.token_bytes(16)
    hashed = hash_password(raw_pwd, salt)
    return base64.b64encode(hashed).decode("utf-8"), base64.b64encode(salt).decode(
        "utf-8"
    )

@console_ns.route("/register")
class RegisterApi(Resource):
    @setup_required
    @private_api_auth_required
    @console_ns.expect(console_ns.models[RegisterPayload.__name__])
    @console_ns.response(
        200, "Success", console_ns.models[SimpleResultOptionalDataResponse.__name__]
    )
    @decrypt_password_field
    def post(self):
        # 单独捕获参数校验异常
        try:
            args = RegisterPayload.model_validate(console_ns.payload)
        except ValidationError as ve:
            err_info = ve.errors()[0]
            field = ".".join(map(str, err_info["loc"]))
            msg = f"Parameter error: {field} - {err_info['msg']}"
            return {"result": "fail", "message": msg}, 403

        try:
            email = args.email.strip().lower()
            name = args.name.strip()
            raw_pwd = args.password.strip()
            lang = get_valid_language(args.language)
            tz = args.timezone.strip()

            # 邮箱去重
            exist_stmt = select(Account).where(Account.email == email)
            if db.session.scalar(exist_stmt):
                raise AccountRegisterError("Email already exists")

            # 生成密码
            pwd_hash, pwd_salt = create_password_hash(raw_pwd)

            # 新建账号(字段完全对齐数据库)
            new_account = Account(
                email=email,
                name=name,
                password=pwd_hash,
                password_salt=pwd_salt,
                interface_language=lang,
                timezone=tz,
            )
            db.session.add(new_account)
            db.session.flush()

            # 租户处理
            tenant = get_first_tenant()
            if not tenant:
                tenant = Tenant(name=f"{name}'s Workspace")
                db.session.add(tenant)
                db.session.flush()

            is_first_tenant = get_first_tenant() is None
            join_rel = TenantAccountJoin(
                account_id=new_account.id,
                tenant_id=tenant.id,
                role="owner" if is_first_tenant else "admin",
                current=True,
            )
            db.session.add(join_rel)

            # 更新当前租户标记
            update_all = (
                update(TenantAccountJoin)
                .where(TenantAccountJoin.account_id == new_account.id)
                .values(current=False)
            )
            db.session.execute(update_all)

            update_curr = (
                update(TenantAccountJoin)
                .where(
                    TenantAccountJoin.account_id == new_account.id,
                    TenantAccountJoin.tenant_id == tenant.id,
                )
                .values(current=True)
            )
            db.session.execute(update_curr)

            # 统一提交,事务原子化
            db.session.commit()

            return make_response(
                {
                    "result": "success",
                    "data": {
                        "account_id": str(new_account.id),
                        "email": new_account.email,
                        "name": new_account.name,
                        "tenant_id": str(tenant.id),
                        "tenant_name": tenant.name,
                    },
                }
            )

        except Exception as e:
            db.session.rollback()
            logger.exception("Register failed, rollback transaction")
            if isinstance(e, AccountRegisterError):
                return {"result": "fail", "message": str(e)}, 400
            return {"result": "fail", "message": "Register failed"}, 500

# ===================== 获取用户基本信息 =====================
@console_ns.route("/user/info")
class UserInfoApi(Resource):
    @setup_required
    @private_api_auth_required
    def get(self):
        try:
            # 从 URL 查询参数取值,而非 payload
            account_id = request.args.get("account_id", "").strip()
            if not account_id:
                return {"result": "fail", "message": "account_id is required"}, 403
            args = UserInfoPayload(account_id=account_id)
        except ValidationError as ve:
            err_info = ve.errors()[0]
            field = ".".join(map(str, err_info["loc"]))
            msg = f"Parameter error: {field} - {err_info['msg']}"
            return {"result": "fail", "message": msg}, 403

        try:
            account_id = args.account_id.strip()
            # 1. 查询单条用户基础信息
            user_stmt = select(
                Account.id,
                Account.email,
                Account.name,
                Account.status,
                Account.timezone,
                Account.interface_language,
                Account.created_at,
            ).where(Account.id == account_id)

            user_row = db.session.execute(user_stmt).first()
            if not user_row:
                return {"result": "success", "data": None, "message": "用户不存在"}

            # 2. 查询该用户关联的租户信息
            tenant_stmt = (
                select(
                    TenantAccountJoin.account_id,
                    TenantAccountJoin.tenant_id,
                    TenantAccountJoin.role,
                    TenantAccountJoin.current,
                    Tenant.name.label("tenant_name"),
                    Tenant.created_at.label("tenant_created_at"),
                )
                .join(Tenant, TenantAccountJoin.tenant_id == Tenant.id)
                .where(TenantAccountJoin.account_id == account_id)
            )
            tenant_row = db.session.execute(tenant_stmt).first()

            # 查询用户的 api_tokens
            api_token_stmt = select(
                ApiToken.id,
                ApiToken.tenant_id,
                ApiToken.account_id,
                ApiToken.type,
                ApiToken.token,
                ApiToken.app_id,
                ApiToken.last_used_at,
            ).where(ApiToken.account_id == account_id)
            api_token_rows = db.session.execute(api_token_stmt).all()

            api_token_list = []
            for row in api_token_rows:
                api_token_list.append(
                    {
                        "id": str(row.id),
                        "tenant_id": str(row.tenant_id),
                        "account_id": str(row.account_id),
                        "type": row.type,
                        "token": row.token,
                        "app_id": row.app_id,
                        "last_used_at": (
                            row.last_used_at.isoformat() if row.last_used_at else None
                        ),
                    }
                )

            # 3. 组装最终返回数据
            user_data = {
                "user_id": str(user_row.id),
                "email": user_row.email,
                "name": user_row.name,
                "timezone": user_row.timezone,
                "interface_language": user_row.interface_language,
                "created_at": (
                    user_row.created_at.isoformat() if user_row.created_at else None
                ),
                "tenant": tenant_row,
                "api_token_list": api_token_list,
            }
            return {"result": "success", "data": user_data}

        except Exception as e:
            logger.exception("Get user list failed")
            return {"result": "fail", "message": "Query failed"}, 500

# ===================== 创建 api key =====================
@console_ns.route("/generate/api_key")
class GenerateApiTokenApi(Resource):
    @setup_required
    @private_api_auth_required
    @console_ns.expect(console_ns.models[GenerateApiTokenPayload.__name__])
    def post(self):
        try:
            args = GenerateApiTokenPayload.model_validate(console_ns.payload)
        except ValidationError as ve:
            err_info = ve.errors()[0]
            field = ".".join(map(str, err_info["loc"]))
            msg = f"Parameter error: {field} - {err_info['msg']}"
            return {"result": "fail", "message": msg}, 403

        try:
            account_id = args.account_id.strip()
            tenant_id = args.tenant_id.strip()
            type = args.type
            app_id = args.app_id

            # 一个租户最多可以创建 10 个 api tokens(todo 要破掉)
            current_key_count = db.session.scalar(
                select(func.count(ApiToken.id)).where(ApiToken.tenant_id == tenant_id)
            )

            if current_key_count >= 10:
                logger.warning(
                    f"User {account_id} has reached maximum API key limit (10)"
                )
                return {"result": "fail", "message": "API key limit reached"}, 403

            # 2. 生成API Key(和Dify原生生成规则完全一致)
            token_prefix = "sk-"  # Dify原生用户API Key前缀
            key = ApiToken.generate_api_key(token_prefix, 24)

            # 3. 创建并保存API Token
            api_token = ApiToken()
            api_token.tenant_id = tenant_id
            api_token.account_id = account_id
            api_token.type = type
            api_token.token = key

            if app_id:
                api_token.app_id = app_id
            db.session.add(api_token)
            db.session.commit()

            return {"result": "success", "data": {"token": key}}

        except Exception as e:
            logger.exception("Get user list failed")
            return {"result": "fail", "message": "Query failed"}, 500

# ===================== 用户列表接口(含租户 + API Token) =====================
@console_ns.route("/user/list")
class UserListApi(Resource):
    @setup_required
    @private_api_auth_required
    def get(self):
        try:
            # 解析分页参数
            page = request.args.get("page", 1, type=int)
            page_size = request.args.get("page_size", 20, type=int)
            # 限制最大条数,防护大数据查询
            page_size = min(page_size, 100)
            offset = (page - 1) * page_size

            # 1. 查询用户总数
            total_stmt = select(func.count(Account.id))
            total = db.session.scalar(total_stmt) or 0

            # 2. 分页查询用户基础信息
            user_stmt = (
                select(
                    Account.id,
                    Account.email,
                    Account.name,
                    Account.timezone,
                    Account.interface_language,
                    Account.created_at,
                )
                .order_by(Account.created_at.desc())
                .offset(offset)
                .limit(page_size)
            )
            user_rows = db.session.execute(user_stmt).all()
            user_id_list = [row.id for row in user_rows]

            # 无用户直接返回空列表
            if not user_id_list:
                return {
                    "result": "success",
                    "data": {
                        "total": total,
                        "page": page,
                        "page_size": page_size,
                        "list": [],
                    },
                }

            # 3. 批量查询 用户-租户 关联信息(修复变量名笔误)
            tenant_stmt = (
                select(
                    TenantAccountJoin.account_id,
                    TenantAccountJoin.tenant_id,
                    TenantAccountJoin.role,
                    TenantAccountJoin.current,
                    Tenant.name.label("tenant_name"),
                    Tenant.created_at.label("tenant_created_at"),
                )
                .join(Tenant, TenantAccountJoin.tenant_id == Tenant.id)
                .where(TenantAccountJoin.account_id.in_(user_id_list))
            )
            # 此处原代码写错:tenant_rows → tenant_stmt
            tenant_rows = db.session.execute(tenant_stmt).all()

            # 按用户ID分组租户数据
            tenant_map = {}
            for row in tenant_rows:
                aid = row.account_id
                if aid not in tenant_map:
                    tenant_map[aid] = []
                tenant_map[aid].append(
                    {
                        "tenant_id": str(row.tenant_id),
                        "tenant_name": row.tenant_name,
                        "role": row.role,
                        "is_current": row.current,
                        "tenant_created_at": (
                            row.tenant_created_at.isoformat()
                            if row.tenant_created_at
                            else None
                        ),
                    }
                )

            # 5. 组装最终返回数据
            user_list = []
            for user in user_rows:
                user_id = user.id
                user_list.append(
                    {
                        "user_id": str(user_id),
                        "email": user.email,
                        "name": user.name,
                        "timezone": user.timezone,
                        "interface_language": user.interface_language,
                        "created_at": (
                            user.created_at.isoformat() if user.created_at else None
                        ),
                        "tenant_list": tenant_map.get(user_id, []),
                        # "api_token_list": token_map.get(user_id, []),
                    }
                )

            return {
                "result": "success",
                "data": {
                    "total": total,
                    "page": page,
                    "page_size": page_size,
                    "list": user_list,
                },
            }

        except Exception as e:
            logger.exception("Get user list failed")
            return {"result": "fail", "message": "Query failed"}, 500

并且将该文件注册进去。修改 api/controllers/console/init.py, 添加 "register" 就好。

然后看看最近这几个功能修改涉及到的文件。

最后,如果有需要可以去打一个镜像。

docker compose build api

阅读全文 >>

6月 15

虽然 dify 支持创建多个 api-token,也修改了 api_tokens 表,加上了 account_id 进行关联。但是,通过 api-token 转换成 current_user 的时候,这个 user 总是 tenant_account_joins 表里 role 字段是 owner 的用户。既然想给每个用户都有单独的 token,就要 current_user 能对应到每个用户身上。

api-key 到 account 的转换

从 post("/v1/datasets") 接口里,往回找,就可以找到用户绑定的地方:api\controllers\service_api\dataset\dataset.py

这里就有当前用户的获取: assert isinstance(current_user, Account)

assert isinstance(current_user, Account)

api\controllers\service_api\wraps.py 里,有用户查询相关的逻辑。

        tenant_account_join = db.session.execute(
            select(Tenant, TenantAccountJoin)
            .where(Tenant.id == api_token.tenant_id)
            .where(TenantAccountJoin.tenant_id == Tenant.id)
            .where(TenantAccountJoin.role.in_(["owner"]))
            .where(Tenant.status == TenantStatus.NORMAL)
        ).one_or_none()  # TODO: only owner information is required, so only one is returned.
        if tenant_account_join:
            tenant, ta = tenant_account_join
            account = db.session.get(Account, ta.account_id)
            # Login admin
            if account:
                account.current_tenant = tenant
                current_app.login_manager._update_request_context_with_user(account)  # type: ignore
                user_logged_in.send(current_app._get_current_object(), user=current_user)  # type: ignore
            else:
                raise Unauthorized("Tenant owner account does not exist.")

既然之前在 api_tokens 表中加了 account_id 字段,那么优先就应该从 api_tokens 表查询。如果这个表列没查出对应的 account_id, 再走原来的逻辑。

def validate_dataset_token[R](view: Callable[..., R]) -> Callable[..., R]:
    positional_parameters = [
        parameter
        for parameter in inspect.signature(view).parameters.values()
        if parameter.kind
        in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    ]
    expects_bound_instance = bool(
        positional_parameters and positional_parameters[0].name in {"self", "cls"}
    )

    @wraps(view)
    def decorated(*args: object, **kwargs: object) -> R:
        api_token = validate_and_get_api_token("dataset")

        if api_token.account_id:
            account = db.session.get(Account, api_token.account_id)
            if not account:
                raise Unauthorized(
                    "By account_id. Token associated account does not exist."
                )

            tenant = db.session.get(Tenant, api_token.tenant_id)
            if not tenant or tenant.status == TenantStatus.ARCHIVE:
                raise Forbidden("Workspace is invalid or archived")

            account.current_tenant = tenant
            current_app.login_manager._update_request_context_with_user(account)  # type: ignore
            user_logged_in.send(current_app._get_current_object(), user=account)
        else:
            # Flask may pass URL path parameters positionally, so inspect both kwargs and args.
            dataset_id = kwargs.get("dataset_id")

            if not dataset_id and args:
                potential_id = args[0]
                try:
                    str_id = str(potential_id)
                    if len(str_id) == 36 and str_id.count("-") == 4:
                        dataset_id = str_id
                except Exception:
                    logger.exception("Failed to parse dataset_id from positional args")

            if dataset_id:
                dataset_id = str(dataset_id)
                dataset = db.session.scalar(
                    select(Dataset)
                    .where(
                        Dataset.id == dataset_id,
                        Dataset.tenant_id == api_token.tenant_id,
                    )
                    .limit(1)
                )
                if not dataset:
                    raise NotFound("Dataset not found.")
                if not dataset.enable_api:
                    raise Forbidden("Dataset api access is not enabled.")

            tenant_account_join = db.session.execute(
                select(Tenant, TenantAccountJoin)
                .where(Tenant.id == api_token.tenant_id)
                .where(TenantAccountJoin.tenant_id == Tenant.id)
                .where(TenantAccountJoin.role.in_(["owner"]))
                .where(Tenant.status == TenantStatus.NORMAL)
            ).one_or_none()  # TODO: only owner information is required, so only one is returned.
            if tenant_account_join:
                tenant, ta = tenant_account_join
                account = db.session.get(Account, ta.account_id)
                # Login admin
                if account:
                    account.current_tenant = tenant
                    current_app.login_manager._update_request_context_with_user(account)  # type: ignore
                    user_logged_in.send(current_app._get_current_object(), user=current_user)  # type: ignore
                else:
                    raise Unauthorized("Tenant owner account does not exist.")
            else:
                raise Unauthorized("Tenant does not exist.")

        if expects_bound_instance:
            if not args:
                raise TypeError(
                    "validate_dataset_token expected a bound resource instance."
                )
            return view(args[0], api_token.tenant_id, *args[1:], **kwargs)

        return view(api_token.tenant_id, *args, **kwargs)

    return decorated

只加了:

        if api_token.account_id:
            account = db.session.get(Account, api_token.account_id)
            if not account:
                raise Unauthorized(
                    "By account_id. Token associated account does not exist."
                )

            tenant = db.session.get(Tenant, api_token.tenant_id)
            if not tenant or tenant.status == TenantStatus.ARCHIVE:
                raise Forbidden("Workspace is invalid or archived")

            account.current_tenant = tenant
            current_app.login_manager._update_request_context_with_user(account)  # type: ignore
            user_logged_in.send(current_app._get_current_object(), user=account)

到这里看似是 ok 了。但是还是会报错。在 api_token = validate_and_get_api_token("dataset") 里,api_token 可能来自 ApiTokenCache。如果来自 ApiTokenCache 这里就会报错。在 ApiTokenCache 里根本就没有定要 account_id。如果不走缓存,之前在修改数据库表的时候,在模型中已经加入了 account_id 字段,自然不会报错。现在只需要在ApiTokenCache 中加入 account_id 即可。

# api\services\api_token_service.py

    id: str
    app_id: str | None
    tenant_id: str | None
    account_id: str | None
    type: str
    token: str
    last_used_at: datetime | None
    created_at: datetime | None

        cached = CachedApiToken(
            id=str(api_token.id),
            app_id=str(api_token.app_id) if api_token.app_id else None,
            tenant_id=str(api_token.tenant_id) if api_token.tenant_id else None,
            account_id=str(api_token.account_id) if api_token.account_id else None,
            type=api_token.type,
            token=api_token.token,
            last_used_at=api_token.last_used_at,
            created_at=api_token.created_at,
        )

阅读全文 >>

6月 15

dify 默认的数据库表,不一定是自己想要的。这个时候就需要取修改数据库表。修改数据库表最爽的方式就是用迁移文件。在 dify 原本的迁移目录下新增迁移文件会出现迁移版本错位的问题。比如你当前建立了一个迁移文件,过段时间拉 dify 官网代码合并后,迁移有改动,这个时候就会出现迁移版本对不上。为了解决这个问题,自定义个人迁移目录,新建迁移文件就很有必要了。

现在想给 api_tokens 表新增一个 account_id 字段。因为之前的 api_tokens 表只和 tenant 表进行关联,满足不了需求。

新建自定义迁移文件夹【该思路放弃】

修改 docker-compose.yaml 的 api 的配置。主要修改 volumes,因为默认是没有挂载目录,就会出现在容器中生成的文件在宿主机上没有。

api:
    volumes:
    - ../api:/app/api
    # 匿名卷:让 .venv 目录不被宿主机覆盖,保留镜像内的内容
    - /app/api/.venv

然后启动服务,开始干活。(从尝试到放弃)

# 启动 docker 服务
docker compose up -d

# 进入 api 容器
docker exec -it docker-api-1 bash

# 创建自定义迁移目录
flask db init --directory migrations_custom

# 查看官方迁移当前版本
flask db current

# 给自定义迁移库打基线(7bad07dc267d 是上一步查询出来的)
flask db stamp --directory migrations_custom 7bad07dc267d --purge

从尝试到放弃

想法是很美好,可是执行起来,各种问题就来了。所以,还是放弃自建迁移文件的想法了。既然迁移文件不好使,那就调用 sql 来达到实现修改字段的目的了。

在 api/extensions 目录下,新建 ext_database_custom.py 文件。代码入下。

# ext_database_custom.py
from sqlalchemy import text as sql_text
from extensions.ext_database import db
from dify_app import DifyApp

def init_custom_db_fields():
    """初始化自定义数据库字段/索引,启动自动执行"""
    try:
        with db.engine.connect() as conn:
            # 新增 account_id 字段
            conn.execute(
                sql_text(
                    "ALTER TABLE api_tokens ADD COLUMN IF NOT EXISTS account_id UUID;"
                )
            )
            # 新增联合索引
            conn.execute(
                sql_text(
                    "CREATE INDEX IF NOT EXISTS api_token_account_idx ON api_tokens(account_id, type);"
                )
            )
            conn.commit()
        print("[CustomDB] 自定义字段&索引初始化完成")
    except Exception as e:
        print(f"[CustomDB] 初始化异常: {str(e)}")

# 关键:实现 Dify 扩展标准接口 init_app
def init_app(app: DifyApp):
    # 这里要在 app 上下文里执行,否则 db 可能没绑定
    with app.app_context():
        init_custom_db_fields()

然后在 api/app_factory.py 的 initialize_extensions 函数中,引入注册插件。放在 ext_database 后边即可。

最后执行 docker compose restart 就可以了。

运行起来,查看数据库,会看到 在 api_tokens 表中已经有 account_id 字段了。

虽然数据库表里字段有了。但是模型这里也要加。

# api\models\model.py

class ApiToken(Base):  
    account_id = mapped_column(StringUUID, nullable=True)

阅读全文 >>

6月 15

拉取 dify 官方源代码构建服务。虽然能跑起来,正常运行。可是很多功能不是自己想要的,或者自己想要更多的功能。这个时候就需要修改源码。 dify 源码都提供了,修改后,重新构建镜像使用不就可以实现愿望了吗。

使用自定义镜像

修改 docker/docker-compose.yaml 文件,删掉官方的镜像,使用本地的镜像构建。前端 web 页面和 api 接口的配置修改。

  api:
    <<: *shared-api-worker-config
    image: user-dify-api:1.14.2
    user: root
    build:
      context: ..
      dockerfile: api/Dockerfile

  web:
    image: user-dify-web:1.14.2
    build:
      context: ..
      dockerfile: web/Dockerfile

这样就好了。如果需要增加环境变量,在 .env 和 .env.example 还有 docker-compose.yaml 中加入就好。 volumes 也看着办。

阅读全文 >>

5月 26

如果遇到安装插件失败。比如:

dfiy 安装插件失败,报错 failed to launch plugin: failed to install dependencies: failed to install dependencies: signal: killed, output: DEBUG Found workspace root: `/app/storage/cwd/langgenius/deepseek-0.0.15@725407927b04e236212083d20e92830d60fa944e42cd357ef6902c160414f6f1`
DEBUG Adding root workspace member: `/app/storage/cwd/langgenius/deepseek-0.0.15@725407927b04e236212083d20e92830d60fa944e42cd357ef6902c160414f6f1`
DEBUG Skipping `pyproject.toml` in `/app/storage/cwd/langgenius/deepseek-0.0.15@725407927b04e236212083d20e92830d6...multidict-6.7.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl
DEBUG Sending fresh GET request for: https://files.pythonhosted.org/packages/0d/e2/9baffdae21a76f77ef8447f1a05a96ec4bc0a24dae08767abc0a2fe680b8/multidict-6.7.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl
Downloading pydantic-core (2.0MiB)
Downloading gevent (2.0MiB)
Downloading tiktoken (1.1MiB)
init process exited due to no activity for 120 seconds
failed to init environment

可以将超时时间设置长一点,还有可以配置 python 镜像。

# .env 中,将默认的 120 秒改为 300秒
PLUGIN_PYTHON_ENV_INIT_TIMEOUT=600
PLUGIN_DAEMON_TIMEOUT=600.0

还可以设置镜像

# PIP_MIRROR_URL=https://mirrors.aliyun.com
PIP_MIRROR_URL=https://pypi.tuna.tsinghua.edu.cn/simple

阅读全文 >>

5月 09

MCP(模型上下文协议)是一个使大型语言模型(LLMs,如 Claude)能够与外部工具和数据源交互的协议。使用 MCP,您可以:

  • 构建为 LLMs 提供工具和数据的服务器
  • 将这些服务器连接到兼容 MCP 的客户端
  • 通过自定义功能扩展 LLM 的能力

中文文档

英文文档

阅读全文 >>

3月 26

运行 dify

下载好 dify 源码后,就可以使用 docker 开始运行 dify。

cd docker

cp .env.example .env

docker compose up -d

等待拉取镜像,构建服务,初始化。


如果宿主机的80,443 端口已经被使用了。请在 .env 中修改 dify 的本地映射端口为其他端口。比如:

EXPOSE_NGINX_PORT=8080
EXPOSE_NGINX_SSL_PORT=8443

这个时候,如果宿主机的 nginx 想访问 dify 的服务,可以用宿主机的 ip + dify 本地映射端口来访问。

通过 hostname -I 可以查看宿主机的 ip,第一个就是的。下边给一个 nginx 的示例。

# 映射 WebSocket 升级头
map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}

server {
    listen       80;
    server_name  xxx.yuepaibao.com;

    return 301 https://$host$request_uri;
}

server {
    listen       443 ssl;
    server_name  xxx.yuepaibao.com;

    charset utf-8;
    index  index.php index.html index.htm;

    ssl_certificate /etc/letsencrypt/live/xxx.yuepaibao.com/fullchain.pem; # managed by Certbot
    ssl_certificate_key /etc/letsencrypt/live/xxx.yuepaibao.com/privkey.pem; # managed by Certbot
    ssl_dhparam /etc/letsencrypt/ssl-dhparams.pem; # managed by Certbot
    ssl_session_timeout 5m;
    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;
    ssl_protocols TLSv1.1 TLSv1.2 TLSv1.3;
    ssl_prefer_server_ciphers on;
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
    add_header X-XSS-Protection "1; mode=block";
    add_header X-Frame-Options SAMEORIGIN always;
    add_header X-Content-Type-Options nosniff always;

    add_header X-Frame-Options "ALLOW-FROM https://www.yuepaibao.com";
    add_header Content-Security-Policy "frame-ancestors https://www.yuepaibao.com";

    location / {
    proxy_pass http://172.17.0.1:8080;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header X-Forwarded-Host $server_name;

        # WebSocket 支持(Dify 实时聊天必需)
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;

        # 超时配置:适配长连接
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
        proxy_connect_timeout 60s;
    }

    # API 路径:增大文件上传限制
    location /api {
        proxy_pass http://172.17.0.1:8080;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        client_max_body_size 100M;
    }

    location /xxabcxxx.txt {
        return 200 "33b454d18da17ebb845c9e046e2c4956";
    }

    error_page   500 502 503 504  /50x.html;
    location = /50x.html {
        root   /usr/share/nginx/html;
    }

    access_log  /var/log/nginx/xxx.yuepaibao.com.log  main;
    error_log  /var/log/nginx/xxx.yuepaibao.com.error.log  warn;
}

使用 docker compose ps -a 可以查看服务的情况。其中 docker-init_permissions-1 只是在启动的时候调用一下,然后就退出了。

配置管理员账号

运行起来后,输入 http://localhost/ 会进入安装配置管理员界面。设置好邮箱,账号和密码确定好就可以。

开始使用

创建知识库

然后开始配置知识库。

Dify 里的知识库检索必须用 Embedding 模型,作用是:把文字变成向量 → 让系统能计算相似度 → 实现文档检索、问答匹配。

知识库 embedding 模型选择,可以选择使用云端,也可以使用本地的。都是以插件的方式进行安装,配置和使用。

1 选择云端。

模型 优点 缺点
OpenAI text-embedding-3-small 效果顶级、稳定、Dify 原生支持 需要付费 API Key
阿里通义 Embedding 国内速度快、便宜 需要阿里云账号
百度千帆 Embedding 国内稳定、免费额度高 需要百度智能云
智谱 GLM Embedding 中文效果好 需要 API Key

先配置云端插件。

插件已经下载安装,还需要配置。去对应的官网获取 apikey 信息,配置进去。

然后添加知识库文件,配置 embedding 模型,继续下去。

参考和相关

https://blog.csdn.net/weixin_28931449/article/details/156266015

阅读全文 >>

3月 23

dify 官方仓库https://github.com/langgenius/dify/

下载 dify 和建立 git 仓库

# 先 clone 最新版本,体积小,不容易出错
git clone --depth 1 https://github.com/langgenius/dify.git

# 更新完整历史
git fetch --unshallow

# 添加官方仓库为 upstream(用于同步更新)
git remote add upstream https://github.com/langgenius/dify.git

# 移除默认的 origin,添加自己的 git 仓库
git remote remove origin
git remote add origin git@github.com:xxxx/dify.git

建立自己的项目分支

假如你的项目叫 mimo

# 创建 mimo 分支。后期修改都在这个分支做。
git checkout -b mimo

# 将代码推送到远程服务器上
git push -u origin mimo

开发日常

# 切换到 mimo 分支
git checkout mimo

# 修改代码后,提交代码
git add .
git commit -m '修改 ui界面,替换 logo 等'

# 推送到服务端
git push

同步更新

# 切换到 main 分支
git checkout main

# 拉取官方最新的代码
git pull upstream main

# 推送更新后的 main 分支
git push origin main

# 切换到项目分支
git checkout mimo

# 合并 main 分支
git merge main

阅读全文 >>