import logging
import os
import re
import shutil
from datetime import datetime
from typing import Any, Callable, Dict, Optional

from fastapi import HTTPException
from sqlmodel import Session, select

from core.settings import (
    BOTS_WORKSPACE_ROOT,
    DEFAULT_AGENTS_MD,
    DEFAULT_IDENTITY_MD,
    DEFAULT_SOUL_MD,
    DEFAULT_TOOLS_MD,
    DEFAULT_USER_MD,
)
from models.bot import BotInstance, BotMessage
from models.platform import BotActivityEvent, BotRequestUsage
from models.skill import BotSkillInstall
from models.topic import TopicItem, TopicTopic
from providers.target import ProviderTarget, normalize_provider_target, provider_target_to_dict
from services.runtime_service import RuntimeService

# Signatures of the collaborators injected into BotLifecycleService.
RefreshBotRuntimeStatus = Callable[[Any, BotInstance], str]
ResolveBotProviderTarget = Callable[[BotInstance], ProviderTarget]
ProviderTargetFromNode = Callable[[Optional[str]], Optional[ProviderTarget]]
DefaultProviderTarget = Callable[[], ProviderTarget]
EnsureProviderTargetSupported = Callable[[ProviderTarget], None]
RequireReadyImage = Callable[..., Any]
SyncBotWorkspaceViaProvider = Callable[..., None]
ApplyProviderTargetToBot = Callable[[BotInstance, ProviderTarget], None]
SerializeProviderTargetSummary = Callable[[ProviderTarget], Dict[str, Any]]
SerializeBot = Callable[[BotInstance], Dict[str, Any]]
NodeDisplayName = Callable[[str], str]
InvalidateBotCache = Callable[[str], None]
RecordActivityEvent = Callable[..., None]
NormalizeEnvParams = Callable[[Any], Dict[str, str]]
NormalizeSystemTimezone = Callable[[Any], str]
NormalizeResourceLimits = Callable[[Any, Any, Any], Dict[str, Any]]
WriteEnvStore = Callable[[str, Dict[str, str]], None]
ResolveBotEnvParams = Callable[[str], Dict[str, str]]
ClearProviderTargetOverride = Callable[[str], None]
NormalizeInitialChannels = Callable[[str, Any], Any]
ExpectedEdgeOfflineError = Callable[[Exception], bool]
SummarizeEdgeException = Callable[[Exception], str]
ResolveEdgeClient = Callable[[ProviderTarget], Any]
NodeMetadata = Callable[[str], Dict[str, Any]]
LogEdgeFailure = Callable[..., None]
InvalidateBotMessagesCache = Callable[[str], None]


class BotLifecycleService:
    """Create, update, start/stop, enable/disable, delete and deploy bots.

    Every external capability (runtime control, provider-target resolution,
    workspace sync, caches, env store, edge client, ...) is injected as a
    callable through the constructor; this class only orchestrates them.
    """

    def __init__(
        self,
        *,
        bot_id_pattern: re.Pattern[str],
        runtime_service: RuntimeService,
        refresh_bot_runtime_status: RefreshBotRuntimeStatus,
        resolve_bot_provider_target: ResolveBotProviderTarget,
        provider_target_from_node: ProviderTargetFromNode,
        default_provider_target: DefaultProviderTarget,
        ensure_provider_target_supported: EnsureProviderTargetSupported,
        require_ready_image: RequireReadyImage,
        sync_bot_workspace_via_provider: SyncBotWorkspaceViaProvider,
        apply_provider_target_to_bot: ApplyProviderTargetToBot,
        serialize_provider_target_summary: SerializeProviderTargetSummary,
        serialize_bot: SerializeBot,
        node_display_name: NodeDisplayName,
        invalidate_bot_detail_cache: InvalidateBotCache,
        record_activity_event: RecordActivityEvent,
        normalize_env_params: NormalizeEnvParams,
        normalize_system_timezone: NormalizeSystemTimezone,
        normalize_resource_limits: NormalizeResourceLimits,
        write_env_store: WriteEnvStore,
        resolve_bot_env_params: ResolveBotEnvParams,
        clear_provider_target_override: ClearProviderTargetOverride,
        normalize_initial_channels: NormalizeInitialChannels,
        is_expected_edge_offline_error: ExpectedEdgeOfflineError,
        summarize_edge_exception: SummarizeEdgeException,
        resolve_edge_client: ResolveEdgeClient,
        node_metadata: NodeMetadata,
        log_edge_failure: LogEdgeFailure,
        invalidate_bot_messages_cache: InvalidateBotMessagesCache,
        logger: logging.Logger,
    ) -> None:
        """Store the injected collaborators; performs no other work."""
        self._bot_id_pattern = bot_id_pattern
        self._runtime_service = runtime_service
        self._refresh_bot_runtime_status = refresh_bot_runtime_status
        self._resolve_bot_provider_target = resolve_bot_provider_target
        self._provider_target_from_node = provider_target_from_node
        self._default_provider_target = default_provider_target
        self._ensure_provider_target_supported = ensure_provider_target_supported
        self._require_ready_image = require_ready_image
        self._sync_bot_workspace_via_provider = sync_bot_workspace_via_provider
        self._apply_provider_target_to_bot = apply_provider_target_to_bot
        self._serialize_provider_target_summary = serialize_provider_target_summary
        self._serialize_bot = serialize_bot
        self._node_display_name = node_display_name
        self._invalidate_bot_detail_cache = invalidate_bot_detail_cache
        self._record_activity_event = record_activity_event
        self._normalize_env_params = normalize_env_params
        self._normalize_system_timezone = normalize_system_timezone
        self._normalize_resource_limits = normalize_resource_limits
        self._write_env_store = write_env_store
        self._resolve_bot_env_params = resolve_bot_env_params
        self._clear_provider_target_override = clear_provider_target_override
        self._normalize_initial_channels = normalize_initial_channels
        self._is_expected_edge_offline_error = is_expected_edge_offline_error
        self._summarize_edge_exception = summarize_edge_exception
        self._resolve_edge_client = resolve_edge_client
        self._node_metadata = node_metadata
        self._log_edge_failure = log_edge_failure
        self._invalidate_bot_messages_cache = invalidate_bot_messages_cache
        self._logger = logger

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _require_bot(self, *, session: Session, bot_id: str) -> BotInstance:
        """Return the bot with ``bot_id`` or raise HTTP 404."""
        bot = session.get(BotInstance, bot_id)
        if not bot:
            raise HTTPException(status_code=404, detail="Bot not found")
        return bot

    def _stop_bot_best_effort(self, *, app_state: Any, session: Session, bot: BotInstance) -> None:
        """Ask the runtime service to stop ``bot``; log instead of raising.

        Callers (delete / disable / deactivate) must proceed even when the
        runtime cannot be stopped, so the failure is recorded but not
        propagated.
        """
        try:
            self._runtime_service.stop_bot(app_state=app_state, session=session, bot=bot)
        except Exception as exc:
            self._logger.warning("Best-effort stop failed for bot_id=%s: %s", bot.id, exc)

    def _purge_edge_workspace(self, *, target: ProviderTarget, bot_id: str) -> bool:
        """Purge the bot workspace through the edge client.

        Returns True when the edge reports status ``deleted`` or
        ``not_found``; any exception is handed to the injected edge-failure
        logger and reported as False.
        """
        try:
            workspace_root = str(self._node_metadata(target.node_id).get("workspace_root") or "").strip() or None
            purge_result = self._resolve_edge_client(target).purge_workspace(
                bot_id=bot_id,
                workspace_root=workspace_root,
            )
            return str(purge_result.get("status") or "").strip().lower() in {"deleted", "not_found"}
        except Exception as exc:
            self._log_edge_failure(
                self._logger,
                key=f"bot-delete-workspace:{bot_id}",
                exc=exc,
                message=f"Failed to purge edge workspace for bot_id={bot_id}",
            )
            return False

    @staticmethod
    def _initial_runtime_overrides(
        payload: Any,
        provider_target: ProviderTarget,
        resource_limits: Dict[str, Any],
        send_progress: bool,
        send_tool_hints: bool,
    ) -> Dict[str, Any]:
        """Build the runtime override mapping passed to the first workspace sync.

        Missing markdown documents fall back to the configured defaults;
        ``system_prompt`` and ``soul_md`` fall back to each other before the
        default.
        """
        system_prompt = getattr(payload, "system_prompt", None)
        soul_md = getattr(payload, "soul_md", None)
        return {
            "llm_provider": getattr(payload, "llm_provider", None),
            "llm_model": getattr(payload, "llm_model", None),
            "api_key": getattr(payload, "api_key", None),
            "api_base": getattr(payload, "api_base", "") or "",
            "temperature": getattr(payload, "temperature", None),
            "top_p": getattr(payload, "top_p", None),
            "max_tokens": getattr(payload, "max_tokens", None),
            "cpu_cores": resource_limits["cpu_cores"],
            "memory_mb": resource_limits["memory_mb"],
            "storage_gb": resource_limits["storage_gb"],
            "node_id": provider_target.node_id,
            "transport_kind": provider_target.transport_kind,
            "runtime_kind": provider_target.runtime_kind,
            "core_adapter": provider_target.core_adapter,
            "system_prompt": system_prompt or soul_md or DEFAULT_SOUL_MD,
            "soul_md": soul_md or system_prompt or DEFAULT_SOUL_MD,
            "agents_md": getattr(payload, "agents_md", None) or DEFAULT_AGENTS_MD,
            "user_md": getattr(payload, "user_md", None) or DEFAULT_USER_MD,
            "tools_md": getattr(payload, "tools_md", None) or DEFAULT_TOOLS_MD,
            "identity_md": getattr(payload, "identity_md", None) or DEFAULT_IDENTITY_MD,
            "send_progress": send_progress,
            "send_tool_hints": send_tool_hints,
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def create_bot(self, *, session: Session, payload: Any) -> Dict[str, Any]:
        """Create a bot row, write its env store and sync its workspace.

        Raises:
            HTTPException: 400 for a missing/invalid bot id or timezone,
                409 when the id already exists, 502 when the workspace
                cannot be initialised for a reason other than an expected
                offline edge (the freshly created row is removed first).

        Returns:
            The serialized bot. When the edge is offline the bot is still
            created and a ``bot_warning`` activity event is recorded.
        """
        normalized_bot_id = str(getattr(payload, "id", "") or "").strip()
        if not normalized_bot_id:
            raise HTTPException(status_code=400, detail="Bot ID is required")
        if not self._bot_id_pattern.fullmatch(normalized_bot_id):
            raise HTTPException(status_code=400, detail="Bot ID can only contain letters, numbers, and underscores")
        if session.get(BotInstance, normalized_bot_id):
            raise HTTPException(status_code=409, detail=f"Bot ID already exists: {normalized_bot_id}")

        normalized_env_params = self._normalize_env_params(getattr(payload, "env_params", None))
        try:
            normalized_env_params["TZ"] = self._normalize_system_timezone(getattr(payload, "system_timezone", None))
        except ValueError as exc:
            raise HTTPException(status_code=400, detail=str(exc)) from exc

        requested_node_id = getattr(payload, "node_id", None)
        provider_target = normalize_provider_target(
            {
                "node_id": requested_node_id,
                "transport_kind": getattr(payload, "transport_kind", None),
                "runtime_kind": getattr(payload, "runtime_kind", None),
                "core_adapter": getattr(payload, "core_adapter", None),
            },
            fallback=self._provider_target_from_node(requested_node_id) or self._default_provider_target(),
        )
        self._ensure_provider_target_supported(provider_target)

        normalized_image_tag = str(getattr(payload, "image_tag", "") or "").strip()
        if provider_target.runtime_kind == "docker":
            self._require_ready_image(session, normalized_image_tag, require_local_image=True)

        # Normalise resource limits BEFORE persisting the row so that a
        # failure here cannot leave a committed bot without a workspace.
        resource_limits = self._normalize_resource_limits(
            getattr(payload, "cpu_cores", None),
            getattr(payload, "memory_mb", None),
            getattr(payload, "storage_gb", None),
        )

        enabled_raw = getattr(payload, "enabled", None)
        bot = BotInstance(
            id=normalized_bot_id,
            name=getattr(payload, "name", None),
            enabled=True if enabled_raw is None else bool(enabled_raw),
            access_password="",
            image_tag=normalized_image_tag,
            node_id=provider_target.node_id,
            transport_kind=provider_target.transport_kind,
            runtime_kind=provider_target.runtime_kind,
            core_adapter=provider_target.core_adapter,
            workspace_dir=os.path.join(BOTS_WORKSPACE_ROOT, normalized_bot_id),
        )
        session.add(bot)
        session.commit()
        session.refresh(bot)

        # bool(None) is False, so a missing flag defaults to False.
        send_progress = bool(getattr(payload, "send_progress", None))
        send_tool_hints = bool(getattr(payload, "send_tool_hints", None))
        runtime_overrides = self._initial_runtime_overrides(
            payload,
            provider_target,
            resource_limits,
            send_progress,
            send_tool_hints,
        )

        workspace_synced = True
        sync_error_detail = ""
        try:
            self._write_env_store(normalized_bot_id, normalized_env_params)
            self._sync_bot_workspace_via_provider(
                session,
                bot,
                target_override=provider_target,
                channels_override=self._normalize_initial_channels(
                    normalized_bot_id,
                    getattr(payload, "channels", None),
                ),
                global_delivery_override={
                    "sendProgress": send_progress,
                    "sendToolHints": send_tool_hints,
                },
                runtime_overrides=runtime_overrides,
            )
        except Exception as exc:
            if self._is_expected_edge_offline_error(exc):
                # Offline edge is tolerated: keep the bot, sync later.
                workspace_synced = False
                sync_error_detail = self._summarize_edge_exception(exc)
                self._logger.info(
                    "Create bot pending sync due to offline edge bot_id=%s node=%s detail=%s",
                    normalized_bot_id,
                    provider_target.node_id,
                    sync_error_detail,
                )
            else:
                detail = self._summarize_edge_exception(exc)
                # Undo the row created above so the id can be reused.
                # NOTE(review): the env store written above is not removed
                # here; no injected collaborator exposes a delete for it.
                try:
                    doomed = session.get(BotInstance, normalized_bot_id)
                    if doomed is not None:
                        session.delete(doomed)
                        session.commit()
                    self._clear_provider_target_override(normalized_bot_id)
                except Exception:
                    session.rollback()
                    self._logger.exception(
                        "Failed to remove bot row after workspace init failure bot_id=%s",
                        normalized_bot_id,
                    )
                raise HTTPException(status_code=502, detail=f"Failed to initialize bot workspace: {detail}") from exc

        session.refresh(bot)
        self._record_activity_event(
            session,
            normalized_bot_id,
            "bot_created",
            channel="system",
            detail=f"Bot {normalized_bot_id} created",
            metadata={
                "image_tag": normalized_image_tag,
                "workspace_synced": workspace_synced,
                "sync_error": sync_error_detail if not workspace_synced else "",
            },
        )
        if not workspace_synced:
            self._record_activity_event(
                session,
                normalized_bot_id,
                "bot_warning",
                channel="system",
                detail="Bot created, but node is offline. Workspace sync is pending.",
                metadata={"sync_error": sync_error_detail, "node_id": provider_target.node_id},
            )
        session.commit()
        self._invalidate_bot_detail_cache(normalized_bot_id)
        return self._serialize_bot(bot)

    def update_bot(self, *, session: Session, bot_id: str, payload: Any) -> Dict[str, Any]:
        """Apply a partial update to a bot and re-sync its workspace.

        Only fields explicitly set on ``payload`` are considered
        (``model_dump(exclude_unset=True)``). Image and execution-target
        fields are rejected with HTTP 400; they must go through deploy.

        Raises:
            HTTPException: 404 for an unknown bot, 400 for an invalid
                timezone or a deploy-only field.
        """
        bot = self._require_bot(session=session, bot_id=bot_id)
        update_data = payload.model_dump(exclude_unset=True)
        env_params = update_data.pop("env_params", None)
        system_timezone = update_data.pop("system_timezone", None)
        # tools_config is accepted on the payload but ignored here.
        update_data.pop("tools_config", None)

        normalized_system_timezone: Optional[str] = None
        if system_timezone is not None:
            try:
                normalized_system_timezone = self._normalize_system_timezone(system_timezone)
            except ValueError as exc:
                raise HTTPException(status_code=400, detail=str(exc)) from exc

        # A tuple (not a set) keeps the override key order deterministic.
        runtime_fields = (
            "llm_provider",
            "llm_model",
            "api_key",
            "api_base",
            "temperature",
            "top_p",
            "max_tokens",
            "cpu_cores",
            "memory_mb",
            "storage_gb",
            "soul_md",
            "agents_md",
            "user_md",
            "tools_md",
            "identity_md",
            "send_progress",
            "send_tool_hints",
            "system_prompt",
        )
        deploy_only_fields = {"image_tag", "node_id", "transport_kind", "runtime_kind", "core_adapter"}
        if deploy_only_fields & set(update_data):
            raise HTTPException(
                status_code=400,
                detail=f"Use /api/bots/{bot_id}/deploy for execution target or image changes",
            )

        runtime_overrides: Dict[str, Any] = {}
        for field in runtime_fields:
            if field in update_data:
                runtime_overrides[field] = update_data.pop(field)

        # Blank provider/model/key values mean "leave unchanged".
        for text_field in ("llm_provider", "llm_model", "api_key"):
            if text_field in runtime_overrides:
                text = str(runtime_overrides.get(text_field) or "").strip()
                if not text:
                    runtime_overrides.pop(text_field, None)
                else:
                    runtime_overrides[text_field] = text
        if "api_base" in runtime_overrides:
            runtime_overrides["api_base"] = str(runtime_overrides.get("api_base") or "").strip()

        # system_prompt and soul_md mirror each other when only one is sent.
        if "system_prompt" in runtime_overrides and "soul_md" not in runtime_overrides:
            runtime_overrides["soul_md"] = runtime_overrides["system_prompt"]
        if "soul_md" in runtime_overrides and "system_prompt" not in runtime_overrides:
            runtime_overrides["system_prompt"] = runtime_overrides["soul_md"]

        if {"cpu_cores", "memory_mb", "storage_gb"} & set(runtime_overrides):
            normalized_resources = self._normalize_resource_limits(
                runtime_overrides.get("cpu_cores"),
                runtime_overrides.get("memory_mb"),
                runtime_overrides.get("storage_gb"),
            )
            runtime_overrides.update(normalized_resources)

        db_fields = {"name", "enabled"}
        for key, value in update_data.items():
            if key in db_fields:
                setattr(bot, key, value)

        previous_env_params: Optional[Dict[str, str]] = None
        next_env_params: Optional[Dict[str, str]] = None
        if env_params is not None or normalized_system_timezone is not None:
            previous_env_params = self._resolve_bot_env_params(bot_id)
            next_env_params = dict(previous_env_params)
            if env_params is not None:
                # NOTE(review): this replaces the whole env store, so an
                # existing TZ is dropped unless env_params carries it or
                # system_timezone is also supplied - confirm this is intended.
                next_env_params = self._normalize_env_params(env_params)
            if normalized_system_timezone is not None:
                next_env_params["TZ"] = normalized_system_timezone

        global_delivery_override: Optional[Dict[str, Any]] = None
        if "send_progress" in runtime_overrides or "send_tool_hints" in runtime_overrides:
            global_delivery_override = {}
            if "send_progress" in runtime_overrides:
                global_delivery_override["sendProgress"] = bool(runtime_overrides.get("send_progress"))
            if "send_tool_hints" in runtime_overrides:
                global_delivery_override["sendToolHints"] = bool(runtime_overrides.get("send_tool_hints"))

        # Execution target never changes on update, hence target_override=None.
        self._sync_bot_workspace_via_provider(
            session,
            bot,
            target_override=None,
            runtime_overrides=runtime_overrides if runtime_overrides else None,
            global_delivery_override=global_delivery_override,
        )

        try:
            if next_env_params is not None:
                self._write_env_store(bot_id, next_env_params)
            session.add(bot)
            session.commit()
        except Exception:
            session.rollback()
            # Restore the env store so it stays consistent with the DB.
            if previous_env_params is not None:
                self._write_env_store(bot_id, previous_env_params)
            raise

        session.refresh(bot)
        self._invalidate_bot_detail_cache(bot_id)
        return self._serialize_bot(bot)

    async def start_bot(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
        """Start the bot via the runtime service; 404 if it does not exist."""
        bot = self._require_bot(session=session, bot_id=bot_id)
        return await self._runtime_service.start_bot(app_state=app_state, session=session, bot=bot)

    def stop_bot(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
        """Stop the bot via the runtime service; 404 if it does not exist."""
        bot = self._require_bot(session=session, bot_id=bot_id)
        return self._runtime_service.stop_bot(app_state=app_state, session=session, bot=bot)

    def enable_bot(self, *, session: Session, bot_id: str) -> Dict[str, Any]:
        """Mark the bot enabled, record a ``bot_enabled`` event and commit."""
        bot = self._require_bot(session=session, bot_id=bot_id)
        bot.enabled = True
        session.add(bot)
        self._record_activity_event(session, bot_id, "bot_enabled", channel="system", detail=f"Bot {bot_id} enabled")
        session.commit()
        self._invalidate_bot_detail_cache(bot_id)
        return {"status": "enabled", "enabled": True}

    def disable_bot(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
        """Stop (best effort) and disable the bot, recording ``bot_disabled``."""
        bot = self._require_bot(session=session, bot_id=bot_id)
        self._set_inactive(
            app_state=app_state,
            session=session,
            bot=bot,
            activity_type="bot_disabled",
            detail="disabled",
        )
        return {"status": "disabled", "enabled": False}

    def deactivate_bot(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
        """Stop (best effort) and disable the bot, recording ``bot_deactivated``."""
        bot = self._require_bot(session=session, bot_id=bot_id)
        self._set_inactive(
            app_state=app_state,
            session=session,
            bot=bot,
            activity_type="bot_deactivated",
            detail="deactivated",
        )
        return {"status": "deactivated"}

    def delete_bot(
        self,
        *,
        app_state: Any,
        session: Session,
        bot_id: str,
        delete_workspace: bool = True,
    ) -> Dict[str, Any]:
        """Delete a bot, its dependent rows and (optionally) its workspace.

        The runtime is stopped on a best-effort basis first. When
        ``delete_workspace`` is true the edge workspace is purged for edge
        targets and the local workspace directory is removed if present.

        Returns:
            ``{"status": "deleted", "workspace_deleted": bool}``.
            ``workspace_deleted`` is True when no deletion was requested.
        """
        bot = self._require_bot(session=session, bot_id=bot_id)
        target = self._resolve_bot_provider_target(bot)
        self._stop_bot_best_effort(app_state=app_state, session=session, bot=bot)

        workspace_deleted = not bool(delete_workspace)
        if delete_workspace:
            if target.transport_kind == "edge":
                workspace_deleted = self._purge_edge_workspace(target=target, bot_id=bot_id)
            local_root = os.path.join(BOTS_WORKSPACE_ROOT, bot_id)
            if os.path.isdir(local_root):
                shutil.rmtree(local_root, ignore_errors=True)
                # rmtree(ignore_errors=True) never raises, so verify the
                # outcome instead of assuming success.
                if os.path.isdir(local_root):
                    self._logger.warning(
                        "Local workspace could not be fully removed bot_id=%s path=%s",
                        bot_id,
                        local_root,
                    )
                    workspace_deleted = False
                else:
                    # NOTE(review): as before, a successful local removal
                    # reports True even if the edge purge failed - confirm.
                    workspace_deleted = True

        # Remove dependent rows before the bot row itself.
        dependent_models = (
            BotMessage,
            TopicItem,
            TopicTopic,
            BotRequestUsage,
            BotActivityEvent,
            BotSkillInstall,
        )
        for model in dependent_models:
            for row in session.exec(select(model).where(model.bot_id == bot_id)).all():
                session.delete(row)
        session.delete(bot)
        session.commit()

        self._clear_provider_target_override(bot_id)
        self._invalidate_bot_detail_cache(bot_id)
        self._invalidate_bot_messages_cache(bot_id)
        return {"status": "deleted", "workspace_deleted": workspace_deleted}

    async def deploy_bot(
        self,
        *,
        app_state: Any,
        session: Session,
        bot_id: str,
        node_id: str,
        runtime_kind: Optional[str] = None,
        image_tag: Optional[str] = None,
        auto_start: bool = False,
    ) -> Dict[str, Any]:
        """Move a stopped bot to another target and/or image.

        Raises:
            HTTPException: 404 for an unknown bot, 409 when the bot is
                RUNNING, 400 when the node is unknown or nothing changes.

        Returns:
            A summary with the serialized bot, previous/next targets and
            image tags, and whether the bot was auto-started.
        """
        bot = self._require_bot(session=session, bot_id=bot_id)
        actual_status = self._refresh_bot_runtime_status(app_state, bot)
        # Persist whatever the status refresh changed on the row.
        session.add(bot)
        session.commit()
        if actual_status == "RUNNING":
            raise HTTPException(status_code=409, detail="Stop the bot before deploy or migrate")

        current_target = self._resolve_bot_provider_target(bot)
        next_target_base = self._provider_target_from_node(node_id)
        if next_target_base is None:
            raise HTTPException(status_code=400, detail=f"Managed node not found: {node_id}")
        next_target = normalize_provider_target(
            {
                "node_id": node_id,
                "runtime_kind": runtime_kind,
            },
            fallback=next_target_base,
        )
        self._ensure_provider_target_supported(next_target)

        existing_image_tag = str(bot.image_tag or "").strip()
        requested_image_tag = str(image_tag or "").strip()
        is_docker = next_target.runtime_kind == "docker"
        if is_docker:
            # Docker targets keep the current image unless one is given.
            # NOTE(review): for other runtimes an omitted image_tag resets
            # the stored tag to "" - confirm this is intended.
            requested_image_tag = requested_image_tag or existing_image_tag
        image_changed = requested_image_tag != existing_image_tag
        target_changed = next_target.key != current_target.key
        if not image_changed and not target_changed:
            raise HTTPException(status_code=400, detail="No deploy changes detected")
        if is_docker:
            self._require_ready_image(
                session,
                requested_image_tag,
                require_local_image=True,
            )

        self._sync_bot_workspace_via_provider(
            session,
            bot,
            target_override=next_target,
            runtime_overrides=provider_target_to_dict(next_target),
        )

        # Read after the sync, matching the original statement order.
        previous_image_tag = str(bot.image_tag or "").strip()
        bot.image_tag = requested_image_tag
        self._apply_provider_target_to_bot(bot, next_target)
        # NOTE(review): naive UTC timestamp; datetime.utcnow() is deprecated
        # since Python 3.12 but is kept so the stored value stays naive.
        bot.updated_at = datetime.utcnow()
        session.add(bot)

        previous_summary = self._serialize_provider_target_summary(current_target)
        next_summary = self._serialize_provider_target_summary(next_target)
        self._record_activity_event(
            session,
            bot_id,
            "bot_deployed",
            channel="system",
            detail=(
                f"Bot {bot_id} deployed to {self._node_display_name(next_target.node_id)}"
                if target_changed
                else f"Bot {bot_id} redeployed with image {requested_image_tag}"
            ),
            metadata={
                "previous_target": previous_summary,
                "next_target": next_summary,
                "previous_image_tag": previous_image_tag,
                "image_tag": requested_image_tag,
                "auto_start": bool(auto_start),
            },
        )
        session.commit()
        session.refresh(bot)

        started = False
        if bool(auto_start):
            await self._runtime_service.start_bot(app_state=app_state, session=session, bot=bot)
            session.refresh(bot)
            started = True

        self._invalidate_bot_detail_cache(bot_id)
        return {
            "status": "deployed",
            "bot": self._serialize_bot(bot),
            "started": started,
            "image_tag": requested_image_tag,
            "previous_image_tag": previous_image_tag,
            "previous_target": self._serialize_provider_target_summary(current_target),
            "next_target": self._serialize_provider_target_summary(next_target),
        }

    def _set_inactive(
        self,
        *,
        app_state: Any,
        session: Session,
        bot: BotInstance,
        activity_type: str,
        detail: str,
    ) -> None:
        """Stop (best effort) and disable ``bot``, then record and commit.

        ``current_state`` is reset to ``IDLE`` unless it is ``ERROR``, so an
        error state stays visible after the bot is disabled.
        """
        bot_id = str(bot.id or "").strip()
        self._stop_bot_best_effort(app_state=app_state, session=session, bot=bot)
        bot.enabled = False
        bot.docker_status = "STOPPED"
        if str(bot.current_state or "").upper() not in {"ERROR"}:
            bot.current_state = "IDLE"
        session.add(bot)
        self._record_activity_event(session, bot_id, activity_type, channel="system", detail=f"Bot {bot_id} {detail}")
        session.commit()
        self._invalidate_bot_detail_cache(bot_id)