refactor(background): 使用线程替代BackgroundTasks执行异步任务

重构异步任务执行机制,将FastAPI的BackgroundTasks替换为独立的线程执行。修改了async_meeting_service,添加run_task_in_background方法创建守护线程,并在会议总结生成和音频上传后的自动总结监控中调用。这消除了对API层BackgroundTasks参数的依赖,使服务层更独立且便于测试。
AlanPaine 2026-04-07 06:36:37 +00:00
parent abc5342258
commit 6c714ad780
3 changed files with 15 additions and 10 deletions

View File

@ -1105,7 +1105,7 @@ def generate_meeting_summary_async(meeting_id: int, request: GenerateSummaryRequ
}) })
# 传递 prompt_id 和 model_code 参数给服务层 # 传递 prompt_id 和 model_code 参数给服务层
task_id = async_meeting_service.start_summary_generation(meeting_id, request.user_prompt, request.prompt_id, request.model_code) task_id = async_meeting_service.start_summary_generation(meeting_id, request.user_prompt, request.prompt_id, request.model_code)
background_tasks.add_task(async_meeting_service._process_task, task_id) async_meeting_service.run_task_in_background(async_meeting_service._process_task, task_id)
return create_api_response(code="200", message="Summary generation task has been accepted.", data={ return create_api_response(code="200", message="Summary generation task has been accepted.", data={
"task_id": task_id, "status": "pending", "meeting_id": meeting_id "task_id": task_id, "status": "pending", "meeting_id": meeting_id
}) })

View File

@ -6,6 +6,7 @@ import uuid
import time import time
import os import os
import re import re
import threading
from datetime import datetime from datetime import datetime
from typing import Optional, Dict, Any, List from typing import Optional, Dict, Any, List
from pathlib import Path from pathlib import Path
@ -26,9 +27,14 @@ class AsyncMeetingService:
self.redis_client = redis.Redis(**REDIS_CONFIG) self.redis_client = redis.Redis(**REDIS_CONFIG)
self.llm_service = LLMService() # 复用现有的同步LLM服务 self.llm_service = LLMService() # 复用现有的同步LLM服务
def run_task_in_background(self, target, *args):
thread = threading.Thread(target=target, args=args, daemon=True)
thread.start()
return thread
def start_summary_generation(self, meeting_id: int, user_prompt: str = "", prompt_id: Optional[int] = None, model_code: Optional[str] = None) -> str: def start_summary_generation(self, meeting_id: int, user_prompt: str = "", prompt_id: Optional[int] = None, model_code: Optional[str] = None) -> str:
""" """
创建异步总结任务任务的执行将由外部如API层的BackgroundTasks触发 创建异步总结任务
Args: Args:
meeting_id: 会议ID meeting_id: 会议ID
@ -70,7 +76,7 @@ class AsyncMeetingService:
def _process_task(self, task_id: str): def _process_task(self, task_id: str):
""" """
处理单个异步任务的函数设计为由BackgroundTasks调用 处理单个异步任务的函数
""" """
print(f"Background task started for meeting summary task: {task_id}") print(f"Background task started for meeting summary task: {task_id}")
try: try:
@ -137,7 +143,7 @@ class AsyncMeetingService:
): ):
""" """
监控转录任务完成后自动生成总结 监控转录任务完成后自动生成总结
此方法设计为由BackgroundTasks调用在后台运行 此方法独立后台线程中运行
Args: Args:
meeting_id: 会议ID meeting_id: 会议ID
@ -194,8 +200,7 @@ class AsyncMeetingService:
) )
print(f"[Monitor] Summary task {summary_task_id} started for meeting {meeting_id}") print(f"[Monitor] Summary task {summary_task_id} started for meeting {meeting_id}")
# 在后台执行总结任务 self.run_task_in_background(self._process_task, summary_task_id)
self._process_task(summary_task_id)
except Exception as e: except Exception as e:
error_msg = f"Failed to start summary generation: {e}" error_msg = f"Failed to start summary generation: {e}"

View File

@ -141,16 +141,16 @@ def handle_audio_upload(
transcription_task_id = transcription_service.start_transcription(meeting_id, file_path) transcription_task_id = transcription_service.start_transcription(meeting_id, file_path)
print(f"Transcription task {transcription_task_id} started for meeting {meeting_id}") print(f"Transcription task {transcription_task_id} started for meeting {meeting_id}")
# 5. 如果启用自动总结且提供了 background_tasks添加监控任务 # 5. 如果启用自动总结,启动后台监控任务
if auto_summarize and transcription_task_id and background_tasks: if auto_summarize and transcription_task_id:
background_tasks.add_task( async_meeting_service.run_task_in_background(
async_meeting_service.monitor_and_auto_summarize, async_meeting_service.monitor_and_auto_summarize,
meeting_id, meeting_id,
transcription_task_id, transcription_task_id,
prompt_id, prompt_id,
model_code model_code
) )
print(f"[audio_service] Auto-summarize enabled, monitor task added for meeting {meeting_id}, prompt_id: {prompt_id}, model_code: {model_code}") print(f"[audio_service] Auto-summarize enabled, monitor task started for meeting {meeting_id}, prompt_id: {prompt_id}, model_code: {model_code}")
except Exception as e: except Exception as e:
print(f"Failed to start transcription: {e}") print(f"Failed to start transcription: {e}")