cosmo/backend/app/api/user.py

121 lines
3.8 KiB
Python
Raw Normal View History

2025-11-30 02:43:29 +00:00
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from sqlalchemy import select, func
2025-11-30 02:43:29 +00:00
from typing import List
from pydantic import BaseModel
from app.database import get_db
from app.models.db import User
from app.services.auth import hash_password
from app.services.auth_deps import get_current_user, require_admin # To protect endpoints
2025-11-30 02:43:29 +00:00
# Router that groups the user-management endpoints defined in this module;
# every route registered on it is served under the "/users" prefix.
router = APIRouter(prefix="/users", tags=["users"])
# Pydantic models
# Schema describing one user entry: identity fields, the active flag,
# a list of role strings, and login/creation timestamps carried as strings.
# NOTE(review): nothing in the visible part of this module references this
# model (the list endpoint returns plain dicts) — confirm it is still needed.
class UserListItem(BaseModel):
    id: int
    username: str
    email: str | None
    full_name: str | None
    is_active: bool
    roles: list[str]  # presumably role names, as emitted by the list endpoint — TODO confirm
    last_login_at: str | None  # presumably ISO-8601 text; None when the user never logged in — TODO confirm
    created_at: str  # presumably ISO-8601 text — TODO confirm

    class Config:
        # Pydantic v1-style setting that allows building the model from
        # attribute-bearing objects (such as ORM rows) instead of dicts.
        orm_mode = True
# Request body accepted by the status-update endpoint: carries the value
# that should be written to the target user's ``is_active`` flag.
class UserStatusUpdate(BaseModel):
    is_active: bool
@router.get("/list")
async def get_user_list(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),  # Protect this route
):
    """Return every user, ordered by id, together with their role names.

    Only callers holding the ``admin`` role may use this endpoint.

    Returns:
        A dict ``{"users": [...]}``. Each entry contains ``id``, ``username``,
        ``email``, ``full_name``, ``is_active``, ``roles`` (list of role
        names), ``last_login_at`` (ISO-8601 string, or ``None`` when unset)
        and ``created_at`` (ISO-8601 string).

    Raises:
        HTTPException: 403 when the caller does not have the ``admin`` role.
    """
    # Guard clause: reject non-admin callers before touching the database.
    if not any(role.name == "admin" for role in current_user.roles):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized",
        )

    # Roles are loaded together with the users so they can be read below.
    query = select(User).options(selectinload(User.roles)).order_by(User.id)
    records = (await db.execute(query)).scalars().all()

    def _serialize(record):
        """Convert one ``User`` row into its JSON-friendly dict form."""
        last_login = record.last_login_at
        return {
            "id": record.id,
            "username": record.username,
            "email": record.email,
            "full_name": record.full_name,
            "is_active": record.is_active,
            "roles": [role.name for role in record.roles],
            "last_login_at": last_login.isoformat() if last_login else None,
            "created_at": record.created_at.isoformat(),
        }

    return {"users": [_serialize(record) for record in records]}
@router.put("/{user_id}/status")
async def update_user_status(
    user_id: int,
    status_update: UserStatusUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Set the ``is_active`` flag of the user identified by ``user_id``.

    Only callers holding the ``admin`` role may use this endpoint.

    Args:
        user_id: Primary key of the user to modify.
        status_update: Request body carrying the new ``is_active`` value.

    Returns:
        A dict with a single ``message`` key confirming the update.

    Raises:
        HTTPException: 403 when the caller is not an admin; 404 when no user
            with ``user_id`` exists.
    """
    caller_is_admin = any(role.name == "admin" for role in current_user.roles)
    if not caller_is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized",
        )

    lookup = select(User).where(User.id == user_id)
    target = (await db.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    # Persist the new flag; the change is only durable after the commit.
    target.is_active = status_update.is_active
    await db.commit()

    return {"message": "User status updated successfully"}
@router.post("/{user_id}/reset-password")
async def reset_user_password(
    user_id: int,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Reset a user's password to the configured default.

    Only callers holding the ``admin`` role may use this endpoint.

    Args:
        user_id: Primary key of the user whose password is reset.

    Returns:
        A dict with a single ``message`` key naming the affected user.

    Raises:
        HTTPException: 403 when the caller is not an admin; 404 when no user
            with ``user_id`` exists.
    """
    # Imported locally so this endpoint does not depend on a module-level
    # import being present.
    import os

    if not any(role.name == "admin" for role in current_user.roles):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized",
        )

    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    # The default can be overridden per deployment through the
    # DEFAULT_RESET_PASSWORD environment variable. An unset or empty value
    # falls back to the historical literal so existing setups keep working.
    # SECURITY: the fallback is a weak, well-known password; deployments
    # should set the environment variable.
    # TODO: Move to a configurable system parameter.
    default_password = os.environ.get("DEFAULT_RESET_PASSWORD") or "password123"

    user.password_hash = hash_password(default_password)
    await db.commit()

    return {"message": f"Password for user {user.username} has been reset."}
@router.get("/count", response_model=dict)
async def get_user_count(
    db: AsyncSession = Depends(get_db),
    current_admin_user: User = Depends(require_admin),  # Ensure only admin can access
):
    """Return the total number of registered users.

    Access control is delegated to the ``require_admin`` dependency; the
    resolved user object is not otherwise used here.

    Returns:
        A dict of the form ``{"total_users": <count>}``.
    """
    count_query = select(func.count(User.id))
    user_total = (await db.execute(count_query)).scalar_one()
    return {"total_users": user_total}