dashboard-nanobot/backend/services/bot_channel_service.py

176 lines
8.8 KiB
Python

from typing import Any, Callable, Dict, List
from fastapi import HTTPException
from sqlmodel import Session
from models.bot import BotInstance
# Call signatures of the collaborators injected into BotChannelService.
# The service only relies on these shapes; the implementations live elsewhere.

# (bot_id) -> the bot's config mapping; the service reads/edits its "channels" key.
ReadBotConfig = Callable[[str], Dict[str, Any]]
# (bot_id, config) -> None; called with the edited config mapping.
WriteBotConfig = Callable[[str, Dict[str, Any]], None]
# (session, bot) -> None; called after each config write.
SyncBotWorkspace = Callable[[Session, BotInstance], None]
# (bot_id) -> None; called last in every mutating operation.
InvalidateBotCache = Callable[[str], None]
# (bot) -> list of channel row dicts (the service reads "id", "channel_type", "locked").
GetBotChannels = Callable[[BotInstance], List[Dict[str, Any]]]
# (raw extra_config value, possibly None) -> dict.
NormalizeChannelExtra = Callable[[Any], Dict[str, Any]]
# (channel row dict) -> the value stored under channels[<channel_type>].
ChannelApiToCfg = Callable[[Dict[str, Any]], Dict[str, Any]]
# (channels mapping) -> two booleans, unpacked as (send_progress, send_tool_hints).
ReadGlobalDeliveryFlags = Callable[[Any], tuple[bool, bool]]
class BotChannelService:
    """List, create, update and delete the channel entries of a bot's config.

    Each operation first loads the bot through the session (404 when missing).
    Mutating operations edit the ``channels`` mapping of the bot config through
    the injected reader/writer, then call the workspace-sync and cache-
    invalidation callables. The ``dashboard`` channel is treated as built-in:
    it cannot be created, modified, deleted, or used as a rename target.
    """

    def __init__(
        self,
        *,
        read_bot_config: ReadBotConfig,
        write_bot_config: WriteBotConfig,
        sync_bot_workspace_via_provider: SyncBotWorkspace,
        invalidate_bot_detail_cache: InvalidateBotCache,
        get_bot_channels_from_config: GetBotChannels,
        normalize_channel_extra: NormalizeChannelExtra,
        channel_api_to_cfg: ChannelApiToCfg,
        read_global_delivery_flags: ReadGlobalDeliveryFlags,
    ) -> None:
        """Store the injected collaborators; nothing is called here."""
        self._read_bot_config = read_bot_config
        self._write_bot_config = write_bot_config
        self._sync_bot_workspace_via_provider = sync_bot_workspace_via_provider
        self._invalidate_bot_detail_cache = invalidate_bot_detail_cache
        self._get_bot_channels_from_config = get_bot_channels_from_config
        self._normalize_channel_extra = normalize_channel_extra
        self._channel_api_to_cfg = channel_api_to_cfg
        self._read_global_delivery_flags = read_global_delivery_flags

    def _require_bot(self, *, session: Session, bot_id: str) -> BotInstance:
        """Return the bot with ``bot_id`` or raise a 404 ``HTTPException``."""
        bot = session.get(BotInstance, bot_id)
        if not bot:
            raise HTTPException(status_code=404, detail="Bot not found")
        return bot

    @staticmethod
    def _clamp_port(value: Any) -> int:
        """Convert ``value`` to an int clamped to the range 1..65535.

        Raises:
            HTTPException: 400 when ``value`` cannot be converted to an int.
        """
        try:
            port = int(value)
        except (TypeError, ValueError, OverflowError) as exc:
            # Report malformed input as a client error instead of a 500.
            raise HTTPException(status_code=400, detail="internal_port must be an integer") from exc
        return max(1, min(port, 65535))

    def _load_channels_cfg(self, bot_id: str) -> tuple[Dict[str, Any], Dict[str, Any]]:
        """Read the bot config and return ``(config, channels mapping)``.

        When ``config["channels"]`` is missing or not a dict, a fresh dict is
        installed under that key so edits to the returned mapping are part of
        the returned config.
        """
        config_data = self._read_bot_config(bot_id)
        channels_cfg = config_data.get("channels")
        if not isinstance(channels_cfg, dict):
            channels_cfg = {}
            config_data["channels"] = channels_cfg
        return config_data, channels_cfg

    @staticmethod
    def _find_channel_row(rows: List[Dict[str, Any]], channel_id: str) -> Dict[str, Any]:
        """Return the row whose ``id`` matches ``channel_id`` case-insensitively.

        Raises:
            HTTPException: 404 when no (non-empty) row matches.
        """
        channel_key = str(channel_id or "").strip().lower()
        row = next((r for r in rows if str(r.get("id") or "").lower() == channel_key), None)
        if not row:
            raise HTTPException(status_code=404, detail="Channel not found")
        return row

    def list_channels(self, *, session: Session, bot_id: str) -> List[Dict[str, Any]]:
        """Return the channel rows of the bot, as produced by the injected getter.

        Raises:
            HTTPException: 404 when the bot does not exist.
        """
        bot = self._require_bot(session=session, bot_id=bot_id)
        return self._get_bot_channels_from_config(bot)

    def create_channel(self, *, session: Session, bot_id: str, payload: Any) -> Dict[str, Any]:
        """Add a new channel entry to the bot config and return its row.

        ``payload`` is read with ``getattr`` for ``channel_type``,
        ``external_app_id``, ``app_secret``, ``internal_port``, ``is_active``
        and ``extra_config``. The channel type (trimmed, lower-cased) doubles
        as the row id and as the key under ``config["channels"]``.

        NOTE(review): unlike update/delete, no ``session.commit()`` is issued
        here - confirm whether that difference is intentional.

        Raises:
            HTTPException: 404 when the bot does not exist; 400 when the type
                is empty, is ``dashboard``, already exists, or the port is not
                an integer.
        """
        bot = self._require_bot(session=session, bot_id=bot_id)
        ctype = str(getattr(payload, "channel_type", "") or "").strip().lower()
        if not ctype:
            raise HTTPException(status_code=400, detail="channel_type is required")
        if ctype == "dashboard":
            raise HTTPException(status_code=400, detail="dashboard channel is built-in and cannot be created manually")
        current_rows = self._get_bot_channels_from_config(bot)
        if any(str(row.get("channel_type") or "").lower() == ctype for row in current_rows):
            raise HTTPException(status_code=400, detail=f"Channel already exists: {ctype}")

        new_row = {
            "id": ctype,
            "bot_id": bot_id,
            "channel_type": ctype,
            # A blank app id falls back to "<type>-<bot_id>".
            "external_app_id": str(getattr(payload, "external_app_id", "") or "").strip() or f"{ctype}-{bot_id}",
            "app_secret": str(getattr(payload, "app_secret", "") or "").strip(),
            # A missing or falsy port (None, 0, "") means the default 8080.
            "internal_port": self._clamp_port(getattr(payload, "internal_port", 8080) or 8080),
            "is_active": bool(getattr(payload, "is_active", True)),
            "extra_config": self._normalize_channel_extra(getattr(payload, "extra_config", None)),
            "locked": False,
        }

        config_data, channels_cfg = self._load_channels_cfg(bot_id)
        channels_cfg[ctype] = self._channel_api_to_cfg(new_row)
        self._write_bot_config(bot_id, config_data)
        self._sync_bot_workspace_via_provider(session, bot)
        self._invalidate_bot_detail_cache(bot_id)
        return new_row

    def update_channel(
        self,
        *,
        session: Session,
        bot_id: str,
        channel_id: str,
        payload: Any,
    ) -> Dict[str, Any]:
        """Apply the fields set on ``payload`` to an existing channel.

        Only fields present in ``payload.model_dump(exclude_unset=True)`` are
        considered; ``None`` values are ignored except for ``extra_config``,
        which is always passed through the normalizer when present. Changing
        ``channel_type`` renames the entry: the row id and the key under
        ``config["channels"]`` both follow the new type.

        Returns:
            The updated row dict (the same object obtained from the getter).

        Raises:
            HTTPException: 404 when the bot or channel does not exist; 400 when
                the row is the dashboard channel or locked, the new type is
                empty, is ``dashboard``, collides with another channel, or the
                port is not an integer.
        """
        bot = self._require_bot(session=session, bot_id=bot_id)
        rows = self._get_bot_channels_from_config(bot)
        row = self._find_channel_row(rows, channel_id)

        existing_type = str(row.get("channel_type") or "").strip().lower()
        if existing_type == "dashboard" or bool(row.get("locked")):
            raise HTTPException(status_code=400, detail="dashboard channel is built-in and cannot be modified")
        # From here on existing_type is never "dashboard".

        update_data = payload.model_dump(exclude_unset=True)

        new_type = existing_type
        requested_type = update_data.get("channel_type")
        if requested_type is not None:
            new_type = str(requested_type).strip().lower()
            if not new_type:
                raise HTTPException(status_code=400, detail="channel_type cannot be empty")
        if new_type == "dashboard":
            # Same rule as create_channel: a regular channel must not become
            # the built-in dashboard (that would drop its config entry).
            raise HTTPException(status_code=400, detail="dashboard channel is built-in; channel_type cannot be changed to dashboard")
        if new_type != existing_type and any(
            str(r.get("channel_type") or "").lower() == new_type for r in rows
        ):
            raise HTTPException(status_code=400, detail=f"Channel already exists: {new_type}")

        if update_data.get("external_app_id") is not None:
            row["external_app_id"] = str(update_data["external_app_id"]).strip()
        if update_data.get("app_secret") is not None:
            row["app_secret"] = str(update_data["app_secret"]).strip()
        if update_data.get("internal_port") is not None:
            row["internal_port"] = self._clamp_port(update_data["internal_port"])
        if update_data.get("is_active") is not None:
            row["is_active"] = bool(update_data["is_active"])
        if "extra_config" in update_data:
            # An explicit None is normalized too, so it resets the extras.
            row["extra_config"] = self._normalize_channel_extra(update_data.get("extra_config"))

        row["channel_type"] = new_type
        row["id"] = new_type
        row["locked"] = False  # only the dashboard row is locked, and it is rejected above

        config_data, channels_cfg = self._load_channels_cfg(bot_id)
        # Write the global delivery flags back exactly as they were read.
        send_progress, send_tool_hints = self._read_global_delivery_flags(channels_cfg)
        channels_cfg["sendProgress"] = send_progress
        channels_cfg["sendToolHints"] = send_tool_hints
        # Drop any "dashboard" entry from the stored channels mapping.
        channels_cfg.pop("dashboard", None)
        if existing_type != new_type:
            # Renamed: remove the entry stored under the old type.
            channels_cfg.pop(existing_type, None)
        channels_cfg[new_type] = self._channel_api_to_cfg(row)

        self._write_bot_config(bot_id, config_data)
        session.commit()
        self._sync_bot_workspace_via_provider(session, bot)
        self._invalidate_bot_detail_cache(bot_id)
        return row

    def delete_channel(self, *, session: Session, bot_id: str, channel_id: str) -> Dict[str, Any]:
        """Remove a channel entry from the bot config.

        Returns:
            ``{"status": "deleted"}``.

        Raises:
            HTTPException: 404 when the bot or channel does not exist; 400 when
                the channel is the dashboard channel.
        """
        bot = self._require_bot(session=session, bot_id=bot_id)
        rows = self._get_bot_channels_from_config(bot)
        row = self._find_channel_row(rows, channel_id)

        row_type = str(row.get("channel_type") or "").lower()
        if row_type == "dashboard":
            raise HTTPException(status_code=400, detail="dashboard channel cannot be deleted")

        config_data, channels_cfg = self._load_channels_cfg(bot_id)
        channels_cfg.pop(row_type, None)
        self._write_bot_config(bot_id, config_data)
        session.commit()
        self._sync_bot_workspace_via_provider(session, bot)
        self._invalidate_bot_detail_cache(bot_id)
        return {"status": "deleted"}