import logging from typing import Any, Callable, Dict, List, Optional, Tuple from sqlmodel import Session, select from clients.edge.errors import log_edge_failure from models.bot import BotInstance, NanobotImage from services.platform_activity_service import list_activity_events, prune_expired_activity_events from services.platform_common import read_bot_resources, workspace_usage_bytes from services.platform_settings_service import get_platform_settings from services.platform_usage_service import list_usage logger = logging.getLogger(__name__) def build_platform_overview( session: Session, read_runtime: Optional[Callable[[BotInstance], Tuple[str, Dict[str, Any]]]] = None, ) -> Dict[str, Any]: deleted = prune_expired_activity_events(session, force=False) if deleted > 0: session.commit() bots = session.exec(select(BotInstance)).all() images = session.exec(select(NanobotImage).order_by(NanobotImage.created_at.desc())).all() settings = get_platform_settings(session) running = 0 stopped = 0 disabled = 0 configured_cpu_total = 0.0 configured_memory_total = 0 configured_storage_total = 0 workspace_used_total = 0 workspace_limit_total = 0 live_cpu_percent_total = 0.0 live_memory_used_total = 0 live_memory_limit_total = 0 dirty = False bot_rows: List[Dict[str, Any]] = [] for bot in bots: enabled = bool(getattr(bot, "enabled", True)) resources = read_bot_resources(bot.id) runtime_status = str(bot.docker_status or "STOPPED").upper() runtime: Dict[str, Any] = {"usage": {}, "limits": {}, "docker_status": runtime_status} if callable(read_runtime): try: runtime_status, runtime = read_runtime(bot) except Exception as exc: log_edge_failure( logger, key=f"platform-overview-runtime:{bot.id}", exc=exc, message=f"Failed to read platform runtime snapshot for bot_id={bot.id}", ) runtime_status = str(runtime_status or runtime.get("docker_status") or "STOPPED").upper() runtime["docker_status"] = runtime_status if str(bot.docker_status or "").upper() != runtime_status: bot.docker_status = runtime_status session.add(bot) dirty = True if runtime_status != "RUNNING" and str(bot.current_state or "").upper() not in {"ERROR"}: next_state = "IDLE" if str(bot.current_state or "") != next_state: bot.current_state = next_state session.add(bot) dirty = True workspace_used = workspace_usage_bytes(runtime, bot.id) workspace_limit = int(resources["storage_gb"] or 0) * 1024 * 1024 * 1024 configured_cpu_total += float(resources["cpu_cores"] or 0) configured_memory_total += int(resources["memory_mb"] or 0) * 1024 * 1024 configured_storage_total += workspace_limit workspace_used_total += workspace_used workspace_limit_total += workspace_limit live_cpu_percent_total += float((runtime.get("usage") or {}).get("cpu_percent") or 0.0) live_memory_used_total += int((runtime.get("usage") or {}).get("memory_bytes") or 0) live_memory_limit_total += int((runtime.get("usage") or {}).get("memory_limit_bytes") or 0) if not enabled: disabled += 1 elif runtime_status == "RUNNING": running += 1 else: stopped += 1 bot_rows.append( { "id": bot.id, "name": bot.name, "enabled": enabled, "docker_status": runtime_status, "image_tag": bot.image_tag, "llm_provider": getattr(bot, "llm_provider", None), "llm_model": getattr(bot, "llm_model", None), "current_state": bot.current_state, "last_action": bot.last_action, "resources": resources, "workspace_usage_bytes": workspace_used, "workspace_limit_bytes": workspace_limit if workspace_limit > 0 else None, } ) if dirty: session.commit() usage = list_usage(session, limit=20) events = list_activity_events(session, limit=get_platform_settings(session).page_size, offset=0).get("items") or [] return { "summary": { "bots": { "total": len(bots), "running": running, "stopped": stopped, "disabled": disabled, }, "images": { "total": len(images), "ready": len([row for row in images if row.status == "READY"]), "abnormal": len([row for row in images if row.status != "READY"]), }, "resources": { "configured_cpu_cores": round(configured_cpu_total, 2), "configured_memory_bytes": configured_memory_total, "configured_storage_bytes": configured_storage_total, "live_cpu_percent": round(live_cpu_percent_total, 2), "live_memory_used_bytes": live_memory_used_total, "live_memory_limit_bytes": live_memory_limit_total, "workspace_used_bytes": workspace_used_total, "workspace_limit_bytes": workspace_limit_total, }, }, "images": [ { "tag": row.tag, "version": row.version, "status": row.status, "source_dir": row.source_dir, "created_at": row.created_at.isoformat() + "Z", } for row in images ], "bots": bot_rows, "settings": settings.model_dump(), "usage": usage, "events": events, } def build_node_resource_overview( session: Session, *, node_id: str, read_runtime: Optional[Callable[[BotInstance], Tuple[str, Dict[str, Any]]]] = None, ) -> Dict[str, Any]: normalized_node_id = str(node_id or "").strip().lower() bots = session.exec(select(BotInstance).where(BotInstance.node_id == normalized_node_id)).all() running = 0 stopped = 0 disabled = 0 configured_cpu_total = 0.0 configured_memory_total = 0 configured_storage_total = 0 workspace_used_total = 0 workspace_limit_total = 0 live_cpu_percent_total = 0.0 live_memory_used_total = 0 live_memory_limit_total = 0 dirty = False for bot in bots: enabled = bool(getattr(bot, "enabled", True)) resources = read_bot_resources(bot.id) runtime_status = str(bot.docker_status or "STOPPED").upper() runtime: Dict[str, Any] = {"usage": {}, "limits": {}, "docker_status": runtime_status} if callable(read_runtime): try: runtime_status, runtime = read_runtime(bot) except Exception as exc: log_edge_failure( logger, key=f"platform-node-runtime:{normalized_node_id}:{bot.id}", exc=exc, message=f"Failed to read node runtime snapshot for bot_id={bot.id}", ) runtime_status = str(runtime_status or runtime.get("docker_status") or "STOPPED").upper() runtime["docker_status"] = runtime_status if str(bot.docker_status or "").upper() != runtime_status: bot.docker_status = runtime_status session.add(bot) dirty = True workspace_used = workspace_usage_bytes(runtime, bot.id) workspace_limit = int(resources["storage_gb"] or 0) * 1024 * 1024 * 1024 configured_cpu_total += float(resources["cpu_cores"] or 0) configured_memory_total += int(resources["memory_mb"] or 0) * 1024 * 1024 configured_storage_total += workspace_limit workspace_used_total += workspace_used workspace_limit_total += workspace_limit live_cpu_percent_total += float((runtime.get("usage") or {}).get("cpu_percent") or 0.0) live_memory_used_total += int((runtime.get("usage") or {}).get("memory_bytes") or 0) live_memory_limit_total += int((runtime.get("usage") or {}).get("memory_limit_bytes") or 0) if not enabled: disabled += 1 elif runtime_status == "RUNNING": running += 1 else: stopped += 1 if dirty: session.commit() return { "node_id": normalized_node_id, "bots": { "total": len(bots), "running": running, "stopped": stopped, "disabled": disabled, }, "resources": { "configured_cpu_cores": round(configured_cpu_total, 2), "configured_memory_bytes": configured_memory_total, "configured_storage_bytes": configured_storage_total, "live_cpu_percent": round(live_cpu_percent_total, 2), "live_memory_used_bytes": live_memory_used_total, "live_memory_limit_bytes": live_memory_limit_total, "workspace_used_bytes": workspace_used_total, "workspace_limit_bytes": workspace_limit_total, }, }