"""Render a bot's ``.nanobot`` runtime directory on the host filesystem.

The module turns a bot record plus its channel records into:

* ``<host_data_root>/<bot_id>/.nanobot/config.json`` -- model, provider and
  channel settings, and
* ``<host_data_root>/<bot_id>/.nanobot/workspace/*.md`` -- the managed
  markdown files listed in ``_MANAGED_WORKSPACE_FILES``.
"""

from __future__ import annotations

import json
import os
from typing import Any, Callable, Dict, List

# Caller-facing provider names mapped to the provider key written to config.json.
# Names that are not listed here are used as-is (after lower-casing).
_PROVIDER_ALIAS_MAP = {
    "aliyun": "dashscope",
    "qwen": "dashscope",
    "aliyun-qwen": "dashscope",
    "moonshot": "kimi",
    "xunfei": "openai",
    "iflytek": "openai",
    "xfyun": "openai",
    "vllm": "openai",
}

_MANAGED_WORKSPACE_FILES = ("AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md")

# (managed file name, bot_data key holding its content), in write order.
_WORKSPACE_FILE_FIELDS = (
    ("AGENTS.md", "agents_md"),
    ("SOUL.md", "soul_md"),
    ("USER.md", "user_md"),
    ("TOOLS.md", "tools_md"),
    ("IDENTITY.md", "identity_md"),
)

# Raw provider names whose original spelling is recorded as "dashboardProviderAlias".
_DASHBOARD_ALIAS_PROVIDERS = frozenset({"xunfei", "iflytek", "xfyun", "vllm"})

# Raw provider names whose bare model names get an "openai/" prefix.
_OPENAI_PREFIXED_PROVIDERS = frozenset({"xunfei", "iflytek", "xfyun"})


def _require_text(raw: Any, *, field: str) -> str:
    """Return ``raw`` as a stripped string, raising if it is ``None`` or blank.

    Raises:
        RuntimeError: when the stripped value is empty.
    """
    value = str(raw if raw is not None else "").strip()
    if not value:
        raise RuntimeError(f"Missing required bot runtime field: {field}")
    return value


def _normalize_markdown_text(raw: Any, *, field: str) -> str:
    """Return markdown with LF line endings and exactly one trailing newline.

    An empty string is accepted (it becomes ``"\\n"``); only ``None`` is rejected.

    Raises:
        RuntimeError: when ``raw`` is ``None``.
    """
    if raw is None:
        raise RuntimeError(f"Missing required workspace markdown field: {field}")
    return str(raw).replace("\r\n", "\n").strip() + "\n"


def _normalize_provider_name(raw_provider_name: str) -> tuple[str, str]:
    """Return ``(lower-cased name, canonical name)`` for a provider.

    The canonical name comes from ``_PROVIDER_ALIAS_MAP``; unknown names map
    to themselves.

    Raises:
        RuntimeError: when the name is blank.
    """
    normalized = raw_provider_name.strip().lower()
    if not normalized:
        raise RuntimeError("Missing required bot runtime field: llm_provider")
    return normalized, _PROVIDER_ALIAS_MAP.get(normalized, normalized)


def _normalize_allow_from(raw: Any) -> List[str]:
    """Return the unique, non-blank entries of ``raw`` in first-seen order.

    Anything that is not a list, or a list with no usable entries, yields
    ``["*"]``.
    """
    if not isinstance(raw, list):
        return ["*"]
    cleaned = (str(item or "").strip() for item in raw)
    # dict.fromkeys keeps insertion order, giving an order-preserving dedup.
    rows = list(dict.fromkeys(text for text in cleaned if text))
    return rows or ["*"]


def _normalize_extra_config(raw: Any) -> Dict[str, Any]:
    """Return a shallow copy of a channel's ``extra_config`` (``{}`` for ``None``).

    Raises:
        RuntimeError: when ``raw`` is neither ``None`` nor a dict.
    """
    if raw is None:
        return {}
    if not isinstance(raw, dict):
        raise RuntimeError("Channel extra_config must be an object")
    return dict(raw)


def _write_text_atomic(path: str, content: str) -> None:
    """Write ``content`` to ``path`` via a sibling ``.tmp`` file and ``os.replace``.

    The parent directory is created when ``path`` has one. If writing or
    replacing fails, the temporary file is removed before the error propagates.
    """
    directory = os.path.dirname(path)
    if directory:  # os.makedirs("") raises, so skip it for bare file names
        os.makedirs(directory, exist_ok=True)
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as file:
            file.write(content)
        os.replace(tmp_path, path)
    except BaseException:
        # Best-effort cleanup so a failed write does not leave a stale temp file.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise


def _write_json_atomic(path: str, payload: Dict[str, Any]) -> None:
    """Serialise ``payload`` as indented UTF-8 JSON and write it atomically.

    Serialisation happens before any file is opened, so a payload that cannot
    be encoded raises without touching the filesystem.
    """
    _write_text_atomic(path, json.dumps(payload, ensure_ascii=False, indent=2))


def _clean_text(raw: Any) -> str:
    """Return ``raw`` as a stripped string, treating falsy values as empty."""
    return str(raw or "").strip()


def _clamp_port(raw: Any, *, default: int) -> int:
    """Return ``raw`` (or ``default`` when falsy) as an int clamped to 1..65535."""
    return max(1, min(int(raw or default), 65535))


def _build_telegram_channel(
    enabled: bool, external_app_id: str, app_secret: str, extra: Dict[str, Any]
) -> Dict[str, Any]:
    """Build the ``telegram`` channel entry; the app secret is used as the token."""
    return {
        "enabled": enabled,
        "token": app_secret,
        "proxy": _clean_text(extra.get("proxy")),
        "replyToMessage": bool(extra.get("replyToMessage")),
        "allowFrom": _normalize_allow_from(extra.get("allowFrom")),
    }


def _build_feishu_channel(
    enabled: bool, external_app_id: str, app_secret: str, extra: Dict[str, Any]
) -> Dict[str, Any]:
    """Build the ``feishu`` channel entry."""
    return {
        "enabled": enabled,
        "appId": external_app_id,
        "appSecret": app_secret,
        "encryptKey": _clean_text(extra.get("encryptKey")),
        "verificationToken": _clean_text(extra.get("verificationToken")),
        "allowFrom": _normalize_allow_from(extra.get("allowFrom")),
    }


def _build_dingtalk_channel(
    enabled: bool, external_app_id: str, app_secret: str, extra: Dict[str, Any]
) -> Dict[str, Any]:
    """Build the ``dingtalk`` channel entry."""
    return {
        "enabled": enabled,
        "clientId": external_app_id,
        "clientSecret": app_secret,
        "allowFrom": _normalize_allow_from(extra.get("allowFrom")),
    }


def _build_slack_channel(
    enabled: bool, external_app_id: str, app_secret: str, extra: Dict[str, Any]
) -> Dict[str, Any]:
    """Build the ``slack`` channel entry.

    ``external_app_id`` is written as ``botToken`` and ``app_secret`` as
    ``appToken``.
    """
    group_allow_from = extra.get("groupAllowFrom")
    return {
        "enabled": enabled,
        "mode": str(extra.get("mode") or "socket"),
        "botToken": external_app_id,
        "appToken": app_secret,
        "replyInThread": bool(extra.get("replyInThread", True)),
        "groupPolicy": str(extra.get("groupPolicy") or "mention"),
        "groupAllowFrom": group_allow_from if isinstance(group_allow_from, list) else [],
        "reactEmoji": str(extra.get("reactEmoji") or "eyes"),
    }


def _build_qq_channel(
    enabled: bool, external_app_id: str, app_secret: str, extra: Dict[str, Any]
) -> Dict[str, Any]:
    """Build the ``qq`` channel entry."""
    return {
        "enabled": enabled,
        "appId": external_app_id,
        "secret": app_secret,
        "allowFrom": _normalize_allow_from(extra.get("allowFrom")),
    }


def _build_weixin_channel(
    enabled: bool, external_app_id: str, app_secret: str, extra: Dict[str, Any]
) -> Dict[str, Any]:
    """Build the ``weixin`` channel entry; optional keys appear only when set."""
    weixin_cfg: Dict[str, Any] = {
        "enabled": enabled,
        "allowFrom": _normalize_allow_from(extra.get("allowFrom")),
    }
    for key in ("routeTag", "stateDir", "baseUrl", "cdnBaseUrl"):
        text = _clean_text(extra.get(key))
        if text:
            weixin_cfg[key] = text
    # Both the camelCase and snake_case spellings are accepted; camelCase wins.
    poll_timeout = extra.get("pollTimeout", extra.get("poll_timeout"))
    if poll_timeout is not None and poll_timeout != "":
        weixin_cfg["pollTimeout"] = max(1, int(poll_timeout))
    return weixin_cfg


def _build_email_channel(
    enabled: bool, external_app_id: str, app_secret: str, extra: Dict[str, Any]
) -> Dict[str, Any]:
    """Build the ``email`` channel entry; all settings come from ``extra``."""
    return {
        "enabled": enabled,
        "consentGranted": bool(extra.get("consentGranted")),
        "imapHost": _clean_text(extra.get("imapHost")),
        "imapPort": _clamp_port(extra.get("imapPort"), default=993),
        "imapUsername": _clean_text(extra.get("imapUsername")),
        "imapPassword": _clean_text(extra.get("imapPassword")),
        "imapMailbox": str(extra.get("imapMailbox") or "INBOX"),
        "imapUseSsl": bool(extra.get("imapUseSsl", True)),
        "smtpHost": _clean_text(extra.get("smtpHost")),
        "smtpPort": _clamp_port(extra.get("smtpPort"), default=587),
        "smtpUsername": _clean_text(extra.get("smtpUsername")),
        "smtpPassword": _clean_text(extra.get("smtpPassword")),
        "smtpUseTls": bool(extra.get("smtpUseTls", True)),
        "smtpUseSsl": bool(extra.get("smtpUseSsl")),
        "fromAddress": _clean_text(extra.get("fromAddress")),
        "autoReplyEnabled": bool(extra.get("autoReplyEnabled", True)),
        "pollIntervalSeconds": max(5, int(extra.get("pollIntervalSeconds") or 30)),
        "markSeen": bool(extra.get("markSeen", True)),
        "maxBodyChars": max(1, int(extra.get("maxBodyChars") or 12000)),
        "subjectPrefix": str(extra.get("subjectPrefix") or "Re: "),
        "allowFrom": _normalize_allow_from(extra.get("allowFrom")),
    }


def _build_generic_channel(
    enabled: bool, external_app_id: str, app_secret: str, extra: Dict[str, Any]
) -> Dict[str, Any]:
    """Build the entry for an unrecognised channel type.

    ``extra`` is merged last, so its keys override the three base keys.
    """
    return {
        "enabled": enabled,
        "appId": external_app_id,
        "appSecret": app_secret,
        **extra,
    }


# Channel type -> builder; types not listed here use _build_generic_channel.
_CHANNEL_BUILDERS: Dict[str, Callable[[bool, str, str, Dict[str, Any]], Dict[str, Any]]] = {
    "telegram": _build_telegram_channel,
    "feishu": _build_feishu_channel,
    "dingtalk": _build_dingtalk_channel,
    "slack": _build_slack_channel,
    "qq": _build_qq_channel,
    "weixin": _build_weixin_channel,
    "email": _build_email_channel,
}


def _apply_channels(channels_cfg: Dict[str, Any], channels: List[Dict[str, Any]]) -> None:
    """Add one entry per channel record to ``channels_cfg`` (mutated in place).

    Records with a blank type, or of type ``dashboard``, are skipped. A later
    record of the same type replaces an earlier one.
    """
    for channel in channels:
        channel_type = _clean_text(channel.get("channel_type")).lower()
        if not channel_type or channel_type == "dashboard":
            continue
        extra = _normalize_extra_config(channel.get("extra_config"))
        enabled = bool(channel.get("is_active"))
        external_app_id = _clean_text(channel.get("external_app_id"))
        app_secret = _clean_text(channel.get("app_secret"))
        builder = _CHANNEL_BUILDERS.get(channel_type, _build_generic_channel)
        channels_cfg[channel_type] = builder(enabled, external_app_id, app_secret, extra)


class BotWorkspaceProvider:
    """Writes per-bot runtime configuration under a host data root directory."""

    def __init__(self, host_data_root: str):
        # Directory under which each bot gets a "<bot_id>/.nanobot" subtree.
        self.host_data_root = host_data_root

    def write_workspace(self, bot_id: str, bot_data: Dict[str, Any], channels: List[Dict[str, Any]]) -> str:
        """Write ``config.json`` and the managed markdown files for one bot.

        All inputs are validated and normalised before anything is written, so
        a rejected request leaves the bot directory untouched.

        Args:
            bot_id: Directory name of the bot under ``host_data_root``.
            bot_data: Bot settings. Reads ``llm_provider``, ``llm_model``,
                ``api_key``, ``api_base``, ``temperature``, ``top_p``,
                ``max_tokens``, ``send_progress``, ``send_tool_hints``, the
                optional ``mcp_servers``, and the ``*_md`` markdown fields.
            channels: Channel records. Each reads ``channel_type``,
                ``is_active``, ``external_app_id``, ``app_secret`` and
                ``extra_config``.

        Returns:
            Path of the bot's ``.nanobot`` directory.

        Raises:
            RuntimeError: a required text or markdown field is missing,
                ``mcp_servers`` is not a dict, or a channel's ``extra_config``
                is not a dict.
            TypeError, ValueError: a numeric field cannot be converted, or the
                assembled config is not JSON-serialisable.
        """
        raw_provider_name, provider_name = _normalize_provider_name(
            _require_text(bot_data.get("llm_provider"), field="llm_provider")
        )
        model_name = _require_text(bot_data.get("llm_model"), field="llm_model")
        api_key = _require_text(bot_data.get("api_key"), field="api_key")
        api_base = _require_text(bot_data.get("api_base"), field="api_base")
        temperature = float(bot_data.get("temperature"))
        top_p = float(bot_data.get("top_p"))
        max_tokens = int(bot_data.get("max_tokens"))
        send_progress = bool(bot_data.get("send_progress"))
        send_tool_hints = bool(bot_data.get("send_tool_hints"))

        provider_cfg: Dict[str, Any] = {
            "apiKey": api_key,
            "apiBase": api_base,
        }
        if raw_provider_name in _DASHBOARD_ALIAS_PROVIDERS:
            provider_cfg["dashboardProviderAlias"] = raw_provider_name

        effective_model_name = model_name
        if (
            provider_name == "openai"
            and raw_provider_name in _OPENAI_PREFIXED_PROVIDERS
            and "/" not in model_name
        ):
            effective_model_name = f"openai/{model_name}"

        config_data: Dict[str, Any] = {
            "agents": {
                "defaults": {
                    "model": effective_model_name,
                    "temperature": temperature,
                    "topP": top_p,
                    "maxTokens": max_tokens,
                }
            },
            "providers": {
                provider_name: provider_cfg,
            },
            "channels": {
                "sendProgress": send_progress,
                "sendToolHints": send_tool_hints,
                "dashboard": {
                    "enabled": True,
                    "host": "0.0.0.0",
                    "port": 9000,
                    "allowFrom": ["*"],
                },
            },
        }

        mcp_servers = bot_data.get("mcp_servers")
        if mcp_servers is not None:
            if not isinstance(mcp_servers, dict):
                raise RuntimeError("mcp_servers must be an object")
            config_data["tools"] = {"mcpServers": mcp_servers}

        _apply_channels(config_data["channels"], channels)

        # Validate the markdown before touching the filesystem so a missing
        # field cannot leave a config.json without its workspace files.
        workspace_files = {
            filename: _normalize_markdown_text(bot_data.get(field), field=field)
            for filename, field in _WORKSPACE_FILE_FIELDS
        }

        bot_root = os.path.join(self.host_data_root, bot_id)
        dot_nanobot_dir = os.path.join(bot_root, ".nanobot")
        workspace_dir = os.path.join(dot_nanobot_dir, "workspace")
        memory_dir = os.path.join(workspace_dir, "memory")
        skills_dir = os.path.join(workspace_dir, "skills")
        for path in (dot_nanobot_dir, workspace_dir, memory_dir, skills_dir):
            os.makedirs(path, exist_ok=True)

        _write_json_atomic(os.path.join(dot_nanobot_dir, "config.json"), config_data)
        for filename in _MANAGED_WORKSPACE_FILES:
            _write_text_atomic(os.path.join(workspace_dir, filename), workspace_files[filename])

        return dot_nanobot_dir