"""Channel management service for bot instances (list, create, update, delete)."""
from typing import Any, Callable, Dict, List
|
|
|
|
from fastapi import HTTPException
|
|
from sqlmodel import Session
|
|
|
|
from models.bot import BotInstance
|
|
|
|
# Signatures of the collaborators injected into BotChannelService.
# Each alias documents only the call shape; the implementations live elsewhere.

# (bot_id) -> config dict
ReadBotConfig = Callable[[str], Dict[str, Any]]
# (bot_id, config dict) -> None
WriteBotConfig = Callable[[str, Dict[str, Any]], None]
# (session, bot) -> None
SyncBotWorkspace = Callable[[Session, BotInstance], None]
# (bot_id) -> None
InvalidateBotCache = Callable[[str], None]
# (bot) -> list of channel rows
GetBotChannels = Callable[[BotInstance], List[Dict[str, Any]]]
# (raw extra_config value) -> dict
NormalizeChannelExtra = Callable[[Any], Dict[str, Any]]
# (channel row) -> entry stored under the config's "channels" section
ChannelApiToCfg = Callable[[Dict[str, Any]], Dict[str, Any]]
# (channels section) -> pair of booleans (unpacked by the service as
# send-progress, send-tool-hints)
ReadGlobalDeliveryFlags = Callable[[Any], tuple[bool, bool]]
|
|
|
|
|
|
class BotChannelService:
    """List, create, update and delete the channel entries of a bot's config.

    Every piece of I/O (reading/writing the config, syncing the workspace,
    cache invalidation, row/config conversion) is delegated to callables
    supplied at construction time; this class only contains the validation
    and orchestration logic around them.
    """

    def __init__(
        self,
        *,
        read_bot_config: ReadBotConfig,
        write_bot_config: WriteBotConfig,
        sync_bot_workspace_via_provider: SyncBotWorkspace,
        invalidate_bot_detail_cache: InvalidateBotCache,
        get_bot_channels_from_config: GetBotChannels,
        normalize_channel_extra: NormalizeChannelExtra,
        channel_api_to_cfg: ChannelApiToCfg,
        read_global_delivery_flags: ReadGlobalDeliveryFlags,
    ) -> None:
        """Store the injected collaborators.

        Args:
            read_bot_config: Called with a bot id; returns the config dict.
            write_bot_config: Called with a bot id and the config dict to persist.
            sync_bot_workspace_via_provider: Called with the session and bot
                after every config write.
            invalidate_bot_detail_cache: Called with a bot id after every write.
            get_bot_channels_from_config: Called with a bot; returns its
                channel rows (dicts).
            normalize_channel_extra: Converts a raw ``extra_config`` value to a dict.
            channel_api_to_cfg: Converts a channel row to the entry stored
                under ``config["channels"][<channel_type>]``.
            read_global_delivery_flags: Called with the ``channels`` section;
                returns the ``(sendProgress, sendToolHints)`` pair.
        """
        self._read_bot_config = read_bot_config
        self._write_bot_config = write_bot_config
        self._sync_bot_workspace_via_provider = sync_bot_workspace_via_provider
        self._invalidate_bot_detail_cache = invalidate_bot_detail_cache
        self._get_bot_channels_from_config = get_bot_channels_from_config
        self._normalize_channel_extra = normalize_channel_extra
        self._channel_api_to_cfg = channel_api_to_cfg
        self._read_global_delivery_flags = read_global_delivery_flags

    def _require_bot(self, *, session: Session, bot_id: str) -> BotInstance:
        """Return the bot with ``bot_id`` or raise a 404 ``HTTPException``."""
        bot = session.get(BotInstance, bot_id)
        if not bot:
            raise HTTPException(status_code=404, detail="Bot not found")
        return bot

    @staticmethod
    def _require_channel(rows: List[Dict[str, Any]], channel_id: str) -> Dict[str, Any]:
        """Return the row whose ``id`` matches ``channel_id`` (case-insensitive) or raise 404."""
        channel_key = str(channel_id or "").strip().lower()
        row = next((r for r in rows if str(r.get("id") or "").lower() == channel_key), None)
        if not row:
            raise HTTPException(status_code=404, detail="Channel not found")
        return row

    @staticmethod
    def _channels_section(config_data: Dict[str, Any]) -> Dict[str, Any]:
        """Return ``config_data["channels"]``, replacing it with ``{}`` if it is not a dict."""
        channels_cfg = config_data.get("channels")
        if not isinstance(channels_cfg, dict):
            channels_cfg = {}
            config_data["channels"] = channels_cfg
        return channels_cfg

    @staticmethod
    def _clamp_port(value: Any) -> int:
        """Convert ``value`` to ``int`` and clamp it into the range 1..65535."""
        return max(1, min(int(value), 65535))

    def list_channels(self, *, session: Session, bot_id: str) -> List[Dict[str, Any]]:
        """Return the channel rows of the bot; raises 404 if the bot does not exist."""
        bot = self._require_bot(session=session, bot_id=bot_id)
        return self._get_bot_channels_from_config(bot)

    def create_channel(self, *, session: Session, bot_id: str, payload: Any) -> Dict[str, Any]:
        """Add a new channel to the bot's config and return the created row.

        ``payload`` is read via ``getattr`` for ``channel_type``,
        ``external_app_id``, ``app_secret``, ``internal_port``, ``is_active``
        and ``extra_config``.

        Raises:
            HTTPException: 404 if the bot is missing; 400 if ``channel_type``
                is empty, is ``"dashboard"``, or already exists for this bot.
        """
        bot = self._require_bot(session=session, bot_id=bot_id)

        ctype = str(getattr(payload, "channel_type", "") or "").strip().lower()
        if not ctype:
            raise HTTPException(status_code=400, detail="channel_type is required")
        if ctype == "dashboard":
            raise HTTPException(status_code=400, detail="dashboard channel is built-in and cannot be created manually")
        current_rows = self._get_bot_channels_from_config(bot)
        if any(str(row.get("channel_type") or "").lower() == ctype for row in current_rows):
            raise HTTPException(status_code=400, detail=f"Channel already exists: {ctype}")

        # The channel type doubles as the row id: one channel per type per bot.
        new_row = {
            "id": ctype,
            "bot_id": bot_id,
            "channel_type": ctype,
            "external_app_id": str(getattr(payload, "external_app_id", "") or "").strip() or f"{ctype}-{bot_id}",
            "app_secret": str(getattr(payload, "app_secret", "") or "").strip(),
            "internal_port": self._clamp_port(getattr(payload, "internal_port", 8080) or 8080),
            "is_active": bool(getattr(payload, "is_active", True)),
            "extra_config": self._normalize_channel_extra(getattr(payload, "extra_config", None)),
            "locked": False,
        }

        config_data = self._read_bot_config(bot_id)
        channels_cfg = self._channels_section(config_data)
        channels_cfg[ctype] = self._channel_api_to_cfg(new_row)
        self._write_bot_config(bot_id, config_data)
        # NOTE(review): unlike update_channel/delete_channel, no session.commit()
        # is issued here — confirm that this difference is intentional.
        self._sync_bot_workspace_via_provider(session, bot)
        self._invalidate_bot_detail_cache(bot_id)
        return new_row

    def update_channel(
        self,
        *,
        session: Session,
        bot_id: str,
        channel_id: str,
        payload: Any,
    ) -> Dict[str, Any]:
        """Apply the explicitly-set fields of ``payload`` to a channel and persist it.

        Only fields present in ``payload.model_dump(exclude_unset=True)`` are
        considered; ``None`` values are ignored except for ``extra_config``,
        which is always passed through ``normalize_channel_extra`` when present.
        Changing ``channel_type`` moves the config entry to the new key.

        Returns:
            The updated channel row.

        Raises:
            HTTPException: 404 if the bot or channel is missing; 400 if the
                channel is the dashboard or locked, if the resulting
                ``channel_type`` is empty, already used by another row, or
                ``"dashboard"``.
        """
        bot = self._require_bot(session=session, bot_id=bot_id)
        rows = self._get_bot_channels_from_config(bot)
        row = self._require_channel(rows, channel_id)

        existing_type = str(row.get("channel_type") or "").strip().lower()
        if existing_type == "dashboard" or bool(row.get("locked")):
            raise HTTPException(status_code=400, detail="dashboard channel is built-in and cannot be modified")
        # From here on existing_type is never "dashboard".

        update_data = payload.model_dump(exclude_unset=True)
        new_type = existing_type
        if "channel_type" in update_data and update_data["channel_type"] is not None:
            new_type = str(update_data["channel_type"]).strip().lower()
        if not new_type:
            raise HTTPException(status_code=400, detail="channel_type cannot be empty")
        if new_type != existing_type and any(str(r.get("channel_type") or "").lower() == new_type for r in rows):
            raise HTTPException(status_code=400, detail=f"Channel already exists: {new_type}")
        if new_type == "dashboard":
            # Retyping to the built-in dashboard used to drop this channel's
            # config entry without writing a replacement; reject it instead,
            # mirroring the rule enforced by create_channel.
            raise HTTPException(status_code=400, detail="channel_type cannot be changed to dashboard")

        if "external_app_id" in update_data and update_data["external_app_id"] is not None:
            row["external_app_id"] = str(update_data["external_app_id"]).strip()
        if "app_secret" in update_data and update_data["app_secret"] is not None:
            row["app_secret"] = str(update_data["app_secret"]).strip()
        if "internal_port" in update_data and update_data["internal_port"] is not None:
            row["internal_port"] = self._clamp_port(update_data["internal_port"])
        if "is_active" in update_data and update_data["is_active"] is not None:
            row["is_active"] = bool(update_data["is_active"])
        if "extra_config" in update_data:
            row["extra_config"] = self._normalize_channel_extra(update_data.get("extra_config"))
        row["channel_type"] = new_type
        row["id"] = new_type
        # Only a dashboard row would be locked, and that type is rejected above.
        row["locked"] = False

        config_data = self._read_bot_config(bot_id)
        channels_cfg = self._channels_section(config_data)
        current_send_progress, current_send_tool_hints = self._read_global_delivery_flags(channels_cfg)
        # Re-write the global delivery flags with the values just read.
        channels_cfg["sendProgress"] = current_send_progress
        channels_cfg["sendToolHints"] = current_send_tool_hints
        # The dashboard is never stored as its own entry in the channels section.
        channels_cfg.pop("dashboard", None)
        if existing_type != new_type:
            # The type (and therefore the config key) changed: drop the old entry.
            channels_cfg.pop(existing_type, None)
        channels_cfg[new_type] = self._channel_api_to_cfg(row)

        self._write_bot_config(bot_id, config_data)
        session.commit()
        self._sync_bot_workspace_via_provider(session, bot)
        self._invalidate_bot_detail_cache(bot_id)
        return row

    def delete_channel(self, *, session: Session, bot_id: str, channel_id: str) -> Dict[str, Any]:
        """Remove a channel's entry from the bot's config.

        Returns:
            ``{"status": "deleted"}`` on success.

        Raises:
            HTTPException: 404 if the bot or channel is missing; 400 if the
                channel is the dashboard.
        """
        bot = self._require_bot(session=session, bot_id=bot_id)
        rows = self._get_bot_channels_from_config(bot)
        row = self._require_channel(rows, channel_id)

        channel_type = str(row.get("channel_type") or "").lower()
        if channel_type == "dashboard":
            raise HTTPException(status_code=400, detail="dashboard channel cannot be deleted")

        config_data = self._read_bot_config(bot_id)
        channels_cfg = self._channels_section(config_data)
        channels_cfg.pop(channel_type, None)

        self._write_bot_config(bot_id, config_data)
        session.commit()
        self._sync_bot_workspace_via_provider(session, bot)
        self._invalidate_bot_detail_cache(bot_id)
        return {"status": "deleted"}
|