# File viewer metadata: 181 lines, 8.3 KiB, Python
from datetime import datetime, timezone
from typing import Any, Callable, Dict, Optional

from fastapi import HTTPException
from sqlmodel import Session

from clients.edge.errors import log_edge_failure
from models.bot import BotInstance

# Callable contracts for the collaborators injected into BotQueryService.

# Every dict-shaped payload named below is a plain str -> Any mapping.
_JsonDict = Dict[str, Any]

# Cache-key builders. The list key is built from an optional int
# (BotQueryService passes the current user id); the detail key from a bot id.
CacheKeyBotsList = Callable[[Optional[int]], str]
CacheKeyBotDetail = Callable[[str], str]

# Called as (app_state, bot). The returned string is compared against the
# upper-cased ``bot.docker_status`` captured before the call.
RefreshBotRuntimeStatus = Callable[[Any, BotInstance], str]

# Turn a bot row into the payload returned by the detail / list queries.
SerializeBot = Callable[[BotInstance], _JsonDict]
SerializeBotListItem = Callable[[BotInstance], _JsonDict]

# bot_id -> configured resources; BotQueryService reads its "storage_gb" key.
ReadBotResources = Callable[[str], _JsonDict]

# bot -> provider target; BotQueryService reads its ``transport_kind``.
ResolveBotProviderTarget = Callable[[BotInstance], Any]

# bot_id -> workspace path, and path -> size in bytes of that directory.
WorkspaceRoot = Callable[[str], str]
CalcDirSizeBytes = Callable[[str], int]


class BotQueryService:
    """Read-side queries for bot instances.

    Covers the bot list, a single bot's detail, its resource usage and the
    (now disabled) tools configuration. Every collaborator -- cache, key
    builders, serializers, runtime-provider lookup, filesystem helpers and
    logger -- is injected through ``__init__`` and stored on the instance.
    """

    # ``ttl`` handed to ``cache.set_json`` for list and detail payloads.
    # The unit is defined by the cache implementation (presumably seconds).
    _CACHE_TTL = 30

    # Multiplier used to convert the configured ``storage_gb`` into bytes.
    _BYTES_PER_GIB = 1024 * 1024 * 1024

    def __init__(
        self,
        *,
        cache: Any,
        cache_key_bots_list: CacheKeyBotsList,
        cache_key_bot_detail: CacheKeyBotDetail,
        refresh_bot_runtime_status: RefreshBotRuntimeStatus,
        serialize_bot: SerializeBot,
        serialize_bot_list_item: SerializeBotListItem,
        read_bot_resources: ReadBotResources,
        resolve_bot_provider_target: ResolveBotProviderTarget,
        get_runtime_provider: Callable[[Any, BotInstance], Any],
        workspace_root: WorkspaceRoot,
        calc_dir_size_bytes: CalcDirSizeBytes,
        logger: Any,
    ) -> None:
        """Store the injected collaborators; performs no other work.

        Args:
            cache: Object exposing ``get_json(key)`` and
                ``set_json(key, value, ttl=...)``.
            cache_key_bots_list: Builds the list cache key from a user id.
            cache_key_bot_detail: Builds the detail cache key from a bot id.
            refresh_bot_runtime_status: Called as ``(app_state, bot)``; returns
                the bot's actual runtime status string.
            serialize_bot: Converts a bot into the detail payload.
            serialize_bot_list_item: Converts a bot into a list-row payload.
            read_bot_resources: Returns the configured resources for a bot id.
            resolve_bot_provider_target: Returns an object whose
                ``transport_kind`` attribute is inspected.
            get_runtime_provider: Called as ``(app_state, bot)``; the result
                must expose ``get_resource_snapshot(bot_id=...)``.
            workspace_root: Returns the workspace path for a bot id.
            calc_dir_size_bytes: Returns the size in bytes of a directory.
            logger: Logger passed to ``log_edge_failure``.
        """
        self._cache = cache
        self._cache_key_bots_list = cache_key_bots_list
        self._cache_key_bot_detail = cache_key_bot_detail
        self._refresh_bot_runtime_status = refresh_bot_runtime_status
        self._serialize_bot = serialize_bot
        self._serialize_bot_list_item = serialize_bot_list_item
        self._read_bot_resources = read_bot_resources
        self._resolve_bot_provider_target = resolve_bot_provider_target
        self._get_runtime_provider = get_runtime_provider
        self._workspace_root = workspace_root
        self._calc_dir_size_bytes = calc_dir_size_bytes
        self._logger = logger

    def _require_bot(self, *, session: Session, bot_id: str) -> BotInstance:
        """Load the bot with ``bot_id`` or raise ``HTTPException`` 404."""
        bot = session.get(BotInstance, bot_id)
        if not bot:
            raise HTTPException(status_code=404, detail="Bot not found")
        return bot

    def _sync_runtime_status(self, *, app_state: Any, bot: BotInstance) -> bool:
        """Refresh ``bot`` via the injected callback and report whether it changed.

        Returns ``True`` when the status returned by the refresh callback
        differs from the previous (upper-cased) ``docker_status`` or when
        ``current_state`` was modified by the callback. Nothing is persisted
        here; the caller decides when to add/commit.
        """
        status_before = str(bot.docker_status or "").upper()
        state_before = str(bot.current_state or "")
        actual_status = self._refresh_bot_runtime_status(app_state, bot)
        return status_before != actual_status or state_before != str(bot.current_state or "")

    def list_bots(self, *, app_state: Any, session: Session, current_user_id: int) -> list[Dict[str, Any]]:
        """Return the serialized bots accessible to ``current_user_id``.

        A cached list is returned as-is when present. Otherwise each
        accessible bot has its runtime status refreshed, changed rows are
        committed in a single transaction, and the result is cached.

        Raises:
            HTTPException: 401 when ``current_user_id`` is not positive or no
                matching user row exists.
        """
        # Imported at call time rather than at module level, as in the
        # original (presumably to avoid an import cycle -- TODO confirm).
        from models.sys_auth import SysUser
        from services.sys_auth_service import list_accessible_bots_for_user

        cache_key = self._cache_key_bots_list(current_user_id)
        cached = self._cache.get_json(cache_key)
        if isinstance(cached, list):
            return cached

        current_user = session.get(SysUser, current_user_id) if current_user_id > 0 else None
        if current_user is None:
            raise HTTPException(status_code=401, detail="Authentication required")

        bots = list_accessible_bots_for_user(session, current_user)
        dirty = False
        for bot in bots:
            if self._sync_runtime_status(app_state=app_state, bot=bot):
                session.add(bot)
                dirty = True
        if dirty:
            # One commit for all changed rows, then reload every bot before
            # serializing it.
            session.commit()
            for bot in bots:
                session.refresh(bot)

        rows = [self._serialize_bot_list_item(bot) for bot in bots]
        self._cache.set_json(cache_key, rows, ttl=self._CACHE_TTL)
        return rows

    def get_bot_detail(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
        """Return the serialized detail of one bot, using the cache when possible.

        On a cache miss the bot's runtime status is refreshed and persisted
        if it changed, then the serialized row is cached and returned.

        Raises:
            HTTPException: 404 when the bot does not exist (cache miss only).
        """
        cache_key = self._cache_key_bot_detail(bot_id)
        cached = self._cache.get_json(cache_key)
        if isinstance(cached, dict):
            return cached

        bot = self._require_bot(session=session, bot_id=bot_id)
        if self._sync_runtime_status(app_state=app_state, bot=bot):
            session.add(bot)
            session.commit()
            session.refresh(bot)

        row = self._serialize_bot(bot)
        self._cache.set_json(cache_key, row, ttl=self._CACHE_TTL)
        return row

    def get_bot_resources(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
        """Return configured limits, runtime usage and workspace usage for a bot.

        The runtime snapshot is best-effort: if the provider call fails the
        failure is logged and an empty snapshot carrying the bot's stored
        status is used instead. The bot row is updated when the snapshot
        reports a different status.

        Returns:
            A dict with the keys ``bot_id``, ``docker_status``, ``configured``,
            ``runtime``, ``workspace``, ``enforcement``, ``note`` and
            ``collected_at``.

        Raises:
            HTTPException: 404 when the bot does not exist.
        """
        bot = self._require_bot(session=session, bot_id=bot_id)
        configured = self._read_bot_resources(bot_id)

        try:
            runtime = self._get_runtime_provider(app_state, bot).get_resource_snapshot(bot_id=bot_id)
        except Exception as exc:
            # Deliberate best-effort: a failing provider must not break the
            # endpoint, so log it and fall back to the stored status.
            log_edge_failure(
                self._logger,
                key=f"bot-resources:{bot_id}",
                exc=exc,
                message=f"Failed to refresh bot resources for bot_id={bot_id}",
            )
            runtime = {"usage": {}, "limits": {}, "docker_status": str(bot.docker_status or "STOPPED").upper()}

        runtime_status = str(runtime.get("docker_status") or "").upper()
        previous_status = str(bot.docker_status or "").upper()
        previous_state = str(bot.current_state or "")
        if runtime_status:
            bot.docker_status = runtime_status
            # A bot that is not running cannot be busy; keep ERROR visible.
            if runtime_status != "RUNNING" and str(bot.current_state or "").upper() not in {"ERROR"}:
                bot.current_state = "IDLE"
        if previous_status != str(bot.docker_status or "").upper() or previous_state != str(bot.current_state or ""):
            session.add(bot)
            session.commit()
            session.refresh(bot)

        target = self._resolve_bot_provider_target(bot)
        usage_payload = dict(runtime.get("usage") or {})
        workspace_bytes = int(usage_payload.get("container_rw_bytes") or usage_payload.get("workspace_used_bytes") or 0)
        workspace_root = ""
        if workspace_bytes <= 0:
            # The snapshot reported no usage: measure the workspace directory.
            workspace_root = self._workspace_root(bot_id)
            workspace_bytes = self._calc_dir_size_bytes(workspace_root)
        elif target.transport_kind != "edge":
            # Usage came from the snapshot; the path is still reported for
            # every transport except "edge".
            workspace_root = self._workspace_root(bot_id)

        configured_storage_bytes = int(configured.get("storage_gb", 0) or 0) * self._BYTES_PER_GIB
        workspace_percent = 0.0
        if configured_storage_bytes > 0:
            workspace_percent = (workspace_bytes / configured_storage_bytes) * 100.0

        limits = runtime.get("limits") or {}
        cpu_limited = (limits.get("cpu_cores") or 0) > 0
        memory_limited = (limits.get("memory_bytes") or 0) > 0
        storage_limited = bool(limits.get("storage_bytes")) or bool(limits.get("storage_opt_raw"))

        # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware
        # UTC timestamp and drop tzinfo so the output keeps its "...Z" shape
        # instead of becoming "...+00:00Z".
        collected_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

        return {
            "bot_id": bot_id,
            "docker_status": runtime.get("docker_status") or bot.docker_status,
            "configured": configured,
            "runtime": runtime,
            "workspace": {
                "path": workspace_root or None,
                "usage_bytes": workspace_bytes,
                "configured_limit_bytes": configured_storage_bytes if configured_storage_bytes > 0 else None,
                "usage_percent": max(0.0, workspace_percent),
            },
            "enforcement": {
                "cpu_limited": cpu_limited,
                "memory_limited": memory_limited,
                "storage_limited": storage_limited,
            },
            "note": (
                "Resource value 0 means unlimited. CPU/Memory limits come from Docker HostConfig and are enforced by cgroup. "
                "Storage limit depends on Docker storage driver support."
            ),
            "collected_at": collected_at,
        }

    def get_tools_config(self, *, session: Session, bot_id: str) -> Dict[str, Any]:
        """Return the fixed "not managed by dashboard" tools payload for a bot.

        Raises:
            HTTPException: 404 when the bot does not exist.
        """
        self._require_bot(session=session, bot_id=bot_id)
        return {
            "bot_id": bot_id,
            "tools_config": {},
            "managed_by_dashboard": False,
            "hint": "Tools config is disabled in dashboard. Configure tool-related env vars manually.",
        }

    def update_tools_config(self, *, session: Session, bot_id: str, payload: Any) -> Dict[str, Any]:
        """Reject tools-config updates; the dashboard no longer manages them.

        ``payload`` is accepted for interface compatibility and ignored.

        Raises:
            HTTPException: 404 when the bot does not exist, otherwise always
                400. This method never returns normally.
        """
        self._require_bot(session=session, bot_id=bot_id)
        raise HTTPException(
            status_code=400,
            detail="Tools config is no longer managed by dashboard. Please set required env vars manually.",
        )