nex_docus/backend/app/api/v1/preview.py

234 lines
7.8 KiB
Python
Raw Normal View History

2025-12-20 11:18:59 +00:00
"""
2025-12-29 12:53:50 +00:00
项目预览相关 API支持公开和私密项目
2025-12-20 11:18:59 +00:00
"""
from fastapi import APIRouter, Depends, HTTPException, Header
2025-12-31 05:44:03 +00:00
from fastapi.responses import FileResponse
from fastapi.security import HTTPAuthorizationCredentials
2025-12-20 11:18:59 +00:00
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import Optional
2025-12-31 05:44:03 +00:00
import mimetypes
2025-12-20 11:18:59 +00:00
from app.core.database import get_db
2025-12-31 05:44:03 +00:00
from app.core.deps import get_current_user_optional, security_optional
from app.core.security import decode_access_token
from app.core.redis_client import TokenCache
2025-12-29 12:53:50 +00:00
from app.models.project import Project, ProjectMember
from app.models.user import User
2025-12-20 11:18:59 +00:00
from app.schemas.response import success_response
from app.services.storage import storage_service
router = APIRouter()
2025-12-29 12:53:50 +00:00
async def check_preview_access(
project: Project,
current_user: Optional[User],
db: AsyncSession
):
"""检查预览访问权限"""
# 公开项目:任何人都可以访问
if project.is_public == 1:
return True
# 私密项目:必须是项目成员
if not current_user:
raise HTTPException(status_code=401, detail="私密项目需要登录才能访问")
# 检查是否是项目所有者
if project.owner_id == current_user.id:
return True
# 检查是否是项目成员
member_result = await db.execute(
select(ProjectMember).where(
ProjectMember.project_id == project.id,
ProjectMember.user_id == current_user.id
)
)
member = member_result.scalar_one_or_none()
if not member:
raise HTTPException(status_code=403, detail="无权访问该私密项目")
return True
2025-12-20 11:18:59 +00:00
@router.get("/{project_id}/info", response_model=dict)
async def get_preview_info(
project_id: int,
2025-12-29 12:53:50 +00:00
current_user: Optional[User] = Depends(get_current_user_optional),
2025-12-20 11:18:59 +00:00
db: AsyncSession = Depends(get_db)
):
2025-12-29 12:53:50 +00:00
"""获取预览项目基本信息"""
2025-12-20 11:18:59 +00:00
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
2025-12-29 12:53:50 +00:00
# 检查访问权限
await check_preview_access(project, current_user, db)
2025-12-20 11:18:59 +00:00
# 返回基本信息
info = {
"id": project.id,
"name": project.name,
"description": project.description,
2025-12-29 12:53:50 +00:00
"is_public": project.is_public,
2025-12-20 11:18:59 +00:00
"has_password": bool(project.access_pass),
}
return success_response(data=info)
@router.post("/{project_id}/verify", response_model=dict)
async def verify_access_password(
project_id: int,
password: str = Header(..., alias="X-Access-Password"),
2025-12-29 12:53:50 +00:00
current_user: Optional[User] = Depends(get_current_user_optional),
2025-12-20 11:18:59 +00:00
db: AsyncSession = Depends(get_db)
):
"""验证访问密码"""
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
2025-12-29 12:53:50 +00:00
# 检查访问权限
await check_preview_access(project, current_user, db)
2025-12-20 11:18:59 +00:00
# 验证密码
if not project.access_pass:
return success_response(message="该项目无需密码访问")
if project.access_pass != password:
raise HTTPException(status_code=403, detail="访问密码错误")
return success_response(message="验证成功")
@router.get("/{project_id}/tree", response_model=dict)
async def get_preview_tree(
project_id: int,
password: Optional[str] = Header(None, alias="X-Access-Password"),
2025-12-29 12:53:50 +00:00
current_user: Optional[User] = Depends(get_current_user_optional),
2025-12-20 11:18:59 +00:00
db: AsyncSession = Depends(get_db)
):
2025-12-29 12:53:50 +00:00
"""获取预览项目的文档树"""
2025-12-20 11:18:59 +00:00
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
2025-12-29 12:53:50 +00:00
# 检查访问权限
await check_preview_access(project, current_user, db)
2025-12-20 11:18:59 +00:00
# 如果设置了密码,需要验证
if project.access_pass:
if not password or project.access_pass != password:
raise HTTPException(status_code=403, detail="需要提供正确的访问密码")
# 获取文档树
project_path = storage_service.get_secure_path(project.storage_key)
tree = storage_service.generate_tree(project_path)
return success_response(data=tree)
@router.get("/{project_id}/file", response_model=dict)
async def get_preview_file(
project_id: int,
path: str,
password: Optional[str] = Header(None, alias="X-Access-Password"),
2025-12-29 12:53:50 +00:00
current_user: Optional[User] = Depends(get_current_user_optional),
2025-12-20 11:18:59 +00:00
db: AsyncSession = Depends(get_db)
):
2025-12-29 12:53:50 +00:00
"""获取预览项目的文件内容"""
2025-12-20 11:18:59 +00:00
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
2025-12-29 12:53:50 +00:00
# 检查访问权限
await check_preview_access(project, current_user, db)
2025-12-20 11:18:59 +00:00
# 如果设置了密码,需要验证
if project.access_pass:
if not password or project.access_pass != password:
raise HTTPException(status_code=403, detail="需要提供正确的访问密码")
# 获取文件内容
file_path = storage_service.get_secure_path(project.storage_key, path)
content = await storage_service.read_file(file_path)
return success_response(data={"content": content})
2025-12-31 05:44:03 +00:00
@router.get("/{project_id}/document/{path:path}")
async def get_preview_document(
project_id: int,
path: str,
password: Optional[str] = Header(None, alias="X-Access-Password"),
access_pass: Optional[str] = None, # 支持密码查询参数
token: Optional[str] = None, # 支持token查询参数
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security_optional),
db: AsyncSession = Depends(get_db)
):
"""获取预览项目的文档文件PDF等- 返回文件流"""
# 获取当前用户支持header或query参数
current_user = None
token_str = None
if credentials:
token_str = credentials.credentials
elif token:
token_str = token
if token_str:
try:
user_id_from_redis = await TokenCache.get_user_id(token_str)
if user_id_from_redis:
payload = decode_access_token(token_str)
if payload:
user_id_str = payload.get("sub")
if user_id_str:
user_id = int(user_id_str)
result = await db.execute(select(User).where(User.id == user_id))
current_user = result.scalar_one_or_none()
except Exception:
pass # 忽略token验证失败继续作为未登录用户
# 查询项目
result = await db.execute(select(Project).where(Project.id == project_id))
project = result.scalar_one_or_none()
if not project:
raise HTTPException(status_code=404, detail="项目不存在")
# 检查访问权限
await check_preview_access(project, current_user, db)
# 如果设置了密码需要验证优先使用header其次使用query参数
provided_password = password or access_pass
if project.access_pass:
if not provided_password or project.access_pass != provided_password:
raise HTTPException(status_code=403, detail="需要提供正确的访问密码")
# 获取文件
file_path = storage_service.get_secure_path(project.storage_key, path)
if not file_path.exists() or not file_path.is_file():
raise HTTPException(status_code=404, detail="文件不存在")
content_type, _ = mimetypes.guess_type(str(file_path))
return FileResponse(path=str(file_path), media_type=content_type, filename=file_path.name)