from __future__ import annotations from contextvars import ContextVar, Token from dataclasses import dataclass from datetime import datetime from typing import Any, Dict, Optional @dataclass(slots=True) class MCPRequestContext: user: Dict[str, Any] bot_id: str credential_id: int authenticated_at: datetime current_mcp_request: ContextVar[Optional[MCPRequestContext]] = ContextVar( "current_mcp_request", default=None, ) def set_current_mcp_request(request: MCPRequestContext) -> Token: return current_mcp_request.set(request) def reset_current_mcp_request(token: Token) -> None: current_mcp_request.reset(token) def require_mcp_request() -> MCPRequestContext: request = current_mcp_request.get() if request is None: raise RuntimeError("MCP request context is unavailable") return request