cosmo/backend/app/api/user.py

234 lines
7.4 KiB
Python
Raw Normal View History

2025-11-30 02:43:29 +00:00
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from sqlalchemy import select, func
2025-12-02 13:25:28 +00:00
from typing import List, Optional
from pydantic import BaseModel, EmailStr
2025-11-30 02:43:29 +00:00
from app.database import get_db
from app.models.db import User
2025-12-02 13:25:28 +00:00
from app.services.auth import hash_password, verify_password
from app.services.auth_deps import get_current_user, require_admin
from app.services.system_settings_service import system_settings_service
2025-11-30 02:43:29 +00:00
# Router that collects the user endpoints; routes registered on it use the "/users" prefix and the "users" tag.
router = APIRouter(prefix="/users", tags=["users"])
# Pydantic models
class UserListItem(BaseModel):
    """Schema describing one user entry: identity, status, role names and timestamps."""

    id: int
    username: str
    email: Optional[str]
    full_name: Optional[str]
    is_active: bool
    roles: List[str]
    # Timestamps are typed as strings here, not datetimes.
    last_login_at: Optional[str]
    created_at: str

    class Config:
        # Permit building the model from attribute-bearing objects.
        from_attributes = True
2025-11-30 02:43:29 +00:00
class UserStatusUpdate(BaseModel):
    """Request body for toggling whether a user account is active."""

    # Desired value of the target user's is_active flag.
    is_active: bool
2025-12-02 13:25:28 +00:00
class ProfileUpdateRequest(BaseModel):
    """Request body for a self-service profile edit; both fields are optional."""

    # Omitted fields default to None.
    full_name: str | None = None
    email: EmailStr | None = None
class PasswordChangeRequest(BaseModel):
    """Request body for a self-service password change."""

    # The caller's current password.
    old_password: str
    # The password to switch to.
    new_password: str
2025-11-30 02:43:29 +00:00
@router.get("/list")
async def get_user_list(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)  # Protect this route
):
    """Return all users, ordered by id, each with its role names.

    Only callers that hold a role named "admin" may use this endpoint.

    Returns:
        A dict of the form ``{"users": [...]}`` where each entry carries
        id, username, email, full_name, is_active, roles, last_login_at
        and created_at (timestamps as ISO-8601 strings).

    Raises:
        HTTPException: 403 when the caller has no "admin" role.
    """
    caller_is_admin = any(role.name == "admin" for role in current_user.roles)
    if not caller_is_admin:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized")

    # Roles are loaded together with the users via selectinload.
    query = select(User).options(selectinload(User.roles)).order_by(User.id)
    rows = (await db.execute(query)).scalars().all()

    def _serialize(user):
        """Convert one User row into the plain dict returned to the client."""
        return {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "full_name": user.full_name,
            "is_active": user.is_active,
            "roles": [role.name for role in user.roles],
            # A user who never logged in has no last_login_at.
            "last_login_at": user.last_login_at.isoformat() if user.last_login_at else None,
            "created_at": user.created_at.isoformat(),
        }

    return {"users": [_serialize(user) for user in rows]}
@router.put("/{user_id}/status")
async def update_user_status(
    user_id: int,
    status_update: UserStatusUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Set the ``is_active`` flag of the user identified by ``user_id``.

    Only callers that hold a role named "admin" may use this endpoint.

    Raises:
        HTTPException: 403 when the caller has no "admin" role,
            404 when no user with ``user_id`` exists.
    """
    caller_roles = {role.name for role in current_user.roles}
    if "admin" not in caller_roles:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized")

    lookup = await db.execute(select(User).where(User.id == user_id))
    target = lookup.scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    # Persist the new flag.
    target.is_active = status_update.is_active
    await db.commit()

    return {"message": "User status updated successfully"}
@router.post("/{user_id}/reset-password")
async def reset_user_password(
    user_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Reset the password of the user identified by ``user_id`` to the system default.

    Only callers that hold a role named "admin" may use this endpoint.

    Returns:
        A dict with a confirmation message and the default password that
        was applied.

    Raises:
        HTTPException: 403 when the caller has no "admin" role,
            404 when no user with ``user_id`` exists.
    """
    is_admin = any(role.name == "admin" for role in current_user.roles)
    if not is_admin:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized")

    user = (await db.execute(select(User).where(User.id == user_id))).scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")

    # Read the default password from the "default_password" system setting;
    # "cosmo" is handed to the service as the fallback value.
    default_password = await system_settings_service.get_setting_value(
        "default_password",
        db,
        default="cosmo",
    )

    user.password_hash = hash_password(default_password)
    await db.commit()

    # NOTE(review): the plaintext default password is echoed back in the
    # response body — confirm this is intended before exposing more widely.
    confirmation = f"Password for user {user.username} has been reset to system default."
    return {
        "message": confirmation,
        "default_password": default_password,
    }
@router.get("/count", response_model=dict)
async def get_user_count(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)  # All authenticated users can access
):
    """
    Return the total number of registered users.

    No role check is performed here; the only gate is the
    get_current_user dependency.
    """
    count_query = select(func.count(User.id))
    total_users = (await db.execute(count_query)).scalar_one()
    return {"total_users": total_users}
2025-12-02 13:25:28 +00:00
@router.get("/me")
async def get_current_user_profile(
    current_user: User = Depends(get_current_user)
):
    """
    Return the calling user's own profile.

    The response carries id, username, email, full_name, is_active,
    role names, and the created_at / last_login_at timestamps as
    ISO-8601 strings (last_login_at is None when unset).
    """
    role_names = [role.name for role in current_user.roles]
    profile = {
        "id": current_user.id,
        "username": current_user.username,
        "email": current_user.email,
        "full_name": current_user.full_name,
        "is_active": current_user.is_active,
        "roles": role_names,
        "created_at": current_user.created_at.isoformat(),
    }
    # Only format last_login_at when one has been recorded.
    last_login = current_user.last_login_at
    profile["last_login_at"] = last_login.isoformat() if last_login else None
    return profile
@router.put("/me/profile")
async def update_current_user_profile(
    profile_update: ProfileUpdateRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Update the calling user's profile (nickname/full_name and email).

    A missing or empty ``email`` leaves the stored email untouched; a
    ``full_name`` of None leaves the stored name untouched (an empty string
    is applied as-is).

    Returns:
        A dict with a confirmation message and the user's id, username,
        email and full_name after the update.

    Raises:
        HTTPException: 400 when the requested email already belongs to a
            different user.
    """
    new_email = profile_update.email
    if new_email and new_email != current_user.email:
        # Look for any *other* account that already uses this address.
        result = await db.execute(
            select(User).where(User.email == new_email, User.id != current_user.id)
        )
        # first() instead of scalar_one_or_none(): if several accounts already
        # share the address, scalar_one_or_none() would raise
        # MultipleResultsFound (surfacing as a 500) rather than the intended 400.
        existing_user = result.scalars().first()
        if existing_user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already in use by another user"
            )
        current_user.email = new_email

    # Update full_name (nickname); None means "not provided".
    if profile_update.full_name is not None:
        current_user.full_name = profile_update.full_name

    await db.commit()
    # Reload the row so the response reflects what was actually stored.
    await db.refresh(current_user)

    return {
        "message": "Profile updated successfully",
        "user": {
            "id": current_user.id,
            "username": current_user.username,
            "email": current_user.email,
            "full_name": current_user.full_name
        }
    }
@router.put("/me/password")
async def change_current_user_password(
    password_change: PasswordChangeRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Change the calling user's password.

    Checks run in this order, each failing with HTTP 400: the old password
    must verify against the stored hash, the new password must be at least
    6 characters long, and the new password must differ from the old one.
    """
    def _bad_request(detail):
        """Build the 400 error shared by every validation failure below."""
        return HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=detail)

    old_pw = password_change.old_password
    new_pw = password_change.new_password

    # The caller must prove knowledge of the current password first.
    if not verify_password(old_pw, current_user.password_hash):
        raise _bad_request("Current password is incorrect")

    if len(new_pw) < 6:
        raise _bad_request("New password must be at least 6 characters long")

    if old_pw == new_pw:
        raise _bad_request("New password must be different from the old password")

    # All checks passed: store the hash of the new password.
    current_user.password_hash = hash_password(new_pw)
    await db.commit()

    return {"message": "Password changed successfully"}