dashboard-nanobot/backend/services/chat_history_service.py

336 lines
11 KiB
Python

import json
import logging
import os
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from fastapi import HTTPException
from sqlmodel import Session, select

from core.cache import cache
from core.docker_instance import docker_manager
from core.utils import _resolve_local_day_range
from models.bot import BotInstance, BotMessage
from services.bot_storage_service import _clear_bot_dashboard_direct_session, _clear_bot_sessions, _workspace_root
from services.cache_service import (
    _cache_key_bot_messages,
    _cache_key_bot_messages_page,
    _invalidate_bot_detail_cache,
    _invalidate_bot_messages_cache,
)
from services.platform_service import get_chat_pull_page_size, record_activity_event
def _get_bot_or_404(session: Session, bot_id: str) -> BotInstance:
    """Load the ``BotInstance`` keyed by ``bot_id`` or abort with HTTP 404.

    Args:
        session: Open database session used for the lookup.
        bot_id: Primary key of the bot to load.

    Returns:
        The bot instance returned by ``session.get``.

    Raises:
        HTTPException: With status 404 when the lookup yields nothing.
    """
    instance = session.get(BotInstance, bot_id)
    if instance:
        return instance
    raise HTTPException(status_code=404, detail="Bot not found")
def _normalize_message_media_item(bot_id: str, value: Any) -> str:
raw = str(value or "").strip().replace("\\", "/")
if not raw:
return ""
if raw.startswith("/root/.nanobot/workspace/"):
return raw[len("/root/.nanobot/workspace/") :].lstrip("/")
root = _workspace_root(bot_id)
if os.path.isabs(raw):
try:
if os.path.commonpath([root, raw]) == root:
return os.path.relpath(raw, root).replace("\\", "/")
except Exception:
pass
return raw.lstrip("/")
def _normalize_message_media_list(raw: Any, bot_id: str) -> List[str]:
if not isinstance(raw, list):
return []
rows: List[str] = []
for value in raw:
normalized = _normalize_message_media_item(bot_id, value)
if normalized:
rows.append(normalized)
return rows
def _parse_message_media(bot_id: str, media_raw: Optional[str]) -> List[str]:
if not media_raw:
return []
try:
parsed = json.loads(media_raw)
except Exception:
return []
return _normalize_message_media_list(parsed, bot_id)
def serialize_bot_message_row(bot_id: str, row: BotMessage) -> Dict[str, Any]:
    """Build the JSON-ready dictionary for a single stored message.

    Args:
        bot_id: Bot identifier used when normalising media paths.
        row: Message row to serialise.

    Returns:
        A dict with ``id``, ``bot_id``, ``role``, ``text``, ``media``,
        ``feedback`` (``None`` when blank) and ``ts`` (epoch milliseconds).
    """
    stamp = row.created_at
    # A timestamp without tzinfo is interpreted as UTC.
    if stamp.tzinfo is None:
        stamp = stamp.replace(tzinfo=timezone.utc)
    epoch_ms = int(stamp.timestamp() * 1000)

    feedback_text = str(getattr(row, "feedback", "") or "").strip()
    media_paths = _parse_message_media(bot_id, getattr(row, "media_json", None))

    return {
        "id": row.id,
        "bot_id": row.bot_id,
        "role": row.role,
        "text": row.text,
        "media": media_paths,
        "feedback": feedback_text or None,
        "ts": epoch_ms,
    }
def list_bot_messages_payload(session: Session, bot_id: str, limit: int = 200) -> List[Dict[str, Any]]:
    """Return the most recent messages of a bot in chronological order.

    The limit is clamped to the range 1..500. Results are served from the
    cache when a list is stored under the matching key; otherwise they are
    queried, serialised and cached for 30 seconds.

    Args:
        session: Open database session.
        bot_id: Bot whose messages are requested.
        limit: Maximum number of messages to return.

    Returns:
        Serialised messages, oldest first.

    Raises:
        HTTPException: With status 404 when the bot does not exist.
    """
    _get_bot_or_404(session, bot_id)
    page_size = max(1, min(int(limit), 500))
    key = _cache_key_bot_messages(bot_id, page_size)

    hit = cache.get_json(key)
    if isinstance(hit, list):
        return hit

    newest_first = (
        select(BotMessage)
        .where(BotMessage.bot_id == bot_id)
        .order_by(BotMessage.created_at.desc(), BotMessage.id.desc())
        .limit(page_size)
    )
    fetched = session.exec(newest_first).all()
    # The query returns newest first; callers expect oldest first.
    chronological = list(reversed(fetched))
    payload = [serialize_bot_message_row(bot_id, message) for message in chronological]
    cache.set_json(key, payload, ttl=30)
    return payload
def list_bot_messages_page_payload(
    session: Session,
    bot_id: str,
    limit: Optional[int],
    before_id: Optional[int],
) -> Dict[str, Any]:
    """Return one page of a bot's messages for backwards pagination.

    Args:
        session: Open database session.
        bot_id: Bot whose messages are requested.
        limit: Page size; ``None`` falls back to the configured pull size.
            The effective value is clamped to 1..500.
        before_id: When a positive ``int``, only messages with a smaller id
            are returned; any other value means "start from the newest".

    Returns:
        A dict with ``items`` (oldest first), ``has_more``,
        ``next_before_id`` (id of the oldest returned message, or ``None``
        for an empty page) and the effective ``limit``.

    Raises:
        HTTPException: With status 404 when the bot does not exist.
    """
    _get_bot_or_404(session, bot_id)

    default_size = get_chat_pull_page_size()
    requested = default_size if limit is None else limit
    page_size = max(1, min(int(requested), 500))

    cursor = None
    if isinstance(before_id, int) and before_id > 0:
        cursor = int(before_id)

    key = _cache_key_bot_messages_page(bot_id, page_size, cursor)
    hit = cache.get_json(key)
    if isinstance(hit, dict) and isinstance(hit.get("items"), list):
        return hit

    # Fetch one extra row so the presence of a further page can be detected.
    query = (
        select(BotMessage)
        .where(BotMessage.bot_id == bot_id)
        .order_by(BotMessage.created_at.desc(), BotMessage.id.desc())
        .limit(page_size + 1)
    )
    if cursor is not None:
        query = query.where(BotMessage.id < cursor)
    fetched = session.exec(query).all()

    has_more = len(fetched) > page_size
    page = fetched[:page_size] if has_more else fetched
    # ``page`` is newest first, so its last element is the oldest message.
    oldest_id = page[-1].id if page else None

    payload = {
        "items": [serialize_bot_message_row(bot_id, message) for message in reversed(page)],
        "has_more": bool(has_more),
        "next_before_id": oldest_id,
        "limit": page_size,
    }
    cache.set_json(key, payload, ttl=30)
    return payload
def list_bot_messages_by_date_payload(
    session: Session,
    bot_id: str,
    date: str,
    tz_offset_minutes: Optional[int],
    limit: Optional[int],
) -> Dict[str, Any]:
    """Return a window of messages around the first message of a given day.

    The day is converted to a ``[utc_start, utc_end)`` range by
    ``_resolve_local_day_range``. The anchor is the earliest message inside
    that range. When the day has no messages, the anchor is the closest
    message outside the range, and a tie goes to the later one. The window
    holds a few messages before the anchor, the anchor, and the messages
    after it, up to the effective limit.

    Args:
        session: Open database session.
        bot_id: Bot whose messages are requested.
        date: Day to jump to, in the format ``_resolve_local_day_range``
            accepts.
        tz_offset_minutes: Client offset forwarded to
            ``_resolve_local_day_range``.
        limit: Window size; ``None`` uses the configured pull size (at least
            60). The effective value is clamped to 12..240.

    Returns:
        A dict with ``items`` (oldest first), ``anchor_id``, ``resolved_ts``
        (anchor time in epoch milliseconds), ``matched_exact_date``,
        ``has_more_before`` and ``has_more_after``. When the bot has no
        messages at all, ``items`` is empty and the other fields are
        ``None``/``False``.

    Raises:
        HTTPException: With status 404 when the bot does not exist.
    """
    _get_bot_or_404(session, bot_id)
    utc_start, utc_end = _resolve_local_day_range(date, tz_offset_minutes)

    configured_limit = max(60, get_chat_pull_page_size())
    safe_limit = max(12, min(int(limit if limit is not None else configured_limit), 240))
    # Roughly a quarter of the window (3..18 rows) precedes the anchor; the
    # rest, minus the anchor itself, follows it.
    before_limit = max(3, min(18, safe_limit // 4))
    after_limit = max(0, safe_limit - before_limit - 1)

    exact_anchor = session.exec(
        select(BotMessage)
        .where(BotMessage.bot_id == bot_id, BotMessage.created_at >= utc_start, BotMessage.created_at < utc_end)
        .order_by(BotMessage.created_at.asc(), BotMessage.id.asc())
        .limit(1)
    ).first()
    anchor = exact_anchor
    matched_exact_date = exact_anchor is not None

    if anchor is None:
        # Nothing on the requested day: look at the nearest neighbour on
        # each side and keep the closer one.
        next_row = session.exec(
            select(BotMessage)
            .where(BotMessage.bot_id == bot_id, BotMessage.created_at >= utc_end)
            .order_by(BotMessage.created_at.asc(), BotMessage.id.asc())
            .limit(1)
        ).first()
        prev_row = session.exec(
            select(BotMessage)
            .where(BotMessage.bot_id == bot_id, BotMessage.created_at < utc_start)
            .order_by(BotMessage.created_at.desc(), BotMessage.id.desc())
            .limit(1)
        ).first()
        if next_row and prev_row:
            # NOTE(review): assumes ``created_at`` and the range bounds are
            # both naive or both aware; confirm against
            # ``_resolve_local_day_range``.
            gap_after = next_row.created_at - utc_end
            gap_before = utc_start - prev_row.created_at
            anchor = next_row if gap_after <= gap_before else prev_row
        else:
            anchor = next_row or prev_row

    if anchor is None or anchor.id is None:
        return {
            "items": [],
            "anchor_id": None,
            "resolved_ts": None,
            "matched_exact_date": False,
            "has_more_before": False,
            "has_more_after": False,
        }

    before_rows = session.exec(
        select(BotMessage)
        .where(BotMessage.bot_id == bot_id, BotMessage.id < anchor.id)
        .order_by(BotMessage.created_at.desc(), BotMessage.id.desc())
        .limit(before_limit)
    ).all()
    after_rows = session.exec(
        select(BotMessage)
        .where(BotMessage.bot_id == bot_id, BotMessage.id > anchor.id)
        .order_by(BotMessage.created_at.asc(), BotMessage.id.asc())
        .limit(after_limit)
    ).all()

    # ``ordered`` always contains at least the anchor, so it is never empty.
    ordered = list(reversed(before_rows)) + [anchor] + list(after_rows)
    first_row = ordered[0]
    last_row = ordered[-1]

    has_more_before = False
    if first_row.id is not None:
        has_more_before = (
            session.exec(
                select(BotMessage.id)
                .where(BotMessage.bot_id == bot_id, BotMessage.id < first_row.id)
                .order_by(BotMessage.id.desc())
                .limit(1)
            ).first()
            is not None
        )

    has_more_after = False
    if last_row.id is not None:
        has_more_after = (
            session.exec(
                select(BotMessage.id)
                .where(BotMessage.bot_id == bot_id, BotMessage.id > last_row.id)
                .order_by(BotMessage.id.asc())
                .limit(1)
            ).first()
            is not None
        )

    # Treat a timestamp without tzinfo as UTC, exactly like the per-row
    # serialiser does. Calling timestamp() on a naive datetime would use the
    # server's local zone and make ``resolved_ts`` disagree with the
    # anchor's own ``ts`` in ``items``.
    anchor_created = anchor.created_at
    if anchor_created.tzinfo is None:
        anchor_created = anchor_created.replace(tzinfo=timezone.utc)

    return {
        "items": [serialize_bot_message_row(bot_id, row) for row in ordered],
        "anchor_id": anchor.id,
        "resolved_ts": int(anchor_created.timestamp() * 1000),
        "matched_exact_date": matched_exact_date,
        "has_more_before": has_more_before,
        "has_more_after": has_more_after,
    }
def update_bot_message_feedback_payload(
    session: Session,
    bot_id: str,
    message_id: int,
    feedback: Optional[str],
) -> Dict[str, Any]:
    """Set or clear the thumbs-up/down feedback on an assistant message.

    Args:
        session: Open database session.
        bot_id: Bot that must own the message.
        message_id: Primary key of the message to update.
        feedback: ``"up"`` or ``"down"`` (case-insensitive, surrounding
            whitespace ignored) to set feedback; ``None``, ``""``,
            ``"none"`` or ``"null"`` to clear it.

    Returns:
        A dict with ``status``, ``bot_id``, ``message_id``, ``feedback`` and
        ``feedback_at`` (ISO-8601 string or ``None``).

    Raises:
        HTTPException: 404 when the bot or message is missing or the message
            belongs to another bot; 400 when the message is not an assistant
            message or ``feedback`` has an unsupported value.
    """
    _get_bot_or_404(session, bot_id)
    row = session.get(BotMessage, message_id)
    if not row or row.bot_id != bot_id:
        raise HTTPException(status_code=404, detail="Message not found")
    if row.role != "assistant":
        raise HTTPException(status_code=400, detail="Only assistant messages support feedback")

    raw = str(feedback or "").strip().lower()
    if raw in {"", "none", "null"}:
        row.feedback = None
        row.feedback_at = None
    elif raw in {"up", "down"}:
        row.feedback = raw
        # Same naive-UTC value the deprecated datetime.utcnow() produced.
        row.feedback_at = datetime.now(timezone.utc).replace(tzinfo=None)
    else:
        raise HTTPException(status_code=400, detail="feedback must be 'up' or 'down'")

    session.add(row)
    session.commit()
    _invalidate_bot_messages_cache(bot_id)
    return {
        "status": "updated",
        "bot_id": bot_id,
        "message_id": row.id,
        "feedback": row.feedback,
        "feedback_at": row.feedback_at.isoformat() if row.feedback_at else None,
    }
def clear_bot_messages_payload(session: Session, bot_id: str) -> Dict[str, Any]:
    """Delete all stored messages of a bot and reset its conversation state.

    Steps, in order: delete every ``BotMessage`` row of the bot, clear its
    session files via ``_clear_bot_sessions``, send ``/new`` to the
    container when the bot's ``docker_status`` is ``RUNNING`` (best effort),
    reset ``last_action``/``current_state``, record a ``history_cleared``
    activity event, commit, and invalidate the detail and message caches.

    Args:
        session: Open database session.
        bot_id: Bot whose history is cleared.

    Returns:
        A dict with ``bot_id``, ``deleted`` (number of removed rows) and
        ``cleared_sessions`` (the value returned by ``_clear_bot_sessions``).

    Raises:
        HTTPException: With status 404 when the bot does not exist.
    """
    bot = _get_bot_or_404(session, bot_id)
    rows = session.exec(select(BotMessage).where(BotMessage.bot_id == bot_id)).all()
    for row in rows:
        session.delete(row)
    deleted = len(rows)

    cleared_sessions = _clear_bot_sessions(bot_id)

    if str(bot.docker_status or "").upper() == "RUNNING":
        try:
            docker_manager.send_command(bot_id, "/new")
        except Exception:
            # Best effort: a failed reset must not abort the cleanup, but it
            # should leave a trace instead of vanishing silently.
            logging.getLogger(__name__).warning(
                "Failed to send /new to bot %s after clearing history",
                bot_id,
                exc_info=True,
            )

    bot.last_action = ""
    bot.current_state = "IDLE"
    # Same naive-UTC value the deprecated datetime.utcnow() produced.
    bot.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
    session.add(bot)
    record_activity_event(
        session,
        bot_id,
        "history_cleared",
        channel="system",
        detail=f"Cleared {deleted} stored messages",
        metadata={"deleted_messages": deleted, "cleared_sessions": cleared_sessions},
    )
    session.commit()
    _invalidate_bot_detail_cache(bot_id)
    _invalidate_bot_messages_cache(bot_id)
    return {"bot_id": bot_id, "deleted": deleted, "cleared_sessions": cleared_sessions}
def clear_dashboard_direct_session_payload(session: Session, bot_id: str) -> Dict[str, Any]:
    """Clear the bot's ``dashboard_direct`` session file and log the action.

    Steps, in order: clear the session file via
    ``_clear_bot_dashboard_direct_session``, send ``/new`` to the container
    when the bot's ``docker_status`` is ``RUNNING`` (best effort), bump
    ``updated_at``, record a ``dashboard_session_cleared`` activity event,
    commit, and invalidate the bot detail cache.

    Args:
        session: Open database session.
        bot_id: Bot whose dashboard session is cleared.

    Returns:
        A dict with ``bot_id``, ``cleared`` (always ``True``),
        ``session_file`` and ``previously_existed``, the latter two taken
        from the ``path`` and ``existed`` keys of the storage helper's
        result.

    Raises:
        HTTPException: With status 404 when the bot does not exist.
    """
    bot = _get_bot_or_404(session, bot_id)
    result = _clear_bot_dashboard_direct_session(bot_id)

    if str(bot.docker_status or "").upper() == "RUNNING":
        try:
            docker_manager.send_command(bot_id, "/new")
        except Exception:
            # Best effort: a failed reset must not abort the cleanup, but it
            # should leave a trace instead of vanishing silently.
            logging.getLogger(__name__).warning(
                "Failed to send /new to bot %s after clearing dashboard session",
                bot_id,
                exc_info=True,
            )

    # Same naive-UTC value the deprecated datetime.utcnow() produced.
    bot.updated_at = datetime.now(timezone.utc).replace(tzinfo=None)
    session.add(bot)
    record_activity_event(
        session,
        bot_id,
        "dashboard_session_cleared",
        channel="dashboard",
        detail="Cleared dashboard_direct session file",
        metadata={"session_file": result["path"], "previously_existed": result["existed"]},
    )
    session.commit()
    _invalidate_bot_detail_cache(bot_id)
    return {
        "bot_id": bot_id,
        "cleared": True,
        "session_file": result["path"],
        "previously_existed": result["existed"],
    }