# dashboard-nanobot/backend/services/runtime_service.py
#
# 170 lines
# 7.0 KiB
# Python

import logging
from datetime import datetime
from typing import Any, Callable, Dict

from fastapi import HTTPException
from sqlmodel import Session, select

from models.bot import BotInstance, BotMessage
from providers.runtime.base import RuntimeProvider
from services.bot_command_service import BotCommandService
class RuntimeService:
    """Service facade for bot runtime operations.

    Every collaborator is injected through the constructor: the command
    service, a resolver that returns the runtime provider for a bot, the
    session-clearing helpers, the cache invalidators and the activity
    recorder. The methods here only sequence calls to those collaborators
    and to the SQLModel session they are given.
    """

    # Logger used to report best-effort failures that must not abort a request.
    _log = logging.getLogger(__name__)

    def __init__(
        self,
        *,
        command_service: BotCommandService,
        resolve_runtime_provider: Callable[[Any, BotInstance], RuntimeProvider],
        clear_bot_sessions: Callable[[str], int],
        clear_dashboard_direct_session_file: Callable[[str], Dict[str, Any]],
        invalidate_bot_detail_cache: Callable[[str], None],
        invalidate_bot_messages_cache: Callable[[str], None],
        record_activity_event: Callable[..., None],
    ) -> None:
        """Store the injected collaborators; no other work is done here."""
        self._command_service = command_service
        self._resolve_runtime_provider = resolve_runtime_provider
        self._clear_bot_sessions = clear_bot_sessions
        self._clear_dashboard_direct_session_file = clear_dashboard_direct_session_file
        self._invalidate_bot_detail_cache = invalidate_bot_detail_cache
        self._invalidate_bot_messages_cache = invalidate_bot_messages_cache
        self._record_activity_event = record_activity_event

    def _require_bot(self, *, session: Session, bot_id: str) -> BotInstance:
        """Load the bot with primary key ``bot_id``.

        Raises:
            HTTPException: with status 404 when ``session.get`` returns
                nothing for that id.
        """
        bot = session.get(BotInstance, bot_id)
        if not bot:
            raise HTTPException(status_code=404, detail="Bot not found")
        return bot

    async def start_bot(self, *, app_state: Any, session: Session, bot: BotInstance) -> Dict[str, Any]:
        """Await the provider's ``start_bot`` and invalidate the bot detail cache."""
        provider = self._resolve_runtime_provider(app_state, bot)
        result = await provider.start_bot(session=session, bot=bot)
        self._invalidate_bot_detail_cache(str(bot.id or ""))
        return result

    def stop_bot(self, *, app_state: Any, session: Session, bot: BotInstance) -> Dict[str, Any]:
        """Call the provider's ``stop_bot`` and invalidate the bot detail cache.

        NOTE(review): ``start_bot`` awaits the provider while this method
        does not; confirm the provider's ``stop_bot`` is synchronous.
        """
        provider = self._resolve_runtime_provider(app_state, bot)
        result = provider.stop_bot(session=session, bot=bot)
        self._invalidate_bot_detail_cache(str(bot.id or ""))
        return result

    async def clear_messages_for_bot(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
        """Look the bot up by id (404 if missing) and clear its messages.

        NOTE(review): declared ``async`` although nothing is awaited; kept
        as a coroutine so existing ``await`` call sites keep working.
        """
        bot = self._require_bot(session=session, bot_id=bot_id)
        return self.clear_messages(app_state=app_state, session=session, bot=bot)

    def clear_dashboard_direct_session_for_bot(
        self, *, app_state: Any, session: Session, bot_id: str
    ) -> Dict[str, Any]:
        """Look the bot up by id (404 if missing) and clear its dashboard session."""
        bot = self._require_bot(session=session, bot_id=bot_id)
        return self.clear_dashboard_direct_session(app_state=app_state, session=session, bot=bot)

    def get_logs_for_bot(self, *, app_state: Any, session: Session, bot_id: str, tail: int = 300) -> Dict[str, Any]:
        """Look the bot up by id (404 if missing) and return its recent logs."""
        bot = self._require_bot(session=session, bot_id=bot_id)
        return self.get_logs(app_state=app_state, bot=bot, tail=tail)

    def send_command_for_bot(
        self,
        *,
        app_state: Any,
        session: Session,
        bot_id: str,
        payload: Any,
    ) -> Dict[str, Any]:
        """Look the bot up by id (404 if missing) and forward ``payload`` to it."""
        bot = self._require_bot(session=session, bot_id=bot_id)
        return self.send_command(
            app_state=app_state,
            session=session,
            bot_id=bot_id,
            bot=bot,
            payload=payload,
        )

    def send_command(
        self,
        *,
        app_state: Any,
        session: Session,
        bot_id: str,
        bot: BotInstance,
        payload: Any,
    ) -> Dict[str, Any]:
        """Delegate to the command service, passing the bot's runtime provider."""
        return self._command_service.execute(
            session=session,
            bot_id=bot_id,
            bot=bot,
            payload=payload,
            runtime_provider=self._resolve_runtime_provider(app_state, bot),
            app_state=app_state,
        )

    def get_logs(self, *, app_state: Any, bot: BotInstance, tail: int = 300) -> Dict[str, Any]:
        """Return ``{"bot_id", "logs"}`` using the provider's ``get_recent_logs``.

        ``tail`` is forwarded to the provider unchanged.
        """
        provider = self._resolve_runtime_provider(app_state, bot)
        return {
            "bot_id": bot.id,
            "logs": provider.get_recent_logs(bot_id=bot.id, tail=tail),
        }

    def ensure_monitor(self, *, app_state: Any, bot: BotInstance) -> bool:
        """Return the provider's ``ensure_monitor`` result coerced to ``bool``."""
        provider = self._resolve_runtime_provider(app_state, bot)
        return bool(provider.ensure_monitor(bot_id=bot.id))

    def sync_edge_monitor_packets(self, *, app_state: Any, bot: BotInstance, request_id: str) -> None:
        """Delegate to the command service's ``sync_edge_monitor_packets``.

        Both identifiers are coerced to stripped strings before being passed on.
        """
        runtime_provider = self._resolve_runtime_provider(app_state, bot)
        self._command_service.sync_edge_monitor_packets(
            runtime_provider=runtime_provider,
            bot_id=str(bot.id or "").strip(),
            request_id=str(request_id or "").strip(),
            app_state=app_state,
        )

    def clear_messages(self, *, app_state: Any, session: Session, bot: BotInstance) -> Dict[str, Any]:
        """Delete the bot's stored messages and reset its conversational state.

        Steps, in order: delete every ``BotMessage`` row for the bot, call the
        injected session-clearing helper, send a best-effort ``/new`` to a
        running runtime, reset ``last_action``/``current_state`` on the bot,
        record a ``history_cleared`` activity event, commit, then invalidate
        the detail and messages caches.

        Returns:
            ``{"bot_id", "deleted", "cleared_sessions"}`` where ``deleted`` is
            the number of message rows removed.
        """
        bot_id = str(bot.id or "").strip()
        rows = session.exec(select(BotMessage).where(BotMessage.bot_id == bot_id)).all()
        for row in rows:
            session.delete(row)
        deleted = len(rows)

        cleared_sessions = self._clear_bot_sessions(bot_id)
        self._reset_running_runtime_session(app_state=app_state, bot=bot)

        bot.last_action = ""
        bot.current_state = "IDLE"
        # Naive UTC timestamp, as in the original code. datetime.utcnow() is
        # deprecated since Python 3.12; NOTE(review): migrate together with
        # whatever reads updated_at.
        bot.updated_at = datetime.utcnow()
        session.add(bot)
        self._record_activity_event(
            session,
            bot_id,
            "history_cleared",
            channel="system",
            detail=f"Cleared {deleted} stored messages",
            metadata={"deleted_messages": deleted, "cleared_sessions": cleared_sessions},
        )
        session.commit()

        # Caches are dropped only after the commit has succeeded.
        self._invalidate_bot_detail_cache(bot_id)
        self._invalidate_bot_messages_cache(bot_id)
        return {"bot_id": bot_id, "deleted": deleted, "cleared_sessions": cleared_sessions}

    def clear_dashboard_direct_session(self, *, app_state: Any, session: Session, bot: BotInstance) -> Dict[str, Any]:
        """Clear the bot's dashboard_direct session file and record the event.

        The injected helper's result must contain the keys ``"path"`` and
        ``"existed"``; both are copied into the activity metadata and the
        returned dict. A running runtime also receives a best-effort ``/new``.

        Returns:
            ``{"bot_id", "cleared": True, "session_file", "previously_existed"}``.
        """
        bot_id = str(bot.id or "").strip()
        result = self._clear_dashboard_direct_session_file(bot_id)
        self._reset_running_runtime_session(app_state=app_state, bot=bot)

        bot.updated_at = datetime.utcnow()
        session.add(bot)
        self._record_activity_event(
            session,
            bot_id,
            "dashboard_session_cleared",
            channel="dashboard",
            detail="Cleared dashboard_direct session file",
            metadata={"session_file": result["path"], "previously_existed": result["existed"]},
        )
        session.commit()

        self._invalidate_bot_detail_cache(bot_id)
        return {
            "bot_id": bot_id,
            "cleared": True,
            "session_file": result["path"],
            "previously_existed": result["existed"],
        }

    def _reset_running_runtime_session(self, *, app_state: Any, bot: BotInstance) -> None:
        """Send ``/new`` to the bot's runtime when the bot is marked running.

        This is best-effort: a failure to resolve the provider or deliver the
        command never propagates to the caller, but it is logged with the
        traceback so the failure is visible.
        """
        if not self._is_runtime_running(bot):
            return
        # Same normalisation the callers use; avoids sending the literal
        # string "None" when the id is missing.
        bot_id = str(bot.id or "").strip()
        try:
            self._resolve_runtime_provider(app_state, bot).deliver_command(bot_id=bot_id, command="/new")
        except Exception:
            self._log.warning(
                "Failed to deliver /new to running bot %r; continuing",
                bot_id,
                exc_info=True,
            )

    @staticmethod
    def _is_runtime_running(bot: BotInstance) -> bool:
        """Return True when the bot's status equals ``RUNNING`` (case-insensitive).

        ``runtime_status`` is preferred; ``docker_status`` is the fallback when
        the former is missing or falsy.
        """
        runtime_status = str(getattr(bot, "runtime_status", None) or getattr(bot, "docker_status", None) or "").upper()
        return runtime_status == "RUNNING"