imetting/backend/app/api/endpoints/admin.py

648 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from fastapi import APIRouter, Depends, Query
from app.core.auth import get_current_admin_user, get_current_user
from app.core.response import create_api_response
from app.core.database import get_db_connection
from app.models.models import (
MenuInfo,
MenuListResponse,
RolePermissionInfo,
UpdateRolePermissionsRequest,
RoleInfo,
CreateMenuRequest,
UpdateMenuRequest,
CreateRoleRequest,
UpdateRoleRequest,
)
from typing import List
import time
router = APIRouter()
_USER_MENU_CACHE_TTL_SECONDS = 120
_USER_MENU_CACHE_VERSION = "menu-rules-v4"
_user_menu_cache_by_role = {}
def _get_cached_user_menus(role_id: int):
cached = _user_menu_cache_by_role.get(role_id)
if not cached:
return None
if cached.get("version") != _USER_MENU_CACHE_VERSION:
_user_menu_cache_by_role.pop(role_id, None)
return None
if time.time() > cached["expires_at"]:
_user_menu_cache_by_role.pop(role_id, None)
return None
return cached["menus"]
def _set_cached_user_menus(role_id: int, menus):
_user_menu_cache_by_role[role_id] = {
"version": _USER_MENU_CACHE_VERSION,
"menus": menus,
"expires_at": time.time() + _USER_MENU_CACHE_TTL_SECONDS,
}
def _invalidate_user_menu_cache(role_id: int | None = None):
if role_id is None:
_user_menu_cache_by_role.clear()
return
_user_menu_cache_by_role.pop(role_id, None)
def _build_menu_index(menus):
menu_by_id = {}
children_by_parent = {}
for menu in menus:
menu_id = menu["menu_id"]
menu_by_id[menu_id] = menu
parent_id = menu.get("parent_id")
if parent_id is not None:
children_by_parent.setdefault(parent_id, []).append(menu_id)
return menu_by_id, children_by_parent
def _get_descendants(menu_id, children_by_parent):
result = set()
stack = [menu_id]
while stack:
current = stack.pop()
for child_id in children_by_parent.get(current, []):
if child_id in result:
continue
result.add(child_id)
stack.append(child_id)
return result
def _normalize_permission_menu_ids(raw_menu_ids, all_menus):
"""
对权限菜单ID做归一化
1. 选中父节点 => 自动包含全部子孙节点
2. 选中子节点 => 自动包含全部祖先节点
"""
menu_by_id, children_by_parent = _build_menu_index(all_menus)
selected = {menu_id for menu_id in raw_menu_ids if menu_id in menu_by_id}
expanded = set(selected)
# 父 -> 子孙
for menu_id in list(expanded):
expanded.update(_get_descendants(menu_id, children_by_parent))
# 子 -> 祖先
for menu_id in list(expanded):
cursor = menu_by_id[menu_id].get("parent_id")
while cursor is not None and cursor in menu_by_id:
if cursor in expanded:
break
expanded.add(cursor)
cursor = menu_by_id[cursor].get("parent_id")
return sorted(expanded)
# ========== 菜单权限管理接口 ==========
@router.get("/admin/menus")
async def get_all_menus(current_user=Depends(get_current_admin_user)):
"""
获取所有菜单列表
只有管理员才能访问
"""
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
query = """
SELECT menu_id, menu_code, menu_name, menu_icon, menu_url, menu_type,
parent_id, sort_order, is_active, description, created_at, updated_at
FROM sys_menus
ORDER BY
COALESCE(parent_id, menu_id) ASC,
CASE WHEN parent_id IS NULL THEN 0 ELSE 1 END ASC,
sort_order ASC,
menu_id ASC
"""
cursor.execute(query)
menus = cursor.fetchall()
menu_list = [MenuInfo(**menu) for menu in menus]
return create_api_response(
code="200",
message="获取菜单列表成功",
data=MenuListResponse(menus=menu_list, total=len(menu_list))
)
except Exception as e:
return create_api_response(code="500", message=f"获取菜单列表失败: {str(e)}")
@router.post("/admin/menus")
async def create_menu(request: CreateMenuRequest, current_user=Depends(get_current_admin_user)):
"""
创建菜单
"""
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT menu_id FROM sys_menus WHERE menu_code = %s", (request.menu_code,))
if cursor.fetchone():
return create_api_response(code="400", message="菜单编码已存在")
if request.parent_id is not None:
cursor.execute("SELECT menu_id FROM sys_menus WHERE menu_id = %s", (request.parent_id,))
if not cursor.fetchone():
return create_api_response(code="400", message="父菜单不存在")
cursor.execute(
"""
INSERT INTO sys_menus (menu_code, menu_name, menu_icon, menu_url, menu_type, parent_id, sort_order, is_active, description)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
""",
(
request.menu_code,
request.menu_name,
request.menu_icon,
request.menu_url,
request.menu_type,
request.parent_id,
request.sort_order,
1 if request.is_active else 0,
request.description,
),
)
menu_id = cursor.lastrowid
connection.commit()
_invalidate_user_menu_cache()
cursor.execute(
"""
SELECT menu_id, menu_code, menu_name, menu_icon, menu_url, menu_type,
parent_id, sort_order, is_active, description, created_at, updated_at
FROM sys_menus
WHERE menu_id = %s
""",
(menu_id,),
)
created = cursor.fetchone()
return create_api_response(code="200", message="创建菜单成功", data=created)
except Exception as e:
return create_api_response(code="500", message=f"创建菜单失败: {str(e)}")
@router.put("/admin/menus/{menu_id}")
async def update_menu(menu_id: int, request: UpdateMenuRequest, current_user=Depends(get_current_admin_user)):
"""
更新菜单
"""
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT * FROM sys_menus WHERE menu_id = %s", (menu_id,))
current = cursor.fetchone()
if not current:
return create_api_response(code="404", message="菜单不存在")
updates = {}
for field in [
"menu_code",
"menu_name",
"menu_icon",
"menu_url",
"menu_type",
"sort_order",
"description",
]:
value = getattr(request, field)
if value is not None:
updates[field] = value
if request.is_active is not None:
updates["is_active"] = 1 if request.is_active else 0
fields_set = getattr(request, "model_fields_set", getattr(request, "__fields_set__", set()))
# parent_id 允许设为 null且不允许设为自己
if request.parent_id == menu_id:
return create_api_response(code="400", message="父菜单不能为自身")
if request.parent_id is not None:
cursor.execute("SELECT menu_id FROM sys_menus WHERE menu_id = %s", (request.parent_id,))
if not cursor.fetchone():
return create_api_response(code="400", message="父菜单不存在")
# 防止形成环:父菜单不能是当前菜单的子孙
cursor.execute("SELECT menu_id, parent_id FROM sys_menus")
all_menus = cursor.fetchall()
_, children_by_parent = _build_menu_index(all_menus)
descendants = _get_descendants(menu_id, children_by_parent)
if request.parent_id in descendants:
return create_api_response(code="400", message="父菜单不能设置为当前菜单的子孙菜单")
if request.parent_id is not None or (request.parent_id is None and "parent_id" in fields_set):
updates["parent_id"] = request.parent_id
if "menu_code" in updates:
cursor.execute(
"SELECT menu_id FROM sys_menus WHERE menu_code = %s AND menu_id != %s",
(updates["menu_code"], menu_id),
)
if cursor.fetchone():
return create_api_response(code="400", message="菜单编码已存在")
if not updates:
return create_api_response(code="200", message="没有变更内容", data=current)
set_sql = ", ".join([f"{k} = %s" for k in updates.keys()])
values = list(updates.values()) + [menu_id]
cursor.execute(f"UPDATE sys_menus SET {set_sql} WHERE menu_id = %s", tuple(values))
connection.commit()
_invalidate_user_menu_cache()
cursor.execute(
"""
SELECT menu_id, menu_code, menu_name, menu_icon, menu_url, menu_type,
parent_id, sort_order, is_active, description, created_at, updated_at
FROM sys_menus
WHERE menu_id = %s
""",
(menu_id,),
)
updated = cursor.fetchone()
return create_api_response(code="200", message="更新菜单成功", data=updated)
except Exception as e:
return create_api_response(code="500", message=f"更新菜单失败: {str(e)}")
@router.delete("/admin/menus/{menu_id}")
async def delete_menu(menu_id: int, current_user=Depends(get_current_admin_user)):
"""
删除菜单(有子菜单时不允许删除)
"""
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT menu_id FROM sys_menus WHERE menu_id = %s", (menu_id,))
if not cursor.fetchone():
return create_api_response(code="404", message="菜单不存在")
cursor.execute("SELECT COUNT(*) AS cnt FROM sys_menus WHERE parent_id = %s", (menu_id,))
child_count = cursor.fetchone()["cnt"]
if child_count > 0:
return create_api_response(code="400", message="请先删除子菜单")
cursor.execute("DELETE FROM sys_role_menu_permissions WHERE menu_id = %s", (menu_id,))
cursor.execute("DELETE FROM sys_menus WHERE menu_id = %s", (menu_id,))
connection.commit()
_invalidate_user_menu_cache()
return create_api_response(code="200", message="删除菜单成功")
except Exception as e:
return create_api_response(code="500", message=f"删除菜单失败: {str(e)}")
@router.get("/admin/roles")
async def get_all_roles(current_user=Depends(get_current_admin_user)):
"""
获取所有角色列表及其权限统计
只有管理员才能访问
"""
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
# 查询所有角色及其权限数量
query = """
SELECT r.role_id, r.role_name, r.created_at,
COUNT(rmp.menu_id) as menu_count
FROM sys_roles r
LEFT JOIN sys_role_menu_permissions rmp ON r.role_id = rmp.role_id
GROUP BY r.role_id
ORDER BY r.role_id ASC
"""
cursor.execute(query)
roles = cursor.fetchall()
return create_api_response(
code="200",
message="获取角色列表成功",
data={"roles": roles, "total": len(roles)}
)
except Exception as e:
return create_api_response(code="500", message=f"获取角色列表失败: {str(e)}")
@router.post("/admin/roles")
async def create_role(request: CreateRoleRequest, current_user=Depends(get_current_admin_user)):
"""
创建角色
"""
try:
role_name = request.role_name.strip()
if not role_name:
return create_api_response(code="400", message="角色名称不能为空")
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT role_id FROM sys_roles WHERE role_name = %s", (role_name,))
if cursor.fetchone():
return create_api_response(code="400", message="角色名称已存在")
cursor.execute("INSERT INTO sys_roles (role_name) VALUES (%s)", (role_name,))
role_id = cursor.lastrowid
connection.commit()
cursor.execute("SELECT role_id, role_name, created_at FROM sys_roles WHERE role_id = %s", (role_id,))
role = cursor.fetchone()
return create_api_response(code="200", message="创建角色成功", data=role)
except Exception as e:
return create_api_response(code="500", message=f"创建角色失败: {str(e)}")
@router.put("/admin/roles/{role_id}")
async def update_role(role_id: int, request: UpdateRoleRequest, current_user=Depends(get_current_admin_user)):
"""
更新角色
"""
try:
role_name = request.role_name.strip()
if not role_name:
return create_api_response(code="400", message="角色名称不能为空")
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT role_id FROM sys_roles WHERE role_id = %s", (role_id,))
if not cursor.fetchone():
return create_api_response(code="404", message="角色不存在")
cursor.execute("SELECT role_id FROM sys_roles WHERE role_name = %s AND role_id != %s", (role_name, role_id))
if cursor.fetchone():
return create_api_response(code="400", message="角色名称已存在")
cursor.execute("UPDATE sys_roles SET role_name = %s WHERE role_id = %s", (role_name, role_id))
connection.commit()
cursor.execute("SELECT role_id, role_name, created_at FROM sys_roles WHERE role_id = %s", (role_id,))
role = cursor.fetchone()
return create_api_response(code="200", message="更新角色成功", data=role)
except Exception as e:
return create_api_response(code="500", message=f"更新角色失败: {str(e)}")
@router.get("/admin/roles/{role_id}/users")
async def get_role_users(
role_id: int,
page: int = Query(1, ge=1),
size: int = Query(10, ge=1, le=100),
current_user=Depends(get_current_admin_user),
):
"""
获取角色下用户列表
"""
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT role_id, role_name FROM sys_roles WHERE role_id = %s", (role_id,))
role = cursor.fetchone()
if not role:
return create_api_response(code="404", message="角色不存在")
cursor.execute(
"""
SELECT COUNT(*) AS total
FROM sys_users
WHERE role_id = %s
""",
(role_id,),
)
total = cursor.fetchone()["total"]
offset = (page - 1) * size
cursor.execute(
"""
SELECT user_id, username, caption, email, avatar_url, role_id, created_at
FROM sys_users
WHERE role_id = %s
ORDER BY user_id ASC
LIMIT %s OFFSET %s
""",
(role_id, size, offset),
)
users = cursor.fetchall()
return create_api_response(
code="200",
message="获取角色用户成功",
data={
"role_id": role_id,
"role_name": role["role_name"],
"users": users,
"total": total,
"page": page,
"size": size,
},
)
except Exception as e:
return create_api_response(code="500", message=f"获取角色用户失败: {str(e)}")
@router.get("/admin/roles/permissions/all")
async def get_all_role_permissions(current_user=Depends(get_current_admin_user)):
"""
批量获取所有角色权限用于减少N次请求
"""
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute(
"""
SELECT rmp.role_id, rmp.menu_id
FROM sys_role_menu_permissions rmp
JOIN sys_menus m ON m.menu_id = rmp.menu_id
WHERE m.is_active = 1
ORDER BY rmp.role_id ASC, rmp.menu_id ASC
"""
)
rows = cursor.fetchall()
result = {}
for row in rows:
result.setdefault(row["role_id"], []).append(row["menu_id"])
return create_api_response(code="200", message="获取角色权限成功", data={"permissions": result})
except Exception as e:
return create_api_response(code="500", message=f"获取角色权限失败: {str(e)}")
@router.get("/admin/roles/{role_id}/permissions")
async def get_role_permissions(role_id: int, current_user=Depends(get_current_admin_user)):
"""
获取指定角色的菜单权限
只有管理员才能访问
"""
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
# 检查角色是否存在
cursor.execute("SELECT role_id, role_name FROM sys_roles WHERE role_id = %s", (role_id,))
role = cursor.fetchone()
if not role:
return create_api_response(code="404", message="角色不存在")
# 查询该角色的所有菜单权限
query = """
SELECT rmp.menu_id
FROM sys_role_menu_permissions rmp
JOIN sys_menus m ON m.menu_id = rmp.menu_id
WHERE rmp.role_id = %s
AND m.is_active = 1
"""
cursor.execute(query, (role_id,))
permissions = cursor.fetchall()
menu_ids = [p['menu_id'] for p in permissions]
return create_api_response(
code="200",
message="获取角色权限成功",
data=RolePermissionInfo(
role_id=role['role_id'],
role_name=role['role_name'],
menu_ids=menu_ids
)
)
except Exception as e:
return create_api_response(code="500", message=f"获取角色权限失败: {str(e)}")
@router.put("/admin/roles/{role_id}/permissions")
async def update_role_permissions(
role_id: int,
request: UpdateRolePermissionsRequest,
current_user=Depends(get_current_admin_user)
):
"""
更新指定角色的菜单权限
只有管理员才能访问
"""
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
# 检查角色是否存在
cursor.execute("SELECT role_id FROM sys_roles WHERE role_id = %s", (role_id,))
if not cursor.fetchone():
return create_api_response(code="404", message="角色不存在")
cursor.execute(
"""
SELECT menu_id, parent_id
FROM sys_menus
WHERE is_active = 1
"""
)
all_menus = cursor.fetchall()
menu_id_set = {menu["menu_id"] for menu in all_menus}
# 验证所有menu_id是否有效
invalid_menu_ids = [menu_id for menu_id in request.menu_ids if menu_id not in menu_id_set]
if invalid_menu_ids:
return create_api_response(code="400", message="包含无效的菜单ID")
normalized_menu_ids = _normalize_permission_menu_ids(request.menu_ids, all_menus)
# 删除该角色的所有现有权限
cursor.execute("DELETE FROM sys_role_menu_permissions WHERE role_id = %s", (role_id,))
# 插入新的权限
if normalized_menu_ids:
insert_values = [(role_id, menu_id) for menu_id in normalized_menu_ids]
cursor.executemany(
"INSERT INTO sys_role_menu_permissions (role_id, menu_id) VALUES (%s, %s)",
insert_values
)
connection.commit()
_invalidate_user_menu_cache(role_id)
return create_api_response(
code="200",
message="更新角色权限成功",
data={"role_id": role_id, "menu_count": len(normalized_menu_ids)}
)
except Exception as e:
return create_api_response(code="500", message=f"更新角色权限失败: {str(e)}")
@router.get("/menus/user")
async def get_user_menus(current_user=Depends(get_current_user)):
"""
获取当前用户可访问的菜单列表(用于渲染下拉菜单)
所有登录用户都可以访问
"""
try:
role_id = current_user["role_id"]
cached_menus = _get_cached_user_menus(role_id)
if cached_menus is not None:
return create_api_response(
code="200",
message="获取用户菜单成功",
data={"menus": cached_menus}
)
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
# 根据用户的role_id查询可访问的菜单
query = """
SELECT m.menu_id, m.menu_code, m.menu_name, m.menu_icon,
m.menu_url, m.menu_type, m.parent_id, m.sort_order
FROM sys_menus m
JOIN sys_role_menu_permissions rmp ON m.menu_id = rmp.menu_id
WHERE rmp.role_id = %s
AND m.is_active = 1
AND (m.is_visible = 1 OR m.is_visible IS NULL OR m.menu_code IN ('dashboard', 'desktop'))
ORDER BY
COALESCE(m.parent_id, m.menu_id) ASC,
CASE WHEN m.parent_id IS NULL THEN 0 ELSE 1 END ASC,
m.sort_order ASC,
m.menu_id ASC
"""
cursor.execute(query, (role_id,))
menus = cursor.fetchall()
# 仅在缺失父菜单时补查减少不必要的SQL
current_menu_ids = {menu["menu_id"] for menu in menus}
missing_parent_ids = {
menu["parent_id"] for menu in menus
if menu.get("parent_id") is not None and menu["parent_id"] not in current_menu_ids
}
if missing_parent_ids:
format_strings = ",".join(["%s"] * len(missing_parent_ids))
cursor.execute(
f"""
SELECT menu_id, menu_code, menu_name, menu_icon, menu_url, menu_type, parent_id, sort_order
FROM sys_menus
WHERE is_active = 1 AND menu_id IN ({format_strings})
""",
tuple(missing_parent_ids),
)
parent_rows = cursor.fetchall()
menus.extend(parent_rows)
current_menu_ids.update(row["menu_id"] for row in parent_rows)
menus = sorted(
{menu["menu_id"]: menu for menu in menus}.values(),
key=lambda m: (
m["parent_id"] if m["parent_id"] is not None else m["menu_id"],
0 if m["parent_id"] is None else 1,
m["sort_order"],
m["menu_id"],
),
)
_set_cached_user_menus(role_id, menus)
return create_api_response(
code="200",
message="获取用户菜单成功",
data={"menus": menus}
)
except Exception as e:
return create_api_response(code="500", message=f"获取用户菜单失败: {str(e)}")