170 lines
7.0 KiB
Python
170 lines
7.0 KiB
Python
import logging
from datetime import datetime
from typing import Any, Callable, Dict

from fastapi import HTTPException
from sqlmodel import Session, select

from models.bot import BotInstance, BotMessage
from providers.runtime.base import RuntimeProvider
from services.bot_command_service import BotCommandService


class RuntimeService:
    """Coordinate bot runtime operations across providers, storage and caches.

    The service owns no infrastructure itself: the runtime provider resolver,
    the session/file clearing helpers, the cache invalidators and the activity
    recorder are all injected as callables through the constructor.
    """

    _logger = logging.getLogger(__name__)

    def __init__(
        self,
        *,
        command_service: BotCommandService,
        resolve_runtime_provider: Callable[[Any, BotInstance], RuntimeProvider],
        clear_bot_sessions: Callable[[str], int],
        clear_dashboard_direct_session_file: Callable[[str], Dict[str, Any]],
        invalidate_bot_detail_cache: Callable[[str], None],
        invalidate_bot_messages_cache: Callable[[str], None],
        record_activity_event: Callable[..., None],
    ) -> None:
        """Store the injected collaborators; performs no other work."""
        self._command_service = command_service
        self._resolve_runtime_provider = resolve_runtime_provider
        self._clear_bot_sessions = clear_bot_sessions
        self._clear_dashboard_direct_session_file = clear_dashboard_direct_session_file
        self._invalidate_bot_detail_cache = invalidate_bot_detail_cache
        self._invalidate_bot_messages_cache = invalidate_bot_messages_cache
        self._record_activity_event = record_activity_event

    def _require_bot(self, *, session: Session, bot_id: str) -> BotInstance:
        """Load the bot with ``bot_id`` from ``session``.

        Raises:
            HTTPException: 404 when no such bot exists.
        """
        bot = session.get(BotInstance, bot_id)
        if not bot:
            raise HTTPException(status_code=404, detail="Bot not found")
        return bot

    async def start_bot(self, *, app_state: Any, session: Session, bot: BotInstance) -> Dict[str, Any]:
        """Start ``bot`` through its runtime provider and invalidate its detail cache.

        Returns whatever the provider's awaited ``start_bot`` call returns.
        """
        result = await self._resolve_runtime_provider(app_state, bot).start_bot(session=session, bot=bot)
        self._invalidate_bot_detail_cache(str(bot.id or ""))
        return result

    def stop_bot(self, *, app_state: Any, session: Session, bot: BotInstance) -> Dict[str, Any]:
        """Stop ``bot`` through its runtime provider and invalidate its detail cache.

        Returns whatever the provider's ``stop_bot`` call returns.
        """
        result = self._resolve_runtime_provider(app_state, bot).stop_bot(session=session, bot=bot)
        self._invalidate_bot_detail_cache(str(bot.id or ""))
        return result

    async def clear_messages_for_bot(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
        """Look up the bot by id (404 if missing) and clear its messages.

        Declared ``async`` so existing callers can keep awaiting it; the work
        itself is delegated to the synchronous ``clear_messages``.
        """
        bot = self._require_bot(session=session, bot_id=bot_id)
        return self.clear_messages(app_state=app_state, session=session, bot=bot)

    def clear_dashboard_direct_session_for_bot(self, *, app_state: Any, session: Session, bot_id: str) -> Dict[str, Any]:
        """Look up the bot by id (404 if missing) and clear its dashboard session."""
        bot = self._require_bot(session=session, bot_id=bot_id)
        return self.clear_dashboard_direct_session(app_state=app_state, session=session, bot=bot)

    def get_logs_for_bot(self, *, app_state: Any, session: Session, bot_id: str, tail: int = 300) -> Dict[str, Any]:
        """Look up the bot by id (404 if missing) and return its recent logs."""
        bot = self._require_bot(session=session, bot_id=bot_id)
        return self.get_logs(app_state=app_state, bot=bot, tail=tail)

    def send_command_for_bot(
        self,
        *,
        app_state: Any,
        session: Session,
        bot_id: str,
        payload: Any,
    ) -> Dict[str, Any]:
        """Look up the bot by id (404 if missing) and execute ``payload`` against it."""
        bot = self._require_bot(session=session, bot_id=bot_id)
        return self.send_command(
            app_state=app_state,
            session=session,
            bot_id=bot_id,
            bot=bot,
            payload=payload,
        )

    def send_command(
        self,
        *,
        app_state: Any,
        session: Session,
        bot_id: str,
        bot: BotInstance,
        payload: Any,
    ) -> Dict[str, Any]:
        """Execute ``payload`` via the command service using the bot's runtime provider."""
        return self._command_service.execute(
            session=session,
            bot_id=bot_id,
            bot=bot,
            payload=payload,
            runtime_provider=self._resolve_runtime_provider(app_state, bot),
            app_state=app_state,
        )

    def get_logs(self, *, app_state: Any, bot: BotInstance, tail: int = 300) -> Dict[str, Any]:
        """Return ``{"bot_id": ..., "logs": ...}`` with the provider's recent logs.

        ``tail`` is forwarded unchanged to the provider's ``get_recent_logs``.
        """
        provider = self._resolve_runtime_provider(app_state, bot)
        return {
            "bot_id": bot.id,
            "logs": provider.get_recent_logs(bot_id=bot.id, tail=tail),
        }

    def ensure_monitor(self, *, app_state: Any, bot: BotInstance) -> bool:
        """Call the provider's ``ensure_monitor`` for the bot and coerce the result to ``bool``."""
        return bool(self._resolve_runtime_provider(app_state, bot).ensure_monitor(bot_id=bot.id))

    def sync_edge_monitor_packets(self, *, app_state: Any, bot: BotInstance, request_id: str) -> None:
        """Forward an edge-monitor packet sync to the command service.

        Both the bot id and ``request_id`` are stringified and stripped first.
        """
        runtime_provider = self._resolve_runtime_provider(app_state, bot)
        self._command_service.sync_edge_monitor_packets(
            runtime_provider=runtime_provider,
            bot_id=str(bot.id or "").strip(),
            request_id=str(request_id or "").strip(),
            app_state=app_state,
        )

    def clear_messages(self, *, app_state: Any, session: Session, bot: BotInstance) -> Dict[str, Any]:
        """Delete the bot's stored messages and reset its conversational state.

        Steps, in order: delete every ``BotMessage`` row for the bot, clear its
        sessions via the injected helper, ask a running runtime to start a new
        session, reset ``last_action``/``current_state``, record a
        ``history_cleared`` activity event, commit, then invalidate the detail
        and messages caches.

        Returns:
            ``{"bot_id", "deleted", "cleared_sessions"}``.
        """
        bot_id = str(bot.id or "").strip()
        rows = session.exec(select(BotMessage).where(BotMessage.bot_id == bot_id)).all()
        for row in rows:
            session.delete(row)
        deleted = len(rows)

        cleared_sessions = self._clear_bot_sessions(bot_id)
        self._reset_running_runtime_session(app_state=app_state, bot=bot)

        bot.last_action = ""
        bot.current_state = "IDLE"
        bot.updated_at = datetime.utcnow()
        session.add(bot)
        self._record_activity_event(
            session,
            bot_id,
            "history_cleared",
            channel="system",
            detail=f"Cleared {deleted} stored messages",
            metadata={"deleted_messages": deleted, "cleared_sessions": cleared_sessions},
        )
        session.commit()

        # Invalidate only after the commit so caches cannot be refilled with stale rows.
        self._invalidate_bot_detail_cache(bot_id)
        self._invalidate_bot_messages_cache(bot_id)
        return {"bot_id": bot_id, "deleted": deleted, "cleared_sessions": cleared_sessions}

    def clear_dashboard_direct_session(self, *, app_state: Any, session: Session, bot: BotInstance) -> Dict[str, Any]:
        """Clear the bot's dashboard_direct session file and record the event.

        The injected file-clearing helper must return a mapping with ``"path"``
        and ``"existed"`` keys; both are echoed in the activity metadata and in
        the returned dict.

        Returns:
            ``{"bot_id", "cleared": True, "session_file", "previously_existed"}``.
        """
        bot_id = str(bot.id or "").strip()
        result = self._clear_dashboard_direct_session_file(bot_id)
        self._reset_running_runtime_session(app_state=app_state, bot=bot)

        bot.updated_at = datetime.utcnow()
        session.add(bot)
        self._record_activity_event(
            session,
            bot_id,
            "dashboard_session_cleared",
            channel="dashboard",
            detail="Cleared dashboard_direct session file",
            metadata={"session_file": result["path"], "previously_existed": result["existed"]},
        )
        session.commit()

        self._invalidate_bot_detail_cache(bot_id)
        return {
            "bot_id": bot_id,
            "cleared": True,
            "session_file": result["path"],
            "previously_existed": result["existed"],
        }

    def _reset_running_runtime_session(self, *, app_state: Any, bot: BotInstance) -> None:
        """Best-effort: send ``/new`` to the bot's runtime if it is running.

        Failures never propagate (callers continue with their own cleanup), but
        they are logged with a traceback instead of being silently discarded.
        """
        if not self._is_runtime_running(bot):
            return

        # Normalise the id the same way the rest of the service does, so a
        # missing id is sent as "" rather than the literal string "None".
        bot_id = str(bot.id or "").strip()
        try:
            self._resolve_runtime_provider(app_state, bot).deliver_command(bot_id=bot_id, command="/new")
        except Exception:
            self._logger.warning(
                "Failed to deliver /new to running runtime for bot %s",
                bot_id,
                exc_info=True,
            )

    @staticmethod
    def _is_runtime_running(bot: BotInstance) -> bool:
        """Return True when the bot's status is ``RUNNING`` (case-insensitive).

        Reads ``runtime_status`` first and falls back to ``docker_status`` when
        the former is missing or falsy.
        """
        runtime_status = str(getattr(bot, "runtime_status", None) or getattr(bot, "docker_status", None) or "").upper()
        return runtime_status == "RUNNING"