296 lines
10 KiB
Python
296 lines
10 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from sqlmodel import Session
|
|
|
|
from core.database import engine
|
|
from core.docker_instance import docker_manager
|
|
from core.websocket_manager import manager
|
|
from models.bot import BotInstance, BotMessage
|
|
from services.bot_storage_service import get_bot_workspace_root
|
|
from services.cache_service import _invalidate_bot_detail_cache, _invalidate_bot_messages_cache
|
|
from services.platform_activity_service import record_activity_event
|
|
from services.platform_usage_service import bind_usage_message, finalize_usage_from_packet
|
|
from services.topic_runtime import publish_runtime_topic_packet
|
|
|
|
logger = logging.getLogger("dashboard.backend")
|
|
|
|
_main_loop: Optional[asyncio.AbstractEventLoop] = None
|
|
_AGENT_LOOP_READY_MARKER = "Agent loop started"
|
|
_LAST_ACTION_CONTROL_RE = re.compile(r"[\u0000-\u0008\u000B\u000C\u000E-\u001F\u007F]")
|
|
|
|
|
|
def set_main_loop(loop: Optional[asyncio.AbstractEventLoop]) -> None:
|
|
global _main_loop
|
|
_main_loop = loop
|
|
|
|
|
|
def get_main_loop() -> Optional[asyncio.AbstractEventLoop]:
|
|
return _main_loop
|
|
|
|
|
|
def _queue_runtime_broadcast(bot_id: str, packet: Dict[str, Any]) -> None:
|
|
loop = get_main_loop()
|
|
if not loop or not loop.is_running():
|
|
return
|
|
asyncio.run_coroutine_threadsafe(manager.broadcast(bot_id, packet), loop)
|
|
|
|
|
|
def broadcast_runtime_packet(bot_id: str, packet: Dict[str, Any]) -> None:
|
|
_queue_runtime_broadcast(bot_id, packet)
|
|
|
|
|
|
def _normalize_packet_channel(packet: Dict[str, Any]) -> str:
|
|
raw = str(packet.get("channel") or packet.get("source") or "").strip().lower()
|
|
if raw in {"dashboard", "dashboard_channel", "dashboard-channel"}:
|
|
return "dashboard"
|
|
return raw
|
|
|
|
|
|
def _normalize_media_item(bot_id: str, value: Any) -> str:
|
|
raw = str(value or "").strip().replace("\\", "/")
|
|
if not raw:
|
|
return ""
|
|
if raw.startswith("/root/.nanobot/workspace/"):
|
|
return raw[len("/root/.nanobot/workspace/") :].lstrip("/")
|
|
root = get_bot_workspace_root(bot_id)
|
|
if os.path.isabs(raw):
|
|
try:
|
|
if os.path.commonpath([root, raw]) == root:
|
|
return os.path.relpath(raw, root).replace("\\", "/")
|
|
except Exception:
|
|
pass
|
|
return raw.lstrip("/")
|
|
|
|
|
|
def _normalize_media_list(raw: Any, bot_id: str) -> List[str]:
|
|
if not isinstance(raw, list):
|
|
return []
|
|
rows: List[str] = []
|
|
for value in raw:
|
|
normalized = _normalize_media_item(bot_id, value)
|
|
if normalized:
|
|
rows.append(normalized)
|
|
return rows
|
|
|
|
|
|
def _normalize_last_action_text(value: Any) -> str:
|
|
text = str(value or "")
|
|
if not text:
|
|
return ""
|
|
text = _LAST_ACTION_CONTROL_RE.sub("", text)
|
|
text = text.replace("\r\n", "\n").replace("\r", "\n")
|
|
text = "\n".join(line.rstrip() for line in text.split("\n"))
|
|
text = re.sub(r"\n{4,}", "\n\n\n", text).strip()
|
|
return text[:4000]
|
|
|
|
|
|
def _persist_runtime_packet(bot_id: str, packet: Dict[str, Any]) -> Optional[int]:
|
|
packet_type = str(packet.get("type", "")).upper()
|
|
if packet_type not in {"AGENT_STATE", "ASSISTANT_MESSAGE", "USER_COMMAND", "BUS_EVENT"}:
|
|
return None
|
|
|
|
source_channel = _normalize_packet_channel(packet)
|
|
if source_channel != "dashboard":
|
|
return None
|
|
|
|
persisted_message_id: Optional[int] = None
|
|
with Session(engine) as session:
|
|
bot = session.get(BotInstance, bot_id)
|
|
if not bot:
|
|
return None
|
|
|
|
if packet_type == "AGENT_STATE":
|
|
payload = packet.get("payload") or {}
|
|
state = str(payload.get("state") or "").strip()
|
|
action = _normalize_last_action_text(payload.get("action_msg") or payload.get("msg") or "")
|
|
if state:
|
|
bot.current_state = state
|
|
if action:
|
|
bot.last_action = action
|
|
elif packet_type == "ASSISTANT_MESSAGE":
|
|
bot.current_state = "IDLE"
|
|
text_msg = str(packet.get("text") or "").strip()
|
|
media_list = _normalize_media_list(packet.get("media"), bot_id)
|
|
if text_msg or media_list:
|
|
if text_msg:
|
|
bot.last_action = _normalize_last_action_text(text_msg)
|
|
message_row = BotMessage(
|
|
bot_id=bot_id,
|
|
role="assistant",
|
|
text=text_msg,
|
|
media_json=json.dumps(media_list, ensure_ascii=False) if media_list else None,
|
|
)
|
|
session.add(message_row)
|
|
session.flush()
|
|
persisted_message_id = message_row.id
|
|
finalize_usage_from_packet(
|
|
session,
|
|
bot_id,
|
|
{
|
|
**packet,
|
|
"message_id": persisted_message_id,
|
|
},
|
|
)
|
|
elif packet_type == "USER_COMMAND":
|
|
text_msg = str(packet.get("text") or "").strip()
|
|
media_list = _normalize_media_list(packet.get("media"), bot_id)
|
|
if text_msg or media_list:
|
|
message_row = BotMessage(
|
|
bot_id=bot_id,
|
|
role="user",
|
|
text=text_msg,
|
|
media_json=json.dumps(media_list, ensure_ascii=False) if media_list else None,
|
|
)
|
|
session.add(message_row)
|
|
session.flush()
|
|
persisted_message_id = message_row.id
|
|
bind_usage_message(
|
|
session,
|
|
bot_id,
|
|
str(packet.get("request_id") or "").strip(),
|
|
persisted_message_id,
|
|
)
|
|
elif packet_type == "BUS_EVENT":
|
|
is_progress = bool(packet.get("is_progress"))
|
|
detail_text = str(packet.get("content") or packet.get("text") or "").strip()
|
|
if not is_progress:
|
|
text_msg = detail_text
|
|
media_list = _normalize_media_list(packet.get("media"), bot_id)
|
|
if text_msg or media_list:
|
|
bot.current_state = "IDLE"
|
|
if text_msg:
|
|
bot.last_action = _normalize_last_action_text(text_msg)
|
|
message_row = BotMessage(
|
|
bot_id=bot_id,
|
|
role="assistant",
|
|
text=text_msg,
|
|
media_json=json.dumps(media_list, ensure_ascii=False) if media_list else None,
|
|
)
|
|
session.add(message_row)
|
|
session.flush()
|
|
persisted_message_id = message_row.id
|
|
finalize_usage_from_packet(
|
|
session,
|
|
bot_id,
|
|
{
|
|
"text": text_msg,
|
|
"usage": packet.get("usage"),
|
|
"request_id": packet.get("request_id"),
|
|
"provider": packet.get("provider"),
|
|
"model": packet.get("model"),
|
|
"message_id": persisted_message_id,
|
|
},
|
|
)
|
|
|
|
bot.updated_at = datetime.utcnow()
|
|
session.add(bot)
|
|
session.commit()
|
|
|
|
publish_runtime_topic_packet(
|
|
engine,
|
|
bot_id,
|
|
packet,
|
|
source_channel,
|
|
persisted_message_id,
|
|
logger,
|
|
)
|
|
|
|
if persisted_message_id:
|
|
packet["message_id"] = persisted_message_id
|
|
if packet_type in {"ASSISTANT_MESSAGE", "USER_COMMAND", "BUS_EVENT"}:
|
|
_invalidate_bot_messages_cache(bot_id)
|
|
_invalidate_bot_detail_cache(bot_id)
|
|
return persisted_message_id
|
|
|
|
|
|
def persist_runtime_packet(bot_id: str, packet: Dict[str, Any]) -> Optional[int]:
|
|
return _persist_runtime_packet(bot_id, packet)
|
|
|
|
|
|
def docker_callback(bot_id: str, packet: Dict[str, Any]) -> None:
|
|
packet_type = str(packet.get("type", "")).upper()
|
|
if packet_type == "RAW_LOG":
|
|
_queue_runtime_broadcast(bot_id, packet)
|
|
return
|
|
|
|
persisted_message_id = _persist_runtime_packet(bot_id, packet)
|
|
if persisted_message_id:
|
|
packet["message_id"] = persisted_message_id
|
|
_queue_runtime_broadcast(bot_id, packet)
|
|
|
|
|
|
async def _wait_for_agent_loop_ready(
|
|
bot_id: str,
|
|
timeout_seconds: float = 12.0,
|
|
poll_interval_seconds: float = 0.5,
|
|
) -> bool:
|
|
deadline = time.monotonic() + max(1.0, timeout_seconds)
|
|
marker = _AGENT_LOOP_READY_MARKER.lower()
|
|
while time.monotonic() < deadline:
|
|
logs = docker_manager.get_recent_logs(bot_id, tail=200)
|
|
if any(marker in str(line or "").lower() for line in logs):
|
|
return True
|
|
await asyncio.sleep(max(0.1, poll_interval_seconds))
|
|
return False
|
|
|
|
|
|
async def _record_agent_loop_ready_warning(
|
|
bot_id: str,
|
|
timeout_seconds: float = 12.0,
|
|
poll_interval_seconds: float = 0.5,
|
|
) -> None:
|
|
try:
|
|
agent_loop_ready = await _wait_for_agent_loop_ready(
|
|
bot_id,
|
|
timeout_seconds=timeout_seconds,
|
|
poll_interval_seconds=poll_interval_seconds,
|
|
)
|
|
if agent_loop_ready:
|
|
return
|
|
if docker_manager.get_bot_status(bot_id) != "RUNNING":
|
|
return
|
|
|
|
detail = (
|
|
"Bot container started, but ready marker was not found in logs within "
|
|
f"{int(timeout_seconds)}s. Check bot logs or MCP config if the bot stays unavailable."
|
|
)
|
|
logger.warning("bot_id=%s agent loop ready marker not found within %ss", bot_id, timeout_seconds)
|
|
with Session(engine) as background_session:
|
|
if not background_session.get(BotInstance, bot_id):
|
|
return
|
|
record_activity_event(
|
|
background_session,
|
|
bot_id,
|
|
"bot_warning",
|
|
channel="system",
|
|
detail=detail,
|
|
metadata={
|
|
"kind": "agent_loop_ready_timeout",
|
|
"marker": _AGENT_LOOP_READY_MARKER,
|
|
"timeout_seconds": timeout_seconds,
|
|
},
|
|
)
|
|
background_session.commit()
|
|
_invalidate_bot_detail_cache(bot_id)
|
|
except Exception:
|
|
logger.exception("Failed to record agent loop readiness warning for bot_id=%s", bot_id)
|
|
|
|
|
|
async def record_agent_loop_ready_warning(
|
|
bot_id: str,
|
|
timeout_seconds: float = 12.0,
|
|
poll_interval_seconds: float = 0.5,
|
|
) -> None:
|
|
await _record_agent_loop_ready_warning(
|
|
bot_id,
|
|
timeout_seconds=timeout_seconds,
|
|
poll_interval_seconds=poll_interval_seconds,
|
|
)
|