from __future__ import annotations import json import os import re from typing import Any, Dict, Optional from core.utils import _calc_dir_size_bytes from core.settings import BOTS_WORKSPACE_ROOT _ENV_KEY_RE = re.compile(r"^[A-Z_][A-Z0-9_]{0,127}$") _BYTES_PER_GB = 1024 * 1024 * 1024 __all__ = [ "get_bot_data_root", "normalize_bot_env_params", "normalize_bot_resource_limits", "read_bot_config_data", "read_bot_cron_jobs_store", "read_bot_env_params", "get_bot_resource_limits", "get_bot_workspace_root", "get_bot_workspace_snapshot", "get_bot_workspace_usage_bytes", "write_bot_config_data", "write_bot_cron_jobs_store", "write_bot_env_params", "write_bot_resource_limits", "_bot_data_root", "_clear_bot_dashboard_direct_session", "_clear_bot_sessions", "_normalize_env_params", "_normalize_resource_limits", "_read_bot_config", "_read_bot_resources", "_read_cron_store", "_read_env_store", "_safe_float", "_safe_int", "_workspace_root", "_write_bot_config", "_write_bot_resources", "_write_cron_store", "_write_env_store", ] def get_bot_workspace_root(bot_id: str) -> str: return _workspace_root(bot_id) def _workspace_root(bot_id: str) -> str: return os.path.abspath(os.path.join(BOTS_WORKSPACE_ROOT, bot_id, ".nanobot", "workspace")) def get_bot_data_root(bot_id: str) -> str: return _bot_data_root(bot_id) def _bot_data_root(bot_id: str) -> str: return os.path.abspath(os.path.join(BOTS_WORKSPACE_ROOT, bot_id, ".nanobot")) def _safe_float(raw: Any, default: float) -> float: try: return float(raw) except Exception: return default def _safe_int(raw: Any, default: int) -> int: try: return int(raw) except Exception: return default def _normalize_resource_limits(cpu_cores: Any, memory_mb: Any, storage_gb: Any) -> Dict[str, Any]: cpu = _safe_float(cpu_cores, 1.0) mem = _safe_int(memory_mb, 1024) storage = _safe_int(storage_gb, 10) if cpu < 0: cpu = 1.0 if mem < 0: mem = 1024 if storage < 0: storage = 10 normalized_cpu = 0.0 if cpu == 0 else min(16.0, max(0.1, cpu)) normalized_mem = 0 if mem == 0 else min(65536, max(256, mem)) normalized_storage = 0 if storage == 0 else min(1024, max(1, storage)) return { "cpu_cores": normalized_cpu, "memory_mb": normalized_mem, "storage_gb": normalized_storage, } def normalize_bot_resource_limits(cpu_cores: Any, memory_mb: Any, storage_gb: Any) -> Dict[str, Any]: return _normalize_resource_limits(cpu_cores, memory_mb, storage_gb) def _normalize_env_params(raw: Any) -> Dict[str, str]: if not isinstance(raw, dict): return {} rows: Dict[str, str] = {} for key, value in raw.items(): normalized_key = str(key or "").strip().upper() if not normalized_key or not _ENV_KEY_RE.fullmatch(normalized_key): continue rows[normalized_key] = str(value or "").strip() return rows def normalize_bot_env_params(raw: Any) -> Dict[str, str]: return _normalize_env_params(raw) def _read_json_object(path: str) -> Dict[str, Any]: if not os.path.isfile(path): raise RuntimeError(f"Missing required JSON file: {path}") try: with open(path, "r", encoding="utf-8") as file: data = json.load(file) except Exception as exc: raise RuntimeError(f"Invalid JSON file: {path}") from exc if not isinstance(data, dict): raise RuntimeError(f"JSON file must contain an object: {path}") return data def _read_json_value(path: str) -> Any: if not os.path.isfile(path): return None try: with open(path, "r", encoding="utf-8") as file: return json.load(file) except Exception: return None def _write_json_atomic(path: str, payload: Dict[str, Any]) -> None: os.makedirs(os.path.dirname(path), exist_ok=True) tmp_path = f"{path}.tmp" with open(tmp_path, "w", encoding="utf-8") as file: json.dump(payload, file, ensure_ascii=False, indent=2) os.replace(tmp_path, path) def _config_json_path(bot_id: str) -> str: return os.path.join(_bot_data_root(bot_id), "config.json") def _read_bot_config(bot_id: str) -> Dict[str, Any]: return _read_json_object(_config_json_path(bot_id)) def read_bot_config_data(bot_id: str) -> Dict[str, Any]: return _read_bot_config(bot_id) def _write_bot_config(bot_id: str, config_data: Dict[str, Any]) -> None: _write_json_atomic(_config_json_path(bot_id), config_data) def write_bot_config_data(bot_id: str, config_data: Dict[str, Any]) -> None: _write_bot_config(bot_id, config_data) def _resources_json_path(bot_id: str) -> str: return os.path.join(_bot_data_root(bot_id), "resources.json") def _write_bot_resources(bot_id: str, cpu_cores: Any, memory_mb: Any, storage_gb: Any) -> None: normalized = _normalize_resource_limits(cpu_cores, memory_mb, storage_gb) _write_json_atomic( _resources_json_path(bot_id), { "cpuCores": normalized["cpu_cores"], "memoryMB": normalized["memory_mb"], "storageGB": normalized["storage_gb"], }, ) def write_bot_resource_limits(bot_id: str, cpu_cores: Any, memory_mb: Any, storage_gb: Any) -> None: _write_bot_resources(bot_id, cpu_cores, memory_mb, storage_gb) def _read_bot_resources(bot_id: str, config_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: _ = config_data path = _resources_json_path(bot_id) data = _read_json_object(path) return _normalize_resource_limits( data.get("cpuCores", data.get("cpu_cores")), data.get("memoryMB", data.get("memory_mb")), data.get("storageGB", data.get("storage_gb")), ) def get_bot_resource_limits(bot_id: str, config_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: return _read_bot_resources(bot_id, config_data=config_data) def get_bot_workspace_usage_bytes(bot_id: str) -> int: return _calc_dir_size_bytes(_workspace_root(bot_id)) def get_bot_workspace_snapshot(bot_id: str, config_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: resources = get_bot_resource_limits(bot_id, config_data=config_data) configured_limit_bytes = int(resources.get("storage_gb") or 0) * _BYTES_PER_GB return { "path": get_bot_workspace_root(bot_id), "usage_bytes": get_bot_workspace_usage_bytes(bot_id), "configured_limit_bytes": configured_limit_bytes if configured_limit_bytes > 0 else None, } def _env_store_path(bot_id: str) -> str: return os.path.join(_bot_data_root(bot_id), "env.json") def _read_env_store(bot_id: str) -> Dict[str, str]: return _normalize_env_params(_read_json_object(_env_store_path(bot_id))) def read_bot_env_params(bot_id: str) -> Dict[str, str]: return _read_env_store(bot_id) def _write_env_store(bot_id: str, env_params: Dict[str, str]) -> None: _write_json_atomic(_env_store_path(bot_id), _normalize_env_params(env_params)) def write_bot_env_params(bot_id: str, env_params: Dict[str, str]) -> None: _write_env_store(bot_id, env_params) def _cron_store_path(bot_id: str) -> str: return os.path.join(_workspace_root(bot_id), "cron", "jobs.json") def _normalize_cron_store_payload(raw: Any) -> Dict[str, Any]: if isinstance(raw, list): return {"version": 1, "jobs": [row for row in raw if isinstance(row, dict)]} if not isinstance(raw, dict): return {"version": 1, "jobs": []} jobs = raw.get("jobs") if isinstance(jobs, list): normalized_jobs = [row for row in jobs if isinstance(row, dict)] else: normalized_jobs = [] return { "version": _safe_int(raw.get("version"), 1), "jobs": normalized_jobs, } def _read_cron_store(bot_id: str) -> Dict[str, Any]: return _normalize_cron_store_payload(_read_json_value(_cron_store_path(bot_id))) def read_bot_cron_jobs_store(bot_id: str) -> Dict[str, Any]: return _read_cron_store(bot_id) def _write_cron_store(bot_id: str, store: Dict[str, Any]) -> None: normalized = _normalize_cron_store_payload(store) _write_json_atomic(_cron_store_path(bot_id), normalized) def write_bot_cron_jobs_store(bot_id: str, store: Dict[str, Any]) -> None: _write_cron_store(bot_id, store) def _sessions_root(bot_id: str) -> str: return os.path.join(_workspace_root(bot_id), "sessions") def _clear_bot_sessions(bot_id: str) -> int: root = _sessions_root(bot_id) if not os.path.isdir(root): return 0 deleted = 0 for name in os.listdir(root): path = os.path.join(root, name) if not os.path.isfile(path): continue if not name.lower().endswith(".jsonl"): continue try: os.remove(path) deleted += 1 except Exception: continue return deleted def _clear_bot_dashboard_direct_session(bot_id: str) -> Dict[str, Any]: root = _sessions_root(bot_id) os.makedirs(root, exist_ok=True) path = os.path.join(root, "dashboard_direct.jsonl") existed = os.path.exists(path) with open(path, "w", encoding="utf-8"): pass return {"path": path, "existed": existed}