dashboard-nanobot/backend/services/workspace_preview_token_ser...

96 lines
3.1 KiB
Python

import base64
import hashlib
import hmac
import json
import time
from typing import Any, Dict, Optional
from core.settings import WORKSPACE_PREVIEW_SIGNING_SECRET, WORKSPACE_PREVIEW_TOKEN_TTL_SECONDS
# Lower-case file suffixes that mark a path as an HTML preview entry point;
# is_html_preview_path() matches them against the lower-cased path.
HTML_PREVIEW_EXTENSIONS = {".html", ".htm"}
def _b64url_encode(raw: bytes) -> str:
return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
def _b64url_decode(raw: str) -> bytes:
padding = "=" * (-len(raw) % 4)
return base64.urlsafe_b64decode(f"{raw}{padding}".encode("ascii"))
def normalize_workspace_preview_path(path: str) -> str:
    """Return *path* as a forward-slash path without empty segments.

    Falsy input becomes an empty string. Surrounding whitespace is stripped,
    backslashes are treated as separators, and empty segments (leading,
    trailing or repeated slashes) are dropped. ``.`` and ``..`` segments are
    NOT resolved; they are kept as ordinary segments.
    """
    text = str(path or "").strip()
    segments = text.replace("\\", "/").split("/")
    # filter(None, ...) discards the empty strings produced by extra slashes.
    return "/".join(filter(None, segments))
def is_html_preview_path(path: str) -> bool:
    """Return True when the normalized path ends with an HTML preview extension.

    The comparison is case-insensitive: the normalized path is lower-cased
    before being matched against ``HTML_PREVIEW_EXTENSIONS``.
    """
    candidate = normalize_workspace_preview_path(path).lower()
    for extension in HTML_PREVIEW_EXTENSIONS:
        if candidate.endswith(extension):
            return True
    return False
def create_workspace_preview_token(bot_id: str, path: str, ttl_seconds: Optional[int] = None) -> Dict[str, Any]:
    """Issue a signed, expiring token for a workspace preview session.

    The token has the form ``<body>.<signature>`` where ``body`` is the
    unpadded URL-safe base64 of a compact, key-sorted JSON payload and
    ``signature`` is the unpadded URL-safe base64 of the HMAC-SHA256 of the
    encoded body, keyed with ``WORKSPACE_PREVIEW_SIGNING_SECRET``.

    Args:
        bot_id: Bot identifier; stringified and stripped before use.
        path: Entry path; stored in normalized form.
        ttl_seconds: Requested lifetime. A falsy value (``None`` or ``0``)
            selects ``WORKSPACE_PREVIEW_TOKEN_TTL_SECONDS``. The result is
            clamped to the range 60..86400 seconds.

    Returns:
        A dict with ``token``, ``expires_at`` (Unix seconds) and the
        effective ``ttl_seconds``.
    """
    clean_bot_id = str(bot_id or "").strip()
    clean_path = normalize_workspace_preview_path(path)

    requested_ttl = int(ttl_seconds or WORKSPACE_PREVIEW_TOKEN_TTL_SECONDS)
    lifetime = max(60, min(requested_ttl, 86400))
    deadline = int(time.time()) + lifetime

    claims = {
        "kind": "workspace-preview-session",
        "bot_id": clean_bot_id,
        "entry_path": clean_path,
        "exp": deadline,
    }
    # sort_keys plus compact separators give a deterministic serialization.
    serialized = json.dumps(
        claims,
        ensure_ascii=False,
        separators=(",", ":"),
        sort_keys=True,
    )
    body = _b64url_encode(serialized.encode("utf-8"))

    # The MAC covers the encoded body text, not the raw JSON.
    mac = hmac.new(
        WORKSPACE_PREVIEW_SIGNING_SECRET.encode("utf-8"),
        body.encode("ascii"),
        digestmod=hashlib.sha256,
    )
    signature = _b64url_encode(mac.digest())

    return {
        "token": f"{body}.{signature}",
        "expires_at": deadline,
        "ttl_seconds": lifetime,
    }
def resolve_workspace_preview_token(token: str) -> Optional[Dict[str, Any]]:
    """Verify a workspace-preview token and return the session it describes.

    The token must have the form ``<body>.<signature>``. The signature is
    checked (constant-time) against the HMAC-SHA256 of the body text keyed
    with ``WORKSPACE_PREVIEW_SIGNING_SECRET`` before the body is parsed.

    Args:
        token: The token string as received from the client.

    Returns:
        A dict with ``bot_id``, ``entry_path`` and ``expires_at`` when the
        token is well formed, correctly signed, of kind
        ``"workspace-preview-session"``, names a non-empty bot id and an HTML
        entry path, and has not expired. ``None`` in every other case; this
        function does not raise for malformed tokens.
    """
    raw_token = str(token or "").strip()
    if not raw_token or "." not in raw_token:
        return None
    body, signature_raw = raw_token.split(".", 1)

    # The token is untrusted input, so the body may contain any characters.
    # A body we issued is always ASCII base64; reject anything else here
    # instead of letting UnicodeEncodeError escape to the caller.
    try:
        body_bytes = body.encode("ascii")
    except UnicodeEncodeError:
        return None

    expected_signature = hmac.new(
        WORKSPACE_PREVIEW_SIGNING_SECRET.encode("utf-8"),
        body_bytes,
        hashlib.sha256,
    ).digest()

    # The broad handlers below are deliberate: verification fails closed, so
    # any decoding problem is treated as "invalid token".
    try:
        provided_signature = _b64url_decode(signature_raw)
    except Exception:
        return None
    if not hmac.compare_digest(expected_signature, provided_signature):
        return None

    # Only parse the payload once its signature has been verified.
    try:
        payload = json.loads(_b64url_decode(body).decode("utf-8"))
    except Exception:
        return None
    if not isinstance(payload, dict):
        return None
    if str(payload.get("kind") or "") != "workspace-preview-session":
        return None

    bot_id = str(payload.get("bot_id") or "").strip()
    entry_path = normalize_workspace_preview_path(str(payload.get("entry_path") or ""))
    if not bot_id or not is_html_preview_path(entry_path):
        return None

    try:
        expires_at = int(payload.get("exp") or 0)
    except Exception:
        return None
    # A missing "exp" becomes 0 and is therefore treated as expired.
    if expires_at < int(time.time()):
        return None

    return {
        "bot_id": bot_id,
        "entry_path": entry_path,
        "expires_at": expires_at,
    }