dashboard-nanobot/backend/services/bot_query_service.py

181 lines
8.3 KiB
Python

from datetime import datetime
from typing import Any, Callable, Dict, Optional
from fastapi import HTTPException
from sqlmodel import Session
from clients.edge.errors import log_edge_failure
from models.bot import BotInstance
# Signatures of the collaborators injected into BotQueryService.
# They are plain callables so the service never imports their implementations.

# Dict payload returned by the serializers and by the resource reader.
_JsonDict = Dict[str, Any]

# Cache-key builders: take an identifier and return the cache key string.
CacheKeyBotsList = Callable[[Optional[int]], str]
CacheKeyBotDetail = Callable[[str], str]

# Runtime lookups that operate on a bot record.
RefreshBotRuntimeStatus = Callable[[Any, BotInstance], str]
ResolveBotProviderTarget = Callable[[BotInstance], Any]

# Converters from a bot record to a dict payload.
SerializeBot = Callable[[BotInstance], _JsonDict]
SerializeBotListItem = Callable[[BotInstance], _JsonDict]

# Helpers keyed by a single string argument.
ReadBotResources = Callable[[str], _JsonDict]
WorkspaceRoot = Callable[[str], str]
CalcDirSizeBytes = Callable[[str], int]
class BotQueryService:
    """Read-oriented queries for bot instances.

    Covers the bot list, a single bot's detail, its resource snapshot, and the
    (now disabled) tools-config endpoints. Every collaborator is injected via
    keyword-only constructor arguments.
    """

    # ``ttl`` value handed to the cache for list/detail payloads.
    # NOTE(review): the unit is defined by the cache implementation (presumably seconds) - confirm.
    _CACHE_TTL = 30

    def __init__(
        self,
        *,
        cache: Any,
        cache_key_bots_list: CacheKeyBotsList,
        cache_key_bot_detail: CacheKeyBotDetail,
        refresh_bot_runtime_status: RefreshBotRuntimeStatus,
        serialize_bot: SerializeBot,
        serialize_bot_list_item: SerializeBotListItem,
        read_bot_resources: ReadBotResources,
        resolve_bot_provider_target: ResolveBotProviderTarget,
        get_runtime_provider: Callable[[Any, BotInstance], Any],
        workspace_root: WorkspaceRoot,
        calc_dir_size_bytes: CalcDirSizeBytes,
        logger: Any,
    ) -> None:
        """Store the injected collaborators.

        Args:
            cache: Object used through ``get_json(key)`` and
                ``set_json(key, value, ttl=...)``.
            cache_key_bots_list: Builds the cache key for a user's bot list.
            cache_key_bot_detail: Builds the cache key for one bot's detail.
            refresh_bot_runtime_status: Called with ``(app_state, bot)``; its
                return value is compared against the bot's upper-cased
                ``docker_status``.
            serialize_bot: Converts a bot into the detail payload.
            serialize_bot_list_item: Converts a bot into a list-row payload.
            read_bot_resources: Returns the configured resources for a bot id;
                the ``"storage_gb"`` entry is read from the result.
            resolve_bot_provider_target: Returns an object whose
                ``transport_kind`` attribute is inspected.
            get_runtime_provider: Called with ``(app_state, bot)``; the result
                must expose ``get_resource_snapshot(bot_id=...)``.
            workspace_root: Maps a bot id to its workspace path.
            calc_dir_size_bytes: Returns the size in bytes of a directory path.
            logger: Logger forwarded to ``log_edge_failure``.
        """
        self._cache = cache
        self._cache_key_bots_list = cache_key_bots_list
        self._cache_key_bot_detail = cache_key_bot_detail
        self._refresh_bot_runtime_status = refresh_bot_runtime_status
        self._serialize_bot = serialize_bot
        self._serialize_bot_list_item = serialize_bot_list_item
        self._read_bot_resources = read_bot_resources
        self._resolve_bot_provider_target = resolve_bot_provider_target
        self._get_runtime_provider = get_runtime_provider
        self._workspace_root = workspace_root
        self._calc_dir_size_bytes = calc_dir_size_bytes
        self._logger = logger

    def _require_bot(self, *, session: Session, bot_id: str) -> BotInstance:
        """Return the bot with ``bot_id`` or raise ``HTTPException`` 404."""
        bot = session.get(BotInstance, bot_id)
        if not bot:
            raise HTTPException(status_code=404, detail="Bot not found")
        return bot

    def _refresh_and_detect_change(self, *, app_state: Any, bot: BotInstance) -> bool:
        """Refresh ``bot``'s runtime status and report whether it changed.

        Snapshots ``docker_status`` (upper-cased) and ``current_state`` before
        calling the injected refresher, then compares them with the refresher's
        return value and the bot's ``current_state`` afterwards.

        Returns:
            True when either value differs, i.e. the bot needs persisting.
        """
        status_before = str(bot.docker_status or "").upper()
        state_before = str(bot.current_state or "")
        status_after = self._refresh_bot_runtime_status(app_state, bot)
        state_after = str(bot.current_state or "")
        return status_before != status_after or state_before != state_after

    def list_bots(self, *, app_state: Any, session: Session, current_user_id: int) -> list[Dict[str, Any]]:
        """Return list-row payloads for the bots accessible to a user.

        A list cached under the user's key is returned as-is, before the user
        record is looked up. Otherwise each accessible bot has its runtime
        status refreshed, changed bots are committed, and the serialized rows
        are cached.

        Raises:
            HTTPException: 401 when ``current_user_id`` is not positive or no
                matching ``SysUser`` exists.
        """
        # Function-scope imports, kept exactly as in the original module.
        from models.sys_auth import SysUser
        from services.sys_auth_service import list_accessible_bots_for_user

        cache_key = self._cache_key_bots_list(current_user_id)
        cached = self._cache.get_json(cache_key)
        if isinstance(cached, list):
            return cached

        current_user = session.get(SysUser, current_user_id) if current_user_id > 0 else None
        if current_user is None:
            raise HTTPException(status_code=401, detail="Authentication required")

        bots = list_accessible_bots_for_user(session, current_user)
        dirty = False
        for bot in bots:
            if self._refresh_and_detect_change(app_state=app_state, bot=bot):
                session.add(bot)
                dirty = True
        if dirty:
            # One commit for the whole batch, then reload every bot.
            # NOTE(review): the refresh loop is assumed to sit under ``if dirty``
            # (source indentation was lost) - confirm against the repository.
            session.commit()
            for bot in bots:
                session.refresh(bot)

        rows = [self._serialize_bot_list_item(bot) for bot in bots]
        self._cache.set_json(cache_key, rows, ttl=self._CACHE_TTL)
        return rows

    def get_bot_detail(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
        """Return the detail payload for one bot.

        A dict cached under the bot's key is returned as-is. Otherwise the bot
        is loaded, its runtime status refreshed and persisted if it changed,
        and the serialized payload is cached.

        Raises:
            HTTPException: 404 when the bot does not exist (cache miss only).
        """
        cache_key = self._cache_key_bot_detail(bot_id)
        cached = self._cache.get_json(cache_key)
        if isinstance(cached, dict):
            return cached

        bot = self._require_bot(session=session, bot_id=bot_id)
        if self._refresh_and_detect_change(app_state=app_state, bot=bot):
            session.add(bot)
            session.commit()
            session.refresh(bot)

        row = self._serialize_bot(bot)
        self._cache.set_json(cache_key, row, ttl=self._CACHE_TTL)
        return row

    def get_bot_resources(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
        """Return configured and runtime resource information for one bot.

        The runtime snapshot comes from the bot's runtime provider. If that
        call raises, the failure is logged and an empty snapshot carrying the
        bot's stored status (default ``"STOPPED"``) is used instead. The bot's
        ``docker_status`` / ``current_state`` are synchronised with the
        snapshot and committed when they change.

        Returns:
            A dict with ``bot_id``, ``docker_status``, ``configured``,
            ``runtime``, ``workspace``, ``enforcement``, ``note`` and
            ``collected_at`` (naive UTC ISO-8601 string suffixed with ``Z``).

        Raises:
            HTTPException: 404 when the bot does not exist.
        """
        # Function-scope import: the module header only imports ``datetime``.
        from datetime import timezone

        bot = self._require_bot(session=session, bot_id=bot_id)
        configured = self._read_bot_resources(bot_id)

        try:
            runtime = self._get_runtime_provider(app_state, bot).get_resource_snapshot(bot_id=bot_id)
        except Exception as exc:
            # Deliberate best-effort: any provider failure degrades to the stored status.
            log_edge_failure(
                self._logger,
                key=f"bot-resources:{bot_id}",
                exc=exc,
                message=f"Failed to refresh bot resources for bot_id={bot_id}",
            )
            runtime = {"usage": {}, "limits": {}, "docker_status": str(bot.docker_status or "STOPPED").upper()}

        runtime_status = str(runtime.get("docker_status") or "").upper()
        previous_status = str(bot.docker_status or "").upper()
        previous_state = str(bot.current_state or "")
        if runtime_status:
            bot.docker_status = runtime_status
            # A bot that is not running is reset to IDLE, but an ERROR state is kept.
            # NOTE(review): assumed nested under ``if runtime_status`` (source
            # indentation was lost) - confirm against the repository.
            if runtime_status != "RUNNING" and str(bot.current_state or "").upper() not in {"ERROR"}:
                bot.current_state = "IDLE"
        if previous_status != str(bot.docker_status or "").upper() or previous_state != str(bot.current_state or ""):
            session.add(bot)
            session.commit()
            session.refresh(bot)

        target = self._resolve_bot_provider_target(bot)
        usage_payload = dict(runtime.get("usage") or {})
        workspace_bytes = int(usage_payload.get("container_rw_bytes") or usage_payload.get("workspace_used_bytes") or 0)
        workspace_root = ""
        if workspace_bytes <= 0:
            # The snapshot reported no usage: measure the workspace directory instead.
            workspace_root = self._workspace_root(bot_id)
            workspace_bytes = self._calc_dir_size_bytes(workspace_root)
        elif target.transport_kind != "edge":
            # Usage came from the snapshot; the path is reported for non-edge targets only.
            workspace_root = self._workspace_root(bot_id)

        configured_storage_bytes = int(configured.get("storage_gb", 0) or 0) * 1024 * 1024 * 1024
        workspace_percent = 0.0
        if configured_storage_bytes > 0:
            workspace_percent = (workspace_bytes / configured_storage_bytes) * 100.0

        limits = runtime.get("limits") or {}
        cpu_limited = (limits.get("cpu_cores") or 0) > 0
        memory_limited = (limits.get("memory_bytes") or 0) > 0
        storage_limited = bool(limits.get("storage_bytes")) or bool(limits.get("storage_opt_raw"))

        # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
        # timestamp and drop tzinfo so the emitted "<iso>Z" format is unchanged.
        collected_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

        return {
            "bot_id": bot_id,
            "docker_status": runtime.get("docker_status") or bot.docker_status,
            "configured": configured,
            "runtime": runtime,
            "workspace": {
                "path": workspace_root or None,
                "usage_bytes": workspace_bytes,
                "configured_limit_bytes": configured_storage_bytes if configured_storage_bytes > 0 else None,
                "usage_percent": max(0.0, workspace_percent),
            },
            "enforcement": {
                "cpu_limited": cpu_limited,
                "memory_limited": memory_limited,
                "storage_limited": storage_limited,
            },
            "note": (
                "Resource value 0 means unlimited. CPU/Memory limits come from Docker HostConfig and are enforced by cgroup. "
                "Storage limit depends on Docker storage driver support."
            ),
            "collected_at": collected_at,
        }

    def get_tools_config(self, *, session: Session, bot_id: str) -> Dict[str, Any]:
        """Return the fixed "tools config disabled" payload for an existing bot.

        Raises:
            HTTPException: 404 when the bot does not exist.
        """
        self._require_bot(session=session, bot_id=bot_id)
        return {
            "bot_id": bot_id,
            "tools_config": {},
            "managed_by_dashboard": False,
            "hint": "Tools config is disabled in dashboard. Configure tool-related env vars manually.",
        }

    def update_tools_config(self, *, session: Session, bot_id: str, payload: Any) -> Dict[str, Any]:
        """Reject every tools-config update; ``payload`` is never read.

        Raises:
            HTTPException: 404 when the bot does not exist, otherwise always 400.
        """
        self._require_bot(session=session, bot_id=bot_id)
        raise HTTPException(
            status_code=400,
            detail="Tools config is no longer managed by dashboard. Please set required env vars manually.",
        )