dashboard-nanobot/backend/core/docker_manager.py

1150 lines
46 KiB
Python

import os
import re
import threading
import time
import codecs
import base64
from typing import Any, Callable, Dict, List, Optional, Tuple
import json
import docker
class BotDockerManager:
    """Manage per-bot Docker worker containers.

    Responsibilities visible in this class: container lifecycle (create,
    reuse, stop), CPU/memory/storage limits, log monitoring threads, and
    delivering chat commands into the container's dashboard channel.
    """

    # Label stamped on containers we create so that an existing container can
    # be recognized as having been started with the env.json bootstrap
    # entrypoint (see _runtime_bootstrap_entrypoint / _container_uses_expected_bootstrap).
    _RUNTIME_BOOTSTRAP_LABEL_KEY = "dashboard.runtime_bootstrap"
    _RUNTIME_BOOTSTRAP_LABEL_VALUE = "env-json-v1"
    # Lowercase substrings matched against log lines to detect that the
    # dashboard channel inside the container is up. The second marker is
    # Chinese runtime output ("dashboard channel proxy is online").
    _DASHBOARD_READY_LOG_MARKERS = (
        "nanobot.channels.dashboard:start",
        "dashboard channel 代理已上线",
    )
    # Lowercase substrings indicating the dashboard channel failed to start.
    _DASHBOARD_FAILURE_LOG_MARKERS = (
        "failed to start channel dashboard",
        "dashboard channel not available",
    )
    def __init__(
        self,
        host_data_root: str,
        base_image: str = "nanobot-base",
        network_name: str = "",
    ):
        """Connect to the Docker engine and record runtime defaults.

        Args:
            host_data_root: Host directory under which each bot's workspace
                (``<root>/<bot_id>/.nanobot``) is created and bind-mounted.
            base_image: Image used when start_bot gets no explicit tag.
            network_name: Preferred docker network; '' falls back to bridge.

        Connection failure is non-fatal: ``self.client`` stays ``None`` and
        every public method degrades to a no-op/False/empty result.
        """
        try:
            self.client = docker.from_env(timeout=6)
            # Probe the daemon eagerly so a dead engine is reported at startup.
            self.client.version()
            print("✅ Docker 引擎连接成功")
        except Exception as e:
            self.client = None
            print(f"⚠️ 警告: 无法连接到 Docker 引擎。请确保 Docker Desktop 已启动。错误: {e}")
        self.host_data_root = host_data_root
        self.base_image = base_image
        self.network_name = str(network_name or "").strip()
        # bot_id -> monitor thread streaming that bot's container logs.
        self.active_monitors: Dict[str, threading.Thread] = {}
        # bot_id -> last human-readable delivery failure (see send_command).
        self._last_delivery_error: Dict[str, str] = {}
        # Tri-state: None = unknown, True/False once probed (storage-opt support).
        self._storage_limit_supported: Optional[bool] = None
        self._storage_limit_warning_emitted = False
@staticmethod
def _normalize_resource_limits(
cpu_cores: Optional[float],
memory_mb: Optional[int],
storage_gb: Optional[int],
) -> Tuple[float, int, int]:
try:
cpu = float(cpu_cores) if cpu_cores is not None else 1.0
except Exception:
cpu = 1.0
try:
memory = int(memory_mb) if memory_mb is not None else 1024
except Exception:
memory = 1024
try:
storage = int(storage_gb) if storage_gb is not None else 10
except Exception:
storage = 10
if cpu < 0:
cpu = 1.0
if memory < 0:
memory = 1024
if storage < 0:
storage = 10
cpu = 0.0 if cpu == 0 else min(16.0, max(0.1, cpu))
memory = 0 if memory == 0 else min(65536, max(256, memory))
storage = 0 if storage == 0 else min(1024, max(1, storage))
return cpu, memory, storage
def has_image(self, tag: str) -> bool:
if not self.client:
return False
try:
self.client.images.get(tag)
return True
except docker.errors.ImageNotFound:
return False
except Exception:
return False
def list_images_by_repo(self, repository: str = "nanobot-base") -> List[Dict[str, str]]:
"""List docker images by repository prefix, returning normalized tag/id pairs."""
if not self.client:
return []
rows: List[Dict[str, str]] = []
try:
images = self.client.images.list(name=repository)
for image in images:
for tag in image.tags:
repo, _, version = tag.partition(":")
if repo != repository or not version:
continue
rows.append(
{
"tag": tag,
"version": version.removeprefix("v"),
"image_id": image.id,
}
)
except Exception as e:
print(f"[DockerManager] list_images_by_repo failed: {e}")
return rows
@staticmethod
def _docker_error_message(exc: Exception) -> str:
explanation = getattr(exc, "explanation", None)
if isinstance(explanation, bytes):
try:
explanation = explanation.decode("utf-8", errors="replace")
except Exception:
explanation = str(explanation)
if explanation:
return str(explanation)
response = getattr(exc, "response", None)
text = getattr(response, "text", None)
if text:
return str(text)
return str(exc)
@classmethod
def _is_unsupported_storage_opt_error(cls, exc: Exception) -> bool:
message = cls._docker_error_message(exc).lower()
if "storage-opt" not in message and "storage opt" not in message:
return False
markers = (
"overlay over xfs",
"overlay2 over xfs",
"pquota",
"project quota",
"storage driver does not support",
"xfs",
)
return any(marker in message for marker in markers)
def _cleanup_container_if_exists(self, container_name: str) -> None:
if not self.client:
return
try:
container = self.client.containers.get(container_name)
container.remove(force=True)
except docker.errors.NotFound:
pass
except Exception as e:
print(f"[DockerManager] failed to cleanup container {container_name}: {e}")
def _resolve_container_network(self) -> str:
if not self.client or not self.network_name:
return "bridge"
try:
self.client.networks.get(self.network_name)
return self.network_name
except docker.errors.NotFound:
print(f"[DockerManager] network '{self.network_name}' not found; falling back to bridge")
except Exception as e:
print(f"[DockerManager] failed to inspect network '{self.network_name}': {e}; falling back to bridge")
return "bridge"
@staticmethod
def _container_uses_network(container: Any, network_name: str) -> bool:
attrs = getattr(container, "attrs", {}) or {}
network_settings = attrs.get("NetworkSettings") or {}
networks = network_settings.get("Networks") or {}
if network_name in networks:
return True
if network_name == "bridge" and not networks and str(network_settings.get("IPAddress") or "").strip():
return True
return False
@staticmethod
def _get_container_network_ip(container: Any, preferred_network: str = "") -> str:
attrs = getattr(container, "attrs", {}) or {}
network_settings = attrs.get("NetworkSettings") or {}
networks = network_settings.get("Networks") or {}
if preferred_network:
preferred = networks.get(preferred_network) or {}
preferred_ip = str(preferred.get("IPAddress") or "").strip()
if preferred_ip:
return preferred_ip
for network in networks.values():
ip_address = str((network or {}).get("IPAddress") or "").strip()
if ip_address:
return ip_address
return str(network_settings.get("IPAddress") or "").strip()
@classmethod
def _container_uses_expected_bootstrap(cls, container: Any) -> bool:
attrs = getattr(container, "attrs", {}) or {}
config = attrs.get("Config") or {}
labels = config.get("Labels") or {}
return str(labels.get(cls._RUNTIME_BOOTSTRAP_LABEL_KEY) or "").strip() == cls._RUNTIME_BOOTSTRAP_LABEL_VALUE
    @staticmethod
    def _runtime_bootstrap_entrypoint() -> List[str]:
        """Build the container entrypoint command.

        Returns a ``python -c <script>`` argv. The embedded script loads
        ``/root/.nanobot/env.json``, exports every key that matches the
        UPPER_SNAKE_CASE pattern (max 128 chars) as an environment variable,
        then replaces the process with ``nanobot gateway`` via os.execvp.
        The script is stdlib-only so it runs in any python-equipped image.
        """
        bootstrap_code = "\n".join(
            [
                "import json",
                "import os",
                "import pathlib",
                "import re",
                "",
                "path = pathlib.Path('/root/.nanobot/env.json')",
                "pattern = re.compile(r'^[A-Z_][A-Z0-9_]{0,127}$')",
                "data = {}",
                "if path.is_file():",
                "    try:",
                "        data = json.loads(path.read_text(encoding='utf-8'))",
                "    except Exception:",
                "        data = {}",
                # Non-dict JSON (list/str/…) is discarded rather than trusted.
                "if not isinstance(data, dict):",
                "    data = {}",
                "for raw_key, raw_value in data.items():",
                "    key = str(raw_key or '').strip().upper()",
                "    if not pattern.fullmatch(key):",
                "        continue",
                "    os.environ[key] = str(raw_value or '').strip()",
                # exec replaces PID 1 so the gateway receives container signals.
                "os.execvp('nanobot', ['nanobot', 'gateway'])",
            ]
        )
        return [
            "python",
            "-c",
            bootstrap_code,
        ]
@staticmethod
def _container_has_mount(container: Any, source: str, destination: str) -> bool:
attrs = getattr(container, "attrs", {}) or {}
mounts = attrs.get("Mounts") or []
expected_source = os.path.normpath(source)
expected_destination = str(destination or "").strip()
for mount in mounts:
if not isinstance(mount, dict):
continue
current_source = os.path.normpath(str(mount.get("Source") or ""))
current_destination = str(mount.get("Destination") or "").strip()
if current_source != expected_source or current_destination != expected_destination:
continue
if mount.get("RW") is False:
continue
return True
return False
@staticmethod
def _desired_memory_bytes(memory_mb: int) -> int:
return int(memory_mb) * 1024 * 1024 if int(memory_mb or 0) > 0 else 0
@staticmethod
def _desired_storage_bytes(storage_gb: int) -> Optional[int]:
storage = int(storage_gb or 0)
if storage <= 0:
return None
return storage * 1024 * 1024 * 1024
@staticmethod
def _get_container_cpu_cores(container: Any) -> float:
attrs = getattr(container, "attrs", {}) or {}
host_cfg = attrs.get("HostConfig") or {}
nano_cpus = int(host_cfg.get("NanoCpus") or 0)
if nano_cpus > 0:
return nano_cpus / 1_000_000_000
cpu_quota = int(host_cfg.get("CpuQuota") or 0)
cpu_period = int(host_cfg.get("CpuPeriod") or 0)
if cpu_quota > 0 and cpu_period > 0:
return cpu_quota / cpu_period
return 0.0
@staticmethod
def _normalize_image_id(raw: Any) -> str:
text = str(raw or "").strip().lower()
if text.startswith("sha256:"):
return text[7:]
return text
@classmethod
def _get_container_image_id(cls, container: Any) -> str:
attrs = getattr(container, "attrs", {}) or {}
image_id = attrs.get("Image")
if image_id:
return cls._normalize_image_id(image_id)
image = getattr(container, "image", None)
return cls._normalize_image_id(getattr(image, "id", ""))
def _resolve_image_id(self, image_ref: str) -> str:
if not self.client:
return ""
try:
image = self.client.images.get(image_ref)
except Exception as e:
print(f"[DockerManager] failed to resolve image id for {image_ref}: {e}")
return ""
return self._normalize_image_id(getattr(image, "id", ""))
def _container_storage_matches(self, actual_storage_bytes: Optional[int], desired_storage_gb: int) -> bool:
expected_storage_bytes = self._desired_storage_bytes(desired_storage_gb)
if expected_storage_bytes is None:
return actual_storage_bytes in {None, 0}
if actual_storage_bytes == expected_storage_bytes:
return True
return actual_storage_bytes is None and self._storage_limit_supported is not True
    def _container_matches_runtime(
        self,
        container: Any,
        *,
        image_id: str,
        cpu_cores: float,
        memory_mb: int,
        storage_gb: int,
        bot_workspace: str,
        network_name: str,
    ) -> bool:
        """Return True when an existing container already matches the desired runtime.

        Checked in order: image id, bootstrap label, network attachment,
        writable workspace mount, memory limit, CPU limit (±0.01 core
        tolerance), and storage-opt size. Used by start_bot to decide whether
        a container can be reused instead of recreated.
        """
        attrs = getattr(container, "attrs", {}) or {}
        host_cfg = attrs.get("HostConfig") or {}
        current_image_id = self._get_container_image_id(container)
        desired_image_id = self._normalize_image_id(image_id)
        # An unknown id on either side forces a recreate.
        if not desired_image_id or not current_image_id or current_image_id != desired_image_id:
            return False
        if not self._container_uses_expected_bootstrap(container):
            return False
        if not self._container_uses_network(container, network_name):
            return False
        if not self._container_has_mount(container, bot_workspace, "/root/.nanobot"):
            return False
        actual_memory_bytes = int(host_cfg.get("Memory") or 0)
        if actual_memory_bytes != self._desired_memory_bytes(memory_mb):
            return False
        desired_cpu = float(cpu_cores or 0)
        actual_cpu = self._get_container_cpu_cores(container)
        # Tolerance: NanoCpus round-trips are not bit-exact for fractional cores.
        if abs(actual_cpu - desired_cpu) > 0.01:
            return False
        storage_opt = host_cfg.get("StorageOpt") or {}
        actual_storage_bytes = self._parse_size_to_bytes(storage_opt.get("size"))
        if not self._container_storage_matches(actual_storage_bytes, storage_gb):
            return False
        return True
    def _run_container_with_storage_fallback(
        self,
        bot_id: str,
        container_name: str,
        storage_gb: int,
        **base_kwargs: Any,
    ):
        """Run a container, retrying without storage-opt if the driver rejects it.

        ``storage_opt size`` is only supported on some storage drivers; the
        first "unsupported" failure flips `_storage_limit_supported` to False
        so later starts skip the doomed attempt entirely.

        Raises:
            RuntimeError: when no docker client is available.
            Exception: any non-storage-opt docker error is re-raised as-is.
        """
        if not self.client:
            raise RuntimeError("Docker client is not available")
        if storage_gb <= 0:
            return self.client.containers.run(**base_kwargs)
        if self._storage_limit_supported is False:
            # Known-unsupported: don't even try the storage-opt variant.
            return self.client.containers.run(**base_kwargs)
        try:
            container = self.client.containers.run(
                storage_opt={"size": f"{storage_gb}G"},
                **base_kwargs,
            )
            self._storage_limit_supported = True
            return container
        except Exception as exc:
            if not self._is_unsupported_storage_opt_error(exc):
                raise
            self._storage_limit_supported = False
            # Warn loudly once per process; subsequent fallbacks get a short note.
            if not self._storage_limit_warning_emitted:
                print(
                    "[DockerManager] storage limit not supported by current Docker storage driver; "
                    f"falling back to unlimited container filesystem size. Details: {self._docker_error_message(exc)}"
                )
                self._storage_limit_warning_emitted = True
            else:
                print(f"[DockerManager] storage limit skipped for {bot_id}: unsupported by current Docker storage driver")
            # The failed run may have left a half-created container behind.
            self._cleanup_container_if_exists(container_name)
            return self.client.containers.run(**base_kwargs)
    def start_bot(
        self,
        bot_id: str,
        image_tag: Optional[str] = None,
        env_vars: Optional[Dict[str, str]] = None,
        cpu_cores: Optional[float] = None,
        memory_mb: Optional[int] = None,
        storage_gb: Optional[int] = None,
        on_state_change: Optional[Callable[[str, dict], None]] = None,
    ) -> bool:
        """Start (or reuse) the worker container for ``bot_id``.

        Reuses a running/stopped container when its configuration still
        matches the desired runtime; otherwise removes and recreates it.
        When ``on_state_change`` is given, a log-monitor thread is attached.

        NOTE(review): ``env_vars`` is accepted but not referenced in this
        method — environment appears to be delivered via the env.json read by
        the bootstrap entrypoint; confirm against callers.

        Returns:
            True on success (container running and, if requested, monitored).
        """
        if not self.client:
            print("❌ 错误: Docker 客户端未初始化,无法启动机器人。")
            return False
        image = image_tag or self.base_image
        if not self.has_image(image):
            print(f"❌ 错误: 镜像不存在: {image}")
            return False
        desired_image_id = self._resolve_image_id(image)
        if not desired_image_id:
            print(f"❌ 错误: 无法解析镜像 ID: {image}")
            return False
        bot_workspace = os.path.join(self.host_data_root, bot_id, ".nanobot")
        container_name = f"worker_{bot_id}"
        os.makedirs(bot_workspace, exist_ok=True)
        cpu, memory, storage = self._normalize_resource_limits(cpu_cores, memory_mb, storage_gb)
        target_network = self._resolve_container_network()
        base_kwargs = {
            "image": image,
            "name": container_name,
            "detach": True,
            "stdin_open": True,
            "tty": True,
            "entrypoint": self._runtime_bootstrap_entrypoint(),
            "labels": {
                self._RUNTIME_BOOTSTRAP_LABEL_KEY: self._RUNTIME_BOOTSTRAP_LABEL_VALUE,
            },
            "volumes": {
                bot_workspace: {"bind": "/root/.nanobot", "mode": "rw"},
            },
            "network": target_network,
        }
        # 0 means "unlimited": omit the corresponding docker limit entirely.
        if memory > 0:
            base_kwargs["mem_limit"] = f"{memory}m"
        if cpu > 0:
            base_kwargs["nano_cpus"] = int(cpu * 1_000_000_000)
        try:
            try:
                container = self.client.containers.get(container_name)
                container.reload()
                runtime_matches = self._container_matches_runtime(
                    container,
                    image_id=desired_image_id,
                    cpu_cores=cpu,
                    memory_mb=memory,
                    storage_gb=storage,
                    bot_workspace=bot_workspace,
                    network_name=target_network,
                )
                # Case 1: already running with the right config — just reuse it.
                if container.status in {"running", "restarting"} and runtime_matches:
                    if on_state_change:
                        self.ensure_monitor(bot_id, on_state_change)
                    return True
                # Case 2: running but stale config — remove, then recreate below.
                if container.status in {"running", "restarting"}:
                    if not self._container_uses_network(container, target_network):
                        print(
                            f"[DockerManager] recreating {container_name} to switch network "
                            f"from current attachment to '{target_network}'"
                        )
                    else:
                        print(f"[DockerManager] recreating {container_name} because container config no longer matches desired runtime")
                    container.remove(force=True)
                # Case 3: stopped but matching — a plain start suffices.
                elif runtime_matches:
                    container.start()
                    if on_state_change:
                        self.ensure_monitor(bot_id, on_state_change)
                    return True
                # Case 4: stopped and stale — remove, then recreate below.
                else:
                    print(f"[DockerManager] recreating {container_name} because container config no longer matches desired runtime")
                    container.remove(force=True)
            except docker.errors.NotFound:
                pass
            container = None
            container = self._run_container_with_storage_fallback(
                bot_id,
                container_name,
                storage,
                **base_kwargs,
            )
            if on_state_change:
                monitor_thread = threading.Thread(
                    target=self._monitor_container_logs,
                    args=(bot_id, container, on_state_change),
                    daemon=True,
                )
                monitor_thread.start()
                self.active_monitors[bot_id] = monitor_thread
            return True
        except Exception as e:
            print(f"[DockerManager] Error starting bot {bot_id}: {e}")
            return False
    def ensure_monitor(self, bot_id: str, on_state_change: Callable[[str, dict], None]) -> bool:
        """Ensure an active log monitor exists for a running bot container.

        Idempotent: a still-alive monitor thread is reused. Returns False when
        no client is available, the container is missing, or it is not running.
        """
        if not self.client:
            return False
        existing = self.active_monitors.get(bot_id)
        if existing and existing.is_alive():
            return True
        try:
            container = self.client.containers.get(f"worker_{bot_id}")
            container.reload()
            if container.status != "running":
                return False
            # Daemon thread: dies with the process, no explicit join needed.
            monitor_thread = threading.Thread(
                target=self._monitor_container_logs,
                args=(bot_id, container, on_state_change),
                daemon=True,
            )
            monitor_thread.start()
            self.active_monitors[bot_id] = monitor_thread
            return True
        except docker.errors.NotFound:
            return False
        except Exception as e:
            print(f"[DockerManager] Error ensuring monitor for {bot_id}: {e}")
            return False
    def stop_bot(self, bot_id: str, remove: bool = False) -> bool:
        """Stop the bot's container (and optionally remove it).

        Stopping is skipped for containers that are already exited; the
        monitor-thread handle is dropped either way. Returns False when the
        container does not exist or the docker call fails.
        """
        if not self.client:
            return False
        container_name = f"worker_{bot_id}"
        try:
            container = self.client.containers.get(container_name)
            container.reload()
            # Only stop states that are actually running-ish.
            if str(container.status or "").strip().lower() in {"running", "restarting", "paused"}:
                container.stop(timeout=5)
            if remove:
                container.remove()
            self.active_monitors.pop(bot_id, None)
            return True
        except docker.errors.NotFound:
            self.active_monitors.pop(bot_id, None)
            return False
        except Exception as e:
            print(f"[DockerManager] Error stopping bot {bot_id}: {e}")
            return False
def send_command(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
"""Send a command to dashboard channel with robust container-local delivery."""
if not self.client:
self._last_delivery_error[bot_id] = "Docker client is not available"
return False
media_paths = [str(v).strip().replace("\\", "/") for v in (media or []) if str(v).strip()]
self._last_delivery_error.pop(bot_id, None)
if not self._wait_for_dashboard_ready(bot_id):
if bot_id not in self._last_delivery_error:
self._last_delivery_error[bot_id] = "Dashboard channel is not ready"
return False
# Primary path on Docker Desktop/Mac: execute curl inside container namespace.
for attempt in range(3):
if self._send_command_via_exec(bot_id, command, media_paths):
self._last_delivery_error.pop(bot_id, None)
return True
time.sleep(0.25 * (attempt + 1))
# Secondary path for environments where host can reach container IP.
if self._send_command_via_host_http(bot_id, command, media_paths):
self._last_delivery_error.pop(bot_id, None)
return True
if bot_id not in self._last_delivery_error:
self._last_delivery_error[bot_id] = "Unknown delivery failure"
return False
def get_last_delivery_error(self, bot_id: str) -> str:
return str(self._last_delivery_error.get(bot_id, "") or "").strip()
@classmethod
def _log_indicates_dashboard_ready(cls, line: str) -> bool:
lowered = str(line or "").strip().lower()
return any(marker in lowered for marker in cls._DASHBOARD_READY_LOG_MARKERS)
@classmethod
def _log_indicates_dashboard_failure(cls, line: str) -> bool:
lowered = str(line or "").strip().lower()
return any(marker in lowered for marker in cls._DASHBOARD_FAILURE_LOG_MARKERS)
def _wait_for_dashboard_ready(
self,
bot_id: str,
timeout_seconds: float = 15.0,
poll_interval_seconds: float = 0.5,
) -> bool:
deadline = time.monotonic() + max(1.0, timeout_seconds)
while time.monotonic() < deadline:
status = self.get_bot_status(bot_id)
if status != "RUNNING":
self._last_delivery_error[bot_id] = f"Container status is {status.lower()}"
return False
logs = self.get_recent_logs(bot_id, tail=200)
for line in logs:
if self._log_indicates_dashboard_failure(line):
detail = str(line or "").strip()
self._last_delivery_error[bot_id] = detail[:300] if detail else "Dashboard channel failed to start"
return False
if self._log_indicates_dashboard_ready(line):
return True
time.sleep(max(0.1, poll_interval_seconds))
self._last_delivery_error[bot_id] = (
f"Dashboard channel was not ready within {int(max(1.0, timeout_seconds))}s"
)
return False
def get_bot_status(self, bot_id: str) -> str:
"""Return normalized runtime status from Docker: RUNNING or STOPPED."""
if not self.client:
return "STOPPED"
try:
container = self.client.containers.get(f"worker_{bot_id}")
container.reload()
raw = str(container.status or "").strip().lower()
if raw in {"running", "restarting"}:
return "RUNNING"
return "STOPPED"
except docker.errors.NotFound:
return "STOPPED"
except Exception:
return "STOPPED"
@staticmethod
def _parse_size_to_bytes(raw: Any) -> Optional[int]:
if raw is None:
return None
text = str(raw).strip()
if not text:
return None
try:
return int(float(text))
except Exception:
pass
match = re.fullmatch(r"([0-9]+(?:\.[0-9]+)?)\s*([kmgtp]?)(i?b)?", text.lower())
if not match:
return None
number = float(match.group(1))
unit = (match.group(2) or "").lower()
scale = {
"": 1,
"k": 1024,
"m": 1024 ** 2,
"g": 1024 ** 3,
"t": 1024 ** 4,
"p": 1024 ** 5,
}.get(unit, 1)
return int(number * scale)
@staticmethod
def _calc_cpu_percent(stats: Dict[str, Any]) -> float:
try:
cpu_stats = stats.get("cpu_stats") or {}
precpu_stats = stats.get("precpu_stats") or {}
cpu_total = float((cpu_stats.get("cpu_usage") or {}).get("total_usage") or 0)
prev_cpu_total = float((precpu_stats.get("cpu_usage") or {}).get("total_usage") or 0)
cpu_delta = cpu_total - prev_cpu_total
system_total = float(cpu_stats.get("system_cpu_usage") or 0)
prev_system_total = float(precpu_stats.get("system_cpu_usage") or 0)
system_delta = system_total - prev_system_total
online_cpus = int(
cpu_stats.get("online_cpus")
or len((cpu_stats.get("cpu_usage") or {}).get("percpu_usage") or [])
or 1
)
if cpu_delta <= 0 or system_delta <= 0:
return 0.0
return max(0.0, (cpu_delta / system_delta) * online_cpus * 100.0)
except Exception:
return 0.0
    def get_bot_resource_snapshot(self, bot_id: str) -> Dict[str, Any]:
        """Return a combined limits + live-usage snapshot for the bot's container.

        Always returns the full dict shape below; values stay at their zero/None
        defaults when the client is missing, the container does not exist, or a
        docker call fails. Live "usage" values are only populated while the
        container is RUNNING.
        """
        snapshot: Dict[str, Any] = {
            "docker_status": "STOPPED",
            "limits": {
                "cpu_cores": None,
                "memory_bytes": None,
                "storage_bytes": None,
                "nano_cpus": 0,
                "storage_opt_raw": "",
            },
            "usage": {
                "cpu_percent": 0.0,
                "memory_bytes": 0,
                "memory_limit_bytes": 0,
                "memory_percent": 0.0,
                "network_rx_bytes": 0,
                "network_tx_bytes": 0,
                "blk_read_bytes": 0,
                "blk_write_bytes": 0,
                "pids": 0,
                "container_rw_bytes": 0,
            },
        }
        if not self.client:
            return snapshot
        try:
            container = self.client.containers.get(f"worker_{bot_id}")
            container.reload()
            status_raw = str(container.status or "").strip().lower()
            snapshot["docker_status"] = "RUNNING" if status_raw in {"running", "restarting"} else "STOPPED"
            inspect: Dict[str, Any]
            try:
                # size=True asks the daemon to compute SizeRw (container RW layer).
                inspect = self.client.api.inspect_container(container.id, size=True)
            except TypeError:
                # Older docker SDK versions do not support `size` kwarg.
                inspect = self.client.api.inspect_container(container.id)
            except Exception as e:
                if "unexpected keyword argument 'size'" in str(e):
                    inspect = self.client.api.inspect_container(container.id)
                else:
                    raise
            host_cfg = inspect.get("HostConfig") or {}
            nano_cpus = int(host_cfg.get("NanoCpus") or 0)
            cpu_quota = int(host_cfg.get("CpuQuota") or 0)
            cpu_period = int(host_cfg.get("CpuPeriod") or 0)
            memory_bytes = int(host_cfg.get("Memory") or 0)
            storage_opt = host_cfg.get("StorageOpt") or {}
            storage_raw = storage_opt.get("size")
            storage_bytes = self._parse_size_to_bytes(storage_raw)
            # Prefer NanoCpus; fall back to quota/period; None means unlimited.
            if nano_cpus > 0:
                cpu_cores = nano_cpus / 1_000_000_000
            elif cpu_quota > 0 and cpu_period > 0:
                cpu_cores = cpu_quota / cpu_period
            else:
                cpu_cores = None
            snapshot["limits"] = {
                "cpu_cores": cpu_cores,
                "memory_bytes": memory_bytes if memory_bytes > 0 else None,
                "storage_bytes": storage_bytes,
                "nano_cpus": nano_cpus,
                "storage_opt_raw": str(storage_raw or ""),
            }
            snapshot["usage"]["container_rw_bytes"] = int(inspect.get("SizeRw") or 0)
            if snapshot["docker_status"] == "RUNNING":
                # One-shot (non-streaming) stats sample.
                stats = container.stats(stream=False) or {}
                memory_stats = stats.get("memory_stats") or {}
                memory_usage = int(memory_stats.get("usage") or 0)
                memory_limit = int(memory_stats.get("limit") or 0)
                if memory_usage > 0:
                    # Subtract reclaimable page cache so the figure reflects real use.
                    cache = int((memory_stats.get("stats") or {}).get("inactive_file") or 0)
                    memory_usage = max(0, memory_usage - cache)
                networks = stats.get("networks") or {}
                rx_total = 0
                tx_total = 0
                for _, row in networks.items():
                    if isinstance(row, dict):
                        rx_total += int(row.get("rx_bytes") or 0)
                        tx_total += int(row.get("tx_bytes") or 0)
                blk_stats = stats.get("blkio_stats") or {}
                io_rows = blk_stats.get("io_service_bytes_recursive") or []
                blk_read = 0
                blk_write = 0
                for row in io_rows:
                    if not isinstance(row, dict):
                        continue
                    op = str(row.get("op") or "").upper()
                    value = int(row.get("value") or 0)
                    if op == "READ":
                        blk_read += value
                    elif op == "WRITE":
                        blk_write += value
                pids_current = int((stats.get("pids_stats") or {}).get("current") or 0)
                cpu_percent = self._calc_cpu_percent(stats)
                memory_percent = 0.0
                if memory_limit > 0:
                    memory_percent = (memory_usage / memory_limit) * 100.0
                # Fallback RW-layer size when inspect(size=True) returned nothing.
                if snapshot["usage"]["container_rw_bytes"] <= 0:
                    storage_stats = stats.get("storage_stats") or {}
                    rw_size = int(
                        storage_stats.get("size_rw")
                        or storage_stats.get("rw_size")
                        or 0
                    )
                    snapshot["usage"]["container_rw_bytes"] = max(0, rw_size)
                snapshot["usage"].update(
                    {
                        "cpu_percent": cpu_percent,
                        "memory_bytes": memory_usage,
                        "memory_limit_bytes": memory_limit,
                        "memory_percent": max(0.0, memory_percent),
                        "network_rx_bytes": rx_total,
                        "network_tx_bytes": tx_total,
                        "blk_read_bytes": blk_read,
                        "blk_write_bytes": blk_write,
                        "pids": pids_current,
                    }
                )
        except docker.errors.NotFound:
            return snapshot
        except Exception as e:
            print(f"[DockerManager] get_bot_resource_snapshot failed for {bot_id}: {e}")
        return snapshot
def _send_command_via_exec(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
try:
container = self.client.containers.get(f"worker_{bot_id}")
container.reload()
if container.status != "running":
self._last_delivery_error[bot_id] = f"Container status is {container.status}"
return False
payload_json = json.dumps({"message": command, "media": media or []}, ensure_ascii=False)
# Try direct curl first (no shell dependency).
result = container.exec_run(
[
"curl",
"-sS",
"--fail",
"--max-time",
"6",
"-X",
"POST",
"-H",
"Content-Type: application/json",
"-d",
payload_json,
"http://127.0.0.1:9000/chat",
]
)
output = result.output.decode("utf-8", errors="ignore") if isinstance(result.output, (bytes, bytearray)) else str(result.output)
if result.exit_code != 0:
reason = f"exec curl failed: exit={result.exit_code}, out={output[:300]}"
print(f"[DockerManager] {reason}")
self._last_delivery_error[bot_id] = reason
# Fallback inside container without curl/shell.
payload_b64 = base64.b64encode(payload_json.encode("utf-8")).decode("ascii")
py_script = (
"import base64,json,os,urllib.request\n"
"payload=json.loads(base64.b64decode(os.environ['DASHBOARD_PAYLOAD_B64']).decode('utf-8'))\n"
"req=urllib.request.Request('http://127.0.0.1:9000/chat',"
"data=json.dumps(payload,ensure_ascii=False).encode('utf-8'),"
"headers={'Content-Type':'application/json'})\n"
"with urllib.request.urlopen(req, timeout=8) as resp:\n"
" print(resp.read().decode('utf-8','ignore'))\n"
)
py_bins = ["python3", "python"]
for py_bin in py_bins:
py_result = container.exec_run(
[py_bin, "-c", py_script],
environment={"DASHBOARD_PAYLOAD_B64": payload_b64},
)
py_output = py_result.output.decode("utf-8", errors="ignore") if isinstance(py_result.output, (bytes, bytearray)) else str(py_result.output)
if py_result.exit_code != 0:
py_reason = f"exec {py_bin} fallback failed: exit={py_result.exit_code}, out={py_output[:300]}"
print(f"[DockerManager] {py_reason}")
self._last_delivery_error[bot_id] = py_reason
continue
if py_output.strip():
try:
parsed = json.loads(py_output)
if str(parsed.get("status", "")).lower() != "ok":
py_reason = f"exec {py_bin} fallback non-ok response: {py_output[:300]}"
print(f"[DockerManager] {py_reason}")
self._last_delivery_error[bot_id] = py_reason
continue
except Exception:
pass
return True
return False
if output.strip():
try:
parsed = json.loads(output)
if str(parsed.get("status", "")).lower() != "ok":
reason = f"exec curl non-ok response: {output[:300]}"
print(f"[DockerManager] {reason}")
self._last_delivery_error[bot_id] = reason
return False
except Exception:
# Non-JSON but zero exit still treated as success.
pass
return True
except Exception as e:
reason = f"exec curl exception: {e}"
print(f"[DockerManager] {reason}")
self._last_delivery_error[bot_id] = reason
return False
def _send_command_via_host_http(self, bot_id: str, command: str, media: Optional[List[str]] = None) -> bool:
try:
import httpx
container_name = f"worker_{bot_id}"
payload = {"message": command, "media": media or []}
container = self.client.containers.get(container_name)
container.reload()
ip_address = self._get_container_network_ip(container, preferred_network=self.network_name) or "127.0.0.1"
target_url = f"http://{ip_address}:9000/chat"
with httpx.Client(timeout=4.0) as client:
resp = client.post(target_url, json=payload)
if resp.status_code == 200:
return True
reason = f"host HTTP failed: {resp.status_code} - {resp.text[:300]}"
print(f"[DockerManager] {reason}")
self._last_delivery_error[bot_id] = reason
return False
except Exception as e:
reason = f"host HTTP exception: {e}"
print(f"[DockerManager] {reason}")
self._last_delivery_error[bot_id] = reason
return False
def _read_log_lines_with_client(self, client, bot_id: str, tail: Optional[int] = None) -> List[str]:
container = client.containers.get(f"worker_{bot_id}")
raw = container.logs(tail=max(1, int(tail))) if tail is not None else container.logs()
if isinstance(raw, (bytes, bytearray)):
text = raw.decode("utf-8", errors="ignore")
else:
text = str(raw or "")
return [line for line in text.splitlines() if line.strip()]
def _read_log_lines(self, bot_id: str, tail: Optional[int] = None) -> List[str]:
if not self.client:
return []
try:
return self._read_log_lines_with_client(self.client, bot_id, tail=tail)
except Exception as e:
print(f"[DockerManager] Error reading logs for {bot_id}: {e}")
return []
def get_recent_logs(self, bot_id: str, tail: int = 300) -> List[str]:
return self._read_log_lines(bot_id, tail=max(1, int(tail)))
def get_logs_page(
self,
bot_id: str,
offset: int = 0,
limit: int = 50,
reverse: bool = True,
) -> Dict[str, Any]:
safe_offset = max(0, int(offset))
safe_limit = max(1, int(limit))
if reverse:
# Docker logs API supports tail but not arbitrary offsets. For reverse pagination
# we only read the minimal newest slice needed for the requested page.
tail_count = safe_offset + safe_limit + 1
lines = self._read_log_lines(bot_id, tail=tail_count)
ordered = list(reversed(lines))
page = ordered[safe_offset:safe_offset + safe_limit]
has_more = len(lines) > safe_offset + safe_limit
return {
"logs": page,
"total": None,
"offset": safe_offset,
"limit": safe_limit,
"has_more": has_more,
"reverse": reverse,
}
lines = self._read_log_lines(bot_id, tail=None)
total = len(lines)
page = lines[safe_offset:safe_offset + safe_limit]
return {
"logs": page,
"total": total,
"offset": safe_offset,
"limit": safe_limit,
"has_more": safe_offset + safe_limit < total,
"reverse": reverse,
}
    def _monitor_container_logs(self, bot_id: str, container, callback: Callable[[str, dict], None]):
        """Stream container logs, parsing them into state packets for ``callback``.

        Runs on a dedicated daemon thread. Each complete line is forwarded as
        a RAW_LOG event; structured __DASHBOARD_DATA_START__/END__ packets
        (possibly spanning multiple lines) and recognizable runtime patterns
        additionally produce typed state packets. The loop ends when the log
        stream closes or raises.
        """
        try:
            buffer = ""
            # Accumulates a multi-line dashboard packet until its END marker arrives.
            dashboard_capture: Optional[str] = None
            # Incremental decoder: a UTF-8 sequence may be split across chunks.
            decoder = codecs.getincrementaldecoder("utf-8")("replace")
            # Only tail new logs from "now" to avoid replaying historical stdout
            # (which would repopulate cleared chat messages from old dashboard packets).
            since_ts = int(time.time())
            for chunk in container.logs(stream=True, follow=True, since=since_ts):
                if isinstance(chunk, bytes):
                    text = decoder.decode(chunk)
                else:
                    text = str(chunk)
                if not text:
                    continue
                buffer += text
                # Emit only complete lines; the trailing partial stays buffered.
                while "\n" in buffer:
                    line, buffer = buffer.split("\n", 1)
                    normalized = line.strip("\r").strip()
                    if not normalized:
                        continue
                    if dashboard_capture is not None:
                        dashboard_capture = f"{dashboard_capture}\n{normalized}"
                        if "__DASHBOARD_DATA_END__" in dashboard_capture:
                            state_packet = self._parse_dashboard_packet(dashboard_capture)
                            if state_packet:
                                callback(bot_id, state_packet)
                            dashboard_capture = None
                        callback(bot_id, {"type": "RAW_LOG", "text": normalized})
                        continue
                    # A START without END on the same line opens a multi-line capture.
                    if "__DASHBOARD_DATA_START__" in normalized and "__DASHBOARD_DATA_END__" not in normalized:
                        dashboard_capture = normalized
                        callback(bot_id, {"type": "RAW_LOG", "text": normalized})
                        continue
                    state_packet = self._parse_log_line(normalized)
                    if state_packet:
                        callback(bot_id, state_packet)
                    callback(bot_id, {"type": "RAW_LOG", "text": normalized})
            # Flush any bytes still held by the incremental decoder.
            rest = decoder.decode(b"", final=True)
            if rest:
                buffer += rest
            # Process the final unterminated line, if any.
            tail = buffer.strip()
            if tail:
                state_packet = self._parse_log_line(tail)
                if state_packet:
                    callback(bot_id, state_packet)
                callback(bot_id, {"type": "RAW_LOG", "text": tail})
        except Exception as e:
            print(f"[DockerManager] Log stream closed for {bot_id}: {e}")
    def _parse_dashboard_packet(self, line: str):
        """Parse a __DASHBOARD_DATA_START__…__DASHBOARD_DATA_END__ packet.

        Returns a normalized event dict (AGENT_STATE / ASSISTANT_MESSAGE /
        BUS_EVENT), or None when the markers are absent, the JSON is invalid,
        or the packet carries no usable content.
        """
        if "__DASHBOARD_DATA_START__" not in line or "__DASHBOARD_DATA_END__" not in line:
            return None
        try:
            raw_json = line.split("__DASHBOARD_DATA_START__", 1)[1].split("__DASHBOARD_DATA_END__", 1)[0].strip()
            data = json.loads(raw_json)
            event_type = str(data.get("type", "")).upper()
            # "content" preferred, "text" accepted as an alias.
            content = str(data.get("content") or data.get("text") or "").strip()
            # Normalize media paths to forward slashes.
            media = [str(v).strip().replace("\\", "/") for v in (data.get("media") or []) if str(v).strip()]
            is_progress = bool(data.get("is_progress", False))
            is_tool = bool(data.get("is_tool", False))
            usage = data.get("usage") if isinstance(data.get("usage"), dict) else None
            request_id = str(data.get("request_id") or "").strip() or None
            provider = str(data.get("provider") or "").strip() or None
            model = str(data.get("model") or "").strip() or None
            if event_type == "AGENT_STATE":
                payload = data.get("payload") or {}
                state = str(payload.get("state") or data.get("state") or ("TOOL_CALL" if is_tool else "THINKING"))
                action_msg = str(payload.get("action_msg") or payload.get("msg") or content)
                return {
                    "type": "AGENT_STATE",
                    "channel": "dashboard",
                    "payload": {"state": state, "action_msg": action_msg},
                    "request_id": request_id,
                }
            if event_type == "ASSISTANT_MESSAGE":
                # Empty assistant messages are dropped (returns None below).
                if content or media:
                    return {
                        "type": "ASSISTANT_MESSAGE",
                        "channel": "dashboard",
                        "text": content,
                        "media": media,
                        "usage": usage,
                        "request_id": request_id,
                        "provider": provider,
                        "model": model,
                    }
                return None
            # Progress flags promote untyped packets to BUS_EVENT.
            if event_type == "BUS_EVENT" or is_progress:
                return {
                    "type": "BUS_EVENT",
                    "channel": "dashboard",
                    "content": content,
                    "media": media,
                    "is_progress": is_progress,
                    "is_tool": is_tool,
                    "usage": usage,
                    "request_id": request_id,
                    "provider": provider,
                    "model": model,
                }
            # Unknown type with content: treat as an assistant message.
            if content or media:
                return {
                    "type": "ASSISTANT_MESSAGE",
                    "channel": "dashboard",
                    "text": content,
                    "media": media,
                    "usage": usage,
                    "request_id": request_id,
                    "provider": provider,
                    "model": model,
                }
        except Exception:
            return None
        return None
def _parse_log_line(self, line: str):
# 1. 结构化数据解析(首选,直接从机器人总线获取)
if "__DASHBOARD_DATA_START__" in line:
packet = self._parse_dashboard_packet(line)
if packet:
return packet
# 2. 解析全渠道运行态日志(用于右侧状态面板)
process_match = re.search(r"Processing message from ([\w\-]+):[^:]+:\s*(.+)$", line)
if process_match:
channel = process_match.group(1).strip().lower()
action_msg = process_match.group(2).strip()
return {
"type": "AGENT_STATE",
"channel": channel,
"payload": {
"state": "THINKING",
"action_msg": action_msg[:4000],
},
}
response_match = re.search(r"Response to ([\w\-]+):[^:]+:\s*(.+)$", line)
if response_match:
channel = response_match.group(1).strip().lower()
action_msg = response_match.group(2).strip()
return {
"type": "AGENT_STATE",
"channel": channel,
"payload": {
"state": "SUCCESS",
"action_msg": action_msg[:4000],
},
}
# 3. 备选方案:常规日志解析
lower = line.lower()
tool_call_match = re.search(r"tool call:\s*(.+)$", line, re.IGNORECASE)
if tool_call_match:
return {
"type": "AGENT_STATE",
"payload": {
"state": "TOOL_CALL",
"action_msg": tool_call_match.group(1).strip()[:4000],
},
}
if "error" in lower or "traceback" in lower:
return {
"type": "AGENT_STATE",
"payload": {"state": "ERROR", "action_msg": "执行异常,请检查日志"},
}
return None