"""Admin API for hot-word groups and items, including sync to DashScope."""

import logging
from contextlib import closing
from datetime import datetime
from http import HTTPStatus
from typing import Any, Iterable, List, Optional, Tuple

import dashscope
from dashscope.audio.asr import VocabularyService
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel

from app.core.database import get_db_connection
from app.core.auth import get_current_admin_user
from app.core.response import create_api_response
from app.core.config import QWEN_API_KEY
from app.services.system_config_service import SystemConfigService

logger = logging.getLogger(__name__)

router = APIRouter()

# MySQL server error number for a unique-key violation (ER_DUP_ENTRY).
_MYSQL_ER_DUP_ENTRY = 1062


# ── Request Models ──────────────────────────────────────────

class CreateGroupRequest(BaseModel):
    """Payload for creating a hot-word group."""

    name: str
    description: Optional[str] = None
    status: int = 1


class UpdateGroupRequest(BaseModel):
    """Partial update of a hot-word group; fields left as None are not changed."""

    name: Optional[str] = None
    description: Optional[str] = None
    status: Optional[int] = None


class CreateItemRequest(BaseModel):
    """Payload for creating a hot-word item inside a group."""

    text: str
    weight: int = 4
    lang: str = "zh"
    status: int = 1


class UpdateItemRequest(BaseModel):
    """Partial update of a hot-word item; fields left as None are not changed."""

    text: Optional[str] = None
    weight: Optional[int] = None
    lang: Optional[str] = None
    status: Optional[int] = None


# ── Internal helpers ────────────────────────────────────────

def _collect_updates(candidates: Iterable[Tuple[str, Any]]) -> Tuple[List[str], List[Any]]:
    """Build parameterized SET fragments for every candidate whose value is not None.

    Args:
        candidates: ``(column_name, value)`` pairs. Column names must be
            trusted constants because they are interpolated into the SQL text;
            values are always passed as bound parameters.

    Returns:
        A ``(fields, params)`` pair: ``fields`` holds ``"<column> = %s"``
        fragments and ``params`` the matching values, in the same order.
    """
    fields: List[str] = []
    params: List[Any] = []
    for column, value in candidates:
        if value is not None:
            fields.append(f"{column} = %s")
            params.append(value)
    return fields, params


def _is_duplicate_entry(error: Exception) -> bool:
    """Return True when ``error`` looks like a unique-key violation.

    Checks the driver-provided ``errno`` when present and falls back to the
    message text, so the original substring-based detection keeps working.
    """
    if getattr(error, "errno", None) == _MYSQL_ER_DUP_ENTRY:
        return True
    return "Duplicate entry" in str(error)


# ── Hot-Word Group CRUD ─────────────────────────────────────

@router.get("/admin/hot-word-groups", response_model=dict)
async def list_groups(current_user: dict = Depends(get_current_admin_user)):
    """List all groups, each with its total and enabled item counts."""
    try:
        # closing() guarantees the cursor is released even if the query raises.
        with get_db_connection() as conn, closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute("""
                SELECT g.*,
                       COUNT(i.id) AS item_count,
                       SUM(CASE WHEN i.status = 1 THEN 1 ELSE 0 END) AS enabled_item_count
                FROM hot_word_group g
                LEFT JOIN hot_word_item i ON i.group_id = g.id
                GROUP BY g.id
                ORDER BY g.update_time DESC
            """)
            groups = cursor.fetchall()
        return create_api_response(code="200", message="获取成功", data=groups)
    except Exception as e:
        logger.exception("Failed to list hot-word groups")
        return create_api_response(code="500", message=f"获取失败: {str(e)}")


@router.post("/admin/hot-word-groups", response_model=dict)
async def create_group(request: CreateGroupRequest, current_user: dict = Depends(get_current_admin_user)):
    """Create a hot-word group and return its new id."""
    try:
        with get_db_connection() as conn, closing(conn.cursor()) as cursor:
            cursor.execute(
                "INSERT INTO hot_word_group (name, description, status) VALUES (%s, %s, %s)",
                (request.name, request.description, request.status),
            )
            new_id = cursor.lastrowid
            conn.commit()
        return create_api_response(code="200", message="创建成功", data={"id": new_id})
    except Exception as e:
        logger.exception("Failed to create hot-word group")
        return create_api_response(code="500", message=f"创建失败: {str(e)}")


@router.put("/admin/hot-word-groups/{id}", response_model=dict)
async def update_group(id: int, request: UpdateGroupRequest, current_user: dict = Depends(get_current_admin_user)):
    """Update the non-null fields of a hot-word group; 400 if nothing was provided."""
    # Validate before touching the database so an empty request never
    # acquires a connection or leaves a cursor open.
    fields, params = _collect_updates((
        ("name", request.name),
        ("description", request.description),
        ("status", request.status),
    ))
    if not fields:
        return create_api_response(code="400", message="无更新内容")
    params.append(id)

    try:
        with get_db_connection() as conn, closing(conn.cursor()) as cursor:
            cursor.execute(
                f"UPDATE hot_word_group SET {', '.join(fields)} WHERE id = %s",
                tuple(params),
            )
            conn.commit()
        return create_api_response(code="200", message="更新成功")
    except Exception as e:
        logger.exception("Failed to update hot-word group %s", id)
        return create_api_response(code="500", message=f"更新失败: {str(e)}")


@router.delete("/admin/hot-word-groups/{id}", response_model=dict)
async def delete_group(id: int, current_user: dict = Depends(get_current_admin_user)):
    """Delete a group and its items, and detach it from audio_model_config rows."""
    try:
        with get_db_connection() as conn, closing(conn.cursor()) as cursor:
            # Clear references to this group from audio model configs,
            # including the vocabulary_id stored in extra_config.
            cursor.execute(
                """
                UPDATE audio_model_config
                SET hot_word_group_id = NULL,
                    extra_config = JSON_REMOVE(COALESCE(extra_config, JSON_OBJECT()), '$.vocabulary_id')
                WHERE hot_word_group_id = %s
                """,
                (id,),
            )
            cursor.execute("DELETE FROM hot_word_item WHERE group_id = %s", (id,))
            cursor.execute("DELETE FROM hot_word_group WHERE id = %s", (id,))
            conn.commit()
        return create_api_response(code="200", message="删除成功")
    except Exception as e:
        logger.exception("Failed to delete hot-word group %s", id)
        return create_api_response(code="500", message=f"删除失败: {str(e)}")


@router.post("/admin/hot-word-groups/{id}/sync", response_model=dict)
async def sync_group(id: int, current_user: dict = Depends(get_current_admin_user)):
    """Sync the enabled hot words of one group to Alibaba Cloud DashScope.

    Updates the group's existing vocabulary when it has one (falling back to
    creating a new vocabulary if the update fails), then writes the resulting
    vocabulary_id back to the group and to every audio_model_config row that
    references the group.
    """
    try:
        dashscope.api_key = QWEN_API_KEY
        # closing() releases the cursor on every exit path, including the
        # early 404/400/500 returns below.
        with get_db_connection() as conn, closing(conn.cursor(dictionary=True)) as cursor:
            # Load the group.
            cursor.execute("SELECT * FROM hot_word_group WHERE id = %s", (id,))
            group = cursor.fetchone()
            if not group:
                return create_api_response(code="404", message="热词组不存在")

            # Load the enabled hot words of this group.
            cursor.execute(
                "SELECT text, weight, lang FROM hot_word_item WHERE group_id = %s AND status = 1",
                (id,),
            )
            items = cursor.fetchall()
            if not items:
                return create_api_response(code="400", message="该组没有启用的热词可同步")

            vocabulary_list = [
                {"text": it["text"], "weight": it["weight"], "lang": it["lang"]}
                for it in items
            ]

            # ASR model name, required when creating a vocabulary.
            asr_model_name = SystemConfigService.get_config_attribute('audio_model', 'model', 'paraformer-v2')
            existing_vocab_id = group.get("vocabulary_id")

            service = VocabularyService()
            vocab_id = existing_vocab_id
            try:
                if existing_vocab_id:
                    try:
                        service.update_vocabulary(
                            vocabulary_id=existing_vocab_id,
                            vocabulary=vocabulary_list,
                        )
                    except Exception:
                        # Best effort: the update failed, so fall through and
                        # create a fresh vocabulary. Log instead of hiding it.
                        logger.warning(
                            "Updating vocabulary %s for hot-word group %s failed; recreating",
                            existing_vocab_id,
                            id,
                            exc_info=True,
                        )
                        existing_vocab_id = None
                if not existing_vocab_id:
                    vocab_id = service.create_vocabulary(
                        prefix="imeeting",
                        target_model=asr_model_name,
                        vocabulary=vocabulary_list,
                    )
            except Exception as api_error:
                logger.exception("Syncing hot-word group %s to DashScope failed", id)
                return create_api_response(code="500", message=f"同步到阿里云失败: {str(api_error)}")

            # Write the vocabulary_id back to the group.
            cursor.execute(
                "UPDATE hot_word_group SET vocabulary_id = %s, last_sync_time = NOW() WHERE id = %s",
                (vocab_id, id),
            )
            # Propagate it to extra_config.vocabulary_id of every
            # audio_model_config row linked to this group.
            cursor.execute(
                """
                UPDATE audio_model_config
                SET extra_config = JSON_SET(COALESCE(extra_config, JSON_OBJECT()), '$.vocabulary_id', %s)
                WHERE hot_word_group_id = %s
                """,
                (vocab_id, id),
            )
            conn.commit()

        return create_api_response(
            code="200",
            message="同步成功",
            data={"vocabulary_id": vocab_id, "synced_count": len(vocabulary_list)},
        )
    except Exception as e:
        logger.exception("Unexpected error while syncing hot-word group %s", id)
        return create_api_response(code="500", message=f"同步异常: {str(e)}")


# ── Hot-Word Item CRUD ──────────────────────────────────────

@router.get("/admin/hot-word-groups/{group_id}/items", response_model=dict)
async def list_items(group_id: int, current_user: dict = Depends(get_current_admin_user)):
    """List the items of a group, most recently updated first."""
    try:
        with get_db_connection() as conn, closing(conn.cursor(dictionary=True)) as cursor:
            cursor.execute(
                "SELECT * FROM hot_word_item WHERE group_id = %s ORDER BY update_time DESC",
                (group_id,),
            )
            items = cursor.fetchall()
        return create_api_response(code="200", message="获取成功", data=items)
    except Exception as e:
        logger.exception("Failed to list items of hot-word group %s", group_id)
        return create_api_response(code="500", message=f"获取失败: {str(e)}")


@router.post("/admin/hot-word-groups/{group_id}/items", response_model=dict)
async def create_item(group_id: int, request: CreateItemRequest, current_user: dict = Depends(get_current_admin_user)):
    """Create a hot-word item in a group; 400 if it duplicates an existing one."""
    try:
        with get_db_connection() as conn, closing(conn.cursor()) as cursor:
            cursor.execute(
                "INSERT INTO hot_word_item (group_id, text, weight, lang, status) VALUES (%s, %s, %s, %s, %s)",
                (group_id, request.text, request.weight, request.lang, request.status),
            )
            new_id = cursor.lastrowid
            conn.commit()
        return create_api_response(code="200", message="创建成功", data={"id": new_id})
    except Exception as e:
        if _is_duplicate_entry(e):
            return create_api_response(code="400", message="该组内已存在相同热词")
        logger.exception("Failed to create hot-word item in group %s", group_id)
        return create_api_response(code="500", message=f"创建失败: {str(e)}")


@router.put("/admin/hot-word-items/{id}", response_model=dict)
async def update_item(id: int, request: UpdateItemRequest, current_user: dict = Depends(get_current_admin_user)):
    """Update the non-null fields of a hot-word item; 400 if nothing was provided."""
    # Validate before touching the database so an empty request never
    # acquires a connection or leaves a cursor open.
    fields, params = _collect_updates((
        ("text", request.text),
        ("weight", request.weight),
        ("lang", request.lang),
        ("status", request.status),
    ))
    if not fields:
        return create_api_response(code="400", message="无更新内容")
    params.append(id)

    try:
        with get_db_connection() as conn, closing(conn.cursor()) as cursor:
            cursor.execute(
                f"UPDATE hot_word_item SET {', '.join(fields)} WHERE id = %s",
                tuple(params),
            )
            conn.commit()
        return create_api_response(code="200", message="更新成功")
    except Exception as e:
        if _is_duplicate_entry(e):
            return create_api_response(code="400", message="该组内已存在相同热词")
        logger.exception("Failed to update hot-word item %s", id)
        return create_api_response(code="500", message=f"更新失败: {str(e)}")


@router.delete("/admin/hot-word-items/{id}", response_model=dict)
async def delete_item(id: int, current_user: dict = Depends(get_current_admin_user)):
    """Delete a single hot-word item by id."""
    try:
        with get_db_connection() as conn, closing(conn.cursor()) as cursor:
            cursor.execute("DELETE FROM hot_word_item WHERE id = %s", (id,))
            conn.commit()
        return create_api_response(code="200", message="删除成功")
    except Exception as e:
        logger.exception("Failed to delete hot-word item %s", id)
        return create_api_response(code="500", message=f"删除失败: {str(e)}")