612 lines
27 KiB
Python
612 lines
27 KiB
Python
import logging
|
|
import os
|
|
import re
|
|
import shutil
|
|
from datetime import datetime
|
|
from typing import Any, Callable, Dict, Optional
|
|
|
|
from fastapi import HTTPException
|
|
from sqlmodel import Session, select
|
|
|
|
from core.settings import (
|
|
BOTS_WORKSPACE_ROOT,
|
|
DEFAULT_AGENTS_MD,
|
|
DEFAULT_IDENTITY_MD,
|
|
DEFAULT_SOUL_MD,
|
|
DEFAULT_TOOLS_MD,
|
|
DEFAULT_USER_MD,
|
|
)
|
|
from models.bot import BotInstance, BotMessage
|
|
from models.platform import BotActivityEvent, BotRequestUsage
|
|
from models.skill import BotSkillInstall
|
|
from models.topic import TopicItem, TopicTopic
|
|
from providers.target import ProviderTarget, normalize_provider_target, provider_target_to_dict
|
|
from services.runtime_service import RuntimeService
|
|
|
|
# Call signatures of the collaborators that BotLifecycleService receives through
# its constructor. In this file each alias is used as the annotation of the
# matching keyword argument of BotLifecycleService.__init__.

# Provider-target resolution and bookkeeping.
ResolveBotProviderTarget = Callable[[BotInstance], ProviderTarget]
ProviderTargetFromNode = Callable[[Optional[str]], Optional[ProviderTarget]]
DefaultProviderTarget = Callable[[], ProviderTarget]
EnsureProviderTargetSupported = Callable[[ProviderTarget], None]
ApplyProviderTargetToBot = Callable[[BotInstance, ProviderTarget], None]
ClearProviderTargetOverride = Callable[[str], None]
SerializeProviderTargetSummary = Callable[[ProviderTarget], Dict[str, Any]]

# Runtime status, image readiness and workspace synchronisation.
RefreshBotRuntimeStatus = Callable[[Any, BotInstance], str]
RequireReadyImage = Callable[..., Any]
SyncBotWorkspaceViaProvider = Callable[..., None]

# Input normalisation and env-param storage.
NormalizeEnvParams = Callable[[Any], Dict[str, str]]
NormalizeSystemTimezone = Callable[[Any], str]
NormalizeResourceLimits = Callable[[Any, Any, Any], Dict[str, Any]]
NormalizeInitialChannels = Callable[[str, Any], Any]
WriteEnvStore = Callable[[str, Dict[str, str]], None]
ResolveBotEnvParams = Callable[[str], Dict[str, str]]

# Serialisation, display names, cache invalidation and activity recording.
SerializeBot = Callable[[BotInstance], Dict[str, Any]]
NodeDisplayName = Callable[[str], str]
InvalidateBotCache = Callable[[str], None]
InvalidateBotMessagesCache = Callable[[str], None]
RecordActivityEvent = Callable[..., None]

# Edge-node access and failure reporting.
ExpectedEdgeOfflineError = Callable[[Exception], bool]
SummarizeEdgeException = Callable[[Exception], str]
ResolveEdgeClient = Callable[[ProviderTarget], Any]
NodeMetadata = Callable[[str], Dict[str, Any]]
LogEdgeFailure = Callable[..., None]
|
|
|
|
|
|
class BotLifecycleService:
    """Bot lifecycle operations built on injected collaborators.

    Every dependency (runtime service, provider-target helpers, normalizers,
    cache invalidators, edge helpers, logger) is passed to the constructor as a
    keyword argument and stored on a private attribute of the same name.
    """

    def __init__(
        self,
        *,
        bot_id_pattern: re.Pattern[str],
        runtime_service: RuntimeService,
        refresh_bot_runtime_status: RefreshBotRuntimeStatus,
        resolve_bot_provider_target: ResolveBotProviderTarget,
        provider_target_from_node: ProviderTargetFromNode,
        default_provider_target: DefaultProviderTarget,
        ensure_provider_target_supported: EnsureProviderTargetSupported,
        require_ready_image: RequireReadyImage,
        sync_bot_workspace_via_provider: SyncBotWorkspaceViaProvider,
        apply_provider_target_to_bot: ApplyProviderTargetToBot,
        serialize_provider_target_summary: SerializeProviderTargetSummary,
        serialize_bot: SerializeBot,
        node_display_name: NodeDisplayName,
        invalidate_bot_detail_cache: InvalidateBotCache,
        record_activity_event: RecordActivityEvent,
        normalize_env_params: NormalizeEnvParams,
        normalize_system_timezone: NormalizeSystemTimezone,
        normalize_resource_limits: NormalizeResourceLimits,
        write_env_store: WriteEnvStore,
        resolve_bot_env_params: ResolveBotEnvParams,
        clear_provider_target_override: ClearProviderTargetOverride,
        normalize_initial_channels: NormalizeInitialChannels,
        is_expected_edge_offline_error: ExpectedEdgeOfflineError,
        summarize_edge_exception: SummarizeEdgeException,
        resolve_edge_client: ResolveEdgeClient,
        node_metadata: NodeMetadata,
        log_edge_failure: LogEdgeFailure,
        invalidate_bot_messages_cache: InvalidateBotMessagesCache,
        logger: logging.Logger,
    ) -> None:
        """Store each collaborator on ``self._<argument name>``.

        All arguments are keyword-only; nothing is called or validated here.
        """
        # Validation pattern and runtime backend.
        self._bot_id_pattern = bot_id_pattern
        self._runtime_service = runtime_service
        self._refresh_bot_runtime_status = refresh_bot_runtime_status

        # Provider-target helpers.
        self._resolve_bot_provider_target = resolve_bot_provider_target
        self._provider_target_from_node = provider_target_from_node
        self._default_provider_target = default_provider_target
        self._ensure_provider_target_supported = ensure_provider_target_supported
        self._require_ready_image = require_ready_image
        self._sync_bot_workspace_via_provider = sync_bot_workspace_via_provider
        self._apply_provider_target_to_bot = apply_provider_target_to_bot
        self._serialize_provider_target_summary = serialize_provider_target_summary

        # Serialisation, caches and activity recording.
        self._serialize_bot = serialize_bot
        self._node_display_name = node_display_name
        self._invalidate_bot_detail_cache = invalidate_bot_detail_cache
        self._record_activity_event = record_activity_event

        # Input normalisation and env-param storage.
        self._normalize_env_params = normalize_env_params
        self._normalize_system_timezone = normalize_system_timezone
        self._normalize_resource_limits = normalize_resource_limits
        self._write_env_store = write_env_store
        self._resolve_bot_env_params = resolve_bot_env_params
        self._clear_provider_target_override = clear_provider_target_override
        self._normalize_initial_channels = normalize_initial_channels

        # Edge-node helpers.
        self._is_expected_edge_offline_error = is_expected_edge_offline_error
        self._summarize_edge_exception = summarize_edge_exception
        self._resolve_edge_client = resolve_edge_client
        self._node_metadata = node_metadata
        self._log_edge_failure = log_edge_failure

        self._invalidate_bot_messages_cache = invalidate_bot_messages_cache
        self._logger = logger
|
|
|
|
def _require_bot(self, *, session: Session, bot_id: str) -> BotInstance:
    """Load the bot with ``bot_id`` from ``session`` or fail with HTTP 404.

    Raises:
        HTTPException: status 404 ("Bot not found") when ``session.get``
            returns a falsy value.
    """
    found = session.get(BotInstance, bot_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Bot not found")
|
|
|
|
def create_bot(self, *, session: Session, payload: Any) -> Dict[str, Any]:
    """Create a bot row and push its initial workspace to the selected provider.

    The method validates the bot ID, env params/timezone, provider target,
    image (docker runtime only) and resource limits, then persists the
    ``BotInstance``, writes the env store, syncs the workspace, records
    activity events and returns the serialized bot.

    If the sync raises an error accepted by ``is_expected_edge_offline_error``
    the bot is kept and an extra ``bot_warning`` event is recorded. Any other
    failure during env-store write or sync deletes the new row again and is
    reported as HTTP 502.

    Args:
        session: Database session used for every read and write.
        payload: Creation request. Fields are read with ``getattr`` so absent
            attributes fall back to the defaults used below.

    Returns:
        The value of ``serialize_bot`` for the newly created bot.

    Raises:
        HTTPException: 400 for a missing or malformed bot ID or a timezone
            rejected with ``ValueError``; 409 when the ID already exists;
            502 when workspace initialization fails for a non-offline reason.
    """
    normalized_bot_id = str(getattr(payload, "id", "") or "").strip()
    if not normalized_bot_id:
        raise HTTPException(status_code=400, detail="Bot ID is required")
    if not self._bot_id_pattern.fullmatch(normalized_bot_id):
        raise HTTPException(status_code=400, detail="Bot ID can only contain letters, numbers, and underscores")
    if session.get(BotInstance, normalized_bot_id):
        raise HTTPException(status_code=409, detail=f"Bot ID already exists: {normalized_bot_id}")

    normalized_env_params = self._normalize_env_params(getattr(payload, "env_params", None))
    try:
        normalized_env_params["TZ"] = self._normalize_system_timezone(getattr(payload, "system_timezone", None))
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    provider_target = normalize_provider_target(
        {
            "node_id": getattr(payload, "node_id", None),
            "transport_kind": getattr(payload, "transport_kind", None),
            "runtime_kind": getattr(payload, "runtime_kind", None),
            "core_adapter": getattr(payload, "core_adapter", None),
        },
        fallback=self._provider_target_from_node(getattr(payload, "node_id", None)) or self._default_provider_target(),
    )
    self._ensure_provider_target_supported(provider_target)

    normalized_image_tag = str(getattr(payload, "image_tag", "") or "").strip()
    if provider_target.runtime_kind == "docker":
        self._require_ready_image(session, normalized_image_tag, require_local_image=True)

    # Normalize resource limits BEFORE the row is committed: if the normalizer
    # rejects the values, nothing has been persisted yet and no orphan bot row
    # is left behind.
    resource_limits = self._normalize_resource_limits(
        getattr(payload, "cpu_cores", None),
        getattr(payload, "memory_mb", None),
        getattr(payload, "storage_gb", None),
    )

    # A missing or None "enabled" means enabled.
    requested_enabled = getattr(payload, "enabled", None)
    bot = BotInstance(
        id=normalized_bot_id,
        name=getattr(payload, "name", None),
        enabled=True if requested_enabled is None else bool(requested_enabled),
        access_password="",
        image_tag=normalized_image_tag,
        node_id=provider_target.node_id,
        transport_kind=provider_target.transport_kind,
        runtime_kind=provider_target.runtime_kind,
        core_adapter=provider_target.core_adapter,
        workspace_dir=os.path.join(BOTS_WORKSPACE_ROOT, normalized_bot_id),
    )

    session.add(bot)
    session.commit()
    session.refresh(bot)

    # bool(None) is False, so a missing flag and an explicit None both map to False.
    send_progress = bool(getattr(payload, "send_progress", None))
    send_tool_hints = bool(getattr(payload, "send_tool_hints", None))

    workspace_synced = True
    sync_error_detail = ""
    try:
        self._write_env_store(normalized_bot_id, normalized_env_params)
        self._sync_bot_workspace_via_provider(
            session,
            bot,
            target_override=provider_target,
            channels_override=self._normalize_initial_channels(normalized_bot_id, getattr(payload, "channels", None)),
            global_delivery_override={
                "sendProgress": send_progress,
                "sendToolHints": send_tool_hints,
            },
            runtime_overrides={
                "llm_provider": getattr(payload, "llm_provider", None),
                "llm_model": getattr(payload, "llm_model", None),
                "api_key": getattr(payload, "api_key", None),
                "api_base": getattr(payload, "api_base", "") or "",
                "temperature": getattr(payload, "temperature", None),
                "top_p": getattr(payload, "top_p", None),
                "max_tokens": getattr(payload, "max_tokens", None),
                "cpu_cores": resource_limits["cpu_cores"],
                "memory_mb": resource_limits["memory_mb"],
                "storage_gb": resource_limits["storage_gb"],
                "node_id": provider_target.node_id,
                "transport_kind": provider_target.transport_kind,
                "runtime_kind": provider_target.runtime_kind,
                "core_adapter": provider_target.core_adapter,
                # system_prompt and soul_md fall back to each other before the default.
                "system_prompt": getattr(payload, "system_prompt", None) or getattr(payload, "soul_md", None) or DEFAULT_SOUL_MD,
                "soul_md": getattr(payload, "soul_md", None) or getattr(payload, "system_prompt", None) or DEFAULT_SOUL_MD,
                "agents_md": getattr(payload, "agents_md", None) or DEFAULT_AGENTS_MD,
                "user_md": getattr(payload, "user_md", None) or DEFAULT_USER_MD,
                "tools_md": getattr(payload, "tools_md", None) or DEFAULT_TOOLS_MD,
                "identity_md": getattr(payload, "identity_md", None) or DEFAULT_IDENTITY_MD,
                "send_progress": send_progress,
                "send_tool_hints": send_tool_hints,
            },
        )
    except Exception as exc:
        if self._is_expected_edge_offline_error(exc):
            # Offline edge node: keep the bot and report the sync as pending.
            workspace_synced = False
            sync_error_detail = self._summarize_edge_exception(exc)
            self._logger.info(
                "Create bot pending sync due to offline edge bot_id=%s node=%s detail=%s",
                normalized_bot_id,
                provider_target.node_id,
                sync_error_detail,
            )
        else:
            detail = self._summarize_edge_exception(exc)
            # Undo the row committed above so a failed initialization leaves no bot behind.
            try:
                doomed = session.get(BotInstance, normalized_bot_id)
                if doomed is not None:
                    session.delete(doomed)
                    session.commit()
                self._clear_provider_target_override(normalized_bot_id)
            except Exception:
                # Cleanup stays best effort, but the failure is no longer silent.
                self._logger.exception(
                    "Failed to remove bot row after workspace initialization failure bot_id=%s",
                    normalized_bot_id,
                )
                session.rollback()
            raise HTTPException(status_code=502, detail=f"Failed to initialize bot workspace: {detail}") from exc

    session.refresh(bot)
    self._record_activity_event(
        session,
        normalized_bot_id,
        "bot_created",
        channel="system",
        detail=f"Bot {normalized_bot_id} created",
        metadata={
            "image_tag": normalized_image_tag,
            "workspace_synced": workspace_synced,
            "sync_error": sync_error_detail if not workspace_synced else "",
        },
    )
    if not workspace_synced:
        self._record_activity_event(
            session,
            normalized_bot_id,
            "bot_warning",
            channel="system",
            detail="Bot created, but node is offline. Workspace sync is pending.",
            metadata={"sync_error": sync_error_detail, "node_id": provider_target.node_id},
        )
    session.commit()
    self._invalidate_bot_detail_cache(normalized_bot_id)
    return self._serialize_bot(bot)
|
|
|
|
def update_bot(self, *, session: Session, bot_id: str, payload: Any) -> Dict[str, Any]:
    """Apply a partial update to a bot and re-sync its workspace.

    Only fields explicitly set on ``payload`` are considered
    (``model_dump(exclude_unset=True)``). They are split into:

    * env params / system timezone, written to the env store,
    * runtime overrides, forwarded to the workspace sync,
    * ``name`` / ``enabled``, set directly on the bot row.

    ``image_tag`` and execution-target fields are rejected; ``tools_config``
    is dropped without being applied.

    The workspace sync runs first. The env store is then written and the row
    committed; if that step fails the session is rolled back and the previous
    env params are written back before the error is re-raised.

    Args:
        session: Database session.
        bot_id: ID of the bot to update.
        payload: Object exposing ``model_dump(exclude_unset=True)``.

    Returns:
        The value of ``serialize_bot`` for the refreshed bot.

    Raises:
        HTTPException: 404 (via ``_require_bot``) when the bot does not exist;
            400 for a timezone rejected with ``ValueError`` or when a
            deploy-only field is present.
    """
    bot = self._require_bot(session=session, bot_id=bot_id)
    update_data = payload.model_dump(exclude_unset=True)

    env_params = None
    system_timezone = None
    if isinstance(update_data, dict):
        env_params = update_data.pop("env_params", None)
        system_timezone = update_data.pop("system_timezone", None)
        # tools_config is removed here and never applied by this method.
        update_data.pop("tools_config", None)

    normalized_system_timezone: Optional[str] = None
    if system_timezone is not None:
        try:
            normalized_system_timezone = self._normalize_system_timezone(system_timezone)
        except ValueError as exc:
            raise HTTPException(status_code=400, detail=str(exc)) from exc

    runtime_fields = {
        "llm_provider",
        "llm_model",
        "api_key",
        "api_base",
        "temperature",
        "top_p",
        "max_tokens",
        "cpu_cores",
        "memory_mb",
        "storage_gb",
        "soul_md",
        "agents_md",
        "user_md",
        "tools_md",
        "identity_md",
        "send_progress",
        "send_tool_hints",
        "system_prompt",
    }
    execution_target_fields = {
        "node_id",
        "transport_kind",
        "runtime_kind",
        "core_adapter",
    }
    deploy_only_fields = {"image_tag", *execution_target_fields}
    if deploy_only_fields & set(update_data.keys()):
        raise HTTPException(
            status_code=400,
            detail=f"Use /api/bots/{bot_id}/deploy for execution target or image changes",
        )

    # Move runtime fields out of update_data, keeping the payload's key order.
    runtime_overrides: Dict[str, Any] = {
        field: update_data.pop(field) for field in list(update_data) if field in runtime_fields
    }

    # Blank provider/model/key values are dropped from the overrides instead of being forwarded.
    for text_field in ("llm_provider", "llm_model", "api_key"):
        if text_field in runtime_overrides:
            text = str(runtime_overrides.get(text_field) or "").strip()
            if text:
                runtime_overrides[text_field] = text
            else:
                runtime_overrides.pop(text_field, None)
    if "api_base" in runtime_overrides:
        runtime_overrides["api_base"] = str(runtime_overrides.get("api_base") or "").strip()

    # system_prompt and soul_md mirror each other when only one of them is supplied.
    if "system_prompt" in runtime_overrides and "soul_md" not in runtime_overrides:
        runtime_overrides["soul_md"] = runtime_overrides["system_prompt"]
    if "soul_md" in runtime_overrides and "system_prompt" not in runtime_overrides:
        runtime_overrides["system_prompt"] = runtime_overrides["soul_md"]

    if {"cpu_cores", "memory_mb", "storage_gb"} & set(runtime_overrides.keys()):
        # NOTE(review): limits missing from the payload are passed as None, and whatever
        # normalize_resource_limits returns for them is written into the overrides.
        # Confirm this cannot reset limits the caller did not intend to change.
        normalized_resources = self._normalize_resource_limits(
            runtime_overrides.get("cpu_cores"),
            runtime_overrides.get("memory_mb"),
            runtime_overrides.get("storage_gb"),
        )
        runtime_overrides.update(normalized_resources)

    db_fields = {"name", "enabled"}
    for key, value in update_data.items():
        if key in db_fields:
            setattr(bot, key, value)

    previous_env_params: Optional[Dict[str, str]] = None
    next_env_params: Optional[Dict[str, str]] = None
    if env_params is not None or normalized_system_timezone is not None:
        previous_env_params = self._resolve_bot_env_params(bot_id)
        next_env_params = dict(previous_env_params)
        if env_params is not None:
            # NOTE(review): supplied env_params replace the stored set wholesale, so a
            # previously stored TZ is lost unless it is re-sent or system_timezone is
            # also given. Verify this is intended.
            next_env_params = self._normalize_env_params(env_params)
        if normalized_system_timezone is not None:
            next_env_params["TZ"] = normalized_system_timezone

    global_delivery_override: Optional[Dict[str, Any]] = None
    if "send_progress" in runtime_overrides or "send_tool_hints" in runtime_overrides:
        global_delivery_override = {}
        if "send_progress" in runtime_overrides:
            global_delivery_override["sendProgress"] = bool(runtime_overrides.get("send_progress"))
        if "send_tool_hints" in runtime_overrides:
            global_delivery_override["sendToolHints"] = bool(runtime_overrides.get("send_tool_hints"))

    # Execution-target changes are rejected above, so there is never a target override here.
    self._sync_bot_workspace_via_provider(
        session,
        bot,
        target_override=None,
        runtime_overrides=runtime_overrides if runtime_overrides else None,
        global_delivery_override=global_delivery_override,
    )
    try:
        if next_env_params is not None:
            self._write_env_store(bot_id, next_env_params)
        session.add(bot)
        session.commit()
    except Exception:
        session.rollback()
        # Put the env store back so it matches the rolled-back row.
        if previous_env_params is not None:
            self._write_env_store(bot_id, previous_env_params)
        raise

    session.refresh(bot)
    self._invalidate_bot_detail_cache(bot_id)
    return self._serialize_bot(bot)
|
|
|
|
async def start_bot(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
    """Look up the bot (HTTP 404 if absent) and await ``runtime_service.start_bot`` for it."""
    return await self._runtime_service.start_bot(
        app_state=app_state,
        session=session,
        bot=self._require_bot(session=session, bot_id=bot_id),
    )
|
|
|
|
def stop_bot(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
    """Look up the bot (HTTP 404 if absent) and return ``runtime_service.stop_bot`` for it."""
    return self._runtime_service.stop_bot(
        app_state=app_state,
        session=session,
        bot=self._require_bot(session=session, bot_id=bot_id),
    )
|
|
|
|
def enable_bot(self, *, session: Session, bot_id: str) -> Dict[str, Any]:
    """Set ``enabled`` to True on the bot, record a ``bot_enabled`` event and commit.

    Raises:
        HTTPException: 404 (via ``_require_bot``) when the bot does not exist.
    """
    target_bot = self._require_bot(session=session, bot_id=bot_id)
    target_bot.enabled = True
    session.add(target_bot)

    # The event is recorded before the commit so both land in the same transaction.
    self._record_activity_event(
        session,
        bot_id,
        "bot_enabled",
        channel="system",
        detail=f"Bot {bot_id} enabled",
    )
    session.commit()

    self._invalidate_bot_detail_cache(bot_id)
    return {"status": "enabled", "enabled": True}
|
|
|
|
def disable_bot(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
    """Mark the bot inactive via ``_set_inactive`` with a ``bot_disabled`` event.

    Raises:
        HTTPException: 404 (via ``_require_bot``) when the bot does not exist.
    """
    self._set_inactive(
        app_state=app_state,
        session=session,
        bot=self._require_bot(session=session, bot_id=bot_id),
        activity_type="bot_disabled",
        detail="disabled",
    )
    return {"status": "disabled", "enabled": False}
|
|
|
|
def deactivate_bot(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
    """Mark the bot inactive via ``_set_inactive`` with a ``bot_deactivated`` event.

    Raises:
        HTTPException: 404 (via ``_require_bot``) when the bot does not exist.
    """
    target_bot = self._require_bot(session=session, bot_id=bot_id)
    inactive_kwargs = {
        "app_state": app_state,
        "session": session,
        "bot": target_bot,
        "activity_type": "bot_deactivated",
        "detail": "deactivated",
    }
    self._set_inactive(**inactive_kwargs)
    return {"status": "deactivated"}
|
|
|
|
def delete_bot(
    self,
    *,
    app_state: Any,
    session: Session,
    bot_id: str,
    delete_workspace: bool = True,
) -> Dict[str, Any]:
    """Stop a bot, optionally remove its workspace, and delete it with its dependent rows.

    Stopping is best effort: a failure is logged and deletion continues.
    When ``delete_workspace`` is true, an edge-transport bot has its remote
    workspace purged through the edge client, and the directory under
    ``BOTS_WORKSPACE_ROOT`` is removed if it exists. Messages, topic items,
    topics, request-usage rows, activity events and skill installs for the
    bot are deleted before the bot row itself.

    Args:
        app_state: Passed through to ``runtime_service.stop_bot``.
        session: Database session.
        bot_id: ID of the bot to delete.
        delete_workspace: Whether workspace data should be removed as well.

    Returns:
        ``{"status": "deleted", "workspace_deleted": bool}``. With
        ``delete_workspace=False`` the flag is ``True``, as before. Otherwise
        it is ``True`` only when the removal was confirmed: for edge bots the
        purge reported ``deleted``/``not_found`` and no local directory
        remains; for other bots a local directory existed and is gone.

    Raises:
        HTTPException: 404 (via ``_require_bot``) when the bot does not exist.
    """
    bot = self._require_bot(session=session, bot_id=bot_id)
    target = self._resolve_bot_provider_target(bot)

    try:
        self._runtime_service.stop_bot(app_state=app_state, session=session, bot=bot)
    except Exception as exc:
        # Deletion must proceed even if the runtime cannot be stopped, but leave a trace.
        self._logger.warning("Ignoring stop failure while deleting bot bot_id=%s error=%s", bot_id, exc)

    workspace_deleted = not bool(delete_workspace)
    if delete_workspace:
        is_edge = target.transport_kind == "edge"
        remote_purged = False
        if is_edge:
            try:
                workspace_root = str(self._node_metadata(target.node_id).get("workspace_root") or "").strip() or None
                purge_result = self._resolve_edge_client(target).purge_workspace(
                    bot_id=bot_id,
                    workspace_root=workspace_root,
                )
                remote_purged = str(purge_result.get("status") or "").strip().lower() in {"deleted", "not_found"}
            except Exception as exc:
                self._log_edge_failure(
                    self._logger,
                    key=f"bot-delete-workspace:{bot_id}",
                    exc=exc,
                    message=f"Failed to purge edge workspace for bot_id={bot_id}",
                )

        local_root = os.path.join(BOTS_WORKSPACE_ROOT, bot_id)
        local_existed = os.path.isdir(local_root)
        if local_existed:
            shutil.rmtree(local_root, ignore_errors=True)
        # rmtree(ignore_errors=True) never raises, so check the outcome explicitly.
        local_gone = not os.path.isdir(local_root)

        if is_edge:
            # Removing a local copy must not hide a failed remote purge.
            workspace_deleted = remote_purged and local_gone
        else:
            workspace_deleted = local_existed and local_gone

    # Delete dependent rows before the bot row, one model at a time.
    dependent_models = (
        BotMessage,
        TopicItem,
        TopicTopic,
        BotRequestUsage,
        BotActivityEvent,
        BotSkillInstall,
    )
    for model in dependent_models:
        for row in session.exec(select(model).where(model.bot_id == bot_id)).all():
            session.delete(row)

    session.delete(bot)
    session.commit()
    self._clear_provider_target_override(bot_id)
    self._invalidate_bot_detail_cache(bot_id)
    self._invalidate_bot_messages_cache(bot_id)
    return {"status": "deleted", "workspace_deleted": workspace_deleted}
|
|
|
|
async def deploy_bot(
    self,
    *,
    app_state: Any,
    session: Session,
    bot_id: str,
    node_id: str,
    runtime_kind: Optional[str] = None,
    image_tag: Optional[str] = None,
    auto_start: bool = False,
) -> Dict[str, Any]:
    """Move a stopped bot to another execution target and/or image.

    The runtime status is refreshed and committed first; a running bot is
    rejected. The new target is derived from ``node_id`` (optionally
    overriding ``runtime_kind``), validated, and for the docker runtime the
    image must be ready. The workspace is synced to the new target, the bot
    row is updated, a ``bot_deployed`` event is recorded and committed, and
    the bot is started when ``auto_start`` is true.

    Args:
        app_state: Passed to the status refresh and to ``start_bot``.
        session: Database session.
        bot_id: ID of the bot to deploy.
        node_id: Node whose provider target is the base for the new target.
        runtime_kind: Optional runtime override for the new target.
        image_tag: Optional new image tag; for the docker runtime an empty
            value falls back to the bot's current tag.
        auto_start: Start the bot after a successful deploy.

    Returns:
        Dict with ``status``, serialized ``bot``, ``started``, ``image_tag``,
        ``previous_image_tag``, ``previous_target`` and ``next_target``.

    Raises:
        HTTPException: 404 when the bot does not exist; 409 when it is
            running; 400 when the node is unknown or nothing would change.
    """
    bot = self._require_bot(session=session, bot_id=bot_id)

    # Persist the refreshed status even if the deploy is rejected below.
    actual_status = self._refresh_bot_runtime_status(app_state, bot)
    session.add(bot)
    session.commit()
    if actual_status == "RUNNING":
        raise HTTPException(status_code=409, detail="Stop the bot before deploy or migrate")

    current_target = self._resolve_bot_provider_target(bot)
    next_target_base = self._provider_target_from_node(node_id)
    if next_target_base is None:
        raise HTTPException(status_code=400, detail=f"Managed node not found: {node_id}")
    next_target = normalize_provider_target(
        {
            "node_id": node_id,
            "runtime_kind": runtime_kind,
        },
        fallback=next_target_base,
    )
    self._ensure_provider_target_supported(next_target)

    existing_image_tag = str(bot.image_tag or "").strip()
    requested_image_tag = str(image_tag or "").strip()
    if next_target.runtime_kind == "docker":
        requested_image_tag = requested_image_tag or existing_image_tag
    image_changed = requested_image_tag != existing_image_tag
    target_changed = next_target.key != current_target.key
    if not image_changed and not target_changed:
        raise HTTPException(status_code=400, detail="No deploy changes detected")

    if next_target.runtime_kind == "docker":
        self._require_ready_image(
            session,
            requested_image_tag,
            require_local_image=True,
        )

    self._sync_bot_workspace_via_provider(
        session,
        bot,
        target_override=next_target,
        runtime_overrides=provider_target_to_dict(next_target),
    )

    # Re-read the tag after the sync, immediately before overwriting it.
    previous_image_tag = str(bot.image_tag or "").strip()
    bot.image_tag = requested_image_tag
    self._apply_provider_target_to_bot(bot, next_target)
    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; kept because the
    # column's timezone handling is not visible from here.
    bot.updated_at = datetime.utcnow()
    session.add(bot)
    self._record_activity_event(
        session,
        bot_id,
        "bot_deployed",
        channel="system",
        detail=(
            f"Bot {bot_id} deployed to {self._node_display_name(next_target.node_id)}"
            if target_changed
            else f"Bot {bot_id} redeployed with image {requested_image_tag}"
        ),
        metadata={
            "previous_target": self._serialize_provider_target_summary(current_target),
            "next_target": self._serialize_provider_target_summary(next_target),
            "previous_image_tag": previous_image_tag,
            "image_tag": requested_image_tag,
            "auto_start": bool(auto_start),
        },
    )
    session.commit()
    session.refresh(bot)

    # The deployment is committed at this point. Drop the cached detail now so that
    # a failing auto-start below cannot leave the pre-deploy data in the cache.
    self._invalidate_bot_detail_cache(bot_id)

    started = False
    if bool(auto_start):
        await self._runtime_service.start_bot(app_state=app_state, session=session, bot=bot)
        session.refresh(bot)
        started = True
        # Invalidate again after the start, as the original flow did.
        self._invalidate_bot_detail_cache(bot_id)

    return {
        "status": "deployed",
        "bot": self._serialize_bot(bot),
        "started": started,
        "image_tag": requested_image_tag,
        "previous_image_tag": previous_image_tag,
        "previous_target": self._serialize_provider_target_summary(current_target),
        "next_target": self._serialize_provider_target_summary(next_target),
    }
|
|
|
|
def _set_inactive(
    self,
    *,
    app_state: Any,
    session: Session,
    bot: BotInstance,
    activity_type: str,
    detail: str,
) -> None:
    """Stop a bot (best effort), mark it disabled and stopped, and record an event.

    Sets ``enabled`` to False and ``docker_status`` to ``"STOPPED"``.
    ``current_state`` becomes ``"IDLE"`` unless it is ``ERROR``
    (case-insensitive), which is preserved. The change and the activity event
    are committed together, then the detail cache is invalidated.

    Args:
        app_state: Passed through to ``runtime_service.stop_bot``.
        session: Database session.
        bot: Bot to deactivate.
        activity_type: Event type to record, e.g. ``"bot_disabled"``.
        detail: Verb appended to ``"Bot <id> "`` in the event detail.
    """
    bot_id = str(bot.id or "").strip()
    try:
        self._runtime_service.stop_bot(app_state=app_state, session=session, bot=bot)
    except Exception as exc:
        # The bot must still end up inactive if stopping fails, but the failure is logged.
        self._logger.warning(
            "Ignoring stop failure while marking bot inactive bot_id=%s error=%s",
            bot_id,
            exc,
        )

    bot.enabled = False
    bot.docker_status = "STOPPED"
    # Keep an ERROR state visible; anything else resets to IDLE.
    if str(bot.current_state or "").upper() not in {"ERROR"}:
        bot.current_state = "IDLE"
    session.add(bot)
    self._record_activity_event(session, bot_id, activity_type, channel="system", detail=f"Bot {bot_id} {detail}")
    session.commit()
    self._invalidate_bot_detail_cache(bot_id)
|