dashboard-nanobot/backend/clients/edge/base.py

169 lines
4.4 KiB
Python

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from fastapi import Request, UploadFile
from fastapi.responses import Response
from models.bot import BotInstance
class EdgeClient(ABC):
@abstractmethod
async def start_bot(self, *, bot: BotInstance, start_payload: Dict[str, Any]) -> Dict[str, Any]:
raise NotImplementedError
@abstractmethod
def stop_bot(self, *, bot: BotInstance) -> Dict[str, Any]:
raise NotImplementedError
@abstractmethod
def deliver_command(self, *, bot_id: str, command: str, media: Optional[List[str]] = None) -> Optional[str]:
raise NotImplementedError
@abstractmethod
def get_recent_logs(self, *, bot_id: str, tail: int = 300) -> List[str]:
raise NotImplementedError
@abstractmethod
def ensure_monitor(self, *, bot_id: str) -> bool:
raise NotImplementedError
@abstractmethod
def get_monitor_packets(self, *, bot_id: str, after_seq: int = 0, limit: int = 200) -> List[Dict[str, Any]]:
raise NotImplementedError
@abstractmethod
def get_runtime_status(self, *, bot_id: str) -> str:
raise NotImplementedError
@abstractmethod
def get_resource_snapshot(self, *, bot_id: str) -> Dict[str, Any]:
raise NotImplementedError
@abstractmethod
def get_node_resources(self) -> Dict[str, Any]:
raise NotImplementedError
@abstractmethod
def get_node_self(self) -> Dict[str, Any]:
raise NotImplementedError
@abstractmethod
def preflight_native(self, *, native_command: Optional[str] = None, native_workdir: Optional[str] = None) -> Dict[str, Any]:
raise NotImplementedError
@abstractmethod
def read_state(
self,
*,
bot_id: str,
state_key: str,
workspace_root: Optional[str] = None,
) -> Dict[str, Any]:
raise NotImplementedError
@abstractmethod
def write_state(
self,
*,
bot_id: str,
state_key: str,
data: Dict[str, Any],
workspace_root: Optional[str] = None,
) -> Dict[str, Any]:
raise NotImplementedError
@abstractmethod
def sync_bot_workspace(
self,
*,
bot_id: str,
channels_override: Optional[List[Dict[str, Any]]] = None,
global_delivery_override: Optional[Dict[str, Any]] = None,
runtime_overrides: Optional[Dict[str, Any]] = None,
) -> None:
raise NotImplementedError
@abstractmethod
def purge_workspace(self, *, bot_id: str, workspace_root: Optional[str] = None) -> Dict[str, Any]:
raise NotImplementedError
@abstractmethod
def list_tree(
self,
*,
bot_id: str,
path: Optional[str] = None,
recursive: bool = False,
workspace_root: Optional[str] = None,
) -> Dict[str, Any]:
raise NotImplementedError
@abstractmethod
def read_file(
self,
*,
bot_id: str,
path: str,
max_bytes: int = 200000,
workspace_root: Optional[str] = None,
) -> Dict[str, Any]:
raise NotImplementedError
@abstractmethod
def write_markdown(
self,
*,
bot_id: str,
path: str,
content: str,
workspace_root: Optional[str] = None,
) -> Dict[str, Any]:
raise NotImplementedError
@abstractmethod
def write_text_file(
self,
*,
bot_id: str,
path: str,
content: str,
workspace_root: Optional[str] = None,
) -> Dict[str, Any]:
raise NotImplementedError
@abstractmethod
async def upload_files(
self,
*,
bot_id: str,
files: List[UploadFile],
path: Optional[str] = None,
workspace_root: Optional[str] = None,
) -> Dict[str, Any]:
raise NotImplementedError
@abstractmethod
def delete_workspace_path(
self,
*,
bot_id: str,
path: str,
workspace_root: Optional[str] = None,
) -> Dict[str, Any]:
raise NotImplementedError
@abstractmethod
def serve_file(
self,
*,
bot_id: str,
path: str,
download: bool,
request: Request,
public: bool = False,
redirect_html_to_raw: bool = False,
workspace_root: Optional[str] = None,
) -> Response:
raise NotImplementedError