"""Bot workspace helpers: paths, disk usage, resource limits and token estimates."""

import json
import math
import os
import re
from datetime import datetime, timezone
from typing import Any, Dict

from core.settings import BOTS_WORKSPACE_ROOT

# Tokenizer for estimate_tokens: a single character in U+4E00..U+9FFF, a run
# of ASCII word characters, or any other single non-whitespace character.
_TOKEN_PATTERN = re.compile(r"[\u4e00-\u9fff]|[A-Za-z0-9_]+|[^\s]")
_ASCII_WORD_PATTERN = re.compile(r"[A-Za-z0-9_]+")

# Values of "workspace_used_bytes" that trigger the "container_rw_bytes"
# fallback. Kept as a tuple (not a set) so that unhashable values such as
# lists or dicts can be tested for membership without raising TypeError.
_MISSING_USAGE_VALUES = (None, 0, "0", "")


def utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    ``datetime.utcnow()`` is deprecated since Python 3.12. This produces the
    same naive value (``tzinfo`` is ``None``) without the deprecation warning,
    so callers that compare against other naive datetimes keep working.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def bot_workspace_root(bot_id: str) -> str:
    """Return the absolute path of the ``workspace`` directory for *bot_id*."""
    return os.path.abspath(os.path.join(bot_data_root(bot_id), "workspace"))


def bot_data_root(bot_id: str) -> str:
    """Return the absolute path of the ``.nanobot`` data directory for *bot_id*."""
    # NOTE(review): bot_id is joined into the path without validation; this
    # presumably relies on callers passing a trusted identifier -- confirm.
    return os.path.abspath(os.path.join(BOTS_WORKSPACE_ROOT, bot_id, ".nanobot"))


def calc_dir_size_bytes(path: str) -> int:
    """Return the combined size in bytes of the files found under *path*.

    Symbolic links are skipped and files whose size cannot be read are
    ignored. Returns 0 when *path* is not a directory.
    """
    if not os.path.isdir(path):
        return 0
    total = 0
    for root, _, files in os.walk(path):
        for name in files:
            target = os.path.join(root, name)
            try:
                if os.path.islink(target):
                    continue
                total += os.path.getsize(target)
            except OSError:
                # The file may have vanished or be unreadable; skip it.
                continue
    return total


def workspace_usage_bytes(runtime: Dict[str, Any], bot_id: str) -> int:
    """Return the workspace usage in bytes for *bot_id*.

    Uses ``runtime["usage"]["workspace_used_bytes"]`` when it holds a
    positive number, falls back to ``container_rw_bytes`` when that value is
    missing or zero, and finally measures the workspace directory on disk.

    Args:
        runtime: Mapping that may carry a ``"usage"`` mapping.
        bot_id: Identifier used to locate the workspace directory.

    Returns:
        A non-negative byte count.
    """
    try:
        usage = dict(runtime.get("usage") or {})
    except (TypeError, ValueError):
        # A malformed "usage" entry (e.g. a plain string) is treated as absent.
        usage = {}

    value = usage.get("workspace_used_bytes")
    if value in _MISSING_USAGE_VALUES:
        value = usage.get("container_rw_bytes")

    try:
        normalized = int(value or 0)
    except (TypeError, ValueError, OverflowError):
        normalized = 0

    if normalized > 0:
        return normalized
    return calc_dir_size_bytes(bot_workspace_root(bot_id))


def read_bot_resources(bot_id: str) -> Dict[str, Any]:
    """Read and sanitise the resource settings stored for *bot_id*.

    Loads ``resources.json`` from the bot's data directory. A missing,
    unreadable or malformed file yields the defaults (1.0 CPU cores, 1024 MB
    memory, 10 GB storage). Both camelCase and snake_case keys are accepted,
    with the camelCase key taking precedence when present.

    Returns:
        A dict with the keys ``cpu_cores``, ``memory_mb`` and ``storage_gb``.
    """
    path = os.path.join(bot_data_root(bot_id), "resources.json")
    raw: Dict[str, Any] = {}
    if os.path.isfile(path):
        try:
            with open(path, "r", encoding="utf-8") as file:
                loaded = json.load(file)
        except (OSError, ValueError, RecursionError):
            # Best effort: unreadable, undecodable or invalid JSON -> defaults.
            loaded = None
        if isinstance(loaded, dict):
            raw = loaded

    def _safe_float(value: Any, default: float) -> float:
        """Coerce *value* to float, returning *default* when invalid or NaN."""
        try:
            number = float(value)
        except (TypeError, ValueError, OverflowError):
            return default
        # NaN would otherwise slip through the clamp below as the minimum.
        return default if math.isnan(number) else number

    def _safe_int(value: Any, default: int) -> int:
        """Coerce *value* to int, returning *default* when that fails."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return default

    cpu = _safe_float(raw.get("cpuCores", raw.get("cpu_cores", 1.0)), 1.0)
    memory = _safe_int(raw.get("memoryMB", raw.get("memory_mb", 1024)), 1024)
    storage = _safe_int(raw.get("storageGB", raw.get("storage_gb", 10)), 10)

    # A value of exactly 0 is passed through unchanged instead of being
    # clamped (presumably meaning "no limit" -- confirm with the consumer).
    cpu = 0.0 if cpu == 0 else min(16.0, max(0.1, cpu))
    memory = 0 if memory == 0 else min(65536, max(256, memory))
    storage = 0 if storage == 0 else min(1024, max(1, storage))

    return {
        "cpu_cores": cpu,
        "memory_mb": memory,
        "storage_gb": storage,
    }


def estimate_tokens(text: str) -> int:
    """Return a rough token count for *text*.

    A run of ASCII letters, digits and underscores counts as one token per
    four characters (rounded up). Every other non-whitespace character
    counts as one token. Empty or whitespace-only input returns 0.
    """
    content = str(text or "").strip()
    if not content:
        return 0
    total = 0
    for piece in _TOKEN_PATTERN.findall(content):
        if _ASCII_WORD_PATTERN.fullmatch(piece):
            total += max(1, math.ceil(len(piece) / 4))
        else:
            total += 1
    return max(1, total)