dashboard-nanobot/backend/services/bot_lifecycle_service.py

612 lines
27 KiB
Python

import logging
import os
import re
import shutil
from datetime import datetime
from typing import Any, Callable, Dict, Optional
from fastapi import HTTPException
from sqlmodel import Session, select
from core.settings import (
BOTS_WORKSPACE_ROOT,
DEFAULT_AGENTS_MD,
DEFAULT_IDENTITY_MD,
DEFAULT_SOUL_MD,
DEFAULT_TOOLS_MD,
DEFAULT_USER_MD,
)
from models.bot import BotInstance, BotMessage
from models.platform import BotActivityEvent, BotRequestUsage
from models.skill import BotSkillInstall
from models.topic import TopicItem, TopicTopic
from providers.target import ProviderTarget, normalize_provider_target, provider_target_to_dict
from services.runtime_service import RuntimeService
# Callable signatures for the collaborators injected into BotLifecycleService.
# The service receives every one of these as a keyword argument to __init__
# instead of importing a concrete implementation.

# Called as (app_state, bot); deploy_bot compares the returned string to "RUNNING".
RefreshBotRuntimeStatus = Callable[[Any, BotInstance], str]
# Provider-target lookup and validation hooks.
ResolveBotProviderTarget = Callable[[BotInstance], ProviderTarget]
ProviderTargetFromNode = Callable[[Optional[str]], Optional[ProviderTarget]]
DefaultProviderTarget = Callable[[], ProviderTarget]
EnsureProviderTargetSupported = Callable[[ProviderTarget], None]
# Called as (session, image_tag, require_local_image=True) for docker targets.
RequireReadyImage = Callable[..., Any]
# Called as (session, bot, target_override=..., runtime_overrides=..., ...).
SyncBotWorkspaceViaProvider = Callable[..., None]
ApplyProviderTargetToBot = Callable[[BotInstance, ProviderTarget], None]
# Serialization, display and cache hooks.
SerializeProviderTargetSummary = Callable[[ProviderTarget], Dict[str, Any]]
SerializeBot = Callable[[BotInstance], Dict[str, Any]]
NodeDisplayName = Callable[[str], str]
InvalidateBotCache = Callable[[str], None]
# Called as (session, bot_id, event_type, channel=..., detail=..., metadata=...).
RecordActivityEvent = Callable[..., None]
# Input normalizers; the timezone normalizer signals bad input with ValueError,
# which the service converts into an HTTP 400.
NormalizeEnvParams = Callable[[Any], Dict[str, str]]
NormalizeSystemTimezone = Callable[[Any], str]
# Called as (cpu_cores, memory_mb, storage_gb); the result is read with those same keys.
NormalizeResourceLimits = Callable[[Any, Any, Any], Dict[str, Any]]
# Per-bot environment store accessors, keyed by bot id.
WriteEnvStore = Callable[[str, Dict[str, str]], None]
ResolveBotEnvParams = Callable[[str], Dict[str, str]]
ClearProviderTargetOverride = Callable[[str], None]
# Called as (bot_id, raw_channels) while creating a bot.
NormalizeInitialChannels = Callable[[str, Any], Any]
# Edge-node helpers used when a workspace sync or purge fails.
ExpectedEdgeOfflineError = Callable[[Exception], bool]
SummarizeEdgeException = Callable[[Exception], str]
# The returned client must expose purge_workspace(bot_id=..., workspace_root=...).
ResolveEdgeClient = Callable[[ProviderTarget], Any]
# The returned mapping is read for its "workspace_root" key.
NodeMetadata = Callable[[str], Dict[str, Any]]
# Called as (logger, key=..., exc=..., message=...).
LogEdgeFailure = Callable[..., None]
InvalidateBotMessagesCache = Callable[[str], None]
class BotLifecycleService:
def __init__(
    self,
    *,
    bot_id_pattern: re.Pattern[str],
    runtime_service: RuntimeService,
    refresh_bot_runtime_status: RefreshBotRuntimeStatus,
    resolve_bot_provider_target: ResolveBotProviderTarget,
    provider_target_from_node: ProviderTargetFromNode,
    default_provider_target: DefaultProviderTarget,
    ensure_provider_target_supported: EnsureProviderTargetSupported,
    require_ready_image: RequireReadyImage,
    sync_bot_workspace_via_provider: SyncBotWorkspaceViaProvider,
    apply_provider_target_to_bot: ApplyProviderTargetToBot,
    serialize_provider_target_summary: SerializeProviderTargetSummary,
    serialize_bot: SerializeBot,
    node_display_name: NodeDisplayName,
    invalidate_bot_detail_cache: InvalidateBotCache,
    record_activity_event: RecordActivityEvent,
    normalize_env_params: NormalizeEnvParams,
    normalize_system_timezone: NormalizeSystemTimezone,
    normalize_resource_limits: NormalizeResourceLimits,
    write_env_store: WriteEnvStore,
    resolve_bot_env_params: ResolveBotEnvParams,
    clear_provider_target_override: ClearProviderTargetOverride,
    normalize_initial_channels: NormalizeInitialChannels,
    is_expected_edge_offline_error: ExpectedEdgeOfflineError,
    summarize_edge_exception: SummarizeEdgeException,
    resolve_edge_client: ResolveEdgeClient,
    node_metadata: NodeMetadata,
    log_edge_failure: LogEdgeFailure,
    invalidate_bot_messages_cache: InvalidateBotMessagesCache,
    logger: logging.Logger,
) -> None:
    """Keep a reference to every injected collaborator.

    All arguments are keyword-only and each one is stored, unchanged, on a
    private attribute carrying the same name with a leading underscore.
    Nothing is called or validated during construction.
    """
    # Runtime control and logging.
    self._runtime_service = runtime_service
    self._refresh_bot_runtime_status = refresh_bot_runtime_status
    self._logger = logger

    # Provider target resolution and workspace synchronisation.
    self._resolve_bot_provider_target = resolve_bot_provider_target
    self._provider_target_from_node = provider_target_from_node
    self._default_provider_target = default_provider_target
    self._ensure_provider_target_supported = ensure_provider_target_supported
    self._apply_provider_target_to_bot = apply_provider_target_to_bot
    self._clear_provider_target_override = clear_provider_target_override
    self._require_ready_image = require_ready_image
    self._sync_bot_workspace_via_provider = sync_bot_workspace_via_provider

    # Input validation and normalisation.
    self._bot_id_pattern = bot_id_pattern
    self._normalize_env_params = normalize_env_params
    self._normalize_system_timezone = normalize_system_timezone
    self._normalize_resource_limits = normalize_resource_limits
    self._normalize_initial_channels = normalize_initial_channels

    # Per-bot environment store.
    self._write_env_store = write_env_store
    self._resolve_bot_env_params = resolve_bot_env_params

    # Serialisation, activity log and caches.
    self._serialize_provider_target_summary = serialize_provider_target_summary
    self._serialize_bot = serialize_bot
    self._node_display_name = node_display_name
    self._record_activity_event = record_activity_event
    self._invalidate_bot_detail_cache = invalidate_bot_detail_cache
    self._invalidate_bot_messages_cache = invalidate_bot_messages_cache

    # Edge-node helpers.
    self._is_expected_edge_offline_error = is_expected_edge_offline_error
    self._summarize_edge_exception = summarize_edge_exception
    self._resolve_edge_client = resolve_edge_client
    self._node_metadata = node_metadata
    self._log_edge_failure = log_edge_failure
def _require_bot(self, *, session: Session, bot_id: str) -> BotInstance:
    """Fetch the bot row by primary key, or abort the request with HTTP 404.

    Raises:
        HTTPException: 404 when ``session.get`` yields nothing for ``bot_id``.
    """
    found = session.get(BotInstance, bot_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Bot not found")
def create_bot(self, *, session: Session, payload: Any) -> Dict[str, Any]:
    """Create a bot row and push its initial workspace through the provider.

    All request validation (bot id, env params / timezone, provider target,
    docker image, resource limits) runs before the row is inserted, so a
    rejected request leaves nothing behind in the database.

    After the row is committed the env store is written and the workspace is
    synced. If that step fails with an error accepted by
    ``is_expected_edge_offline_error`` the bot is kept and a ``bot_warning``
    activity event is recorded; any other failure deletes the row again and
    is reported as HTTP 502.

    Args:
        session: Database session used for the insert and activity events.
        payload: Request object; every field is read with ``getattr`` so
            missing attributes fall back to defaults.

    Returns:
        The result of ``serialize_bot`` for the new bot.

    Raises:
        HTTPException: 400 for a missing/invalid bot id or invalid timezone,
            409 when the id already exists, 502 when workspace
            initialisation fails for a reason other than an offline edge.
    """
    normalized_bot_id = str(getattr(payload, "id", "") or "").strip()
    if not normalized_bot_id:
        raise HTTPException(status_code=400, detail="Bot ID is required")
    if not self._bot_id_pattern.fullmatch(normalized_bot_id):
        raise HTTPException(status_code=400, detail="Bot ID can only contain letters, numbers, and underscores")
    if session.get(BotInstance, normalized_bot_id):
        raise HTTPException(status_code=409, detail=f"Bot ID already exists: {normalized_bot_id}")

    normalized_env_params = self._normalize_env_params(getattr(payload, "env_params", None))
    try:
        # The timezone is stored in the env store under "TZ".
        normalized_env_params["TZ"] = self._normalize_system_timezone(getattr(payload, "system_timezone", None))
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    provider_target = normalize_provider_target(
        {
            "node_id": getattr(payload, "node_id", None),
            "transport_kind": getattr(payload, "transport_kind", None),
            "runtime_kind": getattr(payload, "runtime_kind", None),
            "core_adapter": getattr(payload, "core_adapter", None),
        },
        fallback=self._provider_target_from_node(getattr(payload, "node_id", None)) or self._default_provider_target(),
    )
    self._ensure_provider_target_supported(provider_target)

    normalized_image_tag = str(getattr(payload, "image_tag", "") or "").strip()
    if provider_target.runtime_kind == "docker":
        self._require_ready_image(session, normalized_image_tag, require_local_image=True)

    # Normalise resource limits BEFORE inserting the row: if this raises, no
    # orphaned bot is left committed in the database.
    resource_limits = self._normalize_resource_limits(
        getattr(payload, "cpu_cores", None),
        getattr(payload, "memory_mb", None),
        getattr(payload, "storage_gb", None),
    )

    # A missing or None "enabled" means enabled; bool(None) is False, so the
    # delivery flags default to off without any extra branching.
    requested_enabled = getattr(payload, "enabled", None)
    send_progress = bool(getattr(payload, "send_progress", None))
    send_tool_hints = bool(getattr(payload, "send_tool_hints", None))

    bot = BotInstance(
        id=normalized_bot_id,
        name=getattr(payload, "name", None),
        enabled=True if requested_enabled is None else bool(requested_enabled),
        access_password="",
        image_tag=normalized_image_tag,
        node_id=provider_target.node_id,
        transport_kind=provider_target.transport_kind,
        runtime_kind=provider_target.runtime_kind,
        core_adapter=provider_target.core_adapter,
        workspace_dir=os.path.join(BOTS_WORKSPACE_ROOT, normalized_bot_id),
    )
    session.add(bot)
    session.commit()
    session.refresh(bot)

    workspace_synced = True
    sync_error_detail = ""
    try:
        self._write_env_store(normalized_bot_id, normalized_env_params)
        self._sync_bot_workspace_via_provider(
            session,
            bot,
            target_override=provider_target,
            channels_override=self._normalize_initial_channels(normalized_bot_id, getattr(payload, "channels", None)),
            global_delivery_override={
                "sendProgress": send_progress,
                "sendToolHints": send_tool_hints,
            },
            runtime_overrides={
                "llm_provider": getattr(payload, "llm_provider", None),
                "llm_model": getattr(payload, "llm_model", None),
                "api_key": getattr(payload, "api_key", None),
                "api_base": getattr(payload, "api_base", "") or "",
                "temperature": getattr(payload, "temperature", None),
                "top_p": getattr(payload, "top_p", None),
                "max_tokens": getattr(payload, "max_tokens", None),
                "cpu_cores": resource_limits["cpu_cores"],
                "memory_mb": resource_limits["memory_mb"],
                "storage_gb": resource_limits["storage_gb"],
                "node_id": provider_target.node_id,
                "transport_kind": provider_target.transport_kind,
                "runtime_kind": provider_target.runtime_kind,
                "core_adapter": provider_target.core_adapter,
                # system_prompt and soul_md fall back to each other before the default.
                "system_prompt": getattr(payload, "system_prompt", None) or getattr(payload, "soul_md", None) or DEFAULT_SOUL_MD,
                "soul_md": getattr(payload, "soul_md", None) or getattr(payload, "system_prompt", None) or DEFAULT_SOUL_MD,
                "agents_md": getattr(payload, "agents_md", None) or DEFAULT_AGENTS_MD,
                "user_md": getattr(payload, "user_md", None) or DEFAULT_USER_MD,
                "tools_md": getattr(payload, "tools_md", None) or DEFAULT_TOOLS_MD,
                "identity_md": getattr(payload, "identity_md", None) or DEFAULT_IDENTITY_MD,
                "send_progress": send_progress,
                "send_tool_hints": send_tool_hints,
            },
        )
    except Exception as exc:
        if self._is_expected_edge_offline_error(exc):
            # Keep the bot; the workspace sync stays pending.
            workspace_synced = False
            sync_error_detail = self._summarize_edge_exception(exc)
            self._logger.info(
                "Create bot pending sync due to offline edge bot_id=%s node=%s detail=%s",
                normalized_bot_id,
                provider_target.node_id,
                sync_error_detail,
            )
        else:
            detail = self._summarize_edge_exception(exc)
            try:
                # Undo the insert so a failed initialisation leaves no bot behind.
                doomed = session.get(BotInstance, normalized_bot_id)
                if doomed is not None:
                    session.delete(doomed)
                    session.commit()
                self._clear_provider_target_override(normalized_bot_id)
            except Exception:
                session.rollback()
                # Still report the original failure to the caller, but do not
                # lose track of the row that could not be cleaned up.
                self._logger.exception(
                    "Failed to clean up bot after workspace initialization failure bot_id=%s",
                    normalized_bot_id,
                )
            raise HTTPException(status_code=502, detail=f"Failed to initialize bot workspace: {detail}") from exc

    session.refresh(bot)
    self._record_activity_event(
        session,
        normalized_bot_id,
        "bot_created",
        channel="system",
        detail=f"Bot {normalized_bot_id} created",
        metadata={
            "image_tag": normalized_image_tag,
            "workspace_synced": workspace_synced,
            "sync_error": sync_error_detail if not workspace_synced else "",
        },
    )
    if not workspace_synced:
        self._record_activity_event(
            session,
            normalized_bot_id,
            "bot_warning",
            channel="system",
            detail="Bot created, but node is offline. Workspace sync is pending.",
            metadata={"sync_error": sync_error_detail, "node_id": provider_target.node_id},
        )
    session.commit()
    self._invalidate_bot_detail_cache(normalized_bot_id)
    return self._serialize_bot(bot)
def update_bot(self, *, session: Session, bot_id: str, payload: Any) -> Dict[str, Any]:
    """Apply a partial update to a bot's settings, env params and workspace.

    Only fields explicitly set on ``payload`` are considered
    (``model_dump(exclude_unset=True)``). Runtime settings are forwarded to
    the workspace sync, ``name`` / ``enabled`` are written to the row, and
    env params / timezone are written to the env store. Image and execution
    target changes are rejected here and must go through the deploy endpoint.

    Args:
        session: Database session holding the bot row.
        bot_id: Primary key of the bot to update.
        payload: Object exposing ``model_dump(exclude_unset=True)``.

    Returns:
        The result of ``serialize_bot`` for the updated bot.

    Raises:
        HTTPException: 404 when the bot does not exist, 400 for an invalid
            timezone or when image / execution target fields are present.
        Exception: Any error from the workspace sync, env store write or
            commit is propagated. On a write/commit failure the session is
            rolled back and the previous env params are restored best-effort.
    """
    bot = self._require_bot(session=session, bot_id=bot_id)
    update_data = payload.model_dump(exclude_unset=True)

    env_params = update_data.pop("env_params", None)
    system_timezone = update_data.pop("system_timezone", None)
    normalized_system_timezone: Optional[str] = None
    if system_timezone is not None:
        try:
            normalized_system_timezone = self._normalize_system_timezone(system_timezone)
        except ValueError as exc:
            raise HTTPException(status_code=400, detail=str(exc)) from exc

    # tools_config is dropped: nothing in this method applies it.
    update_data.pop("tools_config", None)

    runtime_fields = (
        "llm_provider",
        "llm_model",
        "api_key",
        "api_base",
        "temperature",
        "top_p",
        "max_tokens",
        "cpu_cores",
        "memory_mb",
        "storage_gb",
        "soul_md",
        "agents_md",
        "user_md",
        "tools_md",
        "identity_md",
        "send_progress",
        "send_tool_hints",
        "system_prompt",
    )
    execution_target_fields = {
        "node_id",
        "transport_kind",
        "runtime_kind",
        "core_adapter",
    }
    deploy_only_fields = {"image_tag", *execution_target_fields}
    if deploy_only_fields.intersection(update_data):
        raise HTTPException(
            status_code=400,
            detail=f"Use /api/bots/{bot_id}/deploy for execution target or image changes",
        )

    # Move every runtime setting out of update_data into its own mapping.
    runtime_overrides: Dict[str, Any] = {
        field: update_data.pop(field) for field in runtime_fields if field in update_data
    }

    # Blank provider/model/key values mean "leave unchanged", so they are dropped.
    for text_field in ("llm_provider", "llm_model", "api_key"):
        if text_field in runtime_overrides:
            text = str(runtime_overrides.get(text_field) or "").strip()
            if text:
                runtime_overrides[text_field] = text
            else:
                runtime_overrides.pop(text_field, None)
    if "api_base" in runtime_overrides:
        runtime_overrides["api_base"] = str(runtime_overrides.get("api_base") or "").strip()

    # system_prompt and soul_md mirror each other when only one is supplied.
    if "system_prompt" in runtime_overrides and "soul_md" not in runtime_overrides:
        runtime_overrides["soul_md"] = runtime_overrides["system_prompt"]
    if "soul_md" in runtime_overrides and "system_prompt" not in runtime_overrides:
        runtime_overrides["system_prompt"] = runtime_overrides["soul_md"]

    if any(key in runtime_overrides for key in ("cpu_cores", "memory_mb", "storage_gb")):
        # NOTE(review): limits that were not part of this request are passed
        # as None; whether the normalizer then resets them to defaults is not
        # visible from here - confirm against normalize_resource_limits.
        normalized_resources = self._normalize_resource_limits(
            runtime_overrides.get("cpu_cores"),
            runtime_overrides.get("memory_mb"),
            runtime_overrides.get("storage_gb"),
        )
        runtime_overrides.update(normalized_resources)

    # Only these columns are written directly on the row.
    for column in ("name", "enabled"):
        if column in update_data:
            setattr(bot, column, update_data[column])

    previous_env_params: Optional[Dict[str, str]] = None
    next_env_params: Optional[Dict[str, str]] = None
    if env_params is not None or normalized_system_timezone is not None:
        previous_env_params = self._resolve_bot_env_params(bot_id)
        next_env_params = dict(previous_env_params)
        if env_params is not None:
            # NOTE(review): supplied env_params replace the whole env,
            # including any stored "TZ", unless system_timezone is also sent.
            next_env_params = self._normalize_env_params(env_params)
        if normalized_system_timezone is not None:
            next_env_params["TZ"] = normalized_system_timezone

    global_delivery_override: Optional[Dict[str, Any]] = None
    if "send_progress" in runtime_overrides or "send_tool_hints" in runtime_overrides:
        global_delivery_override = {}
        if "send_progress" in runtime_overrides:
            global_delivery_override["sendProgress"] = bool(runtime_overrides.get("send_progress"))
        if "send_tool_hints" in runtime_overrides:
            global_delivery_override["sendToolHints"] = bool(runtime_overrides.get("send_tool_hints"))

    # The execution target never changes here (deploy-only fields were
    # rejected above), so no target override is passed.
    self._sync_bot_workspace_via_provider(
        session,
        bot,
        target_override=None,
        runtime_overrides=runtime_overrides if runtime_overrides else None,
        global_delivery_override=global_delivery_override,
    )

    try:
        if next_env_params is not None:
            self._write_env_store(bot_id, next_env_params)
        session.add(bot)
        session.commit()
    except Exception:
        session.rollback()
        if previous_env_params is not None:
            try:
                self._write_env_store(bot_id, previous_env_params)
            except Exception:
                # Restoring is best-effort; it must not hide the error that
                # caused the rollback in the first place.
                self._logger.exception(
                    "Failed to restore env params after update failure bot_id=%s",
                    bot_id,
                )
        raise

    session.refresh(bot)
    self._invalidate_bot_detail_cache(bot_id)
    return self._serialize_bot(bot)
async def start_bot(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
    """Look up the bot (HTTP 404 if absent) and hand it to the runtime service to start.

    Returns whatever ``runtime_service.start_bot`` resolves to.
    """
    target_bot = self._require_bot(session=session, bot_id=bot_id)
    outcome = await self._runtime_service.start_bot(
        app_state=app_state,
        session=session,
        bot=target_bot,
    )
    return outcome
def stop_bot(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
    """Look up the bot (HTTP 404 if absent) and hand it to the runtime service to stop.

    Returns whatever ``runtime_service.stop_bot`` returns.
    """
    target_bot = self._require_bot(session=session, bot_id=bot_id)
    outcome = self._runtime_service.stop_bot(
        app_state=app_state,
        session=session,
        bot=target_bot,
    )
    return outcome
def enable_bot(self, *, session: Session, bot_id: str) -> Dict[str, Any]:
    """Flag the bot as enabled, record a ``bot_enabled`` activity event and commit.

    The bot detail cache entry is invalidated after the commit.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    record = self._require_bot(session=session, bot_id=bot_id)
    record.enabled = True
    session.add(record)
    self._record_activity_event(
        session,
        bot_id,
        "bot_enabled",
        channel="system",
        detail=f"Bot {bot_id} enabled",
    )
    session.commit()
    self._invalidate_bot_detail_cache(bot_id)
    response: Dict[str, Any] = {"status": "enabled", "enabled": True}
    return response
def disable_bot(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
    """Stop and disable the bot, recording a ``bot_disabled`` activity event.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    self._set_inactive(
        app_state=app_state,
        session=session,
        bot=self._require_bot(session=session, bot_id=bot_id),
        activity_type="bot_disabled",
        detail="disabled",
    )
    response: Dict[str, Any] = {"status": "disabled", "enabled": False}
    return response
def deactivate_bot(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
    """Stop and disable the bot, recording a ``bot_deactivated`` activity event.

    Identical to ``disable_bot`` apart from the event type, the event detail
    text and the shape of the returned status payload.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    record = self._require_bot(session=session, bot_id=bot_id)
    self._set_inactive(app_state=app_state, session=session, bot=record, activity_type="bot_deactivated", detail="deactivated")
    response: Dict[str, Any] = {"status": "deactivated"}
    return response
def delete_bot(
    self,
    *,
    app_state: Any,
    session: Session,
    bot_id: str,
    delete_workspace: bool = True,
) -> Dict[str, Any]:
    """Stop a bot, optionally remove its workspace, and delete all its rows.

    The stop and the workspace removal are best-effort: failures are logged
    and reflected in ``workspace_deleted`` but never abort the deletion of
    the database rows.

    Args:
        app_state: Application state forwarded to the runtime service.
        session: Database session used for the cascade delete.
        bot_id: Primary key of the bot to delete.
        delete_workspace: When true, purge the edge workspace (for edge
            transports) and remove the local workspace directory.

    Returns:
        ``{"status": "deleted", "workspace_deleted": bool}``.
        ``workspace_deleted`` is true when no workspace removal was
        requested, or when every requested removal succeeded: the edge purge
        reported ``deleted`` / ``not_found`` (edge transports only) and no
        local workspace path remains afterwards.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    bot = self._require_bot(session=session, bot_id=bot_id)
    target = self._resolve_bot_provider_target(bot)
    try:
        self._runtime_service.stop_bot(app_state=app_state, session=session, bot=bot)
    except Exception:
        # Best effort: a bot that cannot be stopped must still be deletable.
        self._logger.warning("Failed to stop bot before delete bot_id=%s", bot_id, exc_info=True)

    if delete_workspace:
        remote_purged = True
        if target.transport_kind == "edge":
            try:
                edge_workspace_root = str(self._node_metadata(target.node_id).get("workspace_root") or "").strip() or None
                purge_result = self._resolve_edge_client(target).purge_workspace(
                    bot_id=bot_id,
                    workspace_root=edge_workspace_root,
                )
                remote_purged = str(purge_result.get("status") or "").strip().lower() in {"deleted", "not_found"}
            except Exception as exc:
                self._log_edge_failure(
                    self._logger,
                    key=f"bot-delete-workspace:{bot_id}",
                    exc=exc,
                    message=f"Failed to purge edge workspace for bot_id={bot_id}",
                )
                remote_purged = False

        local_workspace_root = os.path.join(BOTS_WORKSPACE_ROOT, bot_id)
        if os.path.isdir(local_workspace_root):
            shutil.rmtree(local_workspace_root, ignore_errors=True)
        # rmtree ignores errors, so verify the outcome instead of assuming it.
        # A path that never existed counts as removed, like the edge "not_found".
        local_removed = not os.path.exists(local_workspace_root)

        # A successful local cleanup must not hide a failed edge purge.
        workspace_deleted = remote_purged and local_removed
    else:
        workspace_deleted = True

    # Remove dependent rows first, then the bot itself.
    dependent_models = (
        BotMessage,
        TopicItem,
        TopicTopic,
        BotRequestUsage,
        BotActivityEvent,
        BotSkillInstall,
    )
    for model in dependent_models:
        for row in session.exec(select(model).where(model.bot_id == bot_id)).all():
            session.delete(row)
    session.delete(bot)
    session.commit()

    self._clear_provider_target_override(bot_id)
    self._invalidate_bot_detail_cache(bot_id)
    self._invalidate_bot_messages_cache(bot_id)
    return {"status": "deleted", "workspace_deleted": workspace_deleted}
async def deploy_bot(
    self,
    *,
    app_state: Any,
    session: Session,
    bot_id: str,
    node_id: str,
    runtime_kind: Optional[str] = None,
    image_tag: Optional[str] = None,
    auto_start: bool = False,
) -> Dict[str, Any]:
    """Move a stopped bot to another execution target and/or image.

    The runtime status is refreshed and persisted first; a running bot is
    rejected. The workspace is then synced to the new target, the row is
    updated, a ``bot_deployed`` activity event is recorded and, when
    ``auto_start`` is set, the bot is started.

    Args:
        app_state: Application state forwarded to the runtime helpers.
        session: Database session holding the bot row.
        bot_id: Primary key of the bot to deploy.
        node_id: Managed node to deploy to.
        runtime_kind: Optional runtime override for the node's target.
        image_tag: Optional image; for docker targets an empty value falls
            back to the bot's current image.
        auto_start: Start the bot after the deploy has been committed.

    Returns:
        A dict with ``status``, the serialized ``bot``, ``started``, the new
        and previous image tags and the previous / next target summaries.

    Raises:
        HTTPException: 404 when the bot does not exist, 409 when it is
            running, 400 when the node is unknown or nothing would change.
    """
    # Local import: only this method needs timezone.
    from datetime import timezone

    bot = self._require_bot(session=session, bot_id=bot_id)
    actual_status = self._refresh_bot_runtime_status(app_state, bot)
    session.add(bot)
    session.commit()
    if actual_status == "RUNNING":
        raise HTTPException(status_code=409, detail="Stop the bot before deploy or migrate")

    current_target = self._resolve_bot_provider_target(bot)
    next_target_base = self._provider_target_from_node(node_id)
    if next_target_base is None:
        raise HTTPException(status_code=400, detail=f"Managed node not found: {node_id}")
    next_target = normalize_provider_target(
        {
            "node_id": node_id,
            "runtime_kind": runtime_kind,
        },
        fallback=next_target_base,
    )
    self._ensure_provider_target_supported(next_target)

    previous_image_tag = str(bot.image_tag or "").strip()
    requested_image_tag = str(image_tag or "").strip()
    deploys_to_docker = next_target.runtime_kind == "docker"
    if deploys_to_docker:
        # Docker targets keep the current image when none is requested.
        requested_image_tag = requested_image_tag or previous_image_tag

    image_changed = requested_image_tag != previous_image_tag
    target_changed = next_target.key != current_target.key
    if not image_changed and not target_changed:
        raise HTTPException(status_code=400, detail="No deploy changes detected")
    if deploys_to_docker:
        self._require_ready_image(
            session,
            requested_image_tag,
            require_local_image=True,
        )

    self._sync_bot_workspace_via_provider(
        session,
        bot,
        target_override=next_target,
        runtime_overrides=provider_target_to_dict(next_target),
    )

    bot.image_tag = requested_image_tag
    self._apply_provider_target_to_bot(bot, next_target)
    # Naive UTC timestamp, same value datetime.utcnow() produced, without
    # relying on the deprecated call.
    bot.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
    session.add(bot)
    self._record_activity_event(
        session,
        bot_id,
        "bot_deployed",
        channel="system",
        detail=(
            f"Bot {bot_id} deployed to {self._node_display_name(next_target.node_id)}"
            if target_changed
            else f"Bot {bot_id} redeployed with image {requested_image_tag}"
        ),
        metadata={
            "previous_target": self._serialize_provider_target_summary(current_target),
            "next_target": self._serialize_provider_target_summary(next_target),
            "previous_image_tag": previous_image_tag,
            "image_tag": requested_image_tag,
            "auto_start": bool(auto_start),
        },
    )
    session.commit()
    session.refresh(bot)
    # The deploy is committed at this point, so drop the cached details now:
    # a failing auto-start below must not leave the old target/image cached.
    self._invalidate_bot_detail_cache(bot_id)

    started = False
    if auto_start:
        await self._runtime_service.start_bot(app_state=app_state, session=session, bot=bot)
        session.refresh(bot)
        started = True
        # Starting may have changed the bot again; refresh the cache once more.
        self._invalidate_bot_detail_cache(bot_id)

    return {
        "status": "deployed",
        "bot": self._serialize_bot(bot),
        "started": started,
        "image_tag": requested_image_tag,
        "previous_image_tag": previous_image_tag,
        "previous_target": self._serialize_provider_target_summary(current_target),
        "next_target": self._serialize_provider_target_summary(next_target),
    }
def _set_inactive(
    self,
    *,
    app_state: Any,
    session: Session,
    bot: BotInstance,
    activity_type: str,
    detail: str,
) -> None:
    """Stop a bot, mark it disabled and record the given activity event.

    The stop is best-effort: if the runtime service raises, the failure is
    logged and the bot is still marked disabled and ``STOPPED``.

    Args:
        app_state: Application state forwarded to the runtime service.
        session: Database session holding the bot row.
        bot: The bot to deactivate.
        activity_type: Event type passed to ``record_activity_event``.
        detail: Word appended to ``"Bot <id> "`` in the event detail text.
    """
    bot_id = str(bot.id or "").strip()
    try:
        self._runtime_service.stop_bot(app_state=app_state, session=session, bot=bot)
    except Exception:
        # Keep going, but leave a trace of why the runtime stop failed.
        self._logger.warning(
            "Failed to stop bot while marking it inactive bot_id=%s",
            bot_id,
            exc_info=True,
        )
    bot.enabled = False
    bot.docker_status = "STOPPED"
    # An ERROR state is preserved; every other state is reset to IDLE.
    if str(bot.current_state or "").upper() != "ERROR":
        bot.current_state = "IDLE"
    session.add(bot)
    self._record_activity_event(session, bot_id, activity_type, channel="system", detail=f"Bot {bot_id} {detail}")
    session.commit()
    self._invalidate_bot_detail_cache(bot_id)