"""Infrastructure helpers for bot config, provider targets, edge state and workspace files."""

import json
import os
import re
from typing import Any, Callable, Dict, List, Optional
from zoneinfo import ZoneInfo

import httpx
from fastapi import HTTPException
from sqlmodel import Session

from clients.edge.errors import log_edge_failure
from clients.edge.http import HttpEdgeClient
from core.config_manager import BotConfigManager
from models.bot import BotInstance
from providers.target import ProviderTarget
from services.node_registry_service import ManagedNode, NodeRegistryService

# Signatures of the collaborators injected into BotInfraService.
ReadEnvStore = Callable[[str], Dict[str, str]]
ReadBotRuntimeSnapshot = Callable[[BotInstance], Dict[str, Any]]
NormalizeMediaList = Callable[[Any, str], List[str]]
NormalizeProviderTarget = Callable[..., ProviderTarget]
ProviderTargetFromConfig = Callable[..., ProviderTarget]
ProviderTargetToDict = Callable[[ProviderTarget], Dict[str, Any]]
ResolveProviderBundleKey = Callable[[ProviderTarget], Optional[str]]
GetProvisionProvider = Callable[[Any, BotInstance], Any]


class BotInfraService:
    """Per-bot infrastructure operations.

    Covers reading/writing bot config and resource limits (either on the local
    filesystem or through an edge node's state API), resolving provider
    targets, converting channel settings between config and API shapes,
    normalizing environment parameters, and managing workspace/session files.
    """

    # Upper-case env variable names: letter/underscore first, at most 128 chars.
    _ENV_KEY_RE = re.compile(r"^[A-Z_][A-Z0-9_]{0,127}$")
    _IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg", ".webp")
    _VIDEO_SUFFIXES = (".mp4", ".mov", ".m4v", ".webm", ".mkv", ".avi")

    def __init__(
        self,
        *,
        app: Any,
        engine: Any,
        config_manager: BotConfigManager,
        node_registry_service: NodeRegistryService,
        logger: Any,
        bots_workspace_root: str,
        default_soul_md: str,
        default_agents_md: str,
        default_user_md: str,
        default_tools_md: str,
        default_identity_md: str,
        default_bot_system_timezone: str,
        normalize_provider_target: NormalizeProviderTarget,
        provider_target_from_config: ProviderTargetFromConfig,
        provider_target_to_dict: ProviderTargetToDict,
        resolve_provider_bundle_key: ResolveProviderBundleKey,
        get_provision_provider: GetProvisionProvider,
        read_env_store: ReadEnvStore,
        read_bot_runtime_snapshot: ReadBotRuntimeSnapshot,
        normalize_media_list: NormalizeMediaList,
    ) -> None:
        """Store the injected collaborators and start with no target overrides."""
        self._app = app
        self._engine = engine
        self._config_manager = config_manager
        self._node_registry_service = node_registry_service
        self._logger = logger
        self._bots_workspace_root = bots_workspace_root
        self._default_soul_md = default_soul_md
        self._default_agents_md = default_agents_md
        self._default_user_md = default_user_md
        self._default_tools_md = default_tools_md
        self._default_identity_md = default_identity_md
        self._default_bot_system_timezone = default_bot_system_timezone
        self._normalize_provider_target = normalize_provider_target
        self._provider_target_from_config = provider_target_from_config
        self._provider_target_to_dict = provider_target_to_dict
        self._resolve_provider_bundle_key = resolve_provider_bundle_key
        self._get_provision_provider = get_provision_provider
        self._read_env_store = read_env_store
        self._read_bot_runtime_snapshot = read_bot_runtime_snapshot
        self._normalize_media_list = normalize_media_list
        # In-memory overrides keyed by stripped bot id; they win over DB/config.
        self._provider_target_overrides: Dict[str, ProviderTarget] = {}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _write_json_atomic(path: str, payload: Dict[str, Any]) -> None:
        """Write ``payload`` as JSON to ``path`` via a temp file and ``os.replace``.

        The parent directory is created if needed. If writing or replacing
        fails, the temp file is removed (best effort) and the error re-raised.
        """
        os.makedirs(os.path.dirname(path), exist_ok=True)
        tmp_path = f"{path}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as file:
                json.dump(payload, file, ensure_ascii=False, indent=2)
            os.replace(tmp_path, path)
        except BaseException:
            # Do not leave a half-written temp file behind.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    @staticmethod
    def _resource_values_from(data: Dict[str, Any]) -> tuple[Any, Any, Any]:
        """Return raw (cpu, memory, storage) values, accepting camelCase or snake_case keys."""
        return (
            data.get("cpuCores", data.get("cpu_cores")),
            data.get("memoryMB", data.get("memory_mb")),
            data.get("storageGB", data.get("storage_gb")),
        )

    # ------------------------------------------------------------------
    # Bot config store
    # ------------------------------------------------------------------

    def config_json_path(self, bot_id: str) -> str:
        """Return the local path of the bot's ``config.json``."""
        return os.path.join(self.bot_data_root(bot_id), "config.json")

    def read_bot_config(self, bot_id: str) -> Dict[str, Any]:
        """Read the bot config from edge state or the local ``config.json``.

        Returns an empty dict when the file is missing, unreadable, or does
        not contain a JSON object.
        """
        if self.resolve_edge_state_context(bot_id) is not None:
            data = self.read_edge_state_data(bot_id=bot_id, state_key="config", default_payload={})
            return data if isinstance(data, dict) else {}
        path = self.config_json_path(bot_id)
        if not os.path.isfile(path):
            return {}
        try:
            with open(path, "r", encoding="utf-8") as file:
                data = json.load(file)
            return data if isinstance(data, dict) else {}
        except Exception:
            # Deliberate best effort: a broken config file reads as empty.
            return {}

    def write_bot_config(self, bot_id: str, config_data: Dict[str, Any]) -> None:
        """Persist the bot config to edge state, or to the local ``config.json``.

        Non-dict input is stored as an empty object. Errors from the edge
        write propagate (see ``write_edge_state_data``).
        """
        normalized = dict(config_data if isinstance(config_data, dict) else {})
        if self.write_edge_state_data(bot_id=bot_id, state_key="config", data=normalized):
            return
        self._write_json_atomic(self.config_json_path(bot_id), normalized)

    # ------------------------------------------------------------------
    # Provider targets
    # ------------------------------------------------------------------

    def default_provider_target(self) -> ProviderTarget:
        """Build the default target from ``provider_default_*`` attributes on ``app.state``."""
        return self._normalize_provider_target(
            {
                "node_id": getattr(self._app.state, "provider_default_node_id", None),
                "transport_kind": getattr(self._app.state, "provider_default_transport_kind", None),
                "runtime_kind": getattr(self._app.state, "provider_default_runtime_kind", None),
                "core_adapter": getattr(self._app.state, "provider_default_core_adapter", None),
            },
            fallback=ProviderTarget(),
        )

    def read_bot_provider_target(
        self,
        bot_id: str,
        config_data: Optional[Dict[str, Any]] = None,
    ) -> ProviderTarget:
        """Resolve a bot's provider target.

        Lookup order: in-memory override, then the ``BotInstance`` row, then
        ``config_data`` (or the stored bot config when it is not a dict).
        """
        normalized_bot_id = str(bot_id or "").strip()
        if normalized_bot_id and normalized_bot_id in self._provider_target_overrides:
            return self._provider_target_overrides[normalized_bot_id]
        if normalized_bot_id:
            with Session(self._engine) as session:
                bot = session.get(BotInstance, normalized_bot_id)
                if bot is not None:
                    return self._normalize_provider_target(
                        {
                            "node_id": getattr(bot, "node_id", None),
                            "transport_kind": getattr(bot, "transport_kind", None),
                            "runtime_kind": getattr(bot, "runtime_kind", None),
                            "core_adapter": getattr(bot, "core_adapter", None),
                        },
                        fallback=self.default_provider_target(),
                    )
        raw_config = config_data if isinstance(config_data, dict) else self.read_bot_config(bot_id)
        return self._provider_target_from_config(raw_config, fallback=self.default_provider_target())

    def resolve_bot_provider_target_for_instance(self, bot: BotInstance) -> ProviderTarget:
        """Resolve the provider target for an already-loaded ``BotInstance``.

        An in-memory override wins; otherwise any non-blank inline target
        field on the instance is normalized; otherwise the lookup falls back
        to ``read_bot_provider_target``.
        """
        normalized_bot_id = str(getattr(bot, "id", "") or "").strip()
        if normalized_bot_id and normalized_bot_id in self._provider_target_overrides:
            return self._provider_target_overrides[normalized_bot_id]
        inline_values = {
            "node_id": getattr(bot, "node_id", None),
            "transport_kind": getattr(bot, "transport_kind", None),
            "runtime_kind": getattr(bot, "runtime_kind", None),
            "core_adapter": getattr(bot, "core_adapter", None),
        }
        if any(str(value or "").strip() for value in inline_values.values()):
            return self._normalize_provider_target(inline_values, fallback=self.default_provider_target())
        return self.read_bot_provider_target(str(bot.id or ""))

    def set_provider_target_override(self, bot_id: str, target: ProviderTarget) -> None:
        """Register an in-memory target override; blank bot ids are ignored."""
        normalized_bot_id = str(bot_id or "").strip()
        if not normalized_bot_id:
            return
        self._provider_target_overrides[normalized_bot_id] = target

    def clear_provider_target_override(self, bot_id: str) -> None:
        """Remove the override for one bot, if any; blank bot ids are ignored."""
        normalized_bot_id = str(bot_id or "").strip()
        if not normalized_bot_id:
            return
        self._provider_target_overrides.pop(normalized_bot_id, None)

    def clear_provider_target_overrides(self) -> None:
        """Remove every in-memory target override."""
        self._provider_target_overrides.clear()

    def apply_provider_target_to_bot(self, bot: BotInstance, target: ProviderTarget) -> None:
        """Copy the four target fields onto the bot instance (no commit is done here)."""
        bot.node_id = target.node_id
        bot.transport_kind = target.transport_kind
        bot.runtime_kind = target.runtime_kind
        bot.core_adapter = target.core_adapter

    # ------------------------------------------------------------------
    # Nodes and edge clients
    # ------------------------------------------------------------------

    def local_managed_node(self) -> ManagedNode:
        """Build the ``ManagedNode`` for the local edge node from environment variables."""
        return ManagedNode(
            node_id="local",
            display_name="Local Node",
            base_url=str(
                os.getenv("LOCAL_EDGE_BASE_URL", "http://127.0.0.1:8010") or "http://127.0.0.1:8010"
            ).strip(),
            enabled=True,
            auth_token=str(os.getenv("EDGE_AUTH_TOKEN", "") or "").strip(),
            metadata={
                "transport_kind": "edge",
                "runtime_kind": "docker",
                "core_adapter": "nanobot",
                "workspace_root": str(
                    os.getenv("EDGE_WORKSPACE_ROOT", os.getenv("EDGE_BOTS_WORKSPACE_ROOT", "")) or ""
                ).strip(),
                "native_command": str(os.getenv("EDGE_NATIVE_COMMAND", "") or "").strip(),
                "native_workdir": str(os.getenv("EDGE_NATIVE_WORKDIR", "") or "").strip(),
                "native_sandbox_mode": str(
                    os.getenv("EDGE_NATIVE_SANDBOX_MODE", "inherit") or "inherit"
                ).strip().lower(),
            },
        )

    def provider_target_from_node(self, node_id: Optional[str]) -> Optional[ProviderTarget]:
        """Build a target from a registered node's metadata.

        Returns ``None`` for a blank id or an unknown node. Missing metadata
        fields default to ``edge`` / ``docker`` / ``nanobot``.
        """
        normalized = str(node_id or "").strip().lower()
        if not normalized:
            return None
        node = self._node_registry_service.get_node(normalized)
        if node is None:
            return None
        metadata = dict(node.metadata or {})
        return ProviderTarget(
            node_id=node.node_id,
            transport_kind=str(metadata.get("transport_kind") or "edge").strip().lower() or "edge",
            runtime_kind=str(metadata.get("runtime_kind") or "docker").strip().lower() or "docker",
            core_adapter=str(metadata.get("core_adapter") or "nanobot").strip().lower() or "nanobot",
        )

    def node_display_name(self, node_id: str) -> str:
        """Return the node's display name, falling back to its id or the given id."""
        node = self._node_registry_service.get_node(node_id)
        if node is not None:
            return str(node.display_name or node.node_id or node_id).strip() or str(node_id or "").strip()
        return str(node_id or "").strip()

    def node_metadata(self, node_id: str) -> Dict[str, Any]:
        """Return a copy of the node's metadata, or ``{}`` for an unknown node."""
        node = self._node_registry_service.get_node(node_id)
        if node is None:
            return {}
        return dict(node.metadata or {})

    def serialize_provider_target_summary(self, target: ProviderTarget) -> Dict[str, Any]:
        """Serialize the target and add the ``node_display_name`` of its node."""
        return {
            **self._provider_target_to_dict(target),
            "node_display_name": self.node_display_name(target.node_id),
        }

    def resolve_edge_client(self, target: ProviderTarget) -> HttpEdgeClient:
        """Create an ``HttpEdgeClient`` for the target's node.

        Raises:
            HTTPException: 400 when the registry rejects the node id with ``ValueError``.
        """
        try:
            node = self._node_registry_service.require_node(target.node_id)
        except ValueError as exc:
            raise HTTPException(status_code=400, detail=str(exc)) from exc
        return HttpEdgeClient(
            node=node,
            http_client_factory=lambda: httpx.Client(timeout=15.0, trust_env=False),
            async_http_client_factory=lambda: httpx.AsyncClient(timeout=15.0, trust_env=False),
        )

    def resolve_edge_state_context(self, bot_id: str) -> Optional[tuple[HttpEdgeClient, Optional[str], str]]:
        """Return ``(client, workspace_root, node_id)`` when the bot uses the edge transport.

        Returns ``None`` for a blank id, an unknown bot, or a target whose
        transport kind is not ``edge``.
        """
        normalized_bot_id = str(bot_id or "").strip()
        if not normalized_bot_id:
            return None
        with Session(self._engine) as session:
            bot = session.get(BotInstance, normalized_bot_id)
            if bot is None:
                return None
            # Read the instance's fields while its session is still open.
            target = self.resolve_bot_provider_target_for_instance(bot)
        if str(target.transport_kind or "").strip().lower() != "edge":
            return None
        client = self.resolve_edge_client(target)
        metadata = self.node_metadata(target.node_id)
        workspace_root = str(metadata.get("workspace_root") or "").strip() or None
        return client, workspace_root, target.node_id

    def read_edge_state_data(
        self,
        *,
        bot_id: str,
        state_key: str,
        default_payload: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Read a state document from the bot's edge node.

        Returns a copy of ``default_payload`` when the bot has no edge
        context, when the read fails (the failure is logged), or when the
        response carries no dict under ``"data"``.
        """
        context = self.resolve_edge_state_context(bot_id)
        if context is None:
            return dict(default_payload)
        client, workspace_root, node_id = context
        try:
            payload = client.read_state(
                bot_id=bot_id,
                state_key=state_key,
                workspace_root=workspace_root,
            )
        except Exception as exc:
            log_edge_failure(
                self._logger,
                key=f"edge-state-read:{node_id}:{bot_id}:{state_key}",
                exc=exc,
                message=f"Failed to read edge state for bot_id={bot_id}, state_key={state_key}",
            )
            return dict(default_payload)
        data = payload.get("data")
        if isinstance(data, dict):
            return dict(data)
        return dict(default_payload)

    def write_edge_state_data(
        self,
        *,
        bot_id: str,
        state_key: str,
        data: Dict[str, Any],
    ) -> bool:
        """Write a state document to the bot's edge node.

        Returns ``False`` when the bot has no edge context (the caller should
        then write locally) and ``True`` after a successful edge write. A
        failed edge write is logged and re-raised.
        """
        context = self.resolve_edge_state_context(bot_id)
        if context is None:
            return False
        client, workspace_root, node_id = context
        try:
            client.write_state(
                bot_id=bot_id,
                state_key=state_key,
                data=dict(data or {}),
                workspace_root=workspace_root,
            )
        except Exception as exc:
            log_edge_failure(
                self._logger,
                key=f"edge-state-write:{node_id}:{bot_id}:{state_key}",
                exc=exc,
                message=f"Failed to write edge state for bot_id={bot_id}, state_key={state_key}",
            )
            raise
        return True

    # ------------------------------------------------------------------
    # Resource limits store
    # ------------------------------------------------------------------

    def resources_json_path(self, bot_id: str) -> str:
        """Return the local path of the bot's ``resources.json``."""
        return os.path.join(self.bot_data_root(bot_id), "resources.json")

    def write_bot_resources(self, bot_id: str, cpu_cores: Any, memory_mb: Any, storage_gb: Any) -> None:
        """Normalize the limits and persist them to edge state or ``resources.json``."""
        normalized = self.normalize_resource_limits(cpu_cores, memory_mb, storage_gb)
        payload = {
            "cpuCores": normalized["cpu_cores"],
            "memoryMB": normalized["memory_mb"],
            "storageGB": normalized["storage_gb"],
        }
        if self.write_edge_state_data(bot_id=bot_id, state_key="resources", data=payload):
            return
        self._write_json_atomic(self.resources_json_path(bot_id), payload)

    def read_legacy_resource_values(
        self,
        bot_id: str,
        config_data: Optional[Dict[str, Any]] = None,
    ) -> tuple[Any, Any, Any]:
        """Read raw (cpu, memory, storage) values from local stores.

        ``resources.json`` is consulted first; any value still missing is
        taken from ``runtime.resources`` in ``config_data`` (or the stored
        bot config). Values are returned un-normalized and may be ``None``.
        """
        cpu_raw: Any = None
        memory_raw: Any = None
        storage_raw: Any = None
        path = self.resources_json_path(bot_id)
        if os.path.isfile(path):
            try:
                with open(path, "r", encoding="utf-8") as file:
                    data = json.load(file)
                if isinstance(data, dict):
                    cpu_raw, memory_raw, storage_raw = self._resource_values_from(data)
            except Exception:
                # Deliberate best effort: fall through to the config fallback.
                pass
        if cpu_raw is None or memory_raw is None or storage_raw is None:
            cfg = config_data if isinstance(config_data, dict) else self.read_bot_config(bot_id)
            runtime_cfg = cfg.get("runtime")
            if isinstance(runtime_cfg, dict):
                resources_raw = runtime_cfg.get("resources")
                if isinstance(resources_raw, dict):
                    cfg_cpu, cfg_memory, cfg_storage = self._resource_values_from(resources_raw)
                    if cpu_raw is None:
                        cpu_raw = cfg_cpu
                    if memory_raw is None:
                        memory_raw = cfg_memory
                    if storage_raw is None:
                        storage_raw = cfg_storage
        return cpu_raw, memory_raw, storage_raw

    def read_bot_resources(self, bot_id: str, config_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Return the bot's normalized resource limits.

        For edge bots the edge ``resources`` state is read first and missing
        values are filled from the local legacy stores; other bots use the
        legacy stores only.
        """
        edge_context = self.resolve_edge_state_context(bot_id)
        if edge_context is not None:
            data = self.read_edge_state_data(
                bot_id=bot_id,
                state_key="resources",
                default_payload={},
            )
            cpu_raw, memory_raw, storage_raw = self._resource_values_from(data)
            if cpu_raw is None or memory_raw is None or storage_raw is None:
                legacy_cpu, legacy_memory, legacy_storage = self.read_legacy_resource_values(
                    bot_id,
                    config_data=config_data,
                )
                if cpu_raw is None:
                    cpu_raw = legacy_cpu
                if memory_raw is None:
                    memory_raw = legacy_memory
                if storage_raw is None:
                    storage_raw = legacy_storage
            return self.normalize_resource_limits(cpu_raw, memory_raw, storage_raw)
        cpu_raw, memory_raw, storage_raw = self.read_legacy_resource_values(bot_id, config_data=config_data)
        return self.normalize_resource_limits(cpu_raw, memory_raw, storage_raw)

    def migrate_bot_resources_store(self, bot_id: str) -> None:
        """Move ``runtime.resources`` out of the local config into ``resources.json``.

        Does nothing for edge bots. ``resources.json`` is only created when
        it does not exist yet; the ``resources`` key (and an emptied
        ``runtime`` section) is then removed from the config.
        """
        if self.resolve_edge_state_context(bot_id) is not None:
            return
        config_data = self.read_bot_config(bot_id)
        runtime_cfg = config_data.get("runtime")
        resources_raw: Dict[str, Any] = {}
        if isinstance(runtime_cfg, dict):
            legacy_raw = runtime_cfg.get("resources")
            if isinstance(legacy_raw, dict):
                resources_raw = legacy_raw
        path = self.resources_json_path(bot_id)
        if not os.path.isfile(path):
            cpu_raw, memory_raw, storage_raw = self._resource_values_from(resources_raw)
            self.write_bot_resources(bot_id, cpu_raw, memory_raw, storage_raw)
        if isinstance(runtime_cfg, dict) and "resources" in runtime_cfg:
            runtime_cfg.pop("resources", None)
            if not runtime_cfg:
                config_data.pop("runtime", None)
            self.write_bot_config(bot_id, config_data)

    # ------------------------------------------------------------------
    # Channel config <-> API conversions
    # ------------------------------------------------------------------

    @staticmethod
    def normalize_channel_extra(raw: Any) -> Dict[str, Any]:
        """Return ``raw`` itself when it is a dict (not a copy), else ``{}``."""
        if not isinstance(raw, dict):
            return {}
        return raw

    @staticmethod
    def normalize_allow_from(raw: Any) -> List[str]:
        """Return the stripped, de-duplicated, order-preserving allow list.

        Non-list input or a list without usable entries yields ``["*"]``.
        """
        rows: List[str] = []
        seen: set[str] = set()
        if isinstance(raw, list):
            for item in raw:
                text = str(item or "").strip()
                if text and text not in seen:
                    seen.add(text)
                    rows.append(text)
        if not rows:
            return ["*"]
        return rows

    def read_global_delivery_flags(self, channels_cfg: Any) -> tuple[bool, bool]:
        """Return ``(sendProgress, sendToolHints)`` from a channels config.

        Top-level values win; a value that is ``None`` at the top level is
        taken from the ``dashboard`` sub-section when present there.
        """
        if not isinstance(channels_cfg, dict):
            return False, False
        send_progress = channels_cfg.get("sendProgress")
        send_tool_hints = channels_cfg.get("sendToolHints")
        dashboard_cfg = channels_cfg.get("dashboard")
        if isinstance(dashboard_cfg, dict):
            if send_progress is None and "sendProgress" in dashboard_cfg:
                send_progress = dashboard_cfg.get("sendProgress")
            if send_tool_hints is None and "sendToolHints" in dashboard_cfg:
                send_tool_hints = dashboard_cfg.get("sendToolHints")
        return bool(send_progress), bool(send_tool_hints)

    def channel_cfg_to_api_dict(self, bot_id: str, ctype: str, cfg: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one stored channel config section into the API row shape.

        Numeric settings that cannot be parsed fall back to their defaults
        instead of raising, so a single malformed stored value does not break
        the whole channel listing.
        """
        normalized_type = str(ctype or "").strip().lower()
        enabled = bool(cfg.get("enabled", True))
        port = max(1, min(self.safe_int(cfg.get("port", 8080) or 8080, 8080), 65535))
        extra: Dict[str, Any] = {}
        external_app_id = ""
        app_secret = ""
        if normalized_type == "feishu":
            external_app_id = str(cfg.get("appId") or "")
            app_secret = str(cfg.get("appSecret") or "")
            extra = {
                "encryptKey": cfg.get("encryptKey", ""),
                "verificationToken": cfg.get("verificationToken", ""),
                "allowFrom": self.normalize_allow_from(cfg.get("allowFrom", [])),
            }
        elif normalized_type == "dingtalk":
            external_app_id = str(cfg.get("clientId") or "")
            app_secret = str(cfg.get("clientSecret") or "")
            extra = {"allowFrom": self.normalize_allow_from(cfg.get("allowFrom", []))}
        elif normalized_type == "telegram":
            app_secret = str(cfg.get("token") or "")
            extra = {
                "proxy": cfg.get("proxy", ""),
                "replyToMessage": bool(cfg.get("replyToMessage", False)),
                "allowFrom": self.normalize_allow_from(cfg.get("allowFrom", [])),
            }
        elif normalized_type == "slack":
            external_app_id = str(cfg.get("botToken") or "")
            app_secret = str(cfg.get("appToken") or "")
            extra = {
                "mode": cfg.get("mode", "socket"),
                "replyInThread": bool(cfg.get("replyInThread", True)),
                "groupPolicy": cfg.get("groupPolicy", "mention"),
                "groupAllowFrom": cfg.get("groupAllowFrom", []),
                "reactEmoji": cfg.get("reactEmoji", "eyes"),
            }
        elif normalized_type == "qq":
            external_app_id = str(cfg.get("appId") or "")
            app_secret = str(cfg.get("secret") or "")
            extra = {"allowFrom": self.normalize_allow_from(cfg.get("allowFrom", []))}
        elif normalized_type == "email":
            extra = {
                "consentGranted": bool(cfg.get("consentGranted", False)),
                "imapHost": str(cfg.get("imapHost") or ""),
                "imapPort": self.safe_int(cfg.get("imapPort") or 993, 993),
                "imapUsername": str(cfg.get("imapUsername") or ""),
                "imapPassword": str(cfg.get("imapPassword") or ""),
                "imapMailbox": str(cfg.get("imapMailbox") or "INBOX"),
                "imapUseSsl": bool(cfg.get("imapUseSsl", True)),
                "smtpHost": str(cfg.get("smtpHost") or ""),
                "smtpPort": self.safe_int(cfg.get("smtpPort") or 587, 587),
                "smtpUsername": str(cfg.get("smtpUsername") or ""),
                "smtpPassword": str(cfg.get("smtpPassword") or ""),
                "smtpUseTls": bool(cfg.get("smtpUseTls", True)),
                "smtpUseSsl": bool(cfg.get("smtpUseSsl", False)),
                "fromAddress": str(cfg.get("fromAddress") or ""),
                "autoReplyEnabled": bool(cfg.get("autoReplyEnabled", True)),
                "pollIntervalSeconds": self.safe_int(cfg.get("pollIntervalSeconds") or 30, 30),
                "markSeen": bool(cfg.get("markSeen", True)),
                "maxBodyChars": self.safe_int(cfg.get("maxBodyChars") or 12000, 12000),
                "subjectPrefix": str(cfg.get("subjectPrefix") or "Re: "),
                "allowFrom": self.normalize_allow_from(cfg.get("allowFrom", [])),
            }
        else:
            # Unknown channel type: pick the first credential-like key that is set
            # and pass every remaining key through as extra config.
            external_app_id = str(
                cfg.get("appId")
                or cfg.get("clientId")
                or cfg.get("botToken")
                or cfg.get("externalAppId")
                or ""
            )
            app_secret = str(
                cfg.get("appSecret")
                or cfg.get("clientSecret")
                or cfg.get("secret")
                or cfg.get("token")
                or cfg.get("appToken")
                or ""
            )
            extra = {
                key: value
                for key, value in cfg.items()
                if key
                not in {
                    "enabled",
                    "port",
                    "appId",
                    "clientId",
                    "botToken",
                    "externalAppId",
                    "appSecret",
                    "clientSecret",
                    "secret",
                    "token",
                    "appToken",
                }
            }
        return {
            "id": normalized_type,
            "bot_id": bot_id,
            "channel_type": normalized_type,
            "external_app_id": external_app_id,
            "app_secret": app_secret,
            "internal_port": port,
            "is_active": enabled,
            "extra_config": extra,
            "locked": normalized_type == "dashboard",
        }

    def channel_api_to_cfg(self, row: Dict[str, Any]) -> Dict[str, Any]:
        """Convert an API channel row into the stored config section for its type.

        Non-numeric port / interval / size values raise ``ValueError`` or
        ``TypeError`` from ``int()``.
        """
        ctype = str(row.get("channel_type") or "").strip().lower()
        enabled = bool(row.get("is_active", True))
        extra = self.normalize_channel_extra(row.get("extra_config"))
        external_app_id = str(row.get("external_app_id") or "")
        app_secret = str(row.get("app_secret") or "")
        port = max(1, min(int(row.get("internal_port") or 8080), 65535))
        if ctype == "feishu":
            return {
                "enabled": enabled,
                "appId": external_app_id,
                "appSecret": app_secret,
                "encryptKey": extra.get("encryptKey", ""),
                "verificationToken": extra.get("verificationToken", ""),
                "allowFrom": self.normalize_allow_from(extra.get("allowFrom", [])),
            }
        if ctype == "dingtalk":
            return {
                "enabled": enabled,
                "clientId": external_app_id,
                "clientSecret": app_secret,
                "allowFrom": self.normalize_allow_from(extra.get("allowFrom", [])),
            }
        if ctype == "telegram":
            return {
                "enabled": enabled,
                "token": app_secret,
                "proxy": extra.get("proxy", ""),
                "replyToMessage": bool(extra.get("replyToMessage", False)),
                "allowFrom": self.normalize_allow_from(extra.get("allowFrom", [])),
            }
        if ctype == "slack":
            return {
                "enabled": enabled,
                "mode": extra.get("mode", "socket"),
                "botToken": external_app_id,
                "appToken": app_secret,
                "replyInThread": bool(extra.get("replyInThread", True)),
                "groupPolicy": extra.get("groupPolicy", "mention"),
                "groupAllowFrom": extra.get("groupAllowFrom", []),
                "reactEmoji": extra.get("reactEmoji", "eyes"),
            }
        if ctype == "qq":
            return {
                "enabled": enabled,
                "appId": external_app_id,
                "secret": app_secret,
                "allowFrom": self.normalize_allow_from(extra.get("allowFrom", [])),
            }
        if ctype == "email":
            return {
                "enabled": enabled,
                "consentGranted": bool(extra.get("consentGranted", False)),
                "imapHost": str(extra.get("imapHost") or ""),
                "imapPort": max(1, min(int(extra.get("imapPort") or 993), 65535)),
                "imapUsername": str(extra.get("imapUsername") or ""),
                "imapPassword": str(extra.get("imapPassword") or ""),
                "imapMailbox": str(extra.get("imapMailbox") or "INBOX"),
                "imapUseSsl": bool(extra.get("imapUseSsl", True)),
                "smtpHost": str(extra.get("smtpHost") or ""),
                "smtpPort": max(1, min(int(extra.get("smtpPort") or 587), 65535)),
                "smtpUsername": str(extra.get("smtpUsername") or ""),
                "smtpPassword": str(extra.get("smtpPassword") or ""),
                "smtpUseTls": bool(extra.get("smtpUseTls", True)),
                "smtpUseSsl": bool(extra.get("smtpUseSsl", False)),
                "fromAddress": str(extra.get("fromAddress") or ""),
                "autoReplyEnabled": bool(extra.get("autoReplyEnabled", True)),
                "pollIntervalSeconds": max(5, int(extra.get("pollIntervalSeconds") or 30)),
                "markSeen": bool(extra.get("markSeen", True)),
                "maxBodyChars": max(1, int(extra.get("maxBodyChars") or 12000)),
                "subjectPrefix": str(extra.get("subjectPrefix") or "Re: "),
                "allowFrom": self.normalize_allow_from(extra.get("allowFrom", [])),
            }
        # Unknown channel type: keep the extra config and overlay the generic keys.
        merged = dict(extra)
        merged.update(
            {
                "enabled": enabled,
                "appId": external_app_id,
                "appSecret": app_secret,
                "port": port,
            }
        )
        return merged

    def get_bot_channels_from_config(self, bot: BotInstance) -> List[Dict[str, Any]]:
        """List the bot's channels as API rows, starting with the locked dashboard row."""
        config_data = self.read_bot_config(bot.id)
        channels_cfg = config_data.get("channels")
        if not isinstance(channels_cfg, dict):
            channels_cfg = {}
        send_progress, send_tool_hints = self.read_global_delivery_flags(channels_cfg)
        rows: List[Dict[str, Any]] = [
            {
                "id": "dashboard",
                "bot_id": bot.id,
                "channel_type": "dashboard",
                "external_app_id": f"dashboard-{bot.id}",
                "app_secret": "",
                "internal_port": 9000,
                "is_active": True,
                "extra_config": {
                    "sendProgress": send_progress,
                    "sendToolHints": send_tool_hints,
                },
                "locked": True,
            }
        ]
        for ctype, cfg in channels_cfg.items():
            # These keys are global flags / the synthetic dashboard, not channels.
            if ctype in {"sendProgress", "sendToolHints", "dashboard"}:
                continue
            if not isinstance(cfg, dict):
                continue
            rows.append(self.channel_cfg_to_api_dict(bot.id, ctype, cfg))
        return rows

    def normalize_initial_channels(self, bot_id: str, channels: Optional[List[Any]]) -> List[Dict[str, Any]]:
        """Convert channel request objects into API rows.

        Entries are read via attribute access. Blank types, ``dashboard`` and
        repeated types are skipped (first occurrence wins).
        """
        rows: List[Dict[str, Any]] = []
        seen_types: set[str] = set()
        for channel in channels or []:
            ctype = str(getattr(channel, "channel_type", "") or "").strip().lower()
            if not ctype or ctype == "dashboard" or ctype in seen_types:
                continue
            seen_types.add(ctype)
            rows.append(
                {
                    "id": ctype,
                    "bot_id": bot_id,
                    "channel_type": ctype,
                    "external_app_id": str(getattr(channel, "external_app_id", "") or "").strip()
                    or f"{ctype}-{bot_id}",
                    "app_secret": str(getattr(channel, "app_secret", "") or "").strip(),
                    "internal_port": max(1, min(int(getattr(channel, "internal_port", 8080) or 8080), 65535)),
                    "is_active": bool(getattr(channel, "is_active", True)),
                    "extra_config": self.normalize_channel_extra(getattr(channel, "extra_config", None)),
                    "locked": False,
                }
            )
        return rows

    def parse_message_media(self, bot_id: str, media_raw: Optional[str]) -> List[str]:
        """Parse a JSON media string and normalize it; any failure yields ``[]``."""
        if not media_raw:
            return []
        try:
            parsed = json.loads(media_raw)
            return self._normalize_media_list(parsed, bot_id)
        except Exception:
            return []

    # ------------------------------------------------------------------
    # Environment parameters and timezone
    # ------------------------------------------------------------------

    def normalize_env_params(self, raw: Any) -> Dict[str, str]:
        """Return env params with upper-cased, validated keys and stripped string values.

        Keys that do not match ``_ENV_KEY_RE`` are dropped. ``None`` values
        become ``""``; other values (including ``0`` and ``False``) are
        stringified rather than discarded.
        """
        if not isinstance(raw, dict):
            return {}
        rows: Dict[str, str] = {}
        for key, value in raw.items():
            normalized_key = str(key or "").strip().upper()
            if not normalized_key or not self._ENV_KEY_RE.fullmatch(normalized_key):
                continue
            # Only None means "no value"; `value or ""` would also erase 0 / False.
            rows[normalized_key] = "" if value is None else str(value).strip()
        return rows

    def get_default_system_timezone(self) -> str:
        """Return the configured default timezone, or ``Asia/Shanghai`` if blank or invalid."""
        value = str(self._default_bot_system_timezone or "").strip() or "Asia/Shanghai"
        try:
            ZoneInfo(value)
            return value
        except Exception:
            return "Asia/Shanghai"

    def normalize_system_timezone(self, raw: Any) -> str:
        """Validate a timezone name; blank input yields the default timezone.

        Raises:
            ValueError: when ``ZoneInfo`` rejects the name.
        """
        value = str(raw or "").strip()
        if not value:
            return self.get_default_system_timezone()
        try:
            ZoneInfo(value)
        except Exception as exc:
            raise ValueError("Invalid system timezone. Use an IANA timezone such as Asia/Shanghai.") from exc
        return value

    def resolve_bot_env_params(self, bot_id: str, raw: Optional[Dict[str, str]] = None) -> Dict[str, str]:
        """Return normalized env params (from ``raw`` or the env store) with a valid ``TZ``."""
        env_params = self.normalize_env_params(raw if isinstance(raw, dict) else self._read_env_store(bot_id))
        try:
            env_params["TZ"] = self.normalize_system_timezone(env_params.get("TZ"))
        except ValueError:
            env_params["TZ"] = self.get_default_system_timezone()
        return env_params

    def parse_env_params(self, raw: Any) -> Dict[str, str]:
        """Alias of ``normalize_env_params``."""
        return self.normalize_env_params(raw)

    # ------------------------------------------------------------------
    # Numeric helpers and resource limits
    # ------------------------------------------------------------------

    @staticmethod
    def safe_float(raw: Any, default: float) -> float:
        """Return ``float(raw)``, or ``default`` when the conversion fails."""
        try:
            return float(raw)
        except Exception:
            return default

    @staticmethod
    def safe_int(raw: Any, default: int) -> int:
        """Return ``int(raw)``, or ``default`` when the conversion fails."""
        try:
            return int(raw)
        except Exception:
            return default

    def normalize_resource_limits(self, cpu_cores: Any, memory_mb: Any, storage_gb: Any) -> Dict[str, Any]:
        """Coerce and clamp resource limits.

        Unparseable or negative values fall back to 1.0 / 1024 / 10. A value
        of exactly 0 is kept as 0; other values are clamped to 0.1–16 cores,
        256–65536 MB and 1–1024 GB.
        """
        cpu = self.safe_float(cpu_cores, 1.0)
        memory = self.safe_int(memory_mb, 1024)
        storage = self.safe_int(storage_gb, 10)
        if cpu < 0:
            cpu = 1.0
        if memory < 0:
            memory = 1024
        if storage < 0:
            storage = 10
        normalized_cpu = 0.0 if cpu == 0 else min(16.0, max(0.1, cpu))
        normalized_memory = 0 if memory == 0 else min(65536, max(256, memory))
        normalized_storage = 0 if storage == 0 else min(1024, max(1, storage))
        return {
            "cpu_cores": normalized_cpu,
            "memory_mb": normalized_memory,
            "storage_gb": normalized_storage,
        }

    # ------------------------------------------------------------------
    # Workspace synchronisation
    # ------------------------------------------------------------------

    def sync_workspace_channels(
        self,
        session: Session,
        bot_id: str,
        channels_override: Optional[List[Dict[str, Any]]] = None,
        global_delivery_override: Optional[Dict[str, Any]] = None,
        runtime_overrides: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Rebuild the bot's workspace from its runtime snapshot plus overrides.

        Does nothing when the bot does not exist. Otherwise the assembled bot
        data and non-dashboard channels are passed to the config manager's
        ``update_workspace`` and the resource limits are persisted.
        """
        bot = session.get(BotInstance, bot_id)
        if not bot:
            return
        snapshot = self._read_bot_runtime_snapshot(bot)
        default_target = self.default_provider_target()
        bot_data: Dict[str, Any] = {
            "name": bot.name,
            "node_id": snapshot.get("node_id") or default_target.node_id,
            "transport_kind": snapshot.get("transport_kind") or default_target.transport_kind,
            "runtime_kind": snapshot.get("runtime_kind") or default_target.runtime_kind,
            "core_adapter": snapshot.get("core_adapter") or default_target.core_adapter,
            "system_prompt": snapshot.get("system_prompt") or self._default_soul_md,
            "soul_md": snapshot.get("soul_md") or self._default_soul_md,
            "agents_md": snapshot.get("agents_md") or self._default_agents_md,
            "user_md": snapshot.get("user_md") or self._default_user_md,
            "tools_md": snapshot.get("tools_md") or self._default_tools_md,
            "identity_md": snapshot.get("identity_md") or self._default_identity_md,
            "llm_provider": snapshot.get("llm_provider") or "dashscope",
            "llm_model": snapshot.get("llm_model") or "",
            "api_key": snapshot.get("api_key") or "",
            "api_base": snapshot.get("api_base") or "",
            "temperature": self.safe_float(snapshot.get("temperature"), 0.2),
            "top_p": self.safe_float(snapshot.get("top_p"), 1.0),
            "max_tokens": self.safe_int(snapshot.get("max_tokens"), 8192),
            "cpu_cores": self.safe_float(snapshot.get("cpu_cores"), 1.0),
            "memory_mb": self.safe_int(snapshot.get("memory_mb"), 1024),
            "storage_gb": self.safe_int(snapshot.get("storage_gb"), 10),
            "send_progress": bool(snapshot.get("send_progress")),
            "send_tool_hints": bool(snapshot.get("send_tool_hints")),
        }
        if isinstance(runtime_overrides, dict):
            for key, value in runtime_overrides.items():
                if key in {"api_key", "llm_provider", "llm_model"}:
                    # A blank override must not wipe these required settings.
                    text = str(value or "").strip()
                    if not text:
                        continue
                    bot_data[key] = text
                    continue
                if key == "api_base":
                    # Unlike the keys above, api_base may be cleared explicitly.
                    bot_data[key] = str(value or "").strip()
                    continue
                bot_data[key] = value

        resources = self.normalize_resource_limits(
            bot_data.get("cpu_cores"),
            bot_data.get("memory_mb"),
            bot_data.get("storage_gb"),
        )
        bot_data["cpu_cores"] = resources["cpu_cores"]
        bot_data["memory_mb"] = resources["memory_mb"]
        bot_data["storage_gb"] = resources["storage_gb"]

        send_progress = bool(bot_data.get("send_progress", False))
        send_tool_hints = bool(bot_data.get("send_tool_hints", False))
        if isinstance(global_delivery_override, dict):
            if "sendProgress" in global_delivery_override:
                send_progress = bool(global_delivery_override.get("sendProgress"))
            if "sendToolHints" in global_delivery_override:
                send_tool_hints = bool(global_delivery_override.get("sendToolHints"))

        channels_data = channels_override if channels_override is not None else self.get_bot_channels_from_config(bot)
        bot_data["send_progress"] = send_progress
        bot_data["send_tool_hints"] = send_tool_hints

        normalized_channels: List[Dict[str, Any]] = []
        for row in channels_data:
            ctype = str(row.get("channel_type") or "").strip().lower()
            if not ctype or ctype == "dashboard":
                continue
            normalized_channels.append(
                {
                    "channel_type": ctype,
                    "external_app_id": str(row.get("external_app_id") or ""),
                    "app_secret": str(row.get("app_secret") or ""),
                    "internal_port": max(1, min(int(row.get("internal_port") or 8080), 65535)),
                    "is_active": bool(row.get("is_active", True)),
                    "extra_config": self.normalize_channel_extra(row.get("extra_config")),
                }
            )

        self._config_manager.update_workspace(
            bot_id=bot_id,
            bot_data=bot_data,
            channels=normalized_channels,
        )
        self.write_bot_resources(
            bot_id,
            bot_data.get("cpu_cores"),
            bot_data.get("memory_mb"),
            bot_data.get("storage_gb"),
        )

    def set_bot_provider_target(self, bot_id: str, target: ProviderTarget) -> None:
        """Set the bot's provider target by registering an in-memory override."""
        self.set_provider_target_override(bot_id, target)

    def sync_bot_workspace_via_provider(
        self,
        session: Session,
        bot: BotInstance,
        *,
        target_override: Optional[ProviderTarget] = None,
        channels_override: Optional[List[Dict[str, Any]]] = None,
        global_delivery_override: Optional[Dict[str, Any]] = None,
        runtime_overrides: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Sync the bot workspace through its provision provider.

        When ``target_override`` is given it is installed as the bot's
        override before syncing and stays in place on success. If anything
        raises, the previous override state is restored and the error is
        re-raised.
        """
        bot_id = str(bot.id or "")
        previous_override = self._provider_target_overrides.get(bot_id)
        wrote_target = False
        try:
            if target_override is not None:
                self.set_bot_provider_target(bot_id, target_override)
                wrote_target = True
            self._get_provision_provider(self._app.state, bot).sync_bot_workspace(
                session=session,
                bot_id=bot_id,
                channels_override=channels_override,
                global_delivery_override=global_delivery_override,
                runtime_overrides=runtime_overrides,
            )
        except Exception:
            if wrote_target:
                if previous_override is not None:
                    self.set_provider_target_override(bot_id, previous_override)
                else:
                    self.clear_provider_target_override(bot_id)
            raise

    # ------------------------------------------------------------------
    # Local paths
    # ------------------------------------------------------------------

    def workspace_root(self, bot_id: str) -> str:
        """Return the absolute path ``<bots_root>/<bot_id>/.nanobot/workspace``."""
        return os.path.abspath(os.path.join(self._bots_workspace_root, bot_id, ".nanobot", "workspace"))

    def bot_data_root(self, bot_id: str) -> str:
        """Return the absolute path ``<bots_root>/<bot_id>/.nanobot``."""
        return os.path.abspath(os.path.join(self._bots_workspace_root, bot_id, ".nanobot"))

    def cron_store_path(self, bot_id: str) -> str:
        """Return the path of ``cron/jobs.json`` under the bot data root."""
        return os.path.join(self.bot_data_root(bot_id), "cron", "jobs.json")

    def env_store_path(self, bot_id: str) -> str:
        """Return the path of ``env.json`` under the bot data root."""
        return os.path.join(self.bot_data_root(bot_id), "env.json")

    def sessions_root(self, bot_id: str) -> str:
        """Return the ``sessions`` directory inside the bot workspace."""
        return os.path.join(self.workspace_root(bot_id), "sessions")

    # ------------------------------------------------------------------
    # Session files
    # ------------------------------------------------------------------

    def clear_bot_sessions(self, bot_id: str) -> int:
        """Delete the bot's ``.jsonl`` session files and return how many were removed.

        Edge bots are cleared through the edge client (recursive listing
        under ``sessions/``); other bots through the local sessions
        directory (top level only). Individual failures are skipped; a
        failed edge listing returns 0.
        """
        edge_context = self.resolve_edge_state_context(bot_id)
        if edge_context is not None:
            client, workspace_root, node_id = edge_context
            try:
                payload = client.list_tree(
                    bot_id=bot_id,
                    path="sessions",
                    recursive=True,
                    workspace_root=workspace_root,
                )
            except Exception as exc:
                log_edge_failure(
                    self._logger,
                    key=f"sessions-clear-list:{node_id}:{bot_id}",
                    exc=exc,
                    message=f"Failed to list edge session files for bot_id={bot_id}",
                )
                return 0
            deleted = 0
            for entry in list(payload.get("entries") or []):
                if not isinstance(entry, dict):
                    continue
                if str(entry.get("type") or "").strip().lower() != "file":
                    continue
                rel_path = str(entry.get("path") or "").strip().replace("\\", "/")
                if not rel_path.lower().startswith("sessions/") or not rel_path.lower().endswith(".jsonl"):
                    continue
                try:
                    result = client.delete_workspace_path(
                        bot_id=bot_id,
                        path=rel_path,
                        workspace_root=workspace_root,
                    )
                    if bool(result.get("deleted")):
                        deleted += 1
                except Exception as exc:
                    log_edge_failure(
                        self._logger,
                        key=f"sessions-clear-delete:{node_id}:{bot_id}:{rel_path}",
                        exc=exc,
                        message=f"Failed to delete edge session file for bot_id={bot_id}, path={rel_path}",
                    )
            return deleted

        root = self.sessions_root(bot_id)
        if not os.path.isdir(root):
            return 0
        deleted = 0
        for name in os.listdir(root):
            path = os.path.join(root, name)
            if not os.path.isfile(path):
                continue
            if not name.lower().endswith(".jsonl"):
                continue
            try:
                os.remove(path)
                deleted += 1
            except Exception:
                # Best effort: keep clearing the remaining files.
                continue
        return deleted

    def clear_bot_dashboard_direct_session(self, bot_id: str) -> Dict[str, Any]:
        """Truncate ``sessions/dashboard_direct.jsonl`` and report whether it existed.

        Returns ``{"path": ..., "existed": ...}``. For edge bots a failed
        existence check is logged and reported as ``existed=False``; a failed
        truncate is logged and re-raised.
        """
        edge_context = self.resolve_edge_state_context(bot_id)
        if edge_context is not None:
            client, workspace_root, node_id = edge_context
            path = "sessions/dashboard_direct.jsonl"
            existed = False
            try:
                payload = client.list_tree(
                    bot_id=bot_id,
                    path="sessions",
                    recursive=False,
                    workspace_root=workspace_root,
                )
                existed = any(
                    isinstance(entry, dict)
                    and str(entry.get("type") or "").strip().lower() == "file"
                    and str(entry.get("path") or "").strip().replace("\\", "/") == path
                    for entry in list(payload.get("entries") or [])
                )
            except Exception as exc:
                log_edge_failure(
                    self._logger,
                    key=f"dashboard-session-check:{node_id}:{bot_id}",
                    exc=exc,
                    message=f"Failed to inspect edge dashboard session file for bot_id={bot_id}",
                )
            try:
                client.write_text_file(
                    bot_id=bot_id,
                    path=path,
                    content="",
                    workspace_root=workspace_root,
                )
            except Exception as exc:
                log_edge_failure(
                    self._logger,
                    key=f"dashboard-session-clear:{node_id}:{bot_id}",
                    exc=exc,
                    message=f"Failed to truncate edge dashboard session file for bot_id={bot_id}",
                )
                raise
            return {"path": path, "existed": existed}

        root = self.sessions_root(bot_id)
        os.makedirs(root, exist_ok=True)
        path = os.path.join(root, "dashboard_direct.jsonl")
        existed = os.path.exists(path)
        # Opening in "w" mode truncates (or creates) the file.
        with open(path, "w", encoding="utf-8"):
            pass
        return {"path": path, "existed": existed}

    # ------------------------------------------------------------------
    # Workspace paths and attachments
    # ------------------------------------------------------------------

    def resolve_workspace_path(self, bot_id: str, rel_path: Optional[str] = None) -> tuple[str, str]:
        """Resolve ``rel_path`` inside the bot workspace and return ``(root, target)``.

        Raises:
            HTTPException: 400 when the resolved path is outside the
                workspace root, or when the two paths cannot be compared.
        """
        root = self.workspace_root(bot_id)
        rel = str(rel_path or "").strip().replace("\\", "/")
        target = os.path.abspath(os.path.join(root, rel))
        try:
            inside_root = os.path.commonpath([root, target]) == root
        except ValueError:
            # commonpath refuses e.g. paths on different drives; treat as escaping.
            inside_root = False
        if not inside_root:
            raise HTTPException(status_code=400, detail="invalid workspace path")
        return root, target

    @staticmethod
    def calc_dir_size_bytes(path: str) -> int:
        """Return the total size of regular files under ``path``.

        Symlinked files are skipped, unreadable files are ignored, and a
        missing path yields 0.
        """
        total = 0
        if not os.path.exists(path):
            return 0
        for root, _, files in os.walk(path):
            for filename in files:
                try:
                    file_path = os.path.join(root, filename)
                    if os.path.islink(file_path):
                        continue
                    total += os.path.getsize(file_path)
                except Exception:
                    continue
        return max(0, total)

    @staticmethod
    def is_image_attachment_path(path: str) -> bool:
        """Return True when the path ends with a known image extension (case-insensitive)."""
        lower = str(path or "").strip().lower()
        return lower.endswith(BotInfraService._IMAGE_SUFFIXES)

    @staticmethod
    def is_video_attachment_path(path: str) -> bool:
        """Return True when the path ends with a known video extension (case-insensitive)."""
        lower = str(path or "").strip().lower()
        return lower.endswith(BotInfraService._VIDEO_SUFFIXES)

    def is_visual_attachment_path(self, path: str) -> bool:
        """Return True for image or video attachment paths."""
        return self.is_image_attachment_path(path) or self.is_video_attachment_path(path)

    def ensure_provider_target_supported(self, target: ProviderTarget) -> None:
        """Raise a 400 ``HTTPException`` when no provider bundle key resolves for the target."""
        key = self._resolve_provider_bundle_key(target)
        if key is None:
            raise HTTPException(status_code=400, detail=f"Execution target is not supported yet: {target.key}")