from fastapi import APIRouter, Depends, Query from app.core.auth import get_current_admin_user, get_current_user from app.core.response import create_api_response from app.core.database import get_db_connection from app.models.models import ( MenuInfo, MenuListResponse, RolePermissionInfo, UpdateRolePermissionsRequest, RoleInfo, CreateMenuRequest, UpdateMenuRequest, CreateRoleRequest, UpdateRoleRequest, ) from typing import List import time router = APIRouter() _USER_MENU_CACHE_TTL_SECONDS = 120 _USER_MENU_CACHE_VERSION = "menu-rules-v4" _user_menu_cache_by_role = {} def _get_cached_user_menus(role_id: int): cached = _user_menu_cache_by_role.get(role_id) if not cached: return None if cached.get("version") != _USER_MENU_CACHE_VERSION: _user_menu_cache_by_role.pop(role_id, None) return None if time.time() > cached["expires_at"]: _user_menu_cache_by_role.pop(role_id, None) return None return cached["menus"] def _set_cached_user_menus(role_id: int, menus): _user_menu_cache_by_role[role_id] = { "version": _USER_MENU_CACHE_VERSION, "menus": menus, "expires_at": time.time() + _USER_MENU_CACHE_TTL_SECONDS, } def _invalidate_user_menu_cache(role_id: int | None = None): if role_id is None: _user_menu_cache_by_role.clear() return _user_menu_cache_by_role.pop(role_id, None) def _build_menu_index(menus): menu_by_id = {} children_by_parent = {} for menu in menus: menu_id = menu["menu_id"] menu_by_id[menu_id] = menu parent_id = menu.get("parent_id") if parent_id is not None: children_by_parent.setdefault(parent_id, []).append(menu_id) return menu_by_id, children_by_parent def _get_descendants(menu_id, children_by_parent): result = set() stack = [menu_id] while stack: current = stack.pop() for child_id in children_by_parent.get(current, []): if child_id in result: continue result.add(child_id) stack.append(child_id) return result def _normalize_permission_menu_ids(raw_menu_ids, all_menus): """ 对权限菜单ID做归一化: 1. 选中父节点 => 自动包含全部子孙节点 2. 选中子节点 => 自动包含全部祖先节点 """ menu_by_id, children_by_parent = _build_menu_index(all_menus) selected = {menu_id for menu_id in raw_menu_ids if menu_id in menu_by_id} expanded = set(selected) # 父 -> 子孙 for menu_id in list(expanded): expanded.update(_get_descendants(menu_id, children_by_parent)) # 子 -> 祖先 for menu_id in list(expanded): cursor = menu_by_id[menu_id].get("parent_id") while cursor is not None and cursor in menu_by_id: if cursor in expanded: break expanded.add(cursor) cursor = menu_by_id[cursor].get("parent_id") return sorted(expanded) # ========== 菜单权限管理接口 ========== @router.get("/admin/menus") async def get_all_menus(current_user=Depends(get_current_admin_user)): """ 获取所有菜单列表 只有管理员才能访问 """ try: with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) query = """ SELECT menu_id, menu_code, menu_name, menu_icon, menu_url, menu_type, parent_id, sort_order, is_active, description, created_at, updated_at FROM sys_menus ORDER BY COALESCE(parent_id, menu_id) ASC, CASE WHEN parent_id IS NULL THEN 0 ELSE 1 END ASC, sort_order ASC, menu_id ASC """ cursor.execute(query) menus = cursor.fetchall() menu_list = [MenuInfo(**menu) for menu in menus] return create_api_response( code="200", message="获取菜单列表成功", data=MenuListResponse(menus=menu_list, total=len(menu_list)) ) except Exception as e: return create_api_response(code="500", message=f"获取菜单列表失败: {str(e)}") @router.post("/admin/menus") async def create_menu(request: CreateMenuRequest, current_user=Depends(get_current_admin_user)): """ 创建菜单 """ try: with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) cursor.execute("SELECT menu_id FROM sys_menus WHERE menu_code = %s", (request.menu_code,)) if cursor.fetchone(): return create_api_response(code="400", message="菜单编码已存在") if request.parent_id is not None: cursor.execute("SELECT menu_id FROM sys_menus WHERE menu_id = %s", (request.parent_id,)) if not cursor.fetchone(): return create_api_response(code="400", message="父菜单不存在") cursor.execute( """ INSERT INTO sys_menus (menu_code, menu_name, menu_icon, menu_url, menu_type, parent_id, sort_order, is_active, description) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """, ( request.menu_code, request.menu_name, request.menu_icon, request.menu_url, request.menu_type, request.parent_id, request.sort_order, 1 if request.is_active else 0, request.description, ), ) menu_id = cursor.lastrowid connection.commit() _invalidate_user_menu_cache() cursor.execute( """ SELECT menu_id, menu_code, menu_name, menu_icon, menu_url, menu_type, parent_id, sort_order, is_active, description, created_at, updated_at FROM sys_menus WHERE menu_id = %s """, (menu_id,), ) created = cursor.fetchone() return create_api_response(code="200", message="创建菜单成功", data=created) except Exception as e: return create_api_response(code="500", message=f"创建菜单失败: {str(e)}") @router.put("/admin/menus/{menu_id}") async def update_menu(menu_id: int, request: UpdateMenuRequest, current_user=Depends(get_current_admin_user)): """ 更新菜单 """ try: with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) cursor.execute("SELECT * FROM sys_menus WHERE menu_id = %s", (menu_id,)) current = cursor.fetchone() if not current: return create_api_response(code="404", message="菜单不存在") updates = {} for field in [ "menu_code", "menu_name", "menu_icon", "menu_url", "menu_type", "sort_order", "description", ]: value = getattr(request, field) if value is not None: updates[field] = value if request.is_active is not None: updates["is_active"] = 1 if request.is_active else 0 fields_set = getattr(request, "model_fields_set", getattr(request, "__fields_set__", set())) # parent_id 允许设为 null,且不允许设为自己 if request.parent_id == menu_id: return create_api_response(code="400", message="父菜单不能为自身") if request.parent_id is not None: cursor.execute("SELECT menu_id FROM sys_menus WHERE menu_id = %s", (request.parent_id,)) if not cursor.fetchone(): return create_api_response(code="400", message="父菜单不存在") # 防止形成环:父菜单不能是当前菜单的子孙 cursor.execute("SELECT menu_id, parent_id FROM sys_menus") all_menus = cursor.fetchall() _, children_by_parent = _build_menu_index(all_menus) descendants = _get_descendants(menu_id, children_by_parent) if request.parent_id in descendants: return create_api_response(code="400", message="父菜单不能设置为当前菜单的子孙菜单") if request.parent_id is not None or (request.parent_id is None and "parent_id" in fields_set): updates["parent_id"] = request.parent_id if "menu_code" in updates: cursor.execute( "SELECT menu_id FROM sys_menus WHERE menu_code = %s AND menu_id != %s", (updates["menu_code"], menu_id), ) if cursor.fetchone(): return create_api_response(code="400", message="菜单编码已存在") if not updates: return create_api_response(code="200", message="没有变更内容", data=current) set_sql = ", ".join([f"{k} = %s" for k in updates.keys()]) values = list(updates.values()) + [menu_id] cursor.execute(f"UPDATE sys_menus SET {set_sql} WHERE menu_id = %s", tuple(values)) connection.commit() _invalidate_user_menu_cache() cursor.execute( """ SELECT menu_id, menu_code, menu_name, menu_icon, menu_url, menu_type, parent_id, sort_order, is_active, description, created_at, updated_at FROM sys_menus WHERE menu_id = %s """, (menu_id,), ) updated = cursor.fetchone() return create_api_response(code="200", message="更新菜单成功", data=updated) except Exception as e: return create_api_response(code="500", message=f"更新菜单失败: {str(e)}") @router.delete("/admin/menus/{menu_id}") async def delete_menu(menu_id: int, current_user=Depends(get_current_admin_user)): """ 删除菜单(有子菜单时不允许删除) """ try: with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) cursor.execute("SELECT menu_id FROM sys_menus WHERE menu_id = %s", (menu_id,)) if not cursor.fetchone(): return create_api_response(code="404", message="菜单不存在") cursor.execute("SELECT COUNT(*) AS cnt FROM sys_menus WHERE parent_id = %s", (menu_id,)) child_count = cursor.fetchone()["cnt"] if child_count > 0: return create_api_response(code="400", message="请先删除子菜单") cursor.execute("DELETE FROM sys_role_menu_permissions WHERE menu_id = %s", (menu_id,)) cursor.execute("DELETE FROM sys_menus WHERE menu_id = %s", (menu_id,)) connection.commit() _invalidate_user_menu_cache() return create_api_response(code="200", message="删除菜单成功") except Exception as e: return create_api_response(code="500", message=f"删除菜单失败: {str(e)}") @router.get("/admin/roles") async def get_all_roles(current_user=Depends(get_current_admin_user)): """ 获取所有角色列表及其权限统计 只有管理员才能访问 """ try: with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) # 查询所有角色及其权限数量 query = """ SELECT r.role_id, r.role_name, r.created_at, COUNT(rmp.menu_id) as menu_count FROM sys_roles r LEFT JOIN sys_role_menu_permissions rmp ON r.role_id = rmp.role_id GROUP BY r.role_id ORDER BY r.role_id ASC """ cursor.execute(query) roles = cursor.fetchall() return create_api_response( code="200", message="获取角色列表成功", data={"roles": roles, "total": len(roles)} ) except Exception as e: return create_api_response(code="500", message=f"获取角色列表失败: {str(e)}") @router.post("/admin/roles") async def create_role(request: CreateRoleRequest, current_user=Depends(get_current_admin_user)): """ 创建角色 """ try: role_name = request.role_name.strip() if not role_name: return create_api_response(code="400", message="角色名称不能为空") with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) cursor.execute("SELECT role_id FROM sys_roles WHERE role_name = %s", (role_name,)) if cursor.fetchone(): return create_api_response(code="400", message="角色名称已存在") cursor.execute("INSERT INTO sys_roles (role_name) VALUES (%s)", (role_name,)) role_id = cursor.lastrowid connection.commit() cursor.execute("SELECT role_id, role_name, created_at FROM sys_roles WHERE role_id = %s", (role_id,)) role = cursor.fetchone() return create_api_response(code="200", message="创建角色成功", data=role) except Exception as e: return create_api_response(code="500", message=f"创建角色失败: {str(e)}") @router.put("/admin/roles/{role_id}") async def update_role(role_id: int, request: UpdateRoleRequest, current_user=Depends(get_current_admin_user)): """ 更新角色 """ try: role_name = request.role_name.strip() if not role_name: return create_api_response(code="400", message="角色名称不能为空") with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) cursor.execute("SELECT role_id FROM sys_roles WHERE role_id = %s", (role_id,)) if not cursor.fetchone(): return create_api_response(code="404", message="角色不存在") cursor.execute("SELECT role_id FROM sys_roles WHERE role_name = %s AND role_id != %s", (role_name, role_id)) if cursor.fetchone(): return create_api_response(code="400", message="角色名称已存在") cursor.execute("UPDATE sys_roles SET role_name = %s WHERE role_id = %s", (role_name, role_id)) connection.commit() cursor.execute("SELECT role_id, role_name, created_at FROM sys_roles WHERE role_id = %s", (role_id,)) role = cursor.fetchone() return create_api_response(code="200", message="更新角色成功", data=role) except Exception as e: return create_api_response(code="500", message=f"更新角色失败: {str(e)}") @router.get("/admin/roles/{role_id}/users") async def get_role_users( role_id: int, page: int = Query(1, ge=1), size: int = Query(10, ge=1, le=100), current_user=Depends(get_current_admin_user), ): """ 获取角色下用户列表 """ try: with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) cursor.execute("SELECT role_id, role_name FROM sys_roles WHERE role_id = %s", (role_id,)) role = cursor.fetchone() if not role: return create_api_response(code="404", message="角色不存在") cursor.execute( """ SELECT COUNT(*) AS total FROM sys_users WHERE role_id = %s """, (role_id,), ) total = cursor.fetchone()["total"] offset = (page - 1) * size cursor.execute( """ SELECT user_id, username, caption, email, avatar_url, role_id, created_at FROM sys_users WHERE role_id = %s ORDER BY user_id ASC LIMIT %s OFFSET %s """, (role_id, size, offset), ) users = cursor.fetchall() return create_api_response( code="200", message="获取角色用户成功", data={ "role_id": role_id, "role_name": role["role_name"], "users": users, "total": total, "page": page, "size": size, }, ) except Exception as e: return create_api_response(code="500", message=f"获取角色用户失败: {str(e)}") @router.get("/admin/roles/permissions/all") async def get_all_role_permissions(current_user=Depends(get_current_admin_user)): """ 批量获取所有角色权限(用于减少N次请求) """ try: with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) cursor.execute( """ SELECT rmp.role_id, rmp.menu_id FROM sys_role_menu_permissions rmp JOIN sys_menus m ON m.menu_id = rmp.menu_id WHERE m.is_active = 1 ORDER BY rmp.role_id ASC, rmp.menu_id ASC """ ) rows = cursor.fetchall() result = {} for row in rows: result.setdefault(row["role_id"], []).append(row["menu_id"]) return create_api_response(code="200", message="获取角色权限成功", data={"permissions": result}) except Exception as e: return create_api_response(code="500", message=f"获取角色权限失败: {str(e)}") @router.get("/admin/roles/{role_id}/permissions") async def get_role_permissions(role_id: int, current_user=Depends(get_current_admin_user)): """ 获取指定角色的菜单权限 只有管理员才能访问 """ try: with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) # 检查角色是否存在 cursor.execute("SELECT role_id, role_name FROM sys_roles WHERE role_id = %s", (role_id,)) role = cursor.fetchone() if not role: return create_api_response(code="404", message="角色不存在") # 查询该角色的所有菜单权限 query = """ SELECT rmp.menu_id FROM sys_role_menu_permissions rmp JOIN sys_menus m ON m.menu_id = rmp.menu_id WHERE rmp.role_id = %s AND m.is_active = 1 """ cursor.execute(query, (role_id,)) permissions = cursor.fetchall() menu_ids = [p['menu_id'] for p in permissions] return create_api_response( code="200", message="获取角色权限成功", data=RolePermissionInfo( role_id=role['role_id'], role_name=role['role_name'], menu_ids=menu_ids ) ) except Exception as e: return create_api_response(code="500", message=f"获取角色权限失败: {str(e)}") @router.put("/admin/roles/{role_id}/permissions") async def update_role_permissions( role_id: int, request: UpdateRolePermissionsRequest, current_user=Depends(get_current_admin_user) ): """ 更新指定角色的菜单权限 只有管理员才能访问 """ try: with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) # 检查角色是否存在 cursor.execute("SELECT role_id FROM sys_roles WHERE role_id = %s", (role_id,)) if not cursor.fetchone(): return create_api_response(code="404", message="角色不存在") cursor.execute( """ SELECT menu_id, parent_id FROM sys_menus WHERE is_active = 1 """ ) all_menus = cursor.fetchall() menu_id_set = {menu["menu_id"] for menu in all_menus} # 验证所有menu_id是否有效 invalid_menu_ids = [menu_id for menu_id in request.menu_ids if menu_id not in menu_id_set] if invalid_menu_ids: return create_api_response(code="400", message="包含无效的菜单ID") normalized_menu_ids = _normalize_permission_menu_ids(request.menu_ids, all_menus) # 删除该角色的所有现有权限 cursor.execute("DELETE FROM sys_role_menu_permissions WHERE role_id = %s", (role_id,)) # 插入新的权限 if normalized_menu_ids: insert_values = [(role_id, menu_id) for menu_id in normalized_menu_ids] cursor.executemany( "INSERT INTO sys_role_menu_permissions (role_id, menu_id) VALUES (%s, %s)", insert_values ) connection.commit() _invalidate_user_menu_cache(role_id) return create_api_response( code="200", message="更新角色权限成功", data={"role_id": role_id, "menu_count": len(normalized_menu_ids)} ) except Exception as e: return create_api_response(code="500", message=f"更新角色权限失败: {str(e)}") @router.get("/menus/user") async def get_user_menus(current_user=Depends(get_current_user)): """ 获取当前用户可访问的菜单列表(用于渲染下拉菜单) 所有登录用户都可以访问 """ try: role_id = current_user["role_id"] cached_menus = _get_cached_user_menus(role_id) if cached_menus is not None: return create_api_response( code="200", message="获取用户菜单成功", data={"menus": cached_menus} ) with get_db_connection() as connection: cursor = connection.cursor(dictionary=True) # 根据用户的role_id查询可访问的菜单 query = """ SELECT m.menu_id, m.menu_code, m.menu_name, m.menu_icon, m.menu_url, m.menu_type, m.parent_id, m.sort_order FROM sys_menus m JOIN sys_role_menu_permissions rmp ON m.menu_id = rmp.menu_id WHERE rmp.role_id = %s AND m.is_active = 1 AND (m.is_visible = 1 OR m.is_visible IS NULL OR m.menu_code IN ('dashboard', 'desktop')) ORDER BY COALESCE(m.parent_id, m.menu_id) ASC, CASE WHEN m.parent_id IS NULL THEN 0 ELSE 1 END ASC, m.sort_order ASC, m.menu_id ASC """ cursor.execute(query, (role_id,)) menus = cursor.fetchall() # 仅在缺失父菜单时补查,减少不必要的SQL current_menu_ids = {menu["menu_id"] for menu in menus} missing_parent_ids = { menu["parent_id"] for menu in menus if menu.get("parent_id") is not None and menu["parent_id"] not in current_menu_ids } if missing_parent_ids: format_strings = ",".join(["%s"] * len(missing_parent_ids)) cursor.execute( f""" SELECT menu_id, menu_code, menu_name, menu_icon, menu_url, menu_type, parent_id, sort_order FROM sys_menus WHERE is_active = 1 AND menu_id IN ({format_strings}) """, tuple(missing_parent_ids), ) parent_rows = cursor.fetchall() menus.extend(parent_rows) current_menu_ids.update(row["menu_id"] for row in parent_rows) menus = sorted( {menu["menu_id"]: menu for menu in menus}.values(), key=lambda m: ( m["parent_id"] if m["parent_id"] is not None else m["menu_id"], 0 if m["parent_id"] is None else 1, m["sort_order"], m["menu_id"], ), ) _set_cached_user_menus(role_id, menus) return create_api_response( code="200", message="获取用户菜单成功", data={"menus": menus} ) except Exception as e: return create_api_response(code="500", message=f"获取用户菜单失败: {str(e)}")