dashboard-nanobot/backend/services/skill_service.py

208 lines
7.9 KiB
Python
Raw Normal View History

2026-03-31 04:31:47 +00:00
import shutil
import zipfile
import tempfile
from datetime import datetime
import os
from typing import Any, Dict, List, Optional
from fastapi import HTTPException, UploadFile
from core.utils import (
_is_ignored_skill_zip_top_level,
_is_valid_top_level_skill_name,
)
2026-04-04 16:29:37 +00:00
from services.bot_storage_service import get_bot_workspace_root
2026-04-14 02:04:12 +00:00
from services.platform_settings_service import get_platform_settings_snapshot
2026-03-31 04:31:47 +00:00
2026-04-04 16:29:37 +00:00
def get_bot_skills_root(bot_id: str) -> str:
return _skills_root(bot_id)
2026-03-31 04:31:47 +00:00
def _skills_root(bot_id: str) -> str:
2026-04-04 16:29:37 +00:00
return os.path.join(get_bot_workspace_root(bot_id), "skills")
2026-03-31 04:31:47 +00:00
def _read_skill_description(entry_path: str) -> str:
candidates: List[str] = []
if os.path.isdir(entry_path):
candidates = [
os.path.join(entry_path, "SKILL.md"),
os.path.join(entry_path, "skill.md"),
os.path.join(entry_path, "README.md"),
os.path.join(entry_path, "readme.md"),
]
elif entry_path.lower().endswith(".md"):
candidates = [entry_path]
for candidate in candidates:
if not os.path.isfile(candidate):
continue
try:
with open(candidate, "r", encoding="utf-8") as f:
for line in f:
text = line.strip()
if text and not text.startswith("#"):
return text[:240]
except Exception:
continue
return ""
def _list_workspace_skills(bot_id: str) -> List[Dict[str, Any]]:
root = _skills_root(bot_id)
os.makedirs(root, exist_ok=True)
rows: List[Dict[str, Any]] = []
names = sorted(os.listdir(root), key=lambda n: (not os.path.isdir(os.path.join(root, n)), n.lower()))
for name in names:
if not name or name.startswith("."):
continue
if not _is_valid_top_level_skill_name(name):
continue
abs_path = os.path.join(root, name)
if not os.path.exists(abs_path):
continue
stat = os.stat(abs_path)
rows.append(
{
"id": name,
"name": name,
"type": "dir" if os.path.isdir(abs_path) else "file",
"path": f"skills/{name}",
"size": stat.st_size if os.path.isfile(abs_path) else None,
"mtime": datetime.utcfromtimestamp(stat.st_mtime).isoformat() + "Z",
"description": _read_skill_description(abs_path),
}
)
return rows
def _install_skill_zip_into_workspace(bot_id: str, zip_path: str) -> Dict[str, Any]:
try:
archive = zipfile.ZipFile(zip_path)
except Exception as exc:
raise HTTPException(status_code=400, detail="Invalid zip file") from exc
skills_root = _skills_root(bot_id)
os.makedirs(skills_root, exist_ok=True)
installed: List[str] = []
with archive:
members = archive.infolist()
file_members = [m for m in members if not m.is_dir()]
if not file_members:
raise HTTPException(status_code=400, detail="Zip package has no files")
top_names: List[str] = []
for member in file_members:
raw_name = str(member.filename or "").replace("\\", "/").lstrip("/")
if not raw_name:
continue
first = raw_name.split("/", 1)[0].strip()
if _is_ignored_skill_zip_top_level(first):
continue
if not _is_valid_top_level_skill_name(first):
raise HTTPException(status_code=400, detail=f"Invalid skill entry name in zip: {first}")
if first not in top_names:
top_names.append(first)
if not top_names:
raise HTTPException(status_code=400, detail="Zip package has no valid skill entries")
conflicts = [name for name in top_names if os.path.exists(os.path.join(skills_root, name))]
if conflicts:
raise HTTPException(status_code=400, detail=f"Skill already exists: {', '.join(conflicts)}")
with tempfile.TemporaryDirectory(prefix=".skill_upload_", dir=skills_root) as tmp_dir:
tmp_root = os.path.abspath(tmp_dir)
for member in members:
raw_name = str(member.filename or "").replace("\\", "/").lstrip("/")
if not raw_name:
continue
target = os.path.abspath(os.path.join(tmp_root, raw_name))
if os.path.commonpath([tmp_root, target]) != tmp_root:
raise HTTPException(status_code=400, detail=f"Unsafe zip entry path: {raw_name}")
if member.is_dir():
os.makedirs(target, exist_ok=True)
continue
os.makedirs(os.path.dirname(target), exist_ok=True)
with archive.open(member, "r") as source, open(target, "wb") as dest:
shutil.copyfileobj(source, dest)
for name in top_names:
src = os.path.join(tmp_root, name)
dst = os.path.join(skills_root, name)
if not os.path.exists(src):
continue
shutil.move(src, dst)
installed.append(name)
if not installed:
raise HTTPException(status_code=400, detail="No skill entries installed from zip")
return {
"installed": installed,
"skills": _list_workspace_skills(bot_id),
}
2026-04-04 16:29:37 +00:00
def install_skill_zip_into_workspace(bot_id: str, zip_path: str) -> Dict[str, Any]:
return _install_skill_zip_into_workspace(bot_id, zip_path)
2026-03-31 04:31:47 +00:00
def list_bot_skills(bot_id: str) -> List[Dict[str, Any]]:
return _list_workspace_skills(bot_id)
async def upload_bot_skill_zip_to_workspace(bot_id: str, *, upload: UploadFile) -> Dict[str, Any]:
tmp_zip_path: Optional[str] = None
try:
with tempfile.NamedTemporaryFile(prefix=".skill_upload_", suffix=".zip", delete=False) as tmp_zip:
tmp_zip_path = tmp_zip.name
filename = str(upload.filename or "").strip()
if not filename.lower().endswith(".zip"):
raise HTTPException(status_code=400, detail="Only .zip skill package is supported")
max_bytes = get_platform_settings_snapshot().upload_max_mb * 1024 * 1024
total_size = 0
while True:
chunk = await upload.read(1024 * 1024)
if not chunk:
break
total_size += len(chunk)
if total_size > max_bytes:
raise HTTPException(
status_code=413,
detail=f"Zip package too large (max {max_bytes // (1024 * 1024)}MB)",
)
tmp_zip.write(chunk)
if total_size == 0:
raise HTTPException(status_code=400, detail="Zip package is empty")
finally:
await upload.close()
try:
install_result = _install_skill_zip_into_workspace(bot_id, tmp_zip_path)
finally:
if tmp_zip_path and os.path.exists(tmp_zip_path):
os.remove(tmp_zip_path)
return {
"status": "installed",
"bot_id": bot_id,
"installed": install_result["installed"],
"skills": install_result["skills"],
}
def delete_workspace_skill_entry(bot_id: str, *, skill_name: str) -> Dict[str, Any]:
name = str(skill_name or "").strip()
if not _is_valid_top_level_skill_name(name):
raise HTTPException(status_code=400, detail="Invalid skill name")
root = _skills_root(bot_id)
target = os.path.abspath(os.path.join(root, name))
if os.path.commonpath([os.path.abspath(root), target]) != os.path.abspath(root):
raise HTTPException(status_code=400, detail="Invalid skill path")
if not os.path.exists(target):
raise HTTPException(status_code=404, detail="Skill not found in workspace")
if os.path.isdir(target):
shutil.rmtree(target, ignore_errors=False)
else:
os.remove(target)
return {"status": "deleted", "bot_id": bot_id, "skill": name}