375 lines
15 KiB
Python
375 lines
15 KiB
Python
|
|
import uuid
|
|||
|
|
import json
|
|||
|
|
import redis
|
|||
|
|
import requests
|
|||
|
|
from datetime import datetime
|
|||
|
|
from typing import Optional, Dict, Any
|
|||
|
|
from http import HTTPStatus
|
|||
|
|
|
|||
|
|
import dashscope
|
|||
|
|
from dashscope.audio.asr import Transcription
|
|||
|
|
|
|||
|
|
from app.core.config import QWEN_API_KEY, REDIS_CONFIG, APP_CONFIG
|
|||
|
|
from app.core.database import get_db_connection
|
|||
|
|
|
|||
|
|
|
|||
|
|
class AsyncTranscriptionService:
    """Asynchronous transcription service.

    Submits meeting audio to DashScope's Paraformer file-transcription API
    (``Transcription.async_call``) and tracks each submission under a locally
    generated business task ID. The ID is mirrored in a Redis hash
    (``task:<id>``) and in the ``transcript_tasks`` table. Once Paraformer
    reports success, the service downloads the transcript JSON and stores its
    sentences in ``transcript_segments``.
    """

    # Lifetime of the Redis ``task:<id>`` hash, in seconds (24 hours).
    TASK_TTL_SECONDS = 86400
    # Timeout (seconds) for downloading the finished transcript JSON.
    # Without a timeout, a stalled connection would block the polling caller forever.
    RESULT_DOWNLOAD_TIMEOUT = 30

    def __init__(self):
        """Set the DashScope API key and create the Redis client."""
        dashscope.api_key = QWEN_API_KEY
        self.redis_client = redis.Redis(**REDIS_CONFIG)
        # Prefix joined with relative audio paths to build the URL sent to Paraformer.
        self.base_url = APP_CONFIG['base_url']

    @staticmethod
    def _task_key(business_task_id: str) -> str:
        """Return the Redis key of the hash that tracks *business_task_id*."""
        return f"task:{business_task_id}"

    @staticmethod
    def _decode_hash(raw: Dict[Any, Any]) -> Dict[str, Any]:
        """Return *raw* with any ``bytes`` keys/values decoded as UTF-8.

        redis-py returns bytes from ``hgetall`` unless the client was created
        with ``decode_responses=True``. Normalising here lets the rest of the
        class index the hash with ``str`` keys under either configuration.
        """
        def _to_str(value):
            return value.decode('utf-8') if isinstance(value, bytes) else value

        return {_to_str(key): _to_str(value) for key, value in (raw or {}).items()}

    def start_transcription(self, meeting_id: int, audio_file_path: str) -> str:
        """Submit an audio file to Paraformer and register a tracking task.

        Args:
            meeting_id: Meeting ID.
            audio_file_path: Audio file path relative to ``self.base_url``.

        Returns:
            str: The business task ID (a UUID4 string) accepted by the other methods.

        Raises:
            Exception: If Paraformer rejects the submission, or if writing the
                task to Redis or the database fails. The error is printed and
                re-raised.
        """
        try:
            # Build the full file URL.
            file_url = f"{self.base_url}{audio_file_path}"
            print(f"Starting transcription for meeting_id: {meeting_id}, file_url: {file_url}")

            # Call the Paraformer asynchronous API.
            task_response = Transcription.async_call(
                model='paraformer-v2',
                file_urls=[file_url],
                language_hints=['zh', 'en'],
                disfluency_removal_enabled=True,
                diarization_enabled=True,
                speaker_count=10,
            )
            if task_response.status_code != HTTPStatus.OK:
                print(f"Failed to start transcription: {task_response.status_code}, {task_response.message}")
                raise Exception(f"Transcription API error: {task_response.message}")

            paraformer_task_id = task_response.output.task_id
            business_task_id = str(uuid.uuid4())

            # Use one timestamp so a fresh task has created_at == updated_at.
            now = datetime.now().isoformat()
            task_data = {
                'business_task_id': business_task_id,
                # Stored only in Redis; the transcript_tasks table has no column for it.
                'paraformer_task_id': paraformer_task_id,
                'meeting_id': str(meeting_id),
                'file_url': file_url,
                'status': 'pending',
                'progress': '0',
                'created_at': now,
                'updated_at': now,
            }

            # Store the task mapping in Redis with a 24-hour expiry.
            key = self._task_key(business_task_id)
            self.redis_client.hset(key, mapping=task_data)
            self.redis_client.expire(key, self.TASK_TTL_SECONDS)

            # Create the durable task record in the database.
            self._save_task_to_db(business_task_id, meeting_id, audio_file_path)

            print(f"Transcription task created: {business_task_id}")
            return business_task_id

        except Exception as e:
            print(f"Error starting transcription: {e}")
            raise

    def get_task_status(self, business_task_id: str) -> Dict[str, Any]:
        """Return the current state of a task, polling Paraformer when possible.

        The task is looked up in Redis first and then in the database. If the
        Paraformer task ID is known (it is only stored in Redis), Paraformer is
        queried, and the new status is written back to Redis and the database.
        On the first successful completion, the transcript is downloaded and
        saved. Without a Paraformer ID (the database fallback), the stored state
        is returned unchanged.

        Args:
            business_task_id: Business task ID returned by ``start_transcription``.

        Returns:
            Dict: ``task_id``, ``status``, ``progress`` (int), ``meeting_id`` (int),
            ``created_at``, ``updated_at`` and ``error_message``.

        Raises:
            Exception: ``"Task not found"`` if neither Redis nor the database knows
                the task. Any Redis/Paraformer error is printed and re-raised.
        """
        try:
            task_data = self._decode_hash(self.redis_client.hgetall(self._task_key(business_task_id)))
            if not task_data:
                # Redis entry missing or expired: fall back to the database.
                task_data = self._get_task_from_db(business_task_id)
                if not task_data:
                    raise Exception("Task not found")

            meeting_id = int(task_data['meeting_id'])
            paraformer_task_id = task_data.get('paraformer_task_id')

            if not paraformer_task_id:
                # Paraformer cannot be polled without its task ID. Calling
                # Transcription.fetch with an empty ID would fail and overwrite
                # the stored status (even 'completed') with 'failed', so report
                # what is stored instead.
                return {
                    'task_id': business_task_id,
                    'status': task_data.get('status'),
                    'progress': int(task_data.get('progress') or 0),
                    'meeting_id': meeting_id,
                    'created_at': task_data.get('created_at'),
                    'updated_at': task_data.get('updated_at'),
                    'error_message': task_data.get('error_message'),
                }

            # Query the Paraformer task status.
            paraformer_response = Transcription.fetch(task=paraformer_task_id)
            result_saved = False

            if paraformer_response.status_code != HTTPStatus.OK:
                # NOTE(review): a failed *query* (e.g. network error) is recorded
                # as a failed *task*; confirm this is intended.
                print(f"Failed to fetch task status: {paraformer_response.message}")
                current_status = 'failed'
                progress = 0
                error_message = paraformer_response.message
            else:
                # Map the Paraformer status to the business status.
                paraformer_status = paraformer_response.output.task_status
                current_status = self._map_paraformer_status(paraformer_status)
                progress = self._calculate_progress(paraformer_status)
                error_message = None

                # Download and store the transcript only once. Later polls of a
                # finished task would otherwise re-download it and delete/re-insert
                # every segment. If an attempt fails, the flag stays unset and the
                # next poll retries.
                if (
                    current_status == 'completed'
                    and task_data.get('result_saved') != '1'
                    and paraformer_response.output.get('results')
                ):
                    result_saved = self._process_transcription_result(
                        business_task_id,
                        meeting_id,
                        paraformer_response.output,
                    )

            # Update the status in Redis.
            update_data = {
                'status': current_status,
                'progress': str(progress),
                'updated_at': datetime.now().isoformat(),
            }
            if error_message:
                update_data['error_message'] = error_message
            if result_saved:
                update_data['result_saved'] = '1'
            self.redis_client.hset(self._task_key(business_task_id), mapping=update_data)

            # Update the status in the database (best-effort; errors are printed).
            self._update_task_status_in_db(business_task_id, current_status, progress, error_message)

            return {
                'task_id': business_task_id,
                'status': current_status,
                'progress': progress,
                'meeting_id': meeting_id,
                'created_at': task_data.get('created_at'),
                'updated_at': update_data['updated_at'],
                'error_message': error_message,
            }

        except Exception as e:
            print(f"Error getting task status: {e}")
            raise

    def get_meeting_transcription_status(self, meeting_id: int) -> Optional[Dict[str, Any]]:
        """Return the status of a meeting's most recent transcription task.

        If the latest row in ``transcript_tasks`` is still pending or
        processing, a live status is attempted via ``get_task_status``. If that
        fails, the database snapshot is returned instead.

        Args:
            meeting_id: Meeting ID.

        Returns:
            Optional[Dict]: Task status information, or None when the meeting has
            no task or a database error occurred (the error is printed).
        """
        try:
            query = """
                SELECT task_id, status, progress, created_at, completed_at, error_message
                FROM transcript_tasks
                WHERE meeting_id = %s
                ORDER BY created_at DESC
                LIMIT 1
            """
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                try:
                    # Query the latest transcription task.
                    cursor.execute(query, (meeting_id,))
                    task_record = cursor.fetchone()
                finally:
                    # Close the cursor even if the query raised.
                    cursor.close()

            if not task_record:
                return None

            # Task still running: try to get its latest status.
            if task_record['status'] in ('pending', 'processing'):
                try:
                    return self.get_task_status(task_record['task_id'])
                except Exception:
                    # Deliberate best-effort: fall back to the database snapshot below.
                    pass

            created_at = task_record['created_at']
            completed_at = task_record['completed_at']
            return {
                'task_id': task_record['task_id'],
                'status': task_record['status'],
                'progress': task_record['progress'] or 0,
                'meeting_id': meeting_id,
                'created_at': created_at.isoformat() if created_at else None,
                'completed_at': completed_at.isoformat() if completed_at else None,
                'error_message': task_record['error_message'],
            }

        except Exception as e:
            print(f"Error getting meeting transcription status: {e}")
            return None

    def _map_paraformer_status(self, paraformer_status: str) -> str:
        """Map a Paraformer task status to the business status ('unknown' if unmapped)."""
        status_mapping = {
            'PENDING': 'pending',
            'RUNNING': 'processing',
            'SUCCEEDED': 'completed',
            'FAILED': 'failed',
        }
        return status_mapping.get(paraformer_status, 'unknown')

    def _calculate_progress(self, paraformer_status: str) -> int:
        """Return a coarse progress percentage for a Paraformer status (0 if unmapped)."""
        progress_mapping = {
            'PENDING': 0,
            'RUNNING': 50,
            'SUCCEEDED': 100,
            'FAILED': 0,
        }
        return progress_mapping.get(paraformer_status, 0)

    def _save_task_to_db(self, business_task_id: str, meeting_id: int, audio_file_path: str):
        """Link the audio file to the task and insert a pending ``transcript_tasks`` row.

        Raises:
            Exception: Any database error, after printing it.
        """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                try:
                    # Link the task_id to the matching row in audio_files.
                    update_audio_query = """
                        UPDATE audio_files
                        SET task_id = %s
                        WHERE meeting_id = %s AND file_path = %s
                    """
                    cursor.execute(update_audio_query, (business_task_id, meeting_id, audio_file_path))

                    # Insert the transcription task record.
                    insert_task_query = """
                        INSERT INTO transcript_tasks (task_id, meeting_id, status, progress, created_at)
                        VALUES (%s, %s, 'pending', 0, NOW())
                    """
                    cursor.execute(insert_task_query, (business_task_id, meeting_id))

                    connection.commit()
                finally:
                    cursor.close()
                print(f"Task record saved to database: {business_task_id}")

        except Exception as e:
            print(f"Error saving task to database: {e}")
            raise

    def _update_task_status_in_db(self, business_task_id: str, status: str, progress: int, error_message: Optional[str] = None):
        """Update status, progress and error message of a ``transcript_tasks`` row.

        ``completed_at`` is also set to ``NOW()`` when *status* is 'completed'.
        This is best-effort: database errors are printed, not raised.
        """
        if status == 'completed':
            update_query = """
                UPDATE transcript_tasks
                SET status = %s, progress = %s, completed_at = NOW(), error_message = %s
                WHERE task_id = %s
            """
        else:
            update_query = """
                UPDATE transcript_tasks
                SET status = %s, progress = %s, error_message = %s
                WHERE task_id = %s
            """
        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                try:
                    cursor.execute(update_query, (status, progress, error_message, business_task_id))
                    connection.commit()
                finally:
                    cursor.close()

        except Exception as e:
            print(f"Error updating task status in database: {e}")

    def _get_task_from_db(self, business_task_id: str) -> Optional[Dict[str, Any]]:
        """Load a task from ``transcript_tasks`` in the same shape as the Redis hash.

        Numeric and date fields are converted to strings to match Redis. The
        Paraformer task ID is not persisted in the database, so it is always ''.

        Returns:
            Optional[Dict]: The task fields, or None if the row is missing or a
            database error occurred (the error is printed).
        """
        try:
            query = """
                SELECT tt.task_id as business_task_id, tt.meeting_id, tt.status,
                       tt.progress, tt.created_at, tt.error_message
                FROM transcript_tasks tt
                WHERE tt.task_id = %s
            """
            with get_db_connection() as connection:
                cursor = connection.cursor(dictionary=True)
                try:
                    cursor.execute(query, (business_task_id,))
                    result = cursor.fetchone()
                finally:
                    cursor.close()

            if not result:
                return None

            created_at = result['created_at']
            return {
                'business_task_id': result['business_task_id'],
                # Not stored in the database; only the Redis hash has it.
                'paraformer_task_id': '',
                'meeting_id': str(result['meeting_id']),
                'status': result['status'],
                'progress': str(result['progress'] or 0),
                'created_at': created_at.isoformat() if created_at else None,
                'error_message': result['error_message'],
            }

        except Exception as e:
            print(f"Error getting task from database: {e}")
            return None

    def _process_transcription_result(self, business_task_id: str, meeting_id: int, paraformer_output: Any) -> bool:
        """Download the transcript of the first result and store its segments.

        Only ``results[0]`` is used, since ``start_transcription`` submits a
        single file URL.

        Returns:
            bool: True if the segments were stored (or there were none to store).
            False on any failure; errors are printed, never raised.
        """
        try:
            results = paraformer_output.get('results')
            if not results:
                print("No transcription results found in the response.")
                return False

            transcription_url = results[0].get('transcription_url')
            if not transcription_url:
                # NOTE(review): presumably the per-file subtask failed; the entry
                # is printed so its error details are visible.
                print(f"No transcription_url in result: {results[0]}")
                return False

            print(f"Fetching transcription from URL: {transcription_url}")
            response = requests.get(transcription_url, timeout=self.RESULT_DOWNLOAD_TIMEOUT)
            response.raise_for_status()
            transcription_data = response.json()

            # Save the transcript content to the database.
            if not self._save_segments_to_db(transcription_data, meeting_id):
                return False

            print(f"Transcription result processed for task: {business_task_id}")
            return True

        except Exception as e:
            print(f"Error processing transcription result: {e}")
            return False

    def _save_segments_to_db(self, data: dict, meeting_id: int) -> bool:
        """Replace a meeting's ``transcript_segments`` with the sentences in *data*.

        Each ``data['transcripts'][*]['sentences'][*]`` entry becomes one row.
        ``speaker_id`` defaults to -1, and ``speaker_tag`` defaults to
        "发言人 <speaker_id>" ("Speaker <id>").

        Returns:
            bool: True when saved, or when there was nothing to save (existing
            rows are left untouched in that case). False on a database error,
            which is printed.
        """
        segments_to_insert = []
        for transcript in data.get('transcripts', []):
            for sentence in transcript.get('sentences', []):
                speaker_id = sentence.get('speaker_id', -1)
                segments_to_insert.append((
                    meeting_id,
                    speaker_id,
                    f"发言人 {speaker_id}",  # default speaker_tag
                    sentence.get('begin_time'),
                    sentence.get('end_time'),
                    sentence.get('text'),
                ))

        if not segments_to_insert:
            print("No segments to save.")
            return True

        try:
            with get_db_connection() as connection:
                cursor = connection.cursor()
                try:
                    # Remove this meeting's existing transcript segments.
                    delete_query = "DELETE FROM transcript_segments WHERE meeting_id = %s"
                    cursor.execute(delete_query, (meeting_id,))
                    print(f"Deleted existing segments for meeting_id: {meeting_id}")

                    # Insert the new transcript segments.
                    insert_query = '''
                        INSERT INTO transcript_segments (meeting_id, speaker_id, speaker_tag, start_time_ms, end_time_ms, text_content)
                        VALUES (%s, %s, %s, %s, %s, %s)
                    '''
                    cursor.executemany(insert_query, segments_to_insert)
                    connection.commit()
                finally:
                    cursor.close()
                print(f"Successfully saved {len(segments_to_insert)} segments to the database for meeting_id: {meeting_id}")
                return True

        except Exception as e:
            print(f"Database error when saving segments: {e}")
            return False
|