dashboard-nanobot/backend/services/platform_analytics_service.py

105 lines
3.9 KiB
Python

from datetime import timedelta
from typing import Any, Dict
from sqlmodel import Session, select
from models.platform import BotRequestUsage
from schemas.platform import (
PlatformActivityItem,
PlatformDashboardAnalyticsResponse,
PlatformDashboardUsagePoint,
PlatformDashboardUsageSeries,
)
from services.platform_activity_service import list_activity_events
from services.platform_common import utcnow
from services.platform_settings_service import get_platform_settings
def build_dashboard_analytics(
session: Session,
*,
since_days: int = 7,
events_limit: int = 20,
) -> Dict[str, Any]:
safe_since_days = max(1, min(int(since_days or 7), 30))
safe_events_limit = max(1, min(int(events_limit or get_platform_settings(session).page_size), 100))
granularity = "hour" if safe_since_days <= 2 else "day"
now = utcnow()
if granularity == "hour":
current_bucket = now.replace(minute=0, second=0, microsecond=0)
bucket_starts = [current_bucket - timedelta(hours=index) for index in range(max(1, safe_since_days * 24))]
bucket_starts.reverse()
label_format = "%m-%d %H:00"
else:
current_bucket = now.replace(hour=0, minute=0, second=0, microsecond=0)
bucket_starts = [current_bucket - timedelta(days=index) for index in range(safe_since_days)]
bucket_starts.reverse()
label_format = "%m-%d"
bucket_index: Dict[str, int] = {}
points_template: list[PlatformDashboardUsagePoint] = []
for index, bucket_start in enumerate(bucket_starts):
bucket_key = bucket_start.isoformat()
bucket_index[bucket_key] = index
points_template.append(
PlatformDashboardUsagePoint(
bucket_at=bucket_start.isoformat() + "Z",
label=bucket_start.strftime(label_format),
call_count=0,
)
)
since = bucket_starts[0] if bucket_starts else now - timedelta(days=safe_since_days)
rows = session.exec(
select(BotRequestUsage)
.where(BotRequestUsage.started_at >= since)
.order_by(BotRequestUsage.started_at.asc(), BotRequestUsage.id.asc())
).all()
series_map: Dict[str, PlatformDashboardUsageSeries] = {}
total_request_count = 0
for row in rows:
total_request_count += 1
model_name = str(row.model or row.provider or "Unknown").strip() or "Unknown"
point_time = row.started_at or row.created_at or now
if granularity == "hour":
bucket_start = point_time.replace(minute=0, second=0, microsecond=0)
else:
bucket_start = point_time.replace(hour=0, minute=0, second=0, microsecond=0)
bucket_key = bucket_start.isoformat()
bucket_position = bucket_index.get(bucket_key)
if bucket_position is None:
continue
if model_name not in series_map:
series_map[model_name] = PlatformDashboardUsageSeries(
model=model_name,
total_calls=0,
points=[
PlatformDashboardUsagePoint.model_validate(point.model_dump())
for point in points_template
],
)
series = series_map[model_name]
series.total_calls += 1
series.points[bucket_position].call_count += 1
ordered_series = sorted(
series_map.values(),
key=lambda item: (-int(item.total_calls or 0), str(item.model or "").lower()),
)[:8]
return PlatformDashboardAnalyticsResponse(
total_request_count=total_request_count,
total_model_count=len(series_map),
granularity=granularity,
since_days=safe_since_days,
events_page_size=safe_events_limit,
series=ordered_series,
recent_events=[
PlatformActivityItem.model_validate(item)
for item in (list_activity_events(session, limit=safe_events_limit, offset=0).get("items") or [])
],
).model_dump()