687 lines
30 KiB
Python
687 lines
30 KiB
Python
"""
|
||
异步会议服务 - 处理会议总结生成的异步任务
|
||
采用FastAPI BackgroundTasks模式
|
||
"""
|
||
import uuid
|
||
import time
|
||
import os
|
||
import re
|
||
from datetime import datetime
|
||
from typing import Optional, Dict, Any, List
|
||
from pathlib import Path
|
||
|
||
import redis
|
||
from app.core.config import REDIS_CONFIG, TRANSCRIPTION_POLL_CONFIG, AUDIO_DIR
|
||
from app.core.database import get_db_connection
|
||
from app.services.async_transcription_service import AsyncTranscriptionService
|
||
from app.services.llm_service import LLMService
|
||
|
||
class AsyncMeetingService:
    """Asynchronous meeting service that runs meeting-summary background tasks.

    Task state is written both to the ``llm_tasks`` table and to a Redis hash
    named ``llm_task:<task_id>``. Status queries read Redis first and fall
    back to the database when the hash is missing.
    """

    # Lifetime of the Redis task hash, in seconds (24 hours).
    _TASK_TTL_SECONDS = 86400

    def __init__(self):
        # Make the redis client decode responses to str unless the config says
        # otherwise. NOTE: this intentionally keeps writing the default into
        # the shared REDIS_CONFIG mapping, exactly as before, because other
        # importers may observe that module-level side effect.
        REDIS_CONFIG.setdefault('decode_responses', True)
        self.redis_client = redis.Redis(**REDIS_CONFIG)
        self.llm_service = LLMService()  # reuse the existing synchronous LLM service

    @staticmethod
    def _task_key(task_id: str) -> str:
        """Return the Redis hash key that stores the state of ``task_id``."""
        return f"llm_task:{task_id}"

    def start_summary_generation(self, meeting_id: int, user_prompt: str = "", prompt_id: Optional[int] = None, model_code: Optional[str] = None) -> str:
        """Create a summary task record; execution is triggered by the caller.

        The task is only registered here (database row + Redis hash). Running
        it is left to an external trigger such as ``BackgroundTasks`` calling
        ``_process_task``.

        Args:
            meeting_id: Meeting ID.
            user_prompt: Extra prompt text supplied by the user.
            prompt_id: Optional prompt template ID; the default template is
                used when omitted.
            model_code: Optional LLM model code; the default model is used
                when omitted.

        Returns:
            str: The new task ID.

        Raises:
            Exception: Any database or Redis error is logged and re-raised.
        """
        try:
            task_id = str(uuid.uuid4())

            # Persist the task row first so the database is the source of truth.
            self._save_task_to_db(task_id, meeting_id, user_prompt, prompt_id)

            # Mirror the task into Redis for fast status lookups. Redis hashes
            # only hold strings, so every value is stringified here.
            now_iso = datetime.now().isoformat()
            redis_key = self._task_key(task_id)
            self.redis_client.hset(redis_key, mapping={
                'task_id': task_id,
                'meeting_id': str(meeting_id),
                'user_prompt': user_prompt,
                'prompt_id': str(prompt_id) if prompt_id else '',
                'model_code': model_code or '',
                'status': 'pending',
                'progress': '0',
                'created_at': now_iso,
                'updated_at': now_iso,
            })
            self.redis_client.expire(redis_key, self._TASK_TTL_SECONDS)

            return task_id

        except Exception as e:
            print(f"Error starting summary generation: {e}")
            raise

    def _process_task(self, task_id: str):
        """Run one summary task end to end; designed to be called by BackgroundTasks.

        Steps: load the task from Redis, fetch the transcript, build the
        prompt, call the LLM, store the summary, export a Markdown file and
        mark the task completed. Any failure marks the task as ``failed`` in
        both the database and Redis; nothing is raised to the caller.
        """
        print(f"Background task started for meeting summary task: {task_id}")
        try:
            # Load the task parameters from Redis.
            task_data = self.redis_client.hgetall(self._task_key(task_id))
            if not task_data:
                print(f"Error: Task {task_id} not found in Redis for processing.")
                return

            meeting_id = int(task_data['meeting_id'])
            user_prompt = task_data.get('user_prompt', '')
            prompt_id_str = task_data.get('prompt_id', '')
            prompt_id = int(prompt_id_str) if prompt_id_str else None
            model_code = task_data.get('model_code', '') or None

            # 1. Mark the task as processing.
            self._update_task_status_in_redis(task_id, 'processing', 10, message="任务已开始...")

            # 2. Fetch the meeting transcript.
            self._update_task_status_in_redis(task_id, 'processing', 30, message="获取会议转录内容...")
            transcript_text = self._get_meeting_transcript(meeting_id)
            if not transcript_text:
                raise Exception("无法获取会议转录内容")

            # 3. Build the prompt.
            self._update_task_status_in_redis(task_id, 'processing', 40, message="准备AI提示词...")
            full_prompt = self._build_prompt(meeting_id, transcript_text, user_prompt, prompt_id)

            # 4. Call the LLM API (optionally with an explicitly chosen model).
            self._update_task_status_in_redis(task_id, 'processing', 50, message="AI正在分析会议内容...")
            if model_code:
                summary_content = self._call_llm_with_model(full_prompt, model_code)
            else:
                summary_content = self.llm_service._call_llm_api(full_prompt)
            if not summary_content:
                raise Exception("LLM API调用失败或返回空内容")

            # 5. Store the result in the meetings table. _save_summary_to_db
            # reports failure by returning None, so check it explicitly;
            # otherwise the task would be marked completed without a summary.
            self._update_task_status_in_redis(task_id, 'processing', 90, message="保存总结结果...")
            if self._save_summary_to_db(meeting_id, summary_content, user_prompt, prompt_id) is None:
                raise Exception("保存会议总结到数据库失败")

            # 6. Export a Markdown file next to the audio. This is best effort:
            # md_path is None when the export fails and the task still completes.
            self._update_task_status_in_redis(task_id, 'processing', 95, message="导出Markdown文件...")
            md_path = self._export_summary_md(meeting_id, summary_content)

            # 7. Done; the task result holds the Markdown file path.
            self._update_task_in_db(task_id, 'completed', 100, result=md_path)
            self._update_task_status_in_redis(task_id, 'completed', 100, result=md_path)
            print(f"Task {task_id} completed successfully")

        except Exception as e:
            error_msg = str(e)
            print(f"Task {task_id} failed: {error_msg}")
            # Record the failure in both stores.
            self._update_task_in_db(task_id, 'failed', 0, error_message=error_msg)
            self._update_task_status_in_redis(task_id, 'failed', 0, error_message=error_msg)

    def monitor_and_auto_summarize(
        self,
        meeting_id: int,
        transcription_task_id: str,
        prompt_id: Optional[int] = None,
        model_code: Optional[str] = None
    ):
        """Poll a transcription task and start the summary once it completes.

        Designed to be called by BackgroundTasks and to run in the background.

        Args:
            meeting_id: Meeting ID.
            transcription_task_id: ID of the transcription task to watch.
            prompt_id: Optional prompt template ID (default template if omitted).
            model_code: Optional summary model code (default model if omitted).

        Flow:
            1. Poll the transcription task status in a loop.
            2. When transcription succeeds, start and run the summary task.
            3. When transcription fails or the polling budget is exhausted,
               stop polling and log it.
        """
        print(f"[Monitor] Started monitoring transcription task {transcription_task_id} for meeting {meeting_id}, prompt_id: {prompt_id}, model_code: {model_code}")

        # Polling configuration.
        poll_interval = TRANSCRIPTION_POLL_CONFIG['poll_interval']
        max_wait_time = TRANSCRIPTION_POLL_CONFIG['max_wait_time']
        max_polls = int(max_wait_time // poll_interval)

        transcription_service = AsyncTranscriptionService()

        try:
            for poll_count in range(1, max_polls + 1):
                elapsed_time = poll_count * poll_interval

                try:
                    # Query the transcription task status.
                    status_info = transcription_service.get_task_status(transcription_task_id)
                    current_status = status_info.get('status', 'unknown')
                    progress = status_info.get('progress', 0)

                    print(f"[Monitor] Poll {poll_count}/{max_polls} - Status: {current_status}, Progress: {progress}%, Elapsed: {elapsed_time}s")

                    if current_status == 'completed':
                        print(f"[Monitor] Transcription completed successfully for meeting {meeting_id}")
                        self._start_summary_after_transcription(meeting_id, prompt_id, model_code)
                        # Monitoring is finished.
                        break

                    if current_status == 'failed':
                        error_msg = status_info.get('error_message', 'Unknown error')
                        print(f"[Monitor] Transcription failed for meeting {meeting_id}: {error_msg}")
                        # Transcription failed; stop monitoring.
                        break

                    if current_status not in ('pending', 'processing'):
                        print(f"[Monitor] Unknown transcription status: {current_status}")

                except Exception as e:
                    print(f"[Monitor] Error checking transcription status: {e}")

                # Still running, unknown status, or a transient error: wait
                # before the next poll.
                time.sleep(poll_interval)
            else:
                # The loop ran out of polls without a break, i.e. a real
                # timeout. (A plain counter check after the loop would also
                # fire when the task finished on the very last poll.)
                print(f"[Monitor] Transcription monitoring timed out after {max_wait_time}s for meeting {meeting_id}")

        except Exception as e:
            print(f"[Monitor] Fatal error in monitor_and_auto_summarize: {e}")

    def _start_summary_after_transcription(self, meeting_id: int, prompt_id: Optional[int], model_code: Optional[str]) -> None:
        """Create and run the summary task unless one is already pending/processing."""
        # Concurrency guard: skip when a summary task already exists.
        existing_task = self._get_existing_summary_task(meeting_id)
        if existing_task:
            print(f"[Monitor] Summary task already exists for meeting {meeting_id}, task_id: {existing_task}, skipping duplicate task creation")
            return

        try:
            summary_task_id = self.start_summary_generation(
                meeting_id,
                user_prompt="",
                prompt_id=prompt_id,
                model_code=model_code
            )
            print(f"[Monitor] Summary task {summary_task_id} started for meeting {meeting_id}")

            # Run the summary task synchronously inside this background job.
            self._process_task(summary_task_id)

        except Exception as e:
            error_msg = f"Failed to start summary generation: {e}"
            print(f"[Monitor] {error_msg}")

    # --- Meeting-related methods ---

    def _call_llm_with_model(self, prompt: str, model_code: str) -> Optional[str]:
        """Call the LLM API using the model configured under ``model_code``.

        Falls back to the default model when ``model_code`` is unknown or
        inactive. Returns the response text, or None when the call fails.
        """
        import requests
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                cursor.execute(
                    "SELECT endpoint_url, api_key, llm_model_name, llm_timeout, llm_temperature, llm_top_p, llm_max_tokens FROM llm_model_config WHERE model_code = %s AND is_active = 1",
                    (model_code,)
                )
                config = cursor.fetchone()

            if not config:
                print(f"模型 {model_code} 未找到或未激活,回退到默认模型")
                return self.llm_service._call_llm_api(prompt)

            def setting(key: str, default: Any) -> Any:
                # A NULL column comes back as None with the key present, so
                # dict.get(key, default) would not apply the default.
                value = config.get(key)
                return default if value is None else value

            # Accept both a base URL and a full chat-completions URL.
            endpoint_url = (config['endpoint_url'] or '').rstrip('/')
            if not endpoint_url.endswith('/chat/completions'):
                endpoint_url = f"{endpoint_url}/chat/completions"

            headers = {"Content-Type": "application/json"}
            if config['api_key']:
                headers["Authorization"] = f"Bearer {config['api_key']}"

            payload = {
                "model": config['llm_model_name'],
                "messages": [{"role": "user", "content": prompt}],
                "temperature": float(setting('llm_temperature', 0.7)),
                "top_p": float(setting('llm_top_p', 0.9)),
                "max_tokens": int(setting('llm_max_tokens', 4096)),
                "stream": False,
            }
            response = requests.post(
                endpoint_url,
                headers=headers,
                json=payload,
                timeout=int(setting('llm_timeout', 120)),
            )
            response.raise_for_status()
            return self.llm_service._extract_response_text(response.json())
        except Exception as e:
            print(f"使用模型 {model_code} 调用失败: {e}")
            return None

    def _export_summary_md(self, meeting_id: int, summary_content: str) -> Optional[str]:
        """Write the summary to a Markdown file beside the audio and return its path.

        Returns None when the export fails (the error is only logged).
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                cursor.execute("SELECT title FROM meetings WHERE meeting_id = %s", (meeting_id,))
                meeting = cursor.fetchone()
                cursor.execute("SELECT file_path FROM audio_files WHERE meeting_id = %s LIMIT 1", (meeting_id,))
                audio = cursor.fetchone()

            # A missing row and a NULL title both fall back to a generated name.
            title = (meeting or {}).get('title') or f"meeting_{meeting_id}"

            # Always resolve under AUDIO_DIR so an absolute path stored in the
            # database cannot point the export at a non-writable directory.
            base_dir = Path(AUDIO_DIR)
            if audio and audio.get('file_path'):
                audio_path = Path(audio['file_path'])
                try:
                    # Keep the sub-directory below AUDIO_DIR (e.g. "226" or "226/sub").
                    md_dir = base_dir / audio_path.relative_to(base_dir).parent
                except ValueError:
                    # file_path is not under AUDIO_DIR (e.g. a Docker absolute
                    # path): use only the name of its last directory.
                    md_dir = base_dir / audio_path.parent.name
            else:
                md_dir = base_dir / str(meeting_id)

            md_dir.mkdir(parents=True, exist_ok=True)

            # Keep only characters that are safe in a file name.
            safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_', '.')).strip()
            if not safe_title:
                safe_title = f"meeting_{meeting_id}"

            md_path = md_dir / f"{safe_title}_总结.md"
            md_path.write_text(summary_content, encoding='utf-8')
            md_path_str = str(md_path)
            print(f"总结MD文件已保存: {md_path_str}")
            return md_path_str
        except Exception as e:
            print(f"导出总结MD文件失败: {e}")
            return None

    def _get_meeting_transcript(self, meeting_id: int) -> str:
        """Load the meeting transcript from the database as ``[mm:ss] speaker: text`` lines.

        Returns an empty string when there are no segments or the query fails.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                query = """
                    SELECT speaker_tag, start_time_ms, end_time_ms, text_content
                    FROM transcript_segments
                    WHERE meeting_id = %s
                    ORDER BY start_time_ms
                """
                cursor.execute(query, (meeting_id,))
                segments = cursor.fetchall()

            if not segments:
                return ""

            transcript_lines = []
            for speaker_tag, start_time, _end_time, text in segments:
                # Convert milliseconds to minutes:seconds; a NULL start time
                # is treated as 0 instead of aborting the whole transcript.
                start_min, remainder_ms = divmod(int(start_time or 0), 60000)
                start_sec = remainder_ms // 1000
                transcript_lines.append(f"[{start_min:02d}:{start_sec:02d}] 说话人{speaker_tag}: {text}")

            return "\n".join(transcript_lines)

        except Exception as e:
            print(f"获取会议转录内容错误: {e}")
            return ""

    def _get_meeting_prompt_context(self, meeting_id: int) -> Dict[str, Any]:
        """Collect meeting metadata used to fill prompt placeholders.

        Returns a dict with ``title``, ``meeting_time`` (text),
        ``meeting_time_value`` (raw database value), ``creator_name`` and
        ``attendees`` (names joined with "、"). Returns an empty dict when the
        meeting does not exist or the query fails.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                cursor.execute(
                    """
                    SELECT m.title, m.meeting_time, u.caption AS creator_name
                    FROM meetings m
                    LEFT JOIN sys_users u ON m.user_id = u.user_id
                    WHERE m.meeting_id = %s
                    """,
                    (meeting_id,)
                )
                meeting = cursor.fetchone()

                if not meeting:
                    cursor.close()
                    return {}

                meeting_time = meeting.get('meeting_time')
                meeting_time_text = ''
                if isinstance(meeting_time, datetime):
                    meeting_time_text = meeting_time.strftime('%Y-%m-%d %H:%M:%S')
                elif meeting_time:
                    meeting_time_text = str(meeting_time)

                cursor.execute(
                    """
                    SELECT u.caption
                    FROM attendees a
                    JOIN sys_users u ON a.user_id = u.user_id
                    WHERE a.meeting_id = %s
                    ORDER BY u.caption ASC
                    """,
                    (meeting_id,)
                )
                attendee_rows = cursor.fetchall()
                attendee_names = [row.get('caption') for row in attendee_rows if row.get('caption')]
                attendee_text = '、'.join(attendee_names)
                cursor.close()

                return {
                    'title': meeting.get('title') or '',
                    'meeting_time': meeting_time_text,
                    'meeting_time_value': meeting_time,
                    'creator_name': meeting.get('creator_name') or '',
                    'attendees': attendee_text
                }
        except Exception as e:
            print(f"获取会议提示词上下文错误: {e}")
            return {}

    def _format_meeting_time_value(self, meeting_time_value: Any, custom_format: Optional[str] = None) -> str:
        """Format a meeting time for prompt rendering.

        ``datetime`` values are formatted with ``custom_format`` (falling back
        to ``%Y-%m-%d %H:%M:%S`` when it is missing or invalid). Other truthy
        values are stringified and, when ``custom_format`` is given, parsed as
        ISO 8601 and reformatted; if parsing fails the plain text is returned.
        Falsy values yield an empty string.
        """
        default_format = '%Y-%m-%d %H:%M:%S'

        if isinstance(meeting_time_value, datetime):
            try:
                return meeting_time_value.strftime(custom_format or default_format)
            except Exception:
                return meeting_time_value.strftime(default_format)

        if not meeting_time_value:
            return ''

        meeting_time_text = str(meeting_time_value)
        if not custom_format:
            return meeting_time_text

        try:
            # fromisoformat on older Pythons rejects a trailing "Z".
            parsed_time = datetime.fromisoformat(meeting_time_text.replace('Z', '+00:00'))
            return parsed_time.strftime(custom_format)
        except Exception:
            return meeting_time_text

    def _apply_prompt_variables(self, template: str, variables: Dict[str, Any]) -> str:
        """Replace ``{{name}}`` placeholders in ``template`` with values from ``variables``.

        ``{{meeting_time:<strftime format>}}`` is rendered from
        ``variables['meeting_time_value']``. Every other key is substituted
        for ``{{key}}`` with any whitespace inside the braces; ``None``
        becomes an empty string and other values are stringified. Unknown
        placeholders are left untouched. Substitution is a single pass, so
        placeholder-looking text inside a value is not expanded again.
        """
        if not template:
            return template

        rendered = re.sub(
            r"\{\{\s*meeting_time\s*:\s*([^{}]+?)\s*\}\}",
            lambda match: self._format_meeting_time_value(
                variables.get('meeting_time_value'),
                match.group(1).strip()
            ),
            template
        )

        # meeting_time_value is the raw value for the formatter above, not a
        # placeholder of its own.
        substitutions = {
            str(key): '' if value is None else str(value)
            for key, value in variables.items()
            if key != 'meeting_time_value'
        }
        if not substitutions:
            return rendered

        names = sorted(substitutions, key=len, reverse=True)
        placeholder = re.compile(
            r"\{\{\s*(" + "|".join(re.escape(name) for name in names) + r")\s*\}\}"
        )
        # A function replacement keeps backslashes in values literal.
        return placeholder.sub(lambda match: substitutions[match.group(1)], rendered)

    def _build_prompt(self, meeting_id: int, transcript_text: str, user_prompt: str, prompt_id: Optional[int] = None) -> str:
        """Build the complete LLM prompt for a meeting summary.

        Uses the MEETING_TASK prompt template obtained from the LLM service.

        Args:
            meeting_id: Meeting ID.
            transcript_text: Meeting transcript text.
            user_prompt: Extra prompt text supplied by the user.
            prompt_id: Optional prompt template ID; the default template is
                used when omitted.
        """
        # Fetch the template (optionally a specific prompt_id). A missing
        # template becomes '' so the prompt never contains the text "None".
        system_prompt = self.llm_service.get_task_prompt('MEETING_TASK', prompt_id=prompt_id) or ''
        meeting_context = self._get_meeting_prompt_context(meeting_id)
        prompt_variables = {
            'meeting_id': str(meeting_id),
            'meeting_title': meeting_context.get('title', ''),
            'meeting_time': meeting_context.get('meeting_time', ''),
            'meeting_creator': meeting_context.get('creator_name', ''),
            'meeting_attendees': meeting_context.get('attendees', ''),
            'meeting_time_value': meeting_context.get('meeting_time_value')
        }
        system_prompt = self._apply_prompt_variables(system_prompt, prompt_variables)
        rendered_user_prompt = self._apply_prompt_variables(user_prompt, prompt_variables) if user_prompt else ''

        sections = [f"{system_prompt}\n\n"]
        if rendered_user_prompt:
            sections.append(f"用户额外要求:{rendered_user_prompt}\n\n")
        sections.append(f"会议转录内容:\n{transcript_text}\n\n请根据以上内容生成会议总结:")
        return "".join(sections)

    def _save_summary_to_db(self, meeting_id: int, summary_content: str, user_prompt: str, prompt_id: Optional[int] = None) -> Optional[int]:
        """Store the summary on the meeting row.

        Updates ``summary``, ``user_prompt``, ``prompt_id`` and ``updated_at``
        in the ``meetings`` table. Returns ``meeting_id`` on success and None
        when the update raises.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                update_query = """
                    UPDATE meetings
                    SET summary = %s, user_prompt = %s, prompt_id = %s, updated_at = NOW()
                    WHERE meeting_id = %s
                """
                cursor.execute(update_query, (summary_content, user_prompt, prompt_id, meeting_id))
                connection.commit()

                print(f"成功保存会议总结到meetings表,meeting_id: {meeting_id}, prompt_id: {prompt_id}")
                return meeting_id

        except Exception as e:
            print(f"保存总结到数据库错误: {e}")
            return None

    # --- Status queries and database operations ---

    def get_task_status(self, task_id: str) -> Dict[str, Any]:
        """Return the state of a task, reading Redis first and the database second.

        Never raises: an unknown task yields status ``not_found`` and any
        error yields status ``error`` with the message in ``error_message``.
        """
        try:
            task_data = self.redis_client.hgetall(self._task_key(task_id))
            if not task_data:
                task_data = self._get_task_from_db(task_id)
            if not task_data:
                return {'task_id': task_id, 'status': 'not_found', 'error_message': 'Task not found'}

            return {
                'task_id': task_id,
                'status': task_data.get('status', 'unknown'),
                'progress': int(task_data.get('progress') or 0),
                'meeting_id': int(task_data.get('meeting_id') or 0),
                'created_at': task_data.get('created_at'),
                'updated_at': task_data.get('updated_at'),
                'message': task_data.get('message'),
                'result': task_data.get('result'),
                'error_message': task_data.get('error_message')
            }
        except Exception as e:
            print(f"Error getting task status: {e}")
            return {'task_id': task_id, 'status': 'error', 'error_message': str(e)}

    def get_meeting_llm_tasks(self, meeting_id: int) -> List[Dict[str, Any]]:
        """Return every LLM task of a meeting, newest first; [] on error."""
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                query = "SELECT task_id, status, progress, user_prompt, created_at, completed_at, error_message FROM llm_tasks WHERE meeting_id = %s ORDER BY created_at DESC"
                cursor.execute(query, (meeting_id,))
                tasks = cursor.fetchall()

            # Timestamps are returned as ISO 8601 strings.
            for task in tasks:
                for field in ('created_at', 'completed_at'):
                    if task.get(field):
                        task[field] = task[field].isoformat()
            return tasks
        except Exception as e:
            print(f"Error getting meeting LLM tasks: {e}")
            return []

    def get_meeting_llm_status(self, meeting_id: int) -> Optional[Dict[str, Any]]:
        """Return the state of the meeting's most recent LLM task.

        Args:
            meeting_id: Meeting ID.

        Returns:
            Optional[Dict]: Task status information, or None when the meeting
            has no task or the lookup fails.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)

                # Most recent LLM task of the meeting.
                query = """
                    SELECT task_id, status, progress, created_at, completed_at, error_message
                    FROM llm_tasks
                    WHERE meeting_id = %s
                    ORDER BY created_at DESC
                    LIMIT 1
                """
                cursor.execute(query, (meeting_id,))
                task_record = cursor.fetchone()
                cursor.close()

            if not task_record:
                return None

            # For a task still in flight, prefer the live state. get_task_status
            # never raises; it signals a failed lookup with status 'error', in
            # which case the database row below is returned instead.
            if task_record['status'] in ('pending', 'processing'):
                live_status = self.get_task_status(task_record['task_id'])
                if live_status.get('status') != 'error':
                    return live_status
                e = live_status.get('error_message')
                print(f"Failed to get latest LLM task status for meeting {meeting_id}, returning DB status. Error: {e}")

            return {
                'task_id': task_record['task_id'],
                'status': task_record['status'],
                'progress': task_record['progress'] or 0,
                'meeting_id': meeting_id,
                'created_at': task_record['created_at'].isoformat() if task_record['created_at'] else None,
                'completed_at': task_record['completed_at'].isoformat() if task_record['completed_at'] else None,
                'error_message': task_record['error_message']
            }

        except Exception as e:
            print(f"Error getting meeting LLM status: {e}")
            return None

    def _update_task_status_in_redis(self, task_id: str, status: str, progress: int, message: str = None, result: str = None, error_message: str = None):
        """Update the task hash in Redis; errors are logged, never raised.

        ``message``, ``result`` and ``error_message`` are only written when
        they are truthy, so earlier values of those fields are kept otherwise.
        """
        try:
            update_data = {
                'status': status,
                'progress': str(progress),
                'updated_at': datetime.now().isoformat()
            }
            optional_fields = {
                'message': message,
                'result': result,
                'error_message': error_message,
            }
            update_data.update({name: value for name, value in optional_fields.items() if value})
            self.redis_client.hset(self._task_key(task_id), mapping=update_data)
        except Exception as e:
            print(f"Error updating task status in Redis: {e}")

    def _save_task_to_db(self, task_id: str, meeting_id: int, user_prompt: str, prompt_id: Optional[int] = None):
        """Insert a new pending task row into ``llm_tasks``.

        Args:
            task_id: Task ID.
            meeting_id: Meeting ID.
            user_prompt: Extra prompt text supplied by the user.
            prompt_id: Optional prompt template ID; stored as NULL when None.

        Raises:
            Exception: Any database error is logged and re-raised.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                insert_query = "INSERT INTO llm_tasks (task_id, meeting_id, user_prompt, prompt_id, status, progress, created_at) VALUES (%s, %s, %s, %s, 'pending', 0, NOW())"
                cursor.execute(insert_query, (task_id, meeting_id, user_prompt, prompt_id))
                connection.commit()
                print(f"[Meeting Service] Task saved successfully to database")
        except Exception as e:
            print(f"Error saving task to database: {e}")
            raise

    def _update_task_in_db(self, task_id: str, status: str, progress: int, result: str = None, error_message: str = None):
        """Update the task row in ``llm_tasks``; errors are logged, never raised.

        A ``completed`` status stores ``result``, clears ``error_message`` and
        stamps ``completed_at``; any other status stores ``error_message``.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                if status == 'completed':
                    query = "UPDATE llm_tasks SET status = %s, progress = %s, result = %s, error_message = NULL, completed_at = NOW() WHERE task_id = %s"
                    params = (status, progress, result, task_id)
                else:
                    query = "UPDATE llm_tasks SET status = %s, progress = %s, error_message = %s WHERE task_id = %s"
                    params = (status, progress, error_message, task_id)

                cursor.execute(query, params)
                connection.commit()
        except Exception as e:
            print(f"Error updating task in database: {e}")

    def _get_task_from_db(self, task_id: str) -> Optional[Dict[str, str]]:
        """Load a task row from ``llm_tasks`` as a dict of strings.

        Values are stringified (datetimes as ISO 8601) to mirror what Redis
        returns. NULL columns are omitted, like absent Redis hash fields,
        rather than being turned into the string "None". Returns None when
        the task does not exist or the query fails.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                query = "SELECT * FROM llm_tasks WHERE task_id = %s"
                cursor.execute(query, (task_id,))
                task = cursor.fetchone()

            if not task:
                return None
            return {
                column: value.isoformat() if isinstance(value, datetime) else str(value)
                for column, value in task.items()
                if value is not None
            }
        except Exception as e:
            print(f"Error getting task from database: {e}")
            return None

    def _get_existing_summary_task(self, meeting_id: int) -> Optional[str]:
        """Return the newest pending/processing summary task ID of a meeting.

        Used as a guard against creating duplicate tasks. Returns None when
        there is no such task or the query fails.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                query = """
                    SELECT task_id FROM llm_tasks
                    WHERE meeting_id = %s AND status IN ('pending', 'processing')
                    ORDER BY created_at DESC
                    LIMIT 1
                """
                cursor.execute(query, (meeting_id,))
                result = cursor.fetchone()
                cursor.close()

            return result['task_id'] if result else None

        except Exception as e:
            print(f"Error checking existing summary task: {e}")
            return None
|
||
|
||
# Create the global (module-level) instance
|
||
# Instantiated at import time: constructing the service builds the Redis
# client and the LLMService (see AsyncMeetingService.__init__).
async_meeting_service = AsyncMeetingService()
|