dashboard-nanobot/backend/core/utils.py

161 lines
5.0 KiB
Python
Raw Normal View History

2026-03-31 04:31:47 +00:00
import os
import re
import json
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, List, Optional
from zoneinfo import ZoneInfo
from fastapi import HTTPException
from core.settings import DEFAULT_BOT_SYSTEM_TIMEZONE
# Pattern for accepted environment-variable keys: an uppercase letter or
# underscore followed by up to 127 uppercase letters, digits or underscores.
_ENV_KEY_RE = re.compile(r"^[A-Z_][A-Z0-9_]{0,127}$")

# Explicit export list. Every helper here is underscore-prefixed, so a star
# import would skip them unless they are named in ``__all__``.
__all__ = [
    "_calc_dir_size_bytes",
    "_get_default_system_timezone",
    "_is_ignored_skill_zip_top_level",
    "_is_image_attachment_path",
    "_is_valid_top_level_skill_name",
    "_is_video_attachment_path",
    "_is_visual_attachment_path",
    "_normalize_env_params",
    "_normalize_system_timezone",
    "_parse_env_params",
    "_parse_json_string_list",
    "_read_description_from_text",
    "_resolve_local_day_range",
    "_safe_float",
    "_safe_int",
    "_sanitize_skill_market_key",
    "_sanitize_zip_filename",
    "_workspace_stat_ctime_iso",
]
def _resolve_local_day_range(date_text: str, tz_offset_minutes: Optional[int]) -> tuple[datetime, datetime]:
    """Turn a ``YYYY-MM-DD`` day into a 24-hour window of UTC-tagged datetimes.

    The parsed midnight is tagged as UTC and then moved forward by
    ``tz_offset_minutes`` (``None`` counts as 0).
    NOTE(review): adding the offset matches a "UTC minus local" convention
    (such as JavaScript's ``getTimezoneOffset``) - confirm against callers.

    Args:
        date_text: Calendar day formatted as ``YYYY-MM-DD``; surrounding
            whitespace is ignored.
        tz_offset_minutes: Minutes added to the parsed midnight, or ``None``.

    Returns:
        ``(start, end)`` where ``end`` is exactly one day after ``start``.

    Raises:
        HTTPException: Status 400 when ``date_text`` is not a valid date.
    """
    try:
        midnight = datetime.strptime(str(date_text or "").strip(), "%Y-%m-%d")
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Invalid date, expected YYYY-MM-DD") from exc

    shift_minutes = 0 if tz_offset_minutes is None else tz_offset_minutes
    window_start = midnight.replace(tzinfo=timezone.utc) + timedelta(minutes=shift_minutes)
    return window_start, window_start + timedelta(days=1)
def _sanitize_zip_filename(name: str) -> str:
    """Return a filesystem-safe version of an uploaded archive's file name.

    Every character outside ``[a-zA-Z0-9._-]`` is replaced with ``_``, which
    also neutralises path separators.

    Args:
        name: Client-supplied file name; ``None`` or empty is accepted.

    Returns:
        The sanitized name, or ``"upload.zip"`` when nothing usable remains.
    """
    cleaned = re.sub(r"[^a-zA-Z0-9._-]", "_", str(name or "").strip())
    # A name made only of dots ("." or "..") passes the character filter but
    # refers to a directory rather than a file, so treat it like an empty name.
    if not cleaned.strip("."):
        return "upload.zip"
    return cleaned
def _normalize_env_params(raw: Any) -> Dict[str, str]:
    """Filter a mapping down to valid environment-variable entries.

    Keys are stringified and stripped; entries whose key does not match
    ``_ENV_KEY_RE`` are dropped. Values are stringified and stripped.

    Args:
        raw: Candidate mapping; anything that is not a ``dict`` yields ``{}``.

    Returns:
        A new ``dict`` of key/value strings.
    """
    if not isinstance(raw, dict):
        return {}

    cleaned: Dict[str, str] = {}
    for key, value in raw.items():
        env_name = str(key).strip()
        if not _ENV_KEY_RE.match(env_name):
            continue
        # Numeric values (including 0, 0.0 and False) are kept as text; a plain
        # ``value or ""`` would silently erase a legitimate zero. Other falsy
        # values (None, "", empty containers) still become an empty string.
        if isinstance(value, (int, float)):
            cleaned[env_name] = str(value).strip()
        else:
            cleaned[env_name] = str(value or "").strip()
    return cleaned
def _get_default_system_timezone() -> str:
    """Return the configured default timezone name, stripped of whitespace.

    Uses ``DEFAULT_BOT_SYSTEM_TIMEZONE`` when it is truthy and
    ``"Asia/Shanghai"`` otherwise.
    """
    configured = DEFAULT_BOT_SYSTEM_TIMEZONE or "Asia/Shanghai"
    return str(configured).strip()
def _normalize_system_timezone(raw: Any) -> str:
    """Validate a timezone name, falling back to the default.

    Args:
        raw: Candidate timezone key; it is stringified and stripped.

    Returns:
        The stripped candidate when ``ZoneInfo`` accepts it; otherwise the
        value of ``_get_default_system_timezone()``. Empty input and any
        error raised while constructing the zone both take the fallback.
    """
    candidate = str(raw or "").strip()
    if candidate:
        try:
            ZoneInfo(candidate)
        except Exception:
            # Unknown or malformed key: use the default below.
            pass
        else:
            return candidate
    return _get_default_system_timezone()
def _safe_float(raw: Any, default: float) -> float:
    """Convert ``raw`` to ``float``, returning ``default`` when that fails.

    Args:
        raw: Value to convert.
        default: Value returned when conversion is not possible.

    Returns:
        ``float(raw)``, or ``default`` when the conversion raises
        ``ValueError``, ``TypeError`` or ``OverflowError``.
    """
    try:
        return float(raw)
    # OverflowError covers integers too large for a float (e.g. 10**400).
    except (ValueError, TypeError, OverflowError):
        return default
def _safe_int(raw: Any, default: int) -> int:
    """Convert ``raw`` to ``int``, returning ``default`` when that fails.

    Args:
        raw: Value to convert.
        default: Value returned when conversion is not possible.

    Returns:
        ``int(raw)``, or ``default`` when the conversion raises
        ``ValueError``, ``TypeError`` or ``OverflowError``.
    """
    try:
        return int(raw)
    # OverflowError covers infinite floats: int(float("inf")) raises it.
    except (ValueError, TypeError, OverflowError):
        return default
def _parse_env_params(raw: Any) -> Dict[str, str]:
    """Accept env params as a dict or a JSON string and normalize them.

    Args:
        raw: A ``dict``, a JSON-encoded string, or anything else.

    Returns:
        The result of ``_normalize_env_params`` on the dict (or on the decoded
        JSON). Returns ``{}`` when ``raw`` is neither a dict nor a string, or
        when decoding fails.
    """
    if isinstance(raw, str):
        try:
            return _normalize_env_params(json.loads(raw))
        except Exception:
            # Undecodable text is treated as "no parameters".
            return {}
    return _normalize_env_params(raw) if isinstance(raw, dict) else {}
def _is_valid_top_level_skill_name(name: str) -> bool:
    """Tell whether ``name`` consists solely of ``[a-zA-Z0-9_-]`` characters.

    Args:
        name: Candidate top-level skill name.

    Returns:
        ``True`` for a non-empty string made only of ASCII letters, digits,
        underscores and hyphens; ``False`` otherwise, including non-strings.
    """
    if not isinstance(name, str):
        return False
    # fullmatch rather than match with "$": "$" also matches just before a
    # trailing newline, which would let "skill\n" through.
    return re.fullmatch(r"[a-zA-Z0-9_-]+", name) is not None
def _parse_json_string_list(raw: Any) -> List[str]:
    """Coerce a list, or a JSON string encoding a list, into a list of strings.

    Args:
        raw: A list, a JSON-encoded string, or anything else.

    Returns:
        Each element passed through ``str``. Returns ``[]`` for falsy input,
        for text that is not valid JSON, and for anything that is not
        (or does not decode to) a list.
    """
    if not raw:
        return []

    items: Any = raw
    if isinstance(raw, str):
        try:
            items = json.loads(raw)
        except Exception:
            return []

    if not isinstance(items, list):
        return []
    return list(map(str, items))
def _is_ignored_skill_zip_top_level(name: str) -> bool:
    """Tell whether a top-level archive entry should be skipped.

    An entry is ignored when its name starts with ``.`` or ``__``, or is
    exactly ``venv`` or ``node_modules``.
    """
    hidden_or_dunder = name.startswith((".", "__"))
    return hidden_or_dunder or name in {"venv", "node_modules"}
def _read_description_from_text(text: str) -> str:
    """Extract a short description from a block of text.

    Args:
        text: Source text; falsy input yields ``""``.

    Returns:
        The first line that is non-blank after stripping and does not start
        with ``#``, cut to at most 200 characters. Returns ``""`` when no
        such line exists.
    """
    if not text:
        return ""

    stripped_rows = (row.strip() for row in text.strip().split("\n"))
    first_match = next(
        (row for row in stripped_rows if row and not row.startswith("#")),
        "",
    )
    return first_match[:200]
def _sanitize_skill_market_key(key: str) -> str:
    """Normalize a skill-market key to lowercase ``[a-z0-9_-]`` characters.

    The input is stringified (``None`` or empty becomes ``""``), stripped and
    lowercased; every remaining character outside ``[a-z0-9_-]`` is replaced
    with ``_``.
    """
    lowered = str(key or "").strip().lower()
    return re.sub(r"[^a-z0-9_-]", "_", lowered)
def _calc_dir_size_bytes(path: str) -> int:
    """Sum the sizes of all regular (non-symlink) files below ``path``.

    This is a best-effort measurement that never raises.

    Args:
        path: Root directory to walk.

    Returns:
        Total size in bytes of the files that could be measured; ``0`` when
        the directory cannot be walked at all.
    """
    total = 0
    try:
        for root, _dirs, files in os.walk(path):
            for filename in files:
                file_path = os.path.join(root, filename)
                try:
                    if not os.path.islink(file_path):
                        total += os.path.getsize(file_path)
                except OSError:
                    # File vanished or is unreadable: skip only this file
                    # instead of abandoning the rest of the walk.
                    continue
    except Exception:
        # Deliberate best-effort boundary (e.g. an invalid ``path`` value):
        # report whatever was counted so far.
        pass
    return total
def _is_image_attachment_path(path: str) -> bool:
    """Tell whether ``path`` has a recognised image extension.

    The comparison is case-insensitive.
    """
    _, suffix = os.path.splitext(path)
    image_suffixes = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".svg", ".bmp"}
    return (suffix or "").lower() in image_suffixes
def _is_video_attachment_path(path: str) -> bool:
    """Tell whether ``path`` has a recognised video extension.

    The comparison is case-insensitive.
    """
    _, suffix = os.path.splitext(path)
    video_suffixes = {".mp4", ".mov", ".avi", ".mkv", ".webm"}
    return (suffix or "").lower() in video_suffixes
def _is_visual_attachment_path(path: str) -> bool:
    """Tell whether ``path`` is an image or a video attachment.

    The image check runs first; the video check runs only if it fails.
    """
    if _is_image_attachment_path(path):
        return True
    return _is_video_attachment_path(path)
def _workspace_stat_ctime_iso(stat: os.stat_result) -> str:
    """Format a stat result's creation-like timestamp as a UTC ISO string.

    ``st_birthtime`` is used when the attribute is present and not ``None``;
    otherwise ``st_ctime`` is used. If that value cannot be converted,
    ``st_mtime`` is used instead.

    Args:
        stat: Object exposing ``st_mtime`` and optionally ``st_birthtime``
            and ``st_ctime``.

    Returns:
        An ISO-8601 timestamp in UTC with the ``+00:00`` suffix written as ``Z``.
    """

    def _as_utc_z(epoch_seconds: float) -> str:
        # Render epoch seconds as ISO-8601 in UTC, with "Z" for the offset.
        moment = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
        return moment.isoformat().replace("+00:00", "Z")

    created = getattr(stat, "st_birthtime", None)
    if created is None:
        created = getattr(stat, "st_ctime", None)

    try:
        return _as_utc_z(float(created))
    except Exception:
        # Missing or unusable timestamp: fall back to modification time.
        return _as_utc_z(stat.st_mtime)