codex/dev
mula.liu 2026-04-07 17:26:34 +08:00
parent 3c2ac639b4
commit af735bd93d
6 changed files with 280 additions and 82 deletions

View File

@ -963,8 +963,7 @@ def start_meeting_transcription(
"task_id": existing_status['task_id'], "status": existing_status['status'] "task_id": existing_status['task_id'], "status": existing_status['status']
}) })
task_id = transcription_service.start_transcription(meeting_id, audio_file['file_path']) task_id = transcription_service.start_transcription(meeting_id, audio_file['file_path'])
background_tasks.add_task( async_meeting_service.enqueue_transcription_monitor(
async_meeting_service.monitor_and_auto_summarize,
meeting_id, meeting_id,
task_id, task_id,
meeting.get('prompt_id') if meeting.get('prompt_id') not in (None, 0) else None, meeting.get('prompt_id') if meeting.get('prompt_id') not in (None, 0) else None,
@ -1103,9 +1102,24 @@ def generate_meeting_summary_async(meeting_id: int, request: GenerateSummaryRequ
"task_id": transcription_status.get('task_id'), "task_id": transcription_status.get('task_id'),
"status": transcription_status.get('status') "status": transcription_status.get('status')
}) })
llm_status = async_meeting_service.get_meeting_llm_status(meeting_id)
if llm_status and llm_status.get('status') in ['pending', 'processing']:
return create_api_response(code="409", message="总结任务已存在", data={
"task_id": llm_status.get('task_id'),
"status": llm_status.get('status')
})
# 传递 prompt_id 和 model_code 参数给服务层 # 传递 prompt_id 和 model_code 参数给服务层
task_id = async_meeting_service.start_summary_generation(meeting_id, request.user_prompt, request.prompt_id, request.model_code) task_id, created = async_meeting_service.enqueue_summary_generation(
background_tasks.add_task(async_meeting_service._process_task, task_id) meeting_id,
request.user_prompt,
request.prompt_id,
request.model_code,
)
if not created:
return create_api_response(code="409", message="总结任务已存在", data={
"task_id": task_id,
"status": "pending"
})
return create_api_response(code="200", message="Summary generation task has been accepted.", data={ return create_api_response(code="200", message="Summary generation task has been accepted.", data={
"task_id": task_id, "status": "pending", "meeting_id": meeting_id "task_id": task_id, "status": "pending", "meeting_id": meeting_id
}) })

View File

@ -98,4 +98,10 @@ TRANSCRIPTION_POLL_CONFIG = {
'max_wait_time': int(os.getenv('TRANSCRIPTION_MAX_WAIT_TIME', '1800')), # 最大等待30分钟 'max_wait_time': int(os.getenv('TRANSCRIPTION_MAX_WAIT_TIME', '1800')), # 最大等待30分钟
} }
# Background task configuration (thread-pool sizes and Redis status-cache TTL),
# all overridable via environment variables.
BACKGROUND_TASK_CONFIG = {
    # Max threads for meeting-summary generation tasks.
    'summary_workers': int(os.getenv('SUMMARY_TASK_MAX_WORKERS', '2')),
    # Max threads for transcription-monitor polling tasks.
    'monitor_workers': int(os.getenv('MONITOR_TASK_MAX_WORKERS', '8')),
    # Seconds a transcription status snapshot stays cached in Redis
    # (short TTL — presumably to throttle provider fetches; confirm with consumer).
    'transcription_status_cache_ttl': int(os.getenv('TRANSCRIPTION_STATUS_CACHE_TTL', '3')),
}

View File

@ -1,6 +1,6 @@
""" """
异步会议服务 - 处理会议总结生成的异步任务 异步会议服务 - 处理会议总结生成的异步任务
采用FastAPI BackgroundTasks模式 采用受控线程池执行避免阻塞 Web 请求进程
""" """
import uuid import uuid
import time import time
@ -11,11 +11,22 @@ from typing import Optional, Dict, Any, List
from pathlib import Path from pathlib import Path
import redis import redis
from app.core.config import REDIS_CONFIG, TRANSCRIPTION_POLL_CONFIG, AUDIO_DIR from app.core.config import REDIS_CONFIG, TRANSCRIPTION_POLL_CONFIG, BACKGROUND_TASK_CONFIG, AUDIO_DIR
from app.core.database import get_db_connection from app.core.database import get_db_connection
from app.services.async_transcription_service import AsyncTranscriptionService from app.services.async_transcription_service import AsyncTranscriptionService
from app.services.background_task_runner import KeyedBackgroundTaskRunner
from app.services.llm_service import LLMService from app.services.llm_service import LLMService
# Module-level keyed runners shared by every AsyncMeetingService instance.
# Pool sizes come from BACKGROUND_TASK_CONFIG; the summary pool is smaller
# than the monitor pool (presumably summary jobs are heavier while monitors
# mostly sleep between polls — confirm against workload).
summary_task_runner = KeyedBackgroundTaskRunner(
    max_workers=BACKGROUND_TASK_CONFIG['summary_workers'],
    thread_name_prefix="imeeting-summary",
)
monitor_task_runner = KeyedBackgroundTaskRunner(
    max_workers=BACKGROUND_TASK_CONFIG['monitor_workers'],
    thread_name_prefix="imeeting-monitor",
)
class AsyncMeetingService: class AsyncMeetingService:
"""异步会议服务类 - 处理会议相关的异步任务""" """异步会议服务类 - 处理会议相关的异步任务"""
@ -26,9 +37,42 @@ class AsyncMeetingService:
self.redis_client = redis.Redis(**REDIS_CONFIG) self.redis_client = redis.Redis(**REDIS_CONFIG)
self.llm_service = LLMService() # 复用现有的同步LLM服务 self.llm_service = LLMService() # 复用现有的同步LLM服务
def enqueue_summary_generation(
    self,
    meeting_id: int,
    user_prompt: str = "",
    prompt_id: Optional[int] = None,
    model_code: Optional[str] = None
) -> tuple[str, bool]:
    """Create and submit a summary task for a meeting, reusing any in-flight one.

    Returns:
        (task_id, created): ``created`` is False when an existing running
        task for the same meeting was returned instead of a new one.
    """
    # Deduplicate at the meeting level: a pending/processing task wins.
    running_task = self._get_existing_summary_task(meeting_id)
    if running_task:
        return running_task, False
    new_task_id = self.start_summary_generation(meeting_id, user_prompt, prompt_id, model_code)
    # The keyed runner additionally guards against double submission of this task id.
    summary_task_runner.submit(f"meeting-summary:{new_task_id}", self._process_task, new_task_id)
    return new_task_id, True
def enqueue_transcription_monitor(
    self,
    meeting_id: int,
    transcription_task_id: str,
    prompt_id: Optional[int] = None,
    model_code: Optional[str] = None
) -> bool:
    """Schedule background polling of a transcription task.

    The runner key is derived from the transcription task id, so the same
    task is never polled by two workers in this process at once.

    Returns:
        True if the monitor was scheduled, False if one is already running.
    """
    dedup_key = f"transcription-monitor:{transcription_task_id}"
    return monitor_task_runner.submit(
        dedup_key,
        self.monitor_and_auto_summarize,
        meeting_id,
        transcription_task_id,
        prompt_id,
        model_code,
    )
def start_summary_generation(self, meeting_id: int, user_prompt: str = "", prompt_id: Optional[int] = None, model_code: Optional[str] = None) -> str: def start_summary_generation(self, meeting_id: int, user_prompt: str = "", prompt_id: Optional[int] = None, model_code: Optional[str] = None) -> str:
""" """
创建异步总结任务任务的执行将由外部如API层的BackgroundTasks触发 创建异步总结任务任务的执行将由后台线程池触发
Args: Args:
meeting_id: 会议ID meeting_id: 会议ID
@ -70,9 +114,11 @@ class AsyncMeetingService:
def _process_task(self, task_id: str): def _process_task(self, task_id: str):
""" """
处理单个异步任务的函数设计为由BackgroundTasks调用 处理单个异步任务的函数在线程池中执行
""" """
print(f"Background task started for meeting summary task: {task_id}") print(f"Background task started for meeting summary task: {task_id}")
lock_token = None
lock_key = f"lock:meeting-summary-task:{task_id}"
try: try:
# 从Redis获取任务数据 # 从Redis获取任务数据
task_data = self.redis_client.hgetall(f"llm_task:{task_id}") task_data = self.redis_client.hgetall(f"llm_task:{task_id}")
@ -85,6 +131,10 @@ class AsyncMeetingService:
prompt_id_str = task_data.get('prompt_id', '') prompt_id_str = task_data.get('prompt_id', '')
prompt_id = int(prompt_id_str) if prompt_id_str and prompt_id_str != '' else None prompt_id = int(prompt_id_str) if prompt_id_str and prompt_id_str != '' else None
model_code = task_data.get('model_code', '') or None model_code = task_data.get('model_code', '') or None
lock_token = self._acquire_lock(lock_key, ttl_seconds=7200)
if not lock_token:
print(f"Task {task_id} is already being processed, skipping duplicate execution")
return
# 1. 更新状态为processing # 1. 更新状态为processing
self._update_task_status_in_redis(task_id, 'processing', 10, message="任务已开始...") self._update_task_status_in_redis(task_id, 'processing', 10, message="任务已开始...")
@ -127,6 +177,8 @@ class AsyncMeetingService:
# 更新失败状态 # 更新失败状态
self._update_task_in_db(task_id, 'failed', 0, error_message=error_msg) self._update_task_in_db(task_id, 'failed', 0, error_message=error_msg)
self._update_task_status_in_redis(task_id, 'failed', 0, error_message=error_msg) self._update_task_status_in_redis(task_id, 'failed', 0, error_message=error_msg)
finally:
self._release_lock(lock_key, lock_token)
def monitor_and_auto_summarize( def monitor_and_auto_summarize(
self, self,
@ -156,6 +208,11 @@ class AsyncMeetingService:
poll_interval = TRANSCRIPTION_POLL_CONFIG['poll_interval'] poll_interval = TRANSCRIPTION_POLL_CONFIG['poll_interval']
max_wait_time = TRANSCRIPTION_POLL_CONFIG['max_wait_time'] max_wait_time = TRANSCRIPTION_POLL_CONFIG['max_wait_time']
max_polls = max_wait_time // poll_interval max_polls = max_wait_time // poll_interval
lock_key = f"lock:transcription-monitor:{transcription_task_id}"
lock_token = self._acquire_lock(lock_key, ttl_seconds=max_wait_time + poll_interval)
if not lock_token:
print(f"[Monitor] Monitor task already running for transcription task {transcription_task_id}, skipping duplicate worker")
return
# 延迟导入以避免循环导入 # 延迟导入以避免循环导入
transcription_service = AsyncTranscriptionService() transcription_service = AsyncTranscriptionService()
@ -186,16 +243,16 @@ class AsyncMeetingService:
else: else:
# 启动总结任务 # 启动总结任务
try: try:
summary_task_id = self.start_summary_generation( summary_task_id, created = self.enqueue_summary_generation(
meeting_id, meeting_id,
user_prompt="", user_prompt="",
prompt_id=prompt_id, prompt_id=prompt_id,
model_code=model_code model_code=model_code
) )
print(f"[Monitor] Summary task {summary_task_id} started for meeting {meeting_id}") if created:
print(f"[Monitor] Summary task {summary_task_id} started for meeting {meeting_id}")
# 在后台执行总结任务 else:
self._process_task(summary_task_id) print(f"[Monitor] Reused existing summary task {summary_task_id} for meeting {meeting_id}")
except Exception as e: except Exception as e:
error_msg = f"Failed to start summary generation: {e}" error_msg = f"Failed to start summary generation: {e}"
@ -232,6 +289,8 @@ class AsyncMeetingService:
except Exception as e: except Exception as e:
print(f"[Monitor] Fatal error in monitor_and_auto_summarize: {e}") print(f"[Monitor] Fatal error in monitor_and_auto_summarize: {e}")
finally:
self._release_lock(lock_key, lock_token)
# --- 会议相关方法 --- # --- 会议相关方法 ---
@ -682,5 +741,30 @@ class AsyncMeetingService:
print(f"Error checking existing summary task: {e}") print(f"Error checking existing summary task: {e}")
return None return None
def _acquire_lock(self, lock_key: str, ttl_seconds: int) -> Optional[str]:
    """Best-effort distributed lock (Redis ``SET NX EX``) against duplicate workers.

    Returns:
        The owner token on success; None when the lock is already held or
        Redis errored — callers treat None as "skip this duplicate run".
    """
    token = str(uuid.uuid4())
    # Floor the TTL at 30s so a tiny value cannot expire mid-execution.
    ttl = max(30, ttl_seconds)
    try:
        if self.redis_client.set(lock_key, token, nx=True, ex=ttl):
            return token
    except Exception as e:
        print(f"Error acquiring lock {lock_key}: {e}")
    return None
def _release_lock(self, lock_key: str, token: Optional[str]) -> None:
    """Release a distributed lock only if this worker still owns it."""
    if not token:
        # Nothing was acquired (or acquisition failed) — nothing to release.
        return
    try:
        # Lua compare-and-delete: GET+DEL atomically, so we never delete a
        # lock that expired and was re-acquired by another worker.
        release_script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        end
        return 0
        """
        self.redis_client.eval(release_script, 1, lock_key, token)
    except Exception as e:
        print(f"Error releasing lock {lock_key}: {e}")
# 创建全局实例 # 创建全局实例
async_meeting_service = AsyncMeetingService() async_meeting_service = AsyncMeetingService()

View File

@ -10,7 +10,7 @@ from http import HTTPStatus
import dashscope import dashscope
from dashscope.audio.asr import Transcription from dashscope.audio.asr import Transcription
from app.core.config import QWEN_API_KEY, REDIS_CONFIG, APP_CONFIG from app.core.config import QWEN_API_KEY, REDIS_CONFIG, APP_CONFIG, BACKGROUND_TASK_CONFIG
from app.core.database import get_db_connection from app.core.database import get_db_connection
from app.services.system_config_service import SystemConfigService from app.services.system_config_service import SystemConfigService
@ -211,56 +211,70 @@ class AsyncTranscriptionService:
current_status = 'failed' current_status = 'failed'
progress = 0 progress = 0
error_message = "An unknown error occurred." error_message = "An unknown error occurred."
updated_at = datetime.now().isoformat()
status_cache_key = f"task_status_cache:{business_task_id}"
try: try:
# 1. 获取任务数据优先Redis回源DB # 1. 获取任务数据优先Redis回源DB
task_data = self._get_task_data(business_task_id) task_data = self._get_task_data(business_task_id)
paraformer_task_id = task_data['paraformer_task_id'] stored_status = str(task_data.get('status') or '').lower()
if stored_status in {'completed', 'failed'}:
# 2. 查询外部API获取状态 current_status = stored_status
try: progress = int(task_data.get('progress') or 0)
session = self._create_requests_session() error_message = task_data.get('error_message') or None
try: updated_at = task_data.get('updated_at') or updated_at
paraformer_response = Transcription.fetch(task=paraformer_task_id, session=session) else:
finally: cached_status = self.redis_client.hgetall(status_cache_key)
session.close() if cached_status and cached_status.get('status') in {'pending', 'processing'}:
if paraformer_response.status_code != HTTPStatus.OK: current_status = cached_status.get('status', 'pending')
raise Exception(f"Failed to fetch task status from provider: {paraformer_response.message}") progress = int(cached_status.get('progress') or 0)
error_message = cached_status.get('error_message') or None
paraformer_status = paraformer_response.output.task_status updated_at = cached_status.get('updated_at') or updated_at
current_status = self._map_paraformer_status(paraformer_status) else:
progress = self._calculate_progress(paraformer_status) paraformer_task_id = task_data['paraformer_task_id']
error_message = None #执行成功,清除初始状态
except Exception as e:
current_status = 'failed'
progress = 0
error_message = f"Error fetching status from provider: {e}"
# 直接进入finally块更新状态后返回
return
# 3. 如果任务完成,处理结果
if current_status == 'completed' and paraformer_response.output.get('results'):
# 防止并发处理:先检查数据库中的状态
db_task_status = self._get_task_status_from_db(business_task_id)
if db_task_status != 'completed':
# 只有当数据库中状态不是completed时才处理
# 先将状态更新为completed作为分布式锁
self._update_task_status_in_db(business_task_id, 'completed', 100, None)
# 2. 查询外部API获取状态
try: try:
self._process_transcription_result( session = self._create_requests_session()
business_task_id, try:
int(task_data['meeting_id']), paraformer_response = Transcription.fetch(task=paraformer_task_id, session=session)
paraformer_response.output finally:
) session.close()
if paraformer_response.status_code != HTTPStatus.OK:
raise Exception(f"Failed to fetch task status from provider: {paraformer_response.message}")
paraformer_status = paraformer_response.output.task_status
current_status = self._map_paraformer_status(paraformer_status)
progress = self._calculate_progress(paraformer_status)
error_message = None #执行成功,清除初始状态
except Exception as e: except Exception as e:
current_status = 'failed' current_status = 'failed'
progress = 100 # 进度为100但状态是失败 progress = 0
error_message = f"Error processing transcription result: {e}" error_message = f"Error fetching status from provider: {e}"
print(error_message)
else: # 3. 如果任务完成,处理结果
print(f"Task {business_task_id} already processed, skipping duplicate processing") if current_status == 'completed' and paraformer_response.output.get('results'):
# 防止并发处理:先检查数据库中的状态
db_task_status = self._get_task_status_from_db(business_task_id)
if db_task_status != 'completed':
# 只有当数据库中状态不是completed时才处理
# 先将状态更新为completed作为分布式锁
self._update_task_status_in_db(business_task_id, 'completed', 100, None)
try:
self._process_transcription_result(
business_task_id,
int(task_data['meeting_id']),
paraformer_response.output
)
except Exception as e:
current_status = 'failed'
progress = 100 # 进度为100但状态是失败
error_message = f"Error processing transcription result: {e}"
print(error_message)
else:
print(f"Task {business_task_id} already processed, skipping duplicate processing")
except Exception as e: except Exception as e:
error_message = f"Error getting task status: {e}" error_message = f"Error getting task status: {e}"
@ -281,33 +295,33 @@ class AsyncTranscriptionService:
if error_message: if error_message:
update_data['error_message'] = error_message update_data['error_message'] = error_message
self.redis_client.hset(f"task:{business_task_id}", mapping=update_data) self.redis_client.hset(f"task:{business_task_id}", mapping=update_data)
self.redis_client.hset(status_cache_key, mapping=update_data)
self.redis_client.expire(
status_cache_key,
max(1, int(BACKGROUND_TASK_CONFIG.get('transcription_status_cache_ttl', 3)))
)
# 更新数据库 # 更新数据库
self._update_task_status_in_db(business_task_id, current_status, progress, error_message) self._update_task_status_in_db(business_task_id, current_status, progress, error_message)
# 5. 构造并返回最终结果 # 5. 构造并返回最终结果
result = { return self._build_task_status_result(
'task_id': business_task_id, business_task_id,
'status': current_status, task_data,
'progress': progress, current_status,
'error_message': error_message, progress,
'updated_at': updated_at, error_message,
'meeting_id': None, updated_at,
'created_at': None, )
}
if task_data:
result['meeting_id'] = int(task_data['meeting_id'])
result['created_at'] = task_data.get('created_at')
return result
def _get_task_data(self, business_task_id: str) -> Dict[str, Any]: def _get_task_data(self, business_task_id: str) -> Dict[str, Any]:
"""从Redis或数据库获取任务数据""" """从Redis或数据库获取任务数据"""
# 尝试从Redis获取 # 尝试从Redis获取
task_data_bytes = self.redis_client.hgetall(f"task:{business_task_id}") task_data_raw = self.redis_client.hgetall(f"task:{business_task_id}")
if task_data_bytes and task_data_bytes.get(b'paraformer_task_id'): if task_data_raw:
# Redis返回的是bytes需要解码 task_data = self._normalize_redis_mapping(task_data_raw)
return {k.decode('utf-8'): v.decode('utf-8') for k, v in task_data_bytes.items()} if task_data.get('paraformer_task_id'):
return task_data
# 如果Redis没有从数据库回源 # 如果Redis没有从数据库回源
task_data_from_db = self._get_task_from_db(business_task_id) task_data_from_db = self._get_task_from_db(business_task_id)
@ -465,7 +479,8 @@ class AsyncTranscriptionService:
cursor = connection.cursor(dictionary=True) cursor = connection.cursor(dictionary=True)
query = """ query = """
SELECT tt.task_id as business_task_id, tt.paraformer_task_id, tt.meeting_id, tt.status, tt.created_at SELECT tt.task_id as business_task_id, tt.paraformer_task_id, tt.meeting_id, tt.status, tt.progress,
tt.error_message, tt.created_at, tt.completed_at
FROM transcript_tasks tt FROM transcript_tasks tt
WHERE tt.task_id = %s WHERE tt.task_id = %s
""" """
@ -480,13 +495,55 @@ class AsyncTranscriptionService:
'paraformer_task_id': result['paraformer_task_id'], 'paraformer_task_id': result['paraformer_task_id'],
'meeting_id': str(result['meeting_id']), 'meeting_id': str(result['meeting_id']),
'status': result['status'], 'status': result['status'],
'created_at': result['created_at'].isoformat() if result['created_at'] else None 'progress': str(result.get('progress') or 0),
'error_message': result.get('error_message') or '',
'created_at': result['created_at'].isoformat() if result['created_at'] else None,
'updated_at': (
result['completed_at'].isoformat()
if result.get('completed_at')
else result['created_at'].isoformat() if result.get('created_at') else None
),
} }
return None return None
except Exception as e: except Exception as e:
print(f"Error getting task from database: {e}") print(f"Error getting task from database: {e}")
return None return None
def _normalize_redis_mapping(self, mapping: Dict[Any, Any]) -> Dict[str, Any]:
normalized: Dict[str, Any] = {}
for key, value in mapping.items():
normalized_key = key.decode('utf-8') if isinstance(key, bytes) else str(key)
if isinstance(value, bytes):
normalized_value = value.decode('utf-8')
else:
normalized_value = value
normalized[normalized_key] = normalized_value
return normalized
def _build_task_status_result(
self,
business_task_id: str,
task_data: Optional[Dict[str, Any]],
status: str,
progress: int,
error_message: Optional[str],
updated_at: str
) -> Dict[str, Any]:
result = {
'task_id': business_task_id,
'status': status,
'progress': progress,
'error_message': error_message,
'updated_at': updated_at,
'meeting_id': None,
'created_at': None,
}
if task_data:
meeting_id = task_data.get('meeting_id')
result['meeting_id'] = int(meeting_id) if meeting_id is not None else None
result['created_at'] = task_data.get('created_at')
return result
def _process_transcription_result(self, business_task_id: str, meeting_id: int, paraformer_output: Any): def _process_transcription_result(self, business_task_id: str, meeting_id: int, paraformer_output: Any):
""" """

View File

@ -45,7 +45,7 @@ def handle_audio_upload(
meeting_id: 会议ID meeting_id: 会议ID
current_user: 当前用户信息 current_user: 当前用户信息
auto_summarize: 是否自动生成总结默认True auto_summarize: 是否自动生成总结默认True
background_tasks: FastAPI 后台任务对象 background_tasks: 为兼容现有调用保留实际后台执行由服务层线程池完成
prompt_id: 提示词模版ID可选如果不指定则使用默认模版 prompt_id: 提示词模版ID可选如果不指定则使用默认模版
model_code: 总结模型编码可选如果不指定则使用默认模型 model_code: 总结模型编码可选如果不指定则使用默认模型
duration: 音频时长 duration: 音频时长
@ -141,16 +141,15 @@ def handle_audio_upload(
transcription_task_id = transcription_service.start_transcription(meeting_id, file_path) transcription_task_id = transcription_service.start_transcription(meeting_id, file_path)
print(f"Transcription task {transcription_task_id} started for meeting {meeting_id}") print(f"Transcription task {transcription_task_id} started for meeting {meeting_id}")
# 5. 如果启用自动总结且提供了 background_tasks添加监控任务 # 5. 如果启用自动总结,则提交后台监控任务
if auto_summarize and transcription_task_id and background_tasks: if auto_summarize and transcription_task_id:
background_tasks.add_task( async_meeting_service.enqueue_transcription_monitor(
async_meeting_service.monitor_and_auto_summarize,
meeting_id, meeting_id,
transcription_task_id, transcription_task_id,
prompt_id, prompt_id,
model_code model_code
) )
print(f"[audio_service] Auto-summarize enabled, monitor task added for meeting {meeting_id}, prompt_id: {prompt_id}, model_code: {model_code}") print(f"[audio_service] Auto-summarize enabled, monitor task scheduled for meeting {meeting_id}, prompt_id: {prompt_id}, model_code: {model_code}")
except Exception as e: except Exception as e:
print(f"Failed to start transcription: {e}") print(f"Failed to start transcription: {e}")

View File

@ -0,0 +1,38 @@
from concurrent.futures import ThreadPoolExecutor
from itertools import count
from threading import Lock
from typing import Any, Callable, Dict
import traceback
class KeyedBackgroundTaskRunner:
    """Thread-pool task runner that deduplicates submissions by key.

    A new task for a key is rejected while an earlier task under the same
    key is still running, preventing long-lived jobs from piling up.
    """

    def __init__(self, max_workers: int, thread_name_prefix: str):
        self._executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix=thread_name_prefix,
        )
        self._lock = Lock()
        # Monotonic token generator; tokens tell a finishing task whether its
        # registry slot was already taken over by a newer submission.
        self._seq = count(1)
        self._tasks: Dict[str, Dict[str, Any]] = {}

    def submit(self, key: str, func: Callable[..., Any], *args: Any, **kwargs: Any) -> bool:
        """Schedule ``func`` unless an unfinished task already holds ``key``.

        Returns:
            True when the task was scheduled, False when rejected as duplicate.
        """
        with self._lock:
            entry = self._tasks.get(key)
            if entry is not None and not entry["future"].done():
                return False
            token = next(self._seq)
            future = self._executor.submit(self._run_task, key, token, func, *args, **kwargs)
            self._tasks[key] = {"token": token, "future": future}
        return True

    def _run_task(self, key: str, token: int, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """Run ``func``, log failures, then clear this key's registry entry.

        The entry is only removed when its token still matches, so a newer
        submission that replaced the slot is never evicted by an older task.
        """
        try:
            func(*args, **kwargs)
        except Exception:
            print(f"[BackgroundTaskRunner] Task failed, key={key}")
            traceback.print_exc()
        finally:
            with self._lock:
                entry = self._tasks.get(key)
                if entry is not None and entry["token"] == token:
                    del self._tasks[key]