"""Service helpers for listing, annotating and deleting a bot's stored messages.

Every public function takes an open SQLModel ``Session`` and a ``bot_id`` and
returns a plain ``dict``/``list`` payload.  A missing bot or message is
reported by raising ``fastapi.HTTPException``.
"""

import json
import logging
import os
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from fastapi import HTTPException
from sqlmodel import Session, select

from core.cache import cache
from core.docker_instance import docker_manager
from core.utils import _resolve_local_day_range
from models.bot import BotInstance, BotMessage
from services.bot_storage_service import (
    _clear_bot_dashboard_direct_session,
    _clear_bot_sessions,
    get_bot_workspace_root,
)
from services.cache_service import (
    _cache_key_bot_messages,
    _cache_key_bot_messages_page,
    _invalidate_bot_detail_cache,
    _invalidate_bot_messages_cache,
)
from services.platform_activity_service import record_activity_event
from services.platform_settings_service import get_chat_pull_page_size

logger = logging.getLogger(__name__)

# Absolute prefix stripped from media paths so they become workspace-relative.
_CONTAINER_WORKSPACE_PREFIX = "/root/.nanobot/workspace/"

# Passed as ``ttl`` to ``cache.set_json``.
# NOTE(review): unit is presumably seconds -- confirm against core.cache.
_MESSAGES_CACHE_TTL = 30

# Hard upper bound on rows returned by the list/page endpoints.
_MAX_LIST_LIMIT = 500


def _as_utc(value: datetime) -> datetime:
    """Return *value* unchanged if timezone-aware, else tag it as UTC."""
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value


def _to_epoch_ms(value: datetime) -> int:
    """Convert *value* to integer epoch milliseconds, treating naive as UTC.

    Calling ``.timestamp()`` directly on a naive datetime would interpret it in
    the server's local timezone; going through ``_as_utc`` keeps every
    timestamp emitted by this module on the same basis.
    """
    return int(_as_utc(value).timestamp() * 1000)


def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive datetime.

    Produces the same value as the deprecated ``datetime.utcnow()``.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _request_new_session_if_running(bot: BotInstance, bot_id: str) -> None:
    """Send ``/new`` to the bot's container when its status is RUNNING.

    Best-effort: a failure is logged and swallowed so the surrounding
    database work can still be committed.
    """
    if str(bot.docker_status or "").upper() != "RUNNING":
        return
    try:
        docker_manager.send_command(bot_id, "/new")
    except Exception:
        logger.warning(
            "Failed to send /new to running bot %s", bot_id, exc_info=True
        )


def _get_bot_or_404(session: Session, bot_id: str) -> BotInstance:
    """Fetch the bot by primary key.

    Raises:
        HTTPException: 404 when no ``BotInstance`` has this id.
    """
    bot = session.get(BotInstance, bot_id)
    if not bot:
        raise HTTPException(status_code=404, detail="Bot not found")
    return bot


def _normalize_message_media_item(bot_id: str, value: Any) -> str:
    """Normalise one media path to a forward-slash path without a leading "/".

    Order of attempts:
      1. paths under ``/root/.nanobot/workspace/`` have that prefix removed;
      2. absolute paths inside ``get_bot_workspace_root(bot_id)`` become
         relative to that root;
      3. anything else only has its leading slashes stripped.

    Returns an empty string for empty/falsy input.
    """
    raw = str(value or "").strip().replace("\\", "/")
    if not raw:
        return ""
    if raw.startswith(_CONTAINER_WORKSPACE_PREFIX):
        return raw[len(_CONTAINER_WORKSPACE_PREFIX):].lstrip("/")

    root = get_bot_workspace_root(bot_id)
    if os.path.isabs(raw):
        try:
            if os.path.commonpath([root, raw]) == root:
                return os.path.relpath(raw, root).replace("\\", "/")
        except (TypeError, ValueError):
            # commonpath/relpath reject mixed absolute/relative paths or
            # paths on different drives; fall through to the generic case.
            pass
    return raw.lstrip("/")


def _normalize_message_media_list(raw: Any, bot_id: str) -> List[str]:
    """Normalise every entry of *raw*, dropping entries that normalise to "".

    Returns an empty list when *raw* is not a ``list``.
    """
    if not isinstance(raw, list):
        return []
    normalized = (_normalize_message_media_item(bot_id, item) for item in raw)
    return [path for path in normalized if path]


def _parse_message_media(bot_id: str, media_raw: Optional[str]) -> List[str]:
    """Decode a JSON-encoded media list and normalise its entries.

    Returns an empty list when *media_raw* is empty or is not valid JSON.
    """
    if not media_raw:
        return []
    try:
        parsed = json.loads(media_raw)
    except (TypeError, ValueError, RecursionError):
        # JSONDecodeError is a ValueError; RecursionError covers pathologically
        # nested input.  Malformed media must not break message listing.
        return []
    return _normalize_message_media_list(parsed, bot_id)


def serialize_bot_message_row(bot_id: str, row: BotMessage) -> Dict[str, Any]:
    """Convert a ``BotMessage`` row to its API dictionary.

    ``ts`` is epoch milliseconds; a naive ``created_at`` is treated as UTC.
    ``feedback`` is the stripped feedback string, or ``None`` when blank.
    """
    feedback = str(getattr(row, "feedback", "") or "").strip() or None
    return {
        "id": row.id,
        "bot_id": row.bot_id,
        "role": row.role,
        "text": row.text,
        "media": _parse_message_media(bot_id, getattr(row, "media_json", None)),
        "feedback": feedback,
        "ts": _to_epoch_ms(row.created_at),
    }


def list_bot_messages_payload(
    session: Session, bot_id: str, limit: int = 200
) -> List[Dict[str, Any]]:
    """Return the most recent messages of a bot, oldest first.

    *limit* is clamped to 1..500.  The result is read from / written to the
    cache under a key derived from ``(bot_id, limit)``.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    _get_bot_or_404(session, bot_id)
    safe_limit = max(1, min(int(limit), _MAX_LIST_LIMIT))

    cache_key = _cache_key_bot_messages(bot_id, safe_limit)
    cached = cache.get_json(cache_key)
    if isinstance(cached, list):
        return cached

    newest_first = session.exec(
        select(BotMessage)
        .where(BotMessage.bot_id == bot_id)
        .order_by(BotMessage.created_at.desc(), BotMessage.id.desc())
        .limit(safe_limit)
    ).all()
    payload = [
        serialize_bot_message_row(bot_id, row) for row in reversed(newest_first)
    ]
    cache.set_json(cache_key, payload, ttl=_MESSAGES_CACHE_TTL)
    return payload


def list_bot_messages_page_payload(
    session: Session,
    bot_id: str,
    limit: Optional[int],
    before_id: Optional[int],
) -> Dict[str, Any]:
    """Return one page of messages for backwards ("load older") pagination.

    Args:
        limit: page size; falls back to ``get_chat_pull_page_size()`` when
            ``None``; clamped to 1..500.
        before_id: when a positive int, only messages with a smaller id are
            returned; any other value means "start from the newest".

    Returns:
        ``items`` (oldest first), ``has_more``, ``next_before_id`` (id of the
        oldest returned row, or ``None`` when the page is empty) and the
        effective ``limit``.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    _get_bot_or_404(session, bot_id)
    configured_limit = get_chat_pull_page_size()
    requested = limit if limit is not None else configured_limit
    safe_limit = max(1, min(int(requested), _MAX_LIST_LIMIT))
    safe_before_id = (
        int(before_id) if isinstance(before_id, int) and before_id > 0 else None
    )

    cache_key = _cache_key_bot_messages_page(bot_id, safe_limit, safe_before_id)
    cached = cache.get_json(cache_key)
    if isinstance(cached, dict) and isinstance(cached.get("items"), list):
        return cached

    stmt = select(BotMessage).where(BotMessage.bot_id == bot_id)
    if safe_before_id is not None:
        stmt = stmt.where(BotMessage.id < safe_before_id)
    # Fetch one extra row so has_more is known without a second query.
    stmt = stmt.order_by(
        BotMessage.created_at.desc(), BotMessage.id.desc()
    ).limit(safe_limit + 1)

    rows = session.exec(stmt).all()
    has_more = len(rows) > safe_limit
    if has_more:
        rows = rows[:safe_limit]

    payload = {
        "items": [serialize_bot_message_row(bot_id, row) for row in reversed(rows)],
        "has_more": bool(has_more),
        # rows is newest-first, so the last element is the oldest on the page.
        "next_before_id": rows[-1].id if rows else None,
        "limit": safe_limit,
    }
    cache.set_json(cache_key, payload, ttl=_MESSAGES_CACHE_TTL)
    return payload


def list_bot_messages_by_date_payload(
    session: Session,
    bot_id: str,
    date: str,
    tz_offset_minutes: Optional[int],
    limit: Optional[int],
) -> Dict[str, Any]:
    """Return a window of messages centred on the first message of a local day.

    The day is resolved to a ``[utc_start, utc_end)`` range by
    ``_resolve_local_day_range``.  The anchor is the earliest message inside
    that range; when the day has no messages, the closest message after or
    before the range is used (ties go to the later one).  Up to
    ``before_limit`` messages with a smaller id and ``after_limit`` messages
    with a larger id are returned around the anchor.

    Args:
        limit: total window size; defaults to
            ``max(60, get_chat_pull_page_size())``; clamped to 12..240.

    Returns:
        ``items`` (oldest first), ``anchor_id``, ``resolved_ts`` (epoch ms of
        the anchor, naive timestamps treated as UTC -- the same basis as each
        item's ``ts``), ``matched_exact_date``, ``has_more_before`` and
        ``has_more_after``.  All-empty/``None``/``False`` when the bot has no
        usable anchor message.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    _get_bot_or_404(session, bot_id)
    utc_start, utc_end = _resolve_local_day_range(date, tz_offset_minutes)

    configured_limit = max(60, get_chat_pull_page_size())
    requested = limit if limit is not None else configured_limit
    safe_limit = max(12, min(int(requested), 240))
    before_limit = max(3, min(18, safe_limit // 4))
    after_limit = max(0, safe_limit - before_limit - 1)  # -1 for the anchor

    exact_anchor = session.exec(
        select(BotMessage)
        .where(
            BotMessage.bot_id == bot_id,
            BotMessage.created_at >= utc_start,
            BotMessage.created_at < utc_end,
        )
        .order_by(BotMessage.created_at.asc(), BotMessage.id.asc())
        .limit(1)
    ).first()
    anchor = exact_anchor
    matched_exact_date = exact_anchor is not None

    if anchor is None:
        next_row = session.exec(
            select(BotMessage)
            .where(BotMessage.bot_id == bot_id, BotMessage.created_at >= utc_end)
            .order_by(BotMessage.created_at.asc(), BotMessage.id.asc())
            .limit(1)
        ).first()
        prev_row = session.exec(
            select(BotMessage)
            .where(BotMessage.bot_id == bot_id, BotMessage.created_at < utc_start)
            .order_by(BotMessage.created_at.desc(), BotMessage.id.desc())
            .limit(1)
        ).first()
        if next_row and prev_row:
            # NOTE(review): _resolve_local_day_range is defined elsewhere;
            # normalising both operands keeps the subtraction valid whether it
            # (or the DB driver) yields naive or aware datetimes.
            gap_after = _as_utc(next_row.created_at) - _as_utc(utc_end)
            gap_before = _as_utc(utc_start) - _as_utc(prev_row.created_at)
            anchor = next_row if gap_after <= gap_before else prev_row
        else:
            anchor = next_row or prev_row

    if anchor is None or anchor.id is None:
        return {
            "items": [],
            "anchor_id": None,
            "resolved_ts": None,
            "matched_exact_date": False,
            "has_more_before": False,
            "has_more_after": False,
        }

    before_rows = session.exec(
        select(BotMessage)
        .where(BotMessage.bot_id == bot_id, BotMessage.id < anchor.id)
        .order_by(BotMessage.created_at.desc(), BotMessage.id.desc())
        .limit(before_limit)
    ).all()
    after_rows = session.exec(
        select(BotMessage)
        .where(BotMessage.bot_id == bot_id, BotMessage.id > anchor.id)
        .order_by(BotMessage.created_at.asc(), BotMessage.id.asc())
        .limit(after_limit)
    ).all()

    # Always non-empty: it contains at least the anchor.
    ordered = list(reversed(before_rows)) + [anchor] + list(after_rows)
    first_row = ordered[0]
    last_row = ordered[-1]

    has_more_before = False
    if first_row.id is not None:
        older_id = session.exec(
            select(BotMessage.id)
            .where(BotMessage.bot_id == bot_id, BotMessage.id < first_row.id)
            .order_by(BotMessage.id.desc())
            .limit(1)
        ).first()
        has_more_before = older_id is not None

    has_more_after = False
    if last_row.id is not None:
        newer_id = session.exec(
            select(BotMessage.id)
            .where(BotMessage.bot_id == bot_id, BotMessage.id > last_row.id)
            .order_by(BotMessage.id.asc())
            .limit(1)
        ).first()
        has_more_after = newer_id is not None

    return {
        "items": [serialize_bot_message_row(bot_id, row) for row in ordered],
        "anchor_id": anchor.id,
        "resolved_ts": _to_epoch_ms(anchor.created_at),
        "matched_exact_date": matched_exact_date,
        "has_more_before": has_more_before,
        "has_more_after": has_more_after,
    }


def update_bot_message_feedback_payload(
    session: Session,
    bot_id: str,
    message_id: int,
    feedback: Optional[str],
) -> Dict[str, Any]:
    """Set or clear the thumbs up/down feedback on an assistant message.

    *feedback* is compared case-insensitively after stripping: ``"up"`` or
    ``"down"`` stores the value with the current UTC time; ``""``, ``"none"``,
    ``"null"`` (or ``None``) clears both fields.

    Raises:
        HTTPException: 404 when the bot or message does not exist (or the
            message belongs to another bot); 400 when the message is not an
            assistant message or *feedback* is not an accepted value.
    """
    _get_bot_or_404(session, bot_id)
    row = session.get(BotMessage, message_id)
    if not row or row.bot_id != bot_id:
        raise HTTPException(status_code=404, detail="Message not found")
    if row.role != "assistant":
        raise HTTPException(
            status_code=400, detail="Only assistant messages support feedback"
        )

    raw = str(feedback or "").strip().lower()
    if raw in {"", "none", "null"}:
        row.feedback = None
        row.feedback_at = None
    elif raw in {"up", "down"}:
        row.feedback = raw
        row.feedback_at = _utcnow_naive()
    else:
        raise HTTPException(
            status_code=400, detail="feedback must be 'up' or 'down'"
        )

    session.add(row)
    session.commit()
    _invalidate_bot_messages_cache(bot_id)
    return {
        "status": "updated",
        "bot_id": bot_id,
        "message_id": row.id,
        "feedback": row.feedback,
        "feedback_at": row.feedback_at.isoformat() if row.feedback_at else None,
    }


def delete_bot_message_payload(
    session: Session,
    bot_id: str,
    message_id: int,
) -> Dict[str, Any]:
    """Delete one stored message and record a ``message_deleted`` activity.

    Raises:
        HTTPException: 404 when the bot or message does not exist, or the
            message belongs to another bot.
    """
    _get_bot_or_404(session, bot_id)
    row = session.get(BotMessage, message_id)
    if not row or row.bot_id != bot_id:
        raise HTTPException(status_code=404, detail="Message not found")

    # Captured before deletion; a blank role is reported as "assistant".
    deleted_role = str(row.role or "").strip() or "assistant"
    session.delete(row)
    record_activity_event(
        session,
        bot_id,
        "message_deleted",
        channel="dashboard",
        detail=f"Deleted {deleted_role} message #{message_id}",
        metadata={"message_id": message_id, "role": deleted_role},
    )
    session.commit()
    _invalidate_bot_detail_cache(bot_id)
    _invalidate_bot_messages_cache(bot_id)
    return {
        "status": "deleted",
        "bot_id": bot_id,
        "message_id": message_id,
        "role": deleted_role,
    }


def clear_bot_messages_payload(session: Session, bot_id: str) -> Dict[str, Any]:
    """Delete every stored message of a bot and reset its session state.

    Also clears the bot's sessions via ``_clear_bot_sessions``, asks a running
    container to start a new session, resets ``last_action`` /
    ``current_state`` and records a ``history_cleared`` activity.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    bot = _get_bot_or_404(session, bot_id)
    rows = session.exec(
        select(BotMessage).where(BotMessage.bot_id == bot_id)
    ).all()
    for row in rows:
        session.delete(row)
    deleted = len(rows)

    cleared_sessions = _clear_bot_sessions(bot_id)
    _request_new_session_if_running(bot, bot_id)

    bot.last_action = ""
    bot.current_state = "IDLE"
    bot.updated_at = _utcnow_naive()
    session.add(bot)
    record_activity_event(
        session,
        bot_id,
        "history_cleared",
        channel="system",
        detail=f"Cleared {deleted} stored messages",
        metadata={"deleted_messages": deleted, "cleared_sessions": cleared_sessions},
    )
    session.commit()
    _invalidate_bot_detail_cache(bot_id)
    _invalidate_bot_messages_cache(bot_id)
    return {
        "bot_id": bot_id,
        "deleted": deleted,
        "cleared_sessions": cleared_sessions,
    }


def clear_dashboard_direct_session_payload(
    session: Session, bot_id: str
) -> Dict[str, Any]:
    """Clear the bot's ``dashboard_direct`` session file.

    Asks a running container to start a new session, bumps ``updated_at`` and
    records a ``dashboard_session_cleared`` activity.  Stored messages are not
    touched.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    bot = _get_bot_or_404(session, bot_id)
    result = _clear_bot_dashboard_direct_session(bot_id)
    _request_new_session_if_running(bot, bot_id)

    bot.updated_at = _utcnow_naive()
    session.add(bot)
    record_activity_event(
        session,
        bot_id,
        "dashboard_session_cleared",
        channel="dashboard",
        detail="Cleared dashboard_direct session file",
        metadata={
            "session_file": result["path"],
            "previously_existed": result["existed"],
        },
    )
    session.commit()
    _invalidate_bot_detail_cache(bot_id)
    return {
        "bot_id": bot_id,
        "cleared": True,
        "session_file": result["path"],
        "previously_existed": result["existed"],
    }