2026-01-19 11:03:08 +00:00
"""
异步会议服务 - 处理会议总结生成的异步任务
采用FastAPI BackgroundTasks模式
"""
import uuid
import time
2026-03-26 06:55:12 +00:00
import os
2026-01-19 11:03:08 +00:00
from datetime import datetime
from typing import Optional , Dict , Any , List
2026-03-26 06:55:12 +00:00
from pathlib import Path
2026-01-19 11:03:08 +00:00
import redis
2026-03-26 06:55:12 +00:00
from app . core . config import REDIS_CONFIG , TRANSCRIPTION_POLL_CONFIG , AUDIO_DIR
2026-01-19 11:03:08 +00:00
from app . core . database import get_db_connection
from app . services . async_transcription_service import AsyncTranscriptionService
from app . services . llm_service import LLMService
class AsyncMeetingService :
""" 异步会议服务类 - 处理会议相关的异步任务 """
def __init__(self):
    """Create the Redis client and the LLM helper used by the task methods."""
    # Decode Redis responses to str unless the configuration already says
    # otherwise, so hash fields can be used without manual bytes handling.
    REDIS_CONFIG.setdefault('decode_responses', True)
    self.redis_client = redis.Redis(**REDIS_CONFIG)
    # Reuse the existing synchronous LLM service.
    self.llm_service = LLMService()
2026-03-26 06:55:12 +00:00
def start_summary_generation(self, meeting_id: int, user_prompt: str = "", prompt_id: Optional[int] = None, model_code: Optional[str] = None) -> str:
    """
    Register a new asynchronous summary task.

    Only the bookkeeping is done here; running the task is left to the
    caller (for example the API layer's BackgroundTasks).

    Args:
        meeting_id: Meeting ID.
        user_prompt: Extra instructions supplied by the user.
        prompt_id: Optional prompt template ID; the default template is
            used when omitted.
        model_code: Optional LLM model code; the default model is used
            when omitted.

    Returns:
        str: The generated task ID.

    Raises:
        Exception: Any error from the database or Redis is logged and
            re-raised.
    """
    try:
        task_id = str(uuid.uuid4())

        # Persist the task record in the database first.
        self._save_task_to_db(task_id, meeting_id, user_prompt, prompt_id)

        # Mirror the task details into Redis for fast status lookups.
        now_iso = datetime.now().isoformat()
        redis_key = f"llm_task:{task_id}"
        self.redis_client.hset(
            redis_key,
            mapping={
                'task_id': task_id,
                'meeting_id': str(meeting_id),
                'user_prompt': user_prompt,
                'prompt_id': str(prompt_id) if prompt_id else '',
                'model_code': model_code or '',
                'status': 'pending',
                'progress': '0',
                'created_at': now_iso,
                'updated_at': now_iso,
            },
        )
        # The Redis copy expires after 24 hours.
        self.redis_client.expire(redis_key, 86400)
        return task_id
    except Exception as e:
        print(f"Error starting summary generation: {e}")
        raise e
def _process_task(self, task_id: str):
    """
    Run one summary task end to end; intended to be called by BackgroundTasks.

    Steps: load the task from Redis, fetch the transcript, build the prompt,
    call the LLM (a specific model when ``model_code`` is set), store the
    summary on the meeting, export it as Markdown and mark the task
    completed. Any failure is logged and recorded as ``failed`` in both the
    database and Redis; nothing is raised to the caller.

    Args:
        task_id: ID of a task created by ``start_summary_generation``.
    """
    print(f"Background task started for meeting summary task: {task_id}")
    try:
        # Load the task payload from Redis.
        task_data = self.redis_client.hgetall(f"llm_task:{task_id}")
        if not task_data:
            print(f"Error: Task {task_id} not found in Redis for processing.")
            return

        meeting_id = int(task_data['meeting_id'])
        user_prompt = task_data.get('user_prompt', '')
        prompt_id_str = task_data.get('prompt_id', '')
        # Empty string means "no template selected".
        prompt_id = int(prompt_id_str) if prompt_id_str else None
        model_code = task_data.get('model_code', '') or None

        # 1. Mark the task as processing.
        self._update_task_status_in_redis(task_id, 'processing', 10, message="任务已开始...")

        # 2. Fetch the meeting transcript.
        self._update_task_status_in_redis(task_id, 'processing', 30, message="获取会议转录内容...")
        transcript_text = self._get_meeting_transcript(meeting_id)
        if not transcript_text:
            raise Exception("无法获取会议转录内容")

        # 3. Build the prompt.
        self._update_task_status_in_redis(task_id, 'processing', 40, message="准备AI提示词...")
        full_prompt = self._build_prompt(transcript_text, user_prompt, prompt_id)

        # 4. Call the LLM API (optionally with an explicitly selected model).
        self._update_task_status_in_redis(task_id, 'processing', 50, message="AI正在分析会议内容...")
        if model_code:
            summary_content = self._call_llm_with_model(full_prompt, model_code)
        else:
            summary_content = self.llm_service._call_llm_api(full_prompt)
        if not summary_content:
            raise Exception("LLM API调用失败或返回空内容")

        # 5. Save the result to the main table. _save_summary_to_db reports
        #    failure by returning None, so that must fail the task instead of
        #    letting it be marked completed without a stored summary.
        self._update_task_status_in_redis(task_id, 'processing', 90, message="保存总结结果...")
        saved_meeting_id = self._save_summary_to_db(meeting_id, summary_content, user_prompt, prompt_id)
        if saved_meeting_id is None:
            raise Exception("保存会议总结到数据库失败")

        # 6. Export the summary as a Markdown file (best effort; the path
        #    may be None when the export fails).
        self._update_task_status_in_redis(task_id, 'processing', 95, message="导出Markdown文件...")
        md_path = self._export_summary_md(meeting_id, summary_content)

        # 7. Done; the task result holds the Markdown file path.
        self._update_task_in_db(task_id, 'completed', 100, result=md_path)
        self._update_task_status_in_redis(task_id, 'completed', 100, result=md_path)
        print(f"Task {task_id} completed successfully")
    except Exception as e:
        error_msg = str(e)
        print(f"Task {task_id} failed: {error_msg}")
        # Record the failure in both stores.
        self._update_task_in_db(task_id, 'failed', 0, error_message=error_msg)
        self._update_task_status_in_redis(task_id, 'failed', 0, error_message=error_msg)
2026-03-30 08:22:09 +00:00
def monitor_and_auto_summarize(
    self,
    meeting_id: int,
    transcription_task_id: str,
    prompt_id: Optional[int] = None,
    model_code: Optional[str] = None
):
    """
    Watch a transcription task and generate the summary once it completes.

    Intended to be called by BackgroundTasks; it blocks the calling worker
    while polling.

    Args:
        meeting_id: Meeting ID.
        transcription_task_id: Transcription task ID to poll.
        prompt_id: Optional prompt template ID (default template if omitted).
        model_code: Optional summary model code (default model if omitted).

    Flow:
        1. Poll the transcription task status in a loop.
        2. When transcription succeeds, start and run the summary task
           (unless a pending/processing summary task already exists).
        3. Stop polling and log when transcription fails or the configured
           maximum wait time is exhausted.
    """
    print(f"[Monitor] Started monitoring transcription task {transcription_task_id} for meeting {meeting_id}, prompt_id: {prompt_id}, model_code: {model_code}")

    # Polling parameters come from configuration.
    poll_interval = TRANSCRIPTION_POLL_CONFIG['poll_interval']
    max_wait_time = TRANSCRIPTION_POLL_CONFIG['max_wait_time']
    max_polls = max_wait_time // poll_interval

    transcription_service = AsyncTranscriptionService()
    poll_count = 0

    try:
        while poll_count < max_polls:
            poll_count += 1
            elapsed_time = poll_count * poll_interval
            try:
                # Query the transcription task status.
                status_info = transcription_service.get_task_status(transcription_task_id)
                current_status = status_info.get('status', 'unknown')
                progress = status_info.get('progress', 0)
                print(f"[Monitor] Poll {poll_count}/{max_polls} - Status: {current_status}, Progress: {progress}%, Elapsed: {elapsed_time}s")

                if current_status == 'completed':
                    print(f"[Monitor] Transcription completed successfully for meeting {meeting_id}")
                    # Concurrency guard: skip if a summary task is already in flight.
                    existing_task = self._get_existing_summary_task(meeting_id)
                    if existing_task:
                        print(f"[Monitor] Summary task already exists for meeting {meeting_id}, task_id: {existing_task}, skipping duplicate task creation")
                    else:
                        try:
                            summary_task_id = self.start_summary_generation(
                                meeting_id,
                                user_prompt="",
                                prompt_id=prompt_id,
                                model_code=model_code
                            )
                            print(f"[Monitor] Summary task {summary_task_id} started for meeting {meeting_id}")
                            # Run the summary task synchronously in this worker.
                            self._process_task(summary_task_id)
                        except Exception as e:
                            error_msg = f"Failed to start summary generation: {e}"
                            print(f"[Monitor] {error_msg}")
                    # Monitoring is finished.
                    break
                elif current_status == 'failed':
                    error_msg = status_info.get('error_message', 'Unknown error')
                    print(f"[Monitor] Transcription failed for meeting {meeting_id}: {error_msg}")
                    # Transcription failed; stop monitoring.
                    break
                elif current_status in ['pending', 'processing']:
                    # Still running; wait before the next poll.
                    time.sleep(poll_interval)
                else:
                    # Unknown status; keep polling.
                    print(f"[Monitor] Unknown transcription status: {current_status}")
                    time.sleep(poll_interval)
            except Exception as e:
                print(f"[Monitor] Error checking transcription status: {e}")
                # Wait and retry after an error.
                time.sleep(poll_interval)
        else:
            # Reached only when the loop ran out of polls without a break,
            # so a result that arrives on the very last poll is not
            # misreported as a timeout.
            print(f"[Monitor] Transcription monitoring timed out after {max_wait_time}s for meeting {meeting_id}")
    except Exception as e:
        print(f"[Monitor] Fatal error in monitor_and_auto_summarize: {e}")
# --- 会议相关方法 ---
2026-03-26 06:55:12 +00:00
def _call_llm_with_model(self, prompt: str, model_code: str) -> Optional[str]:
    """
    Call the LLM API using the model configured under ``model_code``.

    The model settings are read from the ``llm_model_config`` table. When no
    active row matches, the default model is used via the shared LLM
    service. The request is a non-streaming POST to the model's
    ``/chat/completions`` endpoint.

    Args:
        prompt: Full prompt text, sent as a single user message.
        model_code: Model code to look up.

    Returns:
        The extracted response text, or None when the lookup or the HTTP
        call fails (the error is logged).
    """
    import requests

    def _setting(row, key, default):
        # The column exists in every fetched row, so dict.get's default is
        # never used; a NULL column must fall back to the default explicitly.
        value = row.get(key)
        return default if value is None else value

    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute(
                "SELECT endpoint_url, api_key, llm_model_name, llm_timeout, llm_temperature, llm_top_p, llm_max_tokens FROM llm_model_config WHERE model_code = %s AND is_active = 1",
                (model_code,)
            )
            config = cursor.fetchone()

        # The connection is released before the (potentially slow) HTTP call.
        if not config:
            print(f"模型 {model_code} 未找到或未激活,回退到默认模型")
            return self.llm_service._call_llm_api(prompt)

        # Accept both a base URL and a full chat-completions URL.
        endpoint_url = (config['endpoint_url'] or '').rstrip('/')
        if not endpoint_url.endswith('/chat/completions'):
            endpoint_url = f"{endpoint_url}/chat/completions"

        headers = {"Content-Type": "application/json"}
        if config['api_key']:
            headers["Authorization"] = f"Bearer {config['api_key']}"

        payload = {
            "model": config['llm_model_name'],
            "messages": [{"role": "user", "content": prompt}],
            "temperature": float(_setting(config, 'llm_temperature', 0.7)),
            "top_p": float(_setting(config, 'llm_top_p', 0.9)),
            "max_tokens": int(_setting(config, 'llm_max_tokens', 4096)),
            "stream": False,
        }
        response = requests.post(
            endpoint_url,
            headers=headers,
            json=payload,
            timeout=int(_setting(config, 'llm_timeout', 120)),
        )
        response.raise_for_status()
        return self.llm_service._extract_response_text(response.json())
    except Exception as e:
        print(f"使用模型 {model_code} 调用失败: {e}")
        return None
def _export_summary_md(self, meeting_id: int, summary_content: str) -> Optional[str]:
    """
    Write the summary to a Markdown file next to the meeting's audio.

    Args:
        meeting_id: Meeting ID.
        summary_content: Summary text to write (UTF-8).

    Returns:
        The path of the written file as a string, or None if anything
        fails (the error is logged).
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute("SELECT title FROM meetings WHERE meeting_id = %s", (meeting_id,))
            meeting_row = cursor.fetchone()
            cursor.execute("SELECT file_path FROM audio_files WHERE meeting_id = %s LIMIT 1", (meeting_id,))
            audio_row = cursor.fetchone()

        title = meeting_row['title'] if meeting_row else f"meeting_{meeting_id}"

        # Always resolve the target directory under AUDIO_DIR so an absolute
        # path stored in the database cannot point at an unwritable location.
        if audio_row and audio_row.get('file_path'):
            audio_path = Path(audio_row['file_path'])
            try:
                # Keep the sub-directory below AUDIO_DIR (e.g. "226" or "226/sub").
                md_dir = AUDIO_DIR / audio_path.relative_to(AUDIO_DIR).parent
            except ValueError:
                # file_path is not under AUDIO_DIR (e.g. a Docker absolute
                # path): fall back to the name of its last directory.
                md_dir = AUDIO_DIR / audio_path.parent.name
        else:
            md_dir = AUDIO_DIR / str(meeting_id)
        md_dir.mkdir(parents=True, exist_ok=True)

        # Keep only alphanumerics and a few harmless punctuation characters.
        allowed_extra = (' ', '-', '_', '.')
        kept_chars = [ch for ch in title if ch.isalnum() or ch in allowed_extra]
        safe_title = "".join(kept_chars).strip()
        if not safe_title:
            safe_title = f"meeting_{meeting_id}"

        md_path = md_dir / f"{safe_title}_总结.md"
        md_path.write_text(summary_content, encoding='utf-8')

        md_path_str = str(md_path)
        print(f"总结MD文件已保存: {md_path_str}")
        return md_path_str
    except Exception as e:
        print(f"导出总结MD文件失败: {e}")
        return None
2026-01-19 11:03:08 +00:00
def _get_meeting_transcript(self, meeting_id: int) -> str:
    """
    Load the meeting's transcript segments and render them as text.

    Each segment becomes one line with an ``[MM:SS]`` start timestamp, the
    speaker tag and the segment text, ordered by start time.

    Args:
        meeting_id: Meeting ID.

    Returns:
        The rendered transcript, or an empty string when there are no
        segments or the query fails (the error is logged).
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor()
            query = """
                SELECT speaker_tag, start_time_ms, end_time_ms, text_content
                FROM transcript_segments
                WHERE meeting_id = %s
                ORDER BY start_time_ms
            """
            cursor.execute(query, (meeting_id,))
            segments = cursor.fetchall()

        if not segments:
            return ""

        transcript_lines = []
        for speaker_tag, start_time, _end_time, text in segments:
            # Coerce to int so a NULL or non-integer timestamp on one row
            # cannot raise and wipe out the whole transcript.
            start_ms = int(start_time or 0)
            # Convert milliseconds to minutes:seconds.
            start_min, remainder_ms = divmod(start_ms, 60000)
            start_sec = remainder_ms // 1000
            transcript_lines.append(f"[{start_min:02d}:{start_sec:02d}] 说话人{speaker_tag}: {text}")
        return "\n".join(transcript_lines)
    except Exception as e:
        print(f"获取会议转录内容错误: {e}")
        return ""
def _build_prompt(self, transcript_text: str, user_prompt: str, prompt_id: Optional[int] = None) -> str:
    """
    Assemble the full prompt sent to the LLM.

    The system part is the MEETING_TASK prompt template obtained from the
    LLM service; the user's extra instructions (if any) and the transcript
    follow it.

    Args:
        transcript_text: Meeting transcript text.
        user_prompt: Extra instructions supplied by the user; skipped when empty.
        prompt_id: Optional prompt template ID; the default template is
            used when omitted.

    Returns:
        The complete prompt string.
    """
    # Fetch the meeting-task template (a specific prompt_id may be requested).
    system_prompt = self.llm_service.get_task_prompt('MEETING_TASK', prompt_id=prompt_id)

    sections = [f"{system_prompt}\n\n"]
    if user_prompt:
        sections.append(f"用户额外要求: {user_prompt}\n\n")
    sections.append(f"会议转录内容:\n{transcript_text}\n\n请根据以上内容生成会议总结:")
    return "".join(sections)
def _save_summary_to_db(self, meeting_id: int, summary_content: str, user_prompt: str, prompt_id: Optional[int] = None) -> Optional[int]:
    """
    Store the generated summary on the meeting row.

    Updates ``summary``, ``user_prompt``, ``prompt_id`` and ``updated_at``
    in the ``meetings`` table.

    Args:
        meeting_id: Meeting ID.
        summary_content: Generated summary text.
        user_prompt: Extra instructions that were used.
        prompt_id: Prompt template ID that was used, if any.

    Returns:
        ``meeting_id`` after the commit, or None when the update fails
        (the error is logged).
    """
    update_query = """
        UPDATE meetings
        SET summary = %s, user_prompt = %s, prompt_id = %s, updated_at = NOW()
        WHERE meeting_id = %s
    """
    params = (summary_content, user_prompt, prompt_id, meeting_id)
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(update_query, params)
            connection.commit()
            print(f"成功保存会议总结到meetings表, meeting_id: {meeting_id}, prompt_id: {prompt_id}")
            return meeting_id
    except Exception as e:
        print(f"保存总结到数据库错误: {e}")
        return None
# --- 状态查询和数据库操作方法 ---
def get_task_status(self, task_id: str) -> Dict[str, Any]:
    """
    Return the current status of a summary task.

    Redis is consulted first; when it has no entry for the task, the
    database record is used instead.

    Args:
        task_id: Task ID.

    Returns:
        A dict with task_id, status, progress, meeting_id, created_at,
        updated_at, message, result and error_message. When the task is
        unknown the status is ``not_found``; when the lookup itself fails
        the status is ``error`` and error_message holds the exception text.
    """
    try:
        task_data = self.redis_client.hgetall(f"llm_task:{task_id}") or self._get_task_from_db(task_id)
        if not task_data:
            return {'task_id': task_id, 'status': 'not_found', 'error_message': 'Task not found'}

        status_info = {
            'task_id': task_id,
            'status': task_data.get('status', 'unknown'),
            'progress': int(task_data.get('progress', 0)),
            'meeting_id': int(task_data.get('meeting_id', 0)),
        }
        # The remaining fields are passed through unchanged (None if absent).
        for field in ('created_at', 'updated_at', 'message', 'result', 'error_message'):
            status_info[field] = task_data.get(field)
        return status_info
    except Exception as e:
        print(f"Error getting task status: {e}")
        return {'task_id': task_id, 'status': 'error', 'error_message': str(e)}
def get_meeting_llm_tasks(self, meeting_id: int) -> List[Dict[str, Any]]:
    """
    List every LLM task recorded for a meeting, newest first.

    Args:
        meeting_id: Meeting ID.

    Returns:
        Task rows as dicts, with ``created_at`` / ``completed_at`` rendered
        as ISO-8601 strings when set; an empty list if the query fails
        (the error is logged).
    """
    query = "SELECT task_id, status, progress, user_prompt, created_at, completed_at, error_message FROM llm_tasks WHERE meeting_id = %s ORDER BY created_at DESC"
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute(query, (meeting_id,))
            tasks = cursor.fetchall()

        for task in tasks:
            # Render the timestamp columns as ISO strings when present.
            for column in ('created_at', 'completed_at'):
                if task.get(column):
                    task[column] = task[column].isoformat()
        return tasks
    except Exception as e:
        print(f"Error getting meeting LLM tasks: {e}")
        return []
def get_meeting_llm_status(self, meeting_id: int) -> Optional[Dict[str, Any]]:
    """
    Return the status of the meeting's most recent LLM task.

    Args:
        meeting_id: Meeting ID.

    Returns:
        Optional[Dict]: Status information for the newest task. For a task
        that is still pending/processing the live status from
        ``get_task_status`` is returned; otherwise the database record is
        used. None when the meeting has no task or the lookup fails.
    """
    latest_task_query = """
        SELECT task_id, status, progress, created_at, completed_at, error_message
        FROM llm_tasks
        WHERE meeting_id = %s
        ORDER BY created_at DESC
        LIMIT 1
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute(latest_task_query, (meeting_id,))
            task_record = cursor.fetchone()
            cursor.close()

        if not task_record:
            return None

        # For a task still in flight, prefer the live status.
        if task_record['status'] in ['pending', 'processing']:
            try:
                return self.get_task_status(task_record['task_id'])
            except Exception as e:
                print(f"Failed to get latest LLM task status for meeting {meeting_id}, returning DB status. Error: {e}")

        created_at = task_record['created_at']
        completed_at = task_record['completed_at']
        return {
            'task_id': task_record['task_id'],
            'status': task_record['status'],
            'progress': task_record['progress'] or 0,
            'meeting_id': meeting_id,
            'created_at': created_at.isoformat() if created_at else None,
            'completed_at': completed_at.isoformat() if completed_at else None,
            'error_message': task_record['error_message'],
        }
    except Exception as e:
        print(f"Error getting meeting LLM status: {e}")
        return None
def _update_task_status_in_redis(self, task_id: str, status: str, progress: int, message: Optional[str] = None, result: Optional[str] = None, error_message: Optional[str] = None):
    """
    Update the task's status fields in its Redis hash.

    ``status``, ``progress`` and ``updated_at`` are always written;
    ``message``, ``result`` and ``error_message`` are written only when a
    non-empty value is supplied. Redis errors are logged and swallowed.

    Args:
        task_id: Task ID.
        status: New status value.
        progress: Progress value, stored as a string.
        message: Optional progress message.
        result: Optional task result.
        error_message: Optional error description.
    """
    try:
        fields = {
            'status': status,
            'progress': str(progress),
            'updated_at': datetime.now().isoformat(),
        }
        optional_fields = {'message': message, 'result': result, 'error_message': error_message}
        fields.update({name: value for name, value in optional_fields.items() if value})
        self.redis_client.hset(f"llm_task:{task_id}", mapping=fields)
    except Exception as e:
        print(f"Error updating task status in Redis: {e}")
def _save_task_to_db(self, task_id: str, meeting_id: int, user_prompt: str, prompt_id: Optional[int] = None):
    """Insert a new task row into ``llm_tasks`` with status ``pending``.

    Args:
        task_id: Task ID.
        meeting_id: Meeting ID.
        user_prompt: Extra instructions supplied by the user.
        prompt_id: Optional prompt template ID; None means the default
            template is used.

    Raises:
        Exception: Any database error is logged and re-raised.
    """
    insert_query = "INSERT INTO llm_tasks (task_id, meeting_id, user_prompt, prompt_id, status, progress, created_at) VALUES (%s, %s, %s, %s, 'pending', 0, NOW())"
    values = (task_id, meeting_id, user_prompt, prompt_id)
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(insert_query, values)
            connection.commit()
            print("[Meeting Service] Task saved successfully to database")
    except Exception as e:
        print(f"Error saving task to database: {e}")
        raise
def _update_task_in_db(self, task_id: str, status: str, progress: int, result: Optional[str] = None, error_message: Optional[str] = None):
    """
    Update the task row in ``llm_tasks``.

    For ``completed`` the result is stored, the error message is cleared
    and ``completed_at`` is set; for any other status only status,
    progress and error message are written. Database errors are logged
    and swallowed.

    Args:
        task_id: Task ID.
        status: New status value.
        progress: New progress value.
        result: Task result, stored only for ``completed``.
        error_message: Error description, stored for non-completed statuses.
    """
    if status == 'completed':
        query = "UPDATE llm_tasks SET status = %s, progress = %s, result = %s, error_message = NULL, completed_at = NOW() WHERE task_id = %s"
        params = (status, progress, result, task_id)
    else:
        query = "UPDATE llm_tasks SET status = %s, progress = %s, error_message = %s WHERE task_id = %s"
        params = (status, progress, error_message, task_id)

    try:
        with get_db_connection() as connection:
            cursor = connection.cursor()
            cursor.execute(query, params)
            connection.commit()
    except Exception as e:
        print(f"Error updating task in database: {e}")
def _get_task_from_db(self, task_id: str) -> Optional[Dict[str, str]]:
    """
    Load a task row from ``llm_tasks`` in the same shape Redis returns.

    Values are converted to strings (datetimes to ISO-8601). Columns that
    are NULL are left out of the result, the way an unset field is simply
    absent from a Redis hash, so callers using ``.get(...)`` see None
    rather than the literal string ``'None'``.

    Args:
        task_id: Task ID.

    Returns:
        The task as a dict of strings, or None when no row exists or the
        query fails (the error is logged).
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            query = "SELECT * FROM llm_tasks WHERE task_id = %s"
            cursor.execute(query, (task_id,))
            task = cursor.fetchone()

        if not task:
            return None
        return {
            column: value.isoformat() if isinstance(value, datetime) else str(value)
            for column, value in task.items()
            if value is not None
        }
    except Exception as e:
        print(f"Error getting task from database: {e}")
        return None
def _get_existing_summary_task(self, meeting_id: int) -> Optional[str]:
    """
    Look for a summary task that is already in flight for the meeting.

    Used as a concurrency guard before creating a new task.

    Args:
        meeting_id: Meeting ID.

    Returns:
        The task ID of the newest ``pending`` or ``processing`` task, or
        None when there is none or the query fails (the error is logged).
    """
    in_flight_query = """
        SELECT task_id FROM llm_tasks
        WHERE meeting_id = %s AND status IN ('pending', 'processing')
        ORDER BY created_at DESC
        LIMIT 1
    """
    try:
        with get_db_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            cursor.execute(in_flight_query, (meeting_id,))
            row = cursor.fetchone()
            cursor.close()
        return row['task_id'] if row else None
    except Exception as e:
        print(f"Error checking existing summary task: {e}")
        return None
# Create the module-level shared service instance
async_meeting_service = AsyncMeetingService ( )