"""Helpers for normalizing, serializing, and probing managed edge nodes."""

import shlex
import time
from typing import Any, Dict, Optional

import httpx
from fastapi import HTTPException

from clients.edge.errors import log_edge_failure, summarize_edge_exception
from clients.edge.http import HttpEdgeClient
from providers.target import ProviderTarget
from schemas.platform import (
    ManagedNodeConnectivityResult,
    ManagedNodeNativePreflightResult,
    ManagedNodePayload,
)
from services.node_registry_service import ManagedNode

from api.platform_shared import logger


def normalize_native_sandbox_mode(raw_value: Any) -> str:
    """Map a free-form sandbox mode value onto one of three canonical names.

    Args:
        raw_value: Any value; it is stringified, stripped and lower-cased
            before matching. Falsy values are treated as the empty string.

    Returns:
        ``"workspace"`` for ``workspace``/``sandbox``/``strict``,
        ``"full_access"`` for ``full_access``/``full-access``/
        ``danger-full-access``/``escape``, and ``"inherit"`` for anything else.
    """
    text = str(raw_value or "").strip().lower()
    if text in {"workspace", "sandbox", "strict"}:
        return "workspace"
    if text in {"full_access", "full-access", "danger-full-access", "escape"}:
        return "full_access"
    return "inherit"


def normalize_node_payload(payload: ManagedNodePayload) -> ManagedNodePayload:
    """Return a copy of *payload* with its string fields trimmed and defaulted.

    Args:
        payload: The incoming node payload.

    Returns:
        A ``model_copy`` of the payload with normalized ``node_id``,
        ``display_name``, ``base_url``, ``auth_token``, ``transport_kind``,
        ``runtime_kind``, ``core_adapter``, ``workspace_root``,
        ``native_command``, ``native_workdir`` and ``native_sandbox_mode``.

    Raises:
        HTTPException: With status 400 when ``node_id`` is empty, when the
            transport is anything other than ``edge``, or when ``base_url``
            is empty.
    """
    normalized_node_id = str(payload.node_id or "").strip().lower()
    if not normalized_node_id:
        raise HTTPException(status_code=400, detail="node_id is required")

    transport_kind = str(payload.transport_kind or "edge").strip().lower() or "edge"
    if transport_kind != "edge":
        raise HTTPException(status_code=400, detail="Only edge transport is supported")

    runtime_kind = str(payload.runtime_kind or "docker").strip().lower() or "docker"
    core_adapter = str(payload.core_adapter or "nanobot").strip().lower() or "nanobot"
    native_sandbox_mode = normalize_native_sandbox_mode(payload.native_sandbox_mode)

    base_url = str(payload.base_url or "").strip()
    # The transport is always "edge" at this point (anything else was rejected
    # above), so only the URL itself needs checking.
    if not base_url:
        raise HTTPException(status_code=400, detail="base_url is required for edge nodes")

    return payload.model_copy(
        update={
            "node_id": normalized_node_id,
            # Fall back to the node id when the display name is missing/blank.
            "display_name": str(payload.display_name or normalized_node_id).strip()
            or normalized_node_id,
            "base_url": base_url,
            "auth_token": str(payload.auth_token or "").strip(),
            "transport_kind": transport_kind,
            "runtime_kind": runtime_kind,
            "core_adapter": core_adapter,
            "workspace_root": str(payload.workspace_root or "").strip(),
            "native_command": str(payload.native_command or "").strip(),
            "native_workdir": str(payload.native_workdir or "").strip(),
            "native_sandbox_mode": native_sandbox_mode,
        }
    )


def managed_node_from_payload(payload: ManagedNodePayload) -> ManagedNode:
    """Build a ``ManagedNode`` from a payload after normalizing it.

    Transport/runtime/adapter settings and the native launcher settings are
    stored in the node's ``metadata`` dict.

    Raises:
        HTTPException: Propagated from ``normalize_node_payload``.
    """
    normalized = normalize_node_payload(payload)
    return ManagedNode(
        node_id=normalized.node_id,
        display_name=normalized.display_name,
        base_url=normalized.base_url,
        enabled=bool(normalized.enabled),
        auth_token=normalized.auth_token,
        metadata={
            "transport_kind": normalized.transport_kind,
            "runtime_kind": normalized.runtime_kind,
            "core_adapter": normalized.core_adapter,
            "workspace_root": normalized.workspace_root,
            "native_command": normalized.native_command,
            "native_workdir": normalized.native_workdir,
            "native_sandbox_mode": normalized.native_sandbox_mode,
        },
    )


def node_status(node: ManagedNode, *, refresh_failed: bool = False) -> str:
    """Derive a status label for *node*.

    Checks are applied in order: a disabled node is ``"disabled"``; a node
    whose metadata transport is not ``edge`` is ``"unknown"``; a failed
    refresh yields ``"offline"``; otherwise the node is ``"online"`` when
    ``last_seen_at`` is set and ``"unknown"`` when it is not.
    """
    if not bool(node.enabled):
        return "disabled"
    transport_kind = str((node.metadata or {}).get("transport_kind") or "edge").strip().lower()
    if transport_kind != "edge":
        return "unknown"
    if refresh_failed:
        return "offline"
    return "online" if node.last_seen_at else "unknown"


def serialize_node(node: ManagedNode, *, refresh_failed: bool = False) -> Dict[str, Any]:
    """Flatten *node* into a plain dict.

    Selected metadata entries are surfaced as top-level string fields (the
    full metadata dict is also included under ``"metadata"``), and a
    ``"status"`` computed by ``node_status`` is added.
    """
    metadata = dict(node.metadata or {})
    return {
        "node_id": node.node_id,
        "display_name": node.display_name,
        "base_url": node.base_url,
        "enabled": bool(node.enabled),
        "transport_kind": str(metadata.get("transport_kind") or ""),
        "runtime_kind": str(metadata.get("runtime_kind") or ""),
        "core_adapter": str(metadata.get("core_adapter") or ""),
        "workspace_root": str(metadata.get("workspace_root") or ""),
        "native_command": str(metadata.get("native_command") or ""),
        "native_workdir": str(metadata.get("native_workdir") or ""),
        "native_sandbox_mode": str(metadata.get("native_sandbox_mode") or "inherit"),
        "metadata": metadata,
        "capabilities": dict(node.capabilities or {}),
        # `resources` is read defensively: the attribute may be absent.
        "resources": dict(getattr(node, "resources", {}) or {}),
        "last_seen_at": node.last_seen_at,
        "status": node_status(node, refresh_failed=refresh_failed),
    }


def split_native_command(raw_command: Optional[str]) -> list[str]:
    """Split a command string into non-empty, stripped arguments.

    Args:
        raw_command: Command line text, or ``None``.

    Returns:
        An empty list for blank input; the shell-style tokens otherwise. When
        the text cannot be tokenized (for example an unbalanced quote), the
        whole stripped text is returned as a single-element list.
    """
    text = str(raw_command or "").strip()
    if not text:
        return []
    try:
        tokens = shlex.split(text)
    except ValueError:
        # shlex reports malformed quoting/escaping via ValueError; fall back
        # to treating the whole text as one argument.
        return [text]
    stripped = (str(token or "").strip() for token in tokens)
    return [token for token in stripped if token]


def runtime_native_supported(node_self: Dict[str, Any]) -> bool:
    """Return True only when ``capabilities.runtime.native`` is exactly ``True``."""
    capabilities = dict(node_self.get("capabilities") or {})
    runtime_caps = dict(capabilities.get("runtime") or {})
    return runtime_caps.get("native") is True


def edge_node_self_with_native_preflight(*, client: HttpEdgeClient, node: ManagedNode) -> Dict[str, Any]:
    """Fetch the node's heartbeat data and, when relevant, merge in a native preflight.

    The preflight is only attempted when the node metadata carries a native
    command, a native workdir, or a ``runtime_kind`` of ``native``. A failing
    preflight is logged and the plain heartbeat data is returned.

    Args:
        client: Edge client used for the heartbeat and preflight calls.
        node: The node whose metadata drives the preflight.

    Returns:
        The heartbeat dict. After a successful preflight call its
        ``capabilities["process"]`` entry is updated from the preflight result
        and the raw result is stored under ``"native_preflight"``.
    """
    node_self = dict(client.heartbeat_node() or {})
    metadata = dict(node.metadata or {})
    native_command = str(metadata.get("native_command") or "").strip() or None
    native_workdir = str(metadata.get("native_workdir") or "").strip() or None
    runtime_kind = str(metadata.get("runtime_kind") or "docker").strip().lower()

    should_probe = bool(native_command or native_workdir or runtime_kind == "native")
    if not should_probe:
        return node_self

    try:
        preflight = dict(
            client.preflight_native(native_command=native_command, native_workdir=native_workdir) or {}
        )
    except Exception as exc:
        # Best effort: a failed preflight must not hide the heartbeat result.
        log_edge_failure(
            logger,
            key=f"platform-node-native-preflight:{node.node_id}",
            exc=exc,
            message=f"Failed to run native preflight for node_id={node.node_id}",
        )
        return node_self

    caps = dict(node_self.get("capabilities") or {})
    process_caps = dict(caps.get("process") or {})
    # Only overwrite the command when the preflight actually reported one.
    if preflight.get("command"):
        process_caps["command"] = list(preflight.get("command") or [])
    process_caps["available"] = bool(preflight.get("ok"))
    process_caps["command_available"] = bool(preflight.get("command_available"))
    process_caps["workdir_exists"] = bool(preflight.get("workdir_exists"))
    process_caps["workdir"] = str(preflight.get("workdir") or "")
    process_caps["detail"] = str(preflight.get("detail") or "")
    caps["process"] = process_caps
    node_self["capabilities"] = caps
    node_self["native_preflight"] = preflight
    return node_self


def _edge_provider_target(node: ManagedNode) -> ProviderTarget:
    """Build the edge ``ProviderTarget`` for *node*, defaulting runtime/adapter."""
    metadata = node.metadata or {}
    return ProviderTarget(
        node_id=node.node_id,
        transport_kind="edge",
        runtime_kind=str(metadata.get("runtime_kind") or "docker"),
        core_adapter=str(metadata.get("core_adapter") or "nanobot"),
    )


def _elapsed_ms(started: float) -> int:
    """Return whole milliseconds since *started* (a ``perf_counter`` value), at least 1."""
    return max(1, int((time.perf_counter() - started) * 1000))


def test_edge_connectivity(resolve_edge_client, node: ManagedNode) -> ManagedNodeConnectivityResult:
    """Probe *node* through its edge client and report the outcome.

    Args:
        resolve_edge_client: Callable that receives a ``ProviderTarget`` and
            returns the edge client to use.
        node: The node to probe.

    Returns:
        An ``online`` result carrying the node's self-description on success,
        or an ``offline`` result whose detail summarizes the exception. Any
        exception raised while resolving or calling the client is caught.
    """
    started = time.perf_counter()
    try:
        client = resolve_edge_client(_edge_provider_target(node))
        node_self = edge_node_self_with_native_preflight(client=client, node=node)
        return ManagedNodeConnectivityResult(
            ok=True,
            status="online",
            latency_ms=_elapsed_ms(started),
            detail="dashboard-edge reachable",
            node_self=node_self,
        )
    except Exception as exc:
        return ManagedNodeConnectivityResult(
            ok=False,
            status="offline",
            latency_ms=_elapsed_ms(started),
            detail=summarize_edge_exception(exc),
            node_self=None,
        )


def test_edge_native_preflight(
    resolve_edge_client,
    node: ManagedNode,
    *,
    native_command: Optional[str] = None,
    native_workdir: Optional[str] = None,
) -> ManagedNodeNativePreflightResult:
    """Run a native launcher preflight on *node* and report the outcome.

    Args:
        resolve_edge_client: Callable that receives a ``ProviderTarget`` and
            returns the edge client to use.
        node: The node to probe.
        native_command: Command line forwarded unchanged to the preflight call.
        native_workdir: Working directory forwarded unchanged to the
            preflight call.

    Returns:
        On success, an ``online`` result populated from the preflight response
        and the node's heartbeat data. If anything raises, an ``offline``
        result that echoes the locally parsed command and workdir hints.
    """
    started = time.perf_counter()
    command_hint = split_native_command(native_command)
    workdir_hint = str(native_workdir or "").strip()
    try:
        client = resolve_edge_client(_edge_provider_target(node))
        node_self = dict(client.heartbeat_node() or {})
        preflight = dict(
            client.preflight_native(
                native_command=native_command,
                native_workdir=native_workdir,
            )
            or {}
        )
        latency_ms = _elapsed_ms(started)

        ready = bool(preflight.get("ok"))
        reported = (str(item or "").strip() for item in list(preflight.get("command") or []))
        command = [item for item in reported if item]
        workdir = str(preflight.get("workdir") or "")
        detail = str(preflight.get("detail") or "")
        if not detail:
            detail = "native launcher ready" if ready else "native launcher not ready"

        return ManagedNodeNativePreflightResult(
            ok=ready,
            status="online",
            latency_ms=latency_ms,
            detail=detail,
            command=command,
            workdir=workdir,
            command_available=bool(preflight.get("command_available")),
            workdir_exists=bool(preflight.get("workdir_exists")),
            runtime_native_supported=runtime_native_supported(node_self),
            node_self=node_self,
        )
    except Exception as exc:
        return ManagedNodeNativePreflightResult(
            ok=False,
            status="offline",
            latency_ms=_elapsed_ms(started),
            detail=summarize_edge_exception(exc),
            command=command_hint,
            workdir=workdir_hint,
            command_available=False,
            # Reported as False when a workdir was requested, True otherwise.
            workdir_exists=not workdir_hint,
            runtime_native_supported=False,
            node_self=None,
        )