dashboard-nanobot/backend/services/skill_service.py

208 lines
7.9 KiB
Python

import shutil
import zipfile
import tempfile
from datetime import datetime
import os
from typing import Any, Dict, List, Optional
from fastapi import HTTPException, UploadFile
from core.utils import (
_is_ignored_skill_zip_top_level,
_is_valid_top_level_skill_name,
)
from services.bot_storage_service import get_bot_workspace_root
from services.platform_service import get_platform_settings_snapshot
def get_bot_skills_root(bot_id: str) -> str:
return _skills_root(bot_id)
def _skills_root(bot_id: str) -> str:
return os.path.join(get_bot_workspace_root(bot_id), "skills")
def _read_skill_description(entry_path: str) -> str:
candidates: List[str] = []
if os.path.isdir(entry_path):
candidates = [
os.path.join(entry_path, "SKILL.md"),
os.path.join(entry_path, "skill.md"),
os.path.join(entry_path, "README.md"),
os.path.join(entry_path, "readme.md"),
]
elif entry_path.lower().endswith(".md"):
candidates = [entry_path]
for candidate in candidates:
if not os.path.isfile(candidate):
continue
try:
with open(candidate, "r", encoding="utf-8") as f:
for line in f:
text = line.strip()
if text and not text.startswith("#"):
return text[:240]
except Exception:
continue
return ""
def _list_workspace_skills(bot_id: str) -> List[Dict[str, Any]]:
root = _skills_root(bot_id)
os.makedirs(root, exist_ok=True)
rows: List[Dict[str, Any]] = []
names = sorted(os.listdir(root), key=lambda n: (not os.path.isdir(os.path.join(root, n)), n.lower()))
for name in names:
if not name or name.startswith("."):
continue
if not _is_valid_top_level_skill_name(name):
continue
abs_path = os.path.join(root, name)
if not os.path.exists(abs_path):
continue
stat = os.stat(abs_path)
rows.append(
{
"id": name,
"name": name,
"type": "dir" if os.path.isdir(abs_path) else "file",
"path": f"skills/{name}",
"size": stat.st_size if os.path.isfile(abs_path) else None,
"mtime": datetime.utcfromtimestamp(stat.st_mtime).isoformat() + "Z",
"description": _read_skill_description(abs_path),
}
)
return rows
def _install_skill_zip_into_workspace(bot_id: str, zip_path: str) -> Dict[str, Any]:
try:
archive = zipfile.ZipFile(zip_path)
except Exception as exc:
raise HTTPException(status_code=400, detail="Invalid zip file") from exc
skills_root = _skills_root(bot_id)
os.makedirs(skills_root, exist_ok=True)
installed: List[str] = []
with archive:
members = archive.infolist()
file_members = [m for m in members if not m.is_dir()]
if not file_members:
raise HTTPException(status_code=400, detail="Zip package has no files")
top_names: List[str] = []
for member in file_members:
raw_name = str(member.filename or "").replace("\\", "/").lstrip("/")
if not raw_name:
continue
first = raw_name.split("/", 1)[0].strip()
if _is_ignored_skill_zip_top_level(first):
continue
if not _is_valid_top_level_skill_name(first):
raise HTTPException(status_code=400, detail=f"Invalid skill entry name in zip: {first}")
if first not in top_names:
top_names.append(first)
if not top_names:
raise HTTPException(status_code=400, detail="Zip package has no valid skill entries")
conflicts = [name for name in top_names if os.path.exists(os.path.join(skills_root, name))]
if conflicts:
raise HTTPException(status_code=400, detail=f"Skill already exists: {', '.join(conflicts)}")
with tempfile.TemporaryDirectory(prefix=".skill_upload_", dir=skills_root) as tmp_dir:
tmp_root = os.path.abspath(tmp_dir)
for member in members:
raw_name = str(member.filename or "").replace("\\", "/").lstrip("/")
if not raw_name:
continue
target = os.path.abspath(os.path.join(tmp_root, raw_name))
if os.path.commonpath([tmp_root, target]) != tmp_root:
raise HTTPException(status_code=400, detail=f"Unsafe zip entry path: {raw_name}")
if member.is_dir():
os.makedirs(target, exist_ok=True)
continue
os.makedirs(os.path.dirname(target), exist_ok=True)
with archive.open(member, "r") as source, open(target, "wb") as dest:
shutil.copyfileobj(source, dest)
for name in top_names:
src = os.path.join(tmp_root, name)
dst = os.path.join(skills_root, name)
if not os.path.exists(src):
continue
shutil.move(src, dst)
installed.append(name)
if not installed:
raise HTTPException(status_code=400, detail="No skill entries installed from zip")
return {
"installed": installed,
"skills": _list_workspace_skills(bot_id),
}
def install_skill_zip_into_workspace(bot_id: str, zip_path: str) -> Dict[str, Any]:
return _install_skill_zip_into_workspace(bot_id, zip_path)
def list_bot_skills(bot_id: str) -> List[Dict[str, Any]]:
return _list_workspace_skills(bot_id)
async def upload_bot_skill_zip_to_workspace(bot_id: str, *, upload: UploadFile) -> Dict[str, Any]:
tmp_zip_path: Optional[str] = None
try:
with tempfile.NamedTemporaryFile(prefix=".skill_upload_", suffix=".zip", delete=False) as tmp_zip:
tmp_zip_path = tmp_zip.name
filename = str(upload.filename or "").strip()
if not filename.lower().endswith(".zip"):
raise HTTPException(status_code=400, detail="Only .zip skill package is supported")
max_bytes = get_platform_settings_snapshot().upload_max_mb * 1024 * 1024
total_size = 0
while True:
chunk = await upload.read(1024 * 1024)
if not chunk:
break
total_size += len(chunk)
if total_size > max_bytes:
raise HTTPException(
status_code=413,
detail=f"Zip package too large (max {max_bytes // (1024 * 1024)}MB)",
)
tmp_zip.write(chunk)
if total_size == 0:
raise HTTPException(status_code=400, detail="Zip package is empty")
finally:
await upload.close()
try:
install_result = _install_skill_zip_into_workspace(bot_id, tmp_zip_path)
finally:
if tmp_zip_path and os.path.exists(tmp_zip_path):
os.remove(tmp_zip_path)
return {
"status": "installed",
"bot_id": bot_id,
"installed": install_result["installed"],
"skills": install_result["skills"],
}
def delete_workspace_skill_entry(bot_id: str, *, skill_name: str) -> Dict[str, Any]:
name = str(skill_name or "").strip()
if not _is_valid_top_level_skill_name(name):
raise HTTPException(status_code=400, detail="Invalid skill name")
root = _skills_root(bot_id)
target = os.path.abspath(os.path.join(root, name))
if os.path.commonpath([os.path.abspath(root), target]) != os.path.abspath(root):
raise HTTPException(status_code=400, detail="Invalid skill path")
if not os.path.exists(target):
raise HTTPException(status_code=404, detail="Skill not found in workspace")
if os.path.isdir(target):
shutil.rmtree(target, ignore_errors=False)
else:
os.remove(target)
return {"status": "deleted", "bot_id": bot_id, "skill": name}