import asyncio
import os
import time
from typing import Any, Callable, Dict

from sqlmodel import Session

from clients.edge.errors import log_edge_failure
from models.bot import BotInstance
from providers.target import provider_target_to_dict


class BotRuntimeSnapshotService:
    """Build runtime snapshots and API payloads for bot instances.

    Every collaborator (database engine, logger, docker manager, config/env
    readers, edge-context resolver, cache invalidation, activity recording) is
    injected through the constructor; this class only combines their results.
    """

    # Text searched for (case-insensitively) in the container's recent logs to
    # decide that the bot's agent loop is up.
    _AGENT_LOOP_READY_MARKER = "Agent loop started"

    def __init__(
        self,
        *,
        engine: Any,
        logger: Any,
        docker_manager: Any,
        default_soul_md: str,
        default_agents_md: str,
        default_user_md: str,
        default_tools_md: str,
        default_identity_md: str,
        workspace_root: Callable[[str], str],
        resolve_edge_state_context: Callable[[str], Any],
        read_bot_config: Callable[[str], Dict[str, Any]],
        resolve_bot_env_params: Callable[[str], Dict[str, str]],
        resolve_bot_provider_target_for_instance: Callable[[BotInstance], Any],
        read_global_delivery_flags: Callable[[Any], tuple[bool, bool]],
        safe_float: Callable[[Any, float], float],
        safe_int: Callable[[Any, int], int],
        get_default_system_timezone: Callable[[], str],
        # Called as read_bot_resources(bot_id, config_data=...), so the
        # signature must allow a keyword argument.
        read_bot_resources: Callable[..., Dict[str, Any]],
        node_display_name: Callable[[str], str],
        get_runtime_provider: Callable[[Any, BotInstance], Any],
        invalidate_bot_detail_cache: Callable[[str], None],
        record_activity_event: Callable[..., None],
    ) -> None:
        """Store the injected collaborators; no I/O happens here."""
        self._engine = engine
        self._logger = logger
        self._docker_manager = docker_manager
        self._default_soul_md = default_soul_md
        self._default_agents_md = default_agents_md
        self._default_user_md = default_user_md
        self._default_tools_md = default_tools_md
        self._default_identity_md = default_identity_md
        self._workspace_root = workspace_root
        self._resolve_edge_state_context = resolve_edge_state_context
        self._read_bot_config = read_bot_config
        self._resolve_bot_env_params = resolve_bot_env_params
        self._resolve_bot_provider_target_for_instance = resolve_bot_provider_target_for_instance
        self._read_global_delivery_flags = read_global_delivery_flags
        self._safe_float = safe_float
        self._safe_int = safe_int
        self._get_default_system_timezone = get_default_system_timezone
        self._read_bot_resources = read_bot_resources
        self._node_display_name = node_display_name
        self._get_runtime_provider = get_runtime_provider
        self._invalidate_bot_detail_cache = invalidate_bot_detail_cache
        self._record_activity_event = record_activity_event

    def read_workspace_md(self, bot_id: str, filename: str, default_value: str) -> str:
        """Return the stripped text of a workspace markdown file, or ``default_value``.

        If an edge state context exists for the bot, the file is fetched through
        the edge client (capped at 1,000,000 bytes). It is used only when the
        edge reports it as markdown with string content. Edge errors are logged
        and yield ``default_value``; the local filesystem is NOT consulted in
        that case. Without an edge context the file is read from the local
        workspace root, and a missing or unreadable file yields
        ``default_value``.
        """
        edge_context = self._resolve_edge_state_context(bot_id)
        if edge_context is not None:
            client, workspace_root, node_id = edge_context
            try:
                payload = client.read_file(
                    bot_id=bot_id,
                    path=filename,
                    max_bytes=1_000_000,
                    workspace_root=workspace_root,
                )
                if bool(payload.get("is_markdown")):
                    content = payload.get("content")
                    if isinstance(content, str):
                        return content.strip()
            except Exception as exc:
                # Edge reads are best-effort; failures are logged, never raised.
                log_edge_failure(
                    self._logger,
                    key=f"workspace-md-read:{node_id}:{bot_id}:{filename}",
                    exc=exc,
                    message=f"Failed to read edge workspace markdown for bot_id={bot_id}, file={filename}",
                )
            return default_value

        path = os.path.join(self._workspace_root(bot_id), filename)
        if not os.path.isfile(path):
            return default_value
        try:
            with open(path, "r", encoding="utf-8") as file:
                return file.read().strip()
        except (OSError, UnicodeDecodeError):
            # File vanished, is unreadable, or is not valid UTF-8.
            return default_value

    @staticmethod
    def _first_provider(config_data: Dict[str, Any]) -> tuple[str, Dict[str, Any]]:
        """Return ``(name, config)`` for the first entry of ``config_data["providers"]``.

        Returns ``("", {})`` when there is no providers mapping. The config is
        ``{}`` when the first entry's value is not a dict.
        """
        providers_cfg = config_data.get("providers")
        if not isinstance(providers_cfg, dict):
            return "", {}
        for p_name, p_cfg in providers_cfg.items():
            return str(p_name or "").strip(), (p_cfg if isinstance(p_cfg, dict) else {})
        return "", {}

    @staticmethod
    def _agents_defaults(config_data: Dict[str, Any]) -> Dict[str, Any]:
        """Return ``config_data["agents"]["defaults"]`` if it is a dict, else ``{}``."""
        agents_cfg = config_data.get("agents")
        defaults = agents_cfg.get("defaults") if isinstance(agents_cfg, dict) else None
        return defaults if isinstance(defaults, dict) else {}

    @classmethod
    def _read_llm_settings(cls, config_data: Dict[str, Any]) -> Dict[str, Any]:
        """Derive provider/model/credentials from bot config without any other I/O.

        The provider defaults to ``"dashscope"``. An ``openai`` provider whose
        API base points at an ``xf-yun.com`` host is reported as ``"xunfei"``.
        """
        provider_name, provider_cfg = cls._first_provider(config_data)
        agents_defaults = cls._agents_defaults(config_data)
        llm_provider = provider_name or "dashscope"
        api_base = str(provider_cfg.get("apiBase") or "").strip()
        # "spark-api-open.xf-yun.com" is itself covered by this substring test.
        if llm_provider == "openai" and "xf-yun.com" in api_base.lower():
            llm_provider = "xunfei"
        return {
            "llm_provider": llm_provider,
            "llm_model": str(agents_defaults.get("model") or ""),
            "api_key": str(provider_cfg.get("apiKey") or "").strip(),
            "api_base": api_base,
        }

    def read_bot_runtime_snapshot(self, bot: BotInstance) -> Dict[str, Any]:
        """Return the bot's effective runtime settings as a flat dict.

        The dict merges the provider-target fields, LLM settings, sampling
        parameters, resources, timezone, delivery flags and the workspace
        markdown documents.
        """
        target = self._resolve_bot_provider_target_for_instance(bot)
        return self._build_runtime_snapshot(bot, target)

    def _build_runtime_snapshot(self, bot: BotInstance, target: Any) -> Dict[str, Any]:
        """Build the runtime snapshot for ``bot`` using an already-resolved ``target``."""
        config_data = self._read_bot_config(bot.id)
        env_params = self._resolve_bot_env_params(bot.id)
        agents_defaults = self._agents_defaults(config_data)
        send_progress, send_tool_hints = self._read_global_delivery_flags(config_data.get("channels"))
        soul_md = self.read_workspace_md(bot.id, "SOUL.md", self._default_soul_md)
        resources = self._read_bot_resources(bot.id, config_data=config_data)
        return {
            **provider_target_to_dict(target),
            **self._read_llm_settings(config_data),
            "temperature": self._safe_float(agents_defaults.get("temperature"), 0.2),
            "top_p": self._safe_float(agents_defaults.get("topP"), 1.0),
            "max_tokens": self._safe_int(agents_defaults.get("maxTokens"), 8192),
            "cpu_cores": resources["cpu_cores"],
            "memory_mb": resources["memory_mb"],
            "storage_gb": resources["storage_gb"],
            "system_timezone": env_params.get("TZ") or self._get_default_system_timezone(),
            "send_progress": send_progress,
            "send_tool_hints": send_tool_hints,
            "soul_md": soul_md,
            "agents_md": self.read_workspace_md(bot.id, "AGENTS.md", self._default_agents_md),
            "user_md": self.read_workspace_md(bot.id, "USER.md", self._default_user_md),
            "tools_md": self.read_workspace_md(bot.id, "TOOLS.md", self._default_tools_md),
            "identity_md": self.read_workspace_md(bot.id, "IDENTITY.md", self._default_identity_md),
            # SOUL.md doubles as the system prompt.
            "system_prompt": soul_md,
        }

    def serialize_bot(self, bot: BotInstance) -> Dict[str, Any]:
        """Return the full (detail-view) payload for ``bot``."""
        target = self._resolve_bot_provider_target_for_instance(bot)
        runtime = self._build_runtime_snapshot(bot, target)
        return {
            "id": bot.id,
            "name": bot.name,
            "enabled": bool(getattr(bot, "enabled", True)),
            "avatar_model": "base",
            "avatar_skin": "blue_suit",
            "image_tag": bot.image_tag,
            "llm_provider": runtime.get("llm_provider") or "",
            "llm_model": runtime.get("llm_model") or "",
            "system_prompt": runtime.get("system_prompt") or "",
            "api_base": runtime.get("api_base") or "",
            "temperature": self._safe_float(runtime.get("temperature"), 0.2),
            "top_p": self._safe_float(runtime.get("top_p"), 1.0),
            "max_tokens": self._safe_int(runtime.get("max_tokens"), 8192),
            "cpu_cores": self._safe_float(runtime.get("cpu_cores"), 1.0),
            "memory_mb": self._safe_int(runtime.get("memory_mb"), 1024),
            "storage_gb": self._safe_int(runtime.get("storage_gb"), 10),
            "system_timezone": str(runtime.get("system_timezone") or self._get_default_system_timezone()),
            "send_progress": bool(runtime.get("send_progress")),
            "send_tool_hints": bool(runtime.get("send_tool_hints")),
            "node_id": target.node_id,
            "node_display_name": self._node_display_name(target.node_id),
            "transport_kind": target.transport_kind,
            "runtime_kind": target.runtime_kind,
            "core_adapter": target.core_adapter,
            "soul_md": runtime.get("soul_md") or "",
            "agents_md": runtime.get("agents_md") or "",
            "user_md": runtime.get("user_md") or "",
            "tools_md": runtime.get("tools_md") or "",
            "identity_md": runtime.get("identity_md") or "",
            "workspace_dir": bot.workspace_dir,
            "docker_status": bot.docker_status,
            "current_state": bot.current_state,
            "last_action": bot.last_action,
            "created_at": bot.created_at,
            "updated_at": bot.updated_at,
        }

    def serialize_bot_list_item(self, bot: BotInstance) -> Dict[str, Any]:
        """Return the compact list-view payload for ``bot``.

        Only the bot config is read to obtain provider/model. The workspace
        markdown files, env params and resources that a full snapshot loads
        are not needed here.
        """
        target = self._resolve_bot_provider_target_for_instance(bot)
        llm = self._read_llm_settings(self._read_bot_config(bot.id))
        return {
            "id": bot.id,
            "name": bot.name,
            "enabled": bool(getattr(bot, "enabled", True)),
            "image_tag": bot.image_tag,
            "llm_provider": llm.get("llm_provider") or "",
            "llm_model": llm.get("llm_model") or "",
            "node_id": target.node_id,
            "node_display_name": self._node_display_name(target.node_id),
            "transport_kind": target.transport_kind,
            "runtime_kind": target.runtime_kind,
            "core_adapter": target.core_adapter,
            "docker_status": bot.docker_status,
            "current_state": bot.current_state,
            "last_action": bot.last_action,
            "updated_at": bot.updated_at,
        }

    def refresh_bot_runtime_status(self, app_state: Any, bot: BotInstance) -> str:
        """Query the runtime provider and store the upper-cased status on ``bot``.

        A provider answer of nothing is treated as ``"STOPPED"``. When the new
        status is not ``RUNNING``, ``bot.current_state`` is reset to ``IDLE``
        unless it is ``ERROR``. If the provider call raises, the failure is
        logged and the previously stored status (upper-cased, default
        ``"STOPPED"``) is returned without touching ``bot``. Only the in-memory
        object is mutated; persisting it is left to the caller.
        """
        current_status = str(bot.docker_status or "STOPPED").upper()
        try:
            provider = self._get_runtime_provider(app_state, bot)
            status = str(provider.get_runtime_status(bot_id=str(bot.id or "")) or "STOPPED").upper()
        except Exception as exc:
            log_edge_failure(
                self._logger,
                key=f"bot-runtime-status:{bot.id}",
                exc=exc,
                message=f"Failed to refresh runtime status for bot_id={bot.id}",
            )
            return current_status
        bot.docker_status = status
        if status != "RUNNING" and str(bot.current_state or "").upper() != "ERROR":
            bot.current_state = "IDLE"
        return status

    async def wait_for_agent_loop_ready(
        self,
        bot_id: str,
        timeout_seconds: float = 12.0,
        poll_interval_seconds: float = 0.5,
    ) -> bool:
        """Poll the container's recent logs until the ready marker appears.

        Returns True as soon as any of the last 200 log lines contains
        ``_AGENT_LOOP_READY_MARKER`` (case-insensitive), False once the deadline
        passes. The timeout is at least 1s and the poll interval at least 0.1s.
        """
        deadline = time.monotonic() + max(1.0, timeout_seconds)
        interval = max(0.1, poll_interval_seconds)
        marker = self._AGENT_LOOP_READY_MARKER.lower()
        while time.monotonic() < deadline:
            # NOTE(review): get_recent_logs is invoked synchronously; if it does
            # blocking I/O it stalls the event loop for that call. Consider
            # asyncio.to_thread once docker_manager's thread-safety is confirmed.
            logs = self._docker_manager.get_recent_logs(bot_id, tail=200) or ()
            if any(marker in str(line or "").lower() for line in logs):
                return True
            await asyncio.sleep(interval)
        return False

    async def record_agent_loop_ready_warning(
        self,
        bot_id: str,
        timeout_seconds: float = 12.0,
        poll_interval_seconds: float = 0.5,
    ) -> None:
        """Record a ``bot_warning`` activity event if the agent loop never became ready.

        Nothing is recorded when the marker is found, when the container is no
        longer ``RUNNING``, or when the bot row no longer exists. This method
        never raises: any exception is logged and swallowed.
        """
        try:
            agent_loop_ready = await self.wait_for_agent_loop_ready(
                bot_id,
                timeout_seconds=timeout_seconds,
                poll_interval_seconds=poll_interval_seconds,
            )
            if agent_loop_ready:
                return
            if self._docker_manager.get_bot_status(bot_id) != "RUNNING":
                return
            detail = (
                "Bot container started, but ready marker was not found in logs within "
                f"{int(timeout_seconds)}s. Check bot logs or MCP config if the bot stays unavailable."
            )
            self._logger.warning("bot_id=%s agent loop ready marker not found within %ss", bot_id, timeout_seconds)
            with Session(self._engine) as background_session:
                if not background_session.get(BotInstance, bot_id):
                    return
                self._record_activity_event(
                    background_session,
                    bot_id,
                    "bot_warning",
                    channel="system",
                    detail=detail,
                    metadata={
                        "kind": "agent_loop_ready_timeout",
                        "marker": self._AGENT_LOOP_READY_MARKER,
                        "timeout_seconds": timeout_seconds,
                    },
                )
                background_session.commit()
            self._invalidate_bot_detail_cache(bot_id)
        except Exception:
            self._logger.exception("Failed to record agent loop readiness warning for bot_id=%s", bot_id)