# dashboard-nanobot/backend/services/workspace_service.py
import mimetypes
import os
import re
from datetime import datetime
from typing import Any, Dict, Generator, List, Optional
from urllib.parse import quote
from fastapi import HTTPException, Request, UploadFile
from fastapi.responses import FileResponse, RedirectResponse, Response, StreamingResponse
from core.utils import _workspace_stat_ctime_iso
from services.bot_storage_service import get_bot_workspace_root
from services.platform_settings_service import get_platform_settings_snapshot
# File extensions that may be previewed as text via read_workspace_text_file.
# The empty string covers extension-less files (e.g. README, LICENSE).
TEXT_PREVIEW_EXTENSIONS = {
    "",
    ".md",
    ".txt",
    ".log",
    ".json",
    ".yaml",
    ".yml",
    ".cfg",
    ".ini",
    ".csv",
    ".tsv",
    ".toml",
    ".py",
    ".sh",
}
# Extensions treated as editable markdown (see update_workspace_markdown_file).
MARKDOWN_EXTENSIONS = {".md", ".markdown"}
def _resolve_workspace_path(bot_id: str, rel_path: Optional[str] = None) -> tuple[str, str]:
    """Resolve *rel_path* inside the bot's workspace, rejecting escapes.

    Returns ``(root, target)`` as absolute paths. Raises HTTP 400 when the
    resolved target would fall outside the workspace root (e.g. via ``..``
    segments or an absolute *rel_path*).
    """
    # Normalize the root so the containment comparison below is like-for-like.
    root = os.path.abspath(get_bot_workspace_root(bot_id))
    rel = (rel_path or "").strip().replace("\\", "/")
    target = os.path.abspath(os.path.join(root, rel))
    try:
        # commonpath() raises ValueError for paths on different drives /
        # mixed absolute-relative inputs (Windows) — treat that as an
        # invalid client path rather than letting it surface as a 500.
        inside = os.path.commonpath([root, target]) == root
    except ValueError:
        inside = False
    if not inside:
        raise HTTPException(status_code=400, detail="invalid workspace path")
    return root, target
def resolve_workspace_path(bot_id: str, rel_path: Optional[str] = None) -> tuple[str, str]:
    """Public alias for the internal workspace-path resolver."""
    resolved = _resolve_workspace_path(bot_id, rel_path)
    return resolved
def _write_text_atomic(target: str, content: str) -> None:
os.makedirs(os.path.dirname(target), exist_ok=True)
tmp = f"{target}.tmp"
with open(tmp, "w", encoding="utf-8") as fh:
fh.write(content)
os.replace(tmp, target)
def _build_workspace_tree(path: str, root: str, depth: int) -> List[Dict[str, Any]]:
    """Recursively build dir/file nodes under *path*, dirs sorted first.

    Paths in the result are relative to *root* with forward slashes.
    Directory children are only expanded while *depth* > 0. A missing
    *path* yields an empty list.
    """
    rows: List[Dict[str, Any]] = []
    try:
        # Directories first, then case-insensitive alphabetical.
        names = sorted(os.listdir(path), key=lambda v: (not os.path.isdir(os.path.join(path, v)), v.lower()))
    except FileNotFoundError:
        return rows
    for name in names:
        if name in {".DS_Store"}:
            continue
        abs_path = os.path.join(path, name)
        rel_path = os.path.relpath(abs_path, root).replace("\\", "/")
        try:
            stat = os.stat(abs_path)
        except OSError:
            # Entry vanished (or became unstatable) between listdir and stat.
            continue
        base: Dict[str, Any] = {
            "name": name,
            "path": rel_path,
            "ctime": _workspace_stat_ctime_iso(stat),
            "mtime": datetime.utcfromtimestamp(stat.st_mtime).isoformat() + "Z",
        }
        if os.path.isdir(abs_path):
            node = {**base, "type": "dir"}
            if depth > 0:
                node["children"] = _build_workspace_tree(abs_path, root, depth - 1)
            rows.append(node)
            continue
        rows.append(
            {
                **base,
                "type": "file",
                "size": stat.st_size,
                "ext": os.path.splitext(name)[1].lower(),
            }
        )
    return rows
def _list_workspace_dir(path: str, root: str) -> List[Dict[str, Any]]:
    """List the immediate children of *path* (non-recursive), dirs first."""
    rows: List[Dict[str, Any]] = []
    try:
        names = sorted(os.listdir(path), key=lambda v: (not os.path.isdir(os.path.join(path, v)), v.lower()))
    except FileNotFoundError:
        # Consistent with _build_workspace_tree: a vanished dir yields no rows.
        return rows
    for name in names:
        if name in {".DS_Store"}:
            continue
        abs_path = os.path.join(path, name)
        rel_path = os.path.relpath(abs_path, root).replace("\\", "/")
        try:
            stat = os.stat(abs_path)
        except OSError:
            continue  # entry vanished between listdir and stat
        is_dir = os.path.isdir(abs_path)
        is_file = os.path.isfile(abs_path)
        rows.append(
            {
                "name": name,
                "path": rel_path,
                "type": "dir" if is_dir else "file",
                "size": stat.st_size if is_file else None,
                "ext": os.path.splitext(name)[1].lower() if is_file else "",
                "ctime": _workspace_stat_ctime_iso(stat),
                "mtime": datetime.utcfromtimestamp(stat.st_mtime).isoformat() + "Z",
            }
        )
    return rows
def _list_workspace_dir_recursive(path: str, root: str) -> List[Dict[str, Any]]:
    """List every entry under *path* recursively, dirs sorted before files."""

    def _entry(abs_path: str, name: str, is_dir: bool) -> Optional[Dict[str, Any]]:
        # Single row shape for both kinds; None when the entry vanished
        # between the walk and the stat call.
        try:
            stat = os.stat(abs_path)
        except OSError:
            return None
        return {
            "name": name,
            "path": os.path.relpath(abs_path, root).replace("\\", "/"),
            "type": "dir" if is_dir else "file",
            "size": None if is_dir else stat.st_size,
            "ext": "" if is_dir else os.path.splitext(name)[1].lower(),
            "ctime": _workspace_stat_ctime_iso(stat),
            "mtime": datetime.utcfromtimestamp(stat.st_mtime).isoformat() + "Z",
        }

    rows: List[Dict[str, Any]] = []
    for walk_root, dirnames, filenames in os.walk(path):
        # Prune ignored directories in place so os.walk does not descend into
        # them either (previously only the dir row was hidden, not its children).
        dirnames[:] = sorted((d for d in dirnames if d != ".DS_Store"), key=lambda v: v.lower())
        filenames.sort(key=lambda v: v.lower())
        for name in dirnames:
            row = _entry(os.path.join(walk_root, name), name, True)
            if row is not None:
                rows.append(row)
        for name in filenames:
            if name == ".DS_Store":
                continue
            row = _entry(os.path.join(walk_root, name), name, False)
            if row is not None:
                rows.append(row)
    # Dirs first, then case-insensitive by relative path.
    rows.sort(key=lambda v: (v.get("type") != "dir", str(v.get("path", "")).lower()))
    return rows
def _stream_file_range(target: str, start: int, end: int, chunk_size: int = 1024 * 1024) -> Generator[bytes, None, None]:
with open(target, "rb") as fh:
fh.seek(start)
remaining = end - start + 1
while remaining > 0:
chunk = fh.read(min(chunk_size, remaining))
if not chunk:
break
remaining -= len(chunk)
yield chunk
def _build_ranged_workspace_response(target: str, media_type: str, range_header: str) -> Response:
    """Serve a single HTTP byte range of *target* as a 206 partial response.

    Supports ``bytes=a-b``, open-ended ``bytes=a-`` and suffix ranges
    ``bytes=-n``. Raises HTTP 416 for malformed or unsatisfiable ranges.
    """
    file_size = os.path.getsize(target)
    parsed = re.match(r"bytes=(\d*)-(\d*)", range_header.strip())
    if parsed is None:
        raise HTTPException(status_code=416, detail="Invalid range")
    start_text, end_text = parsed.groups()
    if not start_text and not end_text:
        raise HTTPException(status_code=416, detail="Invalid range")
    if not start_text:
        # Suffix form "bytes=-n": serve the final n bytes of the file.
        suffix_len = int(end_text)
        if suffix_len <= 0:
            raise HTTPException(status_code=416, detail="Invalid range")
        start = max(file_size - suffix_len, 0)
        end = file_size - 1
    else:
        start = int(start_text)
        end = int(end_text) if end_text else file_size - 1
    if not 0 <= start < file_size:
        raise HTTPException(status_code=416, detail="Requested range not satisfiable")
    end = min(end, file_size - 1)
    if end < start:
        raise HTTPException(status_code=416, detail="Requested range not satisfiable")
    span = end - start + 1
    return StreamingResponse(
        _stream_file_range(target, start, end),
        status_code=206,
        media_type=media_type or "application/octet-stream",
        headers={
            "Accept-Ranges": "bytes",
            "Content-Range": f"bytes {start}-{end}/{file_size}",
            "Content-Length": str(span),
        },
    )
def _build_workspace_raw_url(bot_id: str, path: str, public: bool) -> str:
normalized = "/".join(part for part in str(path or "").strip().split("/") if part)
if not normalized:
return ""
prefix = "/public" if public else "/api"
return f"{prefix}/bots/{quote(bot_id, safe='')}/workspace/raw/{quote(normalized, safe='/')}"
def _serve_workspace_file(
    *,
    bot_id: str,
    path: str,
    download: bool,
    request: Request,
    public: bool = False,
    redirect_html_to_raw: bool = False,
) -> Response:
    """Serve a workspace file: Range-aware streaming, downloads, HTML redirect.

    Raises HTTP 404 when the resolved path is not a regular file.
    """
    _root, target = _resolve_workspace_path(bot_id, path)
    if not os.path.isfile(target):
        raise HTTPException(status_code=404, detail="File not found")
    media_type, _ = mimetypes.guess_type(target)
    resolved_media = media_type or "application/octet-stream"
    # Inline HTML may be bounced to the raw endpoint (e.g. to isolate origin).
    is_html = str(media_type or "").startswith("text/html")
    if redirect_html_to_raw and not download and is_html:
        raw_url = _build_workspace_raw_url(bot_id, path, public=public)
        if raw_url:
            return RedirectResponse(url=raw_url, status_code=307)
    range_header = ""
    if request:
        range_header = request.headers.get("range", "")
    if range_header and not download:
        return _build_ranged_workspace_response(target, resolved_media, range_header)
    common_headers = {"Accept-Ranges": "bytes"}
    if not download:
        return FileResponse(target, media_type=resolved_media, headers=common_headers)
    return FileResponse(
        target,
        media_type=resolved_media,
        filename=os.path.basename(target),
        headers=common_headers,
    )
def get_workspace_tree_data(
    bot_id: str,
    *,
    path: Optional[str] = None,
    recursive: bool = False,
) -> Dict[str, Any]:
    """Return the directory-listing payload for the workspace browser UI.

    Raises HTTP 400 when *path* resolves to something other than a directory.
    """
    root = get_bot_workspace_root(bot_id)
    if not os.path.isdir(root):
        # Workspace not created yet: present an empty root listing.
        return {"bot_id": bot_id, "root": root, "cwd": "", "parent": None, "entries": []}
    _, target = _resolve_workspace_path(bot_id, path)
    if not os.path.isdir(target):
        raise HTTPException(status_code=400, detail="workspace path is not a directory")
    cwd = os.path.relpath(target, root).replace("\\", "/")
    cwd = "" if cwd == "." else cwd
    if cwd:
        parent = os.path.dirname(cwd).replace("\\", "/")
        if parent == ".":
            parent = ""
    else:
        parent = None  # already at the workspace root
    lister = _list_workspace_dir_recursive if recursive else _list_workspace_dir
    return {
        "bot_id": bot_id,
        "root": root,
        "cwd": cwd,
        "parent": parent,
        "entries": lister(target, root),
    }
def read_workspace_text_file(
    bot_id: str,
    *,
    path: str,
    max_bytes: int = 200000,
) -> Dict[str, Any]:
    """Read a previewable text file from the workspace, capped at *max_bytes*.

    Raises HTTP 404 for a missing file, HTTP 400 for unsupported extensions
    or binary (NUL-containing) content.
    """
    root, target = _resolve_workspace_path(bot_id, path)
    if not os.path.isfile(target):
        raise HTTPException(status_code=404, detail="workspace file not found")
    ext = os.path.splitext(target)[1].lower()
    if ext not in TEXT_PREVIEW_EXTENSIONS:
        raise HTTPException(status_code=400, detail=f"unsupported file type: {ext or '(none)'}")
    # Clamp the preview window: at least 4 KiB, at most 1 MB.
    safe_max = max(4096, min(int(max_bytes), 1000000))
    with open(target, "rb") as handle:
        # Read one extra byte so truncation can be detected.
        raw = handle.read(safe_max + 1)
    if b"\x00" in raw:
        raise HTTPException(status_code=400, detail="binary file is not previewable")
    truncated = len(raw) > safe_max
    if truncated:
        raw = raw[:safe_max]
    return {
        "bot_id": bot_id,
        "path": os.path.relpath(target, root).replace("\\", "/"),
        "size": os.path.getsize(target),
        "is_markdown": ext in MARKDOWN_EXTENSIONS,
        "truncated": truncated,
        "content": raw.decode("utf-8", errors="replace"),
    }
def update_workspace_markdown_file(
    bot_id: str,
    *,
    path: str,
    content: str,
) -> Dict[str, Any]:
    """Validate and atomically save markdown *content*; echo the saved state.

    Raises HTTP 404 for a missing file, HTTP 400 for non-markdown targets or
    NUL bytes in the content, HTTP 413 when the content exceeds 2 MB (UTF-8).
    """
    root, target = _resolve_workspace_path(bot_id, path)
    if not os.path.isfile(target):
        raise HTTPException(status_code=404, detail="workspace file not found")
    ext = os.path.splitext(target)[1].lower()
    if ext not in MARKDOWN_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"editing is only supported for markdown files: {ext or '(none)'}",
        )
    body = str(content or "")
    # Size cap is measured on the UTF-8 encoding, not the character count.
    if len(body.encode("utf-8")) > 2_000_000:
        raise HTTPException(status_code=413, detail="markdown file too large to save")
    if "\x00" in body:
        raise HTTPException(status_code=400, detail="markdown content contains invalid null bytes")
    _write_text_atomic(target, body)
    return {
        "bot_id": bot_id,
        "path": os.path.relpath(target, root).replace("\\", "/"),
        "size": os.path.getsize(target),
        "is_markdown": True,
        "truncated": False,
        "content": body,
    }
def serve_workspace_file(
    *,
    bot_id: str,
    path: str,
    download: bool,
    request: Request,
    public: bool = False,
    redirect_html_to_raw: bool = False,
) -> Response:
    """Public entry point; delegates to the internal file-serving helper."""
    response = _serve_workspace_file(
        bot_id=bot_id,
        path=path,
        download=download,
        request=request,
        public=public,
        redirect_html_to_raw=redirect_html_to_raw,
    )
    return response
def _sanitize_upload_filename(original_name: str) -> str:
name = os.path.basename(original_name).replace("\\", "_").replace("/", "_")
name = re.sub(r"[^\w.\-()+@ ]+", "_", name)
return name or "upload.bin"
async def upload_workspace_files_to_workspace(
    bot_id: str,
    *,
    files: List[UploadFile],
    path: Optional[str] = None,
) -> Dict[str, Any]:
    """Stream uploaded files into the bot workspace (default dir: "uploads").

    Enforces the platform size cap and extension allow-list, sanitizes
    filenames, and de-duplicates name collisions with a timestamp suffix.
    Partial files are removed on failure. Returns a per-file summary.

    Raises HTTP 400 (no files / bad extension / bad path), 413 (too large),
    500 (write failure).
    """
    if not files:
        raise HTTPException(status_code=400, detail="no files uploaded")
    platform_settings = get_platform_settings_snapshot()
    max_bytes = platform_settings.upload_max_mb * 1024 * 1024
    allowed_extensions = set(platform_settings.allowed_attachment_extensions)
    root, upload_dir = _resolve_workspace_path(bot_id, path or "uploads")
    os.makedirs(upload_dir, exist_ok=True)
    safe_dir_real = os.path.abspath(upload_dir)
    # Defense in depth: re-check containment after normalization.
    if os.path.commonpath([root, safe_dir_real]) != root:
        raise HTTPException(status_code=400, detail="invalid upload target path")
    rows: List[Dict[str, Any]] = []
    for upload in files:
        try:
            original = (upload.filename or "upload.bin").strip() or "upload.bin"
            name = _sanitize_upload_filename(original)
            ext = str(os.path.splitext(name)[1] or "").strip().lower()
            if allowed_extensions and ext not in allowed_extensions:
                raise HTTPException(
                    status_code=400,
                    detail=f"File '{name}' extension is not allowed. Allowed: {', '.join(sorted(allowed_extensions))}",
                )
            abs_path = os.path.join(safe_dir_real, name)
            if os.path.exists(abs_path):
                # Avoid clobbering an existing file: append a timestamp suffix.
                base, file_ext = os.path.splitext(name)
                name = f"{base}-{int(datetime.utcnow().timestamp())}{file_ext}"
                abs_path = os.path.join(safe_dir_real, name)
            total_size = 0
            try:
                with open(abs_path, "wb") as sink:
                    # Stream in 1 MiB chunks so large uploads never sit in memory.
                    while True:
                        chunk = await upload.read(1024 * 1024)
                        if not chunk:
                            break
                        total_size += len(chunk)
                        if total_size > max_bytes:
                            raise HTTPException(
                                status_code=413,
                                detail=f"File '{name}' too large (max {max_bytes // (1024 * 1024)}MB)",
                            )
                        sink.write(chunk)
            except HTTPException:
                if os.path.exists(abs_path):
                    os.remove(abs_path)  # drop the partial file
                raise
            except OSError as exc:
                if os.path.exists(abs_path):
                    os.remove(abs_path)
                raise HTTPException(
                    status_code=500,
                    detail=f"Failed to write file '{name}': {exc.strerror or str(exc)}",
                )
            except Exception:
                if os.path.exists(abs_path):
                    os.remove(abs_path)
                raise HTTPException(status_code=500, detail=f"Failed to upload file '{name}'")
        finally:
            # Close the spooled upload even when validation rejected it before
            # any write began (previously only the write path closed it).
            await upload.close()
        rel_path = os.path.relpath(abs_path, root).replace("\\", "/")
        rows.append({"name": name, "path": rel_path, "size": total_size})
    return {"bot_id": bot_id, "files": rows}