104 lines
3.0 KiB
Python
104 lines
3.0 KiB
Python
import json
|
|
import math
|
|
import os
|
|
import re
|
|
from datetime import datetime
|
|
from typing import Any, Dict
|
|
|
|
from core.settings import BOTS_WORKSPACE_ROOT
|
|
|
|
|
|
def utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime`` (``tzinfo`` is None).

    ``datetime.utcnow()`` is deprecated since Python 3.12, so the value is
    built from an aware "now" in UTC and the tzinfo is then stripped. The
    result stays naive, exactly as before, so it remains comparable with
    other naive timestamps.
    """
    # Function-scope import: the module header only imports ``datetime``.
    from datetime import timezone

    return datetime.now(timezone.utc).replace(tzinfo=None)
|
|
|
|
|
|
def bot_workspace_root(bot_id: str) -> str:
    """Return the absolute path ``<BOTS_WORKSPACE_ROOT>/<bot_id>/.nanobot/workspace``.

    The path is only computed; nothing is created or checked on disk.
    """
    segments = (BOTS_WORKSPACE_ROOT, bot_id, ".nanobot", "workspace")
    joined = os.path.join(*segments)
    return os.path.abspath(joined)
|
|
|
|
|
|
def bot_data_root(bot_id: str) -> str:
    """Return the absolute path ``<BOTS_WORKSPACE_ROOT>/<bot_id>/.nanobot``.

    The path is only computed; nothing is created or checked on disk.
    """
    segments = (BOTS_WORKSPACE_ROOT, bot_id, ".nanobot")
    joined = os.path.join(*segments)
    return os.path.abspath(joined)
|
|
|
|
|
|
def calc_dir_size_bytes(path: str) -> int:
    """Sum the sizes, in bytes, of the regular files found under ``path``.

    Returns 0 when ``path`` is not an existing directory. Symbolic links
    are skipped, and any file whose size cannot be read (``OSError``) is
    ignored rather than aborting the scan.
    """
    if not os.path.isdir(path):
        return 0

    size_sum = 0
    for dirpath, _dirnames, filenames in os.walk(path):
        for filename in filenames:
            entry = os.path.join(dirpath, filename)
            try:
                if not os.path.islink(entry):
                    size_sum += int(os.path.getsize(entry))
            except OSError:
                # Best effort: the file vanished or is unreadable; skip it.
                pass
    return size_sum
|
|
|
|
|
|
def workspace_usage_bytes(runtime: Dict[str, Any], bot_id: str) -> int:
    """Return the bot's workspace usage in bytes.

    Resolution order:

    1. ``runtime["usage"]["workspace_used_bytes"]``;
    2. ``runtime["usage"]["container_rw_bytes"]`` when the first value is
       ``None``, ``0``, ``"0"``, ``""`` or an unhashable object;
    3. the on-disk size of ``bot_workspace_root(bot_id)`` when the chosen
       value does not convert to a positive integer.

    Args:
        runtime: Mapping that may carry a ``"usage"`` entry.
        bot_id: Bot identifier, used only for the on-disk fallback.

    Returns:
        A non-negative byte count.
    """
    try:
        usage = dict(runtime.get("usage") or {})
    except (TypeError, ValueError):
        # "usage" is present but is neither a mapping nor an iterable of
        # pairs; behave as if no usage data had been reported.
        usage = {}

    value = usage.get("workspace_used_bytes")
    try:
        primary_unusable = value in {None, 0, "0", ""}
    except TypeError:
        # Set membership hashes ``value``; an unhashable object (e.g. a list
        # or dict) used to crash here. It cannot be a byte count, so fall
        # back exactly like a missing value.
        primary_unusable = True
    if primary_unusable:
        value = usage.get("container_rw_bytes")

    try:
        normalized = int(value or 0)
    except (TypeError, ValueError, OverflowError):
        normalized = 0

    if normalized > 0:
        return normalized
    # No usable reported figure: measure the workspace directory directly.
    return calc_dir_size_bytes(bot_workspace_root(bot_id))
|
|
|
|
|
|
def read_bot_resources(bot_id: str) -> Dict[str, Any]:
    """Load and normalise the resource limits stored for a bot.

    Reads ``resources.json`` from ``bot_data_root(bot_id)``. A missing
    file, unreadable content or a non-object JSON document is treated as
    an empty configuration. Each setting is looked up under its camelCase
    key first and its snake_case key second, falling back to the defaults
    (1.0 CPU core, 1024 MB memory, 10 GB storage).

    A value of exactly 0 is passed through unchanged; any other value is
    clamped to 0.1-16.0 cores, 256-65536 MB and 1-1024 GB respectively.

    Returns:
        A dict with the keys ``cpu_cores``, ``memory_mb`` and ``storage_gb``.
    """

    def _as_float(candidate: Any, fallback: float) -> float:
        # Convert to float, falling back on any conversion failure.
        try:
            return float(candidate)
        except Exception:
            return fallback

    def _as_int(candidate: Any, fallback: int) -> int:
        # Convert to int, falling back on any conversion failure.
        try:
            return int(candidate)
        except Exception:
            return fallback

    resources_file = os.path.join(bot_data_root(bot_id), "resources.json")
    settings: Dict[str, Any] = {}
    if os.path.isfile(resources_file):
        parsed = None
        try:
            with open(resources_file, "r", encoding="utf-8") as handle:
                parsed = json.load(handle)
        except Exception:
            # Unreadable or malformed file: keep the empty configuration.
            parsed = None
        if isinstance(parsed, dict):
            settings = parsed

    cpu_cores = _as_float(
        settings.get("cpuCores", settings.get("cpu_cores", 1.0)), 1.0
    )
    memory_mb = _as_int(
        settings.get("memoryMB", settings.get("memory_mb", 1024)), 1024
    )
    storage_gb = _as_int(
        settings.get("storageGB", settings.get("storage_gb", 10)), 10
    )

    # Zero is passed through as-is; everything else is clamped to a range.
    if cpu_cores == 0:
        cpu_cores = 0.0
    else:
        cpu_cores = min(16.0, max(0.1, cpu_cores))

    if memory_mb == 0:
        memory_mb = 0
    else:
        memory_mb = min(65536, max(256, memory_mb))

    if storage_gb == 0:
        storage_gb = 0
    else:
        storage_gb = min(1024, max(1, storage_gb))

    return {
        "cpu_cores": cpu_cores,
        "memory_mb": memory_mb,
        "storage_gb": storage_gb,
    }
|
|
|
|
|
|
def estimate_tokens(text: str) -> int:
    """Return a rough token count for ``text``.

    The stripped text is split into fragments, each costed as follows:

    * one character in the range U+4E00-U+9FFF: 1 token;
    * a run of ASCII letters, digits or underscores: one token per started
      group of four characters;
    * any other single non-whitespace character: 1 token.

    Empty, whitespace-only or falsy input yields 0; otherwise the result
    is at least 1.
    """
    normalized = str(text or "").strip()
    if not normalized:
        return 0

    def _cost(fragment: str) -> int:
        # Tokens contributed by a single fragment.
        if re.fullmatch(r"[\u4e00-\u9fff]", fragment):
            return 1
        if re.fullmatch(r"[A-Za-z0-9_]+", fragment):
            return max(1, math.ceil(len(fragment) / 4))
        return 1

    fragments = re.findall(r"[\u4e00-\u9fff]|[A-Za-z0-9_]+|[^\s]", normalized)
    return max(1, sum(_cost(fragment) for fragment in fragments))
|