nex_docus/backend/app/api/v1/users.py

446 lines
14 KiB
Python
Raw Normal View History

2025-12-23 05:02:10 +00:00
"""
用户管理 API
"""
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, delete, or_
from typing import List, Optional
from pydantic import BaseModel, EmailStr
from app.core.database import get_db
from app.core.deps import get_current_user
from app.core.security import get_password_hash
from app.core.config import settings
from app.models.user import User
from app.models.role import Role, UserRole
from app.models.project import Project, ProjectMember
from app.schemas.response import success_response, error_response
# Router on which the user-management endpoints defined in this module are registered.
router = APIRouter()
# === Pydantic Schemas ===
class UserCreateRequest(BaseModel):
    """Request body accepted by the create-user endpoint."""

    # Login name; the only mandatory field.
    username: str
    # Optional profile fields; omitted values default to None.
    nickname: Optional[str] = None
    email: Optional[EmailStr] = None
    phone: Optional[str] = None
    # IDs of the roles to assign to the new user (empty by default).
    role_ids: List[int] = []
class UserUpdateRequest(BaseModel):
    """Request body accepted by the update-user endpoint.

    Every field is optional and defaults to None.
    """

    nickname: Optional[str] = None
    email: Optional[EmailStr] = None
    phone: Optional[str] = None
    # Account status flag.
    status: Optional[int] = None
class UserRolesUpdateRequest(BaseModel):
    """Request body accepted by the update-user-roles endpoint."""

    # Role IDs for the user; required, but may be an empty list.
    role_ids: List[int]
# === API Endpoints ===
@router.get("/", response_model=dict)
async def get_users(
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1, le=100),
    keyword: Optional[str] = Query(None, description="搜索关键词(用户名、昵称、邮箱)"),
    status: Optional[int] = Query(None, description="状态筛选0-禁用 1-启用"),
    role_id: Optional[int] = Query(None, description="角色ID筛选"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return one page of users, each with the roles assigned to it.

    Args:
        page: 1-based page number.
        page_size: Number of users per page (1-100).
        keyword: Optional substring matched with LIKE against username,
            nickname and email.
        status: Optional exact match on the user's status flag.
        role_id: Optional filter keeping only users that hold this role.
        current_user: Resolved by the ``get_current_user`` dependency.
        db: Async database session.

    Returns:
        A dict with ``code``, ``message``, ``data`` (list of user dicts),
        ``total`` (number of users matching the filters), ``page`` and
        ``page_size``.
    """
    # Collect every filter as a plain WHERE condition so that the count
    # query and the page query are always built from the same criteria.
    filters = []
    if keyword:
        pattern = f"%{keyword}%"
        filters.append(
            or_(
                User.username.like(pattern),
                User.nickname.like(pattern),
                User.email.like(pattern),
            )
        )
    if status is not None:
        filters.append(User.status == status)
    if role_id is not None:
        # A membership subquery (instead of a JOIN) cannot multiply user rows,
        # so the page contents and the total stay consistent even if the
        # association table holds duplicate (user, role) rows.
        filters.append(
            User.id.in_(
                select(UserRole.user_id).where(UserRole.role_id == role_id)
            )
        )

    count_query = select(func.count(User.id))
    page_query = select(User)
    if filters:
        count_query = count_query.where(*filters)
        page_query = page_query.where(*filters)

    total = (await db.execute(count_query)).scalar()

    # User.id is a tie-breaker: without it, rows sharing the same created_at
    # may move between pages from one request to the next.
    page_query = (
        page_query
        .order_by(User.created_at.desc(), User.id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    users = (await db.execute(page_query)).scalars().all()

    # Load the roles of all users on this page with one query rather than
    # issuing one query per user.
    roles_by_user = {}
    if users:
        role_rows = await db.execute(
            select(UserRole.user_id, Role)
            .select_from(UserRole)
            .join(Role, UserRole.role_id == Role.id)
            .where(UserRole.user_id.in_([user.id for user in users]))
        )
        for owner_id, role in role_rows.all():
            roles_by_user.setdefault(owner_id, []).append(role)

    users_data = [
        {
            "id": user.id,
            "username": user.username,
            "nickname": user.nickname,
            "email": user.email,
            "phone": user.phone,
            "avatar": user.avatar,
            "status": user.status,
            "is_superuser": user.is_superuser,
            "last_login_at": user.last_login_at.isoformat() if user.last_login_at else None,
            "created_at": user.created_at.isoformat() if user.created_at else None,
            "roles": [
                {"id": r.id, "role_name": r.role_name, "role_code": r.role_code}
                for r in roles_by_user.get(user.id, [])
            ],
        }
        for user in users
    ]

    return {
        "code": 200,
        "message": "success",
        "data": users_data,
        "total": total,
        "page": page,
        "page_size": page_size
    }
@router.post("/", response_model=dict)
async def create_user(
    user_data: UserCreateRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Create a user with the configured default password and assign roles.

    Args:
        user_data: Username, optional profile fields and role IDs.
        current_user: Resolved by the ``get_current_user`` dependency.
        db: Async database session.

    Returns:
        ``success_response`` carrying the new user's id, username and the
        default password taken from ``settings.DEFAULT_USER_PASSWORD``.

    Raises:
        HTTPException: 400 if the username already exists, the email is
            already in use, or any requested role ID does not exist.
    """
    # NOTE(review): no role/permission check on current_user is visible in
    # this handler; confirm that access control is enforced elsewhere.

    # Reject duplicate usernames.
    result = await db.execute(
        select(User).where(User.username == user_data.username)
    )
    if result.scalar_one_or_none():
        raise HTTPException(status_code=400, detail="用户名已存在")

    # Reject duplicate e-mail addresses (only when one was supplied).
    if user_data.email:
        result = await db.execute(
            select(User).where(User.email == user_data.email)
        )
        if result.scalar_one_or_none():
            raise HTTPException(status_code=400, detail="邮箱已被使用")

    # Validate the requested roles BEFORE inserting anything. Unknown IDs are
    # reported to the caller (same message as the role-update endpoint)
    # instead of being dropped silently.
    valid_role_ids = []
    if user_data.role_ids:
        roles_result = await db.execute(
            select(Role.id).where(Role.id.in_(user_data.role_ids))
        )
        valid_role_ids = [row[0] for row in roles_result.all()]
        invalid_ids = set(user_data.role_ids) - set(valid_role_ids)
        if invalid_ids:
            raise HTTPException(
                status_code=400,
                detail=f"以下角色ID不存在: {', '.join(map(str, sorted(invalid_ids)))}"
            )

    new_user = User(
        username=user_data.username,
        password_hash=get_password_hash(settings.DEFAULT_USER_PASSWORD),
        nickname=user_data.nickname or user_data.username,
        email=user_data.email,
        phone=user_data.phone,
        status=1,
        is_superuser=0
    )
    db.add(new_user)
    # Flush so that new_user.id is populated for the association rows below.
    await db.flush()

    for role_id in valid_role_ids:
        db.add(UserRole(user_id=new_user.id, role_id=role_id))

    await db.commit()
    await db.refresh(new_user)

    return success_response(
        data={
            "id": new_user.id,
            "username": new_user.username,
            "default_password": settings.DEFAULT_USER_PASSWORD
        },
        message="用户创建成功,请记住初始密码"
    )
@router.get("/{user_id}", response_model=dict)
async def get_user(
    user_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Return the details of a single user, including its roles.

    Raises:
        HTTPException: 404 if no user has the given id.
    """
    user_query = select(User).where(User.id == user_id)
    user = (await db.execute(user_query)).scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")

    # Roles linked to the user through the UserRole association table.
    roles_query = (
        select(Role)
        .join(UserRole, UserRole.role_id == Role.id)
        .where(UserRole.user_id == user.id)
    )
    roles = (await db.execute(roles_query)).scalars().all()

    last_login = user.last_login_at.isoformat() if user.last_login_at else None
    created = user.created_at.isoformat() if user.created_at else None
    role_items = [
        {"id": r.id, "role_name": r.role_name, "role_code": r.role_code}
        for r in roles
    ]

    payload = {
        "id": user.id,
        "username": user.username,
        "nickname": user.nickname,
        "email": user.email,
        "phone": user.phone,
        "avatar": user.avatar,
        "status": user.status,
        "is_superuser": user.is_superuser,
        "last_login_at": last_login,
        "created_at": created,
        "roles": role_items,
    }
    return success_response(data=payload)
@router.put("/{user_id}", response_model=dict)
async def update_user(
    user_id: int,
    user_data: UserUpdateRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Update a user's nickname, email, phone and/or status.

    Fields left as ``None`` in the request are not modified.

    Args:
        user_id: ID of the user to update.
        user_data: Fields to change.
        current_user: Resolved by the ``get_current_user`` dependency.
        db: Async database session.

    Returns:
        ``success_response`` carrying the updated user's id.

    Raises:
        HTTPException: 404 if the user does not exist; 400 if the new email
            belongs to another user, or if the request would disable a
            superuser or the currently logged-in user.
    """
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")

    # --- Validate everything first, so a rejected request never leaves the
    # --- ORM object partially modified in the session.

    # Disabling through this endpoint must obey the same rules as the
    # dedicated status endpoint: superusers and the caller cannot be disabled.
    # NOTE(review): the status endpoint restricts status to 0/1; this endpoint
    # still accepts any integer — confirm whether other values are valid.
    if user_data.status == 0:
        if user.is_superuser == 1:
            raise HTTPException(status_code=400, detail="不允许停用超级管理员")
        if user_id == current_user.id:
            raise HTTPException(status_code=400, detail="不允许停用当前登录用户")

    # A changed email must not already belong to a different user.
    if user_data.email is not None and user_data.email != user.email:
        email_result = await db.execute(
            select(User).where(User.email == user_data.email, User.id != user_id)
        )
        if email_result.scalar_one_or_none():
            raise HTTPException(status_code=400, detail="邮箱已被其他用户使用")

    # --- Apply the requested changes.
    if user_data.nickname is not None:
        user.nickname = user_data.nickname
    if user_data.email is not None:
        user.email = user_data.email
    if user_data.phone is not None:
        user.phone = user_data.phone
    if user_data.status is not None:
        user.status = user_data.status

    await db.commit()
    await db.refresh(user)
    return success_response(data={"id": user.id}, message="用户信息更新成功")
@router.delete("/{user_id}", response_model=dict)
async def delete_user(
    user_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Delete a user together with its role and project-member links.

    Deletion is refused for superusers, for the caller's own account and for
    users that still own projects.

    Raises:
        HTTPException: 404 if the user does not exist; 400 for any of the
            refusal conditions above.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    user = lookup.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")

    # Guard clauses: superusers and the caller's own account are protected.
    if user.is_superuser == 1:
        raise HTTPException(status_code=400, detail="不允许删除超级管理员")
    if user_id == current_user.id:
        raise HTTPException(status_code=400, detail="不允许删除当前登录用户")

    # A user that still owns projects cannot be removed.
    owned_query = select(func.count(Project.id)).where(Project.owner_id == user_id)
    owned_projects_count = (await db.execute(owned_query)).scalar()
    if owned_projects_count > 0:
        raise HTTPException(
            status_code=400,
            detail=f"该用户拥有 {owned_projects_count} 个项目,无法删除。请先转移或删除这些项目。"
        )

    # Remove the association rows first, then the user row itself.
    await db.execute(delete(UserRole).where(UserRole.user_id == user_id))
    await db.execute(delete(ProjectMember).where(ProjectMember.user_id == user_id))
    await db.delete(user)
    await db.commit()

    return success_response(message="用户删除成功")
@router.put("/{user_id}/status", response_model=dict)
async def update_user_status(
    user_id: int,
    status: int = Query(..., ge=0, le=1, description="状态0-禁用 1-启用"),
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Enable (1) or disable (0) a user account.

    Raises:
        HTTPException: 404 if the user does not exist; 400 when trying to
            disable a superuser or the currently logged-in user.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    user = lookup.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")

    # The protections below only apply when the account is being disabled.
    disabling = status == 0
    if disabling and user.is_superuser == 1:
        raise HTTPException(status_code=400, detail="不允许停用超级管理员")
    if disabling and user_id == current_user.id:
        raise HTTPException(status_code=400, detail="不允许停用当前登录用户")

    user.status = status
    await db.commit()

    if status == 1:
        status_text = "启用"
    else:
        status_text = "停用"
    return success_response(message=f"用户已{status_text}")
@router.put("/{user_id}/roles", response_model=dict)
async def update_user_roles(
    user_id: int,
    roles_data: UserRolesUpdateRequest,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Replace the full set of roles assigned to a user.

    An empty ``role_ids`` list removes every role from the user.

    Raises:
        HTTPException: 404 if the user does not exist; 400 if any requested
            role ID does not exist.
    """
    lookup = await db.execute(select(User).where(User.id == user_id))
    if not lookup.scalar_one_or_none():
        raise HTTPException(status_code=404, detail="用户不存在")

    # Resolve the requested IDs against the Role table; any ID that does not
    # come back is reported as invalid.
    valid_role_ids = []
    if roles_data.role_ids:
        found = await db.execute(
            select(Role.id).where(Role.id.in_(roles_data.role_ids))
        )
        valid_role_ids = [row[0] for row in found.all()]
        invalid_ids = set(roles_data.role_ids) - set(valid_role_ids)
        if invalid_ids:
            raise HTTPException(
                status_code=400,
                detail=f"以下角色ID不存在: {', '.join(map(str, invalid_ids))}"
            )

    # Drop the existing links, then insert the new ones.
    await db.execute(delete(UserRole).where(UserRole.user_id == user_id))
    for new_role_id in valid_role_ids:
        db.add(UserRole(user_id=user_id, role_id=new_role_id))
    await db.commit()

    return success_response(message="用户角色更新成功")
@router.post("/{user_id}/reset-password", response_model=dict)
async def reset_user_password(
    user_id: int,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db)
):
    """Reset a user's password to ``settings.DEFAULT_USER_PASSWORD``.

    Returns:
        ``success_response`` whose data contains the default password.

    Raises:
        HTTPException: 404 if the user does not exist.
    """
    # NOTE(review): no role/permission check on current_user is visible in
    # this handler; confirm that access control is enforced elsewhere.
    lookup = await db.execute(select(User).where(User.id == user_id))
    user = lookup.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")

    # Store the hash of the configured default password.
    default_password = settings.DEFAULT_USER_PASSWORD
    user.password_hash = get_password_hash(default_password)
    await db.commit()

    return success_response(
        data={"default_password": settings.DEFAULT_USER_PASSWORD},
        message="密码已重置为初始密码"
    )