import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from sqlalchemy import delete as sql_delete, func
from sqlmodel import Session, select

from models.platform import BotActivityEvent
from schemas.platform import PlatformActivityItem, PlatformActivityListResponse
from services.platform_common import utcnow
from services.platform_settings_service import get_activity_event_retention_days

# Minimum time between two non-forced prune passes within this process.
ACTIVITY_EVENT_PRUNE_INTERVAL = timedelta(minutes=10)

# Only these event types are persisted; record_activity_event() silently
# ignores everything else.
OPERATIONAL_ACTIVITY_EVENT_TYPES = {
    "bot_created",
    "bot_deployed",
    "bot_started",
    "bot_stopped",
    "bot_warning",
    "bot_enabled",
    "bot_disabled",
    "bot_deactivated",
    "command_submitted",
    "command_failed",
    "history_cleared",
}

# Page-size bounds applied by list_activity_events().
_DEFAULT_ACTIVITY_LIST_LIMIT = 100
_MAX_ACTIVITY_LIST_LIMIT = 500

# Timestamp of the last prune pass that actually ran (module-level state,
# shared by every caller in this process).
_last_activity_event_prune_at: Optional[datetime] = None


def prune_expired_activity_events(session: Session, force: bool = False) -> int:
    """Delete activity events older than the configured retention window.

    The pass is throttled: unless ``force`` is true, it is skipped when the
    previous pass ran less than ``ACTIVITY_EVENT_PRUNE_INTERVAL`` ago.

    This function issues the DELETE but does not commit; committing is left
    to the caller.

    Args:
        session: Database session used to read the retention setting and to
            execute the DELETE.
        force: Run the pass even if the throttle interval has not elapsed.

    Returns:
        The ``rowcount`` reported for the DELETE, or 0 when the pass was
        skipped or no row count is available.
    """
    global _last_activity_event_prune_at

    now = utcnow()
    if (
        not force
        and _last_activity_event_prune_at
        and now - _last_activity_event_prune_at < ACTIVITY_EVENT_PRUNE_INTERVAL
    ):
        return 0

    retention_days = get_activity_event_retention_days(session)
    cutoff = now - timedelta(days=retention_days)
    result = session.exec(
        sql_delete(BotActivityEvent).where(BotActivityEvent.created_at < cutoff)
    )
    # Only remember the pass once the DELETE has been issued without raising.
    _last_activity_event_prune_at = now
    return int(getattr(result, "rowcount", 0) or 0)


def record_activity_event(
    session: Session,
    bot_id: str,
    event_type: str,
    request_id: Optional[str] = None,
    channel: str = "dashboard",
    detail: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> None:
    """Add an operational activity event for a bot to the session.

    The event type is trimmed and lower-cased; events whose normalized type
    is not in ``OPERATIONAL_ACTIVITY_EVENT_TYPES`` are dropped without error.
    A throttled prune pass runs before the row is added. The row is only
    added to the session; this function does not commit.

    Args:
        session: Database session the new row is added to.
        bot_id: Identifier of the bot the event belongs to.
        event_type: Event type; empty values normalize to ``"unknown"``.
        request_id: Optional request identifier stored with the event.
        channel: Originating channel; empty values fall back to
            ``"dashboard"``.
        detail: Optional free-text detail; blank values are stored as None.
        metadata: Optional mapping stored as JSON; empty or missing metadata
            is stored as None.
    """
    normalized_event_type = str(event_type or "unknown").strip().lower() or "unknown"
    if normalized_event_type not in OPERATIONAL_ACTIVITY_EVENT_TYPES:
        return

    prune_expired_activity_events(session, force=False)

    row = BotActivityEvent(
        bot_id=bot_id,
        request_id=request_id,
        event_type=normalized_event_type,
        channel=str(channel or "dashboard").strip().lower() or "dashboard",
        detail=(str(detail or "").strip() or None),
        metadata_json=json.dumps(metadata, ensure_ascii=False) if metadata else None,
        created_at=utcnow(),
    )
    session.add(row)


def _format_created_at(value: datetime) -> str:
    """Render a timestamp as an ISO-8601 string with a trailing ``Z``.

    Naive datetimes are formatted as-is. Aware datetimes are converted to UTC
    and stripped of their offset first, so the result never carries both an
    explicit offset and the ``Z`` suffix.
    """
    if value.utcoffset() is not None:
        value = value.astimezone(timezone.utc).replace(tzinfo=None)
    return value.isoformat() + "Z"


def list_activity_events(
    session: Session,
    bot_id: Optional[str] = None,
    limit: int = _DEFAULT_ACTIVITY_LIST_LIMIT,
    offset: int = 0,
) -> Dict[str, Any]:
    """Return one page of activity events, newest first.

    A throttled prune pass runs first and is committed when it deleted rows.

    Args:
        session: Database session used for pruning and querying.
        bot_id: When given (and non-empty), restrict results to this bot.
        limit: Requested page size, clamped to the range 1..500. ``None``
            falls back to the default page size.
        offset: Number of rows to skip; negative or missing values become 0.

    Returns:
        The ``PlatformActivityListResponse`` payload as a plain dict with
        ``items``, ``total``, ``limit``, ``offset`` and ``has_more``.
    """
    deleted = prune_expired_activity_events(session, force=False)
    if deleted > 0:
        session.commit()

    requested_limit = _DEFAULT_ACTIVITY_LIST_LIMIT if limit is None else int(limit)
    safe_limit = max(1, min(requested_limit, _MAX_ACTIVITY_LIST_LIMIT))
    safe_offset = max(0, int(offset or 0))

    stmt = (
        select(BotActivityEvent)
        .order_by(BotActivityEvent.created_at.desc(), BotActivityEvent.id.desc())
        .offset(safe_offset)
        .limit(safe_limit)
    )
    total_stmt = select(func.count(BotActivityEvent.id))
    if bot_id:
        stmt = stmt.where(BotActivityEvent.bot_id == bot_id)
        total_stmt = total_stmt.where(BotActivityEvent.bot_id == bot_id)

    rows = session.exec(stmt).all()
    total = int(session.exec(total_stmt).one() or 0)

    items: List[PlatformActivityItem] = []
    for row in rows:
        # Best-effort: unreadable stored metadata must not break the listing.
        try:
            metadata = json.loads(row.metadata_json or "{}")
        except Exception:
            metadata = {}
        items.append(
            PlatformActivityItem(
                id=int(row.id or 0),
                bot_id=row.bot_id,
                request_id=row.request_id,
                event_type=row.event_type,
                channel=row.channel,
                detail=row.detail,
                # Valid JSON that is not an object is treated as no metadata.
                metadata=metadata if isinstance(metadata, dict) else {},
                created_at=_format_created_at(row.created_at),
            )
        )

    return PlatformActivityListResponse(
        items=items,
        total=total,
        limit=safe_limit,
        offset=safe_offset,
        has_more=safe_offset + len(items) < total,
    ).model_dump()