""" Backend-integrated MCP Streamable HTTP server. """ from __future__ import annotations from datetime import datetime import hmac import json import logging from typing import Any, Dict, Optional from fastapi.responses import JSONResponse from starlette.datastructures import Headers try: from mcp.server.fastmcp import FastMCP except ImportError: # pragma: no cover - runtime dependency FastMCP = None from app.core.config import APP_CONFIG, API_CONFIG from app.core.database import get_db_connection from app.mcp.context import ( MCPRequestContext, require_mcp_request, reset_current_mcp_request, set_current_mcp_request, ) from app.services import meeting_service logger = logging.getLogger(__name__) def _build_absolute_url(path: str) -> str: normalized_base = APP_CONFIG["base_url"].rstrip("/") normalized_path = path if path.startswith("/") else f"/{path}" return f"{normalized_base}{normalized_path}" def _parse_api_response(response: JSONResponse) -> Dict[str, Any]: body = response.body.decode("utf-8") if isinstance(response.body, (bytes, bytearray)) else response.body return json.loads(body) def _load_user_meetings(current_user: Dict[str, Any], filter_type: str) -> list[Dict[str, Any]]: meetings: list[Dict[str, Any]] = [] page = 1 page_size = 100 while True: payload = _parse_api_response( meeting_service.get_meetings( current_user=current_user, user_id=current_user["user_id"], page=page, page_size=page_size, filter_type=filter_type, ) ) if payload.get("code") != "200": raise ValueError(payload.get("message") or "获取会议列表失败") data = payload.get("data") or {} meetings.extend(data.get("meetings") or []) if not data.get("has_more"): break page += 1 return meetings def _find_user_meeting(current_user: Dict[str, Any], meeting_id: int) -> Dict[str, Any]: for filter_type in ("created", "attended"): for meeting in _load_user_meetings(current_user, filter_type): if int(meeting.get("meeting_id")) == int(meeting_id): return meeting raise ValueError("会议不存在,或当前账号无权访问该会议") def _get_meeting_preview_payload(meeting_id: int, access_password: Optional[str]) -> Dict[str, Any]: return _parse_api_response( meeting_service.get_meeting_preview_data( meeting_id=meeting_id, password=access_password, ) ) def _get_mcp_user(bot_id: str, bot_secret: str) -> Optional[Dict[str, Any]]: with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) cursor.execute( """ SELECT m.id, m.user_id, m.bot_id, m.bot_secret, m.status, u.username, u.caption, u.email, u.role_id FROM sys_user_mcp m JOIN sys_users u ON m.user_id = u.user_id WHERE m.bot_id = %s LIMIT 1 """, (bot_id,), ) record = cursor.fetchone() if not record: return None if int(record.get("status") or 0) != 1: return None if not hmac.compare_digest(str(record.get("bot_secret") or ""), bot_secret): return None cursor.execute( "UPDATE sys_user_mcp SET last_used_at = NOW() WHERE id = %s", (record["id"],), ) connection.commit() return { "credential_id": record["id"], "bot_id": record["bot_id"], "user": { "user_id": record["user_id"], "username": record["username"], "caption": record["caption"], "email": record["email"], "role_id": record["role_id"], }, } class MCPHeaderAuthApp: def __init__(self, app): self.app = app async def __call__(self, scope, receive, send): if scope["type"] != "http": await self.app(scope, receive, send) return headers = Headers(scope=scope) bot_id = (headers.get("x-bot-id") or "").strip() bot_secret = (headers.get("x-bot-secret") or "").strip() if not bot_id or not bot_secret: response = JSONResponse( status_code=401, content={"error": "Missing X-Bot-Id or X-Bot-Secret"}, ) await response(scope, receive, send) return auth_record = _get_mcp_user(bot_id, bot_secret) if not auth_record: response = JSONResponse( status_code=401, content={"error": "Invalid MCP credentials"}, ) await response(scope, receive, send) return token = set_current_mcp_request( MCPRequestContext( user=auth_record["user"], bot_id=auth_record["bot_id"], credential_id=auth_record["credential_id"], authenticated_at=datetime.utcnow(), ) ) try: await self.app(scope, receive, send) finally: reset_current_mcp_request(token) _mcp_http_app = None if FastMCP is not None: mcp_server = FastMCP( "iMeeting MCP", json_response=True, stateless_http=True, streamable_http_path="/", ) # Keep FastMCP's server settings aligned with the outer Uvicorn process. mcp_server.settings.host = API_CONFIG["host"] mcp_server.settings.port = API_CONFIG["port"] @mcp_server.tool() def get_my_meetings() -> Dict[str, Any]: """获取当前登录用户的会议列表,并按我创建/我参加分开返回。""" current_user = require_mcp_request().user created_meetings = _load_user_meetings(current_user, "created") attended_meetings = _load_user_meetings(current_user, "attended") return { "user": { "user_id": current_user["user_id"], "username": current_user.get("username"), "caption": current_user.get("caption"), }, "created_meetings": created_meetings, "attended_meetings": attended_meetings, "counts": { "created": len(created_meetings), "attended": len(attended_meetings), "all": len(created_meetings) + len(attended_meetings), }, } @mcp_server.tool() def get_meeting_preview_url(meeting_id: int) -> Dict[str, Any]: """获取指定会议的预览地址。""" current_user = require_mcp_request().user meeting = _find_user_meeting(current_user, meeting_id) preview_payload = _get_meeting_preview_payload(meeting_id, meeting.get("access_password")) if preview_payload.get("code") != "200": return { "meeting_id": meeting_id, "title": meeting.get("title"), "message": preview_payload.get("message"), "status": (preview_payload.get("data") or {}).get("processing_status"), } return { "meeting_id": meeting_id, "title": meeting.get("title"), "preview_url": _build_absolute_url(f"/meetings/preview/{meeting_id}"), "requires_password": bool(meeting.get("access_password")), "access_password": meeting.get("access_password"), "preview_data": preview_payload.get("data"), } else: # pragma: no cover - graceful fallback without runtime dependency mcp_server = None logger.warning("FastMCP is unavailable because the 'mcp' package is not installed; /mcp will not be mounted.") def get_mcp_server(): return mcp_server def get_mcp_session_manager(): if create_mcp_http_app() is None: return None return mcp_server.session_manager def create_mcp_http_app(): global _mcp_http_app if mcp_server is None: return None if _mcp_http_app is None: # FastMCP initializes its session manager when the Streamable HTTP app # is created, so cache the app and reuse it across startup/mounting. _mcp_http_app = MCPHeaderAuthApp(mcp_server.streamable_http_app()) return _mcp_http_app def get_mcp_asgi_app(): return create_mcp_http_app()