from typing import Any, Callable, Dict, List from fastapi import HTTPException from sqlmodel import Session from models.bot import BotInstance ReadBotConfig = Callable[[str], Dict[str, Any]] WriteBotConfig = Callable[[str, Dict[str, Any]], None] SyncBotWorkspace = Callable[[Session, BotInstance], None] InvalidateBotCache = Callable[[str], None] GetBotChannels = Callable[[BotInstance], List[Dict[str, Any]]] NormalizeChannelExtra = Callable[[Any], Dict[str, Any]] ChannelApiToCfg = Callable[[Dict[str, Any]], Dict[str, Any]] ReadGlobalDeliveryFlags = Callable[[Any], tuple[bool, bool]] class BotChannelService: def __init__( self, *, read_bot_config: ReadBotConfig, write_bot_config: WriteBotConfig, sync_bot_workspace_via_provider: SyncBotWorkspace, invalidate_bot_detail_cache: InvalidateBotCache, get_bot_channels_from_config: GetBotChannels, normalize_channel_extra: NormalizeChannelExtra, channel_api_to_cfg: ChannelApiToCfg, read_global_delivery_flags: ReadGlobalDeliveryFlags, ) -> None: self._read_bot_config = read_bot_config self._write_bot_config = write_bot_config self._sync_bot_workspace_via_provider = sync_bot_workspace_via_provider self._invalidate_bot_detail_cache = invalidate_bot_detail_cache self._get_bot_channels_from_config = get_bot_channels_from_config self._normalize_channel_extra = normalize_channel_extra self._channel_api_to_cfg = channel_api_to_cfg self._read_global_delivery_flags = read_global_delivery_flags def _require_bot(self, *, session: Session, bot_id: str) -> BotInstance: bot = session.get(BotInstance, bot_id) if not bot: raise HTTPException(status_code=404, detail="Bot not found") return bot def list_channels(self, *, session: Session, bot_id: str) -> List[Dict[str, Any]]: bot = self._require_bot(session=session, bot_id=bot_id) return self._get_bot_channels_from_config(bot) def create_channel(self, *, session: Session, bot_id: str, payload: Any) -> Dict[str, Any]: bot = self._require_bot(session=session, bot_id=bot_id) ctype = str(getattr(payload, "channel_type", "") or "").strip().lower() if not ctype: raise HTTPException(status_code=400, detail="channel_type is required") if ctype == "dashboard": raise HTTPException(status_code=400, detail="dashboard channel is built-in and cannot be created manually") current_rows = self._get_bot_channels_from_config(bot) if any(str(row.get("channel_type") or "").lower() == ctype for row in current_rows): raise HTTPException(status_code=400, detail=f"Channel already exists: {ctype}") new_row = { "id": ctype, "bot_id": bot_id, "channel_type": ctype, "external_app_id": str(getattr(payload, "external_app_id", "") or "").strip() or f"{ctype}-{bot_id}", "app_secret": str(getattr(payload, "app_secret", "") or "").strip(), "internal_port": max(1, min(int(getattr(payload, "internal_port", 8080) or 8080), 65535)), "is_active": bool(getattr(payload, "is_active", True)), "extra_config": self._normalize_channel_extra(getattr(payload, "extra_config", None)), "locked": False, } config_data = self._read_bot_config(bot_id) channels_cfg = config_data.get("channels") if not isinstance(channels_cfg, dict): channels_cfg = {} config_data["channels"] = channels_cfg channels_cfg[ctype] = self._channel_api_to_cfg(new_row) self._write_bot_config(bot_id, config_data) self._sync_bot_workspace_via_provider(session, bot) self._invalidate_bot_detail_cache(bot_id) return new_row def update_channel( self, *, session: Session, bot_id: str, channel_id: str, payload: Any, ) -> Dict[str, Any]: bot = self._require_bot(session=session, bot_id=bot_id) channel_key = str(channel_id or "").strip().lower() rows = self._get_bot_channels_from_config(bot) row = next((r for r in rows if str(r.get("id") or "").lower() == channel_key), None) if not row: raise HTTPException(status_code=404, detail="Channel not found") if str(row.get("channel_type") or "").strip().lower() == "dashboard" or bool(row.get("locked")): raise HTTPException(status_code=400, detail="dashboard channel is built-in and cannot be modified") update_data = payload.model_dump(exclude_unset=True) existing_type = str(row.get("channel_type") or "").strip().lower() new_type = existing_type if "channel_type" in update_data and update_data["channel_type"] is not None: new_type = str(update_data["channel_type"]).strip().lower() if not new_type: raise HTTPException(status_code=400, detail="channel_type cannot be empty") if existing_type == "dashboard" and new_type != "dashboard": raise HTTPException(status_code=400, detail="dashboard channel type cannot be changed") if new_type != existing_type and any(str(r.get("channel_type") or "").lower() == new_type for r in rows): raise HTTPException(status_code=400, detail=f"Channel already exists: {new_type}") if "external_app_id" in update_data and update_data["external_app_id"] is not None: row["external_app_id"] = str(update_data["external_app_id"]).strip() if "app_secret" in update_data and update_data["app_secret"] is not None: row["app_secret"] = str(update_data["app_secret"]).strip() if "internal_port" in update_data and update_data["internal_port"] is not None: row["internal_port"] = max(1, min(int(update_data["internal_port"]), 65535)) if "is_active" in update_data and update_data["is_active"] is not None: next_active = bool(update_data["is_active"]) if existing_type == "dashboard" and not next_active: raise HTTPException(status_code=400, detail="dashboard channel must remain enabled") row["is_active"] = next_active if "extra_config" in update_data: row["extra_config"] = self._normalize_channel_extra(update_data.get("extra_config")) row["channel_type"] = new_type row["id"] = new_type row["locked"] = new_type == "dashboard" config_data = self._read_bot_config(bot_id) channels_cfg = config_data.get("channels") if not isinstance(channels_cfg, dict): channels_cfg = {} config_data["channels"] = channels_cfg current_send_progress, current_send_tool_hints = self._read_global_delivery_flags(channels_cfg) if new_type == "dashboard": extra = self._normalize_channel_extra(row.get("extra_config")) channels_cfg["sendProgress"] = bool(extra.get("sendProgress", current_send_progress)) channels_cfg["sendToolHints"] = bool(extra.get("sendToolHints", current_send_tool_hints)) else: channels_cfg["sendProgress"] = current_send_progress channels_cfg["sendToolHints"] = current_send_tool_hints channels_cfg.pop("dashboard", None) if existing_type != "dashboard" and existing_type in channels_cfg and existing_type != new_type: channels_cfg.pop(existing_type, None) if new_type != "dashboard": channels_cfg[new_type] = self._channel_api_to_cfg(row) self._write_bot_config(bot_id, config_data) session.commit() self._sync_bot_workspace_via_provider(session, bot) self._invalidate_bot_detail_cache(bot_id) return row def delete_channel(self, *, session: Session, bot_id: str, channel_id: str) -> Dict[str, Any]: bot = self._require_bot(session=session, bot_id=bot_id) channel_key = str(channel_id or "").strip().lower() rows = self._get_bot_channels_from_config(bot) row = next((r for r in rows if str(r.get("id") or "").lower() == channel_key), None) if not row: raise HTTPException(status_code=404, detail="Channel not found") if str(row.get("channel_type") or "").lower() == "dashboard": raise HTTPException(status_code=400, detail="dashboard channel cannot be deleted") config_data = self._read_bot_config(bot_id) channels_cfg = config_data.get("channels") if not isinstance(channels_cfg, dict): channels_cfg = {} config_data["channels"] = channels_cfg channels_cfg.pop(str(row.get("channel_type") or "").lower(), None) self._write_bot_config(bot_id, config_data) session.commit() self._sync_bot_workspace_via_provider(session, bot) self._invalidate_bot_detail_cache(bot_id) return {"status": "deleted"}