imetting/backend/app/services/async_meeting_service.py

771 lines
34 KiB
Python
Raw Normal View History

"""
Async meeting service - runs meeting summary generation as asynchronous tasks.
Work is executed on a bounded thread pool so that web request processes are not blocked.
"""
import uuid
import time
2026-03-26 06:55:12 +00:00
import os
import re
from datetime import datetime
from typing import Optional, Dict, Any, List
2026-03-26 06:55:12 +00:00
from pathlib import Path
import redis
2026-04-07 09:26:34 +00:00
from app.core.config import REDIS_CONFIG, TRANSCRIPTION_POLL_CONFIG, BACKGROUND_TASK_CONFIG, AUDIO_DIR
from app.core.database import get_db_connection
from app.services.async_transcription_service import AsyncTranscriptionService
2026-04-07 09:26:34 +00:00
from app.services.background_task_runner import KeyedBackgroundTaskRunner
from app.services.llm_service import LLMService
2026-04-07 09:26:34 +00:00
# Background runner for meeting-summary jobs. Its worker count comes from
# BACKGROUND_TASK_CONFIG['summary_workers'] and its threads are named with the
# "imeeting-summary" prefix.
# NOTE(review): presumably the runner de-duplicates submissions by key - confirm
# against KeyedBackgroundTaskRunner.
summary_task_runner = KeyedBackgroundTaskRunner(
    max_workers=BACKGROUND_TASK_CONFIG['summary_workers'],
    thread_name_prefix="imeeting-summary",
)
# Separate background runner for transcription-monitor (polling) jobs, sized by
# BACKGROUND_TASK_CONFIG['monitor_workers'].
monitor_task_runner = KeyedBackgroundTaskRunner(
    max_workers=BACKGROUND_TASK_CONFIG['monitor_workers'],
    thread_name_prefix="imeeting-monitor",
)
class AsyncMeetingService:
    """Async meeting service: runs meeting-related background tasks."""

    def __init__(self):
        # Default to decoded (str) Redis responses unless the configuration
        # already chooses a value, which keeps the hash-reading code simpler.
        REDIS_CONFIG.setdefault('decode_responses', True)
        self.redis_client = redis.Redis(**REDIS_CONFIG)
        # Reuse the existing synchronous LLM service.
        self.llm_service = LLMService()
2026-04-07 09:26:34 +00:00
def enqueue_summary_generation(
    self,
    meeting_id: int,
    user_prompt: str = "",
    prompt_id: Optional[int] = None,
    model_code: Optional[str] = None
) -> tuple[str, bool]:
    """Create and submit a summary task, reusing one that is already in flight.

    Returns:
        A ``(task_id, created)`` pair. ``created`` is False when a pending or
        processing summary task for the same meeting already exists and its
        id is returned instead of starting a new one.
    """
    running_task_id = self._get_existing_summary_task(meeting_id)
    if not running_task_id:
        new_task_id = self.start_summary_generation(meeting_id, user_prompt, prompt_id, model_code)
        # The runner key embeds the task id; the worker entry point is _process_task.
        summary_task_runner.submit(f"meeting-summary:{new_task_id}", self._process_task, new_task_id)
        return new_task_id, True
    return running_task_id, False
def enqueue_transcription_monitor(
    self,
    meeting_id: int,
    transcription_task_id: str,
    prompt_id: Optional[int] = None,
    model_code: Optional[str] = None
) -> bool:
    """Submit a transcription-monitor job keyed by the transcription task id.

    The key is meant to avoid polling the same transcription task twice.
    Returns whatever the monitor runner's ``submit`` returns.
    """
    runner_key = f"transcription-monitor:{transcription_task_id}"
    monitor_args = (meeting_id, transcription_task_id, prompt_id, model_code)
    return monitor_task_runner.submit(runner_key, self.monitor_and_auto_summarize, *monitor_args)
2026-03-26 06:55:12 +00:00
def start_summary_generation(self, meeting_id: int, user_prompt: str = "", prompt_id: Optional[int] = None, model_code: Optional[str] = None) -> str:
    """
    Create an async summary task; execution is triggered separately by the
    background thread pool.

    The task row is inserted into the database first, then mirrored into a
    Redis hash (24h TTL) used for fast status lookups. If anything fails after
    the row exists, the row is marked ``failed`` before the error is re-raised;
    otherwise it would stay ``pending`` forever and be reported as a running
    task for this meeting by ``_get_existing_summary_task``.

    Args:
        meeting_id: Meeting ID.
        user_prompt: Extra prompt text supplied by the user.
        prompt_id: Optional prompt template ID; the default template is used when omitted.
        model_code: Optional LLM model code; the default model is used when omitted.

    Returns:
        str: The new task ID.

    Raises:
        Exception: Any error from the database insert or the Redis writes.
    """
    task_id = str(uuid.uuid4())
    task_key = f"llm_task:{task_id}"
    db_row_created = False
    try:
        # Create the task record in the database.
        self._save_task_to_db(task_id, meeting_id, user_prompt, prompt_id)
        db_row_created = True
        # Mirror the task details into Redis for fast status queries.
        current_time = datetime.now().isoformat()
        task_data = {
            'task_id': task_id,
            'meeting_id': str(meeting_id),
            'user_prompt': user_prompt,
            'prompt_id': str(prompt_id) if prompt_id else '',
            'model_code': model_code or '',
            'status': 'pending',
            'progress': '0',
            'created_at': current_time,
            'updated_at': current_time
        }
        self.redis_client.hset(task_key, mapping=task_data)
        self.redis_client.expire(task_key, 86400)
        return task_id
    except Exception as e:
        print(f"Error starting summary generation: {e}")
        if db_row_created:
            # Do not leave an orphaned 'pending' row behind: the task will never
            # be executed because the caller never receives its id.
            self._update_task_in_db(task_id, 'failed', 0, error_message=f"Failed to initialize task: {e}")
        raise
def _process_task(self, task_id: str):
"""
2026-04-07 09:26:34 +00:00
处理单个异步任务的函数在线程池中执行
"""
print(f"Background task started for meeting summary task: {task_id}")
2026-04-07 09:26:34 +00:00
lock_token = None
lock_key = f"lock:meeting-summary-task:{task_id}"
try:
# 从Redis获取任务数据
task_data = self.redis_client.hgetall(f"llm_task:{task_id}")
if not task_data:
print(f"Error: Task {task_id} not found in Redis for processing.")
return
meeting_id = int(task_data['meeting_id'])
user_prompt = task_data.get('user_prompt', '')
prompt_id_str = task_data.get('prompt_id', '')
prompt_id = int(prompt_id_str) if prompt_id_str and prompt_id_str != '' else None
2026-03-26 06:55:12 +00:00
model_code = task_data.get('model_code', '') or None
2026-04-07 09:26:34 +00:00
lock_token = self._acquire_lock(lock_key, ttl_seconds=7200)
if not lock_token:
print(f"Task {task_id} is already being processed, skipping duplicate execution")
return
# 1. 更新状态为processing
self._update_task_status_in_redis(task_id, 'processing', 10, message="任务已开始...")
# 2. 获取会议转录内容
self._update_task_status_in_redis(task_id, 'processing', 30, message="获取会议转录内容...")
transcript_text = self._get_meeting_transcript(meeting_id)
if not transcript_text:
raise Exception("无法获取会议转录内容")
# 3. 构建提示词
self._update_task_status_in_redis(task_id, 'processing', 40, message="准备AI提示词...")
full_prompt = self._build_prompt(meeting_id, transcript_text, user_prompt, prompt_id)
2026-03-26 06:55:12 +00:00
# 4. 调用LLM API支持指定模型
self._update_task_status_in_redis(task_id, 'processing', 50, message="AI正在分析会议内容...")
2026-03-26 06:55:12 +00:00
if model_code:
summary_content = self._call_llm_with_model(full_prompt, model_code)
else:
summary_content = self.llm_service._call_llm_api(full_prompt)
if not summary_content:
raise Exception("LLM API调用失败或返回空内容")
# 5. 保存结果到主表
2026-03-26 06:55:12 +00:00
self._update_task_status_in_redis(task_id, 'processing', 90, message="保存总结结果...")
self._save_summary_to_db(meeting_id, summary_content, user_prompt, prompt_id)
2026-03-26 06:55:12 +00:00
# 6. 导出MD文件到音频同目录
self._update_task_status_in_redis(task_id, 'processing', 95, message="导出Markdown文件...")
md_path = self._export_summary_md(meeting_id, summary_content)
# 7. 任务完成result保存MD文件路径
self._update_task_in_db(task_id, 'completed', 100, result=md_path)
self._update_task_status_in_redis(task_id, 'completed', 100, result=md_path)
print(f"Task {task_id} completed successfully")
except Exception as e:
error_msg = str(e)
print(f"Task {task_id} failed: {error_msg}")
# 更新失败状态
self._update_task_in_db(task_id, 'failed', 0, error_message=error_msg)
self._update_task_status_in_redis(task_id, 'failed', 0, error_message=error_msg)
2026-04-07 09:26:34 +00:00
finally:
self._release_lock(lock_key, lock_token)
2026-03-30 08:22:09 +00:00
def monitor_and_auto_summarize(
    self,
    meeting_id: int,
    transcription_task_id: str,
    prompt_id: Optional[int] = None,
    model_code: Optional[str] = None
):
    """
    Poll a transcription task and start summary generation once it completes.

    Designed to run in the background. A Redis lock keyed by the transcription
    task id prevents several workers from polling the same task.

    Args:
        meeting_id: Meeting ID.
        transcription_task_id: Transcription task ID.
        prompt_id: Optional prompt template ID; the default template is used when omitted.
        model_code: Optional summary model code; the default model is used when omitted.

    Flow:
        1. Poll the transcription task status in a loop.
        2. When transcription succeeds, start the summary task.
        3. On failure or timeout, stop polling and log the outcome.
    """
    print(f"[Monitor] Started monitoring transcription task {transcription_task_id} for meeting {meeting_id}, prompt_id: {prompt_id}, model_code: {model_code}")
    # Polling configuration.
    poll_interval = TRANSCRIPTION_POLL_CONFIG['poll_interval']
    max_wait_time = TRANSCRIPTION_POLL_CONFIG['max_wait_time']
    max_polls = max_wait_time // poll_interval

    lock_key = f"lock:transcription-monitor:{transcription_task_id}"
    lock_token = self._acquire_lock(lock_key, ttl_seconds=max_wait_time + poll_interval)
    if not lock_token:
        print(f"[Monitor] Monitor task already running for transcription task {transcription_task_id}, skipping duplicate worker")
        return

    try:
        # Constructed inside the try block so that a constructor failure still
        # releases the lock in ``finally`` instead of holding it until its TTL.
        transcription_service = AsyncTranscriptionService()
        for poll_count in range(1, max_polls + 1):
            elapsed_time = poll_count * poll_interval
            try:
                # Query the transcription task status.
                status_info = transcription_service.get_task_status(transcription_task_id)
                current_status = status_info.get('status', 'unknown')
                progress = status_info.get('progress', 0)
                print(f"[Monitor] Poll {poll_count}/{max_polls} - Status: {current_status}, Progress: {progress}%, Elapsed: {elapsed_time}s")

                if current_status == 'completed':
                    print(f"[Monitor] Transcription completed successfully for meeting {meeting_id}")
                    # Concurrency guard: skip when a summary task already exists.
                    existing_task = self._get_existing_summary_task(meeting_id)
                    if existing_task:
                        print(f"[Monitor] Summary task already exists for meeting {meeting_id}, task_id: {existing_task}, skipping duplicate task creation")
                    else:
                        try:
                            summary_task_id, created = self.enqueue_summary_generation(
                                meeting_id,
                                user_prompt="",
                                prompt_id=prompt_id,
                                model_code=model_code
                            )
                            if created:
                                print(f"[Monitor] Summary task {summary_task_id} started for meeting {meeting_id}")
                            else:
                                print(f"[Monitor] Reused existing summary task {summary_task_id} for meeting {meeting_id}")
                        except Exception as e:
                            error_msg = f"Failed to start summary generation: {e}"
                            print(f"[Monitor] {error_msg}")
                    # Monitoring is finished.
                    break

                if current_status == 'failed':
                    error_msg = status_info.get('error_message', 'Unknown error')
                    print(f"[Monitor] Transcription failed for meeting {meeting_id}: {error_msg}")
                    # Transcription failed; stop monitoring.
                    break

                if current_status not in ('pending', 'processing'):
                    print(f"[Monitor] Unknown transcription status: {current_status}")
            except Exception as e:
                print(f"[Monitor] Error checking transcription status: {e}")

            # Still running, unknown status, or a transient error: wait and retry.
            time.sleep(poll_interval)
        else:
            # Reached only when the loop ran out of polls without a ``break``,
            # so a terminal status seen on the last poll is not reported as a
            # timeout.
            print(f"[Monitor] Transcription monitoring timed out after {max_wait_time}s for meeting {meeting_id}")
    except Exception as e:
        print(f"[Monitor] Fatal error in monitor_and_auto_summarize: {e}")
    finally:
        self._release_lock(lock_key, lock_token)
# --- 会议相关方法 ---
2026-03-26 06:55:12 +00:00
def _call_llm_with_model(self, prompt: str, model_code: str) -> Optional[str]:
    """Call the LLM API using the model configured under ``model_code``.

    The model configuration is read from ``llm_model_config`` (active rows
    only). When no active row exists the default model is used via
    ``self.llm_service._call_llm_api``. The request is a non-streaming POST to
    the configured endpoint's ``/chat/completions`` path.

    Args:
        prompt: Full prompt text, sent as a single user message.
        model_code: Model code to look up.

    Returns:
        The extracted response text, or None when the call fails.
    """
    import requests

    def _setting(row, key, default):
        # dict.get(key, default) only covers a missing key; a NULL column
        # arrives as None and must fall back to the default as well.
        value = row.get(key)
        return default if value is None else value

    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute(
                "SELECT endpoint_url, api_key, llm_model_name, llm_timeout, llm_temperature, llm_top_p, llm_max_tokens FROM llm_model_config WHERE model_code = %s AND is_active = 1",
                (model_code,)
            )
            config = cursor.fetchone()

        # The database connection is released before the (potentially slow)
        # network call below.
        if not config:
            print(f"模型 {model_code} 未找到或未激活,回退到默认模型")
            return self.llm_service._call_llm_api(prompt)

        endpoint_url = (config['endpoint_url'] or '').rstrip('/')
        if not endpoint_url.endswith('/chat/completions'):
            endpoint_url = f"{endpoint_url}/chat/completions"

        headers = {"Content-Type": "application/json"}
        if config['api_key']:
            headers["Authorization"] = f"Bearer {config['api_key']}"

        payload = {
            "model": config['llm_model_name'],
            "messages": [{"role": "user", "content": prompt}],
            "temperature": float(_setting(config, 'llm_temperature', 0.7)),
            "top_p": float(_setting(config, 'llm_top_p', 0.9)),
            "max_tokens": int(_setting(config, 'llm_max_tokens', 4096)),
            "stream": False,
        }
        response = requests.post(
            endpoint_url,
            headers=headers,
            json=payload,
            timeout=int(_setting(config, 'llm_timeout', 120)),
        )
        response.raise_for_status()
        return self.llm_service._extract_response_text(response.json())
    except Exception as e:
        print(f"使用模型 {model_code} 调用失败: {e}")
        return None
def _export_summary_md(self, meeting_id: int, summary_content: str) -> Optional[str]:
    """Export the summary as a Markdown file and return its path.

    The file is written under AUDIO_DIR, in the directory that mirrors the
    meeting's audio file location, and is named ``<sanitized title>_总结.md``.

    Args:
        meeting_id: Meeting ID.
        summary_content: Summary text to write (UTF-8).

    Returns:
        The written file path as a string, or None when the export fails.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute("SELECT title FROM meetings WHERE meeting_id = %s", (meeting_id,))
            meeting = cursor.fetchone()
            cursor.execute("SELECT file_path FROM audio_files WHERE meeting_id = %s LIMIT 1", (meeting_id,))
            audio = cursor.fetchone()

        fallback_title = f"meeting_{meeting_id}"
        # A missing meeting row and a NULL/empty title both use the fallback
        # name; previously a NULL title made the sanitizing loop fail.
        title = (meeting.get('title') if meeting else None) or fallback_title

        # Always resolve against AUDIO_DIR so that an absolute path stored in
        # the database cannot point at an unwritable directory.
        if audio and audio.get('file_path'):
            audio_path = Path(audio['file_path'])
            try:
                # Keep the sub-directory below AUDIO_DIR (e.g. "226" or "226/sub").
                md_dir = AUDIO_DIR / audio_path.relative_to(AUDIO_DIR).parent
            except ValueError:
                # file_path is not under AUDIO_DIR (e.g. a Docker absolute
                # path): use only the last directory name.
                md_dir = AUDIO_DIR / audio_path.parent.name
        else:
            md_dir = AUDIO_DIR / str(meeting_id)
        md_dir.mkdir(parents=True, exist_ok=True)

        safe_title = "".join(c for c in title if c.isalnum() or c in (' ', '-', '_', '.')).strip()
        if not safe_title:
            safe_title = fallback_title

        md_path = md_dir / f"{safe_title}_总结.md"
        md_path.write_text(summary_content, encoding='utf-8')
        md_path_str = str(md_path)
        print(f"总结MD文件已保存: {md_path_str}")
        return md_path_str
    except Exception as e:
        print(f"导出总结MD文件失败: {e}")
        return None
def _get_meeting_transcript(self, meeting_id: int) -> str:
    """Load a meeting's transcript segments and render them as text.

    Each segment becomes one line of the form ``[MM:SS] 说话人<tag>: <text>``,
    ordered by start time. Returns an empty string when there are no segments
    or when the lookup fails.
    """
    segment_sql = """
        SELECT speaker_tag, start_time_ms, end_time_ms, text_content
        FROM transcript_segments
        WHERE meeting_id = %s
        ORDER BY start_time_ms
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(segment_sql, (meeting_id,))
            rows = cursor.fetchall()
            if not rows:
                return ""
            rendered = []
            for speaker, start_ms, _end_ms, text in rows:
                # Milliseconds -> minutes and whole seconds.
                minutes, remainder_ms = divmod(start_ms, 60000)
                seconds = remainder_ms // 1000
                rendered.append(f"[{minutes:02d}:{seconds:02d}] 说话人{speaker}: {text}")
            return "\n".join(rendered)
    except Exception as e:
        print(f"获取会议转录内容错误: {e}")
        return ""
def _get_meeting_prompt_context(self, meeting_id: int) -> Dict[str, Any]:
    """Collect the meeting metadata used to fill prompt template variables.

    Returns:
        A dict with ``title``, ``meeting_time`` (formatted text),
        ``meeting_time_value`` (the raw database value), ``creator_name`` and
        ``attendees`` (attendee captions sorted by name and joined with "、").
        An empty dict when the meeting does not exist or the lookup fails.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute(
                """
                SELECT m.title, m.meeting_time, u.caption AS creator_name
                FROM meetings m
                LEFT JOIN sys_users u ON m.user_id = u.user_id
                WHERE m.meeting_id = %s
                """,
                (meeting_id,)
            )
            meeting = cursor.fetchone()
            if not meeting:
                cursor.close()
                return {}

            meeting_time = meeting.get('meeting_time')
            meeting_time_text = ''
            if isinstance(meeting_time, datetime):
                meeting_time_text = meeting_time.strftime('%Y-%m-%d %H:%M:%S')
            elif meeting_time:
                meeting_time_text = str(meeting_time)

            cursor.execute(
                """
                SELECT u.caption
                FROM attendees a
                JOIN sys_users u ON a.user_id = u.user_id
                WHERE a.meeting_id = %s
                ORDER BY u.caption ASC
                """,
                (meeting_id,)
            )
            attendee_rows = cursor.fetchall()
            cursor.close()

            attendee_names = [row.get('caption') for row in attendee_rows if row.get('caption')]
            # Separate the names explicitly; joining with an empty string ran
            # them together into one unreadable token in the prompt.
            attendee_text = '、'.join(attendee_names)
            return {
                'title': meeting.get('title') or '',
                'meeting_time': meeting_time_text,
                'meeting_time_value': meeting_time,
                'creator_name': meeting.get('creator_name') or '',
                'attendees': attendee_text
            }
    except Exception as e:
        print(f"获取会议提示词上下文错误: {e}")
        return {}
def _format_meeting_time_value(self, meeting_time_value: Any, custom_format: Optional[str] = None) -> str:
default_format = '%Y-%m-%d %H:%M:%S'
if isinstance(meeting_time_value, datetime):
try:
return meeting_time_value.strftime(custom_format or default_format)
except Exception:
return meeting_time_value.strftime(default_format)
if meeting_time_value:
meeting_time_text = str(meeting_time_value)
if custom_format:
try:
parsed_time = datetime.fromisoformat(meeting_time_text.replace('Z', '+00:00'))
return parsed_time.strftime(custom_format)
except Exception:
return meeting_time_text
return meeting_time_text
return ''
def _apply_prompt_variables(self, template: str, variables: Dict[str, Any]) -> str:
if not template:
return template
rendered = re.sub(
r"\{\{\s*meeting_time\s*:\s*([^{}]+?)\s*\}\}",
lambda match: self._format_meeting_time_value(
variables.get('meeting_time_value'),
match.group(1).strip()
),
template
)
for key, value in variables.items():
if key == 'meeting_time_value':
continue
rendered = rendered.replace(f"{{{{{key}}}}}", value or '')
rendered = rendered.replace(f"{{{{ {key} }}}}", value or '')
return rendered
def _build_prompt(self, meeting_id: int, transcript_text: str, user_prompt: str, prompt_id: Optional[int] = None) -> str:
"""
构建完整的提示词
使用数据库中配置的MEETING_TASK提示词模板
Args:
meeting_id: 会议ID
transcript_text: 会议转录文本
user_prompt: 用户额外提示词
prompt_id: 可选的提示词模版ID如果不指定则使用默认模版
"""
# 从数据库获取会议任务的提示词模板支持指定prompt_id
system_prompt = self.llm_service.get_task_prompt('MEETING_TASK', prompt_id=prompt_id)
meeting_context = self._get_meeting_prompt_context(meeting_id)
prompt_variables = {
'meeting_id': str(meeting_id),
'meeting_title': meeting_context.get('title', ''),
'meeting_time': meeting_context.get('meeting_time', ''),
'meeting_creator': meeting_context.get('creator_name', ''),
'meeting_attendees': meeting_context.get('attendees', ''),
'meeting_time_value': meeting_context.get('meeting_time_value')
}
system_prompt = self._apply_prompt_variables(system_prompt, prompt_variables)
rendered_user_prompt = self._apply_prompt_variables(user_prompt, prompt_variables) if user_prompt else ''
prompt = f"{system_prompt}\n\n"
if rendered_user_prompt:
prompt += f"用户额外要求:{rendered_user_prompt}\n\n"
prompt += f"会议转录内容:\n{transcript_text}\n\n请根据以上内容生成会议总结:"
return prompt
def _save_summary_to_db(self, meeting_id: int, summary_content: str, user_prompt: str, prompt_id: Optional[int] = None) -> Optional[int]:
    """Persist a summary on the meeting row.

    Updates ``summary``, ``user_prompt``, ``prompt_id`` and ``updated_at`` in
    the ``meetings`` table. Returns ``meeting_id`` after the commit, or None
    when the update raises.
    """
    summary_sql = """
        UPDATE meetings
        SET summary = %s, user_prompt = %s, prompt_id = %s, updated_at = NOW()
        WHERE meeting_id = %s
    """
    summary_args = (summary_content, user_prompt, prompt_id, meeting_id)
    try:
        with get_db_connection() as connection:
            db_cursor = connection.cursor()
            db_cursor.execute(summary_sql, summary_args)
            connection.commit()
            print(f"成功保存会议总结到meetings表meeting_id: {meeting_id}, prompt_id: {prompt_id}")
            return meeting_id
    except Exception as e:
        print(f"保存总结到数据库错误: {e}")
        return None
# --- 状态查询和数据库操作方法 ---
def get_task_status(self, task_id: str) -> Dict[str, Any]:
    """Return the current status of a task.

    Redis is consulted first and the database is the fallback. A task found in
    neither yields status ``not_found``; any exception yields status ``error``
    with the exception text.
    """
    try:
        record = self.redis_client.hgetall(f"llm_task:{task_id}") or self._get_task_from_db(task_id)
        if not record:
            return {'task_id': task_id, 'status': 'not_found', 'error_message': 'Task not found'}

        snapshot = {
            'task_id': task_id,
            'status': record.get('status', 'unknown'),
            'progress': int(record.get('progress', 0)),
            'meeting_id': int(record.get('meeting_id', 0)),
        }
        # These fields are passed through as stored (None when absent).
        for field in ('created_at', 'updated_at', 'message', 'result', 'error_message'):
            snapshot[field] = record.get(field)
        return snapshot
    except Exception as e:
        print(f"Error getting task status: {e}")
        return {'task_id': task_id, 'status': 'error', 'error_message': str(e)}
def get_meeting_llm_tasks(self, meeting_id: int) -> List[Dict[str, Any]]:
    """List every LLM task of a meeting, newest first.

    ``created_at`` and ``completed_at`` are converted to ISO-8601 strings when
    set. Returns an empty list when the query fails.
    """
    tasks_sql = "SELECT task_id, status, progress, user_prompt, created_at, completed_at, error_message FROM llm_tasks WHERE meeting_id = %s ORDER BY created_at DESC"
    try:
        with get_db_connection() as connection:
            db_cursor = connection.cursor(dictionary=True)
            db_cursor.execute(tasks_sql, (meeting_id,))
            rows = db_cursor.fetchall()
            for row in rows:
                for stamp in ('created_at', 'completed_at'):
                    if row.get(stamp):
                        row[stamp] = row[stamp].isoformat()
            return rows
    except Exception as e:
        print(f"Error getting meeting LLM tasks: {e}")
        return []
def get_meeting_llm_status(self, meeting_id: int) -> Optional[Dict[str, Any]]:
    """
    Return the status of the meeting's most recent LLM task.

    Args:
        meeting_id: Meeting ID.

    Returns:
        Optional[Dict]: Task status information. For a pending or processing
        task the live status from ``get_task_status`` is returned; otherwise
        the database record is used. None when the meeting has no task or the
        lookup fails.
    """
    latest_task_sql = """
        SELECT task_id, status, progress, created_at, completed_at, error_message
        FROM llm_tasks
        WHERE meeting_id = %s
        ORDER BY created_at DESC
        LIMIT 1
    """
    try:
        with get_db_connection() as connection:
            db_cursor = connection.cursor(dictionary=True)
            db_cursor.execute(latest_task_sql, (meeting_id,))
            row = db_cursor.fetchone()
            db_cursor.close()
            if not row:
                return None

            # A task that is still running has fresher state outside this row.
            if row['status'] in ('pending', 'processing'):
                try:
                    return self.get_task_status(row['task_id'])
                except Exception as e:
                    print(f"Failed to get latest LLM task status for meeting {meeting_id}, returning DB status. Error: {e}")

            created, completed = row['created_at'], row['completed_at']
            return {
                'task_id': row['task_id'],
                'status': row['status'],
                'progress': row['progress'] or 0,
                'meeting_id': meeting_id,
                'created_at': created.isoformat() if created else None,
                'completed_at': completed.isoformat() if completed else None,
                'error_message': row['error_message']
            }
    except Exception as e:
        print(f"Error getting meeting LLM status: {e}")
        return None
def _update_task_status_in_redis(self, task_id: str, status: str, progress: int, message: str = None, result: str = None, error_message: str = None):
"""更新Redis中的任务状态"""
try:
update_data = {
'status': status,
'progress': str(progress),
'updated_at': datetime.now().isoformat()
}
if message: update_data['message'] = message
if result: update_data['result'] = result
if error_message: update_data['error_message'] = error_message
self.redis_client.hset(f"llm_task:{task_id}", mapping=update_data)
except Exception as e:
print(f"Error updating task status in Redis: {e}")
def _save_task_to_db(self, task_id: str, meeting_id: int, user_prompt: str, prompt_id: Optional[int] = None):
    """Insert a new ``pending`` task row into ``llm_tasks``.

    Args:
        task_id: Task ID.
        meeting_id: Meeting ID.
        user_prompt: Extra prompt text supplied by the user.
        prompt_id: Optional prompt template ID; None means the default template.

    Raises:
        Exception: Re-raised after logging when the insert or commit fails.
    """
    insert_sql = "INSERT INTO llm_tasks (task_id, meeting_id, user_prompt, prompt_id, status, progress, created_at) VALUES (%s, %s, %s, %s, 'pending', 0, NOW())"
    row_values = (task_id, meeting_id, user_prompt, prompt_id)
    try:
        with get_db_connection() as connection:
            db_cursor = connection.cursor()
            db_cursor.execute(insert_sql, row_values)
            connection.commit()
            print(f"[Meeting Service] Task saved successfully to database")
    except Exception as e:
        print(f"Error saving task to database: {e}")
        raise
def _update_task_in_db(self, task_id: str, status: str, progress: int, result: str = None, error_message: str = None):
    """Update a task row's status in ``llm_tasks``.

    A ``completed`` status stores ``result``, clears ``error_message`` and
    stamps ``completed_at``; every other status stores ``error_message``.
    Errors are logged and swallowed.
    """
    if status == 'completed':
        update_sql = "UPDATE llm_tasks SET status = %s, progress = %s, result = %s, error_message = NULL, completed_at = NOW() WHERE task_id = %s"
        update_args = (status, progress, result, task_id)
    else:
        update_sql = "UPDATE llm_tasks SET status = %s, progress = %s, error_message = %s WHERE task_id = %s"
        update_args = (status, progress, error_message, task_id)
    try:
        with get_db_connection() as connection:
            db_cursor = connection.cursor()
            db_cursor.execute(update_sql, update_args)
            connection.commit()
    except Exception as e:
        print(f"Error updating task in database: {e}")
def _get_task_from_db(self, task_id: str) -> Optional[Dict[str, str]]:
    """Load a task row from ``llm_tasks`` in the same shape as a Redis hash.

    Every value is converted to a string (datetimes to ISO-8601) to match what
    Redis returns. NULL columns are omitted rather than stringified: a Redis
    hash has no null fields, and ``str(None)`` would produce the truthy text
    ``'None'`` (for example as ``error_message``).

    Returns:
        The task as a ``str -> str`` dict, or None when the task does not
        exist or the lookup fails.
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            query = "SELECT * FROM llm_tasks WHERE task_id = %s"
            cursor.execute(query, (task_id,))
            task = cursor.fetchone()
            if task:
                return {
                    key: value.isoformat() if isinstance(value, datetime) else str(value)
                    for key, value in task.items()
                    if value is not None
                }
            return None
    except Exception as e:
        print(f"Error getting task from database: {e}")
        return None
def _get_existing_summary_task(self, meeting_id: int) -> Optional[str]:
    """
    Look up a summary task that is already in flight for the meeting.

    Used for concurrency control. Returns the ID of the newest task whose
    status is pending or processing, or None when there is none or the lookup
    fails.
    """
    in_flight_sql = """
        SELECT task_id FROM llm_tasks
        WHERE meeting_id = %s AND status IN ('pending', 'processing')
        ORDER BY created_at DESC
        LIMIT 1
    """
    try:
        with get_db_connection() as connection:
            db_cursor = connection.cursor(dictionary=True)
            db_cursor.execute(in_flight_sql, (meeting_id,))
            row = db_cursor.fetchone()
            db_cursor.close()
            return row['task_id'] if row else None
    except Exception as e:
        print(f"Error checking existing summary task: {e}")
        return None
2026-04-07 09:26:34 +00:00
def _acquire_lock(self, lock_key: str, ttl_seconds: int) -> Optional[str]:
"""使用 Redis 分布式锁防止多 worker 重复执行同一后台任务。"""
try:
token = str(uuid.uuid4())
acquired = self.redis_client.set(lock_key, token, nx=True, ex=max(30, ttl_seconds))
if acquired:
return token
except Exception as e:
print(f"Error acquiring lock {lock_key}: {e}")
return None
def _release_lock(self, lock_key: str, token: Optional[str]) -> None:
if not token:
return
try:
release_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
end
return 0
"""
self.redis_client.eval(release_script, 1, lock_key, token)
except Exception as e:
print(f"Error releasing lock {lock_key}: {e}")
# Module-level shared instance, created when this module is imported.
async_meeting_service = AsyncMeetingService()