nex_docus/backend/app/api/v1/dashboard.py

335 lines
12 KiB
Python
Raw Normal View History

2025-12-20 11:18:59 +00:00
"""
管理员仪表盘相关 API
"""
2026-01-13 13:21:47 +00:00
from fastapi import APIRouter, Depends, HTTPException, Query
2025-12-20 11:18:59 +00:00
from sqlalchemy.ext.asyncio import AsyncSession
2026-01-13 13:21:47 +00:00
from sqlalchemy import select, func, and_, or_
from datetime import datetime, date
from typing import Optional
2025-12-20 11:18:59 +00:00
import os
import glob
2026-01-13 13:21:47 +00:00
import json
2025-12-20 11:18:59 +00:00
from app.core.database import get_db
from app.core.deps import get_current_user
from app.core.config import settings
from app.models.user import User
from app.models.project import Project, ProjectMember
2026-01-13 13:21:47 +00:00
from app.models.log import OperationLog
from app.core.enums import OperationType, ResourceType
2025-12-20 11:18:59 +00:00
from app.schemas.response import success_response
# Router on which this module's dashboard endpoints are registered.
router = APIRouter()
@router.get("/stats", response_model=dict)
async def get_dashboard_stats(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return site-wide dashboard statistics for superusers.

    The payload holds the total user, project and Markdown document counts,
    plus the five most recently created users and projects.

    Args:
        current_user: Authenticated user resolved by ``get_current_user``.
        db: Async database session resolved by ``get_db``.

    Returns:
        The result of ``success_response`` wrapping a dict with the keys
        ``stats``, ``recent_users`` and ``recent_projects``.

    Raises:
        HTTPException: 403 when ``current_user.is_superuser`` is not ``1``.
    """
    # Only superusers may see site-wide numbers.
    if current_user.is_superuser != 1:
        raise HTTPException(status_code=403, detail="仅管理员可访问")

    # Total number of users.
    user_count_result = await db.execute(select(func.count(User.id)))
    user_count = user_count_result.scalar()

    # Total number of projects.
    project_count_result = await db.execute(select(func.count(Project.id)))
    project_count = project_count_result.scalar()

    # Count the .md files below every project directory on disk.
    # NOTE: this scan is synchronous filesystem work inside an async handler.
    document_count = 0
    # isdir (rather than exists) so a stray regular file at PROJECTS_PATH
    # yields a count of 0 instead of making os.listdir raise.
    if os.path.isdir(settings.PROJECTS_PATH):
        for project_dir in os.listdir(settings.PROJECTS_PATH):
            project_path = os.path.join(settings.PROJECTS_PATH, project_dir)
            if os.path.isdir(project_path):
                # Escape the directory part: a folder name containing glob
                # metacharacters ("[", "*", "?") must be matched literally.
                pattern = os.path.join(glob.escape(project_path), "**/*.md")
                document_count += len(glob.glob(pattern, recursive=True))

    # Five most recently created users.
    recent_users_result = await db.execute(
        select(User)
        .order_by(User.created_at.desc())
        .limit(5)
    )
    recent_users = recent_users_result.scalars().all()
    recent_users_data = [
        {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "created_at": user.created_at.isoformat() if user.created_at else None,
        }
        for user in recent_users
    ]

    # Five most recently created projects together with their owner row
    # (joined on Project.owner_id == User.id).
    recent_projects_result = await db.execute(
        select(Project, User)
        .join(User, Project.owner_id == User.id)
        .order_by(Project.created_at.desc())
        .limit(5)
    )
    recent_projects_rows = recent_projects_result.all()
    recent_projects_data = [
        {
            "id": project.id,
            "name": project.name,
            "description": project.description,
            "owner_name": owner.username,
            "created_at": project.created_at.isoformat() if project.created_at else None,
        }
        for project, owner in recent_projects_rows
    ]

    return success_response(
        data={
            "stats": {
                "user_count": user_count,
                "project_count": project_count,
                "document_count": document_count,
            },
            "recent_users": recent_users_data,
            "recent_projects": recent_projects_data,
        }
    )
@router.get("/personal-stats", response_model=dict)
async def get_personal_stats(
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return the personal desktop statistics of the current user.

    Args:
        current_user: Authenticated user resolved by ``get_current_user``.
        db: Async database session resolved by ``get_db``.

    Returns:
        The result of ``success_response`` wrapping a dict with the keys
        ``user_info``, ``stats``, ``recent_personal_projects`` and
        ``recent_shared_projects``.
    """
    # Number of projects owned by the user.
    personal_projects_count_result = await db.execute(
        select(func.count(Project.id)).where(Project.owner_id == current_user.id)
    )
    personal_projects_count = personal_projects_count_result.scalar()

    # Number of projects the user is a member of but does not own.
    shared_projects_count_result = await db.execute(
        select(func.count(Project.id))
        .join(ProjectMember, Project.id == ProjectMember.project_id)
        .where(ProjectMember.user_id == current_user.id)
        .where(Project.owner_id != current_user.id)
    )
    shared_projects_count = shared_projects_count_result.scalar()

    # Count the .md files inside the storage directory of each owned project.
    # Only the storage key is needed, so only that column is selected.
    storage_keys_result = await db.execute(
        select(Project.storage_key).where(Project.owner_id == current_user.id)
    )
    document_count = 0
    for storage_key in storage_keys_result.scalars().all():
        project_path = os.path.join(settings.PROJECTS_PATH, storage_key)
        # isdir() is already False for a missing path.
        if os.path.isdir(project_path):
            # Escape the directory part so glob metacharacters in the path
            # are matched literally instead of being read as a pattern.
            pattern = os.path.join(glob.escape(project_path), "**/*.md")
            document_count += len(glob.glob(pattern, recursive=True))

    # Five most recently created projects owned by the user.
    recent_personal_projects_result = await db.execute(
        select(Project)
        .where(Project.owner_id == current_user.id)
        .order_by(Project.created_at.desc())
        .limit(5)
    )
    recent_personal_projects = recent_personal_projects_result.scalars().all()
    recent_personal_projects_data = [
        {
            "id": project.id,
            "name": project.name,
            "description": project.description,
            "created_at": project.created_at.isoformat() if project.created_at else None,
        }
        for project in recent_personal_projects
    ]

    # Five most recently joined projects owned by somebody else, taken from
    # the membership rows and ordered by join time.
    recent_shared_projects_result = await db.execute(
        select(Project, ProjectMember)
        .join(ProjectMember, Project.id == ProjectMember.project_id)
        .where(ProjectMember.user_id == current_user.id)
        .where(Project.owner_id != current_user.id)
        .order_by(ProjectMember.joined_at.desc())
        .limit(5)
    )
    recent_shared_projects_rows = recent_shared_projects_result.all()
    recent_shared_projects_data = [
        {
            "id": project.id,
            "name": project.name,
            "description": project.description,
            "role": member.role,
            "joined_at": member.joined_at.isoformat() if member.joined_at else None,
        }
        for project, member in recent_shared_projects_rows
    ]

    return success_response(
        data={
            "user_info": {
                "id": current_user.id,
                "username": current_user.username,
                "email": current_user.email,
                "created_at": current_user.created_at.isoformat() if current_user.created_at else None,
            },
            "stats": {
                "personal_projects_count": personal_projects_count,
                "shared_projects_count": shared_projects_count,
                "document_count": document_count,
            },
            "recent_personal_projects": recent_personal_projects_data,
            "recent_shared_projects": recent_shared_projects_data,
        }
    )
2026-01-13 13:21:47 +00:00
@router.get("/document-activity-dates", response_model=dict)
async def get_document_activity_dates(
    year: int = Query(..., description="年份"),
    month: int = Query(..., description="月份"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return the days of a month on which the user performed document operations.

    Args:
        year: Calendar year of the month to inspect.
        month: Calendar month (1-12) to inspect.
        current_user: Authenticated user resolved by ``get_current_user``.
        db: Async database session resolved by ``get_db``.

    Returns:
        The result of ``success_response`` wrapping ``{"dates": [...]}``,
        where every entry has a ``date`` string and the ``count`` of matching
        operation logs on that day, ordered by date.

    Raises:
        HTTPException: 400 when ``year``/``month`` do not form a valid month
            that ``datetime`` can represent.
    """
    # Half-open range [first day of the month, first day of the next month).
    # datetime() rejects out-of-range values (e.g. month=13, year=0); report
    # that as a client error instead of letting it surface as a 500.
    try:
        start_date = datetime(year, month, 1)
        if month == 12:
            end_date = datetime(year + 1, 1, 1)
        else:
            end_date = datetime(year, month + 1, 1)
    except (ValueError, OverflowError):
        raise HTTPException(status_code=400, detail="年份或月份无效")

    # Operation types that count as document activity:
    # create, save, delete, rename and move file.
    document_operations = [
        OperationType.CREATE_FILE,
        OperationType.SAVE_FILE,
        OperationType.DELETE_FILE,
        OperationType.RENAME_FILE,
        OperationType.MOVE_FILE,
    ]

    # Group the user's matching logs by calendar day.
    result = await db.execute(
        select(
            func.date(OperationLog.created_at).label('activity_date'),
            func.count(OperationLog.id).label('count')
        )
        .where(
            and_(
                OperationLog.user_id == current_user.id,
                OperationLog.operation_type.in_(document_operations),
                OperationLog.created_at >= start_date,
                OperationLog.created_at < end_date
            )
        )
        .group_by(func.date(OperationLog.created_at))
        .order_by(func.date(OperationLog.created_at))
    )
    activity_dates = result.all()

    dates_data = [
        {
            # func.date() carries no result type, so the value may arrive as
            # a date object or as a plain string depending on the database
            # backend (SQLite's date() yields text); accept both.
            "date": (
                activity_date.isoformat()
                if hasattr(activity_date, "isoformat")
                else str(activity_date)
            ),
            "count": count
        }
        for activity_date, count in activity_dates
    ]
    return success_response(data={"dates": dates_data})
@router.get("/document-activity", response_model=dict)
async def get_document_activity(
    date_str: str = Query(..., description="日期YYYY-MM-DD"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return the current user's document operation logs for a single day.

    Args:
        date_str: Day to inspect, formatted as ``YYYY-MM-DD``.
        current_user: Authenticated user resolved by ``get_current_user``.
        db: Async database session resolved by ``get_db``.

    Returns:
        The result of ``success_response`` wrapping ``{"logs": [...]}``,
        newest first. Every entry carries the log id, a display label for the
        operation, the project id/name and file path read from the log detail,
        whether that file currently exists on disk, the creation time and the
        parsed detail dict.

    Raises:
        HTTPException: 400 when ``date_str`` is not a valid ``YYYY-MM-DD`` date.
    """
    # Parse the requested day.
    try:
        target_date = datetime.strptime(date_str, "%Y-%m-%d").date()
    except ValueError:
        raise HTTPException(status_code=400, detail="日期格式错误,应为 YYYY-MM-DD")

    # Inclusive range covering the whole day.
    start_datetime = datetime.combine(target_date, datetime.min.time())
    end_datetime = datetime.combine(target_date, datetime.max.time())

    # Operation types that count as document activity.
    document_operations = [
        OperationType.CREATE_FILE,
        OperationType.SAVE_FILE,
        OperationType.DELETE_FILE,
        OperationType.RENAME_FILE,
        OperationType.MOVE_FILE,
    ]

    # Display label for each operation type; built once instead of once per
    # log row.
    operation_map = {
        OperationType.CREATE_FILE: "创建文件",
        OperationType.SAVE_FILE: "保存文件",
        OperationType.DELETE_FILE: "删除文件",
        OperationType.RENAME_FILE: "重命名文件",
        OperationType.MOVE_FILE: "移动文件",
    }

    result = await db.execute(
        select(OperationLog)
        .where(
            and_(
                OperationLog.user_id == current_user.id,
                OperationLog.operation_type.in_(document_operations),
                OperationLog.created_at >= start_datetime,
                OperationLog.created_at <= end_datetime
            )
        )
        .order_by(OperationLog.created_at.desc())
    )
    logs = result.scalars().all()

    # Projects already looked up during this request, keyed by project id, so
    # many logs of the same project cost a single query.
    project_cache = {}

    logs_data = []
    for log in logs:
        # The detail field is expected to hold a JSON object with the project
        # id and file path. Malformed or non-object content is treated as
        # "no detail" rather than failing the whole request.
        raw_detail = log.detail
        if isinstance(raw_detail, (str, bytes, bytearray)):
            try:
                parsed_detail = json.loads(raw_detail)
            except ValueError:
                parsed_detail = None
        else:
            parsed_detail = raw_detail
        detail = parsed_detail if isinstance(parsed_detail, dict) else {}

        project_id = detail.get('project_id')
        file_path = detail.get('path') or detail.get('file_path') or detail.get('old_path')

        # Resolve the project name and storage key.
        project_name = None
        project_storage_key = None
        if project_id:
            # Only plain ints/strings are used as cache keys (always hashable).
            cacheable = isinstance(project_id, (int, str))
            if cacheable and project_id in project_cache:
                project = project_cache[project_id]
            else:
                project_result = await db.execute(
                    select(Project).where(Project.id == project_id)
                )
                project = project_result.scalar_one_or_none()
                if cacheable:
                    project_cache[project_id] = project
            if project:
                project_name = project.name
                project_storage_key = project.storage_key

        # Check whether the file still exists (skipped for delete operations).
        file_exists = False
        if project_storage_key and file_path and log.operation_type != OperationType.DELETE_FILE:
            # NOTE(review): file_path comes from the stored log detail and
            # os.path.join does not confine an absolute or ".." path to the
            # project directory — confirm that log writers sanitise it.
            full_path = os.path.join(settings.PROJECTS_PATH, project_storage_key, file_path)
            # isfile() is already False for a missing path.
            file_exists = os.path.isfile(full_path)

        logs_data.append({
            "id": log.id,
            "operation_type": operation_map.get(log.operation_type, log.operation_type),
            "project_id": project_id,
            "project_name": project_name or "未知项目",
            "file_path": file_path or "未知文件",
            "file_exists": file_exists,
            "created_at": log.created_at.isoformat() if log.created_at else None,
            "detail": detail,
        })
    return success_response(data={"logs": logs_data})