85 lines
2.9 KiB
Python
85 lines
2.9 KiB
Python
import logging
|
|
import threading
|
|
import time
|
|
from typing import Any
|
|
|
|
import httpx
|
|
from fastapi import HTTPException
|
|
|
|
# Default minimum spacing, in seconds, between two offline log lines that
# share the same throttle key.
_DEFAULT_LOG_COOLDOWN_SECONDS = 60.0

# Throttle key -> time.monotonic() reading recorded when an offline log line
# was last emitted for that key.
_OFFLINE_LOGGED_AT: dict[str, float] = {}

# Held around every read and write of _OFFLINE_LOGGED_AT.
_OFFLINE_LOG_LOCK = threading.Lock()
|
|
|
|
|
|
def describe_edge_node(node: Any) -> str:
    """Return a human-readable label for an edge node.

    The ``display_name`` and ``node_id`` attributes are read with ``getattr``
    (missing or falsy values count as empty), converted to ``str`` and
    stripped.

    Returns:
        ``"<display_name> (<node_id>)"`` when both are present and differ,
        otherwise whichever one is non-empty, otherwise
        ``"unknown edge node"``.
    """
    label, ident = (
        str(getattr(node, attr, "") or "").strip()
        for attr in ("display_name", "node_id")
    )
    if not label:
        return ident or "unknown edge node"
    if not ident or ident == label:
        # Nothing extra to show: the id is absent or merely repeats the name.
        return label
    return f"{label} ({ident})"
|
|
|
|
|
|
def summarize_edge_exception(exc: Exception) -> str:
    """Return a short, single-string summary of ``exc`` suitable for logging.

    If the exception carries a ``detail`` attribute that is not ``None``, that
    value is summarized; otherwise the exception itself is. The text is
    stripped and capped at 400 characters. When the stripped text is empty,
    the exception's class name is returned instead.
    """
    payload = getattr(exc, "detail", None)
    if payload is None:
        payload = exc
    summary = str(payload).strip()
    return summary[:400] if summary else exc.__class__.__name__
|
|
|
|
|
|
def edge_transport_http_exception(exc: httpx.RequestError, *, node: Any) -> HTTPException:
    """Build (not raise) a 502 ``HTTPException`` describing a transport failure.

    Args:
        exc: The httpx request error that occurred while talking to the node.
        node: The edge node object; it is labelled via ``describe_edge_node``.

    Returns:
        An ``HTTPException`` with status 502. Its detail is a "timed out"
        message for ``httpx.TimeoutException`` and an "is unreachable" message
        (including the error text, or the class name when that text is empty)
        for every other error. The detail is capped at 400 characters.
    """
    label = describe_edge_node(node)
    if not isinstance(exc, httpx.TimeoutException):
        cause = str(exc).strip() or exc.__class__.__name__
        message = f"dashboard-edge is unreachable for node {label}: {cause}"
    else:
        message = f"dashboard-edge timed out for node {label}"
    return HTTPException(status_code=502, detail=message[:400])
|
|
|
|
|
|
def is_expected_edge_offline_error(exc: Exception) -> bool:
    """Tell whether ``exc`` looks like an edge node simply being offline.

    Returns ``True`` for any ``httpx.RequestError``. An ``HTTPException``
    qualifies only when its status code is 502, 503 or 504 *and* its
    lower-cased summary contains one of the known offline phrases. Every
    other exception yields ``False``.
    """
    if isinstance(exc, httpx.RequestError):
        return True
    if not isinstance(exc, HTTPException):
        return False

    status = int(getattr(exc, "status_code", 0) or 0)
    if status not in (502, 503, 504):
        return False

    haystack = summarize_edge_exception(exc).lower()
    offline_phrases = (
        "dashboard-edge is unreachable",
        "dashboard-edge timed out",
        "connection refused",
        "request failed before receiving a response",
        "name or service not known",
        "nodename nor servname provided",
        "temporary failure in name resolution",
    )
    for phrase in offline_phrases:
        if phrase in haystack:
            return True
    return False
|
|
|
|
|
|
def log_edge_failure(
    logger: logging.Logger,
    *,
    key: str,
    exc: Exception,
    message: str,
    cooldown_seconds: float = _DEFAULT_LOG_COOLDOWN_SECONDS,
) -> None:
    """Log an edge-node failure, throttling the noise from offline nodes.

    Args:
        logger: Logger that receives the record.
        key: Throttle key; offline logs sharing a key are rate limited
            together.
        exc: The exception being reported.
        message: Human-readable prefix for the log line.
        cooldown_seconds: Minimum interval between offline log lines for
            ``key``.

    Errors classified by ``is_expected_edge_offline_error`` are logged at INFO
    and only when the per-key cooldown allows it. Every other exception is
    logged at ERROR together with its traceback.
    """
    detail = summarize_edge_exception(exc)
    if is_expected_edge_offline_error(exc):
        if _should_emit_offline_log(key=key, cooldown_seconds=cooldown_seconds):
            logger.info("%s detail=%s", message, detail)
        return
    # Hand the exception to logging explicitly. The default exc_info=True
    # reads sys.exc_info(), which is empty when this helper is called outside
    # the ``except`` block that caught ``exc`` (the record would then show
    # "NoneType: None" instead of the real traceback).
    logger.exception("%s detail=%s", message, detail, exc_info=exc)
|
|
|
|
|
|
def _should_emit_offline_log(*, key: str, cooldown_seconds: float) -> bool:
    """Decide whether an offline log line for ``key`` may be emitted now.

    Args:
        key: Throttle key. It is stringified and stripped; an empty or falsy
            key falls back to ``"edge-offline"``.
        cooldown_seconds: Minimum interval between emissions for the key. A
            falsy value falls back to ``_DEFAULT_LOG_COOLDOWN_SECONDS`` and
            the effective interval is never below one second.

    Returns:
        ``True`` (and records the current time for the key) when the key has
        never been logged or its cooldown has elapsed; ``False`` otherwise.
    """
    normalized_key = str(key or "edge-offline").strip() or "edge-offline"
    cooldown = max(1.0, float(cooldown_seconds or _DEFAULT_LOG_COOLDOWN_SECONDS))
    with _OFFLINE_LOG_LOCK:
        # Read the clock under the lock so stored timestamps follow the order
        # in which callers acquired it.
        now = time.monotonic()
        last_logged_at = _OFFLINE_LOGGED_AT.get(normalized_key)
        # An unseen key must always log. time.monotonic() has an undefined
        # reference point, so comparing against a 0.0 default would suppress
        # the first message whenever the clock reads less than the cooldown
        # (for example shortly after boot).
        if last_logged_at is not None and now - last_logged_at < cooldown:
            return False
        _OFFLINE_LOGGED_AT[normalized_key] = now
        return True
|