import base64
import codecs
import json
import os
import re
import threading
import time
from typing import Any, Callable, Dict, List, Optional, Tuple

import docker
import httpx

from app.runtime.base import EdgeRuntimeBackend


class EdgeDockerManager(EdgeRuntimeBackend):
    """Docker-backed edge runtime: one container per bot, named ``worker_<bot_id>``.

    Responsibilities:
      * container lifecycle (start / stop / status) with normalized CPU, memory
        and storage limits,
      * resource usage snapshots via the Docker inspect/stats APIs,
      * command delivery to the bot's in-container dashboard HTTP endpoint
        (``docker exec`` + curl first, python fallback, then host-side HTTP),
      * log streaming that parses structured ``__DASHBOARD_DATA_*__`` packets
        (and a few known plain-text patterns) into monitor events.

    All public methods are best-effort: Docker failures are swallowed and
    reported via return values rather than exceptions.
    """

    runtime_kind = "docker"

    def __init__(self, host_data_root: str, base_image: str = "nanobot-base:v0.1.4") -> None:
        # Best-effort connect: a missing/unreachable Docker engine degrades the
        # backend (self.client stays None) instead of raising at construction.
        try:
            self.client = docker.from_env(timeout=6)
            self.client.version()  # ping the daemon so failures surface here
            print("✅ Edge Docker engine connected")
        except Exception as exc:
            self.client = None
            print(f"⚠️ Edge Docker engine unavailable: {exc}")
        self.host_data_root = host_data_root
        self.base_image = base_image
        # bot_id -> daemon thread streaming that bot's container logs
        self.active_monitors: Dict[str, threading.Thread] = {}
        # bot_id -> human-readable reason the most recent send_command() failed
        self._last_delivery_error: Dict[str, str] = {}

    def capabilities(self) -> Dict[str, Any]:
        """Describe what this backend supports (protocol handshake payload)."""
        return {
            "protocol": {"version": "1"},
            "runtime": {"docker": bool(self.client is not None), "native": False},
            "workspace": {
                "tree": True,
                "read_file": True,
                "write_markdown": True,
                "upload_files": True,
                "serve_file": True,
            },
            "monitor": {"logs": True, "ensure": True},
        }

    @staticmethod
    def _normalize_resource_limits(
        cpu_cores: Optional[float],
        memory_mb: Optional[int],
        storage_gb: Optional[int],
    ) -> Tuple[float, int, int]:
        """Coerce user-supplied limits into safe ranges.

        ``None`` or unparseable values fall back to the defaults
        (1 core / 1024 MB / 10 GB); negative values are treated as invalid;
        an explicit 0 means "unlimited" and is preserved; everything else is
        clamped to 0.1–16 cores, 256–65536 MB and 1–1024 GB.

        Returns:
            ``(cpu_cores, memory_mb, storage_gb)`` after normalization.
        """
        try:
            cpu = float(cpu_cores) if cpu_cores is not None else 1.0
        except Exception:
            cpu = 1.0
        try:
            memory = int(memory_mb) if memory_mb is not None else 1024
        except Exception:
            memory = 1024
        try:
            storage = int(storage_gb) if storage_gb is not None else 10
        except Exception:
            storage = 10
        if cpu < 0:
            cpu = 1.0
        if memory < 0:
            memory = 1024
        if storage < 0:
            storage = 10
        cpu = 0.0 if cpu == 0 else min(16.0, max(0.1, cpu))
        memory = 0 if memory == 0 else min(65536, max(256, memory))
        storage = 0 if storage == 0 else min(1024, max(1, storage))
        return cpu, memory, storage

    def has_image(self, tag: str) -> bool:
        """Return True when image *tag* exists locally (never raises)."""
        if not self.client:
            return False
        try:
            self.client.images.get(tag)
            return True
        except Exception:
            return False

    def start_bot(
        self,
        bot_id: str,
        image_tag: Optional[str] = None,
        env_vars: Optional[Dict[str, str]] = None,
        workspace_root: Optional[str] = None,
        native_command: Optional[str] = None,
        native_workdir: Optional[str] = None,
        cpu_cores: Optional[float] = None,
        memory_mb: Optional[int] = None,
        storage_gb: Optional[int] = None,
        on_state_change: Optional[Callable[[str, dict], None]] = None,
    ) -> bool:
        """Start (or adopt) the bot's container; returns True on success.

        An already-running ``worker_<bot_id>`` container is adopted as-is; a
        stopped one is removed and recreated. ``native_command`` and
        ``native_workdir`` belong to the shared backend interface and are
        ignored by the Docker runtime. When ``on_state_change`` is given, a
        daemon thread is started that streams container logs and forwards
        parsed monitor packets to it.
        """
        if not self.client:
            return False
        image = image_tag or self.base_image
        if not self.has_image(image):
            return False

        state_nanobot_dir = self._state_nanobot_dir(bot_id=bot_id, workspace_root=workspace_root)
        workspace_dir = self._workspace_dir(bot_id=bot_id, workspace_root=workspace_root)
        default_workspace_dir = os.path.join(state_nanobot_dir, "workspace")
        container_name = f"worker_{bot_id}"
        os.makedirs(state_nanobot_dir, exist_ok=True)
        os.makedirs(workspace_dir, exist_ok=True)

        cpu, memory, storage = self._normalize_resource_limits(cpu_cores, memory_mb, storage_gb)

        volumes = {
            state_nanobot_dir: {"bind": "/root/.nanobot", "mode": "rw"},
        }
        # A second mount is only needed when the workspace lives outside the
        # state dir; otherwise it is already covered by the /root/.nanobot bind.
        if os.path.abspath(workspace_dir) != os.path.abspath(default_workspace_dir):
            volumes[workspace_dir] = {"bind": "/root/.nanobot/workspace", "mode": "rw"}

        base_kwargs = {
            "image": image,
            "name": container_name,
            "detach": True,
            "stdin_open": True,
            "tty": True,
            "environment": env_vars or {},
            "volumes": volumes,
            "network_mode": "bridge",
        }
        if memory > 0:
            base_kwargs["mem_limit"] = f"{memory}m"
        if cpu > 0:
            base_kwargs["nano_cpus"] = int(cpu * 1_000_000_000)

        try:
            # Adopt a running container; replace a stopped/exited one.
            try:
                container = self.client.containers.get(container_name)
                container.reload()
                if container.status == "running":
                    if on_state_change:
                        self.ensure_monitor(bot_id, on_state_change)
                    return True
                container.remove(force=True)
            except docker.errors.NotFound:
                pass

            if storage > 0:
                # storage_opt needs daemon support (e.g. overlay2 + pquota);
                # retry without it when the daemon rejects the option.
                try:
                    container = self.client.containers.run(
                        storage_opt={"size": f"{storage}G"},
                        **base_kwargs,
                    )
                except Exception:
                    # BUGFIX: the failed attempt may have *created* the named
                    # container before start failed; remove any leftover so the
                    # retry does not die on a name conflict.
                    try:
                        self.client.containers.get(container_name).remove(force=True)
                    except Exception:
                        pass
                    container = self.client.containers.run(**base_kwargs)
            else:
                container = self.client.containers.run(**base_kwargs)

            if on_state_change:
                monitor_thread = threading.Thread(
                    target=self._monitor_container_logs,
                    args=(bot_id, container, on_state_change),
                    daemon=True,
                )
                monitor_thread.start()
                self.active_monitors[bot_id] = monitor_thread
            return True
        except Exception:
            return False

    def ensure_monitor(self, bot_id: str, on_state_change: Callable[[str, dict], None]) -> bool:
        """Make sure a log-monitor thread is attached to a running container.

        Returns True when a monitor is alive (pre-existing or newly started);
        False when the container is missing, not running, or Docker errors out.
        """
        if not self.client:
            return False
        existing = self.active_monitors.get(bot_id)
        if existing and existing.is_alive():
            return True
        try:
            container = self.client.containers.get(f"worker_{bot_id}")
            container.reload()
            if container.status != "running":
                return False
            monitor_thread = threading.Thread(
                target=self._monitor_container_logs,
                args=(bot_id, container, on_state_change),
                daemon=True,
            )
            monitor_thread.start()
            self.active_monitors[bot_id] = monitor_thread
            return True
        except Exception:
            return False

    def stop_bot(self, bot_id: str) -> bool:
        """Stop and remove the bot container; returns True only on a real stop."""
        if not self.client:
            return False
        try:
            container = self.client.containers.get(f"worker_{bot_id}")
            container.stop(timeout=5)
            container.remove()
            self.active_monitors.pop(bot_id, None)
            return True
        except docker.errors.NotFound:
            # Nothing to stop; still drop any stale monitor reference.
            self.active_monitors.pop(bot_id, None)
            return False
        except Exception:
            return False

    def get_bot_status(self, bot_id: str) -> str:
        """Return ``"RUNNING"`` or ``"STOPPED"`` for the bot container."""
        if not self.client:
            return "STOPPED"
        try:
            container = self.client.containers.get(f"worker_{bot_id}")
            container.reload()
            raw = str(container.status or "").strip().lower()
            # "restarting" is treated as alive from the dashboard's viewpoint.
            if raw in {"running", "restarting"}:
                return "RUNNING"
            return "STOPPED"
        except Exception:
            return "STOPPED"

    @staticmethod
    def _parse_size_to_bytes(raw: Any) -> Optional[int]:
        """Parse a Docker size string ("10G", "512MiB", "1024") into bytes.

        Plain numbers are taken as bytes; otherwise a ``<number><unit>`` form
        with binary (1024-based) multipliers is accepted. Returns None when
        *raw* is empty or unparseable.
        """
        if raw is None:
            return None
        text = str(raw).strip()
        if not text:
            return None
        try:
            return int(float(text))
        except Exception:
            pass
        match = re.fullmatch(r"([0-9]+(?:\.[0-9]+)?)\s*([kmgtp]?)(i?b)?", text.lower())
        if not match:
            return None
        number = float(match.group(1))
        unit = (match.group(2) or "").lower()
        scale = {
            "": 1,
            "k": 1024,
            "m": 1024 ** 2,
            "g": 1024 ** 3,
            "t": 1024 ** 4,
            "p": 1024 ** 5,
        }.get(unit, 1)
        return int(number * scale)

    @staticmethod
    def _calc_cpu_percent(stats: Dict[str, Any]) -> float:
        """Compute CPU usage percent from a Docker stats sample.

        Uses the standard delta formula: (cpu_delta / system_delta) *
        online_cpus * 100, the same calculation ``docker stats`` performs.
        Returns 0.0 on missing/inconsistent counters.
        """
        try:
            cpu_stats = stats.get("cpu_stats") or {}
            precpu_stats = stats.get("precpu_stats") or {}
            cpu_total = float((cpu_stats.get("cpu_usage") or {}).get("total_usage") or 0)
            prev_cpu_total = float((precpu_stats.get("cpu_usage") or {}).get("total_usage") or 0)
            cpu_delta = cpu_total - prev_cpu_total
            system_total = float(cpu_stats.get("system_cpu_usage") or 0)
            prev_system_total = float(precpu_stats.get("system_cpu_usage") or 0)
            system_delta = system_total - prev_system_total
            # online_cpus is absent on some daemons; fall back to the percpu
            # list length, then to 1.
            online_cpus = int(
                cpu_stats.get("online_cpus")
                or len((cpu_stats.get("cpu_usage") or {}).get("percpu_usage") or [])
                or 1
            )
            if cpu_delta <= 0 or system_delta <= 0:
                return 0.0
            return max(0.0, (cpu_delta / system_delta) * online_cpus * 100.0)
        except Exception:
            return 0.0

    def get_bot_resource_snapshot(self, bot_id: str) -> Dict[str, Any]:
        """Return configured limits plus live usage for the bot container.

        Always returns the full snapshot dict shape; on any failure the
        zeroed/None defaults are returned rather than raising.
        """
        snapshot: Dict[str, Any] = {
            "docker_status": "STOPPED",
            "limits": {
                "cpu_cores": None,
                "memory_bytes": None,
                "storage_bytes": None,
                "nano_cpus": 0,
                "storage_opt_raw": "",
            },
            "usage": {
                "cpu_percent": 0.0,
                "memory_bytes": 0,
                "memory_limit_bytes": 0,
                "memory_percent": 0.0,
                "network_rx_bytes": 0,
                "network_tx_bytes": 0,
                "blk_read_bytes": 0,
                "blk_write_bytes": 0,
                "pids": 0,
                "container_rw_bytes": 0,
            },
        }
        if not self.client:
            return snapshot
        try:
            container = self.client.containers.get(f"worker_{bot_id}")
            container.reload()
            status_raw = str(container.status or "").strip().lower()
            snapshot["docker_status"] = "RUNNING" if status_raw in {"running", "restarting"} else "STOPPED"

            # size=True adds SizeRw (container writable-layer bytes); older SDKs
            # don't accept the kwarg, so fall back gracefully.
            inspect: Dict[str, Any]
            try:
                inspect = self.client.api.inspect_container(container.id, size=True)
            except TypeError:
                inspect = self.client.api.inspect_container(container.id)
            except Exception as e:
                if "unexpected keyword argument 'size'" in str(e):
                    inspect = self.client.api.inspect_container(container.id)
                else:
                    raise

            host_cfg = inspect.get("HostConfig") or {}
            nano_cpus = int(host_cfg.get("NanoCpus") or 0)
            cpu_quota = int(host_cfg.get("CpuQuota") or 0)
            cpu_period = int(host_cfg.get("CpuPeriod") or 0)
            memory_bytes = int(host_cfg.get("Memory") or 0)
            storage_opt = host_cfg.get("StorageOpt") or {}
            storage_raw = storage_opt.get("size")
            storage_bytes = self._parse_size_to_bytes(storage_raw)

            # CPU limit can be expressed either as NanoCpus or quota/period.
            if nano_cpus > 0:
                cpu_cores = nano_cpus / 1_000_000_000
            elif cpu_quota > 0 and cpu_period > 0:
                cpu_cores = cpu_quota / cpu_period
            else:
                cpu_cores = None

            snapshot["limits"] = {
                "cpu_cores": cpu_cores,
                "memory_bytes": memory_bytes if memory_bytes > 0 else None,
                "storage_bytes": storage_bytes,
                "nano_cpus": nano_cpus,
                "storage_opt_raw": str(storage_raw or ""),
            }
            snapshot["usage"]["container_rw_bytes"] = int(inspect.get("SizeRw") or 0)

            if snapshot["docker_status"] == "RUNNING":
                stats = container.stats(stream=False) or {}

                memory_stats = stats.get("memory_stats") or {}
                memory_usage = int(memory_stats.get("usage") or 0)
                memory_limit = int(memory_stats.get("limit") or 0)
                if memory_usage > 0:
                    # Subtract reclaimable page cache (cgroup v2 inactive_file),
                    # matching how `docker stats` reports memory.
                    cache = int((memory_stats.get("stats") or {}).get("inactive_file") or 0)
                    memory_usage = max(0, memory_usage - cache)

                networks = stats.get("networks") or {}
                rx_total = 0
                tx_total = 0
                for _, row in networks.items():
                    if isinstance(row, dict):
                        rx_total += int(row.get("rx_bytes") or 0)
                        tx_total += int(row.get("tx_bytes") or 0)

                blk_stats = stats.get("blkio_stats") or {}
                io_rows = blk_stats.get("io_service_bytes_recursive") or []
                blk_read = 0
                blk_write = 0
                for row in io_rows:
                    if not isinstance(row, dict):
                        continue
                    op = str(row.get("op") or "").upper()
                    value = int(row.get("value") or 0)
                    if op == "READ":
                        blk_read += value
                    elif op == "WRITE":
                        blk_write += value

                pids_current = int((stats.get("pids_stats") or {}).get("current") or 0)
                cpu_percent = self._calc_cpu_percent(stats)
                memory_percent = 0.0
                if memory_limit > 0:
                    memory_percent = (memory_usage / memory_limit) * 100.0

                # Some daemons expose the writable-layer size via stats instead
                # of inspect; use it only when inspect gave nothing.
                if snapshot["usage"]["container_rw_bytes"] <= 0:
                    storage_stats = stats.get("storage_stats") or {}
                    rw_size = int(storage_stats.get("size_rw") or storage_stats.get("rw_size") or 0)
                    snapshot["usage"]["container_rw_bytes"] = max(0, rw_size)

                snapshot["usage"].update(
                    {
                        "cpu_percent": cpu_percent,
                        "memory_bytes": memory_usage,
                        "memory_limit_bytes": memory_limit,
                        "memory_percent": max(0.0, memory_percent),
                        "network_rx_bytes": rx_total,
                        "network_tx_bytes": tx_total,
                        "blk_read_bytes": blk_read,
                        "blk_write_bytes": blk_write,
                        "pids": pids_current,
                    }
                )
        except docker.errors.NotFound:
            return snapshot
        except Exception:
            return snapshot
        return snapshot

    def send_command(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
        """Deliver *command* (plus optional media paths) to the bot's dashboard.

        Tries in-container ``docker exec`` delivery up to three times with
        backoff, then falls back to a direct host->container HTTP POST. The
        failure reason, if any, is retrievable via get_last_delivery_error().
        """
        if not self.client:
            self._last_delivery_error[bot_id] = "Docker client is not available"
            return False
        media_paths = [str(v).strip().replace("\\", "/") for v in (media or []) if str(v).strip()]
        self._last_delivery_error.pop(bot_id, None)
        for attempt in range(3):
            if self._send_command_via_exec(bot_id, command, media_paths):
                self._last_delivery_error.pop(bot_id, None)
                return True
            time.sleep(0.25 * (attempt + 1))
        if self._send_command_via_host_http(bot_id, command, media_paths):
            self._last_delivery_error.pop(bot_id, None)
            return True
        if bot_id not in self._last_delivery_error:
            self._last_delivery_error[bot_id] = "Unknown delivery failure"
        return False

    def get_last_delivery_error(self, bot_id: str) -> str:
        """Return the last recorded delivery failure reason ('' when none)."""
        return str(self._last_delivery_error.get(bot_id, "") or "").strip()

    def get_recent_logs(self, bot_id: str, tail: int = 300) -> List[str]:
        """Return the last *tail* non-empty log lines of the bot container."""
        if not self.client:
            return []
        try:
            container = self.client.containers.get(f"worker_{bot_id}")
            raw = container.logs(tail=max(1, int(tail)))
            text = raw.decode("utf-8", errors="ignore")
            return [line for line in text.splitlines() if line.strip()]
        except Exception:
            return []

    def parse_monitor_packet(self, line: str) -> Optional[Dict[str, Any]]:
        """Public wrapper: parse one log line into a monitor packet (or None)."""
        return self._parse_log_line(str(line or "").strip())

    def _workspace_dir(self, *, bot_id: str, workspace_root: Optional[str]) -> str:
        """Host path of the bot's workspace dir (inside its .nanobot state dir)."""
        return os.path.abspath(
            os.path.join(self._state_nanobot_dir(bot_id=bot_id, workspace_root=workspace_root), "workspace")
        )

    def _state_nanobot_dir(self, *, bot_id: str, workspace_root: Optional[str]) -> str:
        """Host path of the bot's ``.nanobot`` state dir.

        Uses *workspace_root* when configured (with ``~`` expansion), otherwise
        falls back to ``<host_data_root>/<bot_id>/.nanobot``.
        """
        configured_root = str(workspace_root or "").strip()
        if not configured_root:
            return os.path.abspath(os.path.join(self.host_data_root, bot_id, ".nanobot"))
        normalized_root = os.path.abspath(os.path.expanduser(configured_root))
        return os.path.abspath(os.path.join(normalized_root, bot_id, ".nanobot"))

    def _monitor_container_logs(self, bot_id: str, container: Any, callback: Callable[[str, dict], None]) -> None:
        """Stream container logs and forward events to *callback* (thread body).

        Lines are reassembled from the byte stream with an incremental UTF-8
        decoder (chunks may split multi-byte characters). Each complete line is
        forwarded twice when it parses as a structured packet: once as the
        parsed packet and once as a RAW_LOG event. Exits silently when the
        stream ends or errors.
        """
        try:
            buffer = ""
            decoder = codecs.getincrementaldecoder("utf-8")("replace")
            # Only follow logs from now on; history is served by get_recent_logs.
            since_ts = int(time.time())
            for chunk in container.logs(stream=True, follow=True, since=since_ts):
                text = decoder.decode(chunk) if isinstance(chunk, bytes) else str(chunk)
                if not text:
                    continue
                buffer += text
                while "\n" in buffer:
                    line, buffer = buffer.split("\n", 1)
                    normalized = line.strip("\r").strip()
                    if normalized:
                        state_packet = self._parse_log_line(normalized)
                        if state_packet:
                            callback(bot_id, state_packet)
                        callback(bot_id, {"type": "RAW_LOG", "text": normalized})
            # Flush the decoder and any trailing partial line once the stream ends.
            rest = decoder.decode(b"", final=True)
            if rest:
                buffer += rest
            tail = buffer.strip()
            if tail:
                state_packet = self._parse_log_line(tail)
                if state_packet:
                    callback(bot_id, state_packet)
                callback(bot_id, {"type": "RAW_LOG", "text": tail})
        except Exception:
            return

    def _parse_monitor_packet_json(self, line: str) -> Optional[Dict[str, Any]]:
        """Parse a structured dashboard packet embedded in a log line.

        The payload is JSON wrapped between ``__DASHBOARD_DATA_START__`` and
        ``__DASHBOARD_DATA_END__`` markers. Returns a normalized AGENT_STATE /
        ASSISTANT_MESSAGE / BUS_EVENT dict, or None when the line has no
        markers, is malformed, or carries no content.
        """
        if "__DASHBOARD_DATA_START__" not in line or "__DASHBOARD_DATA_END__" not in line:
            return None
        try:
            raw_json = line.split("__DASHBOARD_DATA_START__", 1)[1].split("__DASHBOARD_DATA_END__", 1)[0].strip()
            data = json.loads(raw_json)
            event_type = str(data.get("type", "")).upper()
            content = str(data.get("content") or data.get("text") or "").strip()
            media = [str(v).strip().replace("\\", "/") for v in (data.get("media") or []) if str(v).strip()]
            is_progress = bool(data.get("is_progress", False))
            is_tool = bool(data.get("is_tool", False))
            usage = data.get("usage") if isinstance(data.get("usage"), dict) else None
            request_id = str(data.get("request_id") or "").strip() or None
            provider = str(data.get("provider") or "").strip() or None
            model = str(data.get("model") or "").strip() or None
            if event_type == "AGENT_STATE":
                payload = data.get("payload") or {}
                state = str(payload.get("state") or data.get("state") or ("TOOL_CALL" if is_tool else "THINKING"))
                action_msg = str(payload.get("action_msg") or payload.get("msg") or content)
                return {
                    "type": "AGENT_STATE",
                    "channel": "dashboard",
                    "payload": {"state": state, "action_msg": action_msg},
                    "request_id": request_id,
                }
            if event_type == "ASSISTANT_MESSAGE":
                if content or media:
                    return {
                        "type": "ASSISTANT_MESSAGE",
                        "channel": "dashboard",
                        "text": content,
                        "media": media,
                        "usage": usage,
                        "request_id": request_id,
                        "provider": provider,
                        "model": model,
                    }
                return None
            if event_type == "BUS_EVENT" or is_progress:
                return {
                    "type": "BUS_EVENT",
                    "channel": "dashboard",
                    "content": content,
                    "media": media,
                    "is_progress": is_progress,
                    "is_tool": is_tool,
                    "usage": usage,
                    "request_id": request_id,
                    "provider": provider,
                    "model": model,
                }
            # Unknown event type: treat any line with content as a message.
            if content or media:
                return {
                    "type": "ASSISTANT_MESSAGE",
                    "channel": "dashboard",
                    "text": content,
                    "media": media,
                    "usage": usage,
                    "request_id": request_id,
                    "provider": provider,
                    "model": model,
                }
        except Exception:
            return None
        return None

    def _parse_log_line(self, line: str) -> Optional[Dict[str, Any]]:
        """Parse one log line into a monitor packet.

        Tries the structured JSON packet format first, then a set of known
        plain-text patterns (message processing, responses, tool calls, error
        keywords). Returns None when the line matches nothing.
        """
        if "__DASHBOARD_DATA_START__" in line:
            packet = self._parse_monitor_packet_json(line)
            if packet:
                return packet
        process_match = re.search(r"Processing message from ([\w\-]+):[^:]+:\s*(.+)$", line)
        if process_match:
            channel = process_match.group(1).strip().lower()
            action_msg = process_match.group(2).strip()
            return {
                "type": "AGENT_STATE",
                "channel": channel,
                "payload": {
                    "state": "THINKING",
                    "action_msg": action_msg[:4000],
                },
            }
        response_match = re.search(r"Response to ([\w\-]+):[^:]+:\s*(.+)$", line)
        if response_match:
            channel = response_match.group(1).strip().lower()
            action_msg = response_match.group(2).strip()
            # Responses on the dashboard channel are surfaced as chat messages;
            # other channels only update the agent state.
            if channel == "dashboard":
                return {
                    "type": "ASSISTANT_MESSAGE",
                    "channel": "dashboard",
                    "text": action_msg[:4000],
                }
            return {
                "type": "AGENT_STATE",
                "channel": channel,
                "payload": {
                    "state": "SUCCESS",
                    "action_msg": action_msg[:4000],
                },
            }
        lower = line.lower()
        tool_call_match = re.search(r"tool call:\s*(.+)$", line, re.IGNORECASE)
        if tool_call_match:
            return {
                "type": "AGENT_STATE",
                "payload": {
                    "state": "TOOL_CALL",
                    "action_msg": tool_call_match.group(1).strip()[:4000],
                },
            }
        if "error" in lower or "traceback" in lower:
            return {
                "type": "AGENT_STATE",
                "payload": {"state": "ERROR", "action_msg": "执行异常,请检查日志"},
            }
        return None

    def _send_command_via_exec(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
        """Deliver a command by exec'ing an HTTP POST inside the container.

        Tries ``curl`` first; when that fails (e.g. curl not installed), falls
        back to an inline python3/python script using urllib, passing the JSON
        payload base64-encoded through the environment to avoid quoting issues.
        Records a failure reason in _last_delivery_error on any failure.
        """
        try:
            container = self.client.containers.get(f"worker_{bot_id}")
            container.reload()
            if container.status != "running":
                self._last_delivery_error[bot_id] = f"Container status is {container.status}"
                return False
            dashboard_port = self._resolve_dashboard_port(container=container, bot_id=bot_id)
            dashboard_url = f"http://127.0.0.1:{dashboard_port}/chat"
            payload_json = json.dumps({"message": command, "media": media or []}, ensure_ascii=False)
            result = container.exec_run(
                [
                    "curl",
                    "-sS",
                    "--fail",
                    "--max-time",
                    "6",
                    "-X",
                    "POST",
                    "-H",
                    "Content-Type: application/json",
                    "-d",
                    payload_json,
                    dashboard_url,
                ]
            )
            output = result.output.decode("utf-8", errors="ignore") if isinstance(result.output, (bytes, bytearray)) else str(result.output)
            if result.exit_code != 0:
                payload_b64 = base64.b64encode(payload_json.encode("utf-8")).decode("ascii")
                py_script = (
                    "import base64,json,os,urllib.request\n"
                    "payload=json.loads(base64.b64decode(os.environ['DASHBOARD_PAYLOAD_B64']).decode('utf-8'))\n"
                    "req=urllib.request.Request(os.environ.get('DASHBOARD_CHAT_URL', 'http://127.0.0.1:9000/chat'),"
                    "data=json.dumps(payload,ensure_ascii=False).encode('utf-8'),"
                    "headers={'Content-Type':'application/json'})\n"
                    "with urllib.request.urlopen(req, timeout=8) as resp:\n"
                    "    print(resp.read().decode('utf-8','ignore'))\n"
                )
                for py_bin in ["python3", "python"]:
                    py_result = container.exec_run(
                        [py_bin, "-c", py_script],
                        environment={
                            "DASHBOARD_PAYLOAD_B64": payload_b64,
                            "DASHBOARD_CHAT_URL": dashboard_url,
                        },
                    )
                    py_output = py_result.output.decode("utf-8", errors="ignore") if isinstance(py_result.output, (bytes, bytearray)) else str(py_result.output)
                    if py_result.exit_code == 0:
                        return True
                    self._last_delivery_error[bot_id] = f"exec fallback failed: {py_output[:300]}"
                self._last_delivery_error[bot_id] = f"exec curl failed: {output[:300]}"
                return False
            return True
        except Exception as exc:
            self._last_delivery_error[bot_id] = f"exec curl exception: {exc}"
            return False

    def _send_command_via_host_http(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
        """Deliver a command by POSTing from the host to the container's IP.

        Fallback path for when in-container exec delivery fails; requires the
        container to have a reachable bridge-network IP from the host.
        """
        try:
            container = self.client.containers.get(f"worker_{bot_id}")
            container.reload()
            ip_address = self._resolve_container_ip(container)
            if not ip_address:
                self._last_delivery_error[bot_id] = "host HTTP failed: container has no reachable IP address"
                return False
            dashboard_port = self._resolve_dashboard_port(container=container, bot_id=bot_id)
            target_url = f"http://{ip_address}:{dashboard_port}/chat"
            with httpx.Client(timeout=4.0) as client:
                resp = client.post(target_url, json={"message": command, "media": media or []})
                if resp.status_code == 200:
                    return True
                self._last_delivery_error[bot_id] = f"host HTTP failed: {resp.status_code} - {resp.text[:300]}"
                return False
        except Exception as exc:
            self._last_delivery_error[bot_id] = f"host HTTP exception: {exc}"
            return False

    def _resolve_dashboard_port(self, *, container: Any, bot_id: str) -> int:
        """Return the dashboard channel port for this bot (default 9000).

        Dashboard channel port may be per-bot dynamic; read from the mounted
        config.json when available, accepting only valid TCP ports (1-65535).
        """
        default_port = 9000
        config_path = self._resolve_mounted_config_path(container=container, bot_id=bot_id)
        if not config_path or not os.path.isfile(config_path):
            return default_port
        try:
            with open(config_path, "r", encoding="utf-8") as fh:
                payload = json.load(fh)
            if not isinstance(payload, dict):
                return default_port
            channels = payload.get("channels")
            if not isinstance(channels, dict):
                return default_port
            dashboard = channels.get("dashboard")
            if not isinstance(dashboard, dict):
                return default_port
            raw_port = int(dashboard.get("port") or default_port)
            if 1 <= raw_port <= 65535:
                return raw_port
        except Exception:
            return default_port
        return default_port

    def _resolve_mounted_config_path(self, *, container: Any, bot_id: str) -> str:
        """Host path of the bot's config.json, derived from container mounts.

        Prefers the actual host source of the /root/.nanobot bind mount; falls
        back to the conventional <host_data_root>/<bot_id>/.nanobot location.
        """
        mounts = list((container.attrs or {}).get("Mounts") or [])
        for row in mounts:
            if not isinstance(row, dict):
                continue
            destination = str(row.get("Destination") or "").strip()
            if destination != "/root/.nanobot":
                continue
            source = str(row.get("Source") or "").strip()
            if source:
                return os.path.join(source, "config.json")
        return os.path.join(self.host_data_root, bot_id, ".nanobot", "config.json")

    @staticmethod
    def _resolve_container_ip(container: Any) -> str:
        """Return the container's IP address, or '' when none is assigned.

        Checks the legacy top-level NetworkSettings.IPAddress first, then each
        per-network entry (needed for user-defined networks).
        """
        attrs = dict(getattr(container, "attrs", {}) or {})
        network = dict(attrs.get("NetworkSettings") or {})
        primary = str(network.get("IPAddress") or "").strip()
        if primary:
            return primary
        networks = dict(network.get("Networks") or {})
        for _, row in networks.items():
            if not isinstance(row, dict):
                continue
            ip = str(row.get("IPAddress") or "").strip()
            if ip:
                return ip
        return ""