# dashboard-nanobot/backend/services/topic_service.py
# (repository viewer export: 718 lines, 24 KiB, Python; blame timestamp 2026-03-13 06:40:54 +00:00)
import json
import logging
import os
import re
import secrets
from datetime import datetime
from typing import Any, Dict, List, Optional
from sqlmodel import Session, select
from core.settings import BOTS_WORKSPACE_ROOT, TOPIC_MCP_INTERNAL_URL
from models.bot import BotInstance
from models.topic import TopicItem, TopicTopic
# Module logger; note the explicit name "dashboard.topic_mcp" rather than __name__.
logger = logging.getLogger("dashboard.topic_mcp")
# Bot ids consisting only of ASCII letters, digits and underscores; used to
# vet the bot-id hint embedded in an incoming token before touching the disk.
BOT_ID_PATTERN = re.compile(r"^[A-Za-z0-9_]+$")
# Key of the built-in topic server under tools.mcpServers in a bot's config.json.
TOPIC_MCP_SERVER_NAME = "topic_mcp"
# Header name under which the per-bot token is stored in the server entry.
TOPIC_MCP_TOKEN_HEADER = "x-topic-mcp-token"
# URL written into the locked server entry (comes from core.settings).
TOPIC_MCP_DEFAULT_URL = TOPIC_MCP_INTERNAL_URL
# Value written as "toolTimeout" in the locked server entry; presumably seconds - TODO confirm.
TOPIC_MCP_DEFAULT_TIMEOUT = 30
# Protocol version string returned by the "initialize" method.
TOPIC_MCP_PROTOCOL_VERSION = "2025-03-26"
# A repeated dedupe_key within this many seconds (10 minutes) suppresses a new publish.
TOPIC_DEDUPE_WINDOW_SECONDS = 10 * 60
# Accepted item levels; publishing falls back to "info" for any other value.
TOPIC_LEVEL_SET = {"info", "warn", "error", "success"}
# Lowercase alphanumeric first character followed by up to 63 characters from
# [a-z0-9_.-]. NOTE(review): not referenced in the visible part of this file.
_TOPIC_KEY_RE = re.compile(r"^[a-z0-9][a-z0-9_.-]{0,63}$")
def _bot_data_root(bot_id: str) -> str:
    """Return the ``.nanobot`` directory inside the bot's workspace folder."""
    workspace_dir = os.path.join(BOTS_WORKSPACE_ROOT, bot_id)
    return os.path.join(workspace_dir, ".nanobot")
def _config_json_path(bot_id: str) -> str:
    """Return the path of ``config.json`` inside the bot's data directory."""
    data_root = _bot_data_root(bot_id)
    return os.path.join(data_root, "config.json")
def _read_bot_config(bot_id: str) -> Dict[str, Any]:
    """Load the bot's ``config.json`` and return it as a dict.

    Args:
        bot_id: Identifier used to locate the bot's config file.

    Returns:
        The parsed JSON object, or an empty dict when the file is missing,
        cannot be read or parsed, or does not contain a JSON object.
    """
    path = _config_json_path(bot_id)
    if not os.path.isfile(path):
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception:
        # Still best-effort (callers rely on getting a dict back), but leave a
        # trace: an unreadable config would otherwise look exactly like an
        # empty one and could later be overwritten without anyone noticing.
        logger.warning("failed to read bot config: %s", path, exc_info=True)
        return {}
    return data if isinstance(data, dict) else {}
def _write_bot_config(bot_id: str, config_data: Dict[str, Any]) -> None:
    """Write ``config_data`` to the bot's ``config.json`` via a temp file.

    The JSON is first written to ``<path>.tmp`` and then moved over the real
    file with ``os.replace`` so readers never observe a half-written config.

    Args:
        bot_id: Identifier used to locate the bot's config file.
        config_data: JSON-serialisable configuration to persist.

    Raises:
        OSError: If the directory, temp file or replace operation fails.
        TypeError / ValueError: If ``config_data`` cannot be serialised.
    """
    path = _config_json_path(bot_id)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    tmp = f"{path}.tmp"
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(config_data, f, ensure_ascii=False, indent=2)
        os.replace(tmp, path)
    except BaseException:
        # Do not leave a partial temp file behind when dumping or replacing
        # fails; the original error is what the caller needs to see.
        try:
            os.remove(tmp)
        except OSError:
            pass  # temp file was never created or is already gone
        raise
def _dict_get_ci(raw: Any, key: str) -> Any:
if not isinstance(raw, dict):
return None
wanted = str(key or "").strip().lower()
for k, v in raw.items():
if str(k or "").strip().lower() == wanted:
return v
return None
def _as_bool(value: Any) -> bool:
if isinstance(value, bool):
return value
text = str(value or "").strip().lower()
return text in {"1", "true", "yes", "on", "y"}
def _extract_topic_mcp_token(server_cfg: Any) -> str:
    """Return the topic MCP token stored in a server entry's headers.

    Yields an empty string when ``server_cfg`` is not a dict or carries no
    token header (the header name is matched case-insensitively).
    """
    if isinstance(server_cfg, dict):
        headers = server_cfg.get("headers")
    else:
        headers = None
    raw_token = _dict_get_ci(headers, TOPIC_MCP_TOKEN_HEADER)
    return str(raw_token or "").strip()
def _generate_topic_mcp_token(bot_id: str) -> str:
return f"{bot_id}.{secrets.token_urlsafe(24)}"
def _build_locked_topic_mcp_server(bot_id: str, token: str) -> Dict[str, Any]:
    """Build the fixed ``topic_mcp`` server entry for a bot.

    A blank ``token`` is replaced by a newly generated one; every other field
    comes from the module-level defaults.
    """
    effective_token = str(token or "").strip()
    if not effective_token:
        effective_token = _generate_topic_mcp_token(bot_id)
    return dict(
        type="streamableHttp",
        url=TOPIC_MCP_DEFAULT_URL,
        headers={TOPIC_MCP_TOKEN_HEADER: effective_token},
        toolTimeout=TOPIC_MCP_DEFAULT_TIMEOUT,
    )
def _annotate_locked_mcp_servers(raw_servers: Dict[str, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Return shallow copies of the server entries with a ``locked`` flag.

    Entries that are not dicts are dropped. ``locked`` is true only for the
    entry named ``TOPIC_MCP_SERVER_NAME``. The input mapping is not modified.
    """
    return {
        name: {**cfg, "locked": name == TOPIC_MCP_SERVER_NAME}
        for name, cfg in raw_servers.items()
        if isinstance(cfg, dict)
    }
def _ensure_topic_mcp_server(
    bot_id: str,
    config_data: Optional[Dict[str, Any]] = None,
    persist: bool = True,
) -> Dict[str, Any]:
    """Make sure the bot config contains the locked ``topic_mcp`` server entry.

    Args:
        bot_id: Bot whose configuration is inspected.
        config_data: Already loaded config to update in place; when it is not
            a dict the config is read from disk instead.
        persist: Write the config back to disk when the entry had to change.

    Returns:
        The locked server entry. An existing token is preserved; otherwise a
        new one is generated.
    """
    working = config_data if isinstance(config_data, dict) else _read_bot_config(bot_id)
    tools_cfg = working.get("tools")
    if not isinstance(tools_cfg, dict):
        tools_cfg = {}
    mcp_servers = tools_cfg.get("mcpServers")
    if not isinstance(mcp_servers, dict):
        mcp_servers = {}

    existing_server = mcp_servers.get(TOPIC_MCP_SERVER_NAME)
    existing_token = _extract_topic_mcp_token(existing_server)
    locked_server = _build_locked_topic_mcp_server(bot_id, existing_token)
    # Compare before overwriting so an already-correct entry causes no disk write.
    changed = existing_server != locked_server

    # Re-attach every level: tools / mcpServers may have been replaced above.
    mcp_servers[TOPIC_MCP_SERVER_NAME] = locked_server
    tools_cfg["mcpServers"] = mcp_servers
    working["tools"] = tools_cfg
    if persist and changed:
        _write_bot_config(bot_id, working)
    return locked_server
def _resolve_topic_mcp_bot_id_by_token(session: Session, token: str) -> Optional[str]:
    """Find the bot whose stored topic MCP token equals ``token``.

    The part of the token before the first "." is tried first as a bot-id
    hint (only when it matches ``BOT_ID_PATTERN``); afterwards every bot known
    to the database is checked.

    Args:
        session: Database session used to list bot instances.
        token: Token presented by the caller.

    Returns:
        The matching bot id, or ``None`` when the token is blank or unknown.
    """
    incoming = str(token or "").strip()
    if not incoming:
        return None
    # compare_digest() raises TypeError for str arguments containing non-ASCII
    # characters. The token is caller-supplied, so compare encoded bytes.
    incoming_bytes = incoming.encode("utf-8", "surrogatepass")

    candidates: List[str] = []
    hinted_bot_id = incoming.split(".", 1)[0].strip()
    if hinted_bot_id and BOT_ID_PATTERN.fullmatch(hinted_bot_id):
        # NOTE(review): the hinted id is read from disk without checking that
        # the bot still exists in the database - confirm this is intended.
        candidates.append(hinted_bot_id)
    for bot in session.exec(select(BotInstance)).all():
        if bot.id not in candidates:
            candidates.append(bot.id)

    for bot_id in candidates:
        config_data = _read_bot_config(bot_id)
        tools_cfg = config_data.get("tools")
        if not isinstance(tools_cfg, dict):
            continue
        mcp_servers = tools_cfg.get("mcpServers")
        if not isinstance(mcp_servers, dict):
            continue
        expected = _extract_topic_mcp_token(mcp_servers.get(TOPIC_MCP_SERVER_NAME))
        if expected and secrets.compare_digest(expected.encode("utf-8", "surrogatepass"), incoming_bytes):
            return bot_id
    return None
def _normalize_topic_key(raw: Any) -> str:
value = str(raw or "").strip().lower()
if not value:
return ""
return value
def _ensure_topic_defaults(session: Session, bot_id: str) -> None:
# Deprecated: topic feed global switch/fallback removed.
# Keep as no-op for call-site compatibility.
_ = session
_ = bot_id
return None
def _parse_json_dict(raw: str) -> Dict[str, Any]:
text = str(raw or "").strip()
if not text:
return {}
try:
data = json.loads(text)
return data if isinstance(data, dict) else {}
except Exception:
return {}
def _parse_json_list(raw: str) -> List[Any]:
text = str(raw or "").strip()
if not text:
return []
try:
data = json.loads(text)
except Exception:
return []
return data if isinstance(data, list) else []
def _topic_to_dict(row: TopicTopic) -> Dict[str, Any]:
    """Convert a topic row into a JSON-friendly dict.

    The topic key is trimmed and lower-cased, the routing and view-schema
    JSON columns are decoded into dicts, and timestamps become ISO strings
    (``None`` when unset).
    """
    return dict(
        id=row.id,
        bot_id=row.bot_id,
        topic_key=str(row.topic_key or "").strip().lower(),
        name=row.name or "",
        description=row.description or "",
        is_active=bool(row.is_active),
        routing=_parse_json_dict(row.routing_json or "{}"),
        view_schema=_parse_json_dict(row.view_schema_json or "{}"),
        created_at=row.created_at.isoformat() if row.created_at else None,
        updated_at=row.updated_at.isoformat() if row.updated_at else None,
    )
def _list_topics(session: Session, bot_id: str) -> List[Dict[str, Any]]:
    """Return all topics of a bot as dicts, active ones first, then by key."""
    statement = (
        select(TopicTopic)
        .where(TopicTopic.bot_id == bot_id)
        .order_by(TopicTopic.is_active.desc(), TopicTopic.topic_key.asc())
    )
    topic_rows = session.exec(statement).all()
    return list(map(_topic_to_dict, topic_rows))
def _topic_item_to_dict(row: TopicItem) -> Dict[str, Any]:
    """Convert a topic item row into a JSON-friendly dict.

    Missing text fields fall back to "" (``level`` to "info", ``source`` to
    "mcp"); the tags and view JSON columns are decoded; ``created_at`` becomes
    an ISO string or ``None``.
    """
    return dict(
        id=row.id,
        bot_id=row.bot_id,
        topic_key=str(row.topic_key or "").strip().lower(),
        title=row.title or "",
        content=row.content or "",
        level=str(row.level or "info").strip().lower(),
        tags=_parse_json_list(row.tags_json or "[]"),
        view=_parse_json_dict(row.view_json or "{}"),
        source=row.source or "mcp",
        dedupe_key=row.dedupe_key or "",
        is_read=bool(row.is_read),
        created_at=row.created_at.isoformat() if row.created_at else None,
    )
def _topic_get_row(session: Session, bot_id: str, topic_key: str) -> Optional[TopicTopic]:
    """Fetch one topic of a bot by its normalized key.

    Returns ``None`` when the key normalizes to "" or no such row exists.
    """
    key = _normalize_topic_key(topic_key)
    if key:
        query = (
            select(TopicTopic)
            .where(TopicTopic.bot_id == bot_id)
            .where(TopicTopic.topic_key == key)
            .limit(1)
        )
        return session.exec(query).first()
    return None
def _normalize_topic_keywords(raw: Any) -> List[str]:
rows: List[str] = []
if isinstance(raw, list):
for item in raw:
text = str(item or "").strip().lower()
if text and text not in rows:
rows.append(text)
elif isinstance(raw, str):
text = raw.strip().lower()
if text:
rows.append(text)
return rows
def _topic_filter_reason(payload: Dict[str, Any]) -> str:
    """Explain why a payload must not be published, or return "" if it may.

    Progress and tool-hint messages are filtered, whether flagged through
    ``is_progress`` / ``is_tool_hint`` or identified by ``source`` / ``type``.
    """
    flag_reasons = (
        ("is_progress", "progress message is filtered"),
        ("is_tool_hint", "tool hint message is filtered"),
    )
    for flag, reason in flag_reasons:
        if _as_bool(payload.get(flag)):
            return reason
    origin = str(payload.get("source") or payload.get("type") or "").strip().lower()
    if origin in ("progress", "tool_hint", "sendprogress", "sendtoolhints"):
        return f"{origin} message is filtered"
    return ""
def _topic_route_pick(
    session: Session,
    bot_id: str,
    payload: Dict[str, Any],
    requested_topic_key: str = "",
) -> Dict[str, Any]:
    """Decide which active topic, if any, a candidate message belongs to.

    An explicit topic key (argument, ``payload["topic_key"]`` or
    ``payload["topic"]``) is accepted only if that topic exists and is active.
    Otherwise each active topic is scored by substring hits of its routing
    ``include_when`` / ``exclude_when`` keywords in title + content + tags:
    ``2 * include_hits - 3 * exclude_hits + priority / 1000``. Topics with no
    include hit are ignored, and the best score must be positive.

    Args:
        session: Database session used to load topics.
        bot_id: Bot whose topics are considered.
        payload: Candidate message (``title``, ``content``/``text``, ``tags``, ...).
        requested_topic_key: Explicit topic key, if the caller supplied one.

    Returns:
        A dict with ``matched`` (bool), ``topic_key`` (str or None),
        ``confidence`` (float) and ``reason`` (str).
    """

    def _no_match(confidence: float, reason: str) -> Dict[str, Any]:
        return {
            "matched": False,
            "topic_key": None,
            "confidence": confidence,
            "reason": reason,
        }

    active_topics = session.exec(
        select(TopicTopic)
        .where(TopicTopic.bot_id == bot_id)
        .where(TopicTopic.is_active == True)  # noqa: E712 - builds a SQL expression
        .order_by(TopicTopic.topic_key.asc())
    ).all()
    if not active_topics:
        return _no_match(1.0, "no active topic configured")

    req_key = _normalize_topic_key(requested_topic_key or payload.get("topic_key") or payload.get("topic"))
    if req_key:
        row = _topic_get_row(session, bot_id, req_key)
        if row and bool(row.is_active):
            return {
                "matched": True,
                "topic_key": req_key,
                "confidence": 0.99,
                "reason": "explicit topic key accepted",
            }
        return _no_match(0.72, f"requested topic {req_key} unavailable or inactive")

    # Only a real list of tags takes part in routing (same rule as publishing).
    # A scalar would raise TypeError when iterated and a plain string would be
    # split into single characters.
    raw_tags = payload.get("tags")
    tag_words = [str(v or "").strip() for v in raw_tags] if isinstance(raw_tags, list) else []
    text = " ".join(
        [
            str(payload.get("title") or "").strip(),
            str(payload.get("content") or payload.get("text") or "").strip(),
            " ".join(tag_words),
        ]
    ).strip().lower()
    if not text:
        return _no_match(1.0, "no routing evidence")

    best_key = ""
    best_score = -10.0
    best_reason = "no topic matched"
    matched_include = False
    for topic in active_topics:
        key = _normalize_topic_key(topic.topic_key)
        if not key:
            continue
        routing = _parse_json_dict(topic.routing_json or "{}")
        include_when = _normalize_topic_keywords(routing.get("include_when"))
        exclude_when = _normalize_topic_keywords(routing.get("exclude_when"))
        try:
            # Priority is clamped to 0..100 and only acts as a tie-breaker.
            priority = max(0, min(int(routing.get("priority", 0)), 100))
        except (TypeError, ValueError, OverflowError):
            priority = 0
        include_hits = [kw for kw in include_when if kw in text]
        if not include_hits:
            continue
        exclude_hits = [kw for kw in exclude_when if kw in text]
        matched_include = True
        score = float(len(include_hits) * 2 - len(exclude_hits) * 3) + (priority / 1000.0)
        if score > best_score:
            best_score = score
            best_key = key
            # include_hits is never empty here, so this is the only possible reason.
            best_reason = f"matched include_when: {', '.join(include_hits[:3])}"

    if not matched_include:
        return _no_match(0.68, "no include_when matched")
    if best_score <= 0:
        return _no_match(0.68, "no positive routing score")
    confidence = min(0.95, max(0.61, 0.61 + best_score / 12.0))
    return {
        "matched": True,
        "topic_key": best_key,
        "confidence": round(confidence, 3),
        "reason": best_reason,
    }
def _topic_publish_internal(session: Session, bot_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Validate, route, de-duplicate and store one topic feed item.

    Args:
        session: Database session; a successful publish commits on it.
        bot_id: Bot that owns the item.
        payload: Item fields (``title``, ``content``/``text``, ``level``,
            ``tags``, ``view``, ``dedupe_key``, ``source``, ``topic_key``, ...).

    Returns:
        ``{"published": True, "topic_key", "item", "route"}`` on success;
        ``{"published": False, "skipped": True, "reason", ...}`` when the
        payload is filtered, empty or cannot be routed; or
        ``{"published": False, "deduped": True, ...}`` when an item with the
        same dedupe key was created within ``TOPIC_DEDUPE_WINDOW_SECONDS``.
    """
    filter_reason = _topic_filter_reason(payload)
    if filter_reason:
        return {
            "published": False,
            "skipped": True,
            "reason": filter_reason,
        }
    title = str(payload.get("title") or "").strip()
    content = str(payload.get("content") or payload.get("text") or "").strip()
    if not title and not content:
        return {
            "published": False,
            "skipped": True,
            "reason": "empty title/content",
        }
    level = str(payload.get("level") or "info").strip().lower()
    if level not in TOPIC_LEVEL_SET:
        level = "info"

    tags = payload.get("tags")
    tags_rows: List[str] = []
    if isinstance(tags, list):
        for tag in tags:
            # Truncate before the duplicate check: the truncated text is what
            # gets stored, so long tags must be compared in that form.
            text = str(tag or "").strip()[:64]
            if text and text not in tags_rows:
                tags_rows.append(text)

    route_result = _topic_route_pick(session, bot_id, payload, requested_topic_key=str(payload.get("topic_key") or ""))
    if not bool(route_result.get("matched")):
        return {
            "published": False,
            "skipped": True,
            "reason": str(route_result.get("reason") or "no topic matched"),
            "route": route_result,
        }
    topic_key = _normalize_topic_key(route_result.get("topic_key"))
    if not topic_key:
        return {
            "published": False,
            "skipped": True,
            "reason": "invalid topic route result",
            "route": route_result,
        }
    row = _topic_get_row(session, bot_id, topic_key)
    if not row or not bool(row.is_active):
        return {
            "published": False,
            "skipped": True,
            "reason": f"topic {topic_key} unavailable or inactive",
            "route": route_result,
        }

    # Look up the key in the same truncated form it is stored in; otherwise
    # keys longer than 200 characters would never match an earlier item.
    dedupe_key = str(payload.get("dedupe_key") or "").strip()[:200]
    if dedupe_key:
        existing = session.exec(
            select(TopicItem)
            .where(TopicItem.bot_id == bot_id)
            .where(TopicItem.dedupe_key == dedupe_key)
            .order_by(TopicItem.id.desc())
            .limit(1)
        ).first()
        if existing and existing.created_at:
            # NOTE(review): assumes created_at is a naive UTC datetime, as
            # written below - confirm before switching to aware datetimes.
            age_s = (datetime.utcnow() - existing.created_at).total_seconds()
            if age_s <= TOPIC_DEDUPE_WINDOW_SECONDS:
                return {
                    "published": False,
                    "deduped": True,
                    "dedupe_window_seconds": TOPIC_DEDUPE_WINDOW_SECONDS,
                    "topic_key": _normalize_topic_key(existing.topic_key),
                    "reason": "dedupe_key hit within dedupe window",
                    "item": _topic_item_to_dict(existing),
                }

    view = payload.get("view")
    view_json = json.dumps(view, ensure_ascii=False) if isinstance(view, dict) else None
    source = str(payload.get("source") or "mcp").strip().lower() or "mcp"
    now = datetime.utcnow()
    item = TopicItem(
        bot_id=bot_id,
        topic_key=topic_key,
        title=title[:2000],
        content=content[:20000],
        level=level,
        tags_json=json.dumps(tags_rows, ensure_ascii=False) if tags_rows else None,
        view_json=view_json,
        source=source[:64],
        dedupe_key=dedupe_key or None,
        is_read=False,
        created_at=now,
    )
    session.add(item)
    session.commit()
    session.refresh(item)
    return {
        "published": True,
        "topic_key": topic_key,
        "item": _topic_item_to_dict(item),
        "route": route_result,
    }
def _jsonrpc_success(rpc_id: Any, result: Any) -> Dict[str, Any]:
return {
"jsonrpc": "2.0",
"id": rpc_id,
"result": result,
}
def _jsonrpc_error(rpc_id: Any, code: int, message: str, data: Any = None) -> Dict[str, Any]:
payload: Dict[str, Any] = {
"jsonrpc": "2.0",
"id": rpc_id,
"error": {
"code": int(code),
"message": str(message or "unknown error"),
},
}
if data is not None:
payload["error"]["data"] = data
return payload
def _mcp_tool_result(structured: Dict[str, Any], is_error: bool = False) -> Dict[str, Any]:
return {
"content": [
{
"type": "text",
"text": json.dumps(structured, ensure_ascii=False),
}
],
"structuredContent": structured,
"isError": bool(is_error),
}
def _topic_mcp_tools() -> List[Dict[str, Any]]:
return [
{
"name": "topic_list_topics",
"description": "List available topics for the current bot.",
"inputSchema": {
"type": "object",
"properties": {
"include_inactive": {"type": "boolean"},
},
"additionalProperties": False,
},
},
{
"name": "topic_get_schema",
"description": "Get allowed view schema and optional topic-specific schema.",
"inputSchema": {
"type": "object",
"properties": {
"topic_key": {"type": "string"},
},
"additionalProperties": False,
},
},
{
"name": "topic_route",
"description": "Route candidate content to a topic and decide if publish is needed.",
"inputSchema": {
"type": "object",
"properties": {
"topic_key": {"type": "string"},
"title": {"type": "string"},
"content": {"type": "string"},
"tags": {"type": "array", "items": {"type": "string"}},
"is_progress": {"type": "boolean"},
"is_tool_hint": {"type": "boolean"},
"source": {"type": "string"},
},
"additionalProperties": True,
},
},
{
"name": "topic_publish",
"description": "Publish one item into topic feed with dedupe support.",
"inputSchema": {
"type": "object",
"properties": {
"topic_key": {"type": "string"},
"title": {"type": "string"},
"content": {"type": "string"},
"level": {"type": "string"},
"tags": {"type": "array", "items": {"type": "string"}},
"view": {"type": "object"},
"dedupe_key": {"type": "string"},
"source": {"type": "string"},
"is_progress": {"type": "boolean"},
"is_tool_hint": {"type": "boolean"},
},
"additionalProperties": True,
},
},
]
def _topic_mcp_list_topics(session: Session, bot_id: str, args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the ``topic_list_topics`` tool.

    Inactive topics are included unless ``include_inactive`` is present in
    ``args`` and evaluates to false.
    """
    _ensure_topic_defaults(session, bot_id)
    if "include_inactive" in args:
        include_inactive = _as_bool(args.get("include_inactive"))
    else:
        include_inactive = True
    topics = _list_topics(session, bot_id)
    if not include_inactive:
        topics = [topic for topic in topics if bool(topic.get("is_active"))]
    return {"bot_id": bot_id, "topics": topics}
def _topic_mcp_get_schema(session: Session, bot_id: str, args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the ``topic_get_schema`` tool.

    Returns the supported view types and publish constraints; ``topic`` holds
    the requested topic as a dict, or ``None`` when no key was given or the
    topic does not exist.
    """
    _ensure_topic_defaults(session, bot_id)
    topic_payload: Optional[Dict[str, Any]] = None
    requested_key = _normalize_topic_key(args.get("topic_key"))
    if requested_key:
        topic_row = _topic_get_row(session, bot_id, requested_key)
        if topic_row:
            topic_payload = _topic_to_dict(topic_row)

    view_schema = {
        "type": "object",
        "description": "Declarative view payload only. Scripts and unsafe HTML are not allowed.",
    }
    publish_constraints = {
        "level": sorted(TOPIC_LEVEL_SET),
        "dedupe_window_seconds": TOPIC_DEDUPE_WINDOW_SECONDS,
    }
    return {
        "version": "v1",
        "view_types": ["markdown", "card", "table", "checklist", "metric", "timeline"],
        "topic": topic_payload,
        "view_schema": view_schema,
        "publish_constraints": publish_constraints,
    }
def _topic_mcp_route(session: Session, bot_id: str, args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the ``topic_route`` tool: a dry run of the routing decision.

    Returns ``should_publish``, ``topic_key``, ``confidence`` and ``reason``
    without storing anything.
    """

    def _declined(reason: str) -> Dict[str, Any]:
        return {
            "should_publish": False,
            "topic_key": None,
            "confidence": 1.0,
            "reason": reason,
        }

    _ensure_topic_defaults(session, bot_id)
    filter_reason = _topic_filter_reason(args)
    if filter_reason:
        return _declined(filter_reason)

    has_title = bool(str(args.get("title") or "").strip())
    has_content = bool(str(args.get("content") or args.get("text") or "").strip())
    if not (has_title or has_content):
        return _declined("empty title/content")

    picked = _topic_route_pick(session, bot_id, args, requested_topic_key=str(args.get("topic_key") or ""))
    return {
        "should_publish": bool(picked.get("matched")),
        "topic_key": picked.get("topic_key"),
        "confidence": picked.get("confidence"),
        "reason": picked.get("reason"),
    }
def _topic_mcp_publish(session: Session, bot_id: str, args: Dict[str, Any]) -> Dict[str, Any]:
    """Handle the ``topic_publish`` tool by delegating to the internal publisher."""
    outcome = _topic_publish_internal(session, bot_id, args)
    return outcome
def _dispatch_topic_mcp_method(session: Session, bot_id: str, method: str, params: Dict[str, Any]) -> Any:
    """Execute one MCP method and return its result payload.

    Supports ``initialize``, ``notifications/initialized`` / ``initialized``,
    ``ping``, ``tools/list`` and ``tools/call``.

    Raises:
        KeyError: For any other method name.

    An unknown tool name is not an exception: it yields a tool result flagged
    with ``isError`` that lists the available tools.
    """
    if method == "initialize":
        server_info = {"name": TOPIC_MCP_SERVER_NAME, "version": "0.1.0"}
        return {
            "protocolVersion": TOPIC_MCP_PROTOCOL_VERSION,
            "capabilities": {"tools": {}},
            "serverInfo": server_info,
        }
    if method in ("notifications/initialized", "initialized"):
        return None
    if method == "ping":
        return {}
    if method == "tools/list":
        return {"tools": _topic_mcp_tools()}
    if method != "tools/call":
        raise KeyError(f"Unknown method: {method}")

    tool_name = str(params.get("name") or "").strip()
    call_args = params.get("arguments")
    if not isinstance(call_args, dict):
        call_args = {}

    tool_handlers = {
        "topic_list_topics": _topic_mcp_list_topics,
        "topic_get_schema": _topic_mcp_get_schema,
        "topic_route": _topic_mcp_route,
        "topic_publish": _topic_mcp_publish,
    }
    handler = tool_handlers.get(tool_name)
    if handler is None:
        failure = {
            "error": f"unknown tool: {tool_name}",
            "available_tools": [tool["name"] for tool in _topic_mcp_tools()],
        }
        return _mcp_tool_result(failure, is_error=True)
    return _mcp_tool_result(handler(session, bot_id, call_args))
def _handle_topic_mcp_rpc_item(session: Session, bot_id: str, item: Any) -> Optional[Dict[str, Any]]:
    """Process one JSON-RPC 2.0 message and build its response.

    Args:
        session: Database session handed to the method implementation.
        bot_id: Bot on whose behalf the call is made.
        item: Decoded JSON-RPC message (expected to be a dict).

    Returns:
        A JSON-RPC success or error response, or ``None`` for notifications
        (messages without an ``id``), which must not be answered.
    """
    if not isinstance(item, dict):
        return _jsonrpc_error(None, -32600, "Invalid Request")
    rpc_id = item.get("id")
    method = str(item.get("method") or "").strip()
    if not method:
        return _jsonrpc_error(rpc_id, -32600, "Invalid Request: method is required")
    params = item.get("params")
    if params is None:
        params = {}
    if not isinstance(params, dict):
        return _jsonrpc_error(rpc_id, -32602, "Invalid params")

    result: Any = None
    error: Optional[Dict[str, Any]] = None
    try:
        result = _dispatch_topic_mcp_method(session, bot_id, method, params)
    except KeyError as exc:
        # str(KeyError("msg")) is the repr "'msg'"; use the raw argument so the
        # client does not see stray quotes in the message.
        message = str(exc.args[0]) if exc.args else "Method not found"
        error = _jsonrpc_error(rpc_id, -32601, message)
    except ValueError as exc:
        error = _jsonrpc_error(rpc_id, -32602, str(exc))
    except Exception as exc:
        logger.exception("topic_mcp method failed: %s", method)
        error = _jsonrpc_error(rpc_id, -32000, f"topic_mcp execution failed: {type(exc).__name__}: {exc}")

    if rpc_id is None:
        # JSON-RPC 2.0: a notification never gets a response, not even an
        # error one. Log the failure so it is not lost.
        if error is not None:
            logger.warning("topic_mcp notification %s failed: %s", method, error["error"]["message"])
        return None
    if error is not None:
        return error
    return _jsonrpc_success(rpc_id, result)