# dashboard-nanobot/backend/clients/edge/http.py

# 544 lines
# 20 KiB
# Python

import mimetypes
import os
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import quote
import httpx
from fastapi import HTTPException, Request, UploadFile
from fastapi.responses import RedirectResponse, Response
from clients.edge.base import EdgeClient
from clients.edge.errors import edge_transport_http_exception
from models.bot import BotInstance
from schemas.edge import (
EdgeCommandRequest,
EdgeLogsResponse,
EdgeNativePreflightRequest,
EdgeNativePreflightResponse,
EdgeNodeHeartbeatResponse,
EdgeMonitorPacketsResponse,
EdgeMarkdownWriteRequest,
EdgeMonitorEnsureResponse,
EdgeNodeResourcesResponse,
EdgeNodeSelfResponse,
EdgeStateResponse,
EdgeStateWriteRequest,
EdgeStartBotRequest,
EdgeStatusResponse,
EdgeWorkspaceSyncRequest,
)
from services.node_registry_service import ManagedNode
# Request header in which the node's auth token is sent to dashboard-edge.
EDGE_AUTH_HEADER = "x-dashboard-edge-token"
class HttpEdgeClient(EdgeClient):
    """Edge client that reaches a dashboard-edge node over HTTP.

    Each call opens a short-lived httpx client from the configured factory,
    sends the request to ``node.base_url`` and returns the decoded JSON
    payload (validated through the ``schemas.edge`` models where one exists).
    Transport failures are converted with ``edge_transport_http_exception``;
    non-success HTTP statuses and malformed payloads become ``HTTPException``
    with status 502.
    """

    def __init__(
        self,
        *,
        node: ManagedNode,
        http_client_factory: Optional[Callable[[], httpx.Client]] = None,
        async_http_client_factory: Optional[Callable[[], httpx.AsyncClient]] = None,
    ) -> None:
        """Bind the client to ``node``.

        Args:
            node: Managed node whose ``base_url`` and ``auth_token`` are used.
            http_client_factory: Optional factory for synchronous clients;
                defaults to ``httpx.Client(timeout=15.0, trust_env=False)``.
            async_http_client_factory: Optional factory for asynchronous
                clients; defaults to the ``httpx.AsyncClient`` equivalent.
        """
        self._node = node
        self._http_client_factory = http_client_factory or (
            lambda: httpx.Client(timeout=15.0, trust_env=False)
        )
        self._async_http_client_factory = async_http_client_factory or (
            lambda: httpx.AsyncClient(timeout=15.0, trust_env=False)
        )

    # ------------------------------------------------------------------
    # Bot lifecycle and runtime
    # ------------------------------------------------------------------

    async def start_bot(self, *, bot: BotInstance, start_payload: Dict[str, Any]) -> Dict[str, Any]:
        """Start ``bot`` on the edge node and return the validated status payload."""
        payload = await self._async_request_json(
            "POST",
            self._bot_path(bot.id, "/start"),
            json=EdgeStartBotRequest.model_validate(start_payload).model_dump(),
        )
        return EdgeStatusResponse.model_validate(payload).model_dump()

    def stop_bot(self, *, bot: BotInstance) -> Dict[str, Any]:
        """Stop ``bot`` on the edge node and return the validated status payload."""
        payload = self._request_json("POST", self._bot_path(bot.id, "/stop"))
        return EdgeStatusResponse.model_validate(payload).model_dump()

    def deliver_command(self, *, bot_id: str, command: str, media: Optional[List[str]] = None) -> Optional[str]:
        """Send ``command`` (with optional media entries) to the bot.

        Always returns ``None``; failures surface as ``HTTPException``.
        """
        self._request_json(
            "POST",
            self._bot_path(bot_id, "/command"),
            json=EdgeCommandRequest(command=command, media=list(media or [])).model_dump(),
        )
        return None

    def get_recent_logs(self, *, bot_id: str, tail: int = 300) -> List[str]:
        """Return the bot's recent log lines; ``tail`` is clamped to at least 1."""
        payload = self._request_json(
            "GET",
            self._bot_path(bot_id, "/logs"),
            params={"tail": max(1, int(tail or 300))},
        )
        return EdgeLogsResponse.model_validate(payload).logs

    def ensure_monitor(self, *, bot_id: str) -> bool:
        """Ask the edge to ensure the bot's monitor and return its ``ensured`` flag."""
        payload = self._request_json("POST", self._bot_path(bot_id, "/monitor/ensure"))
        return bool(EdgeMonitorEnsureResponse.model_validate(payload).ensured)

    def get_monitor_packets(self, *, bot_id: str, after_seq: int = 0, limit: int = 200) -> List[Dict[str, Any]]:
        """Return monitor packets as plain dicts.

        ``after_seq`` is clamped to >= 0 and ``limit`` to >= 1 before sending.
        """
        payload = self._request_json(
            "GET",
            self._bot_path(bot_id, "/monitor/packets"),
            params={
                "after_seq": max(0, int(after_seq or 0)),
                "limit": max(1, int(limit or 200)),
            },
        )
        parsed = EdgeMonitorPacketsResponse.model_validate(payload)
        return [item.model_dump() for item in parsed.packets or []]

    def get_runtime_status(self, *, bot_id: str) -> str:
        """Return the upper-cased runtime status, or ``"STOPPED"`` when absent/empty."""
        payload = self._request_json("GET", self._bot_path(bot_id, "/runtime/status"))
        return str(payload.get("status") or "STOPPED").upper()

    def get_resource_snapshot(self, *, bot_id: str) -> Dict[str, Any]:
        """Return the bot's resource payload exactly as the edge sent it."""
        return self._request_json("GET", self._bot_path(bot_id, "/resources"))

    # ------------------------------------------------------------------
    # Node-level endpoints
    # ------------------------------------------------------------------

    def get_node_resources(self) -> Dict[str, Any]:
        """Return the validated node resources payload."""
        payload = self._request_json("GET", "/api/edge/node/resources")
        return EdgeNodeResourcesResponse.model_validate(payload).model_dump()

    def get_node_self(self) -> Dict[str, Any]:
        """Return the validated node self-description payload."""
        payload = self._request_json("GET", "/api/edge/node/self")
        return EdgeNodeSelfResponse.model_validate(payload).model_dump()

    def heartbeat_node(self) -> Dict[str, Any]:
        """POST a heartbeat to the node and return the validated response."""
        payload = self._request_json("POST", "/api/edge/node/heartbeat")
        return EdgeNodeHeartbeatResponse.model_validate(payload).model_dump()

    def preflight_native(
        self,
        *,
        native_command: Optional[str] = None,
        native_workdir: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Run the native-runtime preflight on the edge.

        Blank or whitespace-only arguments are sent as ``None``.
        """
        payload = self._request_json(
            "POST",
            "/api/edge/runtime/native/preflight",
            json=EdgeNativePreflightRequest(
                native_command=str(native_command or "").strip() or None,
                native_workdir=str(native_workdir or "").strip() or None,
            ).model_dump(),
        )
        return EdgeNativePreflightResponse.model_validate(payload).model_dump()

    # ------------------------------------------------------------------
    # State
    # ------------------------------------------------------------------

    def read_state(
        self,
        *,
        bot_id: str,
        state_key: str,
        workspace_root: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Read the state stored under ``state_key`` for the bot."""
        params = self._add_workspace_root({}, workspace_root)
        payload = self._request_json(
            "GET",
            self._bot_path(bot_id, f"/state/{quote(str(state_key), safe='')}"),
            params=params or None,
        )
        return EdgeStateResponse.model_validate(payload).model_dump()

    def write_state(
        self,
        *,
        bot_id: str,
        state_key: str,
        data: Dict[str, Any],
        workspace_root: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Write ``data`` under ``state_key`` for the bot and return the stored state."""
        payload = self._request_json(
            "PUT",
            self._bot_path(bot_id, f"/state/{quote(str(state_key), safe='')}"),
            json=EdgeStateWriteRequest(
                data=dict(data or {}),
                workspace_root=str(workspace_root or "").strip() or None,
            ).model_dump(),
        )
        return EdgeStateResponse.model_validate(payload).model_dump()

    # ------------------------------------------------------------------
    # Workspace
    # ------------------------------------------------------------------

    def sync_bot_workspace(
        self,
        *,
        bot_id: str,
        channels_override: Optional[List[Dict[str, Any]]] = None,
        global_delivery_override: Optional[Dict[str, Any]] = None,
        runtime_overrides: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Request a workspace sync for the bot, forwarding the given overrides."""
        self._request_json(
            "POST",
            self._bot_path(bot_id, "/workspace/sync"),
            json=EdgeWorkspaceSyncRequest(
                channels_override=channels_override,
                global_delivery_override=global_delivery_override,
                runtime_overrides=runtime_overrides,
            ).model_dump(),
        )

    def purge_workspace(self, *, bot_id: str, workspace_root: Optional[str] = None) -> Dict[str, Any]:
        """Purge the bot's workspace and return the validated status payload."""
        params = self._add_workspace_root({}, workspace_root)
        payload = self._request_json(
            "POST",
            self._bot_path(bot_id, "/workspace/purge"),
            params=params or None,
        )
        return EdgeStatusResponse.model_validate(payload).model_dump()

    def list_tree(
        self,
        *,
        bot_id: str,
        path: Optional[str] = None,
        recursive: bool = False,
        workspace_root: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Return the workspace tree payload, optionally below ``path``."""
        params: Dict[str, Any] = {"recursive": bool(recursive)}
        if path:
            params["path"] = path
        self._add_workspace_root(params, workspace_root)
        return self._request_json("GET", self._bot_path(bot_id, "/workspace/tree"), params=params)

    def read_file(
        self,
        *,
        bot_id: str,
        path: str,
        max_bytes: int = 200000,
        workspace_root: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Read a workspace file; ``max_bytes`` is raised to at least 4096."""
        params: Dict[str, Any] = {
            "path": path,
            "max_bytes": max(4096, int(max_bytes or 200000)),
        }
        self._add_workspace_root(params, workspace_root)
        return self._request_json("GET", self._bot_path(bot_id, "/workspace/file"), params=params)

    def write_markdown(
        self,
        *,
        bot_id: str,
        path: str,
        content: str,
        workspace_root: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Write ``content`` to the markdown file at ``path`` in the workspace."""
        params = self._add_workspace_root({"path": path}, workspace_root)
        return self._request_json(
            "PUT",
            self._bot_path(bot_id, "/workspace/file/markdown"),
            params=params,
            json=EdgeMarkdownWriteRequest(content=str(content or "")).model_dump(),
        )

    def write_text_file(
        self,
        *,
        bot_id: str,
        path: str,
        content: str,
        workspace_root: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Write ``content`` to the text file at ``path`` in the workspace."""
        params = self._add_workspace_root({"path": path}, workspace_root)
        return self._request_json(
            "PUT",
            self._bot_path(bot_id, "/workspace/file/text"),
            params=params,
            # The text endpoint is sent the same request schema as the markdown one.
            json=EdgeMarkdownWriteRequest(content=str(content or "")).model_dump(),
        )

    async def upload_files(
        self,
        *,
        bot_id: str,
        files: List[UploadFile],
        path: Optional[str] = None,
        workspace_root: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Forward uploaded files to the edge workspace as one multipart request.

        Every upload is rewound before sending and closed afterwards, whether
        or not the request succeeded.

        Raises:
            HTTPException: on transport failure, non-success status or an
                invalid JSON payload.
        """
        base_url = self._require_base_url()
        multipart_files = []
        response: httpx.Response | None = None
        try:
            async with self._async_http_client_factory() as client:
                for upload in files:
                    # Rewind so the full body is sent even if it was read before.
                    await upload.seek(0)
                    multipart_files.append(
                        (
                            "files",
                            (
                                upload.filename or "upload.bin",
                                upload.file,
                                upload.content_type or "application/octet-stream",
                            ),
                        )
                    )
                response = await client.request(
                    method="POST",
                    url=f"{base_url}{self._bot_path(bot_id, '/workspace/upload')}",
                    headers=self._headers(),
                    params=self._workspace_upload_params(path=path, workspace_root=workspace_root),
                    files=multipart_files,
                )
        except httpx.RequestError as exc:
            raise edge_transport_http_exception(exc, node=self._node) from exc
        finally:
            for upload in files:
                await upload.close()
        if response is None:
            raise HTTPException(
                status_code=502,
                detail="dashboard-edge upload request failed before receiving a response",
            )
        return self._parse_json_response(response)

    def delete_workspace_path(
        self,
        *,
        bot_id: str,
        path: str,
        workspace_root: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Delete ``path`` from the bot's workspace."""
        params = self._add_workspace_root({"path": path}, workspace_root)
        return self._request_json("DELETE", self._bot_path(bot_id, "/workspace/file"), params=params)

    def upload_local_files(
        self,
        *,
        bot_id: str,
        local_paths: List[str],
        path: Optional[str] = None,
        workspace_root: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Upload files from the local filesystem to the edge workspace.

        Returns ``{"bot_id": bot_id, "files": []}`` without contacting the
        edge when ``local_paths`` is empty.

        Raises:
            HTTPException: 400 when a path is not an existing file, 500 when a
                file cannot be opened/read, and the usual transport / 502
                errors for the request itself.
        """
        if not local_paths:
            return {"bot_id": bot_id, "files": []}
        base_url = self._require_base_url()
        multipart_files = []
        handles = []
        response: httpx.Response | None = None
        try:
            for local_path in local_paths:
                normalized = os.path.abspath(os.path.expanduser(str(local_path or "").strip()))
                if not os.path.isfile(normalized):
                    raise HTTPException(status_code=400, detail=f"Local upload file not found: {local_path}")
                # Handles must stay open until the request has been sent, so
                # they are tracked and closed in the ``finally`` below.
                handle = open(normalized, "rb")
                handles.append(handle)
                multipart_files.append(
                    (
                        "files",
                        (
                            os.path.basename(normalized),
                            handle,
                            mimetypes.guess_type(normalized)[0] or "application/octet-stream",
                        ),
                    )
                )
            with self._http_client_factory() as client:
                response = client.request(
                    method="POST",
                    url=f"{base_url}{self._bot_path(bot_id, '/workspace/upload')}",
                    headers=self._headers(),
                    params=self._workspace_upload_params(path=path, workspace_root=workspace_root),
                    files=multipart_files,
                )
        except OSError as exc:
            raise HTTPException(
                status_code=500,
                detail=f"Failed to open local upload file: {exc.strerror or str(exc)}",
            ) from exc
        except httpx.RequestError as exc:
            raise edge_transport_http_exception(exc, node=self._node) from exc
        finally:
            for handle in handles:
                # Best-effort cleanup: a failing close must not mask the
                # outcome of the upload or keep later handles open.
                try:
                    handle.close()
                except Exception:
                    continue
        if response is None:
            raise HTTPException(
                status_code=502,
                detail="dashboard-edge upload request failed before receiving a response",
            )
        return self._parse_json_response(response)

    def serve_file(
        self,
        *,
        bot_id: str,
        path: str,
        download: bool,
        request: Request,
        public: bool = False,
        redirect_html_to_raw: bool = False,
        workspace_root: Optional[str] = None,
    ) -> Response:
        """Proxy a workspace file from the edge node to the caller.

        When ``redirect_html_to_raw`` is set, the file is guessed to be HTML
        and no download was requested, a 307 redirect to the dashboard raw
        URL is returned instead of proxying. Otherwise the file is fetched
        from the edge (forwarding the caller's ``Range`` header unless
        ``download`` is set) and returned with a subset of upstream headers.

        Raises:
            HTTPException: on transport failure or non-success edge status.
        """
        media_type, _ = mimetypes.guess_type(path)
        if redirect_html_to_raw and not download and str(media_type or "").startswith("text/html"):
            raw_url = self._build_dashboard_raw_url(bot_id=bot_id, path=path, public=public)
            if raw_url:
                return RedirectResponse(url=raw_url, status_code=307)
        base_url = self._require_base_url()
        url = self._build_edge_file_url(
            bot_id=bot_id,
            path=path,
            download=download,
            raw=not redirect_html_to_raw,
            workspace_root=workspace_root,
        )
        headers = self._headers()
        range_header = request.headers.get("range", "").strip()
        if range_header and not download:
            headers["range"] = range_header
        try:
            with self._http_client_factory() as client:
                response = client.request(
                    method="GET",
                    url=f"{base_url}{url}",
                    headers=headers,
                )
        except httpx.RequestError as exc:
            raise edge_transport_http_exception(exc, node=self._node) from exc
        self._raise_for_status(response)
        return Response(
            content=response.content,
            status_code=response.status_code,
            media_type=response.headers.get("content-type") or "application/octet-stream",
            headers=self._response_proxy_headers(response),
        )

    # ------------------------------------------------------------------
    # Request plumbing
    # ------------------------------------------------------------------

    def _request_json(
        self,
        method: str,
        path: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Send a synchronous request to ``path`` and return the JSON object."""
        base_url = self._require_base_url()
        try:
            with self._http_client_factory() as client:
                response = client.request(
                    method=method.upper(),
                    url=f"{base_url}{path}",
                    headers=self._headers(),
                    params=params,
                    json=json,
                )
        except httpx.RequestError as exc:
            raise edge_transport_http_exception(exc, node=self._node) from exc
        return self._parse_json_response(response)

    async def _async_request_json(
        self,
        method: str,
        path: str,
        *,
        params: Optional[Dict[str, Any]] = None,
        json: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Send an asynchronous request to ``path`` and return the JSON object."""
        base_url = self._require_base_url()
        try:
            async with self._async_http_client_factory() as client:
                response = await client.request(
                    method=method.upper(),
                    url=f"{base_url}{path}",
                    headers=self._headers(),
                    params=params,
                    json=json,
                )
        except httpx.RequestError as exc:
            raise edge_transport_http_exception(exc, node=self._node) from exc
        return self._parse_json_response(response)

    def _headers(self) -> Dict[str, str]:
        """Return request headers, adding the auth header when the node has a token."""
        headers = {"accept": "application/json"}
        token = str(self._node.auth_token or "").strip()
        if token:
            headers[EDGE_AUTH_HEADER] = token
        return headers

    def _require_base_url(self) -> str:
        """Return the node base URL without trailing slashes.

        Raises:
            HTTPException: 501 when the node has no base URL configured.
        """
        base_url = str(self._node.base_url or "").strip().rstrip("/")
        if not base_url:
            raise self._not_implemented("connect to node")
        return base_url

    @staticmethod
    def _bot_path(bot_id: Any, suffix: str = "") -> str:
        """Return ``/api/edge/bots/<bot_id><suffix>`` with the id percent-encoded.

        Encoding with ``safe=''`` keeps ids containing ``/``, ``?`` or ``#``
        inside their own path segment.
        """
        return f"/api/edge/bots/{quote(str(bot_id), safe='')}{suffix}"

    @staticmethod
    def _add_workspace_root(params: Dict[str, Any], workspace_root: Optional[str]) -> Dict[str, Any]:
        """Add the stripped ``workspace_root`` to ``params`` when non-empty.

        Mutates and returns ``params``. Whitespace-only values are omitted
        rather than sent as an empty query parameter.
        """
        normalized = str(workspace_root or "").strip()
        if normalized:
            params["workspace_root"] = normalized
        return params

    @staticmethod
    def _raise_for_status(response: httpx.Response) -> None:
        """Raise a 502 ``HTTPException`` when the edge returned a non-success status.

        The detail carries at most the first 400 characters of the edge
        response body (or of the httpx error text when the body is empty).
        """
        try:
            response.raise_for_status()
        except httpx.HTTPStatusError as exc:
            detail = exc.response.text.strip() or str(exc)
            raise HTTPException(
                status_code=502,
                detail=f"dashboard-edge request failed: {detail[:400]}",
            ) from exc

    @classmethod
    def _parse_json_response(cls, response: httpx.Response) -> Dict[str, Any]:
        """Return the response body as a JSON object.

        Raises:
            HTTPException: 502 for a non-success status, undecodable JSON, or
                a JSON value that is not an object.
        """
        cls._raise_for_status(response)
        try:
            payload = response.json()
        except Exception as exc:
            raise HTTPException(status_code=502, detail="dashboard-edge returned invalid JSON") from exc
        if not isinstance(payload, dict):
            raise HTTPException(status_code=502, detail="dashboard-edge returned unexpected payload")
        return payload

    @staticmethod
    def _build_dashboard_raw_url(bot_id: str, path: str, public: bool) -> str:
        """Return the dashboard raw-file URL for ``path``, or ``""`` if the path is empty.

        Empty path segments (leading, trailing or doubled slashes) are dropped.
        """
        normalized = "/".join(part for part in str(path or "").strip().split("/") if part)
        if not normalized:
            return ""
        prefix = "/public" if public else "/api"
        return f"{prefix}/bots/{quote(bot_id, safe='')}/workspace/raw/{quote(normalized, safe='/')}"

    @staticmethod
    def _build_edge_file_url(
        *,
        bot_id: str,
        path: str,
        download: bool,
        raw: bool,
        workspace_root: Optional[str] = None,
    ) -> str:
        """Return the edge-relative URL (path plus query) used to fetch a file.

        With ``raw`` the path is embedded in the URL path of the raw
        endpoint; otherwise it is passed as the ``path`` query parameter of
        the download endpoint.

        Raises:
            HTTPException: 400 when ``raw`` is set and the path is empty.
        """
        workspace_root_qs = ""
        normalized_workspace_root = str(workspace_root or "").strip()
        if normalized_workspace_root:
            workspace_root_qs = f"&workspace_root={quote(normalized_workspace_root, safe='/')}"
        download_flag = "true" if download else "false"
        bot_segment = quote(bot_id, safe="")
        if raw:
            normalized = "/".join(part for part in str(path or "").strip().split("/") if part)
            if not normalized:
                raise HTTPException(status_code=400, detail="invalid workspace path")
            return (
                f"/api/edge/bots/{bot_segment}/workspace/raw/"
                f"{quote(normalized, safe='/')}?download={download_flag}{workspace_root_qs}"
            )
        return (
            f"/api/edge/bots/{bot_segment}/workspace/download"
            f"?path={quote(str(path or ''), safe='/')}&download={download_flag}{workspace_root_qs}"
        )

    @staticmethod
    def _workspace_upload_params(*, path: Optional[str], workspace_root: Optional[str]) -> Optional[Dict[str, Any]]:
        """Return query parameters for an upload, or ``None`` when there are none."""
        params: Dict[str, Any] = {}
        if path:
            params["path"] = path
        normalized_workspace_root = str(workspace_root or "").strip()
        if normalized_workspace_root:
            params["workspace_root"] = normalized_workspace_root
        return params or None

    @staticmethod
    def _response_proxy_headers(response: httpx.Response) -> Dict[str, str]:
        """Return the upstream headers that are forwarded to the caller.

        ``content-length`` is skipped when the upstream response carried a
        ``content-encoding``: httpx exposes the decoded body through
        ``response.content``, so the upstream length would not match the
        bytes actually sent.
        """
        body_was_encoded = bool(response.headers.get("content-encoding"))
        kept: Dict[str, str] = {}
        for name in ("accept-ranges", "content-disposition", "content-length", "content-range", "cache-control"):
            if name == "content-length" and body_was_encoded:
                continue
            value = response.headers.get(name)
            if value:
                kept[name] = value
        return kept

    def _not_implemented(self, capability: str) -> HTTPException:
        """Build (not raise) a 501 ``HTTPException`` naming ``capability`` and the node."""
        node_label = self._node.display_name or self._node.node_id
        return HTTPException(
            status_code=501,
            detail=f"dashboard-edge {capability} is not implemented yet for node {node_label}",
        )