from datetime import timedelta
from typing import Any, Dict

from sqlmodel import Session, select

from models.platform import BotRequestUsage
from schemas.platform import (
    PlatformActivityItem,
    PlatformDashboardAnalyticsResponse,
    PlatformDashboardUsagePoint,
    PlatformDashboardUsageSeries,
)
from services.platform_activity_service import list_activity_events
from services.platform_common import utcnow
from services.platform_settings_service import get_platform_settings


def build_dashboard_analytics(
    session: Session,
    *,
    since_days: int = 7,
    events_limit: int = 20,
) -> Dict[str, Any]:
    """Assemble the dashboard analytics payload as a plain dict.

    Request-usage rows inside the look-back window are grouped per model
    name into fixed time buckets (hourly when the window is at most two
    days, daily otherwise), and the most recent activity events are
    attached.

    Args:
        session: Database session used for the usage query and passed on to
            the settings and activity helpers.
        since_days: Look-back window in days; a falsy value becomes 7 and
            the result is clamped to the range 1..30.
        events_limit: Number of recent activity events to include; a falsy
            value falls back to the platform settings ``page_size`` and the
            result is clamped to the range 1..100.

    Returns:
        The ``model_dump()`` of a ``PlatformDashboardAnalyticsResponse``.
        ``total_request_count`` counts every fetched row,
        ``total_model_count`` counts every model that received at least one
        bucketed call, and ``series`` holds at most the eight busiest models.
    """
    window_days = min(max(int(since_days or 7), 1), 30)
    events_page_size = max(
        1,
        min(int(events_limit or get_platform_settings(session).page_size), 100),
    )

    hourly = window_days <= 2
    granularity = "hour" if hourly else "day"
    if hourly:
        # Fields reset to zero to snap a timestamp to the start of its hour.
        truncation = {"minute": 0, "second": 0, "microsecond": 0}
        bucket_step = timedelta(hours=1)
        bucket_count = window_days * 24
        label_format = "%m-%d %H:00"
    else:
        # Fields reset to zero to snap a timestamp to the start of its day.
        truncation = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}
        bucket_step = timedelta(days=1)
        bucket_count = window_days
        label_format = "%m-%d"

    now = utcnow()
    newest_bucket = now.replace(**truncation)
    # Oldest bucket first, ending with the bucket that contains ``now``.
    bucket_starts = [
        newest_bucket - bucket_step * steps_back
        for steps_back in range(bucket_count - 1, -1, -1)
    ]

    # ISO string of a bucket start -> position of that bucket in every series.
    bucket_index: Dict[str, int] = {
        start.isoformat(): position for position, start in enumerate(bucket_starts)
    }
    # NOTE(review): appending "Z" presumes utcnow() yields naive UTC
    # datetimes -- confirm against services.platform_common.
    points_template: list[PlatformDashboardUsagePoint] = [
        PlatformDashboardUsagePoint(
            bucket_at=start.isoformat() + "Z",
            label=start.strftime(label_format),
            call_count=0,
        )
        for start in bucket_starts
    ]

    def _blank_points() -> list[PlatformDashboardUsagePoint]:
        # Every series needs its own point objects so counts are not shared.
        return [
            PlatformDashboardUsagePoint.model_validate(template.model_dump())
            for template in points_template
        ]

    window_start = bucket_starts[0]
    usage_query = (
        select(BotRequestUsage)
        .where(BotRequestUsage.started_at >= window_start)
        .order_by(BotRequestUsage.started_at.asc(), BotRequestUsage.id.asc())
    )
    rows = session.exec(usage_query).all()

    series_map: Dict[str, PlatformDashboardUsageSeries] = {}
    for usage in rows:
        model_name = str(usage.model or usage.provider or "Unknown").strip() or "Unknown"
        happened_at = usage.started_at or usage.created_at or now
        position = bucket_index.get(happened_at.replace(**truncation).isoformat())
        if position is None:
            # The row falls outside the generated buckets: it still counts
            # toward the overall total below but is left out of the series.
            continue

        series = series_map.get(model_name)
        if series is None:
            series = PlatformDashboardUsageSeries(
                model=model_name,
                total_calls=0,
                points=_blank_points(),
            )
            series_map[model_name] = series
        series.total_calls += 1
        series.points[position].call_count += 1

    def _busiest_first(item: PlatformDashboardUsageSeries) -> tuple[int, str]:
        # Most calls first; ties broken by case-insensitive model name.
        return -int(item.total_calls or 0), str(item.model or "").lower()

    top_series = sorted(series_map.values(), key=_busiest_first)[:8]

    activity_page = list_activity_events(session, limit=events_page_size, offset=0)
    recent_events = [
        PlatformActivityItem.model_validate(entry)
        for entry in (activity_page.get("items") or [])
    ]

    response = PlatformDashboardAnalyticsResponse(
        total_request_count=len(rows),
        total_model_count=len(series_map),
        granularity=granularity,
        since_days=window_days,
        events_page_size=events_page_size,
        series=top_series,
        recent_events=recent_events,
    )
    return response.model_dump()