from __future__ import annotations import time from datetime import datetime from pathlib import Path from typing import Any, Dict, Optional from zoneinfo import ZoneInfo from fastapi import WebSocket from sqlmodel import Session from core.docker_instance import docker_manager from core.settings import BOTS_WORKSPACE_ROOT from models.bot import BotInstance from services.bot_lifecycle_service import start_bot_instance, stop_bot_instance from services.bot_service import list_bot_channels_from_config from services.bot_storage_service import ( read_bot_config_data, read_bot_cron_jobs_store, write_bot_config_data, write_bot_cron_jobs_store, ) from services.platform_auth_service import resolve_bot_websocket_auth, resolve_panel_websocket_auth def _now_ms() -> int: return int(time.time() * 1000) def _get_bot_or_raise(session: Session, bot_id: str) -> BotInstance: bot = session.get(BotInstance, bot_id) if not bot: raise LookupError("Bot not found") return bot def _weixin_state_file_path(bot_id: str) -> Path: return Path(BOTS_WORKSPACE_ROOT) / bot_id / ".nanobot" / "weixin" / "account.json" def _compute_cron_next_run(schedule: Dict[str, Any], now_ms: Optional[int] = None) -> Optional[int]: current_ms = int(now_ms or _now_ms()) kind = str(schedule.get("kind") or "").strip().lower() if kind == "at": at_ms = int(schedule.get("atMs") or 0) return at_ms if at_ms > current_ms else None if kind == "every": every_ms = int(schedule.get("everyMs") or 0) return current_ms + every_ms if every_ms > 0 else None if kind == "cron": expr = str(schedule.get("expr") or "").strip() if not expr: return None try: from croniter import croniter tz_name = str(schedule.get("tz") or "").strip() tz = ZoneInfo(tz_name) if tz_name else datetime.now().astimezone().tzinfo base_dt = datetime.fromtimestamp(current_ms / 1000, tz=tz) next_dt = croniter(expr, base_dt).get_next(datetime) return int(next_dt.timestamp() * 1000) except Exception: return None return None def get_bot_logs( session: Session, *, bot_id: str, tail: Optional[int] = 300, offset: int = 0, limit: Optional[int] = None, reverse: bool = False, ) -> Dict[str, Any]: _get_bot_or_raise(session, bot_id) if limit is not None: page = docker_manager.get_logs_page( bot_id, offset=max(0, int(offset)), limit=max(1, int(limit)), reverse=bool(reverse), fresh_client=True, ) return {"bot_id": bot_id, **page} effective_tail = max(1, int(tail or 300)) return { "bot_id": bot_id, "logs": docker_manager.get_recent_logs(bot_id, tail=effective_tail, fresh_client=True), } async def relogin_weixin(session: Session, *, bot_id: str) -> Dict[str, Any]: bot = _get_bot_or_raise(session, bot_id) weixin_channel = next( ( row for row in list_bot_channels_from_config(bot) if str(row.get("channel_type") or "").strip().lower() == "weixin" ), None, ) if not weixin_channel: raise ValueError("Weixin channel not found") state_file = _weixin_state_file_path(bot_id) removed = False try: if state_file.is_file(): state_file.unlink() removed = True except Exception as exc: raise RuntimeError(f"Failed to remove weixin state: {exc}") from exc config_data = read_bot_config_data(bot_id) channels_cfg = config_data.get("channels") if isinstance(config_data, dict) else {} weixin_cfg = channels_cfg.get("weixin") if isinstance(channels_cfg, dict) else None if isinstance(weixin_cfg, dict) and "token" in weixin_cfg: weixin_cfg.pop("token", None) write_bot_config_data(bot_id, config_data) restarted = False if str(bot.docker_status or "").upper() == "RUNNING": stop_bot_instance(session, bot_id) await start_bot_instance(session, bot_id) restarted = True return { "status": "relogin_started", "bot_id": bot_id, "removed_state": removed, "restarted": restarted, } def list_cron_jobs(session: Session, *, bot_id: str, include_disabled: bool = True) -> Dict[str, Any]: _get_bot_or_raise(session, bot_id) store = read_bot_cron_jobs_store(bot_id) rows = [] for row in store.get("jobs", []): if not isinstance(row, dict): continue enabled = bool(row.get("enabled", True)) if not include_disabled and not enabled: continue rows.append(row) rows.sort(key=lambda value: int(((value.get("state") or {}).get("nextRunAtMs")) or 2**62)) return {"bot_id": bot_id, "version": int(store.get("version", 1) or 1), "jobs": rows} def stop_cron_job(session: Session, *, bot_id: str, job_id: str) -> Dict[str, Any]: _get_bot_or_raise(session, bot_id) store = read_bot_cron_jobs_store(bot_id) jobs = store.get("jobs", []) if not isinstance(jobs, list): jobs = [] found = next((row for row in jobs if isinstance(row, dict) and str(row.get("id")) == job_id), None) if not found: raise LookupError("Cron job not found") found["enabled"] = False found["updatedAtMs"] = _now_ms() state = found.get("state") if not isinstance(state, dict): state = {} found["state"] = state state["nextRunAtMs"] = None write_bot_cron_jobs_store(bot_id, {"version": int(store.get("version", 1) or 1), "jobs": jobs}) return {"status": "stopped", "job_id": job_id} def start_cron_job(session: Session, *, bot_id: str, job_id: str) -> Dict[str, Any]: _get_bot_or_raise(session, bot_id) store = read_bot_cron_jobs_store(bot_id) jobs = store.get("jobs", []) if not isinstance(jobs, list): jobs = [] found = next((row for row in jobs if isinstance(row, dict) and str(row.get("id")) == job_id), None) if not found: raise LookupError("Cron job not found") found["enabled"] = True found["updatedAtMs"] = _now_ms() state = found.get("state") if not isinstance(state, dict): state = {} found["state"] = state schedule = found.get("schedule") state["nextRunAtMs"] = _compute_cron_next_run(schedule if isinstance(schedule, dict) else {}) write_bot_cron_jobs_store(bot_id, {"version": int(store.get("version", 1) or 1), "jobs": jobs}) return {"status": "started", "job_id": job_id} def delete_cron_job(session: Session, *, bot_id: str, job_id: str) -> Dict[str, Any]: _get_bot_or_raise(session, bot_id) store = read_bot_cron_jobs_store(bot_id) jobs = store.get("jobs", []) if not isinstance(jobs, list): jobs = [] kept = [row for row in jobs if not (isinstance(row, dict) and str(row.get("id")) == job_id)] if len(kept) == len(jobs): raise LookupError("Cron job not found") write_bot_cron_jobs_store(bot_id, {"version": int(store.get("version", 1) or 1), "jobs": kept}) return {"status": "deleted", "job_id": job_id} def ensure_monitor_websocket_access(session: Session, websocket: WebSocket, bot_id: str) -> BotInstance: principal = resolve_panel_websocket_auth(session, websocket) if not principal.authenticated: principal = resolve_bot_websocket_auth(session, websocket, bot_id) if not principal.authenticated: raise PermissionError("Bot or panel authentication required") return _get_bot_or_raise(session, bot_id)