import logging import os from typing import Any, Dict, List from fastapi import HTTPException from sqlmodel import Session from core.docker_instance import docker_manager from core.utils import _is_video_attachment_path, _is_visual_attachment_path from models.bot import BotInstance from services.bot_service import read_bot_runtime_snapshot from services.platform_activity_service import record_activity_event from services.platform_usage_service import create_usage_request, fail_latest_usage from services.runtime_service import broadcast_runtime_packet, persist_runtime_packet from services.workspace_service import resolve_workspace_path logger = logging.getLogger("dashboard.backend") def _normalize_message_media_item(value: Any) -> str: return str(value or "").strip().replace("\\", "/").lstrip("/") def _normalize_message_media_list(raw: Any) -> List[str]: if not isinstance(raw, list): return [] rows: List[str] = [] for value in raw: normalized = _normalize_message_media_item(value) if normalized: rows.append(normalized) return rows def _build_delivery_command(command: str, checked_attachments: List[str]) -> str: if not checked_attachments: return command attachment_block = "\n".join(f"- {path}" for path in checked_attachments) if all(_is_visual_attachment_path(path) for path in checked_attachments): has_video = any(_is_video_attachment_path(path) for path in checked_attachments) media_label = "图片/视频" if has_video else "图片" capability_hint = ( "1) 附件已随请求附带;图片在可用时可直接作为多模态输入理解,视频请按附件路径处理。\n" if has_video else "1) 附件中的图片已作为多模态输入提供,优先直接理解并回答。\n" ) if command: return ( f"{command}\n\n" "[Attached files]\n" f"{attachment_block}\n\n" "【附件处理要求】\n" f"{capability_hint}" "2) 若当前模型或接口不支持直接理解该附件,请明确说明后再调用工具解析。\n" "3) 除非用户明确要求,不要先调用工具读取附件文件。\n" "4) 回复语言必须遵循 USER.md;若未指定,则与用户当前输入语言保持一致。\n" "5) 仅基于可见内容回答;看不清或无法确认的部分请明确说明,不要猜测。" ) return ( "请先处理已附带的附件列表:\n" f"{attachment_block}\n\n" f"请直接分析已附带的{media_label}并总结关键信息。\n" f"{'图片在可用时可直接作为多模态输入理解,视频请按附件路径处理。' if has_video else ''}\n" "若当前模型或接口不支持直接理解该附件,请明确说明后再调用工具解析。\n" "回复语言必须遵循 USER.md;若未指定,则与用户当前输入语言保持一致。\n" "仅基于可见内容回答;看不清或无法确认的部分请明确说明,不要猜测。" ) command_has_paths = all(path in command for path in checked_attachments) if command else False if command and not command_has_paths: return ( f"{command}\n\n" "[Attached files]\n" f"{attachment_block}\n\n" "Please process the attached file(s) listed above when answering this request.\n" "Reply language must follow USER.md. If not specified, use the same language as the user input." ) if not command: return ( "Please process the uploaded file(s) listed below:\n" f"{attachment_block}\n\n" "Reply language must follow USER.md. If not specified, use the same language as the user input." ) return command def send_bot_command(session: Session, bot_id: str, command: str, attachments: Any) -> Dict[str, Any]: request_id = "" try: bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") runtime_snapshot = read_bot_runtime_snapshot(bot) normalized_attachments = _normalize_message_media_list(attachments) text_command = str(command or "").strip() if not text_command and not normalized_attachments: raise HTTPException(status_code=400, detail="Command or attachments is required") checked_attachments: List[str] = [] for rel_path in normalized_attachments: _, target = resolve_workspace_path(bot_id, rel_path) if not os.path.isfile(target): raise HTTPException(status_code=400, detail=f"attachment not found: {rel_path}") checked_attachments.append(rel_path) delivery_media = [f"/root/.nanobot/workspace/{path.lstrip('/')}" for path in checked_attachments] display_command = text_command if text_command else "[attachment message]" delivery_command = _build_delivery_command(text_command, checked_attachments) request_id = create_usage_request( session, bot_id, display_command, attachments=checked_attachments, channel="dashboard", metadata={"attachment_count": len(checked_attachments)}, provider=str(runtime_snapshot.get("llm_provider") or "").strip() or None, model=str(runtime_snapshot.get("llm_model") or "").strip() or None, ) record_activity_event( session, bot_id, "command_submitted", request_id=request_id, channel="dashboard", detail="command submitted", metadata={"attachment_count": len(checked_attachments), "has_text": bool(text_command)}, ) session.commit() outbound_user_packet: Dict[str, Any] | None = None if display_command or checked_attachments: outbound_user_packet = { "type": "USER_COMMAND", "channel": "dashboard", "text": display_command, "media": checked_attachments, "request_id": request_id, } persist_runtime_packet(bot_id, outbound_user_packet) if outbound_user_packet: broadcast_runtime_packet(bot_id, outbound_user_packet) success = docker_manager.send_command(bot_id, delivery_command, media=delivery_media) if success: return {"success": True} detail = docker_manager.get_last_delivery_error(bot_id) fail_latest_usage(session, bot_id, detail or "command delivery failed") record_activity_event( session, bot_id, "command_failed", request_id=request_id, channel="dashboard", detail=(detail or "command delivery failed")[:400], ) session.commit() broadcast_runtime_packet( bot_id, { "type": "AGENT_STATE", "channel": "dashboard", "payload": { "state": "ERROR", "action_msg": detail or "command delivery failed", }, }, ) raise HTTPException( status_code=502, detail=f"Failed to deliver command to bot dashboard channel{': ' + detail if detail else ''}", ) except HTTPException: raise except Exception as exc: logger.exception("send_bot_command failed for bot_id=%s", bot_id) try: session.rollback() except Exception: pass if request_id: try: fail_latest_usage(session, bot_id, str(exc)) record_activity_event( session, bot_id, "command_failed", request_id=request_id, channel="dashboard", detail=str(exc)[:400], ) session.commit() except Exception: try: session.rollback() except Exception: pass raise HTTPException(status_code=500, detail=f"Failed to process bot command: {exc}") from exc