imetting/backend/app/services/admin_service.py

551 lines
21 KiB
Python

import time
from typing import Any
from app.core.database import get_db_connection
from app.core.response import create_api_response
from app.models.models import MenuInfo, MenuListResponse, RolePermissionInfo
_USER_MENU_CACHE_TTL_SECONDS = 120
_USER_MENU_CACHE_VERSION = "menu-rules-v4"
_user_menu_cache_by_role: dict[int, dict[str, Any]] = {}
def _get_cached_user_menus(role_id: int):
cached = _user_menu_cache_by_role.get(role_id)
if not cached:
return None
if cached.get("version") != _USER_MENU_CACHE_VERSION:
_user_menu_cache_by_role.pop(role_id, None)
return None
if time.time() > cached["expires_at"]:
_user_menu_cache_by_role.pop(role_id, None)
return None
return cached["menus"]
def _set_cached_user_menus(role_id: int, menus):
_user_menu_cache_by_role[role_id] = {
"version": _USER_MENU_CACHE_VERSION,
"menus": menus,
"expires_at": time.time() + _USER_MENU_CACHE_TTL_SECONDS,
}
def _invalidate_user_menu_cache(role_id: int | None = None):
if role_id is None:
_user_menu_cache_by_role.clear()
return
_user_menu_cache_by_role.pop(role_id, None)
def _build_menu_index(menus):
menu_by_id = {}
children_by_parent = {}
for menu in menus:
menu_id = menu["menu_id"]
menu_by_id[menu_id] = menu
parent_id = menu.get("parent_id")
if parent_id is not None:
children_by_parent.setdefault(parent_id, []).append(menu_id)
return menu_by_id, children_by_parent
def _get_descendants(menu_id, children_by_parent):
result = set()
stack = [menu_id]
while stack:
current = stack.pop()
for child_id in children_by_parent.get(current, []):
if child_id in result:
continue
result.add(child_id)
stack.append(child_id)
return result
def _normalize_permission_menu_ids(raw_menu_ids, all_menus):
menu_by_id, children_by_parent = _build_menu_index(all_menus)
selected = {menu_id for menu_id in raw_menu_ids if menu_id in menu_by_id}
expanded = set(selected)
for menu_id in list(expanded):
expanded.update(_get_descendants(menu_id, children_by_parent))
for menu_id in list(expanded):
cursor = menu_by_id[menu_id].get("parent_id")
while cursor is not None and cursor in menu_by_id:
if cursor in expanded:
break
expanded.add(cursor)
cursor = menu_by_id[cursor].get("parent_id")
return sorted(expanded)
def get_all_menus():
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
query = """
SELECT menu_id, menu_code, menu_name, menu_icon, menu_url, menu_type,
parent_id, sort_order, is_active, description, created_at, updated_at
FROM sys_menus
ORDER BY
COALESCE(parent_id, menu_id) ASC,
CASE WHEN parent_id IS NULL THEN 0 ELSE 1 END ASC,
sort_order ASC,
menu_id ASC
"""
cursor.execute(query)
menus = cursor.fetchall()
menu_list = [MenuInfo(**menu) for menu in menus]
return create_api_response(
code="200",
message="获取菜单列表成功",
data=MenuListResponse(menus=menu_list, total=len(menu_list)),
)
except Exception as e:
return create_api_response(code="500", message=f"获取菜单列表失败: {str(e)}")
def create_menu(request):
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT menu_id FROM sys_menus WHERE menu_code = %s", (request.menu_code,))
if cursor.fetchone():
return create_api_response(code="400", message="菜单编码已存在")
if request.parent_id is not None:
cursor.execute("SELECT menu_id FROM sys_menus WHERE menu_id = %s", (request.parent_id,))
if not cursor.fetchone():
return create_api_response(code="400", message="父菜单不存在")
cursor.execute(
"""
INSERT INTO sys_menus (menu_code, menu_name, menu_icon, menu_url, menu_type, parent_id, sort_order, is_active, description)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
""",
(
request.menu_code,
request.menu_name,
request.menu_icon,
request.menu_url,
request.menu_type,
request.parent_id,
request.sort_order,
1 if request.is_active else 0,
request.description,
),
)
menu_id = cursor.lastrowid
connection.commit()
_invalidate_user_menu_cache()
cursor.execute(
"""
SELECT menu_id, menu_code, menu_name, menu_icon, menu_url, menu_type,
parent_id, sort_order, is_active, description, created_at, updated_at
FROM sys_menus
WHERE menu_id = %s
""",
(menu_id,),
)
created = cursor.fetchone()
return create_api_response(code="200", message="创建菜单成功", data=created)
except Exception as e:
return create_api_response(code="500", message=f"创建菜单失败: {str(e)}")
def update_menu(menu_id: int, request):
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT * FROM sys_menus WHERE menu_id = %s", (menu_id,))
current = cursor.fetchone()
if not current:
return create_api_response(code="404", message="菜单不存在")
updates = {}
for field in [
"menu_code",
"menu_name",
"menu_icon",
"menu_url",
"menu_type",
"sort_order",
"description",
]:
value = getattr(request, field)
if value is not None:
updates[field] = value
if request.is_active is not None:
updates["is_active"] = 1 if request.is_active else 0
fields_set = getattr(request, "model_fields_set", getattr(request, "__fields_set__", set()))
if request.parent_id == menu_id:
return create_api_response(code="400", message="父菜单不能为自身")
if request.parent_id is not None:
cursor.execute("SELECT menu_id FROM sys_menus WHERE menu_id = %s", (request.parent_id,))
if not cursor.fetchone():
return create_api_response(code="400", message="父菜单不存在")
cursor.execute("SELECT menu_id, parent_id FROM sys_menus")
all_menus = cursor.fetchall()
_, children_by_parent = _build_menu_index(all_menus)
descendants = _get_descendants(menu_id, children_by_parent)
if request.parent_id in descendants:
return create_api_response(code="400", message="父菜单不能设置为当前菜单的子孙菜单")
if request.parent_id is not None or (request.parent_id is None and "parent_id" in fields_set):
updates["parent_id"] = request.parent_id
if "menu_code" in updates:
cursor.execute(
"SELECT menu_id FROM sys_menus WHERE menu_code = %s AND menu_id != %s",
(updates["menu_code"], menu_id),
)
if cursor.fetchone():
return create_api_response(code="400", message="菜单编码已存在")
if not updates:
return create_api_response(code="200", message="没有变更内容", data=current)
set_sql = ", ".join([f"{key} = %s" for key in updates.keys()])
values = list(updates.values()) + [menu_id]
cursor.execute(f"UPDATE sys_menus SET {set_sql} WHERE menu_id = %s", tuple(values))
connection.commit()
_invalidate_user_menu_cache()
cursor.execute(
"""
SELECT menu_id, menu_code, menu_name, menu_icon, menu_url, menu_type,
parent_id, sort_order, is_active, description, created_at, updated_at
FROM sys_menus
WHERE menu_id = %s
""",
(menu_id,),
)
updated = cursor.fetchone()
return create_api_response(code="200", message="更新菜单成功", data=updated)
except Exception as e:
return create_api_response(code="500", message=f"更新菜单失败: {str(e)}")
def delete_menu(menu_id: int):
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT menu_id FROM sys_menus WHERE menu_id = %s", (menu_id,))
if not cursor.fetchone():
return create_api_response(code="404", message="菜单不存在")
cursor.execute("SELECT COUNT(*) AS cnt FROM sys_menus WHERE parent_id = %s", (menu_id,))
child_count = cursor.fetchone()["cnt"]
if child_count > 0:
return create_api_response(code="400", message="请先删除子菜单")
cursor.execute("DELETE FROM sys_role_menu_permissions WHERE menu_id = %s", (menu_id,))
cursor.execute("DELETE FROM sys_menus WHERE menu_id = %s", (menu_id,))
connection.commit()
_invalidate_user_menu_cache()
return create_api_response(code="200", message="删除菜单成功")
except Exception as e:
return create_api_response(code="500", message=f"删除菜单失败: {str(e)}")
def get_all_roles():
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
query = """
SELECT r.role_id, r.role_name, r.created_at,
COUNT(rmp.menu_id) as menu_count
FROM sys_roles r
LEFT JOIN sys_role_menu_permissions rmp ON r.role_id = rmp.role_id
GROUP BY r.role_id
ORDER BY r.role_id ASC
"""
cursor.execute(query)
roles = cursor.fetchall()
return create_api_response(
code="200",
message="获取角色列表成功",
data={"roles": roles, "total": len(roles)},
)
except Exception as e:
return create_api_response(code="500", message=f"获取角色列表失败: {str(e)}")
def create_role(request):
try:
role_name = request.role_name.strip()
if not role_name:
return create_api_response(code="400", message="角色名称不能为空")
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT role_id FROM sys_roles WHERE role_name = %s", (role_name,))
if cursor.fetchone():
return create_api_response(code="400", message="角色名称已存在")
cursor.execute("INSERT INTO sys_roles (role_name) VALUES (%s)", (role_name,))
role_id = cursor.lastrowid
connection.commit()
cursor.execute("SELECT role_id, role_name, created_at FROM sys_roles WHERE role_id = %s", (role_id,))
role = cursor.fetchone()
return create_api_response(code="200", message="创建角色成功", data=role)
except Exception as e:
return create_api_response(code="500", message=f"创建角色失败: {str(e)}")
def update_role(role_id: int, request):
try:
role_name = request.role_name.strip()
if not role_name:
return create_api_response(code="400", message="角色名称不能为空")
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT role_id FROM sys_roles WHERE role_id = %s", (role_id,))
if not cursor.fetchone():
return create_api_response(code="404", message="角色不存在")
cursor.execute("SELECT role_id FROM sys_roles WHERE role_name = %s AND role_id != %s", (role_name, role_id))
if cursor.fetchone():
return create_api_response(code="400", message="角色名称已存在")
cursor.execute("UPDATE sys_roles SET role_name = %s WHERE role_id = %s", (role_name, role_id))
connection.commit()
cursor.execute("SELECT role_id, role_name, created_at FROM sys_roles WHERE role_id = %s", (role_id,))
role = cursor.fetchone()
return create_api_response(code="200", message="更新角色成功", data=role)
except Exception as e:
return create_api_response(code="500", message=f"更新角色失败: {str(e)}")
def get_role_users(role_id: int, page: int, size: int):
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT role_id, role_name FROM sys_roles WHERE role_id = %s", (role_id,))
role = cursor.fetchone()
if not role:
return create_api_response(code="404", message="角色不存在")
cursor.execute(
"""
SELECT COUNT(*) AS total
FROM sys_users
WHERE role_id = %s
""",
(role_id,),
)
total = cursor.fetchone()["total"]
offset = (page - 1) * size
cursor.execute(
"""
SELECT user_id, username, caption, email, avatar_url, role_id, created_at
FROM sys_users
WHERE role_id = %s
ORDER BY user_id ASC
LIMIT %s OFFSET %s
""",
(role_id, size, offset),
)
users = cursor.fetchall()
return create_api_response(
code="200",
message="获取角色用户成功",
data={
"role_id": role_id,
"role_name": role["role_name"],
"users": users,
"total": total,
"page": page,
"size": size,
},
)
except Exception as e:
return create_api_response(code="500", message=f"获取角色用户失败: {str(e)}")
def get_all_role_permissions():
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute(
"""
SELECT rmp.role_id, rmp.menu_id
FROM sys_role_menu_permissions rmp
JOIN sys_menus m ON m.menu_id = rmp.menu_id
WHERE m.is_active = 1
ORDER BY rmp.role_id ASC, rmp.menu_id ASC
"""
)
rows = cursor.fetchall()
result = {}
for row in rows:
result.setdefault(row["role_id"], []).append(row["menu_id"])
return create_api_response(code="200", message="获取角色权限成功", data={"permissions": result})
except Exception as e:
return create_api_response(code="500", message=f"获取角色权限失败: {str(e)}")
def get_role_permissions(role_id: int):
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT role_id, role_name FROM sys_roles WHERE role_id = %s", (role_id,))
role = cursor.fetchone()
if not role:
return create_api_response(code="404", message="角色不存在")
query = """
SELECT rmp.menu_id
FROM sys_role_menu_permissions rmp
JOIN sys_menus m ON m.menu_id = rmp.menu_id
WHERE rmp.role_id = %s
AND m.is_active = 1
"""
cursor.execute(query, (role_id,))
permissions = cursor.fetchall()
menu_ids = [permission["menu_id"] for permission in permissions]
return create_api_response(
code="200",
message="获取角色权限成功",
data=RolePermissionInfo(
role_id=role["role_id"],
role_name=role["role_name"],
menu_ids=menu_ids,
),
)
except Exception as e:
return create_api_response(code="500", message=f"获取角色权限失败: {str(e)}")
def update_role_permissions(role_id: int, request):
try:
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
cursor.execute("SELECT role_id FROM sys_roles WHERE role_id = %s", (role_id,))
if not cursor.fetchone():
return create_api_response(code="404", message="角色不存在")
cursor.execute(
"""
SELECT menu_id, parent_id
FROM sys_menus
WHERE is_active = 1
"""
)
all_menus = cursor.fetchall()
menu_id_set = {menu["menu_id"] for menu in all_menus}
invalid_menu_ids = [menu_id for menu_id in request.menu_ids if menu_id not in menu_id_set]
if invalid_menu_ids:
return create_api_response(code="400", message="包含无效的菜单ID")
normalized_menu_ids = _normalize_permission_menu_ids(request.menu_ids, all_menus)
cursor.execute("DELETE FROM sys_role_menu_permissions WHERE role_id = %s", (role_id,))
if normalized_menu_ids:
insert_values = [(role_id, menu_id) for menu_id in normalized_menu_ids]
cursor.executemany(
"INSERT INTO sys_role_menu_permissions (role_id, menu_id) VALUES (%s, %s)",
insert_values,
)
connection.commit()
_invalidate_user_menu_cache(role_id)
return create_api_response(
code="200",
message="更新角色权限成功",
data={"role_id": role_id, "menu_count": len(normalized_menu_ids)},
)
except Exception as e:
return create_api_response(code="500", message=f"更新角色权限失败: {str(e)}")
def get_user_menus(current_user: dict):
try:
role_id = current_user["role_id"]
cached_menus = _get_cached_user_menus(role_id)
if cached_menus is not None:
return create_api_response(
code="200",
message="获取用户菜单成功",
data={"menus": cached_menus},
)
with get_db_connection() as connection:
cursor = connection.cursor(dictionary=True)
query = """
SELECT m.menu_id, m.menu_code, m.menu_name, m.menu_icon,
m.menu_url, m.menu_type, m.parent_id, m.sort_order
FROM sys_menus m
JOIN sys_role_menu_permissions rmp ON m.menu_id = rmp.menu_id
WHERE rmp.role_id = %s
AND m.is_active = 1
AND (m.is_visible = 1 OR m.is_visible IS NULL OR m.menu_code IN ('dashboard', 'desktop'))
ORDER BY
COALESCE(m.parent_id, m.menu_id) ASC,
CASE WHEN m.parent_id IS NULL THEN 0 ELSE 1 END ASC,
m.sort_order ASC,
m.menu_id ASC
"""
cursor.execute(query, (role_id,))
menus = cursor.fetchall()
current_menu_ids = {menu["menu_id"] for menu in menus}
missing_parent_ids = {
menu["parent_id"]
for menu in menus
if menu.get("parent_id") is not None and menu["parent_id"] not in current_menu_ids
}
if missing_parent_ids:
format_strings = ",".join(["%s"] * len(missing_parent_ids))
cursor.execute(
f"""
SELECT menu_id, menu_code, menu_name, menu_icon, menu_url, menu_type, parent_id, sort_order
FROM sys_menus
WHERE is_active = 1 AND menu_id IN ({format_strings})
""",
tuple(missing_parent_ids),
)
parent_rows = cursor.fetchall()
menus.extend(parent_rows)
menus = sorted(
{menu["menu_id"]: menu for menu in menus}.values(),
key=lambda menu: (
menu["parent_id"] if menu["parent_id"] is not None else menu["menu_id"],
0 if menu["parent_id"] is None else 1,
menu["sort_order"],
menu["menu_id"],
),
)
_set_cached_user_menus(role_id, menus)
return create_api_response(
code="200",
message="获取用户菜单成功",
data={"menus": menus},
)
except Exception as e:
return create_api_response(code="500", message=f"获取用户菜单失败: {str(e)}")