160 lines
7.4 KiB
Python
160 lines
7.4 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Request
|
|
from sqlmodel import Session, select
|
|
|
|
from core.database import get_session
|
|
from models.bot import BotInstance
|
|
from providers.target import ProviderTarget
|
|
from services.node_registry_service import ManagedNode
|
|
|
|
from api.platform_node_support import (
|
|
edge_node_self_with_native_preflight,
|
|
managed_node_from_payload,
|
|
normalize_node_payload,
|
|
serialize_node,
|
|
)
|
|
from api.platform_shared import (
|
|
cached_platform_nodes_payload,
|
|
invalidate_platform_nodes_cache,
|
|
invalidate_platform_overview_cache,
|
|
logger,
|
|
store_platform_nodes_payload,
|
|
)
|
|
from clients.edge.errors import log_edge_failure
|
|
from schemas.platform import ManagedNodePayload
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.get("/api/platform/nodes")
|
|
def list_platform_nodes(request: Request, session: Session = Depends(get_session)):
|
|
cached_payload = cached_platform_nodes_payload()
|
|
if cached_payload is not None:
|
|
return cached_payload
|
|
|
|
node_registry = getattr(request.app.state, "node_registry_service", None)
|
|
if node_registry is None or not hasattr(node_registry, "list_nodes"):
|
|
return {"items": []}
|
|
resolve_edge_client = getattr(request.app.state, "resolve_edge_client", None)
|
|
refreshed_items = []
|
|
for node in node_registry.list_nodes():
|
|
metadata = dict(node.metadata or {})
|
|
refresh_failed = False
|
|
if callable(resolve_edge_client) and str(metadata.get("transport_kind") or "").strip().lower() == "edge" and bool(node.enabled):
|
|
try:
|
|
client = resolve_edge_client(
|
|
ProviderTarget(
|
|
node_id=node.node_id,
|
|
transport_kind="edge",
|
|
runtime_kind=str(metadata.get("runtime_kind") or "docker"),
|
|
core_adapter=str(metadata.get("core_adapter") or "nanobot"),
|
|
)
|
|
)
|
|
node_self = edge_node_self_with_native_preflight(client=client, node=node)
|
|
node = node_registry.mark_node_seen(
|
|
session,
|
|
node_id=node.node_id,
|
|
display_name=str(node.display_name or node_self.get("display_name") or node.node_id),
|
|
capabilities=dict(node_self.get("capabilities") or {}),
|
|
resources=dict(node_self.get("resources") or {}),
|
|
)
|
|
except Exception as exc:
|
|
refresh_failed = True
|
|
log_edge_failure(
|
|
logger,
|
|
key=f"platform-node-refresh:{node.node_id}",
|
|
exc=exc,
|
|
message=f"Failed to refresh edge node metadata for node_id={node.node_id}",
|
|
)
|
|
refreshed_items.append((node, refresh_failed))
|
|
return store_platform_nodes_payload([
|
|
serialize_node(node, refresh_failed=refresh_failed)
|
|
for node, refresh_failed in refreshed_items
|
|
])
|
|
|
|
|
|
@router.get("/api/platform/nodes/{node_id}")
|
|
def get_platform_node(node_id: str, request: Request, session: Session = Depends(get_session)):
|
|
normalized_node_id = str(node_id or "").strip().lower()
|
|
node_registry = getattr(request.app.state, "node_registry_service", None)
|
|
if node_registry is None or not hasattr(node_registry, "get_node"):
|
|
raise HTTPException(status_code=500, detail="node registry is unavailable")
|
|
node = node_registry.get_node(normalized_node_id)
|
|
if node is None:
|
|
raise HTTPException(status_code=404, detail=f"Managed node not found: {normalized_node_id}")
|
|
return serialize_node(node)
|
|
|
|
|
|
@router.post("/api/platform/nodes")
|
|
def create_platform_node(payload: ManagedNodePayload, request: Request, session: Session = Depends(get_session)):
|
|
node_registry = getattr(request.app.state, "node_registry_service", None)
|
|
if node_registry is None or not hasattr(node_registry, "get_node"):
|
|
raise HTTPException(status_code=500, detail="node registry is unavailable")
|
|
normalized = normalize_node_payload(payload)
|
|
if node_registry.get_node(normalized.node_id) is not None:
|
|
raise HTTPException(status_code=409, detail=f"Node already exists: {normalized.node_id}")
|
|
node = node_registry.upsert_node(session, managed_node_from_payload(normalized))
|
|
invalidate_platform_overview_cache()
|
|
invalidate_platform_nodes_cache()
|
|
return serialize_node(node)
|
|
|
|
|
|
@router.put("/api/platform/nodes/{node_id}")
|
|
def update_platform_node(node_id: str, payload: ManagedNodePayload, request: Request, session: Session = Depends(get_session)):
|
|
normalized_node_id = str(node_id or "").strip().lower()
|
|
node_registry = getattr(request.app.state, "node_registry_service", None)
|
|
if node_registry is None or not hasattr(node_registry, "get_node"):
|
|
raise HTTPException(status_code=500, detail="node registry is unavailable")
|
|
existing = node_registry.get_node(normalized_node_id)
|
|
if existing is None:
|
|
raise HTTPException(status_code=404, detail=f"Managed node not found: {normalized_node_id}")
|
|
normalized = normalize_node_payload(payload)
|
|
if normalized.node_id != normalized_node_id:
|
|
raise HTTPException(status_code=400, detail="node_id cannot be changed")
|
|
node = node_registry.upsert_node(
|
|
session,
|
|
ManagedNode(
|
|
node_id=normalized_node_id,
|
|
display_name=normalized.display_name,
|
|
base_url=normalized.base_url,
|
|
enabled=bool(normalized.enabled),
|
|
auth_token=normalized.auth_token or existing.auth_token,
|
|
metadata={
|
|
"transport_kind": normalized.transport_kind,
|
|
"runtime_kind": normalized.runtime_kind,
|
|
"core_adapter": normalized.core_adapter,
|
|
"workspace_root": normalized.workspace_root,
|
|
"native_command": normalized.native_command,
|
|
"native_workdir": normalized.native_workdir,
|
|
"native_sandbox_mode": normalized.native_sandbox_mode,
|
|
},
|
|
capabilities=dict(existing.capabilities or {}),
|
|
resources=dict(existing.resources or {}),
|
|
last_seen_at=existing.last_seen_at,
|
|
),
|
|
)
|
|
invalidate_platform_overview_cache()
|
|
invalidate_platform_nodes_cache()
|
|
return serialize_node(node)
|
|
|
|
|
|
@router.delete("/api/platform/nodes/{node_id}")
|
|
def delete_platform_node(node_id: str, request: Request, session: Session = Depends(get_session)):
|
|
normalized_node_id = str(node_id or "").strip().lower()
|
|
if normalized_node_id == "local":
|
|
raise HTTPException(status_code=400, detail="Local node cannot be deleted")
|
|
node_registry = getattr(request.app.state, "node_registry_service", None)
|
|
if node_registry is None or not hasattr(node_registry, "get_node"):
|
|
raise HTTPException(status_code=500, detail="node registry is unavailable")
|
|
if node_registry.get_node(normalized_node_id) is None:
|
|
raise HTTPException(status_code=404, detail=f"Managed node not found: {normalized_node_id}")
|
|
attached_bot_ids = session.exec(select(BotInstance.id).where(BotInstance.node_id == normalized_node_id)).all()
|
|
if attached_bot_ids:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Node {normalized_node_id} still has bots assigned: {', '.join(str(item) for item in attached_bot_ids[:5])}",
|
|
)
|
|
node_registry.delete_node(session, normalized_node_id)
|
|
invalidate_platform_overview_cache()
|
|
invalidate_platform_nodes_cache()
|
|
return {"status": "deleted", "node_id": normalized_node_id}
|