# dashboard-nanobot/backend/api/sys_router.py
# (file viewer header: 231 lines, 8.8 KiB, Python)

from fastapi import APIRouter, Depends, HTTPException, Request
from sqlmodel import Session, func, select

from core.database import get_session
from models.sys_auth import SysUser
from schemas.sys_auth import (
    SysAuthBootstrapResponse,
    SysAuthLoginRequest,
    SysProfileUpdateRequest,
    SysAuthStatusResponse,
    SysRoleGrantBootstrapResponse,
    SysRoleListResponse,
    SysRoleSummaryResponse,
    SysRoleUpsertRequest,
    SysUserCreateRequest,
    SysUserListResponse,
    SysUserSummaryResponse,
    SysUserUpdateRequest,
)
from services.sys_auth_service import (
    DEFAULT_ADMIN_USERNAME,
    authenticate_user,
    build_user_bootstrap,
    create_sys_role,
    create_sys_user,
    delete_sys_role,
    delete_sys_user,
    issue_user_token,
    list_role_grant_bootstrap,
    list_sys_roles,
    list_sys_users,
    resolve_user_by_token,
    revoke_user_token,
    update_sys_role,
    update_sys_user,
    update_current_sys_user_profile,
)
# Router that the /api/sys/* auth, user and role endpoints below register on.
router = APIRouter()
def _extract_auth_token(request: Request) -> str:
    """Pull the caller's auth token out of the request.

    Lookup order:
      1. ``Authorization: Bearer <token>`` header (scheme matched
         case-insensitively),
      2. ``X-Auth-Token`` header,
      3. ``auth_token`` query parameter.

    Returns the whitespace-stripped token, or an empty string when none of
    the sources carries a value.
    """
    headers = request.headers
    bearer_prefix = "bearer "

    raw_authorization = str(headers.get("authorization") or "").strip()
    if raw_authorization.lower().startswith(bearer_prefix):
        return raw_authorization[len(bearer_prefix):].strip()

    # NOTE(review): tokens passed in the query string can end up in access
    # logs; presumably kept for clients that cannot set headers - confirm.
    fallback = headers.get("x-auth-token") or request.query_params.get("auth_token") or ""
    return str(fallback).strip()
def _require_current_user(request: Request, session: Session) -> SysUser:
    """Return the authenticated user for this request or raise HTTP 401.

    A ``sys_user_id`` found on ``request.state`` is tried first (presumably
    placed there by upstream middleware - confirm); it is honoured only when
    it maps to an existing, active user. Otherwise the token carried by the
    request is resolved through ``resolve_user_by_token``.

    Raises:
        HTTPException: 401 when no user can be resolved.
    """
    preset_user_id = getattr(request.state, "sys_user_id", None)
    if preset_user_id:
        preset_user = session.get(SysUser, preset_user_id)
        if preset_user is not None and bool(preset_user.is_active):
            return preset_user

    # Fall back to token-based resolution.
    resolved_user = resolve_user_by_token(session, _extract_auth_token(request))
    if resolved_user is None:
        raise HTTPException(status_code=401, detail="Authentication required")
    return resolved_user
@router.get("/api/sys/auth/status", response_model=SysAuthStatusResponse)
def get_sys_auth_status(session: Session = Depends(get_session)):
    """Report whether system auth is enabled, plus basic account metadata.

    This handler does not call ``_require_current_user``, so at this layer
    it is reachable without a token.

    Returns:
        SysAuthStatusResponse with ``enabled=True``, the number of
        ``SysUser`` rows, and the default admin username.
    """
    # Count in the database instead of loading every user row into memory
    # just to take len() of the result.
    count_statement = select(func.count()).select_from(SysUser)
    user_count = int(session.exec(count_statement).one())
    return SysAuthStatusResponse(
        enabled=True,
        user_count=user_count,
        default_username=DEFAULT_ADMIN_USERNAME,
    )
@router.post("/api/sys/auth/login", response_model=SysAuthBootstrapResponse)
def login_sys_user(payload: SysAuthLoginRequest, session: Session = Depends(get_session)):
    """Authenticate with username/password and return a bootstrap payload.

    The username is trimmed and lower-cased before authentication; the
    password is passed through unchanged.

    Raises:
        HTTPException: 401 when the credentials are rejected; 503 when
            issuing the token raises ``RuntimeError``.
    """
    normalized_username = str(payload.username or "").strip().lower()
    raw_password = str(payload.password or "")

    account = authenticate_user(session, normalized_username, raw_password)
    if account is None:
        raise HTTPException(status_code=401, detail="Invalid username or password")

    try:
        token, expires_at = issue_user_token(session, account)
    except RuntimeError as exc:
        raise HTTPException(status_code=503, detail=str(exc)) from exc

    bootstrap = build_user_bootstrap(session, account, token=token, expires_at=expires_at)
    return SysAuthBootstrapResponse.model_validate(bootstrap)
@router.post("/api/sys/auth/logout")
def logout_sys_user(request: Request, session: Session = Depends(get_session)):
    """Revoke the token presented with the request, if it maps to a user.

    Always answers ``{"success": True}``, including when the token is
    missing or does not resolve to a user.
    """
    presented_token = _extract_auth_token(request)
    if resolve_user_by_token(session, presented_token) is not None:
        revoke_user_token(presented_token)
    return {"success": True}
@router.get("/api/sys/auth/me", response_model=SysAuthBootstrapResponse)
def get_current_sys_user(request: Request, session: Session = Depends(get_session)):
    """Return the bootstrap payload for the authenticated caller.

    Raises:
        HTTPException: 401 when the caller cannot be authenticated.
    """
    caller = _require_current_user(request, session)
    bootstrap = build_user_bootstrap(session, caller)
    return SysAuthBootstrapResponse.model_validate(bootstrap)
@router.put("/api/sys/auth/me", response_model=SysAuthBootstrapResponse)
def update_current_sys_user(
    payload: SysProfileUpdateRequest,
    request: Request,
    session: Session = Depends(get_session),
):
    """Update the caller's own display name and/or password.

    Raises:
        HTTPException: 401 when unauthenticated; 400 when the service layer
            rejects the change with ``ValueError``.
    """
    caller = _require_current_user(request, session)
    try:
        refreshed = update_current_sys_user_profile(
            session,
            user_id=int(caller.id or 0),
            display_name=payload.display_name,
            password=payload.password,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    else:
        bootstrap = build_user_bootstrap(session, refreshed)
        return SysAuthBootstrapResponse.model_validate(bootstrap)
@router.get("/api/sys/users", response_model=SysUserListResponse)
def list_sys_users_api(request: Request, session: Session = Depends(get_session)):
    """List system users for an authenticated caller.

    Raises:
        HTTPException: 401 when the caller cannot be authenticated.
    """
    _require_current_user(request, session)
    summaries = [
        SysUserSummaryResponse.model_validate(record)
        for record in list_sys_users(session)
    ]
    return SysUserListResponse(items=summaries)
@router.post("/api/sys/users", response_model=SysUserSummaryResponse)
def create_sys_user_api(
    payload: SysUserCreateRequest,
    request: Request,
    session: Session = Depends(get_session),
):
    """Create a system user from the request payload.

    Raises:
        HTTPException: 401 when unauthenticated; 400 when a ``ValueError``
            is raised while creating the user.
    """
    _require_current_user(request, session)
    try:
        # Conversions stay inside the try so a ValueError from them is
        # reported as 400 exactly like a service-layer rejection.
        new_user_fields = dict(
            username=payload.username,
            display_name=payload.display_name,
            password=payload.password,
            role_id=int(payload.role_id),
            is_active=bool(payload.is_active),
            bot_ids=list(payload.bot_ids or []),
        )
        created = create_sys_user(session, **new_user_fields)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return SysUserSummaryResponse.model_validate(created)
@router.put("/api/sys/users/{user_id}", response_model=SysUserSummaryResponse)
def update_sys_user_api(
    user_id: int,
    payload: SysUserUpdateRequest,
    request: Request,
    session: Session = Depends(get_session),
):
    """Update the user identified by ``user_id``.

    The caller's id is forwarded as ``acting_user_id`` to the service layer.

    Raises:
        HTTPException: 401 when unauthenticated; 400 when a ``ValueError``
            is raised while applying the update.
    """
    actor = _require_current_user(request, session)
    try:
        updated = update_sys_user(
            session,
            user_id=int(user_id),
            display_name=payload.display_name,
            password=payload.password,
            role_id=int(payload.role_id),
            is_active=bool(payload.is_active),
            bot_ids=list(payload.bot_ids or []),
            acting_user_id=int(actor.id or 0),
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    else:
        return SysUserSummaryResponse.model_validate(updated)
@router.delete("/api/sys/users/{user_id}")
def delete_sys_user_api(user_id: int, request: Request, session: Session = Depends(get_session)):
    """Delete the user identified by ``user_id``.

    The caller's id is forwarded as ``acting_user_id`` to the service layer.

    Raises:
        HTTPException: 401 when unauthenticated; 400 when the service layer
            refuses the deletion with ``ValueError``.
    """
    actor = _require_current_user(request, session)
    try:
        delete_sys_user(
            session,
            user_id=int(user_id),
            acting_user_id=int(actor.id or 0),
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return {"success": True}
@router.get("/api/sys/roles", response_model=SysRoleListResponse)
def list_sys_roles_api(request: Request, session: Session = Depends(get_session)):
    """List system roles for an authenticated caller.

    Raises:
        HTTPException: 401 when the caller cannot be authenticated.
    """
    _require_current_user(request, session)
    summaries = [
        SysRoleSummaryResponse.model_validate(record)
        for record in list_sys_roles(session)
    ]
    return SysRoleListResponse(items=summaries)
@router.get("/api/sys/roles/grants/bootstrap", response_model=SysRoleGrantBootstrapResponse)
def list_sys_role_grants_bootstrap_api(request: Request, session: Session = Depends(get_session)):
    """Return the role-grant bootstrap data for an authenticated caller.

    Raises:
        HTTPException: 401 when the caller cannot be authenticated.
    """
    _require_current_user(request, session)
    grant_bootstrap = list_role_grant_bootstrap(session)
    return SysRoleGrantBootstrapResponse.model_validate(grant_bootstrap)
@router.post("/api/sys/roles", response_model=SysRoleSummaryResponse)
def create_sys_role_api(
    payload: SysRoleUpsertRequest,
    request: Request,
    session: Session = Depends(get_session),
):
    """Create a role from the request payload.

    Raises:
        HTTPException: 401 when unauthenticated; 400 when a ``ValueError``
            is raised while creating the role.
    """
    _require_current_user(request, session)
    try:
        # Conversions stay inside the try so a ValueError from them is
        # reported as 400 exactly like a service-layer rejection.
        new_role_fields = dict(
            role_key=payload.role_key,
            name=payload.name,
            description=payload.description,
            is_active=bool(payload.is_active),
            sort_order=int(payload.sort_order),
            menu_keys=list(payload.menu_keys or []),
            permission_keys=list(payload.permission_keys or []),
        )
        created = create_sys_role(session, **new_role_fields)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return SysRoleSummaryResponse.model_validate(created)
@router.put("/api/sys/roles/{role_id}", response_model=SysRoleSummaryResponse)
def update_sys_role_api(
    role_id: int,
    payload: SysRoleUpsertRequest,
    request: Request,
    session: Session = Depends(get_session),
):
    """Update the role identified by ``role_id``.

    ``payload.role_key`` is not forwarded to the service layer here; only
    the name, description, flags, ordering and grant key lists are.

    Raises:
        HTTPException: 401 when unauthenticated; 400 when a ``ValueError``
            is raised while applying the update.
    """
    _require_current_user(request, session)
    try:
        updated = update_sys_role(
            session,
            role_id=int(role_id),
            name=payload.name,
            description=payload.description,
            is_active=bool(payload.is_active),
            sort_order=int(payload.sort_order),
            menu_keys=list(payload.menu_keys or []),
            permission_keys=list(payload.permission_keys or []),
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    else:
        return SysRoleSummaryResponse.model_validate(updated)
@router.delete("/api/sys/roles/{role_id}")
def delete_sys_role_api(role_id: int, request: Request, session: Session = Depends(get_session)):
    """Delete the role identified by ``role_id``.

    Raises:
        HTTPException: 401 when unauthenticated; 400 when the service layer
            refuses the deletion with ``ValueError``.
    """
    _require_current_user(request, session)
    try:
        delete_sys_role(session, role_id=int(role_id))
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return {"success": True}