dashboard-nanobot/backend/providers/bot_workspace_provider.py

271 lines
11 KiB
Python
Raw Permalink Normal View History

2026-04-14 02:04:12 +00:00
from __future__ import annotations
import json
import os
from typing import Any, Dict, List
_PROVIDER_ALIAS_MAP = {
"aliyun": "dashscope",
"qwen": "dashscope",
"aliyun-qwen": "dashscope",
"moonshot": "kimi",
"xunfei": "openai",
"iflytek": "openai",
"xfyun": "openai",
"vllm": "openai",
}
_MANAGED_WORKSPACE_FILES = ("AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md")
def _require_text(raw: Any, *, field: str) -> str:
value = str(raw if raw is not None else "").strip()
if not value:
raise RuntimeError(f"Missing required bot runtime field: {field}")
return value
def _normalize_markdown_text(raw: Any, *, field: str) -> str:
if raw is None:
raise RuntimeError(f"Missing required workspace markdown field: {field}")
return str(raw).replace("\r\n", "\n").strip() + "\n"
def _normalize_provider_name(raw_provider_name: str) -> tuple[str, str]:
normalized = raw_provider_name.strip().lower()
if not normalized:
raise RuntimeError("Missing required bot runtime field: llm_provider")
canonical = _PROVIDER_ALIAS_MAP.get(normalized, normalized)
return normalized, canonical
def _normalize_allow_from(raw: Any) -> List[str]:
rows: List[str] = []
if isinstance(raw, list):
for item in raw:
text = str(item or "").strip()
if text and text not in rows:
rows.append(text)
return rows or ["*"]
def _normalize_extra_config(raw: Any) -> Dict[str, Any]:
if raw is None:
return {}
if not isinstance(raw, dict):
raise RuntimeError("Channel extra_config must be an object")
return dict(raw)
def _write_json_atomic(path: str, payload: Dict[str, Any]) -> None:
os.makedirs(os.path.dirname(path), exist_ok=True)
tmp_path = f"{path}.tmp"
with open(tmp_path, "w", encoding="utf-8") as file:
json.dump(payload, file, ensure_ascii=False, indent=2)
os.replace(tmp_path, path)
def _write_text_atomic(path: str, content: str) -> None:
os.makedirs(os.path.dirname(path), exist_ok=True)
tmp_path = f"{path}.tmp"
with open(tmp_path, "w", encoding="utf-8") as file:
file.write(content)
os.replace(tmp_path, path)
class BotWorkspaceProvider:
def __init__(self, host_data_root: str):
self.host_data_root = host_data_root
def write_workspace(self, bot_id: str, bot_data: Dict[str, Any], channels: List[Dict[str, Any]]) -> str:
raw_provider_name, provider_name = _normalize_provider_name(_require_text(bot_data.get("llm_provider"), field="llm_provider"))
model_name = _require_text(bot_data.get("llm_model"), field="llm_model")
api_key = _require_text(bot_data.get("api_key"), field="api_key")
api_base = _require_text(bot_data.get("api_base"), field="api_base")
temperature = float(bot_data.get("temperature"))
top_p = float(bot_data.get("top_p"))
max_tokens = int(bot_data.get("max_tokens"))
send_progress = bool(bot_data.get("send_progress"))
send_tool_hints = bool(bot_data.get("send_tool_hints"))
bot_root = os.path.join(self.host_data_root, bot_id)
dot_nanobot_dir = os.path.join(bot_root, ".nanobot")
workspace_dir = os.path.join(dot_nanobot_dir, "workspace")
memory_dir = os.path.join(workspace_dir, "memory")
skills_dir = os.path.join(workspace_dir, "skills")
for path in (dot_nanobot_dir, workspace_dir, memory_dir, skills_dir):
os.makedirs(path, exist_ok=True)
provider_cfg: Dict[str, Any] = {
"apiKey": api_key,
"apiBase": api_base,
}
if raw_provider_name in {"xunfei", "iflytek", "xfyun", "vllm"}:
provider_cfg["dashboardProviderAlias"] = raw_provider_name
effective_model_name = model_name
if provider_name == "openai" and raw_provider_name in {"xunfei", "iflytek", "xfyun"} and "/" not in model_name:
effective_model_name = f"openai/{model_name}"
config_data: Dict[str, Any] = {
"agents": {
"defaults": {
"model": effective_model_name,
"temperature": temperature,
"topP": top_p,
"maxTokens": max_tokens,
}
},
"providers": {
provider_name: provider_cfg,
},
"channels": {
"sendProgress": send_progress,
"sendToolHints": send_tool_hints,
"dashboard": {
"enabled": True,
"host": "0.0.0.0",
"port": 9000,
"allowFrom": ["*"],
},
},
}
mcp_servers = bot_data.get("mcp_servers")
if mcp_servers is not None:
if not isinstance(mcp_servers, dict):
raise RuntimeError("mcp_servers must be an object")
config_data["tools"] = {"mcpServers": mcp_servers}
channels_cfg = config_data["channels"]
for channel in channels:
channel_type = str(channel.get("channel_type") or "").strip().lower()
if not channel_type or channel_type == "dashboard":
continue
extra = _normalize_extra_config(channel.get("extra_config"))
enabled = bool(channel.get("is_active"))
external_app_id = str(channel.get("external_app_id") or "").strip()
app_secret = str(channel.get("app_secret") or "").strip()
if channel_type == "telegram":
channels_cfg["telegram"] = {
"enabled": enabled,
"token": app_secret,
"proxy": str(extra.get("proxy") or "").strip(),
"replyToMessage": bool(extra.get("replyToMessage")),
"allowFrom": _normalize_allow_from(extra.get("allowFrom")),
}
continue
if channel_type == "feishu":
channels_cfg["feishu"] = {
"enabled": enabled,
"appId": external_app_id,
"appSecret": app_secret,
"encryptKey": str(extra.get("encryptKey") or "").strip(),
"verificationToken": str(extra.get("verificationToken") or "").strip(),
"allowFrom": _normalize_allow_from(extra.get("allowFrom")),
}
continue
if channel_type == "dingtalk":
channels_cfg["dingtalk"] = {
"enabled": enabled,
"clientId": external_app_id,
"clientSecret": app_secret,
"allowFrom": _normalize_allow_from(extra.get("allowFrom")),
}
continue
if channel_type == "slack":
channels_cfg["slack"] = {
"enabled": enabled,
"mode": str(extra.get("mode") or "socket"),
"botToken": external_app_id,
"appToken": app_secret,
"replyInThread": bool(extra.get("replyInThread", True)),
"groupPolicy": str(extra.get("groupPolicy") or "mention"),
"groupAllowFrom": extra.get("groupAllowFrom") if isinstance(extra.get("groupAllowFrom"), list) else [],
"reactEmoji": str(extra.get("reactEmoji") or "eyes"),
}
continue
if channel_type == "qq":
channels_cfg["qq"] = {
"enabled": enabled,
"appId": external_app_id,
"secret": app_secret,
"allowFrom": _normalize_allow_from(extra.get("allowFrom")),
}
continue
if channel_type == "weixin":
weixin_cfg: Dict[str, Any] = {
"enabled": enabled,
"allowFrom": _normalize_allow_from(extra.get("allowFrom")),
}
route_tag = str(extra.get("routeTag") or "").strip()
if route_tag:
weixin_cfg["routeTag"] = route_tag
state_dir = str(extra.get("stateDir") or "").strip()
if state_dir:
weixin_cfg["stateDir"] = state_dir
base_url = str(extra.get("baseUrl") or "").strip()
if base_url:
weixin_cfg["baseUrl"] = base_url
cdn_base_url = str(extra.get("cdnBaseUrl") or "").strip()
if cdn_base_url:
weixin_cfg["cdnBaseUrl"] = cdn_base_url
poll_timeout = extra.get("pollTimeout", extra.get("poll_timeout"))
if poll_timeout not in {None, ""}:
weixin_cfg["pollTimeout"] = max(1, int(poll_timeout))
channels_cfg["weixin"] = weixin_cfg
continue
if channel_type == "email":
channels_cfg["email"] = {
"enabled": enabled,
"consentGranted": bool(extra.get("consentGranted")),
"imapHost": str(extra.get("imapHost") or "").strip(),
"imapPort": max(1, min(int(extra.get("imapPort") or 993), 65535)),
"imapUsername": str(extra.get("imapUsername") or "").strip(),
"imapPassword": str(extra.get("imapPassword") or "").strip(),
"imapMailbox": str(extra.get("imapMailbox") or "INBOX"),
"imapUseSsl": bool(extra.get("imapUseSsl", True)),
"smtpHost": str(extra.get("smtpHost") or "").strip(),
"smtpPort": max(1, min(int(extra.get("smtpPort") or 587), 65535)),
"smtpUsername": str(extra.get("smtpUsername") or "").strip(),
"smtpPassword": str(extra.get("smtpPassword") or "").strip(),
"smtpUseTls": bool(extra.get("smtpUseTls", True)),
"smtpUseSsl": bool(extra.get("smtpUseSsl")),
"fromAddress": str(extra.get("fromAddress") or "").strip(),
"autoReplyEnabled": bool(extra.get("autoReplyEnabled", True)),
"pollIntervalSeconds": max(5, int(extra.get("pollIntervalSeconds") or 30)),
"markSeen": bool(extra.get("markSeen", True)),
"maxBodyChars": max(1, int(extra.get("maxBodyChars") or 12000)),
"subjectPrefix": str(extra.get("subjectPrefix") or "Re: "),
"allowFrom": _normalize_allow_from(extra.get("allowFrom")),
}
continue
channels_cfg[channel_type] = {
"enabled": enabled,
"appId": external_app_id,
"appSecret": app_secret,
**extra,
}
_write_json_atomic(os.path.join(dot_nanobot_dir, "config.json"), config_data)
workspace_files = {
"AGENTS.md": _normalize_markdown_text(bot_data.get("agents_md"), field="agents_md"),
"SOUL.md": _normalize_markdown_text(bot_data.get("soul_md"), field="soul_md"),
"USER.md": _normalize_markdown_text(bot_data.get("user_md"), field="user_md"),
"TOOLS.md": _normalize_markdown_text(bot_data.get("tools_md"), field="tools_md"),
"IDENTITY.md": _normalize_markdown_text(bot_data.get("identity_md"), field="identity_md"),
}
for filename in _MANAGED_WORKSPACE_FILES:
_write_text_atomic(os.path.join(workspace_dir, filename), workspace_files[filename])
return dot_nanobot_dir