import uuid
import os
import redis
import requests
from datetime import datetime
from typing import Optional, Dict, Any, Tuple
from http import HTTPStatus
import dashscope
from dashscope.audio.asr import Transcription
from app.core.config import REDIS_CONFIG, APP_CONFIG, BACKGROUND_TASK_CONFIG
from app.core.database import get_db_connection
from app.services.system_config_service import SystemConfigService

# Lifetime of the per-task Redis hash ``task:{business_task_id}`` (24 hours).
_TASK_KEY_TTL_SECONDS = 86400

# Statuses under which a cached/stored task record is considered usable even
# when it has no provider (Paraformer) task ID.
_KNOWN_TASK_STATUSES = frozenset({'pending', 'processing', 'completed', 'failed'})


class _DefaultTimeoutSession(requests.Session):
    """A ``requests.Session`` that injects a default timeout into requests.

    ``requests`` has no session-wide timeout option, so this subclass fills in
    ``timeout`` for every call that does not pass one explicitly.
    """

    def __init__(self, default_timeout: Optional[int] = None):
        super().__init__()
        self.default_timeout = default_timeout

    def request(self, method, url, **kwargs):
        # Respect an explicit caller timeout; a falsy default (None/0) means
        # "do not inject anything".
        if "timeout" not in kwargs and self.default_timeout:
            kwargs["timeout"] = self.default_timeout
        return super().request(method, url, **kwargs)


class AsyncTranscriptionService:
    """Asynchronous transcription service.

    Submits audio files to DashScope (Paraformer) asynchronous transcription,
    tracks each job under a business task ID in Redis (``task:{id}``) and in
    the ``transcript_tasks`` table, and stores finished transcripts in
    ``transcript_segments``.
    """

    def __init__(self):
        self.redis_client = redis.Redis(**REDIS_CONFIG)
        self.base_url = APP_CONFIG['base_url'].rstrip('/')

    # ------------------------------------------------------------------
    # DashScope request helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _create_requests_session(default_timeout: Optional[int] = None) -> requests.Session:
        """Create an HTTP session that applies ``default_timeout`` to requests.

        Environment proxy settings are honored (``trust_env``) only when the
        ``IMEETING_USE_SYSTEM_PROXY`` env var is 1/true/yes/on (case-insensitive).
        """
        session = _DefaultTimeoutSession(default_timeout=default_timeout)
        session.trust_env = os.getenv("IMEETING_USE_SYSTEM_PROXY", "").lower() in {"1", "true", "yes", "on"}
        return session

    @staticmethod
    def _normalize_dashscope_base_address(endpoint_url: Optional[str]) -> Optional[str]:
        """Turn a configured endpoint URL into a DashScope SDK ``base_address``.

        Strips whitespace, trailing slashes and a trailing
        ``/services/audio/asr/transcription`` path; returns None when nothing
        usable remains.
        """
        if not endpoint_url:
            return None
        normalized = str(endpoint_url).strip().rstrip("/")
        suffix = "/services/audio/asr/transcription"
        if normalized.endswith(suffix):
            normalized = normalized[: -len(suffix)]
        return normalized or None

    @staticmethod
    def _resolve_dashscope_api_key(audio_config: Optional[Dict[str, Any]] = None) -> str:
        """Return the stripped ``api_key`` from the ASR config.

        Raises:
            Exception: if no non-empty API key is configured.
        """
        api_key = (audio_config or {}).get("api_key")
        if isinstance(api_key, str):
            api_key = api_key.strip()
        if not api_key:
            raise Exception("未在启用的 ASR 模型配置中设置 DashScope API Key")
        return api_key

    def _build_dashscope_request_options(self, audio_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Build the SDK keyword options (``api_key`` and optional ``base_address``)."""
        request_options: Dict[str, Any] = {
            "api_key": self._resolve_dashscope_api_key(audio_config),
        }
        endpoint_url = (audio_config or {}).get("endpoint_url")
        base_address = self._normalize_dashscope_base_address(endpoint_url)
        if base_address:
            request_options["base_address"] = base_address
        return request_options

    @staticmethod
    def _resolve_request_timeout_seconds(audio_config: Optional[Dict[str, Any]] = None) -> int:
        """Return ``request_timeout_seconds`` as an int (default 300, minimum 10)."""
        value = (audio_config or {}).get("request_timeout_seconds")
        try:
            timeout_seconds = int(value)
        except (TypeError, ValueError):
            timeout_seconds = 300
        return max(10, timeout_seconds)

    def _dashscope_async_call(self, request_options: Dict[str, Any], call_params: Dict[str, Any], timeout_seconds: int):
        """Submit an async transcription job, using a session with a default timeout."""
        session = self._create_requests_session(timeout_seconds)
        try:
            try:
                return Transcription.async_call(session=session, **request_options, **call_params)
            except TypeError:
                # Presumably for SDK versions that do not accept ``session``;
                # retry without it.
                # NOTE(review): a TypeError raised for another reason would
                # also trigger this second submission.
                return Transcription.async_call(**request_options, **call_params)
        finally:
            session.close()

    def _dashscope_fetch(self, paraformer_task_id: str, request_options: Dict[str, Any], timeout_seconds: int):
        """Fetch a provider task's status, using a session with a default timeout."""
        session = self._create_requests_session(timeout_seconds)
        try:
            try:
                return Transcription.fetch(task=paraformer_task_id, session=session, **request_options)
            except TypeError:
                # Same fallback as in _dashscope_async_call.
                return Transcription.fetch(task=paraformer_task_id, **request_options)
        finally:
            session.close()

    @staticmethod
    def _build_dashscope_call_params(audio_config: Dict[str, Any], file_url: str) -> Dict[str, Any]:
        """Build ``async_call`` parameters for one file.

        The model defaults to ``paraformer-v2``. Optional keys are copied from
        the config only when set (not None, not a blank string, not an empty
        list).
        """
        model_name = audio_config.get("model") or "paraformer-v2"
        call_params: Dict[str, Any] = {
            "model": model_name,
            "file_urls": [file_url],
        }
        optional_keys = [
            "language_hints",
            "disfluency_removal_enabled",
            "diarization_enabled",
            "speaker_count",
            "vocabulary_id",
            "timestamp_alignment_enabled",
            "channel_id",
            "special_word_filter",
            "audio_event_detection_enabled",
            "phrase_id",
        ]
        for key in optional_keys:
            value = audio_config.get(key)
            if value is None:
                continue
            if isinstance(value, str) and not value.strip():
                continue
            if isinstance(value, list) and not value:
                continue
            call_params[key] = value
        return call_params

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def test_asr_model(self, audio_config: Dict[str, Any], test_file_url: Optional[str] = None) -> Dict[str, Any]:
        """Submit a test transcription job with the given ASR config.

        Uses a DashScope sample file when ``test_file_url`` is not given.

        Returns:
            Dict with ``provider_task_id``, ``test_file_url`` and ``used_params``.

        Raises:
            Exception: for a non-DashScope provider, a missing API key, or a
            non-OK submission response.
        """
        provider = str(audio_config.get("provider") or "dashscope").strip().lower()
        if provider != "dashscope":
            raise Exception(f"当前仅支持 DashScope 音频识别测试,暂不支持供应商: {provider}")

        request_options = self._build_dashscope_request_options(audio_config)
        timeout_seconds = self._resolve_request_timeout_seconds(audio_config)
        dashscope.api_key = request_options["api_key"]
        target_file_url = (
            test_file_url
            or "https://dashscope.oss-cn-beijing.aliyuncs.com/samples/audio/paraformer/hello_world_female2.wav"
        )
        call_params = self._build_dashscope_call_params(audio_config, target_file_url)
        response = self._dashscope_async_call(request_options, call_params, timeout_seconds)
        if response.status_code != HTTPStatus.OK:
            raise Exception(response.message or "音频模型测试失败")
        return {
            "provider_task_id": response.output.task_id,
            "test_file_url": target_file_url,
            "used_params": call_params,
        }

    def start_transcription(self, meeting_id: int, audio_file_path: str, business_task_id: Optional[str] = None) -> str:
        """Start an asynchronous transcription job for a meeting.

        Old transcript segments, old task rows and the meeting summary are
        cleared first. The ASR configuration is validated before that, so a
        misconfiguration does not destroy existing data.

        Args:
            meeting_id: Meeting ID.
            audio_file_path: Audio file path relative to ``base_url``.
            business_task_id: Optional ID of an existing local placeholder
                task to reuse; its row is kept. A new UUID is generated when
                omitted.

        Returns:
            str: The business task ID.
        """
        try:
            # Resolve and validate the ASR configuration first (no side effects).
            audio_config = SystemConfigService.get_active_audio_model_config("asr")
            provider = str(audio_config.get("provider") or "dashscope").strip().lower()
            if provider != "dashscope":
                raise Exception(f"当前仅支持 DashScope 音频识别,暂不支持供应商: {provider}")
            request_options = self._build_dashscope_request_options(audio_config)
            timeout_seconds = self._resolve_request_timeout_seconds(audio_config)

            # 1. Remove old transcription data and tasks; clear the summary.
            self._clear_previous_transcription(meeting_id, business_task_id)

            # 2. Build the public file URL and the provider call parameters.
            file_url = f"{self.base_url}/{audio_file_path.lstrip('/')}"
            dashscope.api_key = request_options["api_key"]
            call_params = self._build_dashscope_call_params(audio_config, file_url)
            print(
                f"Starting transcription for meeting_id: {meeting_id}, "
                f"file_url: {file_url}, model: {call_params.get('model')}, "
                f"vocabulary_id: {call_params.get('vocabulary_id')}"
            )

            # 3. Submit the Paraformer async job.
            task_response = self._dashscope_async_call(request_options, call_params, timeout_seconds)
            if task_response.status_code != HTTPStatus.OK:
                print(f"Failed to start transcription: {task_response.status_code}, {task_response.message}")
                raise Exception(f"Transcription API error: {task_response.message}")

            paraformer_task_id = task_response.output.task_id
            final_business_task_id = business_task_id or str(uuid.uuid4())

            # 4. Record the business -> provider task mapping in Redis (24h TTL).
            current_time = datetime.now().isoformat()
            task_data = {
                'business_task_id': final_business_task_id,
                'paraformer_task_id': paraformer_task_id,
                'meeting_id': str(meeting_id),
                'file_url': file_url,
                'status': 'pending',
                'progress': '0',
                'created_at': current_time,
                'updated_at': current_time,
            }
            self.redis_client.hset(f"task:{final_business_task_id}", mapping=task_data)
            self.redis_client.expire(f"task:{final_business_task_id}", _TASK_KEY_TTL_SECONDS)

            # 5. Create (or reset) the database task row.
            self._save_task_to_db(final_business_task_id, paraformer_task_id, meeting_id, audio_file_path)

            print(f"Transcription task created: {final_business_task_id}")
            return final_business_task_id

        except Exception as e:
            print(f"Error starting transcription: {e}")
            raise

    def _clear_previous_transcription(self, meeting_id: int, business_task_id: Optional[str]) -> None:
        """Delete a meeting's old segments and task rows and clear its summary.

        When ``business_task_id`` is given, that task row is kept.
        """
        print(f"Cleaning old transcription data for meeting_id: {meeting_id}")
        with get_db_connection() as connection:
            cursor = connection.cursor()

            cursor.execute("DELETE FROM transcript_segments WHERE meeting_id = %s", (meeting_id,))
            deleted_segments = cursor.rowcount
            print(f"Deleted {deleted_segments} old transcript segments")

            if business_task_id:
                cursor.execute(
                    "DELETE FROM transcript_tasks WHERE meeting_id = %s AND task_id <> %s",
                    (meeting_id, business_task_id),
                )
            else:
                cursor.execute("DELETE FROM transcript_tasks WHERE meeting_id = %s", (meeting_id,))
            deleted_tasks = cursor.rowcount
            print(f"Deleted {deleted_tasks} old transcript tasks")

            cursor.execute("UPDATE meetings SET summary = NULL WHERE meeting_id = %s", (meeting_id,))
            print(f"Cleared summary for meeting_id: {meeting_id}")

            connection.commit()
            cursor.close()

    def create_local_processing_task(
        self,
        meeting_id: int,
        status: str = "processing",
        progress: int = 0,
        error_message: Optional[str] = None,
    ) -> str:
        """Create a task tracked locally, with no provider job yet.

        The task is written to Redis (24h TTL, empty ``paraformer_task_id``)
        and to the database (``paraformer_task_id`` NULL).

        Returns:
            str: The new business task ID.
        """
        business_task_id = str(uuid.uuid4())
        current_time = datetime.now().isoformat()
        task_data = {
            "business_task_id": business_task_id,
            "paraformer_task_id": "",
            "meeting_id": str(meeting_id),
            "status": status,
            "progress": str(progress),
            "created_at": current_time,
            "updated_at": current_time,
            "error_message": error_message or "",
        }
        self.redis_client.hset(f"task:{business_task_id}", mapping=task_data)
        self.redis_client.expire(f"task:{business_task_id}", _TASK_KEY_TTL_SECONDS)

        with get_db_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(
                """
                INSERT INTO transcript_tasks (task_id, paraformer_task_id, meeting_id, status, progress, created_at, error_message)
                VALUES (%s, NULL, %s, %s, %s, NOW(), %s)
                """,
                (business_task_id, meeting_id, status, progress, error_message),
            )
            connection.commit()
            cursor.close()

        return business_task_id

    def update_local_processing_task(
        self,
        business_task_id: str,
        status: str,
        progress: int,
        error_message: Optional[str] = None,
    ) -> None:
        """Update a local task's status in Redis (refreshing its TTL) and in the database."""
        updated_at = datetime.now().isoformat()
        self.redis_client.hset(
            f"task:{business_task_id}",
            mapping={
                "status": status,
                "progress": str(progress),
                "updated_at": updated_at,
                "error_message": error_message or "",
            },
        )
        self.redis_client.expire(f"task:{business_task_id}", _TASK_KEY_TTL_SECONDS)
        self._update_task_status_in_db(business_task_id, status, progress, error_message)

    def get_task_status(self, business_task_id: str) -> Dict[str, Any]:
        """Return a task's current status, polling the provider when needed.

        Resolution order:
        1. A stored terminal status (``completed``/``failed``) is reported as-is.
        2. A task without a provider task ID reports its stored status.
        3. Otherwise a short-lived Redis status cache is used. On a miss, the
           provider is polled; on completion the transcript is saved once.

        The resulting status is written back to Redis and the database, but
        only when the task itself could be loaded. This keeps unknown IDs or
        transient lookup errors from creating or overwriting task records.
        Redis errors propagate to the caller instead of being recorded as a
        task failure.

        Args:
            business_task_id: Business task ID.

        Returns:
            Dict: Status info with ``task_id``, ``status``, ``progress``,
            ``error_message``, ``updated_at`` (time of this call),
            ``meeting_id`` and ``created_at``.
        """
        task_data = None
        current_status = 'failed'
        progress = 0
        error_message = "An unknown error occurred."
        status_cache_key = f"task_status_cache:{business_task_id}"

        try:
            # 1. Load task data (Redis first, falling back to the database).
            task_data = self._get_task_data(business_task_id)
            stored_status = str(task_data.get('status') or '').lower()
            paraformer_task_id = task_data.get('paraformer_task_id')

            if stored_status in {'completed', 'failed'}:
                current_status = stored_status
                progress = int(task_data.get('progress') or 0)
                error_message = task_data.get('error_message') or None
            elif not paraformer_task_id:
                # Local-only task: nothing to poll, report the stored state.
                current_status = task_data.get('status') or 'processing'
                progress = int(task_data.get('progress') or 0)
                error_message = task_data.get('error_message') or None
            else:
                cached_status_raw = self.redis_client.hgetall(status_cache_key)
                cached_status = self._normalize_redis_mapping(cached_status_raw) if cached_status_raw else {}
                if cached_status and cached_status.get('status') in {'pending', 'processing'}:
                    current_status = cached_status.get('status', 'pending')
                    progress = int(cached_status.get('progress') or 0)
                    error_message = cached_status.get('error_message') or None
                else:
                    # 2./3. Poll the provider and ingest the result if complete.
                    current_status, progress, error_message = self._refresh_status_from_provider(
                        business_task_id, task_data, paraformer_task_id
                    )
        except redis.exceptions.RedisError:
            # Redis itself is failing: do not record this as a task failure
            # (the write-back would fail too); let the caller fall back.
            raise
        except Exception as e:
            error_message = f"Error getting task status: {e}"
            print(error_message)
            current_status = 'failed'
            progress = 0

        # 4. Persist the status, but only for a task that actually exists.
        updated_at = datetime.now().isoformat()
        if task_data is not None:
            self._persist_task_status(
                business_task_id, status_cache_key, current_status, progress, error_message, updated_at
            )

        # 5. Build the result.
        return self._build_task_status_result(
            business_task_id,
            task_data,
            current_status,
            progress,
            error_message,
            updated_at,
        )

    def _refresh_status_from_provider(
        self,
        business_task_id: str,
        task_data: Dict[str, Any],
        paraformer_task_id: str,
    ) -> Tuple[str, int, Optional[str]]:
        """Poll DashScope for a task and ingest its transcript once it completes.

        Provider query errors are treated as transient: the stored status and
        progress are kept so the next poll retries. The transcript is saved
        only if the database row is not already ``completed``.

        Returns:
            (status, progress, error_message)
        """
        paraformer_response = None
        try:
            audio_config = SystemConfigService.get_active_audio_model_config("asr")
            request_options = self._build_dashscope_request_options(audio_config)
            timeout_seconds = self._resolve_request_timeout_seconds(audio_config)
            dashscope.api_key = request_options["api_key"]
            paraformer_response = self._dashscope_fetch(paraformer_task_id, request_options, timeout_seconds)
            if paraformer_response.status_code != HTTPStatus.OK:
                raise Exception(f"Failed to fetch task status from provider: {paraformer_response.message}")

            paraformer_status = paraformer_response.output.task_status
            status = self._map_paraformer_status(paraformer_status)
            progress = self._calculate_progress(paraformer_status)
            error_message = None
        except Exception as e:
            # A flaky provider status query must not mark the task failed;
            # keep the current non-terminal state and retry on the next poll.
            status = task_data.get('status') or 'processing'
            progress = int(task_data.get('progress') or 0)
            error_message = None
            print(
                f"Transient provider status fetch error for task {business_task_id}: {e}. "
                f"Keeping status={status}, progress={progress}"
            )

        if (
            status == 'completed'
            and paraformer_response is not None
            and paraformer_response.output.get('results')
        ):
            # The DB status acts as a best-effort guard against processing twice.
            db_task_status = self._get_task_status_from_db(business_task_id)
            if db_task_status != 'completed':
                self._update_task_status_in_db(business_task_id, 'completed', 100, None)
                try:
                    self._process_transcription_result(
                        business_task_id,
                        int(task_data['meeting_id']),
                        paraformer_response.output,
                    )
                except Exception as e:
                    status = 'failed'
                    progress = 100
                    error_message = f"Error processing transcription result: {e}"
                    print(error_message)
            else:
                print(f"Task {business_task_id} already processed, skipping duplicate processing")

        return status, progress, error_message

    def _persist_task_status(
        self,
        business_task_id: str,
        status_cache_key: str,
        status: str,
        progress: int,
        error_message: Optional[str],
        updated_at: str,
    ) -> None:
        """Write a task's status to its Redis hash, the status cache and the database."""
        update_data = {
            'status': status,
            'progress': str(progress),
            'updated_at': updated_at,
            # Always written (empty when there is no error) so a message from
            # an earlier state does not linger in Redis.
            'error_message': error_message or '',
        }
        self.redis_client.hset(f"task:{business_task_id}", mapping=update_data)
        self.redis_client.hset(status_cache_key, mapping=update_data)
        self.redis_client.expire(
            status_cache_key,
            max(1, int(BACKGROUND_TASK_CONFIG.get('transcription_status_cache_ttl', 3))),
        )
        self._update_task_status_in_db(business_task_id, status, progress, error_message)

    def _get_task_data(self, business_task_id: str) -> Dict[str, Any]:
        """Load task data from Redis, falling back to (and re-caching from) the DB.

        Raises:
            Exception: if the task cannot be found or has neither a provider
            task ID nor a known status.
        """
        task_key = f"task:{business_task_id}"
        task_data_raw = self.redis_client.hgetall(task_key)
        if task_data_raw:
            task_data = self._normalize_redis_mapping(task_data_raw)
            if (
                task_data.get('paraformer_task_id')
                or str(task_data.get('status') or '').lower() in _KNOWN_TASK_STATUSES
            ):
                return task_data

        task_data_from_db = self._get_task_from_db(business_task_id)
        if not task_data_from_db:
            raise Exception("Task not found in DB")
        if (
            not task_data_from_db.get('paraformer_task_id')
            and str(task_data_from_db.get('status') or '').lower() not in _KNOWN_TASK_STATUSES
        ):
            raise Exception("Task not found in DB or paraformer_task_id is missing")

        # redis-py rejects None values (DataError), and NULL columns such as
        # paraformer_task_id for local tasks are common, so cache them as ''.
        redis_mapping = {key: ('' if value is None else value) for key, value in task_data_from_db.items()}
        self.redis_client.hset(task_key, mapping=redis_mapping)
        self.redis_client.expire(task_key, _TASK_KEY_TTL_SECONDS)
        return task_data_from_db

    def get_meeting_transcription_status(self, meeting_id: int) -> Optional[Dict[str, Any]]:
        """Return the status of a meeting's most recent transcription task.

        Pending/processing tasks are refreshed through ``get_task_status``.
        If that fails, the database record is returned instead.

        Args:
            meeting_id: Meeting ID.

        Returns:
            Optional[Dict]: Task status info, or None when the meeting has no
            task or the lookup fails.
        """
        try:
            # Release the DB connection before refreshing the status, since
            # get_task_status opens its own connections.
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                query = """
                    SELECT task_id, status, progress, created_at, completed_at, error_message
                    FROM transcript_tasks
                    WHERE meeting_id = %s
                    ORDER BY created_at DESC
                    LIMIT 1
                """
                cursor.execute(query, (meeting_id,))
                task_record = cursor.fetchone()
                cursor.close()

            if not task_record:
                return None

            if task_record['status'] in ['pending', 'processing']:
                try:
                    return self.get_task_status(task_record['task_id'])
                except Exception as e:
                    print(f"Failed to get latest task status for meeting {meeting_id}, returning DB status. Error: {e}")

            return {
                'task_id': task_record['task_id'],
                'status': task_record['status'],
                'progress': task_record['progress'] or 0,
                'meeting_id': meeting_id,
                'created_at': task_record['created_at'].isoformat() if task_record['created_at'] else None,
                'completed_at': task_record['completed_at'].isoformat() if task_record['completed_at'] else None,
                'error_message': task_record['error_message'],
            }

        except Exception as e:
            print(f"Error getting meeting transcription status: {e}")
            return None

    # ------------------------------------------------------------------
    # Status mapping
    # ------------------------------------------------------------------

    def _map_paraformer_status(self, paraformer_status: str) -> str:
        """Map a Paraformer task status to a business status ('unknown' if unmapped)."""
        status_mapping = {
            'PENDING': 'pending',
            'RUNNING': 'processing',
            'SUCCEEDED': 'completed',
            'FAILED': 'failed',
            # A cancelled provider job will never finish; treat it as a
            # terminal failure instead of 'unknown' (which is polled forever).
            'CANCELED': 'failed',
        }
        return status_mapping.get(paraformer_status, 'unknown')

    def _calculate_progress(self, paraformer_status: str) -> int:
        """Return a coarse progress percentage for a Paraformer status (default 0)."""
        progress_mapping = {
            'PENDING': 10,
            'RUNNING': 50,
            'SUCCEEDED': 100,
            'FAILED': 0,
            'CANCELED': 0,
        }
        return progress_mapping.get(paraformer_status, 0)

    # ------------------------------------------------------------------
    # Database helpers
    # ------------------------------------------------------------------

    def _save_task_to_db(self, business_task_id: str, paraformer_task_id: str, meeting_id: int, audio_file_path: str):
        """Insert the task row, or reset an existing one to ``pending``.

        Raises:
            Exception: database errors are logged and re-raised.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                cursor.execute("SELECT task_id FROM transcript_tasks WHERE task_id = %s", (business_task_id,))
                existing = cursor.fetchone()
                if existing:
                    cursor.execute(
                        """
                        UPDATE transcript_tasks
                        SET paraformer_task_id = %s,
                            meeting_id = %s,
                            status = 'pending',
                            progress = 0,
                            completed_at = NULL,
                            error_message = NULL
                        WHERE task_id = %s
                        """,
                        (paraformer_task_id, meeting_id, business_task_id),
                    )
                else:
                    insert_task_query = """
                        INSERT INTO transcript_tasks (task_id, paraformer_task_id, meeting_id, status, progress, created_at)
                        VALUES (%s, %s, %s, 'pending', 0, NOW())
                    """
                    cursor.execute(insert_task_query, (business_task_id, paraformer_task_id, meeting_id))

                connection.commit()
                cursor.close()

        except Exception as e:
            print(f"Error saving task to database: {e}")
            raise

    def _update_task_status_in_db(self, business_task_id: str, status: str, progress: int, error_message: Optional[str] = None):
        """Best-effort update of a task row's status; errors are logged, not raised.

        For ``completed``, ``completed_at`` is set only if it is still NULL.
        Re-saving an already-completed task (e.g. on every status poll) thus
        keeps the original completion time.
        """
        if status == 'completed':
            update_query = """
                UPDATE transcript_tasks
                SET status = %s, progress = %s, completed_at = COALESCE(completed_at, NOW()), error_message = %s
                WHERE task_id = %s
            """
        else:
            update_query = """
                UPDATE transcript_tasks
                SET status = %s, progress = %s, error_message = %s
                WHERE task_id = %s
            """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                cursor.execute(update_query, (status, progress, error_message, business_task_id))
                connection.commit()
                cursor.close()

        except Exception as e:
            print(f"Error updating task status in database: {e}")

    def _get_task_status_from_db(self, business_task_id: str) -> Optional[str]:
        """Return the task's DB status, or None if missing or on error.

        Used to avoid processing a completed result twice.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                query = "SELECT status FROM transcript_tasks WHERE task_id = %s"
                cursor.execute(query, (business_task_id,))
                result = cursor.fetchone()
                cursor.close()
                if result:
                    return result['status']
                return None
        except Exception as e:
            print(f"Error getting task status from database: {e}")
            return None

    def _get_task_from_db(self, business_task_id: str) -> Optional[Dict[str, str]]:
        """Load a task row, shaped like the Redis task hash.

        ``updated_at`` is ``completed_at`` when set, otherwise ``created_at``.
        Nullable columns (e.g. ``paraformer_task_id``, timestamps) may be None.
        Returns None if the row is missing or on error.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                query = """
                    SELECT tt.task_id as business_task_id, tt.paraformer_task_id, tt.meeting_id, tt.status, tt.progress,
                           tt.error_message, tt.created_at, tt.completed_at
                    FROM transcript_tasks tt
                    WHERE tt.task_id = %s
                """
                cursor.execute(query, (business_task_id,))
                result = cursor.fetchone()
                cursor.close()
                if result:
                    # Convert to the same string-based shape as the Redis hash.
                    return {
                        'business_task_id': result['business_task_id'],
                        'paraformer_task_id': result['paraformer_task_id'],
                        'meeting_id': str(result['meeting_id']),
                        'status': result['status'],
                        'progress': str(result.get('progress') or 0),
                        'error_message': result.get('error_message') or '',
                        'created_at': result['created_at'].isoformat() if result['created_at'] else None,
                        'updated_at': (
                            result['completed_at'].isoformat()
                            if result.get('completed_at')
                            else result['created_at'].isoformat() if result.get('created_at') else None
                        ),
                    }
                return None
        except Exception as e:
            print(f"Error getting task from database: {e}")
            return None

    def _normalize_redis_mapping(self, mapping: Dict[Any, Any]) -> Dict[str, Any]:
        """Decode bytes keys/values from a Redis hash to UTF-8 strings."""
        normalized: Dict[str, Any] = {}
        for key, value in mapping.items():
            normalized_key = key.decode('utf-8') if isinstance(key, bytes) else str(key)
            if isinstance(value, bytes):
                normalized_value = value.decode('utf-8')
            else:
                normalized_value = value
            normalized[normalized_key] = normalized_value
        return normalized

    def _build_task_status_result(
        self,
        business_task_id: str,
        task_data: Optional[Dict[str, Any]],
        status: str,
        progress: int,
        error_message: Optional[str],
        updated_at: str,
    ) -> Dict[str, Any]:
        """Assemble the status dict returned by ``get_task_status``.

        ``meeting_id`` and ``created_at`` are None when ``task_data`` is
        unavailable.
        """
        result = {
            'task_id': business_task_id,
            'status': status,
            'progress': progress,
            'error_message': error_message,
            'updated_at': updated_at,
            'meeting_id': None,
            'created_at': None,
        }
        if task_data:
            meeting_id = task_data.get('meeting_id')
            result['meeting_id'] = int(meeting_id) if meeting_id is not None else None
            result['created_at'] = task_data.get('created_at')
        return result

    # ------------------------------------------------------------------
    # Result ingestion
    # ------------------------------------------------------------------

    def _process_transcription_result(self, business_task_id: str, meeting_id: int, paraformer_output: Any):
        """Download the first result's transcription JSON and save its segments.

        Raises:
            Exception: if there are no results, the download fails, or saving
            fails (logged, then re-raised).
        """
        try:
            if not paraformer_output.get('results'):
                raise Exception("No transcription results found in the provider response.")

            transcription_url = paraformer_output['results'][0]['transcription_url']
            print(f"Fetching transcription from URL: {transcription_url}")

            audio_config = SystemConfigService.get_active_audio_model_config("asr")
            timeout_seconds = self._resolve_request_timeout_seconds(audio_config)
            session = self._create_requests_session(timeout_seconds)
            try:
                response = session.get(transcription_url, timeout=timeout_seconds)
            finally:
                session.close()
            response.raise_for_status()
            transcription_data = response.json()

            # Save the transcript to the database.
            self._save_segments_to_db(transcription_data, meeting_id)

            print(f"Transcription result processed for task: {business_task_id}")

        except Exception as e:
            # Log the specific error and re-raise for the caller to handle.
            print(f"Error processing transcription result for task {business_task_id}: {e}")
            raise

    def _save_segments_to_db(self, data: dict, meeting_id: int):
        """Replace a meeting's transcript segments with the sentences in ``data``.

        Reads ``data['transcripts'][*]['sentences'][*]``. A missing
        ``speaker_id`` becomes -1. Existing segments are deleted before the
        insert; nothing is changed when there are no sentences.
        """
        segments_to_insert = []
        for transcript in data.get('transcripts', []):
            for sentence in transcript.get('sentences', []):
                speaker_id = sentence.get('speaker_id', -1)
                segments_to_insert.append((
                    meeting_id,
                    speaker_id,
                    f"发言人 {speaker_id}",  # default speaker_tag
                    sentence.get('begin_time'),
                    sentence.get('end_time'),
                    sentence.get('text'),
                ))

        if not segments_to_insert:
            print("No segments to save.")
            return

        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()

                # Remove existing segments so repeated processing does not duplicate them.
                delete_query = "DELETE FROM transcript_segments WHERE meeting_id = %s"
                cursor.execute(delete_query, (meeting_id,))
                print(f"Deleted existing segments for meeting_id: {meeting_id}")

                insert_query = '''
                    INSERT INTO transcript_segments (meeting_id, speaker_id, speaker_tag, start_time_ms, end_time_ms, text_content)
                    VALUES (%s, %s, %s, %s, %s, %s)
                '''
                cursor.executemany(insert_query, segments_to_insert)
                connection.commit()
                cursor.close()
                print(f"Successfully saved {len(segments_to_insert)} segments to the database for meeting_id: {meeting_id}")
        except Exception as e:
            print(f"Database error when saving segments: {e}")
            raise