import os
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from fastapi import HTTPException
from sqlmodel import Session

from core.docker_instance import docker_manager
from core.settings import BOTS_WORKSPACE_ROOT
from models.bot import BotInstance
from schemas.bot import (
    BotEnvParamsUpdateRequest,
    BotMcpConfigUpdateRequest,
    ChannelConfigRequest,
    ChannelConfigUpdateRequest,
)
from services.bot_service import (
    channel_api_to_config,
    list_bot_channels_from_config,
    normalize_channel_extra,
    read_global_delivery_flags,
    sync_bot_workspace_channels,
)
from services.bot_mcp_service import (
    _merge_mcp_servers_preserving_extras,
    _normalize_mcp_servers,
)
from services.bot_storage_service import (
    get_bot_resource_limits,
    get_bot_workspace_snapshot,
    normalize_bot_env_params,
    read_bot_config_data,
    read_bot_env_params,
    write_bot_config_data,
    write_bot_env_params,
)
from services.cache_service import _invalidate_bot_detail_cache

# Workspace files that are snapshotted/restored around a workspace sync.
MANAGED_WORKSPACE_FILENAMES = ("AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md")


def _get_bot_or_404(session: Session, bot_id: str) -> BotInstance:
    """Return the ``BotInstance`` with ``bot_id`` or raise HTTP 404."""
    bot = session.get(BotInstance, bot_id)
    if not bot:
        raise HTTPException(status_code=404, detail="Bot not found")
    return bot


def _read_bot_config_object(bot_id: str) -> Dict[str, Any]:
    """Read the bot's config data, falling back to ``{}`` when it is not a dict."""
    config_data = read_bot_config_data(bot_id)
    return config_data if isinstance(config_data, dict) else {}


def _read_bot_tools_cfg(bot_id: str) -> tuple[Dict[str, Any], Dict[str, Any]]:
    """Return ``(config_data, tools_cfg)`` where ``tools_cfg`` is ``config_data["tools"]``.

    A missing or non-dict ``tools`` entry is replaced by a fresh dict that is
    attached to ``config_data``, so mutating ``tools_cfg`` mutates the config.
    """
    config_data = _read_bot_config_object(bot_id)
    tools_cfg = config_data.get("tools")
    if not isinstance(tools_cfg, dict):
        tools_cfg = {}
        config_data["tools"] = tools_cfg
    return config_data, tools_cfg


def _read_bot_channels_cfg(bot_id: str) -> tuple[Dict[str, Any], Dict[str, Any]]:
    """Return ``(config_data, channels_cfg)`` where ``channels_cfg`` is ``config_data["channels"]``.

    A missing or non-dict ``channels`` entry is replaced by a fresh dict that is
    attached to ``config_data``, so mutating ``channels_cfg`` mutates the config.
    """
    config_data = _read_bot_config_object(bot_id)
    channels_cfg = config_data.get("channels")
    if not isinstance(channels_cfg, dict):
        channels_cfg = {}
        config_data["channels"] = channels_cfg
    return config_data, channels_cfg


def _managed_bot_file_paths(bot_id: str) -> Dict[str, str]:
    """Map a logical key to the on-disk path of every managed file of a bot.

    Keys are ``"config"``, ``"resources"`` and ``"workspace:<filename>"`` for
    each name in ``MANAGED_WORKSPACE_FILENAMES``.
    """
    bot_root = os.path.join(BOTS_WORKSPACE_ROOT, bot_id, ".nanobot")
    workspace_root = os.path.join(bot_root, "workspace")
    paths = {
        "config": os.path.join(bot_root, "config.json"),
        "resources": os.path.join(bot_root, "resources.json"),
    }
    for filename in MANAGED_WORKSPACE_FILENAMES:
        paths[f"workspace:{filename}"] = os.path.join(workspace_root, filename)
    return paths


def _snapshot_managed_bot_files(bot_id: str) -> Dict[str, Optional[bytes]]:
    """Capture the raw bytes of every managed file; ``None`` marks a file that does not exist."""
    snapshot: Dict[str, Optional[bytes]] = {}
    for key, path in _managed_bot_file_paths(bot_id).items():
        if os.path.isfile(path):
            with open(path, "rb") as file:
                snapshot[key] = file.read()
        else:
            snapshot[key] = None
    return snapshot


def _restore_managed_bot_files(bot_id: str, snapshot: Dict[str, Optional[bytes]]) -> None:
    """Bring the managed files back to the state recorded in ``snapshot``.

    Entries whose snapshot value is ``None`` (or missing) are deleted if they
    exist now; all others are rewritten via a temporary file and ``os.replace``.
    """
    for key, path in _managed_bot_file_paths(bot_id).items():
        payload = snapshot.get(key)
        if payload is None:
            # The file was absent when the snapshot was taken: remove whatever appeared since.
            if os.path.exists(path):
                os.remove(path)
            continue
        os.makedirs(os.path.dirname(path), exist_ok=True)
        tmp_path = f"{path}.tmp"
        with open(tmp_path, "wb") as file:
            file.write(payload)
        os.replace(tmp_path, path)


def _write_bot_config_state(
    session: Session,
    *,
    bot_id: str,
    config_data: Dict[str, Any],
    sync_workspace: bool = False,
) -> None:
    """Persist ``config_data`` for a bot and optionally sync its workspace channels.

    When ``sync_workspace`` is true the managed files are snapshotted first so
    they can be restored if the write or the sync fails. On any failure the
    session is rolled back and the exception is re-raised. On success the bot
    detail cache is invalidated.
    """
    managed_file_snapshot = _snapshot_managed_bot_files(bot_id) if sync_workspace else None
    try:
        write_bot_config_data(bot_id, config_data)
        if sync_workspace:
            sync_bot_workspace_channels(session, bot_id)
    except Exception:
        try:
            if managed_file_snapshot is not None:
                _restore_managed_bot_files(bot_id, managed_file_snapshot)
        finally:
            # Roll back even if restoring the files itself fails, so the
            # session is never left holding a half-applied transaction.
            session.rollback()
        raise
    _invalidate_bot_detail_cache(bot_id)


def _find_channel_row(rows: list[Dict[str, Any]], channel_id: str) -> Dict[str, Any]:
    """Return the row whose ``id`` matches ``channel_id`` case-insensitively, or raise HTTP 404."""
    channel_key = str(channel_id or "").strip().lower()
    row = next((item for item in rows if str(item.get("id") or "").lower() == channel_key), None)
    if not row:
        raise HTTPException(status_code=404, detail="Channel not found")
    return row


def get_bot_resources_snapshot(session: Session, *, bot_id: str) -> Dict[str, Any]:
    """Build a report of configured limits, runtime limits and workspace usage for a bot.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    bot = _get_bot_or_404(session, bot_id)

    configured = get_bot_resource_limits(bot_id)
    runtime = docker_manager.get_bot_resource_snapshot(bot_id)
    workspace = get_bot_workspace_snapshot(bot_id)

    workspace_root = str(workspace.get("path") or "")
    workspace_bytes = int(workspace.get("usage_bytes") or 0)
    configured_storage_bytes = int(workspace.get("configured_limit_bytes") or 0)
    workspace_percent = 0.0
    if configured_storage_bytes > 0:
        workspace_percent = (workspace_bytes / configured_storage_bytes) * 100.0

    limits = runtime.get("limits") or {}
    cpu_limited = (limits.get("cpu_cores") or 0) > 0
    memory_limited = (limits.get("memory_bytes") or 0) > 0
    storage_limited = bool(limits.get("storage_bytes")) or bool(limits.get("storage_opt_raw"))

    # datetime.utcnow() is deprecated; take an aware UTC "now" and drop tzinfo
    # so the serialized form stays "<naive ISO timestamp>Z" exactly as before.
    collected_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    return {
        "bot_id": bot_id,
        "docker_status": runtime.get("docker_status") or bot.docker_status,
        "configured": configured,
        "runtime": runtime,
        "workspace": {
            "path": workspace_root,
            "usage_bytes": workspace_bytes,
            "configured_limit_bytes": configured_storage_bytes if configured_storage_bytes > 0 else None,
            "usage_percent": max(0.0, workspace_percent),
        },
        "enforcement": {
            "cpu_limited": cpu_limited,
            "memory_limited": memory_limited,
            "storage_limited": storage_limited,
        },
        "note": (
            "Resource value 0 means unlimited. CPU/Memory limits come from Docker HostConfig and are enforced by cgroup. "
            "Storage limit depends on Docker storage driver support."
        ),
        "collected_at": collected_at,
    }


def list_bot_channels_config(session: Session, *, bot_id: str):
    """Return the channel rows derived from the bot's config; 404 if the bot is missing."""
    bot = _get_bot_or_404(session, bot_id)
    return list_bot_channels_from_config(bot)


def get_bot_tools_config_state(session: Session, *, bot_id: str) -> Dict[str, Any]:
    """Return a fixed payload stating that tools config is not managed by the dashboard."""
    _get_bot_or_404(session, bot_id)
    return {
        "bot_id": bot_id,
        "tools_config": {},
        "managed_by_dashboard": False,
        "hint": "Tools config is disabled in dashboard. Configure tool-related env vars manually.",
    }


def reject_bot_tools_config_update(
    session: Session,
    *,
    bot_id: str,
    payload: Any,
) -> None:
    """Always reject a tools-config update with HTTP 400 (404 if the bot is missing).

    ``payload`` is accepted for interface compatibility and is not inspected.
    """
    _get_bot_or_404(session, bot_id)
    raise HTTPException(
        status_code=400,
        detail="Tools config is no longer managed by dashboard. Please set required env vars manually.",
    )


def get_bot_mcp_config_state(session: Session, *, bot_id: str) -> Dict[str, Any]:
    """Return the normalized ``tools.mcpServers`` section of the bot's config."""
    _get_bot_or_404(session, bot_id)
    _config_data, tools_cfg = _read_bot_tools_cfg(bot_id)
    mcp_servers = _normalize_mcp_servers(tools_cfg.get("mcpServers"))
    return {
        "bot_id": bot_id,
        "mcp_servers": mcp_servers,
        "locked_servers": [],
        "restart_required": True,
    }


def update_bot_mcp_config_state(
    session: Session,
    *,
    bot_id: str,
    payload: BotMcpConfigUpdateRequest,
) -> Dict[str, Any]:
    """Merge the submitted MCP servers into the bot's config and persist it.

    The submitted servers are normalized, merged with the currently stored
    ``tools.mcpServers`` value, written back, and the normalized merge result
    is returned to the caller.
    """
    _get_bot_or_404(session, bot_id)
    config_data, tools_cfg = _read_bot_tools_cfg(bot_id)

    normalized_mcp_servers = _normalize_mcp_servers(payload.mcp_servers or {})
    current_mcp_servers = tools_cfg.get("mcpServers")
    merged_mcp_servers = _merge_mcp_servers_preserving_extras(current_mcp_servers, normalized_mcp_servers)
    tools_cfg["mcpServers"] = merged_mcp_servers

    sanitized_after_save = _normalize_mcp_servers(tools_cfg.get("mcpServers"))
    _write_bot_config_state(session, bot_id=bot_id, config_data=config_data)
    return {
        "status": "updated",
        "bot_id": bot_id,
        "mcp_servers": sanitized_after_save,
        "locked_servers": [],
        "restart_required": True,
    }


def get_bot_env_params_state(session: Session, *, bot_id: str) -> Dict[str, Any]:
    """Return the stored env params of a bot; 404 if the bot is missing."""
    _get_bot_or_404(session, bot_id)
    return {
        "bot_id": bot_id,
        "env_params": read_bot_env_params(bot_id),
    }


def update_bot_env_params_state(
    session: Session,
    *,
    bot_id: str,
    payload: BotEnvParamsUpdateRequest,
) -> Dict[str, Any]:
    """Normalize and store the submitted env params, then invalidate the bot detail cache."""
    _get_bot_or_404(session, bot_id)
    normalized = normalize_bot_env_params(payload.env_params)
    write_bot_env_params(bot_id, normalized)
    _invalidate_bot_detail_cache(bot_id)
    return {
        "status": "updated",
        "bot_id": bot_id,
        "env_params": normalized,
        "restart_required": True,
    }


def create_bot_channel_config(
    session: Session,
    *,
    bot_id: str,
    payload: ChannelConfigRequest,
) -> Dict[str, Any]:
    """Add a new channel of ``payload.channel_type`` to the bot's config.

    Returns:
        The API-shaped row describing the created channel.

    Raises:
        HTTPException: 404 when the bot is missing; 400 when the type is empty,
            is ``dashboard``, or a channel of that type already exists.
    """
    bot = _get_bot_or_404(session, bot_id)

    ctype = (payload.channel_type or "").strip().lower()
    if not ctype:
        raise HTTPException(status_code=400, detail="channel_type is required")
    if ctype == "dashboard":
        raise HTTPException(status_code=400, detail="dashboard channel is built-in and cannot be created manually")

    current_rows = list_bot_channels_from_config(bot)
    if any(str(row.get("channel_type") or "").lower() == ctype for row in current_rows):
        raise HTTPException(status_code=400, detail=f"Channel already exists: {ctype}")

    new_row = {
        "id": ctype,
        "bot_id": bot_id,
        "channel_type": ctype,
        "external_app_id": (payload.external_app_id or "").strip() or f"{ctype}-{bot_id}",
        "app_secret": (payload.app_secret or "").strip(),
        # Clamp into the valid TCP port range; a missing port falls back to 8080.
        "internal_port": max(1, min(int(payload.internal_port or 8080), 65535)),
        "is_active": bool(payload.is_active),
        "extra_config": normalize_channel_extra(payload.extra_config),
        "locked": False,
    }

    config_data, channels_cfg = _read_bot_channels_cfg(bot_id)
    channels_cfg[ctype] = channel_api_to_config(new_row)
    _write_bot_config_state(session, bot_id=bot_id, config_data=config_data, sync_workspace=True)
    return new_row


def update_bot_channel_config(
    session: Session,
    *,
    bot_id: str,
    channel_id: str,
    payload: ChannelConfigUpdateRequest,
) -> Dict[str, Any]:
    """Apply the explicitly-set fields of ``payload`` to an existing channel.

    Only fields present in ``payload.model_dump(exclude_unset=True)`` are
    considered. The channel may be retyped, in which case its old config entry
    is replaced by one keyed on the new type.

    Returns:
        The updated API-shaped row.

    Raises:
        HTTPException: 404 when the bot or channel is missing; 400 when the
            channel is the dashboard channel or locked, when the resulting type
            is empty, already used by another channel, or ``dashboard``.
    """
    bot = _get_bot_or_404(session, bot_id)
    rows = list_bot_channels_from_config(bot)
    row = _find_channel_row(rows, channel_id)

    existing_type = str(row.get("channel_type") or "").strip().lower()
    if existing_type == "dashboard" or bool(row.get("locked")):
        raise HTTPException(status_code=400, detail="dashboard channel is built-in and cannot be modified")

    update_data = payload.model_dump(exclude_unset=True)

    new_type = existing_type
    if update_data.get("channel_type") is not None:
        new_type = str(update_data["channel_type"]).strip().lower()
    if not new_type:
        raise HTTPException(status_code=400, detail="channel_type cannot be empty")
    if new_type != existing_type and any(str(r.get("channel_type") or "").lower() == new_type for r in rows):
        raise HTTPException(status_code=400, detail=f"Channel already exists: {new_type}")
    if new_type == "dashboard":
        # Mirrors create_bot_channel_config: the dashboard channel is built-in.
        # Retyping into it would drop this channel's config entry without
        # writing a replacement, so refuse instead of silently losing it.
        raise HTTPException(status_code=400, detail="dashboard channel is built-in and cannot be created manually")

    if update_data.get("external_app_id") is not None:
        row["external_app_id"] = str(update_data["external_app_id"]).strip()
    if update_data.get("app_secret") is not None:
        row["app_secret"] = str(update_data["app_secret"]).strip()
    if update_data.get("internal_port") is not None:
        row["internal_port"] = max(1, min(int(update_data["internal_port"]), 65535))
    if update_data.get("is_active") is not None:
        row["is_active"] = bool(update_data["is_active"])
    if "extra_config" in update_data:
        # An explicit null is passed through to the normalizer rather than skipped.
        row["extra_config"] = normalize_channel_extra(update_data.get("extra_config"))

    row["channel_type"] = new_type
    row["id"] = new_type
    # Neither the old nor the new type can be "dashboard" past the guards above.
    row["locked"] = False

    config_data, channels_cfg = _read_bot_channels_cfg(bot_id)
    # Re-write the global delivery flags with their current values and make
    # sure no per-channel "dashboard" entry is kept in the channels section.
    current_send_progress, current_send_tool_hints = read_global_delivery_flags(channels_cfg)
    channels_cfg["sendProgress"] = current_send_progress
    channels_cfg["sendToolHints"] = current_send_tool_hints
    channels_cfg.pop("dashboard", None)

    if existing_type != new_type:
        # The channel was retyped: drop the entry stored under the old key.
        channels_cfg.pop(existing_type, None)
    channels_cfg[new_type] = channel_api_to_config(row)

    _write_bot_config_state(session, bot_id=bot_id, config_data=config_data, sync_workspace=True)
    return row


def delete_bot_channel_config(
    session: Session,
    *,
    bot_id: str,
    channel_id: str,
) -> Dict[str, Any]:
    """Remove a channel from the bot's config.

    Returns:
        ``{"status": "deleted"}``.

    Raises:
        HTTPException: 404 when the bot or channel is missing; 400 when the
            channel is the dashboard channel.
    """
    bot = _get_bot_or_404(session, bot_id)
    rows = list_bot_channels_from_config(bot)
    row = _find_channel_row(rows, channel_id)

    channel_type = str(row.get("channel_type") or "").lower()
    if channel_type == "dashboard":
        raise HTTPException(status_code=400, detail="dashboard channel cannot be deleted")

    config_data, channels_cfg = _read_bot_channels_cfg(bot_id)
    channels_cfg.pop(channel_type, None)
    _write_bot_config_state(session, bot_id=bot_id, config_data=config_data, sync_workspace=True)
    return {"status": "deleted"}