# Source: imetting/backend/app/api/endpoints/dict_data.py (Python, 455 lines, 14 KiB)

from fastapi import APIRouter, HTTPException, Depends
from app.core.database import get_db_connection
from app.core.auth import get_current_user, get_current_admin_user
from app.core.response import create_api_response
from pydantic import BaseModel
from typing import Optional, List
import json
# Router on which the dictionary (code table) endpoints of this module are registered.
router = APIRouter()
class DictDataItem(BaseModel):
    """A single entry of the dictionary (code table) data."""

    # Identity of the entry.
    id: int
    dict_type: str
    dict_code: str

    # Position in the hierarchy; the endpoints treat 'ROOT' as "no parent".
    parent_code: str
    tree_path: Optional[str] = None

    # Display labels (Chinese is mandatory, English optional) and ordering.
    label_cn: str
    label_en: Optional[str] = None
    sort_order: int

    # Free-form extra attributes, decoded from the stored JSON text.
    extension_attr: Optional[dict] = None

    # Integer flags; the public list endpoints only return rows with status = 1.
    is_default: int
    status: int

    create_time: str
class CreateDictDataRequest(BaseModel):
    """Request body for creating a dictionary (code table) entry."""

    # Type and code of the new entry; the type defaults to the client platform dictionary.
    dict_type: str = "client_platform"
    dict_code: str

    # New entries are top-level ('ROOT') unless a parent code is supplied.
    parent_code: str = "ROOT"

    # Display labels and ordering.
    label_cn: str
    label_en: Optional[str] = None
    sort_order: int = 0

    # Optional extra attributes; stored as JSON text by the create endpoint.
    extension_attr: Optional[dict] = None

    # Integer flags with their defaults.
    is_default: int = 0
    status: int = 1
class UpdateDictDataRequest(BaseModel):
    """Request body for a partial update of a dictionary (code table) entry.

    Every field is optional; the update endpoint only writes the fields whose
    value is not None.
    """

    # Hierarchy.
    parent_code: Optional[str] = None

    # Display labels and ordering.
    label_cn: Optional[str] = None
    label_en: Optional[str] = None
    sort_order: Optional[int] = None

    # Extra attributes; stored as JSON text by the update endpoint.
    extension_attr: Optional[dict] = None

    # Integer flags.
    is_default: Optional[int] = None
    status: Optional[int] = None
@router.get("/dict/types", response_model=dict)
async def get_dict_types():
    """
    List all dictionary types (public endpoint, no authentication dependency).

    Only types that have at least one row with ``status = 1`` are reported,
    ordered by ``dict_type``.

    Returns:
        An API response with ``data = {"types": [<dict_type>, ...]}`` on
        success, or a "500" response carrying the error text when the query
        fails.
    """
    query = """
        SELECT DISTINCT dict_type
        FROM dict_data
        WHERE status = 1
        ORDER BY dict_type
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(query)
                rows = cursor.fetchall()
            finally:
                # Release the cursor even when the query raises.
                cursor.close()

        return create_api_response(
            code="200",
            message="获取成功",
            data={"types": [row['dict_type'] for row in rows]}
        )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"获取字典类型失败: {str(e)}"
        )
@router.get("/dict/{dict_type}", response_model=dict)
async def get_dict_data_by_type(dict_type: str, parent_code: Optional[str] = None):
    """
    Fetch all entries of one dictionary type (public endpoint), with tree support.

    Only rows with ``status = 1`` are returned. Without ``parent_code`` the
    response contains both the flat list and a tree assembled from the
    ``parent_code`` links. With ``parent_code`` the rows are filtered to that
    parent and only the flat list is returned (``tree`` is empty).

    Args:
        dict_type: Dictionary type to read, e.g. 'client_platform'.
        parent_code: Optional parent code; when given, only rows whose
            ``parent_code`` equals it are returned.

    Returns:
        An API response with ``data = {"items": [...], "tree": [...]}`` on
        success, or a "500" response carrying the error text on failure.
    """
    query = """
        SELECT id, dict_type, dict_code, parent_code, tree_path,
               label_cn, label_en, sort_order, extension_attr,
               is_default, status, create_time
        FROM dict_data
        WHERE dict_type = %s AND status = 1
    """
    params = [dict_type]
    if parent_code:
        query += " AND parent_code = %s"
        params.append(parent_code)
    query += " ORDER BY parent_code, sort_order, dict_code"

    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(query, params)
                items = cursor.fetchall()
            finally:
                # Release the cursor even when the query raises.
                cursor.close()

        # Decode the JSON column; undecodable values fall back to an empty dict.
        for item in items:
            raw_attr = item.get('extension_attr')
            if raw_attr:
                try:
                    item['extension_attr'] = json.loads(raw_attr)
                except (TypeError, ValueError):
                    # json.JSONDecodeError is a ValueError; TypeError covers
                    # values that are not str/bytes.
                    item['extension_attr'] = {}

        # A parent filter was requested: return the flat list only.
        if parent_code:
            return create_api_response(
                code="200",
                message="获取成功",
                data={
                    "items": items,
                    "tree": []
                }
            )

        # First pass: one node per entry, keyed by its code.
        nodes_map = {
            item['dict_code']: {**item, 'children': []}
            for item in items
        }

        # Second pass: attach every node to its parent. Nodes whose parent is
        # neither 'ROOT' nor among the fetched rows stay out of the tree but
        # are still present in the flat list.
        tree_data = []
        for item in items:
            node = nodes_map[item['dict_code']]
            parent_code_val = item['parent_code']
            if parent_code_val == 'ROOT':
                tree_data.append(node)
            elif parent_code_val in nodes_map:
                nodes_map[parent_code_val]['children'].append(node)

        return create_api_response(
            code="200",
            message="获取成功",
            data={
                "items": items,    # flat data
                "tree": tree_data  # tree data
            }
        )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"获取码表数据失败: {str(e)}"
        )
@router.get("/dict/{dict_type}/{dict_code}", response_model=dict)
async def get_dict_data_by_code(dict_type: str, dict_code: str):
    """
    Fetch one dictionary entry by its type and code (public endpoint).

    Unlike the list endpoints, the query does not filter on ``status``.

    Args:
        dict_type: Dictionary type.
        dict_code: Dictionary code within that type.

    Returns:
        An API response whose ``data`` is the matching row, a "404" response
        when no row matches, or a "500" response carrying the error text on
        failure.
    """
    query = """
        SELECT id, dict_type, dict_code, parent_code, tree_path,
               label_cn, label_en, sort_order, extension_attr,
               is_default, status, create_time, update_time
        FROM dict_data
        WHERE dict_type = %s AND dict_code = %s
        LIMIT 1
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(query, (dict_type, dict_code))
                item = cursor.fetchone()
            finally:
                # Release the cursor even when the query raises.
                cursor.close()

        if not item:
            return create_api_response(
                code="404",
                message=f"未找到编码 {dict_code} 的数据"
            )

        # Decode the JSON column; an undecodable value falls back to an empty dict.
        raw_attr = item.get('extension_attr')
        if raw_attr:
            try:
                item['extension_attr'] = json.loads(raw_attr)
            except (TypeError, ValueError):
                # json.JSONDecodeError is a ValueError; TypeError covers
                # values that are not str/bytes.
                item['extension_attr'] = {}

        return create_api_response(
            code="200",
            message="获取成功",
            data=item
        )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"获取码表数据失败: {str(e)}"
        )
@router.post("/dict", response_model=dict)
async def create_dict_data(
    request: CreateDictDataRequest,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Create a dictionary (code table) entry (administrators only).

    Args:
        request: Fields of the new entry.
        current_user: Resolved by the ``get_current_admin_user`` dependency;
            not read by the handler itself.

    Returns:
        An API response with ``data = {"id": <new row id>}`` on success; a
        "400" response when ``extension_attr`` cannot be serialized to JSON or
        when an entry with the same ``dict_type`` and ``dict_code`` already
        exists; a "500" response carrying the error text on any other failure.
    """
    try:
        # Serialize the extension attributes once: this both validates them
        # and produces the text stored in the table. A missing or empty value
        # is stored as NULL.
        extension_json = None
        if request.extension_attr:
            try:
                extension_json = json.dumps(request.extension_attr, ensure_ascii=False)
            except (TypeError, ValueError) as e:
                return create_api_response(
                    code="400",
                    message=f"扩展属性格式错误必须是有效的JSON对象: {str(e)}"
                )

        insert_query = """
            INSERT INTO dict_data (
                dict_type, dict_code, parent_code, label_cn, label_en,
                sort_order, extension_attr, is_default, status
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """

        with get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                # Reject duplicates: codes are unique within a dictionary type.
                cursor.execute(
                    "SELECT id FROM dict_data WHERE dict_type = %s AND dict_code = %s",
                    (request.dict_type, request.dict_code)
                )
                if cursor.fetchone():
                    return create_api_response(
                        code="400",
                        message=f"编码 {request.dict_code} 已存在"
                    )

                cursor.execute(insert_query, (
                    request.dict_type,
                    request.dict_code,
                    request.parent_code,
                    request.label_cn,
                    request.label_en,
                    request.sort_order,
                    extension_json,
                    request.is_default,
                    request.status
                ))
                new_id = cursor.lastrowid
                conn.commit()
            finally:
                # Release the cursor on every path, including errors.
                cursor.close()

        return create_api_response(
            code="200",
            message="创建成功",
            data={"id": new_id}
        )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"创建码表数据失败: {str(e)}"
        )
@router.put("/dict/{id}", response_model=dict)
async def update_dict_data(
    id: int,
    request: UpdateDictDataRequest,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Partially update a dictionary (code table) entry (administrators only).

    Only the request fields that are not None are written.

    Args:
        id: Primary key of the row to update.
        request: Fields to change.
        current_user: Resolved by the ``get_current_admin_user`` dependency;
            not read by the handler itself.

    Returns:
        A "200" response on success; "400" when ``extension_attr`` cannot be
        serialized to JSON or when no field was supplied; "404" when the row
        does not exist; "500" carrying the error text on any other failure.
    """
    try:
        # Serialize the extension attributes once: this both validates them
        # and produces the text stored in the table.
        extension_json = None
        if request.extension_attr is not None:
            try:
                extension_json = json.dumps(request.extension_attr, ensure_ascii=False)
            except (TypeError, ValueError) as e:
                return create_api_response(
                    code="400",
                    message=f"扩展属性格式错误必须是有效的JSON对象: {str(e)}"
                )

        # Column names are hard-coded here, so interpolating them into the
        # SQL text below is safe; the values always go through placeholders.
        requested = (
            ("parent_code", request.parent_code),
            ("label_cn", request.label_cn),
            ("label_en", request.label_en),
            ("sort_order", request.sort_order),
            ("extension_attr", extension_json),
            ("is_default", request.is_default),
            ("status", request.status),
        )
        assignments = [(column, value) for column, value in requested if value is not None]

        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                # Existence check; only the presence of a row matters.
                cursor.execute("SELECT id FROM dict_data WHERE id = %s", (id,))
                if not cursor.fetchone():
                    return create_api_response(
                        code="404",
                        message="码表数据不存在"
                    )

                if not assignments:
                    return create_api_response(
                        code="400",
                        message="没有要更新的字段"
                    )

                set_clause = ', '.join(f"{column} = %s" for column, _ in assignments)
                update_query = f"""
                    UPDATE dict_data
                    SET {set_clause}
                    WHERE id = %s
                """
                params = [value for _, value in assignments]
                params.append(id)
                cursor.execute(update_query, params)
                conn.commit()
            finally:
                # Release the cursor on every path, including errors.
                cursor.close()

        return create_api_response(
            code="200",
            message="更新成功"
        )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"更新码表数据失败: {str(e)}"
        )
@router.delete("/dict/{id}", response_model=dict)
async def delete_dict_data(
    id: int,
    current_user: dict = Depends(get_current_admin_user)
):
    """
    Delete a dictionary (code table) entry (administrators only).

    Deletion is refused when the entry still has child nodes or when its code
    is referenced by ``client_downloads.platform_code``.

    Args:
        id: Primary key of the row to delete.
        current_user: Resolved by the ``get_current_admin_user`` dependency;
            not read by the handler itself.

    Returns:
        A "200" response on success; "404" when the row does not exist; "400"
        when children or references block the deletion; "500" carrying the
        error text on any other failure.
    """
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor(dictionary=True)
            try:
                # Load the row; its type and code drive the checks below.
                cursor.execute(
                    "SELECT dict_type, dict_code FROM dict_data WHERE id = %s",
                    (id,)
                )
                existing = cursor.fetchone()
                if not existing:
                    return create_api_response(
                        code="404",
                        message="码表数据不存在"
                    )

                # Child check, scoped to the same dictionary type: codes are
                # only unique per (dict_type, dict_code), so an entry of
                # another type with a matching parent_code is not a child.
                cursor.execute(
                    "SELECT COUNT(*) as count FROM dict_data "
                    "WHERE dict_type = %s AND parent_code = %s",
                    (existing['dict_type'], existing['dict_code'])
                )
                if cursor.fetchone()['count'] > 0:
                    return create_api_response(
                        code="400",
                        message="该节点存在子节点,无法删除"
                    )

                # Reference check against client_downloads.
                # NOTE(review): this runs for every dictionary type, not only
                # platform entries - confirm whether that is intended.
                cursor.execute(
                    "SELECT COUNT(*) as count FROM client_downloads WHERE platform_code = %s",
                    (existing['dict_code'],)
                )
                if cursor.fetchone()['count'] > 0:
                    return create_api_response(
                        code="400",
                        message="该平台编码已被客户端下载记录引用,无法删除"
                    )

                cursor.execute("DELETE FROM dict_data WHERE id = %s", (id,))
                conn.commit()
            finally:
                # Release the cursor on every path, including errors.
                cursor.close()

        return create_api_response(
            code="200",
            message="删除成功"
        )
    except Exception as e:
        return create_api_response(
            code="500",
            message=f"删除码表数据失败: {str(e)}"
        )