372 lines
12 KiB
Python
372 lines
12 KiB
Python
import json
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from fastapi import HTTPException
|
|
from sqlmodel import Session, select
|
|
|
|
from core.cache import cache
|
|
from core.docker_instance import docker_manager
|
|
from core.utils import _resolve_local_day_range
|
|
from models.bot import BotInstance, BotMessage
|
|
from services.bot_storage_service import (
|
|
_clear_bot_dashboard_direct_session,
|
|
_clear_bot_sessions,
|
|
get_bot_workspace_root,
|
|
)
|
|
from services.cache_service import (
|
|
_cache_key_bot_messages,
|
|
_cache_key_bot_messages_page,
|
|
_invalidate_bot_detail_cache,
|
|
_invalidate_bot_messages_cache,
|
|
)
|
|
from services.platform_activity_service import record_activity_event
|
|
from services.platform_settings_service import get_chat_pull_page_size
|
|
|
|
|
|
def _get_bot_or_404(session: Session, bot_id: str) -> BotInstance:
|
|
bot = session.get(BotInstance, bot_id)
|
|
if not bot:
|
|
raise HTTPException(status_code=404, detail="Bot not found")
|
|
return bot
|
|
|
|
|
|
def _normalize_message_media_item(bot_id: str, value: Any) -> str:
|
|
raw = str(value or "").strip().replace("\\", "/")
|
|
if not raw:
|
|
return ""
|
|
if raw.startswith("/root/.nanobot/workspace/"):
|
|
return raw[len("/root/.nanobot/workspace/") :].lstrip("/")
|
|
root = get_bot_workspace_root(bot_id)
|
|
if os.path.isabs(raw):
|
|
try:
|
|
if os.path.commonpath([root, raw]) == root:
|
|
return os.path.relpath(raw, root).replace("\\", "/")
|
|
except Exception:
|
|
pass
|
|
return raw.lstrip("/")
|
|
|
|
|
|
def _normalize_message_media_list(raw: Any, bot_id: str) -> List[str]:
|
|
if not isinstance(raw, list):
|
|
return []
|
|
rows: List[str] = []
|
|
for value in raw:
|
|
normalized = _normalize_message_media_item(bot_id, value)
|
|
if normalized:
|
|
rows.append(normalized)
|
|
return rows
|
|
|
|
|
|
def _parse_message_media(bot_id: str, media_raw: Optional[str]) -> List[str]:
|
|
if not media_raw:
|
|
return []
|
|
try:
|
|
parsed = json.loads(media_raw)
|
|
except Exception:
|
|
return []
|
|
return _normalize_message_media_list(parsed, bot_id)
|
|
|
|
|
|
def serialize_bot_message_row(bot_id: str, row: BotMessage) -> Dict[str, Any]:
|
|
created_at = row.created_at
|
|
if created_at.tzinfo is None:
|
|
created_at = created_at.replace(tzinfo=timezone.utc)
|
|
return {
|
|
"id": row.id,
|
|
"bot_id": row.bot_id,
|
|
"role": row.role,
|
|
"text": row.text,
|
|
"media": _parse_message_media(bot_id, getattr(row, "media_json", None)),
|
|
"feedback": str(getattr(row, "feedback", "") or "").strip() or None,
|
|
"ts": int(created_at.timestamp() * 1000),
|
|
}
|
|
|
|
|
|
def list_bot_messages_payload(session: Session, bot_id: str, limit: int = 200) -> List[Dict[str, Any]]:
|
|
_get_bot_or_404(session, bot_id)
|
|
safe_limit = max(1, min(int(limit), 500))
|
|
cached = cache.get_json(_cache_key_bot_messages(bot_id, safe_limit))
|
|
if isinstance(cached, list):
|
|
return cached
|
|
rows = session.exec(
|
|
select(BotMessage)
|
|
.where(BotMessage.bot_id == bot_id)
|
|
.order_by(BotMessage.created_at.desc(), BotMessage.id.desc())
|
|
.limit(safe_limit)
|
|
).all()
|
|
payload = [serialize_bot_message_row(bot_id, row) for row in reversed(rows)]
|
|
cache.set_json(_cache_key_bot_messages(bot_id, safe_limit), payload, ttl=30)
|
|
return payload
|
|
|
|
|
|
def list_bot_messages_page_payload(
|
|
session: Session,
|
|
bot_id: str,
|
|
limit: Optional[int],
|
|
before_id: Optional[int],
|
|
) -> Dict[str, Any]:
|
|
_get_bot_or_404(session, bot_id)
|
|
configured_limit = get_chat_pull_page_size()
|
|
safe_limit = max(1, min(int(limit if limit is not None else configured_limit), 500))
|
|
safe_before_id = int(before_id) if isinstance(before_id, int) and before_id > 0 else None
|
|
cache_key = _cache_key_bot_messages_page(bot_id, safe_limit, safe_before_id)
|
|
cached = cache.get_json(cache_key)
|
|
if isinstance(cached, dict) and isinstance(cached.get("items"), list):
|
|
return cached
|
|
|
|
stmt = (
|
|
select(BotMessage)
|
|
.where(BotMessage.bot_id == bot_id)
|
|
.order_by(BotMessage.created_at.desc(), BotMessage.id.desc())
|
|
.limit(safe_limit + 1)
|
|
)
|
|
if safe_before_id is not None:
|
|
stmt = stmt.where(BotMessage.id < safe_before_id)
|
|
|
|
rows = session.exec(stmt).all()
|
|
has_more = len(rows) > safe_limit
|
|
if has_more:
|
|
rows = rows[:safe_limit]
|
|
ordered = list(reversed(rows))
|
|
payload = {
|
|
"items": [serialize_bot_message_row(bot_id, row) for row in ordered],
|
|
"has_more": bool(has_more),
|
|
"next_before_id": rows[-1].id if rows else None,
|
|
"limit": safe_limit,
|
|
}
|
|
cache.set_json(cache_key, payload, ttl=30)
|
|
return payload
|
|
|
|
|
|
def list_bot_messages_by_date_payload(
|
|
session: Session,
|
|
bot_id: str,
|
|
date: str,
|
|
tz_offset_minutes: Optional[int],
|
|
limit: Optional[int],
|
|
) -> Dict[str, Any]:
|
|
_get_bot_or_404(session, bot_id)
|
|
|
|
utc_start, utc_end = _resolve_local_day_range(date, tz_offset_minutes)
|
|
configured_limit = max(60, get_chat_pull_page_size())
|
|
safe_limit = max(12, min(int(limit if limit is not None else configured_limit), 240))
|
|
before_limit = max(3, min(18, safe_limit // 4))
|
|
after_limit = max(0, safe_limit - before_limit - 1)
|
|
|
|
exact_anchor = session.exec(
|
|
select(BotMessage)
|
|
.where(BotMessage.bot_id == bot_id, BotMessage.created_at >= utc_start, BotMessage.created_at < utc_end)
|
|
.order_by(BotMessage.created_at.asc(), BotMessage.id.asc())
|
|
.limit(1)
|
|
).first()
|
|
|
|
anchor = exact_anchor
|
|
matched_exact_date = exact_anchor is not None
|
|
if anchor is None:
|
|
next_row = session.exec(
|
|
select(BotMessage)
|
|
.where(BotMessage.bot_id == bot_id, BotMessage.created_at >= utc_end)
|
|
.order_by(BotMessage.created_at.asc(), BotMessage.id.asc())
|
|
.limit(1)
|
|
).first()
|
|
prev_row = session.exec(
|
|
select(BotMessage)
|
|
.where(BotMessage.bot_id == bot_id, BotMessage.created_at < utc_start)
|
|
.order_by(BotMessage.created_at.desc(), BotMessage.id.desc())
|
|
.limit(1)
|
|
).first()
|
|
if next_row and prev_row:
|
|
gap_after = next_row.created_at - utc_end
|
|
gap_before = utc_start - prev_row.created_at
|
|
anchor = next_row if gap_after <= gap_before else prev_row
|
|
else:
|
|
anchor = next_row or prev_row
|
|
|
|
if anchor is None or anchor.id is None:
|
|
return {
|
|
"items": [],
|
|
"anchor_id": None,
|
|
"resolved_ts": None,
|
|
"matched_exact_date": False,
|
|
"has_more_before": False,
|
|
"has_more_after": False,
|
|
}
|
|
|
|
before_rows = session.exec(
|
|
select(BotMessage)
|
|
.where(BotMessage.bot_id == bot_id, BotMessage.id < anchor.id)
|
|
.order_by(BotMessage.created_at.desc(), BotMessage.id.desc())
|
|
.limit(before_limit)
|
|
).all()
|
|
after_rows = session.exec(
|
|
select(BotMessage)
|
|
.where(BotMessage.bot_id == bot_id, BotMessage.id > anchor.id)
|
|
.order_by(BotMessage.created_at.asc(), BotMessage.id.asc())
|
|
.limit(after_limit)
|
|
).all()
|
|
|
|
ordered = list(reversed(before_rows)) + [anchor] + after_rows
|
|
first_row = ordered[0] if ordered else None
|
|
last_row = ordered[-1] if ordered else None
|
|
|
|
has_more_before = False
|
|
if first_row is not None and first_row.id is not None:
|
|
has_more_before = (
|
|
session.exec(
|
|
select(BotMessage.id)
|
|
.where(BotMessage.bot_id == bot_id, BotMessage.id < first_row.id)
|
|
.order_by(BotMessage.id.desc())
|
|
.limit(1)
|
|
).first()
|
|
is not None
|
|
)
|
|
|
|
has_more_after = False
|
|
if last_row is not None and last_row.id is not None:
|
|
has_more_after = (
|
|
session.exec(
|
|
select(BotMessage.id)
|
|
.where(BotMessage.bot_id == bot_id, BotMessage.id > last_row.id)
|
|
.order_by(BotMessage.id.asc())
|
|
.limit(1)
|
|
).first()
|
|
is not None
|
|
)
|
|
|
|
return {
|
|
"items": [serialize_bot_message_row(bot_id, row) for row in ordered],
|
|
"anchor_id": anchor.id,
|
|
"resolved_ts": int(anchor.created_at.timestamp() * 1000),
|
|
"matched_exact_date": matched_exact_date,
|
|
"has_more_before": has_more_before,
|
|
"has_more_after": has_more_after,
|
|
}
|
|
|
|
|
|
def update_bot_message_feedback_payload(
|
|
session: Session,
|
|
bot_id: str,
|
|
message_id: int,
|
|
feedback: Optional[str],
|
|
) -> Dict[str, Any]:
|
|
_get_bot_or_404(session, bot_id)
|
|
row = session.get(BotMessage, message_id)
|
|
if not row or row.bot_id != bot_id:
|
|
raise HTTPException(status_code=404, detail="Message not found")
|
|
if row.role != "assistant":
|
|
raise HTTPException(status_code=400, detail="Only assistant messages support feedback")
|
|
|
|
raw = str(feedback or "").strip().lower()
|
|
if raw in {"", "none", "null"}:
|
|
row.feedback = None
|
|
row.feedback_at = None
|
|
elif raw in {"up", "down"}:
|
|
row.feedback = raw
|
|
row.feedback_at = datetime.utcnow()
|
|
else:
|
|
raise HTTPException(status_code=400, detail="feedback must be 'up' or 'down'")
|
|
|
|
session.add(row)
|
|
session.commit()
|
|
_invalidate_bot_messages_cache(bot_id)
|
|
return {
|
|
"status": "updated",
|
|
"bot_id": bot_id,
|
|
"message_id": row.id,
|
|
"feedback": row.feedback,
|
|
"feedback_at": row.feedback_at.isoformat() if row.feedback_at else None,
|
|
}
|
|
|
|
|
|
def delete_bot_message_payload(
|
|
session: Session,
|
|
bot_id: str,
|
|
message_id: int,
|
|
) -> Dict[str, Any]:
|
|
_get_bot_or_404(session, bot_id)
|
|
row = session.get(BotMessage, message_id)
|
|
if not row or row.bot_id != bot_id:
|
|
raise HTTPException(status_code=404, detail="Message not found")
|
|
|
|
deleted_role = str(row.role or "").strip() or "assistant"
|
|
session.delete(row)
|
|
record_activity_event(
|
|
session,
|
|
bot_id,
|
|
"message_deleted",
|
|
channel="dashboard",
|
|
detail=f"Deleted {deleted_role} message #{message_id}",
|
|
metadata={"message_id": message_id, "role": deleted_role},
|
|
)
|
|
session.commit()
|
|
_invalidate_bot_detail_cache(bot_id)
|
|
_invalidate_bot_messages_cache(bot_id)
|
|
return {
|
|
"status": "deleted",
|
|
"bot_id": bot_id,
|
|
"message_id": message_id,
|
|
"role": deleted_role,
|
|
}
|
|
|
|
|
|
def clear_bot_messages_payload(session: Session, bot_id: str) -> Dict[str, Any]:
|
|
bot = _get_bot_or_404(session, bot_id)
|
|
rows = session.exec(select(BotMessage).where(BotMessage.bot_id == bot_id)).all()
|
|
deleted = 0
|
|
for row in rows:
|
|
session.delete(row)
|
|
deleted += 1
|
|
cleared_sessions = _clear_bot_sessions(bot_id)
|
|
if str(bot.docker_status or "").upper() == "RUNNING":
|
|
try:
|
|
docker_manager.send_command(bot_id, "/new")
|
|
except Exception:
|
|
pass
|
|
bot.last_action = ""
|
|
bot.current_state = "IDLE"
|
|
bot.updated_at = datetime.utcnow()
|
|
session.add(bot)
|
|
record_activity_event(
|
|
session,
|
|
bot_id,
|
|
"history_cleared",
|
|
channel="system",
|
|
detail=f"Cleared {deleted} stored messages",
|
|
metadata={"deleted_messages": deleted, "cleared_sessions": cleared_sessions},
|
|
)
|
|
session.commit()
|
|
_invalidate_bot_detail_cache(bot_id)
|
|
_invalidate_bot_messages_cache(bot_id)
|
|
return {"bot_id": bot_id, "deleted": deleted, "cleared_sessions": cleared_sessions}
|
|
|
|
|
|
def clear_dashboard_direct_session_payload(session: Session, bot_id: str) -> Dict[str, Any]:
|
|
bot = _get_bot_or_404(session, bot_id)
|
|
result = _clear_bot_dashboard_direct_session(bot_id)
|
|
if str(bot.docker_status or "").upper() == "RUNNING":
|
|
try:
|
|
docker_manager.send_command(bot_id, "/new")
|
|
except Exception:
|
|
pass
|
|
|
|
bot.updated_at = datetime.utcnow()
|
|
session.add(bot)
|
|
record_activity_event(
|
|
session,
|
|
bot_id,
|
|
"dashboard_session_cleared",
|
|
channel="dashboard",
|
|
detail="Cleared dashboard_direct session file",
|
|
metadata={"session_file": result["path"], "previously_existed": result["existed"]},
|
|
)
|
|
session.commit()
|
|
_invalidate_bot_detail_cache(bot_id)
|
|
return {
|
|
"bot_id": bot_id,
|
|
"cleared": True,
|
|
"session_file": result["path"],
|
|
"previously_existed": result["existed"],
|
|
}
|