"""HTTP routes for system authentication, user management and role management.

NOTE(review): the user/role endpoints below only verify that the caller is
authenticated; no permission check is visible in this module. Presumably
authorization is enforced by middleware or the service layer -- confirm.
"""

from fastapi import APIRouter, Depends, HTTPException, Request
from sqlmodel import Session, func, select

from core.database import get_session
from models.sys_auth import SysUser
from schemas.sys_auth import (
    SysAuthBootstrapResponse,
    SysAuthLoginRequest,
    SysProfileUpdateRequest,
    SysAuthStatusResponse,
    SysRoleGrantBootstrapResponse,
    SysRoleListResponse,
    SysRoleSummaryResponse,
    SysRoleUpsertRequest,
    SysUserCreateRequest,
    SysUserListResponse,
    SysUserSummaryResponse,
    SysUserUpdateRequest,
)
from services.sys_auth_service import (
    DEFAULT_ADMIN_USERNAME,
    authenticate_user,
    build_user_bootstrap,
    create_sys_role,
    create_sys_user,
    delete_sys_role,
    delete_sys_user,
    issue_user_token,
    list_role_grant_bootstrap,
    list_sys_roles,
    list_sys_users,
    resolve_user_by_token,
    revoke_user_token,
    update_sys_role,
    update_sys_user,
    update_current_sys_user_profile,
)

router = APIRouter()


def _extract_auth_token(request: Request) -> str:
    """Return the auth token carried by *request*, or ``""`` if none is present.

    Lookup order: ``Authorization: Bearer <token>`` header, then the
    ``x-auth-token`` header, then the ``auth_token`` query parameter.
    The returned value is always whitespace-stripped.
    """
    authorization = str(request.headers.get("authorization") or "").strip()
    if authorization.lower().startswith("bearer "):
        # Skip the 7-character "bearer " prefix (matched case-insensitively).
        return authorization[7:].strip()
    return str(
        request.headers.get("x-auth-token")
        or request.query_params.get("auth_token")
        or ""
    ).strip()


def _require_current_user(request: Request, session: Session) -> SysUser:
    """Resolve the authenticated user for *request* or raise HTTP 401.

    A user id stored on ``request.state.sys_user_id`` (presumably set by an
    upstream middleware -- confirm) is tried first and accepted only if the
    user exists and is active. Otherwise the token extracted from the request
    is resolved through ``resolve_user_by_token``.

    Raises:
        HTTPException: 401 when no user can be resolved.
    """
    state_user_id = getattr(request.state, "sys_user_id", None)
    if state_user_id:
        user = session.get(SysUser, state_user_id)
        if user is not None and bool(user.is_active):
            return user
    # Missing/inactive state user falls through to token-based resolution.
    token = _extract_auth_token(request)
    user = resolve_user_by_token(session, token)
    if user is None:
        raise HTTPException(status_code=401, detail="Authentication required")
    return user


@router.get("/api/sys/auth/status", response_model=SysAuthStatusResponse)
def get_sys_auth_status(session: Session = Depends(get_session)):
    """Report that auth is enabled, the number of users and the default username."""
    # Count in the database instead of loading every SysUser row into memory;
    # this handler performs no authentication itself and may be called often.
    user_count = int(session.exec(select(func.count()).select_from(SysUser)).one())
    return SysAuthStatusResponse(
        enabled=True,
        user_count=user_count,
        default_username=DEFAULT_ADMIN_USERNAME,
    )


@router.post("/api/sys/auth/login", response_model=SysAuthBootstrapResponse)
def login_sys_user(payload: SysAuthLoginRequest, session: Session = Depends(get_session)):
    """Authenticate by username/password and return a bootstrap payload with a token.

    The username is stripped and lower-cased before authentication; the
    password is passed through unchanged.

    Raises:
        HTTPException: 401 on invalid credentials; 503 when token issuance
            raises ``RuntimeError``.
    """
    username = str(payload.username or "").strip().lower()
    password = str(payload.password or "")
    user = authenticate_user(session, username, password)
    if user is None:
        raise HTTPException(status_code=401, detail="Invalid username or password")
    try:
        token, expires_at = issue_user_token(session, user)
    except RuntimeError as exc:
        raise HTTPException(status_code=503, detail=str(exc)) from exc
    return SysAuthBootstrapResponse.model_validate(
        build_user_bootstrap(session, user, token=token, expires_at=expires_at)
    )


@router.post("/api/sys/auth/logout")
def logout_sys_user(request: Request, session: Session = Depends(get_session)):
    """Revoke the request's token if it resolves to a user; always report success."""
    token = _extract_auth_token(request)
    user = resolve_user_by_token(session, token)
    if user is not None:
        revoke_user_token(token)
    return {"success": True}


@router.get("/api/sys/auth/me", response_model=SysAuthBootstrapResponse)
def get_current_sys_user(request: Request, session: Session = Depends(get_session)):
    """Return the bootstrap payload for the currently authenticated user."""
    user = _require_current_user(request, session)
    return SysAuthBootstrapResponse.model_validate(build_user_bootstrap(session, user))


@router.put("/api/sys/auth/me", response_model=SysAuthBootstrapResponse)
def update_current_sys_user(
    payload: SysProfileUpdateRequest,
    request: Request,
    session: Session = Depends(get_session),
):
    """Update the current user's display name and/or password.

    Raises:
        HTTPException: 401 when unauthenticated; 400 when the service raises
            ``ValueError``.
    """
    current_user = _require_current_user(request, session)
    try:
        user = update_current_sys_user_profile(
            session,
            user_id=int(current_user.id or 0),
            display_name=payload.display_name,
            password=payload.password,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return SysAuthBootstrapResponse.model_validate(build_user_bootstrap(session, user))


@router.get("/api/sys/users", response_model=SysUserListResponse)
def list_sys_users_api(request: Request, session: Session = Depends(get_session)):
    """List all system users (requires authentication)."""
    _require_current_user(request, session)
    return SysUserListResponse(
        items=[SysUserSummaryResponse.model_validate(item) for item in list_sys_users(session)]
    )


@router.post("/api/sys/users", response_model=SysUserSummaryResponse)
def create_sys_user_api(
    payload: SysUserCreateRequest,
    request: Request,
    session: Session = Depends(get_session),
):
    """Create a system user from *payload* (requires authentication).

    Raises:
        HTTPException: 401 when unauthenticated; 400 when the service raises
            ``ValueError``.
    """
    _require_current_user(request, session)
    try:
        item = create_sys_user(
            session,
            username=payload.username,
            display_name=payload.display_name,
            password=payload.password,
            role_id=int(payload.role_id),
            is_active=bool(payload.is_active),
            bot_ids=list(payload.bot_ids or []),
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return SysUserSummaryResponse.model_validate(item)


@router.put("/api/sys/users/{user_id}", response_model=SysUserSummaryResponse)
def update_sys_user_api(
    user_id: int,
    payload: SysUserUpdateRequest,
    request: Request,
    session: Session = Depends(get_session),
):
    """Update the user identified by *user_id* (requires authentication).

    The acting user's id is forwarded to the service as ``acting_user_id``.

    Raises:
        HTTPException: 401 when unauthenticated; 400 when the service raises
            ``ValueError``.
    """
    current_user = _require_current_user(request, session)
    try:
        item = update_sys_user(
            session,
            user_id=int(user_id),
            display_name=payload.display_name,
            password=payload.password,
            role_id=int(payload.role_id),
            is_active=bool(payload.is_active),
            bot_ids=list(payload.bot_ids or []),
            acting_user_id=int(current_user.id or 0),
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return SysUserSummaryResponse.model_validate(item)


@router.delete("/api/sys/users/{user_id}")
def delete_sys_user_api(user_id: int, request: Request, session: Session = Depends(get_session)):
    """Delete the user identified by *user_id* (requires authentication).

    The acting user's id is forwarded to the service as ``acting_user_id``.

    Raises:
        HTTPException: 401 when unauthenticated; 400 when the service raises
            ``ValueError``.
    """
    current_user = _require_current_user(request, session)
    try:
        delete_sys_user(session, user_id=int(user_id), acting_user_id=int(current_user.id or 0))
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return {"success": True}


@router.get("/api/sys/roles", response_model=SysRoleListResponse)
def list_sys_roles_api(request: Request, session: Session = Depends(get_session)):
    """List all system roles (requires authentication)."""
    _require_current_user(request, session)
    return SysRoleListResponse(
        items=[SysRoleSummaryResponse.model_validate(item) for item in list_sys_roles(session)]
    )


@router.get("/api/sys/roles/grants/bootstrap", response_model=SysRoleGrantBootstrapResponse)
def list_sys_role_grants_bootstrap_api(request: Request, session: Session = Depends(get_session)):
    """Return the role-grant bootstrap data from the service (requires authentication)."""
    _require_current_user(request, session)
    return SysRoleGrantBootstrapResponse.model_validate(list_role_grant_bootstrap(session))


@router.post("/api/sys/roles", response_model=SysRoleSummaryResponse)
def create_sys_role_api(
    payload: SysRoleUpsertRequest,
    request: Request,
    session: Session = Depends(get_session),
):
    """Create a role from *payload* (requires authentication).

    Raises:
        HTTPException: 401 when unauthenticated; 400 when the service raises
            ``ValueError``.
    """
    _require_current_user(request, session)
    try:
        item = create_sys_role(
            session,
            role_key=payload.role_key,
            name=payload.name,
            description=payload.description,
            is_active=bool(payload.is_active),
            sort_order=int(payload.sort_order),
            menu_keys=list(payload.menu_keys or []),
            permission_keys=list(payload.permission_keys or []),
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return SysRoleSummaryResponse.model_validate(item)


@router.put("/api/sys/roles/{role_id}", response_model=SysRoleSummaryResponse)
def update_sys_role_api(
    role_id: int,
    payload: SysRoleUpsertRequest,
    request: Request,
    session: Session = Depends(get_session),
):
    """Update the role identified by *role_id* (requires authentication).

    ``payload.role_key`` is not forwarded to the service on update.

    Raises:
        HTTPException: 401 when unauthenticated; 400 when the service raises
            ``ValueError``.
    """
    _require_current_user(request, session)
    try:
        item = update_sys_role(
            session,
            role_id=int(role_id),
            name=payload.name,
            description=payload.description,
            is_active=bool(payload.is_active),
            sort_order=int(payload.sort_order),
            menu_keys=list(payload.menu_keys or []),
            permission_keys=list(payload.permission_keys or []),
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return SysRoleSummaryResponse.model_validate(item)


@router.delete("/api/sys/roles/{role_id}")
def delete_sys_role_api(role_id: int, request: Request, session: Session = Depends(get_session)):
    """Delete the role identified by *role_id* (requires authentication).

    Raises:
        HTTPException: 401 when unauthenticated; 400 when the service raises
            ``ValueError``.
    """
    _require_current_user(request, session)
    try:
        delete_sys_role(session, role_id=int(role_id))
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return {"success": True}