import json from typing import Optional, Dict, Any from app.core.database import get_db_connection class SystemConfigService: """系统配置服务 - 优先从新配置表读取,兼容 dict_data(system_config) 回退""" DICT_TYPE = 'system_config' DEFAULT_LLM_ENDPOINT_URL = 'https://dashscope.aliyuncs.com/compatible-mode/v1' # 配置键常量 ASR_VOCABULARY_ID = 'asr_vocabulary_id' TIMELINE_PAGESIZE = 'timeline_pagesize' DEFAULT_RESET_PASSWORD = 'default_reset_password' MAX_AUDIO_SIZE = 'max_audio_size' # 品牌配置 BRANDING_APP_NAME = 'branding_app_name' BRANDING_HOME_HEADLINE = 'branding_home_headline' BRANDING_HOME_TAGLINE = 'branding_home_tagline' BRANDING_CONSOLE_SUBTITLE = 'branding_console_subtitle' BRANDING_PREVIEW_TITLE = 'branding_preview_title' BRANDING_LOGIN_WELCOME = 'branding_login_welcome' BRANDING_FOOTER_TEXT = 'branding_footer_text' # 声纹配置 VOICEPRINT_TEMPLATE_TEXT = 'voiceprint_template_text' VOICEPRINT_MAX_SIZE = 'voiceprint_max_size' VOICEPRINT_DURATION = 'voiceprint_duration' VOICEPRINT_SAMPLE_RATE = 'voiceprint_sample_rate' VOICEPRINT_CHANNELS = 'voiceprint_channels' # LLM模型配置 LLM_MODEL_NAME = 'llm_model_name' LLM_TIMEOUT = 'llm_timeout' LLM_TEMPERATURE = 'llm_temperature' LLM_TOP_P = 'llm_top_p' @staticmethod def _parse_json_object(value: Any) -> Dict[str, Any]: if value is None: return {} if isinstance(value, dict): return dict(value) if isinstance(value, str): value = value.strip() if not value: return {} try: parsed = json.loads(value) return parsed if isinstance(parsed, dict) else {} except json.JSONDecodeError: return {} return {} @staticmethod def _normalize_string_list(value: Any) -> Optional[list[str]]: if value is None: return None if isinstance(value, list): items = [str(item).strip() for item in value if str(item).strip()] return items or None if isinstance(value, str): items = [item.strip() for item in value.split(",") if item.strip()] return items or None return None @classmethod def _build_audio_runtime_config(cls, audio_row: Dict[str, Any]) -> Dict[str, Any]: cfg: Dict[str, Any] = {} if not audio_row: return cfg extra_config = cls._parse_json_object(audio_row.get("extra_config")) if audio_row.get("endpoint_url"): cfg["endpoint_url"] = audio_row["endpoint_url"] if audio_row.get("api_key"): cfg["api_key"] = audio_row["api_key"] if audio_row.get("provider"): cfg["provider"] = audio_row["provider"] if audio_row.get("model_code"): cfg["model_code"] = audio_row["model_code"] if audio_row.get("audio_scene"): cfg["audio_scene"] = audio_row["audio_scene"] if audio_row.get("hot_word_group_id") is not None: cfg["hot_word_group_id"] = audio_row["hot_word_group_id"] if audio_row.get("audio_scene") == "asr": if extra_config.get("model") is None and audio_row.get("asr_model_name") is not None: extra_config["model"] = audio_row["asr_model_name"] if extra_config.get("vocabulary_id") is None and audio_row.get("asr_vocabulary_id") is not None: extra_config["vocabulary_id"] = audio_row["asr_vocabulary_id"] if extra_config.get("speaker_count") is None and audio_row.get("asr_speaker_count") is not None: extra_config["speaker_count"] = audio_row["asr_speaker_count"] if extra_config.get("language_hints") is None and audio_row.get("asr_language_hints"): extra_config["language_hints"] = audio_row["asr_language_hints"] if extra_config.get("disfluency_removal_enabled") is None and audio_row.get("asr_disfluency_removal_enabled") is not None: extra_config["disfluency_removal_enabled"] = bool(audio_row["asr_disfluency_removal_enabled"]) if extra_config.get("diarization_enabled") is None and audio_row.get("asr_diarization_enabled") is not None: extra_config["diarization_enabled"] = bool(audio_row["asr_diarization_enabled"]) else: if extra_config.get("model") is None and audio_row.get("model_name"): extra_config["model"] = audio_row["model_name"] if extra_config.get("template_text") is None and audio_row.get("vp_template_text") is not None: extra_config["template_text"] = audio_row["vp_template_text"] if extra_config.get("duration_seconds") is None and audio_row.get("vp_duration_seconds") is not None: extra_config["duration_seconds"] = audio_row["vp_duration_seconds"] if extra_config.get("sample_rate") is None and audio_row.get("vp_sample_rate") is not None: extra_config["sample_rate"] = audio_row["vp_sample_rate"] if extra_config.get("channels") is None and audio_row.get("vp_channels") is not None: extra_config["channels"] = audio_row["vp_channels"] if extra_config.get("max_size_bytes") is None and audio_row.get("vp_max_size_bytes") is not None: extra_config["max_size_bytes"] = audio_row["vp_max_size_bytes"] language_hints = cls._normalize_string_list(extra_config.get("language_hints")) if language_hints is not None: extra_config["language_hints"] = language_hints cfg.update(extra_config) return cfg @classmethod def get_active_audio_model_config(cls, scene: str = "asr") -> Dict[str, Any]: try: with get_db_connection() as conn: cursor = conn.cursor(dictionary=True) cursor.execute( """ SELECT model_code, model_name, audio_scene, provider, endpoint_url, api_key, hot_word_group_id, asr_model_name, asr_vocabulary_id, asr_speaker_count, asr_language_hints, asr_disfluency_removal_enabled, asr_diarization_enabled, vp_template_text, vp_duration_seconds, vp_sample_rate, vp_channels, vp_max_size_bytes, extra_config FROM audio_model_config WHERE audio_scene = %s AND is_active = 1 ORDER BY is_default DESC, updated_at DESC, config_id ASC LIMIT 1 """, (scene,), ) row = cursor.fetchone() cursor.close() return cls._build_audio_runtime_config(row) if row else {} except Exception: return {} @classmethod def _get_parameter_value(cls, param_key: str): try: with get_db_connection() as conn: cursor = conn.cursor(dictionary=True) cursor.execute( """ SELECT param_value FROM sys_system_parameters WHERE param_key = %s AND is_active = 1 LIMIT 1 """, (param_key,), ) result = cursor.fetchone() cursor.close() return result["param_value"] if result else None except Exception: return None @classmethod def _get_model_config_json(cls, model_code: str): try: with get_db_connection() as conn: cursor = conn.cursor(dictionary=True) # 1) llm 专表 cursor.execute( """ SELECT model_code, endpoint_url, api_key, llm_model_name, llm_timeout, llm_temperature, llm_top_p, llm_max_tokens, llm_system_prompt FROM llm_model_config WHERE model_code = %s AND is_active = 1 ORDER BY is_default DESC, config_id ASC LIMIT 1 """, (model_code,), ) llm_row = cursor.fetchone() if not llm_row and model_code == "llm_model": cursor.execute( """ SELECT model_code, endpoint_url, api_key, llm_model_name, llm_timeout, llm_temperature, llm_top_p, llm_max_tokens, llm_system_prompt FROM llm_model_config WHERE is_active = 1 ORDER BY is_default DESC, updated_at DESC, config_id ASC LIMIT 1 """ ) llm_row = cursor.fetchone() if llm_row: cursor.close() cfg = {} if llm_row.get("endpoint_url"): cfg["endpoint_url"] = llm_row["endpoint_url"] if llm_row.get("api_key"): cfg["api_key"] = llm_row["api_key"] if llm_row.get("llm_model_name") is not None: cfg["model_name"] = llm_row["llm_model_name"] if llm_row.get("llm_timeout") is not None: cfg["time_out"] = llm_row["llm_timeout"] if llm_row.get("llm_temperature") is not None: cfg["temperature"] = float(llm_row["llm_temperature"]) if llm_row.get("llm_top_p") is not None: cfg["top_p"] = float(llm_row["llm_top_p"]) if llm_row.get("llm_max_tokens") is not None: cfg["max_tokens"] = llm_row["llm_max_tokens"] if llm_row.get("llm_system_prompt") is not None: cfg["system_prompt"] = llm_row["llm_system_prompt"] return cfg # 2) audio 专表 if model_code in ("audio_model", "voiceprint_model"): target_scene = "voiceprint" if model_code == "voiceprint_model" else "asr" cursor.close() audio_cfg = cls.get_active_audio_model_config(target_scene) return audio_cfg or None cursor.execute( """ SELECT model_code, model_name, audio_scene, provider, endpoint_url, api_key, hot_word_group_id, asr_model_name, asr_vocabulary_id, asr_speaker_count, asr_language_hints, asr_disfluency_removal_enabled, asr_diarization_enabled, vp_template_text, vp_duration_seconds, vp_sample_rate, vp_channels, vp_max_size_bytes, extra_config FROM audio_model_config WHERE model_code = %s AND is_active = 1 ORDER BY is_default DESC, config_id ASC LIMIT 1 """, (model_code,), ) audio_row = cursor.fetchone() cursor.close() if audio_row: cfg = cls._build_audio_runtime_config(audio_row) if cfg.get("max_size_bytes") is not None and cfg.get("voiceprint_max_size") is None: cfg["voiceprint_max_size"] = cfg["max_size_bytes"] return cfg return None except Exception: return None @classmethod def get_config(cls, dict_code: str, default_value: Any = None) -> Any: """ 获取指定配置项的值 Args: dict_code: 配置项编码 default_value: 默认值,如果配置不存在则返回此值 Returns: 配置项的值 """ # 1) 新参数表 value = cls._get_parameter_value(dict_code) if value is not None: return value # 2) 兼容旧 sys_dict_data try: with get_db_connection() as conn: cursor = conn.cursor(dictionary=True) query = """ SELECT extension_attr FROM sys_dict_data WHERE dict_type = %s AND dict_code = %s AND status = 1 LIMIT 1 """ cursor.execute(query, (cls.DICT_TYPE, dict_code)) result = cursor.fetchone() cursor.close() if result and result['extension_attr']: try: ext_attr = json.loads(result['extension_attr']) if isinstance(result['extension_attr'], str) else result['extension_attr'] return ext_attr.get('value', default_value) except (json.JSONDecodeError, AttributeError): pass return default_value except Exception as e: print(f"Error getting config {dict_code}: {e}") return default_value @classmethod def get_config_attribute(cls, dict_code: str, attr_name: str, default_value: Any = None) -> Any: """ 从指定配置记录的扩展属性中读取指定属性值 此方法用于处理扩展属性为复杂JSON的配置记录(如llm_model、voiceprint等) Args: dict_code: 配置项编码 (如 'llm_model', 'voiceprint') attr_name: 扩展属性中的属性名 (如 'time_out', 'channels', 'template_text') default_value: 默认值,如果配置或属性不存在则返回此值 Returns: 属性值 """ # 1) 新模型配置表 model_json = cls._get_model_config_json(dict_code) if model_json is not None: return model_json.get(attr_name, default_value) # 2) 兼容旧 sys_dict_data try: with get_db_connection() as conn: cursor = conn.cursor(dictionary=True) query = """ SELECT extension_attr FROM sys_dict_data WHERE dict_type = %s AND dict_code = %s AND status = 1 LIMIT 1 """ cursor.execute(query, (cls.DICT_TYPE, dict_code)) result = cursor.fetchone() cursor.close() if result and result['extension_attr']: try: ext_attr = json.loads(result['extension_attr']) if isinstance(result['extension_attr'], str) else result['extension_attr'] return ext_attr.get(attr_name, default_value) except (json.JSONDecodeError, AttributeError): pass return default_value except Exception as e: print(f"Error getting config attribute {dict_code}.{attr_name}: {e}") return default_value @classmethod def set_config(cls, dict_code: str, value: Any, label_cn: str = None) -> bool: """ 设置指定配置项的值 Args: dict_code: 配置项编码 value: 配置值 label_cn: 配置项中文名称(仅在配置不存在时需要) Returns: 是否设置成功 """ # 1) 优先写入新参数表 try: with get_db_connection() as conn: cursor = conn.cursor(dictionary=True) cursor.execute( """ INSERT INTO sys_system_parameters (param_key, param_name, param_value, value_type, category, description, is_active) VALUES (%s, %s, %s, %s, %s, %s, 1) ON DUPLICATE KEY UPDATE param_name = VALUES(param_name), param_value = VALUES(param_value), value_type = VALUES(value_type), category = VALUES(category), description = VALUES(description), is_active = 1 """, ( dict_code, label_cn or dict_code, str(value) if value is not None else "", "string", "system", "Migrated from legacy system_config", ), ) if dict_code == cls.ASR_VOCABULARY_ID: cursor.execute( """ INSERT INTO audio_model_config (model_code, model_name, audio_scene, provider, asr_model_name, asr_vocabulary_id, asr_speaker_count, asr_language_hints, asr_disfluency_removal_enabled, asr_diarization_enabled, description, is_active, is_default) VALUES ( 'audio_model', '音频识别模型', 'asr', 'dashscope', 'paraformer-v2', %s, 10, 'zh,en', 1, 1, '语音识别模型配置', 1, 1 ) ON DUPLICATE KEY UPDATE asr_vocabulary_id = VALUES(asr_vocabulary_id), is_active = 1 """, (str(value),), ) conn.commit() cursor.close() return True except Exception as e: print(f"Error setting config in sys_system_parameters {dict_code}: {e}") # 2) 回退写入旧 sys_dict_data try: with get_db_connection() as conn: cursor = conn.cursor(dictionary=True) # 检查配置是否存在 cursor.execute( "SELECT id FROM sys_dict_data WHERE dict_type = %s AND dict_code = %s", (cls.DICT_TYPE, dict_code) ) existing = cursor.fetchone() extension_attr = json.dumps({"value": value}, ensure_ascii=False) if existing: # 更新现有配置 update_query = """ UPDATE sys_dict_data SET extension_attr = %s, update_time = NOW() WHERE dict_type = %s AND dict_code = %s """ cursor.execute(update_query, (extension_attr, cls.DICT_TYPE, dict_code)) else: # 插入新配置 if not label_cn: label_cn = dict_code insert_query = """ INSERT INTO sys_dict_data ( dict_type, dict_code, parent_code, label_cn, extension_attr, status, sort_order ) VALUES (%s, %s, 'ROOT', %s, %s, 1, 0) """ cursor.execute(insert_query, (cls.DICT_TYPE, dict_code, label_cn, extension_attr)) conn.commit() cursor.close() return True except Exception as e: print(f"Error setting config {dict_code}: {e}") return False @classmethod def get_all_configs(cls) -> Dict[str, Any]: """ 获取所有系统配置 Returns: 配置字典 {dict_code: value} """ # 1) 新参数表 try: with get_db_connection() as conn: cursor = conn.cursor(dictionary=True) cursor.execute( """ SELECT param_key, param_value FROM sys_system_parameters WHERE is_active = 1 ORDER BY category, param_key """ ) rows = cursor.fetchall() cursor.close() if rows: return {row["param_key"]: row["param_value"] for row in rows} except Exception as e: print(f"Error getting all configs from sys_system_parameters: {e}") # 2) 兼容旧 sys_dict_data try: with get_db_connection() as conn: cursor = conn.cursor(dictionary=True) query = """ SELECT dict_code, label_cn, extension_attr FROM sys_dict_data WHERE dict_type = %s AND status = 1 ORDER BY sort_order """ cursor.execute(query, (cls.DICT_TYPE,)) results = cursor.fetchall() cursor.close() configs = {} for row in results: if row['extension_attr']: try: ext_attr = json.loads(row['extension_attr']) if isinstance(row['extension_attr'], str) else row['extension_attr'] configs[row['dict_code']] = ext_attr.get('value') except (json.JSONDecodeError, AttributeError): configs[row['dict_code']] = None else: configs[row['dict_code']] = None return configs except Exception as e: print(f"Error getting all configs: {e}") return {} @classmethod def batch_set_configs(cls, configs: Dict[str, Any]) -> bool: """ 批量设置配置项 Args: configs: 配置字典 {dict_code: value} Returns: 是否全部设置成功 """ success = True for dict_code, value in configs.items(): if not cls.set_config(dict_code, value): success = False return success # 便捷方法:获取特定配置 @classmethod def get_asr_vocabulary_id(cls) -> Optional[str]: """获取ASR热词字典ID — 优先从 audio_model_config.hot_word_group_id → hot_word_group.vocabulary_id""" audio_cfg = cls.get_active_audio_model_config("asr") if audio_cfg.get("vocabulary_id"): return audio_cfg["vocabulary_id"] # 回退:直接读 audio_model_config.asr_vocabulary_id audio_vocab = cls.get_config_attribute('audio_model', 'vocabulary_id') if audio_vocab: return audio_vocab return cls.get_config(cls.ASR_VOCABULARY_ID) # 声纹配置获取方法(直接使用通用方法) @classmethod def get_voiceprint_template(cls, default: str = "我正在进行声纹采集,这段语音将用于身份识别和验证。\n\n声纹技术能够准确识别每个人独特的声音特征。") -> str: """获取声纹采集模版""" return cls.get_config_attribute('voiceprint_model', 'template_text', default) @classmethod def get_voiceprint_max_size(cls, default: int = 5242880) -> int: """获取声纹文件大小限制 (bytes), 默认5MB""" value = cls.get_config_attribute('voiceprint_model', 'max_size_bytes', None) if value is None: value = cls.get_config_attribute('voiceprint_model', 'voiceprint_max_size', default) try: return int(value) except (ValueError, TypeError): return default @classmethod def get_voiceprint_duration(cls, default: int = 12) -> int: """获取声纹采集最短时长 (秒)""" value = cls.get_config_attribute('voiceprint_model', 'duration_seconds', default) try: return int(value) except (ValueError, TypeError): return default @classmethod def get_voiceprint_sample_rate(cls, default: int = 16000) -> int: """获取声纹采样率""" value = cls.get_config_attribute('voiceprint_model', 'sample_rate', default) try: return int(value) except (ValueError, TypeError): return default @classmethod def get_voiceprint_channels(cls, default: int = 1) -> int: """获取声纹通道数""" value = cls.get_config_attribute('voiceprint_model', 'channels', default) try: return int(value) except (ValueError, TypeError): return default @classmethod def get_timeline_pagesize(cls, default: int = 10) -> int: """获取会议时间轴每页数量""" value = cls.get_config(cls.TIMELINE_PAGESIZE, str(default)) try: return int(value) except (ValueError, TypeError): return default @classmethod def get_default_reset_password(cls, default: str = "111111") -> str: """获取默认重置密码""" return cls.get_config(cls.DEFAULT_RESET_PASSWORD, default) @classmethod def get_max_audio_size(cls, default: int = 100) -> int: """获取上传音频文件大小限制(MB)""" value = cls.get_config(cls.MAX_AUDIO_SIZE, str(default)) try: return int(value) except (ValueError, TypeError): return default @classmethod def get_branding_config(cls) -> Dict[str, str]: return { "app_name": str(cls.get_config(cls.BRANDING_APP_NAME, "智听云平台") or "智听云平台"), "home_headline": str(cls.get_config(cls.BRANDING_HOME_HEADLINE, "智听云平台") or "智听云平台"), "home_tagline": str(cls.get_config(cls.BRANDING_HOME_TAGLINE, "让每一次谈话都产生价值。") or "让每一次谈话都产生价值。"), "console_subtitle": str(cls.get_config(cls.BRANDING_CONSOLE_SUBTITLE, "智能会议控制台") or "智能会议控制台"), "preview_title": str(cls.get_config(cls.BRANDING_PREVIEW_TITLE, "会议预览") or "会议预览"), "login_welcome": str(cls.get_config(cls.BRANDING_LOGIN_WELCOME, "欢迎回来,请输入您的登录凭证。") or "欢迎回来,请输入您的登录凭证。"), "footer_text": str(cls.get_config(cls.BRANDING_FOOTER_TEXT, "©2026 智听云平台") or "©2026 智听云平台"), } # LLM模型配置获取方法(直接使用通用方法) @classmethod def get_llm_model_name(cls, default: str = "qwen-plus") -> str: """获取LLM模型名称""" return cls.get_config_attribute('llm_model', 'model_name', default) @classmethod def get_llm_timeout(cls, default: int = 300) -> int: """获取LLM超时时间(秒)""" value = cls.get_config_attribute('llm_model', 'time_out', default) try: return int(value) except (ValueError, TypeError): return default @classmethod def get_llm_temperature(cls, default: float = 0.7) -> float: """获取LLM temperature参数""" value = cls.get_config_attribute('llm_model', 'temperature', default) try: return float(value) except (ValueError, TypeError): return default @classmethod def get_llm_top_p(cls, default: float = 0.9) -> float: """获取LLM top_p参数""" value = cls.get_config_attribute('llm_model', 'top_p', default) try: return float(value) except (ValueError, TypeError): return default @classmethod def get_llm_max_tokens(cls, default: int = 2048) -> int: """获取LLM最大输出token""" value = cls.get_config_attribute('llm_model', 'max_tokens', default) try: return int(value) except (ValueError, TypeError): return default @classmethod def get_llm_system_prompt(cls, default: str = "请根据提供的内容进行总结和分析。") -> str: """获取LLM系统提示词""" value = cls.get_config_attribute('llm_model', 'system_prompt', default) return value if isinstance(value, str) and value.strip() else default @classmethod def get_llm_endpoint_url(cls, default: str = DEFAULT_LLM_ENDPOINT_URL) -> str: """获取LLM服务Base API""" value = cls.get_config_attribute('llm_model', 'endpoint_url', default) return value if isinstance(value, str) and value.strip() else default @classmethod def get_llm_api_key(cls, default: Optional[str] = None) -> Optional[str]: """获取LLM服务API Key""" value = cls.get_config_attribute('llm_model', 'api_key', default) if value is None: return default value_str = str(value).strip() return value_str or default