diff --git a/backend/app/api/endpoints/meetings.py b/backend/app/api/endpoints/meetings.py index a0525d7..edf6ee4 100644 --- a/backend/app/api/endpoints/meetings.py +++ b/backend/app/api/endpoints/meetings.py @@ -963,8 +963,7 @@ def start_meeting_transcription( "task_id": existing_status['task_id'], "status": existing_status['status'] }) task_id = transcription_service.start_transcription(meeting_id, audio_file['file_path']) - background_tasks.add_task( - async_meeting_service.monitor_and_auto_summarize, + async_meeting_service.enqueue_transcription_monitor( meeting_id, task_id, meeting.get('prompt_id') if meeting.get('prompt_id') not in (None, 0) else None, @@ -1103,9 +1102,24 @@ def generate_meeting_summary_async(meeting_id: int, request: GenerateSummaryRequ "task_id": transcription_status.get('task_id'), "status": transcription_status.get('status') }) + llm_status = async_meeting_service.get_meeting_llm_status(meeting_id) + if llm_status and llm_status.get('status') in ['pending', 'processing']: + return create_api_response(code="409", message="总结任务已存在", data={ + "task_id": llm_status.get('task_id'), + "status": llm_status.get('status') + }) # 传递 prompt_id 和 model_code 参数给服务层 - task_id = async_meeting_service.start_summary_generation(meeting_id, request.user_prompt, request.prompt_id, request.model_code) - background_tasks.add_task(async_meeting_service._process_task, task_id) + task_id, created = async_meeting_service.enqueue_summary_generation( + meeting_id, + request.user_prompt, + request.prompt_id, + request.model_code, + ) + if not created: + return create_api_response(code="409", message="总结任务已存在", data={ + "task_id": task_id, + "status": "pending" + }) return create_api_response(code="200", message="Summary generation task has been accepted.", data={ "task_id": task_id, "status": "pending", "meeting_id": meeting_id }) diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 67c358d..d0a5738 100644 --- a/backend/app/core/config.py +++ 
def enqueue_summary_generation(
    self,
    meeting_id: int,
    user_prompt: str = "",
    prompt_id: Optional[int] = None,
    model_code: Optional[str] = None
) -> tuple[str, bool]:
    """Create a summary task and submit it to the summary thread pool.

    When ``_get_existing_summary_task`` already reports a task for the
    meeting, that task id is returned and nothing new is created.

    Args:
        meeting_id: Meeting to summarize.
        user_prompt: Extra prompt text forwarded to ``start_summary_generation``.
        prompt_id: Optional prompt template id.
        model_code: Optional model code.

    Returns:
        ``(task_id, created)``; ``created`` is False when an existing task
        was reused.

    Raises:
        Exception: whatever the runner raised while scheduling. The task is
            marked ``failed`` first so it does not stay ``pending`` forever.
    """
    existing_task = self._get_existing_summary_task(meeting_id)
    if existing_task:
        return existing_task, False

    task_id = self.start_summary_generation(meeting_id, user_prompt, prompt_id, model_code)
    try:
        summary_task_runner.submit(f"meeting-summary:{task_id}", self._process_task, task_id)
    except Exception as e:
        # The task record already exists. If it were left in 'pending', every
        # later summary request for this meeting would be rejected as a
        # duplicate, so record the failure before propagating the error.
        error_msg = f"Failed to schedule summary task: {e}"
        print(error_msg)
        try:
            self._update_task_in_db(task_id, 'failed', 0, error_message=error_msg)
            self._update_task_status_in_redis(task_id, 'failed', 0, error_message=error_msg)
        except Exception as mark_error:
            print(f"Error marking summary task {task_id} as failed: {mark_error}")
        raise
    return task_id, True


def enqueue_transcription_monitor(
    self,
    meeting_id: int,
    transcription_task_id: str,
    prompt_id: Optional[int] = None,
    model_code: Optional[str] = None
) -> bool:
    """Submit a transcription monitor task to the monitor thread pool.

    The runner key is derived from ``transcription_task_id`` so the same
    transcription task is not polled by two monitors in this process.

    Returns:
        The value returned by ``monitor_task_runner.submit``.
    """
    return monitor_task_runner.submit(
        f"transcription-monitor:{transcription_task_id}",
        self.monitor_and_auto_summarize,
        meeting_id,
        transcription_task_id,
        prompt_id,
        model_code,
    )
def _acquire_lock(self, lock_key: str, ttl_seconds: int) -> Optional[str]:
    """Try to take a Redis lock so the same background task is not run twice.

    Args:
        lock_key: Redis key used as the lock.
        ttl_seconds: Requested expiry; values below 30 are raised to 30.

    Returns:
        A fresh random token when the key was set, otherwise None (the key
        already exists, or the Redis call raised and the error was printed).
    """
    try:
        candidate = str(uuid.uuid4())
        expiry = max(30, ttl_seconds)
        # SET with nx=True only succeeds when the key does not exist yet.
        if self.redis_client.set(lock_key, candidate, nx=True, ex=expiry):
            return candidate
    except Exception as e:
        print(f"Error acquiring lock {lock_key}: {e}")
    return None


def _release_lock(self, lock_key: str, token: Optional[str]) -> None:
    """Delete the lock key, but only while it still holds ``token``.

    A falsy token means the lock was never acquired, so nothing is done.
    Errors from Redis are printed and not propagated.
    """
    if token:
        # Compare-and-delete runs as one script so a lock now held under a
        # different token is not removed.
        release_script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        end
        return 0
        """
        try:
            self.redis_client.eval(release_script, 1, lock_key, token)
        except Exception as e:
            print(f"Error releasing lock {lock_key}: {e}")
查询外部API获取状态 - try: - session = self._create_requests_session() - try: - paraformer_response = Transcription.fetch(task=paraformer_task_id, session=session) - finally: - session.close() - if paraformer_response.status_code != HTTPStatus.OK: - raise Exception(f"Failed to fetch task status from provider: {paraformer_response.message}") - - paraformer_status = paraformer_response.output.task_status - current_status = self._map_paraformer_status(paraformer_status) - progress = self._calculate_progress(paraformer_status) - error_message = None #执行成功,清除初始状态 - - except Exception as e: - current_status = 'failed' - progress = 0 - error_message = f"Error fetching status from provider: {e}" - # 直接进入finally块更新状态后返回 - return - - # 3. 如果任务完成,处理结果 - if current_status == 'completed' and paraformer_response.output.get('results'): - # 防止并发处理:先检查数据库中的状态 - db_task_status = self._get_task_status_from_db(business_task_id) - if db_task_status != 'completed': - # 只有当数据库中状态不是completed时才处理 - # 先将状态更新为completed,作为分布式锁 - self._update_task_status_in_db(business_task_id, 'completed', 100, None) + stored_status = str(task_data.get('status') or '').lower() + if stored_status in {'completed', 'failed'}: + current_status = stored_status + progress = int(task_data.get('progress') or 0) + error_message = task_data.get('error_message') or None + updated_at = task_data.get('updated_at') or updated_at + else: + cached_status = self.redis_client.hgetall(status_cache_key) + if cached_status and cached_status.get('status') in {'pending', 'processing'}: + current_status = cached_status.get('status', 'pending') + progress = int(cached_status.get('progress') or 0) + error_message = cached_status.get('error_message') or None + updated_at = cached_status.get('updated_at') or updated_at + else: + paraformer_task_id = task_data['paraformer_task_id'] + # 2. 
查询外部API获取状态 try: - self._process_transcription_result( - business_task_id, - int(task_data['meeting_id']), - paraformer_response.output - ) + session = self._create_requests_session() + try: + paraformer_response = Transcription.fetch(task=paraformer_task_id, session=session) + finally: + session.close() + if paraformer_response.status_code != HTTPStatus.OK: + raise Exception(f"Failed to fetch task status from provider: {paraformer_response.message}") + + paraformer_status = paraformer_response.output.task_status + current_status = self._map_paraformer_status(paraformer_status) + progress = self._calculate_progress(paraformer_status) + error_message = None #执行成功,清除初始状态 + except Exception as e: current_status = 'failed' - progress = 100 # 进度为100,但状态是失败 - error_message = f"Error processing transcription result: {e}" - print(error_message) - else: - print(f"Task {business_task_id} already processed, skipping duplicate processing") + progress = 0 + error_message = f"Error fetching status from provider: {e}" + + # 3. 
如果任务完成,处理结果 + if current_status == 'completed' and paraformer_response.output.get('results'): + # 防止并发处理:先检查数据库中的状态 + db_task_status = self._get_task_status_from_db(business_task_id) + if db_task_status != 'completed': + # 只有当数据库中状态不是completed时才处理 + # 先将状态更新为completed,作为分布式锁 + self._update_task_status_in_db(business_task_id, 'completed', 100, None) + + try: + self._process_transcription_result( + business_task_id, + int(task_data['meeting_id']), + paraformer_response.output + ) + except Exception as e: + current_status = 'failed' + progress = 100 # 进度为100,但状态是失败 + error_message = f"Error processing transcription result: {e}" + print(error_message) + else: + print(f"Task {business_task_id} already processed, skipping duplicate processing") except Exception as e: error_message = f"Error getting task status: {e}" @@ -281,33 +295,33 @@ class AsyncTranscriptionService: if error_message: update_data['error_message'] = error_message self.redis_client.hset(f"task:{business_task_id}", mapping=update_data) + self.redis_client.hset(status_cache_key, mapping=update_data) + self.redis_client.expire( + status_cache_key, + max(1, int(BACKGROUND_TASK_CONFIG.get('transcription_status_cache_ttl', 3))) + ) # 更新数据库 self._update_task_status_in_db(business_task_id, current_status, progress, error_message) # 5. 
构造并返回最终结果 - result = { - 'task_id': business_task_id, - 'status': current_status, - 'progress': progress, - 'error_message': error_message, - 'updated_at': updated_at, - 'meeting_id': None, - 'created_at': None, - } - if task_data: - result['meeting_id'] = int(task_data['meeting_id']) - result['created_at'] = task_data.get('created_at') - - return result + return self._build_task_status_result( + business_task_id, + task_data, + current_status, + progress, + error_message, + updated_at, + ) def _get_task_data(self, business_task_id: str) -> Dict[str, Any]: """从Redis或数据库获取任务数据""" # 尝试从Redis获取 - task_data_bytes = self.redis_client.hgetall(f"task:{business_task_id}") - if task_data_bytes and task_data_bytes.get(b'paraformer_task_id'): - # Redis返回的是bytes,需要解码 - return {k.decode('utf-8'): v.decode('utf-8') for k, v in task_data_bytes.items()} + task_data_raw = self.redis_client.hgetall(f"task:{business_task_id}") + if task_data_raw: + task_data = self._normalize_redis_mapping(task_data_raw) + if task_data.get('paraformer_task_id'): + return task_data # 如果Redis没有,从数据库回源 task_data_from_db = self._get_task_from_db(business_task_id) @@ -465,7 +479,8 @@ class AsyncTranscriptionService: cursor = connection.cursor(dictionary=True) query = """ - SELECT tt.task_id as business_task_id, tt.paraformer_task_id, tt.meeting_id, tt.status, tt.created_at + SELECT tt.task_id as business_task_id, tt.paraformer_task_id, tt.meeting_id, tt.status, tt.progress, + tt.error_message, tt.created_at, tt.completed_at FROM transcript_tasks tt WHERE tt.task_id = %s """ @@ -480,13 +495,55 @@ class AsyncTranscriptionService: 'paraformer_task_id': result['paraformer_task_id'], 'meeting_id': str(result['meeting_id']), 'status': result['status'], - 'created_at': result['created_at'].isoformat() if result['created_at'] else None + 'progress': str(result.get('progress') or 0), + 'error_message': result.get('error_message') or '', + 'created_at': result['created_at'].isoformat() if result['created_at'] else 
def _normalize_redis_mapping(self, mapping: Dict[Any, Any]) -> Dict[str, Any]:
    """Return a copy of a Redis hash with bytes keys and values decoded.

    Bytes keys and values are decoded as UTF-8. Non-bytes keys are passed
    through ``str()`` and non-bytes values are kept unchanged. An empty or
    None mapping yields an empty dict.
    """
    return {
        (key.decode('utf-8') if isinstance(key, bytes) else str(key)):
            (value.decode('utf-8') if isinstance(value, bytes) else value)
        for key, value in (mapping or {}).items()
    }


def _build_task_status_result(
    self,
    business_task_id: str,
    task_data: Optional[Dict[str, Any]],
    status: str,
    progress: int,
    error_message: Optional[str],
    updated_at: str
) -> Dict[str, Any]:
    """Assemble the status payload returned for a transcription task.

    Args:
        business_task_id: Task id reported as ``task_id``.
        task_data: Stored task fields, or None/empty when unavailable.
        status: Current status string.
        progress: Current progress value.
        error_message: Error text, if any.
        updated_at: Timestamp string for the payload.

    Returns:
        A dict with ``task_id``, ``status``, ``progress``, ``error_message``,
        ``updated_at``, ``meeting_id`` and ``created_at``. ``meeting_id`` is
        None when it is missing or cannot be parsed as an integer.
    """
    meeting_id: Optional[int] = None
    created_at = None
    if task_data:
        raw_meeting_id = task_data.get('meeting_id')
        if raw_meeting_id is not None:
            try:
                meeting_id = int(raw_meeting_id)
            except (TypeError, ValueError):
                # Stored values are strings and may be '' or 'None'. A status
                # query should not fail because of that.
                meeting_id = None
        created_at = task_data.get('created_at')

    return {
        'task_id': business_task_id,
        'status': status,
        'progress': progress,
        'error_message': error_message,
        'updated_at': updated_at,
        'meeting_id': meeting_id,
        'created_at': created_at,
    }
from concurrent.futures import ThreadPoolExecutor
from itertools import count
from threading import Lock
from typing import Any, Callable, Dict
import traceback


class KeyedBackgroundTaskRunner:
    """Thread-pool runner that will not queue a second task under an active key.

    Each submitted task is registered under a caller-chosen key. While the
    future for that key is not done, further submissions with the same key are
    rejected. The registration is removed when the task finishes.
    """

    def __init__(self, max_workers: int, thread_name_prefix: str):
        """Create the underlying executor.

        Args:
            max_workers: Pool size. Values below 1 are raised to 1, because
                ThreadPoolExecutor raises ValueError for them. None keeps the
                executor's own default.
            thread_name_prefix: Prefix for worker thread names.
        """
        worker_count = None if max_workers is None else max(1, int(max_workers))
        self._executor = ThreadPoolExecutor(
            max_workers=worker_count,
            thread_name_prefix=thread_name_prefix,
        )
        self._lock = Lock()
        self._seq = count(1)
        # key -> {"token": int, "future": Future} for the latest submission.
        self._tasks: Dict[str, Dict[str, Any]] = {}

    def submit(self, key: str, func: Callable[..., Any], *args: Any, **kwargs: Any) -> bool:
        """Schedule ``func(*args, **kwargs)`` unless ``key`` is still active.

        Returns:
            True when the task was handed to the executor, False when a task
            registered under ``key`` has not finished yet.
        """
        # Bind the arguments here. Passing **kwargs through to _run_task would
        # clash with its own ``key`` / ``token`` / ``func`` parameters, and
        # that TypeError would be raised inside the worker and never seen.
        def bound_call() -> Any:
            return func(*args, **kwargs)

        with self._lock:
            active = self._tasks.get(key)
            if active is not None and not active["future"].done():
                return False

            token = next(self._seq)
            # Submitting while the lock is held means the worker's cleanup,
            # which takes the same lock, runs only after the entry is stored.
            future = self._executor.submit(self._run_task, key, token, bound_call)
            self._tasks[key] = {"token": token, "future": future}
            return True

    def _run_task(self, key: str, token: int, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """Run the task, print any failure, then drop its registration."""
        try:
            func(*args, **kwargs)
        except Exception:
            print(f"[BackgroundTaskRunner] Task failed, key={key}")
            traceback.print_exc()
        finally:
            with self._lock:
                current = self._tasks.get(key)
                # Remove only our own entry; a newer submission under the same
                # key carries a different token.
                if current and current["token"] == token:
                    self._tasks.pop(key, None)