dashboard-nanobot/backend/api/bot_runtime_router.py

254 lines
8.8 KiB
Python

import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from zoneinfo import ZoneInfo
from fastapi import APIRouter, Depends, HTTPException, WebSocket, WebSocketDisconnect
from sqlmodel import Session
from core.database import engine, get_session
from core.docker_instance import docker_manager
from core.settings import BOTS_WORKSPACE_ROOT
from core.websocket_manager import manager
from models.bot import BotInstance
from services.bot_channel_service import _get_bot_channels_from_config
from services.bot_lifecycle_service import start_bot_instance, stop_bot_instance
from services.bot_storage_service import _read_bot_config, _write_bot_config
from services.bot_storage_service import _read_cron_store, _write_cron_store
from services.runtime_service import docker_callback
# Router object on which this module's HTTP and WebSocket endpoints are registered.
router = APIRouter()
# Logger used by the handlers in this module; it uses the "dashboard.backend" logger name.
logger = logging.getLogger("dashboard.backend")
def _now_ms() -> int:
    """Return the current wall-clock time as whole milliseconds since the Unix epoch."""
    seconds_since_epoch = time.time()
    return int(seconds_since_epoch * 1000)
def _compute_cron_next_run(schedule: Dict[str, Any], now_ms: Optional[int] = None) -> Optional[int]:
    """Compute the next run time of a job schedule, in epoch milliseconds.

    The ``kind`` entry of ``schedule`` (trimmed, case-insensitive) selects the rule:

    * ``"at"``    -- one-shot: ``atMs`` is returned only while it is still in the future.
    * ``"every"`` -- fixed interval: ``now + everyMs`` for a positive ``everyMs``.
    * ``"cron"``  -- cron expression ``expr`` evaluated with ``croniter`` in the
      timezone named by ``tz`` (the host's local timezone when ``tz`` is empty).

    Args:
        schedule: Schedule mapping taken from a cron job entry.
        now_ms: Reference time in epoch milliseconds. ``None`` means "use the
            current time".

    Returns:
        The next run time in epoch milliseconds, or ``None`` when the kind is
        unknown, the schedule is malformed or already elapsed, or the cron
        expression cannot be evaluated.
    """
    # Compare against None (not truthiness) so an explicit reference time of 0
    # is honoured instead of being silently replaced by the wall clock.
    current_ms = _now_ms() if now_ms is None else int(now_ms)
    kind = str(schedule.get("kind") or "").strip().lower()

    if kind in ("at", "every"):
        field = "atMs" if kind == "at" else "everyMs"
        try:
            value_ms = int(schedule.get(field) or 0)
        except (TypeError, ValueError, OverflowError):
            # A malformed stored value means "no next run", the same outcome the
            # cron branch produces for an expression it cannot evaluate.
            return None
        if kind == "at":
            return value_ms if value_ms > current_ms else None
        return current_ms + value_ms if value_ms > 0 else None

    if kind == "cron":
        expr = str(schedule.get("expr") or "").strip()
        if not expr:
            return None
        try:
            # Imported inside the try block so that a missing croniter package
            # results in None rather than an import-time failure.
            from croniter import croniter

            tz_name = str(schedule.get("tz") or "").strip()
            tz = ZoneInfo(tz_name) if tz_name else datetime.now().astimezone().tzinfo
            base_dt = datetime.fromtimestamp(current_ms / 1000, tz=tz)
            next_dt = croniter(expr, base_dt).get_next(datetime)
            return int(next_dt.timestamp() * 1000)
        except Exception as exc:
            # Best-effort by design: any failure maps to "no next run", but it
            # is logged so a bad expression or timezone can be diagnosed.
            logger.warning("cron next-run computation failed expr=%s detail=%s", expr, exc)
            return None

    return None
def _get_bot_or_404(session: Session, bot_id: str) -> BotInstance:
    """Fetch the ``BotInstance`` with key ``bot_id`` or abort with HTTP 404.

    Args:
        session: Database session used for the lookup.
        bot_id: Primary key passed to ``session.get``.

    Returns:
        The object returned by ``session.get`` when it is truthy.

    Raises:
        HTTPException: With status 404 and detail "Bot not found" otherwise.
    """
    instance = session.get(BotInstance, bot_id)
    if instance:
        return instance
    raise HTTPException(status_code=404, detail="Bot not found")
def _weixin_state_file_path(bot_id: str) -> Path:
    """Return ``<BOTS_WORKSPACE_ROOT>/<bot_id>/.nanobot/weixin/account.json``.

    The path is only built here; the file is not touched or checked for existence.
    """
    workspace_root = Path(BOTS_WORKSPACE_ROOT)
    return workspace_root.joinpath(bot_id, ".nanobot", "weixin", "account.json")
@router.get("/api/bots/{bot_id}/logs")
def get_bot_logs(
    bot_id: str,
    tail: Optional[int] = 300,
    offset: int = 0,
    limit: Optional[int] = None,
    reverse: bool = False,
    session: Session = Depends(get_session),
):
    """Return container logs for a bot, either as a tail or as a page.

    Two modes, chosen by ``limit``:

    * ``limit`` omitted -- the most recent ``tail`` lines (300 when ``tail`` is
      missing or 0, never fewer than 1) are returned under ``"logs"``.
    * ``limit`` given -- ``docker_manager.get_logs_page`` is queried with a
      non-negative ``offset`` and a ``limit`` of at least 1, and its result is
      merged into the response next to ``"bot_id"``.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    _get_bot_or_404(session, bot_id)

    if limit is None:
        line_count = max(1, int(tail or 300))
        recent_lines = docker_manager.get_recent_logs(bot_id, tail=line_count)
        return {"bot_id": bot_id, "logs": recent_lines}

    # Clamp the paging arguments before handing them to the Docker manager.
    safe_offset = max(0, int(offset))
    safe_limit = max(1, int(limit))
    page = docker_manager.get_logs_page(
        bot_id,
        offset=safe_offset,
        limit=safe_limit,
        reverse=bool(reverse),
    )
    return {"bot_id": bot_id, **page}
@router.post("/api/bots/{bot_id}/weixin/relogin")
async def relogin_weixin(bot_id: str, session: Session = Depends(get_session)):
    """Reset a bot's stored Weixin login so that it has to log in again.

    Steps, in order:

    1. Verify the bot exists and has a channel whose ``channel_type`` is "weixin".
    2. Delete the Weixin ``account.json`` state file if it is present.
    3. Remove ``channels.weixin.token`` from the bot config, if set, and save it.
    4. If the bot's ``docker_status`` is "RUNNING", stop and start it again.

    Returns:
        A dict with ``status`` ("relogin_started"), ``bot_id``,
        ``removed_state`` (whether the state file was deleted) and
        ``restarted`` (whether the bot was stopped and started).

    Raises:
        HTTPException: 404 when the bot or its Weixin channel is missing,
            500 when the state file cannot be removed.
    """
    bot = _get_bot_or_404(session, bot_id)

    # Find the first configured channel of type "weixin".
    weixin_channel = None
    for channel in _get_bot_channels_from_config(bot):
        channel_type = str(channel.get("channel_type") or "").strip().lower()
        if channel_type == "weixin":
            weixin_channel = channel
            break
    if not weixin_channel:
        raise HTTPException(status_code=404, detail="Weixin channel not found")

    account_file = _weixin_state_file_path(bot_id)
    state_removed = False
    try:
        if account_file.is_file():
            account_file.unlink()
            state_removed = True
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to remove weixin state: {exc}") from exc

    # Drop the token from the stored config; the config is rewritten only
    # when a token was actually present.
    bot_config = _read_bot_config(bot_id)
    channel_section = {}
    if isinstance(bot_config, dict):
        channel_section = bot_config.get("channels")
    weixin_section = None
    if isinstance(channel_section, dict):
        weixin_section = channel_section.get("weixin")
    if isinstance(weixin_section, dict) and "token" in weixin_section:
        weixin_section.pop("token", None)
        _write_bot_config(bot_id, bot_config)

    was_running = str(bot.docker_status or "").upper() == "RUNNING"
    if was_running:
        # NOTE(review): stop_bot_instance is called without ``await`` while
        # start_bot_instance is awaited -- confirm that stop is synchronous.
        stop_bot_instance(session, bot_id)
        await start_bot_instance(session, bot_id)

    return {
        "status": "relogin_started",
        "bot_id": bot_id,
        "removed_state": state_removed,
        "restarted": was_running,
    }
@router.get("/api/bots/{bot_id}/cron/jobs")
def list_cron_jobs(bot_id: str, include_disabled: bool = True, session: Session = Depends(get_session)):
    """List a bot's cron jobs, soonest next run first.

    Args:
        bot_id: Bot whose cron store is read.
        include_disabled: When false, jobs whose ``enabled`` flag is falsy are
            left out (a missing flag counts as enabled).
        session: Database session used to check that the bot exists.

    Returns:
        A dict with ``bot_id``, the store ``version`` (1 when absent or falsy)
        and ``jobs`` sorted by ``state.nextRunAtMs``; jobs without a usable
        next-run value sort last.

    Raises:
        HTTPException: 404 when the bot does not exist.
    """
    _get_bot_or_404(session, bot_id)
    store = _read_cron_store(bot_id)

    # Same guard as the stop/start/delete handlers: a store whose "jobs" entry
    # is not a list is treated as having no jobs instead of crashing.
    jobs = store.get("jobs", [])
    if not isinstance(jobs, list):
        jobs = []

    unscheduled = 2**62  # sentinel that pushes jobs without a next run to the end

    def next_run_key(job: Dict[str, Any]) -> int:
        """Sort key: the job's next run time, or the sentinel when unusable."""
        state = job.get("state")
        raw = state.get("nextRunAtMs") if isinstance(state, dict) else None
        try:
            return int(raw or unscheduled)
        except (TypeError, ValueError, OverflowError):
            return unscheduled

    rows = [
        job
        for job in jobs
        if isinstance(job, dict) and (include_disabled or bool(job.get("enabled", True)))
    ]
    rows.sort(key=next_run_key)
    return {"bot_id": bot_id, "version": int(store.get("version", 1) or 1), "jobs": rows}
@router.post("/api/bots/{bot_id}/cron/jobs/{job_id}/stop")
def stop_cron_job(bot_id: str, job_id: str, session: Session = Depends(get_session)):
    """Disable one cron job and clear its next scheduled run.

    The first job whose stringified ``id`` equals ``job_id`` gets
    ``enabled = False``, a fresh ``updatedAtMs`` and ``state.nextRunAtMs = None``;
    the store is then written back.

    Returns:
        ``{"status": "stopped", "job_id": job_id}``.

    Raises:
        HTTPException: 404 when the bot or the job does not exist.
    """
    _get_bot_or_404(session, bot_id)
    store = _read_cron_store(bot_id)

    job_list = store.get("jobs", [])
    if not isinstance(job_list, list):
        job_list = []

    target = next(
        (job for job in job_list if isinstance(job, dict) and str(job.get("id")) == job_id),
        None,
    )
    if not target:
        raise HTTPException(status_code=404, detail="Cron job not found")

    target["enabled"] = False
    target["updatedAtMs"] = _now_ms()
    # Replace a missing or malformed state with an empty dict before writing to it.
    if not isinstance(target.get("state"), dict):
        target["state"] = {}
    target["state"]["nextRunAtMs"] = None

    store_version = int(store.get("version", 1) or 1)
    _write_cron_store(bot_id, {"version": store_version, "jobs": job_list})
    return {"status": "stopped", "job_id": job_id}
@router.post("/api/bots/{bot_id}/cron/jobs/{job_id}/start")
def start_cron_job(bot_id: str, job_id: str, session: Session = Depends(get_session)):
    """Enable one cron job and recompute its next scheduled run.

    The first job whose stringified ``id`` equals ``job_id`` gets
    ``enabled = True``, a fresh ``updatedAtMs`` and a ``state.nextRunAtMs``
    computed from its ``schedule`` (an empty schedule is used when the stored
    one is not a dict); the store is then written back.

    Returns:
        ``{"status": "started", "job_id": job_id}``.

    Raises:
        HTTPException: 404 when the bot or the job does not exist.
    """
    _get_bot_or_404(session, bot_id)
    store = _read_cron_store(bot_id)

    job_list = store.get("jobs", [])
    if not isinstance(job_list, list):
        job_list = []

    target = next(
        (job for job in job_list if isinstance(job, dict) and str(job.get("id")) == job_id),
        None,
    )
    if not target:
        raise HTTPException(status_code=404, detail="Cron job not found")

    target["enabled"] = True
    target["updatedAtMs"] = _now_ms()
    # Replace a missing or malformed state with an empty dict before writing to it.
    if not isinstance(target.get("state"), dict):
        target["state"] = {}

    job_schedule = target.get("schedule")
    if not isinstance(job_schedule, dict):
        job_schedule = {}
    target["state"]["nextRunAtMs"] = _compute_cron_next_run(job_schedule)

    store_version = int(store.get("version", 1) or 1)
    _write_cron_store(bot_id, {"version": store_version, "jobs": job_list})
    return {"status": "started", "job_id": job_id}
@router.delete("/api/bots/{bot_id}/cron/jobs/{job_id}")
def delete_cron_job(bot_id: str, job_id: str, session: Session = Depends(get_session)):
    """Remove every cron job whose stringified ``id`` equals ``job_id``.

    Returns:
        ``{"status": "deleted", "job_id": job_id}``.

    Raises:
        HTTPException: 404 when the bot does not exist or no job matched.
    """
    _get_bot_or_404(session, bot_id)
    store = _read_cron_store(bot_id)

    job_list = store.get("jobs", [])
    if not isinstance(job_list, list):
        job_list = []

    def is_target(job) -> bool:
        """Tell whether ``job`` is a dict entry carrying the requested id."""
        return isinstance(job, dict) and str(job.get("id")) == job_id

    remaining = [job for job in job_list if not is_target(job)]
    # Nothing was filtered out, so the requested job is not in the store.
    if len(remaining) == len(job_list):
        raise HTTPException(status_code=404, detail="Cron job not found")

    store_version = int(store.get("version", 1) or 1)
    _write_cron_store(bot_id, {"version": store_version, "jobs": remaining})
    return {"status": "deleted", "job_id": job_id}
@router.websocket("/ws/monitor/{bot_id}")
async def websocket_endpoint(websocket: WebSocket, bot_id: str):
    """Attach a WebSocket client to a bot's monitor stream.

    Flow:

    1. Close the socket with code 4404 when the bot does not exist.
    2. Register the socket with the connection manager; on failure, log a
       warning, try to close the socket with code 1011 and return.
    3. Ask the Docker manager to ensure a monitor for the bot, passing
       ``docker_callback``.
    4. Read and discard incoming text frames until the client disconnects.
    5. Always unregister the socket from the connection manager.
    """
    with Session(engine) as session:
        bot_exists = bool(session.get(BotInstance, bot_id))
    if not bot_exists:
        await websocket.close(code=4404, reason="Bot not found")
        return

    try:
        await manager.connect(bot_id, websocket)
    except Exception as exc:
        logger.warning("websocket connect failed bot_id=%s detail=%s", bot_id, exc)
        try:
            await websocket.close(code=1011, reason="WebSocket accept failed")
        except Exception:
            # Best-effort close: the connect failure was already logged above
            # and there is nothing more to do for this socket.
            pass
        return

    # From here on the socket is registered with the manager. The outer
    # try/finally also covers ensure_monitor, so a failure there can no longer
    # leave the socket registered; the exception itself still propagates.
    try:
        docker_manager.ensure_monitor(bot_id, docker_callback)
        try:
            while True:
                # Received text is not used; the read keeps the handler
                # alive until the connection ends.
                await websocket.receive_text()
        except WebSocketDisconnect:
            pass
        except RuntimeError as exc:
            msg = str(exc or "").lower()
            # These two messages are not logged; any other RuntimeError is.
            if "need to call \"accept\" first" not in msg and "not connected" not in msg:
                logger.exception("websocket runtime error bot_id=%s", bot_id)
        except Exception:
            logger.exception("websocket unexpected error bot_id=%s", bot_id)
    finally:
        manager.disconnect(bot_id, websocket)