"""
Async meeting service: handles asynchronous meeting-summary generation tasks.
Uses the FastAPI BackgroundTasks pattern.
"""
import uuid
import time
from datetime import datetime
from typing import Optional, Dict, Any, List
from pathlib import Path

import redis

from app.core.config import REDIS_CONFIG, TRANSCRIPTION_POLL_CONFIG, AUDIO_DIR
from app.core.database import get_db_connection
from app.services.async_transcription_service import AsyncTranscriptionService
from app.services.llm_service import LLMService


class AsyncMeetingService:
    """Async meeting service: handles meeting-related asynchronous tasks."""

    def __init__(self):
        # Make the Redis client decode responses to str automatically, which keeps the code simpler.
        # Note: this updates the shared REDIS_CONFIG dict in place.
        REDIS_CONFIG.setdefault('decode_responses', True)
        self.redis_client = redis.Redis(**REDIS_CONFIG)
        self.llm_service = LLMService()  # Reuse the existing synchronous LLM service

    def start_summary_generation(self, meeting_id: int, user_prompt: str = "",
                                 prompt_id: Optional[int] = None,
                                 model_code: Optional[str] = None) -> str:
        """
        Create an async summary task. Execution is triggered externally
        (e.g. by BackgroundTasks in the API layer).

        Args:
            meeting_id: Meeting ID
            user_prompt: Additional user instructions
            prompt_id: Optional prompt template ID; the default template is used if omitted
            model_code: Optional LLM model code; the default model is used if omitted

        Returns:
            str: Task ID
        """
        try:
            task_id = str(uuid.uuid4())

            # Create the task record in the database
            self._save_task_to_db(task_id, meeting_id, user_prompt, prompt_id)

            # Store the task details in Redis for fast status lookups
            current_time = datetime.now().isoformat()
            task_data = {
                'task_id': task_id,
                'meeting_id': str(meeting_id),
                'user_prompt': user_prompt,
                'prompt_id': str(prompt_id) if prompt_id is not None else '',
                'model_code': model_code or '',
                'status': 'pending',
                'progress': '0',
                'created_at': current_time,
                'updated_at': current_time
            }
            self.redis_client.hset(f"llm_task:{task_id}", mapping=task_data)
            self.redis_client.expire(f"llm_task:{task_id}", 86400)  # Keep the Redis entry for 24 hours

            return task_id

        except Exception as e:
            print(f"Error starting summary generation: {e}")
            raise

    def _process_task(self, task_id: str):
        """
        Process a single async task. Designed to be invoked by BackgroundTasks.
        """
        print(f"Background task started for meeting summary task: {task_id}")
        try:
            # Load the task data from Redis
            task_data = self.redis_client.hgetall(f"llm_task:{task_id}")
            if not task_data:
                print(f"Error: Task {task_id} not found in Redis for processing.")
                return

            meeting_id = int(task_data['meeting_id'])
            user_prompt = task_data.get('user_prompt', '')
            prompt_id_str = task_data.get('prompt_id', '')
            prompt_id = int(prompt_id_str) if prompt_id_str else None
            model_code = task_data.get('model_code', '') or None

            # 1. Mark the task as processing
            self._update_task_status_in_redis(task_id, 'processing', 10, message="任务已开始...")

            # 2. Fetch the meeting transcript
            self._update_task_status_in_redis(task_id, 'processing', 30, message="获取会议转录内容...")
            transcript_text = self._get_meeting_transcript(meeting_id)
            if not transcript_text:
                raise Exception("无法获取会议转录内容")

            # 3. Build the prompt
            self._update_task_status_in_redis(task_id, 'processing', 40, message="准备AI提示词...")
            full_prompt = self._build_prompt(transcript_text, user_prompt, prompt_id)

            # 4. Call the LLM API (a specific model may be requested)
            self._update_task_status_in_redis(task_id, 'processing', 50, message="AI正在分析会议内容...")
            if model_code:
                summary_content = self._call_llm_with_model(full_prompt, model_code)
            else:
                summary_content = self.llm_service._call_llm_api(full_prompt)
            if not summary_content:
                raise Exception("LLM API调用失败或返回空内容")

            # 5. Save the result to the meetings table
            self._update_task_status_in_redis(task_id, 'processing', 90, message="保存总结结果...")
            self._save_summary_to_db(meeting_id, summary_content, user_prompt, prompt_id)

            # 6. Export a Markdown file into the audio file's directory
            self._update_task_status_in_redis(task_id, 'processing', 95, message="导出Markdown文件...")
            md_path = self._export_summary_md(meeting_id, summary_content)

            # 7. Mark the task completed; `result` stores the Markdown file path
            self._update_task_in_db(task_id, 'completed', 100, result=md_path)
            self._update_task_status_in_redis(task_id, 'completed', 100, result=md_path)

            print(f"Task {task_id} completed successfully")

        except Exception as e:
            error_msg = str(e)
            print(f"Task {task_id} failed: {error_msg}")
            # Record the failure in both the database and Redis
            self._update_task_in_db(task_id, 'failed', 0, error_message=error_msg)
            self._update_task_status_in_redis(task_id, 'failed', 0, error_message=error_msg)

    def monitor_and_auto_summarize(
        self,
        meeting_id: int,
        transcription_task_id: str,
        prompt_id: Optional[int] = None,
        model_code: Optional[str] = None
    ):
        """
        Monitor a transcription task and generate a summary automatically once it completes.

        Designed to be invoked by BackgroundTasks and run in the background.

        Args:
            meeting_id: Meeting ID
            transcription_task_id: Transcription task ID
            prompt_id: Prompt template ID (optional; the default template is used if omitted)
            model_code: Summary model code (optional; the default model is used if omitted)

        Flow:
            1. Poll the transcription task status in a loop
            2. When transcription succeeds, start the summary task automatically
            3. When transcription fails or polling times out, stop polling and log the outcome
        """
        print(f"[Monitor] Started monitoring transcription task {transcription_task_id} for meeting {meeting_id}, prompt_id: {prompt_id}, model_code: {model_code}")

        # Read the polling parameters from configuration
        poll_interval = TRANSCRIPTION_POLL_CONFIG['poll_interval']
        max_wait_time = TRANSCRIPTION_POLL_CONFIG['max_wait_time']
        max_polls = max_wait_time // poll_interval

        # Client used to query the transcription task status
        transcription_service = AsyncTranscriptionService()

        poll_count = 0

        try:
            while poll_count < max_polls:
                poll_count += 1
                elapsed_time = poll_count * poll_interval

                try:
                    # Query the transcription task status
                    status_info = transcription_service.get_task_status(transcription_task_id)
                    current_status = status_info.get('status', 'unknown')
                    progress = status_info.get('progress', 0)

                    print(f"[Monitor] Poll {poll_count}/{max_polls} - Status: {current_status}, Progress: {progress}%, Elapsed: {elapsed_time}s")

                    # Transcription finished successfully
                    if current_status == 'completed':
                        print(f"[Monitor] Transcription completed successfully for meeting {meeting_id}")

                        # Avoid duplicates: skip if a summary task is already pending or processing
                        existing_task = self._get_existing_summary_task(meeting_id)
                        if existing_task:
                            print(f"[Monitor] Summary task already exists for meeting {meeting_id}, task_id: {existing_task}, skipping duplicate task creation")
                        else:
                            # Start the summary task
                            try:
                                summary_task_id = self.start_summary_generation(
                                    meeting_id,
                                    user_prompt="",
                                    prompt_id=prompt_id,
                                    model_code=model_code
                                )
                                print(f"[Monitor] Summary task {summary_task_id} started for meeting {meeting_id}")

                                # Run the summary inline: this monitor itself already runs as a background task
                                self._process_task(summary_task_id)
                            except Exception as e:
                                error_msg = f"Failed to start summary generation: {e}"
                                print(f"[Monitor] {error_msg}")

                        # Monitoring is finished; leave the loop
                        break

                    # Transcription failed
                    elif current_status == 'failed':
                        error_msg = status_info.get('error_message', 'Unknown error')
                        print(f"[Monitor] Transcription failed for meeting {meeting_id}: {error_msg}")
                        # Stop monitoring
                        break

                    # Transcription still running (pending/processing): wait, then poll again
                    elif current_status in ['pending', 'processing']:
                        time.sleep(poll_interval)

                    else:
                        # Unknown status
                        print(f"[Monitor] Unknown transcription status: {current_status}")
                        time.sleep(poll_interval)

                except Exception as e:
                    print(f"[Monitor] Error checking transcription status: {e}")
                    # Wait before retrying after an error
                    time.sleep(poll_interval)

            else:
                # Runs only when the loop used up all polls without a `break`, i.e. a real timeout
                print(f"[Monitor] Transcription monitoring timed out after {max_wait_time}s for meeting {meeting_id}")

        except Exception as e:
            print(f"[Monitor] Fatal error in monitor_and_auto_summarize: {e}")

    # --- Meeting-related methods ---

    def _call_llm_with_model(self, prompt: str, model_code: str) -> Optional[str]:
        """Call the LLM API using the model identified by model_code."""
        import requests

        def _or_default(value, default):
            # NULL columns come back as None; fall back to the default in that case
            return default if value is None else value

        try:
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                cursor.execute(
                    "SELECT endpoint_url, api_key, llm_model_name, llm_timeout, llm_temperature, llm_top_p, llm_max_tokens FROM llm_model_config WHERE model_code = %s AND is_active = 1",
                    (model_code,)
                )
                config = cursor.fetchone()

                if not config:
                    print(f"Model {model_code} not found or inactive; falling back to the default model")
                    return self.llm_service._call_llm_api(prompt)

            # The configuration row is already fetched, so the HTTP call runs outside the DB connection
            endpoint_url = (config['endpoint_url'] or '').rstrip('/')
            if not endpoint_url.endswith('/chat/completions'):
                endpoint_url = f"{endpoint_url}/chat/completions"

            headers = {"Content-Type": "application/json"}
            if config['api_key']:
                headers["Authorization"] = f"Bearer {config['api_key']}"

            payload = {
                "model": config['llm_model_name'],
                "messages": [{"role": "user", "content": prompt}],
                "temperature": float(_or_default(config.get('llm_temperature'), 0.7)),
                "top_p": float(_or_default(config.get('llm_top_p'), 0.9)),
                "max_tokens": int(_or_default(config.get('llm_max_tokens'), 4096)),
                "stream": False,
            }
            response = requests.post(
                endpoint_url,
                headers=headers,
                json=payload,
                timeout=int(_or_default(config.get('llm_timeout'), 120)),
            )
            response.raise_for_status()
            return self.llm_service._extract_response_text(response.json())

        except Exception as e:
            print(f"LLM call with model {model_code} failed: {e}")
            return None

    def _export_summary_md(self, meeting_id: int, summary_content: str) -> Optional[str]:
        """Export the summary as a Markdown file in the audio file's directory and return the file path."""
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                cursor.execute("SELECT title FROM meetings WHERE meeting_id = %s", (meeting_id,))
                meeting = cursor.fetchone()
                cursor.execute("SELECT file_path FROM audio_files WHERE meeting_id = %s LIMIT 1", (meeting_id,))
                audio = cursor.fetchone()

            # Fall back to a generic name if the meeting row or its title is missing
            title = (meeting or {}).get('title') or f"meeting_{meeting_id}"

            # Always resolve against AUDIO_DIR so that an absolute path stored in the
            # database cannot point at a non-writable directory
            if audio and audio.get('file_path'):
                audio_path = Path(audio['file_path'])
                # Keep the meeting-level subdirectory (e.g. "226" or "226/sub")
                try:
                    rel = audio_path.relative_to(AUDIO_DIR)
                    md_dir = AUDIO_DIR / rel.parent
                except ValueError:
                    # file_path is not under AUDIO_DIR (e.g. an absolute Docker path): use its last directory name
                    md_dir = AUDIO_DIR / audio_path.parent.name
            else:
                md_dir = AUDIO_DIR / str(meeting_id)

            md_dir.mkdir(parents=True, exist_ok=True)

            # Keep only alphanumeric characters (CJK included) and a few safe punctuation marks
            safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_', '.')).strip()
            if not safe_title:
                safe_title = f"meeting_{meeting_id}"
            md_path = md_dir / f"{safe_title}_总结.md"  # "总结" means "summary"
            md_path.write_text(summary_content, encoding='utf-8')

            md_path_str = str(md_path)
            print(f"Summary Markdown file saved: {md_path_str}")
            return md_path_str

        except Exception as e:
            print(f"Failed to export summary Markdown file: {e}")
            return None

    def _get_meeting_transcript(self, meeting_id: int) -> str:
        """Fetch the meeting transcript from the database."""
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                query = """
                    SELECT speaker_tag, start_time_ms, end_time_ms, text_content
                    FROM transcript_segments
                    WHERE meeting_id = %s
                    ORDER BY start_time_ms
                """
                cursor.execute(query, (meeting_id,))
                segments = cursor.fetchall()

                if not segments:
                    return ""

                # Assemble the transcript text, one line per segment
                transcript_lines = []
                for speaker_tag, start_time, _end_time, text in segments:
                    # Convert milliseconds to MM:SS
                    start_min = start_time // 60000
                    start_sec = (start_time % 60000) // 1000
                    # "说话人" means "Speaker"
                    transcript_lines.append(f"[{start_min:02d}:{start_sec:02d}] 说话人{speaker_tag}: {text}")

                return "\n".join(transcript_lines)

        except Exception as e:
            print(f"Error fetching meeting transcript: {e}")
            return ""

    def _build_prompt(self, transcript_text: str, user_prompt: str, prompt_id: Optional[int] = None) -> str:
        """
        Build the full prompt from the MEETING_TASK prompt template configured in the database.

        Args:
            transcript_text: Meeting transcript text
            user_prompt: Additional user instructions
            prompt_id: Optional prompt template ID; the default template is used if omitted
        """
        # Load the meeting-task prompt template from the database (a specific prompt_id may be given)
        system_prompt = self.llm_service.get_task_prompt('MEETING_TASK', prompt_id=prompt_id)

        prompt = f"{system_prompt}\n\n"

        if user_prompt:
            # "用户额外要求" means "Additional user requirements"
            prompt += f"用户额外要求:{user_prompt}\n\n"

        # "Meeting transcript:" followed by "Generate a meeting summary based on the content above:"
        prompt += f"会议转录内容:\n{transcript_text}\n\n请根据以上内容生成会议总结:"

        return prompt

    def _save_summary_to_db(self, meeting_id: int, summary_content: str, user_prompt: str, prompt_id: Optional[int] = None) -> Optional[int]:
        """Save the summary by updating the summary, user_prompt, prompt_id and updated_at columns of the meetings table."""
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()

                update_query = """
                    UPDATE meetings
                    SET summary = %s, user_prompt = %s, prompt_id = %s, updated_at = NOW()
                    WHERE meeting_id = %s
                """
                cursor.execute(update_query, (summary_content, user_prompt, prompt_id, meeting_id))
                connection.commit()

                print(f"Saved meeting summary to meetings table, meeting_id: {meeting_id}, prompt_id: {prompt_id}")
                return meeting_id

        except Exception as e:
            print(f"Error saving summary to database: {e}")
            return None

    # --- Status queries and database operations ---

    def get_task_status(self, task_id: str) -> Dict[str, Any]:
        """Return the task status, reading Redis first and falling back to the database."""
        try:
            task_data = self.redis_client.hgetall(f"llm_task:{task_id}")
            if not task_data:
                task_data = self._get_task_from_db(task_id)
                if not task_data:
                    return {'task_id': task_id, 'status': 'not_found', 'error_message': 'Task not found'}

            return {
                'task_id': task_id,
                'status': task_data.get('status', 'unknown'),
                'progress': int(task_data.get('progress') or 0),
                'meeting_id': int(task_data.get('meeting_id') or 0),
                'created_at': task_data.get('created_at'),
                'updated_at': task_data.get('updated_at'),
                'message': task_data.get('message'),
                'result': task_data.get('result'),
                'error_message': task_data.get('error_message')
            }
        except Exception as e:
            print(f"Error getting task status: {e}")
            return {'task_id': task_id, 'status': 'error', 'error_message': str(e)}

    def get_meeting_llm_tasks(self, meeting_id: int) -> List[Dict[str, Any]]:
        """Return all LLM tasks for a meeting, newest first."""
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                query = "SELECT task_id, status, progress, user_prompt, created_at, completed_at, error_message FROM llm_tasks WHERE meeting_id = %s ORDER BY created_at DESC"
                cursor.execute(query, (meeting_id,))
                tasks = cursor.fetchall()
                for task in tasks:
                    if task.get('created_at'):
                        task['created_at'] = task['created_at'].isoformat()
                    if task.get('completed_at'):
                        task['completed_at'] = task['completed_at'].isoformat()
                return tasks
        except Exception as e:
            print(f"Error getting meeting LLM tasks: {e}")
            return []

    def get_meeting_llm_status(self, meeting_id: int) -> Optional[Dict[str, Any]]:
        """
        Return the status of the meeting's most recent LLM task (mirrors the transcription status API).

        Args:
            meeting_id: Meeting ID

        Returns:
            Optional[Dict]: Task status information, or None if the meeting has no task
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)

                # Query the most recent LLM task
                query = """
                    SELECT task_id, status, progress, created_at, completed_at, error_message
                    FROM llm_tasks
                    WHERE meeting_id = %s
                    ORDER BY created_at DESC
                    LIMIT 1
                """
                cursor.execute(query, (meeting_id,))
                task_record = cursor.fetchone()
                cursor.close()

                if not task_record:
                    return None

                # If the task is still running, fetch its latest status (Redis first)
                if task_record['status'] in ['pending', 'processing']:
                    try:
                        return self.get_task_status(task_record['task_id'])
                    except Exception as e:
                        print(f"Failed to get latest LLM task status for meeting {meeting_id}, returning DB status. Error: {e}")

                return {
                    'task_id': task_record['task_id'],
                    'status': task_record['status'],
                    'progress': task_record['progress'] or 0,
                    'meeting_id': meeting_id,
                    'created_at': task_record['created_at'].isoformat() if task_record['created_at'] else None,
                    'completed_at': task_record['completed_at'].isoformat() if task_record['completed_at'] else None,
                    'error_message': task_record['error_message']
                }

        except Exception as e:
            print(f"Error getting meeting LLM status: {e}")
            return None

    def _update_task_status_in_redis(self, task_id: str, status: str, progress: int,
                                     message: Optional[str] = None, result: Optional[str] = None,
                                     error_message: Optional[str] = None):
        """Update the task status stored in Redis."""
        try:
            update_data = {
                'status': status,
                'progress': str(progress),
                'updated_at': datetime.now().isoformat()
            }
            if message:
                update_data['message'] = message
            if result:
                update_data['result'] = result
            if error_message:
                update_data['error_message'] = error_message
            self.redis_client.hset(f"llm_task:{task_id}", mapping=update_data)
        except Exception as e:
            print(f"Error updating task status in Redis: {e}")

    def _save_task_to_db(self, task_id: str, meeting_id: int, user_prompt: str, prompt_id: Optional[int] = None):
        """Insert the task record into the database.

        Args:
            task_id: Task ID
            meeting_id: Meeting ID
            user_prompt: Additional user instructions
            prompt_id: Optional prompt template ID; None means the default template is used
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                insert_query = "INSERT INTO llm_tasks (task_id, meeting_id, user_prompt, prompt_id, status, progress, created_at) VALUES (%s, %s, %s, %s, 'pending', 0, NOW())"
                cursor.execute(insert_query, (task_id, meeting_id, user_prompt, prompt_id))
                connection.commit()
                print(f"[Meeting Service] Task {task_id} saved successfully to database")
        except Exception as e:
            print(f"Error saving task to database: {e}")
            raise

    def _update_task_in_db(self, task_id: str, status: str, progress: int,
                           result: Optional[str] = None, error_message: Optional[str] = None):
        """Update the task status stored in the database."""
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                if status == 'completed':
                    query = "UPDATE llm_tasks SET status = %s, progress = %s, result = %s, error_message = NULL, completed_at = NOW() WHERE task_id = %s"
                    params = (status, progress, result, task_id)
                else:
                    query = "UPDATE llm_tasks SET status = %s, progress = %s, error_message = %s WHERE task_id = %s"
                    params = (status, progress, error_message, task_id)

                cursor.execute(query, params)
                connection.commit()
        except Exception as e:
            print(f"Error updating task in database: {e}")

    def _get_task_from_db(self, task_id: str) -> Optional[Dict[str, Optional[str]]]:
        """Load task information from the database."""
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                query = "SELECT * FROM llm_tasks WHERE task_id = %s"
                cursor.execute(query, (task_id,))
                task = cursor.fetchone()
                if task:
                    # Convert values to strings to match Redis; keep NULLs as None instead of the string 'None'
                    return {
                        k: None if v is None else (v.isoformat() if isinstance(v, datetime) else str(v))
                        for k, v in task.items()
                    }
                return None
        except Exception as e:
            print(f"Error getting task from database: {e}")
            return None

    def _get_existing_summary_task(self, meeting_id: int) -> Optional[str]:
        """
        Check whether the meeting already has a summary task (used for concurrency control).

        Returns the ID of the most recent pending or processing task, or None if there is none.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                query = """
                    SELECT task_id FROM llm_tasks
                    WHERE meeting_id = %s AND status IN ('pending', 'processing')
                    ORDER BY created_at DESC
                    LIMIT 1
                """
                cursor.execute(query, (meeting_id,))
                result = cursor.fetchone()
                cursor.close()

                if result:
                    return result['task_id']
                return None

        except Exception as e:
            print(f"Error checking existing summary task: {e}")
            return None


# Global instance
async_meeting_service = AsyncMeetingService()