imetting/backend/app/services/async_meeting_service.py

827 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
异步会议服务 - 处理会议总结生成的异步任务
采用受控线程池执行,避免阻塞 Web 请求进程
"""
import uuid
import time
import re
from datetime import datetime
from typing import Optional, Dict, Any, List
from pathlib import Path
import redis
from app.core.config import REDIS_CONFIG, TRANSCRIPTION_POLL_CONFIG, BACKGROUND_TASK_CONFIG, AUDIO_DIR, BASE_DIR
from app.core.database import get_db_connection
from app.services.async_transcription_service import AsyncTranscriptionService
from app.services.background_task_runner import KeyedBackgroundTaskRunner
from app.services.llm_service import LLMService, LLMServiceError
# Runner that executes meeting-summary jobs; its worker count comes from
# BACKGROUND_TASK_CONFIG['summary_workers'].
summary_task_runner = KeyedBackgroundTaskRunner(
    thread_name_prefix="imeeting-summary",
    max_workers=BACKGROUND_TASK_CONFIG['summary_workers'],
)

# Separate runner for transcription-monitor jobs; its worker count comes from
# BACKGROUND_TASK_CONFIG['monitor_workers'].
monitor_task_runner = KeyedBackgroundTaskRunner(
    thread_name_prefix="imeeting-monitor",
    max_workers=BACKGROUND_TASK_CONFIG['monitor_workers'],
)
class AsyncMeetingService:
"""异步会议服务类 - 处理会议相关的异步任务"""
def __init__(self):
    """Create the Redis client and the LLM service used by this instance.

    ``decode_responses`` defaults to True so Redis replies come back as
    ``str``; a value already present in ``REDIS_CONFIG`` takes precedence.
    """
    # Build a private copy instead of writing the default into REDIS_CONFIG:
    # that dict is a shared module-level object, and mutating it here would
    # change the settings seen by every other importer.
    redis_options = {'decode_responses': True, **REDIS_CONFIG}
    self.redis_client = redis.Redis(**redis_options)
    # Reuse the existing synchronous LLM service.
    self.llm_service = LLMService()
def enqueue_summary_generation(
self,
meeting_id: int,
user_prompt: str = "",
prompt_id: Optional[int] = None,
model_code: Optional[str] = None
) -> tuple[str, bool]:
"""创建并提交总结任务;若已有运行中的同会议总结任务,则直接返回现有任务。"""
existing_task = self._get_existing_summary_task(meeting_id)
if existing_task:
return existing_task, False
task_id = self.start_summary_generation(meeting_id, user_prompt, prompt_id, model_code)
summary_task_runner.submit(f"meeting-summary:{task_id}", self._process_task, task_id)
return task_id, True
def enqueue_transcription_monitor(
    self,
    meeting_id: int,
    transcription_task_id: str,
    prompt_id: Optional[int] = None,
    model_code: Optional[str] = None
) -> bool:
    """Submit a transcription-monitor job keyed by the transcription task id.

    Returns:
        Whatever ``monitor_task_runner.submit`` returns for the key
        ``transcription-monitor:<transcription_task_id>``.
    """
    runner_key = f"transcription-monitor:{transcription_task_id}"
    worker_args = (meeting_id, transcription_task_id, prompt_id, model_code)
    return monitor_task_runner.submit(runner_key, self.monitor_and_auto_summarize, *worker_args)
def start_summary_generation(self, meeting_id: int, user_prompt: str = "", prompt_id: Optional[int] = None, model_code: Optional[str] = None) -> str:
"""
创建异步总结任务,任务的执行将由后台线程池触发。
Args:
meeting_id: 会议ID
user_prompt: 用户额外提示词
prompt_id: 可选的提示词模版ID如果不指定则使用默认模版
model_code: 可选的LLM模型编码如果不指定则使用默认模型
Returns:
str: 任务ID
"""
try:
task_id = str(uuid.uuid4())
# 在数据库中创建任务记录
self._save_task_to_db(task_id, meeting_id, user_prompt, prompt_id, model_code)
# 将任务详情存入Redis用于快速查询状态
current_time = datetime.now().isoformat()
task_data = {
'task_id': task_id,
'meeting_id': str(meeting_id),
'user_prompt': user_prompt,
'prompt_id': str(prompt_id) if prompt_id else '',
'model_code': model_code or '',
'status': 'pending',
'progress': '0',
'created_at': current_time,
'updated_at': current_time
}
self.redis_client.hset(f"llm_task:{task_id}", mapping=task_data)
self.redis_client.expire(f"llm_task:{task_id}", 86400)
return task_id
except Exception as e:
print(f"Error starting summary generation: {e}")
raise
def _process_task(self, task_id: str):
"""
处理单个异步任务的函数,在线程池中执行。
"""
print(f"Background task started for meeting summary task: {task_id}")
lock_token = None
lock_key = f"lock:meeting-summary-task:{task_id}"
try:
# 从Redis获取任务数据
task_data = self.redis_client.hgetall(f"llm_task:{task_id}")
if not task_data:
print(f"Error: Task {task_id} not found in Redis for processing.")
return
meeting_id = int(task_data['meeting_id'])
user_prompt = task_data.get('user_prompt', '')
prompt_id_str = task_data.get('prompt_id', '')
prompt_id = int(prompt_id_str) if prompt_id_str and prompt_id_str != '' else None
model_code = task_data.get('model_code', '') or None
lock_token = self._acquire_lock(lock_key, ttl_seconds=7200)
if not lock_token:
print(f"Task {task_id} is already being processed, skipping duplicate execution")
return
# 1. 更新状态为processing
self._update_task_status_in_redis(task_id, 'processing', 10, message="任务已开始...")
# 2. 获取会议转录内容
self._update_task_status_in_redis(task_id, 'processing', 30, message="获取会议转录内容...")
transcript_text = self._get_meeting_transcript(meeting_id)
if not transcript_text:
raise Exception("无法获取会议转录内容")
# 3. 构建消息
self._update_task_status_in_redis(task_id, 'processing', 40, message="准备AI提示词...")
messages = self._build_messages(meeting_id, transcript_text, user_prompt, prompt_id)
# 4. 调用LLM API支持指定模型
self._update_task_status_in_redis(task_id, 'processing', 50, message="AI正在分析会议内容...")
summary_content = self.llm_service.call_llm_api_messages_or_raise(messages, model_code=model_code)
# 5. 保存结果到主表
self._update_task_status_in_redis(task_id, 'processing', 90, message="保存总结结果...")
self._save_summary_to_db(meeting_id, summary_content, user_prompt, prompt_id)
# 6. 导出MD文件到音频同目录
self._update_task_status_in_redis(task_id, 'processing', 95, message="导出Markdown文件...")
md_path = self._export_summary_md(meeting_id, summary_content, task_id=task_id)
if not md_path:
raise RuntimeError("导出Markdown文件失败未生成文件路径")
# 7. 任务完成result保存MD文件路径
self._update_task_in_db(task_id, 'completed', 100, result=md_path)
self._update_task_status_in_redis(task_id, 'completed', 100, message="任务已完成", result=md_path)
print(f"Task {task_id} completed successfully")
except LLMServiceError as e:
error_msg = e.message or str(e)
print(f"Task {task_id} failed with LLM error: {error_msg}")
self._mark_task_failed(task_id, error_msg)
except Exception as e:
error_msg = str(e)
print(f"Task {task_id} failed: {error_msg}")
self._mark_task_failed(task_id, error_msg)
finally:
self._release_lock(lock_key, lock_token)
def monitor_and_auto_summarize(
    self,
    meeting_id: int,
    transcription_task_id: str,
    prompt_id: Optional[int] = None,
    model_code: Optional[str] = None
):
    """Poll a transcription task and start a summary once it completes.

    Meant to run in the background (see ``enqueue_transcription_monitor``).

    Args:
        meeting_id: Meeting ID.
        transcription_task_id: Transcription task ID to poll.
        prompt_id: Optional prompt template ID passed on to the summary task.
        model_code: Optional model code passed on to the summary task.

    Flow:
        1. Poll the transcription status every ``poll_interval`` seconds,
           at most ``max_wait_time // poll_interval`` times.
        2. On ``completed``, enqueue a summary task unless one already exists.
        3. On ``failed`` stop polling; if the polls run out, log a timeout.

    All errors are logged rather than raised.
    """
    print(f"[Monitor] Started monitoring transcription task {transcription_task_id} for meeting {meeting_id}, prompt_id: {prompt_id}, model_code: {model_code}")

    # Polling parameters.
    poll_interval = TRANSCRIPTION_POLL_CONFIG['poll_interval']
    max_wait_time = TRANSCRIPTION_POLL_CONFIG['max_wait_time']
    max_polls = max_wait_time // poll_interval

    lock_key = f"lock:transcription-monitor:{transcription_task_id}"
    lock_token = self._acquire_lock(lock_key, ttl_seconds=max_wait_time + poll_interval)
    if not lock_token:
        print(f"[Monitor] Monitor task already running for transcription task {transcription_task_id}, skipping duplicate worker")
        return

    try:
        # Constructed inside the try so that a failure here still reaches the
        # finally clause and releases the lock taken above.
        transcription_service = AsyncTranscriptionService()

        for poll_count in range(1, max_polls + 1):
            elapsed_time = poll_count * poll_interval
            try:
                # Query the transcription task status.
                status_info = transcription_service.get_task_status(transcription_task_id)
                current_status = status_info.get('status', 'unknown')
                progress = status_info.get('progress', 0)
                print(f"[Monitor] Poll {poll_count}/{max_polls} - Status: {current_status}, Progress: {progress}%, Elapsed: {elapsed_time}s")

                if current_status == 'completed':
                    print(f"[Monitor] Transcription completed successfully for meeting {meeting_id}")
                    # Concurrency guard: skip if a summary task already exists.
                    existing_task = self._get_existing_summary_task(meeting_id)
                    if existing_task:
                        print(f"[Monitor] Summary task already exists for meeting {meeting_id}, task_id: {existing_task}, skipping duplicate task creation")
                    else:
                        try:
                            summary_task_id, created = self.enqueue_summary_generation(
                                meeting_id,
                                user_prompt="",
                                prompt_id=prompt_id,
                                model_code=model_code
                            )
                            if created:
                                print(f"[Monitor] Summary task {summary_task_id} started for meeting {meeting_id}")
                            else:
                                print(f"[Monitor] Reused existing summary task {summary_task_id} for meeting {meeting_id}")
                        except Exception as e:
                            error_msg = f"Failed to start summary generation: {e}"
                            print(f"[Monitor] {error_msg}")
                    # Monitoring is finished either way.
                    break

                if current_status == 'failed':
                    error_msg = status_info.get('error_message', 'Unknown error')
                    print(f"[Monitor] Transcription failed for meeting {meeting_id}: {error_msg}")
                    break

                if current_status not in ('pending', 'processing'):
                    print(f"[Monitor] Unknown transcription status: {current_status}")
                # Still running (or unknown status): wait before the next poll.
                time.sleep(poll_interval)
            except Exception as e:
                print(f"[Monitor] Error checking transcription status: {e}")
                # Wait and retry after an error.
                time.sleep(poll_interval)
        else:
            # Runs only when the loop ended without a break, so a task that
            # completes or fails on the very last poll is not reported as a
            # timeout.
            print(f"[Monitor] Transcription monitoring timed out after {max_wait_time}s for meeting {meeting_id}")
    except Exception as e:
        print(f"[Monitor] Fatal error in monitor_and_auto_summarize: {e}")
    finally:
        self._release_lock(lock_key, lock_token)
# --- 会议相关方法 ---
def _export_summary_md(self, meeting_id: int, summary_content: str, task_id: Optional[str] = None) -> str:
    """Write the summary to a Markdown file and return its path.

    The file is placed in the directory of the meeting's first audio file
    (resolved under ``BASE_DIR``), or in ``AUDIO_DIR/<meeting_id>`` when the
    meeting has no audio record.

    Args:
        meeting_id: Meeting ID.
        summary_content: Markdown text to write.
        task_id: Optional task id; its first 8 hex characters are appended to
            the file name.

    Returns:
        The file path relative to ``BASE_DIR``, with a leading ``/`` and
        forward slashes.

    Raises:
        RuntimeError: If any step of the export fails.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute("SELECT title FROM meetings WHERE meeting_id = %s", (meeting_id,))
            meeting = cursor.fetchone()
            cursor.execute("SELECT file_path FROM audio_files WHERE meeting_id = %s LIMIT 1", (meeting_id,))
            audio = cursor.fetchone()

        fallback_title = f"meeting_{meeting_id}"
        # A missing row AND a NULL/empty title both fall back to the generic
        # name; iterating over a None title would otherwise abort the export.
        title = str((meeting['title'] if meeting else None) or fallback_title)

        if audio and audio.get('file_path'):
            audio_path = BASE_DIR / str(audio['file_path']).lstrip('/')
            md_dir = audio_path.parent
        else:
            md_dir = AUDIO_DIR / str(meeting_id)
        md_dir.mkdir(parents=True, exist_ok=True)

        # Keep only alphanumerics (including non-ASCII letters) and a few
        # harmless punctuation characters in the file name.
        safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_', '.')).strip()
        if not safe_title:
            safe_title = fallback_title

        timestamp_suffix = datetime.now().strftime("%Y%m%d_%H%M%S")
        task_suffix = f"_{str(task_id).replace('-', '')[:8]}" if task_id else ""

        md_path = md_dir / f"{safe_title}_总结_{timestamp_suffix}{task_suffix}.md"
        md_path.write_text(summary_content, encoding='utf-8')

        relative_md_path = "/" + str(md_path.relative_to(BASE_DIR)).replace("\\", "/")
        print(f"总结MD文件已保存: {relative_md_path}")
        return relative_md_path
    except Exception as e:
        raise RuntimeError(f"导出总结MD文件失败: {e}") from e
def _get_meeting_transcript(self, meeting_id: int) -> str:
    """Load the meeting's transcript segments and render them as text.

    Each segment becomes one line of the form ``[MM:SS] 说话人<tag>: <text>``,
    ordered by start time.

    Returns:
        The joined lines, or ``""`` when there are no segments or any error
        occurs (the error is logged).
    """
    segment_sql = """
        SELECT speaker_tag, start_time_ms, end_time_ms, text_content
        FROM transcript_segments
        WHERE meeting_id = %s
        ORDER BY start_time_ms
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(segment_sql, (meeting_id,))
            rows = cursor.fetchall()

        # Joining an empty result yields "", the same value the no-segment
        # case returns. Milliseconds are converted to minutes:seconds.
        return "\n".join(
            f"[{start_ms // 60000:02d}:{(start_ms % 60000) // 1000:02d}] 说话人{speaker}: {text}"
            for speaker, start_ms, _end_ms, text in rows
        )
    except Exception as e:
        print(f"获取会议转录内容错误: {e}")
        return ""
def _get_meeting_prompt_context(self, meeting_id: int) -> Dict[str, Any]:
    """Collect the meeting metadata used to fill prompt variables.

    Returns:
        A dict with ``title``, ``meeting_time`` (formatted text),
        ``meeting_time_value`` (the raw database value), ``creator_name`` and
        ``attendees`` (attendee captions joined with ``、``). An empty dict is
        returned when the meeting does not exist or an error occurs (the
        error is logged).
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            # try/finally so the cursor is closed on every path, including
            # when one of the queries raises.
            try:
                cursor.execute(
                    """
                    SELECT m.title, m.meeting_time, u.caption AS creator_name
                    FROM meetings m
                    LEFT JOIN sys_users u ON m.user_id = u.user_id
                    WHERE m.meeting_id = %s
                    """,
                    (meeting_id,)
                )
                meeting = cursor.fetchone()
                if not meeting:
                    return {}

                cursor.execute(
                    """
                    SELECT u.caption
                    FROM attendees a
                    JOIN sys_users u ON a.user_id = u.user_id
                    WHERE a.meeting_id = %s
                    ORDER BY u.caption ASC
                    """,
                    (meeting_id,)
                )
                attendee_rows = cursor.fetchall()
            finally:
                cursor.close()

        meeting_time = meeting.get('meeting_time')
        if isinstance(meeting_time, datetime):
            meeting_time_text = meeting_time.strftime('%Y-%m-%d %H:%M:%S')
        elif meeting_time:
            meeting_time_text = str(meeting_time)
        else:
            meeting_time_text = ''

        attendee_names = [row.get('caption') for row in attendee_rows if row.get('caption')]
        # Separate the names explicitly; joining with an empty string would
        # run them together into one unreadable token in the prompt.
        attendee_text = '、'.join(attendee_names)

        return {
            'title': meeting.get('title') or '',
            'meeting_time': meeting_time_text,
            'meeting_time_value': meeting_time,
            'creator_name': meeting.get('creator_name') or '',
            'attendees': attendee_text
        }
    except Exception as e:
        print(f"获取会议提示词上下文错误: {e}")
        return {}
def _format_meeting_time_value(self, meeting_time_value: Any, custom_format: Optional[str] = None) -> str:
default_format = '%Y-%m-%d %H:%M:%S'
if isinstance(meeting_time_value, datetime):
try:
return meeting_time_value.strftime(custom_format or default_format)
except Exception:
return meeting_time_value.strftime(default_format)
if meeting_time_value:
meeting_time_text = str(meeting_time_value)
if custom_format:
try:
parsed_time = datetime.fromisoformat(meeting_time_text.replace('Z', '+00:00'))
return parsed_time.strftime(custom_format)
except Exception:
return meeting_time_text
return meeting_time_text
return ''
def _apply_prompt_variables(self, template: str, variables: Dict[str, Any]) -> str:
if not template:
return template
rendered = re.sub(
r"\{\{\s*meeting_time\s*:\s*([^{}]+?)\s*\}\}",
lambda match: self._format_meeting_time_value(
variables.get('meeting_time_value'),
match.group(1).strip()
),
template
)
for key, value in variables.items():
if key == 'meeting_time_value':
continue
rendered = rendered.replace(f"{{{{{key}}}}}", value or '')
rendered = rendered.replace(f"{{{{ {key} }}}}", value or '')
return rendered
def _build_messages(self, meeting_id: int, transcript_text: str, user_prompt: str, prompt_id: Optional[int] = None) -> List[Dict[str, str]]:
"""
构建会议总结消息数组
使用数据库中配置的 MEETING_TASK 提示词模板作为任务级 system 指令
Args:
meeting_id: 会议ID
transcript_text: 会议转录文本
user_prompt: 用户额外提示词
prompt_id: 可选的提示词模版ID如果不指定则使用默认模版
"""
task_prompt = self.llm_service.get_task_prompt('MEETING_TASK', prompt_id=prompt_id)
meeting_context = self._get_meeting_prompt_context(meeting_id)
prompt_variables = {
'meeting_id': str(meeting_id),
'meeting_title': meeting_context.get('title', ''),
'meeting_time': meeting_context.get('meeting_time', ''),
'meeting_creator': meeting_context.get('creator_name', ''),
'meeting_attendees': meeting_context.get('attendees', ''),
'meeting_time_value': meeting_context.get('meeting_time_value')
}
rendered_task_prompt = self._apply_prompt_variables(task_prompt, prompt_variables)
rendered_user_prompt = self._apply_prompt_variables(user_prompt, prompt_variables) if user_prompt else ''
meeting_info_lines = [
f"会议ID{prompt_variables['meeting_id']}",
f"会议标题:{prompt_variables['meeting_title'] or '未提供'}",
f"会议时间:{prompt_variables['meeting_time'] or '未提供'}",
f"会议创建人:{prompt_variables['meeting_creator'] or '未提供'}",
f"参会人员:{prompt_variables['meeting_attendees'] or '未提供'}",
]
meeting_info_message = "\n".join(meeting_info_lines)
user_requirement_message = rendered_user_prompt or "无额外要求"
messages: List[Dict[str, str]] = []
if rendered_task_prompt:
messages.append({"role": "system", "content": rendered_task_prompt})
messages.append({
"role": "user",
"content": (
"以下是本次会议的上下文信息,请结合这些信息理解会议背景。\n\n"
f"{meeting_info_message}\n\n"
"以下是用户额外要求,如与事实冲突请以转录原文为准:\n"
f"{user_requirement_message}"
)
})
messages.append({
"role": "user",
"content": (
"以下是会议转录原文,请严格依据原文生成会议总结。\n"
"如果信息不足,请明确写出“原文未明确”或“需人工确认”。\n\n"
"<meeting_transcript>\n"
f"{transcript_text}\n"
"</meeting_transcript>"
)
})
return messages
def _save_summary_to_db(self, meeting_id: int, summary_content: str, user_prompt: str, prompt_id: Optional[int] = None) -> int:
    """Store the summary on the meeting row.

    Updates ``summary``, ``user_prompt``, ``prompt_id`` and ``updated_at`` in
    the ``meetings`` table.

    Returns:
        ``meeting_id``.

    Raises:
        RuntimeError: If no meeting with ``meeting_id`` exists.
    """
    with get_db_connection() as connection:
        cursor = connection.cursor()

        # Verify the meeting exists first so a missing row is reported
        # explicitly.
        cursor.execute("SELECT 1 FROM meetings WHERE meeting_id = %s LIMIT 1", (meeting_id,))
        meeting_row = cursor.fetchone()
        if not meeting_row:
            raise RuntimeError(f"会议不存在,无法保存总结: meeting_id={meeting_id}")

        cursor.execute(
            """
            UPDATE meetings
            SET summary = %s, user_prompt = %s, prompt_id = %s, updated_at = NOW()
            WHERE meeting_id = %s
            """,
            (summary_content, user_prompt, prompt_id, meeting_id),
        )
        connection.commit()

        print(f"成功保存会议总结到meetings表meeting_id: {meeting_id}, prompt_id: {prompt_id}")
        return meeting_id
# --- 状态查询和数据库操作方法 ---
def get_task_status(self, task_id: str) -> Dict[str, Any]:
"""获取任务状态"""
try:
task_data = self.redis_client.hgetall(f"llm_task:{task_id}")
if not task_data:
task_data = self._get_task_from_db(task_id)
if not task_data:
return {'task_id': task_id, 'status': 'not_found', 'error_message': 'Task not found'}
self._resume_task_if_needed(task_id, task_data)
task_data = self.redis_client.hgetall(f"llm_task:{task_id}") or task_data
return {
'task_id': task_id,
'status': task_data.get('status', 'unknown'),
'progress': int(task_data.get('progress', 0)),
'meeting_id': int(task_data.get('meeting_id', 0)),
'model_code': self._normalize_optional_text(task_data.get('model_code')),
'created_at': task_data.get('created_at'),
'updated_at': task_data.get('updated_at'),
'message': task_data.get('message'),
'result': self._normalize_optional_text(task_data.get('result')),
'error_message': self._normalize_optional_text(task_data.get('error_message'))
}
except Exception as e:
print(f"Error getting task status: {e}")
return {'task_id': task_id, 'status': 'error', 'error_message': str(e)}
def _resume_task_if_needed(self, task_id: str, task_data: Dict[str, Any]) -> None:
"""恢复服务重启后丢失在内存中的总结任务。"""
try:
status = str(task_data.get('status') or '').lower()
if status not in {'pending', 'processing'}:
return
restored_data = {
'task_id': task_id,
'meeting_id': str(task_data.get('meeting_id') or ''),
'user_prompt': '' if task_data.get('user_prompt') in (None, 'None') else str(task_data.get('user_prompt')),
'prompt_id': '' if task_data.get('prompt_id') in (None, 'None') else str(task_data.get('prompt_id')),
'model_code': '' if task_data.get('model_code') in (None, 'None') else str(task_data.get('model_code')),
'status': status,
'progress': str(task_data.get('progress') or 0),
'created_at': task_data.get('created_at') or datetime.now().isoformat(),
'updated_at': datetime.now().isoformat(),
'message': '任务恢复中,准备继续执行...',
}
self.redis_client.hset(f"llm_task:{task_id}", mapping=restored_data)
self.redis_client.expire(f"llm_task:{task_id}", 86400)
submitted = summary_task_runner.submit(f"meeting-summary:{task_id}", self._process_task, task_id)
print(f"[LLM Task Recovery] task_id={task_id}, status={status}, submitted={submitted}")
except Exception as e:
print(f"Error resuming summary task {task_id}: {e}")
def get_meeting_llm_tasks(self, meeting_id: int) -> List[Dict[str, Any]]:
    """List every LLM task recorded for a meeting, newest first.

    ``created_at`` / ``completed_at`` are converted to ISO-8601 strings when
    set.

    Returns:
        The task rows as dicts, or an empty list on error (the error is
        logged).
    """
    task_sql = """
        SELECT task_id, status, progress, user_prompt, model_code, result, created_at, completed_at, error_message
        FROM llm_tasks
        WHERE meeting_id = %s
        ORDER BY created_at DESC
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute(task_sql, (meeting_id,))
            rows = cursor.fetchall()

        for row in rows:
            for stamp_field in ('created_at', 'completed_at'):
                if row.get(stamp_field):
                    row[stamp_field] = row[stamp_field].isoformat()
        return rows
    except Exception as e:
        print(f"Error getting meeting LLM tasks: {e}")
        return []
def get_meeting_llm_status(self, meeting_id: int) -> Optional[Dict[str, Any]]:
    """Return the status of the meeting's most recent LLM task.

    Args:
        meeting_id: Meeting ID.

    Returns:
        A status dict, or None when the meeting has no task or the lookup
        fails (the error is logged). For a task that is still pending or
        processing the live status from ``get_task_status`` is returned;
        otherwise the status is built from the database row.
    """
    latest_task_sql = """
        SELECT task_id, status, progress, model_code, result, created_at, completed_at, error_message
        FROM llm_tasks
        WHERE meeting_id = %s
        ORDER BY created_at DESC
        LIMIT 1
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            # Fetch the newest task for this meeting.
            cursor.execute(latest_task_sql, (meeting_id,))
            latest = cursor.fetchone()
            cursor.close()

        if not latest:
            return None

        # A task still in flight has fresher state in get_task_status.
        if latest['status'] in ('pending', 'processing'):
            try:
                return self.get_task_status(latest['task_id'])
            except Exception as e:
                print(f"Failed to get latest LLM task status for meeting {meeting_id}, returning DB status. Error: {e}")

        created_at = latest['created_at']
        completed_at = latest['completed_at']
        return {
            'task_id': latest['task_id'],
            'status': latest['status'],
            'progress': latest['progress'] or 0,
            'meeting_id': meeting_id,
            'model_code': latest.get('model_code'),
            'created_at': created_at.isoformat() if created_at else None,
            'completed_at': completed_at.isoformat() if completed_at else None,
            'result': latest.get('result'),
            'error_message': latest['error_message']
        }
    except Exception as e:
        print(f"Error getting meeting LLM status: {e}")
        return None
def _update_task_status_in_redis(self, task_id: str, status: str, progress: int, message: str = None, result: str = None, error_message: str = None):
"""更新Redis中的任务状态"""
try:
redis_key = f"llm_task:{task_id}"
update_data = {
'status': status,
'progress': str(progress),
'updated_at': datetime.now().isoformat()
}
if message: update_data['message'] = message
if result is not None: update_data['result'] = result
if error_message is not None: update_data['error_message'] = error_message
self.redis_client.hset(redis_key, mapping=update_data)
if status == 'failed':
self.redis_client.hdel(redis_key, 'result')
elif status == 'completed':
self.redis_client.hdel(redis_key, 'error_message')
except Exception as e:
print(f"Error updating task status in Redis: {e}")
def _save_task_to_db(
    self,
    task_id: str,
    meeting_id: int,
    user_prompt: str,
    prompt_id: Optional[int] = None,
    model_code: Optional[str] = None
):
    """Insert a new ``llm_tasks`` row with status ``pending`` and progress 0.

    Args:
        task_id: Task ID.
        meeting_id: Meeting ID.
        user_prompt: Extra user prompt text.
        prompt_id: Optional prompt template ID.
        model_code: Optional model code stored with the task.

    Raises:
        Exception: Any database error is logged and re-raised.
    """
    insert_sql = """
        INSERT INTO llm_tasks (task_id, meeting_id, user_prompt, prompt_id, model_code, status, progress, created_at)
        VALUES (%s, %s, %s, %s, %s, 'pending', 0, NOW())
    """
    row_values = (task_id, meeting_id, user_prompt, prompt_id, model_code)
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(insert_sql, row_values)
            connection.commit()
            print("[Meeting Service] Task saved successfully to database")
    except Exception as e:
        print(f"Error saving task to database: {e}")
        raise
def _update_task_in_db(self, task_id: str, status: str, progress: int, result: str = None, error_message: str = None):
    """Update the task's row in ``llm_tasks``.

    For ``completed`` the row gets ``result``, a cleared ``error_message`` and
    ``completed_at = NOW()``. For any other status it gets ``error_message``,
    a cleared ``result`` and a cleared ``completed_at``.

    Raises:
        RuntimeError: If the update reports zero affected rows.
    """
    if status == 'completed':
        update_sql = """
            UPDATE llm_tasks
            SET status = %s, progress = %s, result = %s, error_message = NULL, completed_at = NOW()
            WHERE task_id = %s
        """
        detail = result
    else:
        update_sql = """
            UPDATE llm_tasks
            SET status = %s, progress = %s, result = NULL, error_message = %s, completed_at = NULL
            WHERE task_id = %s
        """
        detail = error_message

    with get_db_connection() as connection:
        cursor = connection.cursor()
        cursor.execute(update_sql, (status, progress, detail, task_id))
        if cursor.rowcount == 0:
            raise RuntimeError(f"更新LLM任务状态失败任务不存在: task_id={task_id}")
        connection.commit()
def _mark_task_failed(self, task_id: str, error_message: str) -> None:
"""尽力持久化失败状态,避免原始异常被状态更新失败覆盖。"""
try:
self._update_task_in_db(task_id, 'failed', 0, error_message=error_message)
except Exception as update_error:
print(f"Error updating failed task in database: {update_error}")
self._update_task_status_in_redis(
task_id,
'failed',
0,
message="任务执行失败",
error_message=error_message,
)
def _get_task_from_db(self, task_id: str) -> Optional[Dict[str, str]]:
    """Load a task row from ``llm_tasks`` with its values converted to text.

    ``datetime`` values become ISO-8601 strings, ``None`` stays ``None`` and
    everything else is passed through ``str``.

    Returns:
        The converted row, or None when the task does not exist or the query
        fails (the error is logged).
    """

    def as_text(value):
        if isinstance(value, datetime):
            return value.isoformat()
        return None if value is None else str(value)

    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute("SELECT * FROM llm_tasks WHERE task_id = %s", (task_id,))
            row = cursor.fetchone()
            if not row:
                return None
            return {column: as_text(value) for column, value in row.items()}
    except Exception as e:
        print(f"Error getting task from database: {e}")
        return None
@staticmethod
def _normalize_optional_text(value: Any) -> Optional[str]:
if value in (None, "", "None"):
return None
return str(value)
def _get_existing_summary_task(self, meeting_id: int) -> Optional[str]:
    """Find a summary task for the meeting that has not finished yet.

    Used as a concurrency guard before creating a new task.

    Returns:
        The id of the newest task whose status is ``pending`` or
        ``processing``, or None when there is none or the query fails (the
        error is logged).
    """
    active_task_sql = """
        SELECT task_id FROM llm_tasks
        WHERE meeting_id = %s AND status IN ('pending', 'processing')
        ORDER BY created_at DESC
        LIMIT 1
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute(active_task_sql, (meeting_id,))
            row = cursor.fetchone()
            cursor.close()
            return row['task_id'] if row else None
    except Exception as e:
        print(f"Error checking existing summary task: {e}")
        return None
def _acquire_lock(self, lock_key: str, ttl_seconds: int) -> Optional[str]:
"""使用 Redis 分布式锁防止多 worker 重复执行同一后台任务。"""
try:
token = str(uuid.uuid4())
acquired = self.redis_client.set(lock_key, token, nx=True, ex=max(30, ttl_seconds))
if acquired:
return token
except Exception as e:
print(f"Error acquiring lock {lock_key}: {e}")
return None
def _release_lock(self, lock_key: str, token: Optional[str]) -> None:
if not token:
return
try:
release_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
end
return 0
"""
self.redis_client.eval(release_script, 1, lock_key, token)
except Exception as e:
print(f"Error releasing lock {lock_key}: {e}")
# Create the module-level shared service instance.
async_meeting_service = AsyncMeetingService()