refactor: 优化 ASR 和总结任务调度逻辑并移除未使用的线程池配置

- 移除 `@Async` 注解和未使用的 `asrDispatchExecutor` 配置
- 更新 `dispatchTasks`、`dispatchChapterTask` 和 `dispatchSummaryTask` 方法,添加日志记录和线程池检查
- 在 `doDispatchTasks`、`doDispatchChapterTask` 和 `doDispatchSummaryTask` 方法中添加详细的日志记录和异常处理
- 调整 `MeetingAsyncExecutorConfig` 中的线程池配置,移除 `asrDispatchExecutor` 和 `summaryDispatchExecutor`,更新 `asrTaskExecutor` 和 `summaryTaskExecutor` 的配置
dev_na
chenhao 2026-06-12 16:34:15 +08:00
parent c64c8b5690
commit fee7f2a0b3
3 changed files with 114 additions and 40 deletions

View File

@ -10,48 +10,27 @@ import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class MeetingAsyncExecutorConfig {
@Bean("asrDispatchExecutor")
public Executor asrDispatchExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(128);
executor.setThreadNamePrefix("imeeting-asr-dispatch-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean("asrTaskExecutor")
public Executor asrTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(0);
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(20);
executor.setThreadNamePrefix("imeeting-asr-worker-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean("summaryDispatchExecutor")
public Executor summaryDispatchExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(128);
executor.setThreadNamePrefix("imeeting-summary-dispatch-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean("summaryTaskExecutor")
public Executor summaryTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(256);
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(20);
executor.setThreadNamePrefix("imeeting-summary-worker-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();

View File

@ -2,7 +2,7 @@ package com.imeeting.service.android.legacy.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.imeeting.common.MeetingConstants;
import com.imeeting.common.exception.ExistingOfflineMeetingException;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidOfflineMeetingCreateCommand;
import com.imeeting.dto.android.legacy.LegacyMeetingCreateRequest;

View File

@ -38,7 +38,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -145,10 +144,13 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
}
@Override
@Async("asrDispatchExecutor")
public void dispatchTasks(Long meetingId, Long tenantId, Long userId) {
log.info("提交ASR异步任务: meetingId={}, tenantId={}, userId={}, executorReady={}",
meetingId, tenantId, userId, asrTaskExecutor != null);
Runnable task = () -> taskSecurityContextRunner.runAsTenantUser(tenantId, userId, () -> doDispatchTasks(meetingId));
if (asrTaskExecutor == null) {
log.warn("ASR线程池未配置当前线程直接执行: meetingId={}, tenantId={}, userId={}",
meetingId, tenantId, userId);
task.run();
return;
}
@ -183,25 +185,38 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
private void doDispatchTasks(Long meetingId) {
long startMillis = System.currentTimeMillis();
log.info("[ASR-FLOW] 开始执行ASR任务: meetingId={}, thread={}", meetingId, Thread.currentThread().getName());
boolean acquired = meetingLockCache.tryAcquirePollingLock(meetingId, Duration.ofMinutes(30));
if (!acquired) {
log.warn("Meeting {} is already being processed", meetingId);
log.warn("[ASR-FLOW] 获取轮询锁失败,会议正在处理中,跳过本次执行: meetingId={}", meetingId);
return;
}
log.info("[ASR-FLOW] 已获取轮询锁: meetingId={}", meetingId);
try {
Meeting meeting = meetingMapper.selectById(meetingId);
if (meeting == null) return;
if (meeting == null) {
log.warn("[ASR-FLOW] 会议不存在终止ASR流程: meetingId={}", meetingId);
return;
}
AiTask asrTask = this.getOne(new LambdaQueryWrapper<AiTask>()
.eq(AiTask::getMeetingId, meetingId)
.eq(AiTask::getTaskType, "ASR")
.orderByDesc(AiTask::getId)
.last("limit 1"));
log.info("[ASR-FLOW] 加载ASR任务: meetingId={}, asrTaskId={}, status={}, audioUrlPresent={}",
meetingId,
asrTask == null ? null : asrTask.getId(),
asrTask == null ? null : asrTask.getStatus(),
meeting.getAudioUrl() != null && !meeting.getAudioUrl().isBlank());
if (asrTask != null) {
if (Integer.valueOf(1).equals(asrTask.getStatus())) {
if (!prepareRunningAsrTaskForRecovery(meeting, asrTask)) {
log.info("[ASR-FLOW] RUNNING状态ASR任务恢复检查未通过已重新排队终止本次执行: meetingId={}, asrTaskId={}",
meetingId, asrTask.getId());
return;
}
} else if (Integer.valueOf(0).equals(asrTask.getStatus())) {
@ -209,6 +224,7 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
asrTask.setQueuedAt(LocalDateTime.now());
this.updateById(asrTask);
}
log.info("[ASR-FLOW] ASR任务处于排队状态等待调度执行: meetingId={}, asrTaskId={}", meetingId, asrTask.getId());
meetingProgressService.markQueued(meetingId, asrTask, 1, "ASR queued and waiting for execution");
return;
}
@ -216,12 +232,17 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
String asrText = "";
if (asrTask != null && canExecuteTask(asrTask)) {
log.info("[ASR-FLOW] 开始处理ASR识别任务: meetingId={}, asrTaskId={}", meetingId, asrTask.getId());
asrText = processAsrTask(meeting, asrTask);
log.info("[ASR-FLOW] ASR识别任务处理完成: meetingId={}, asrTaskId={}, transcriptLength={}",
meetingId, asrTask.getId(), asrText == null ? 0 : asrText.length());
} else {
List<MeetingTranscript> transcripts = transcriptMapper.selectList(new LambdaQueryWrapper<MeetingTranscript>()
.eq(MeetingTranscript::getMeetingId, meetingId)
.orderByAsc(MeetingTranscript::getStartTime));
asrText = buildTranscriptText(transcripts);
log.info("[ASR-FLOW] 无可执行ASR任务使用已有转录记录: meetingId={}, transcriptCount={}, transcriptLength={}",
meetingId, transcripts.size(), asrText == null ? 0 : asrText.length());
}
// Real-time meetings are created without audio files and without ASR tasks.
@ -230,6 +251,7 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
if ((meeting.getAudioUrl() == null || meeting.getAudioUrl().isBlank())
&& asrTask == null
&& (asrText == null || asrText.isBlank())) {
log.info("[ASR-FLOW] 实时会议尚无音频和转录,保持等待状态: meetingId={}", meetingId);
updateProgress(meetingId, 0, "等待实时识别开始...", 0);
return;
}
@ -237,12 +259,18 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
AiTask chapterTask = findLatestTask(meetingId, "CHAPTER");
AiTask sumTask = findLatestTask(meetingId, "SUMMARY");
if (asrText == null || asrText.isBlank()) {
log.warn("[ASR-FLOW] 未识别到可用转录内容,标记会议失败并跳过总结: meetingId={}, sumTaskId={}",
meetingId, sumTask == null ? null : sumTask.getId());
failPendingSummaryTask(sumTask, "没有可用于总结的转录内容");
updateMeetingStatus(meetingId, 4);
updateProgress(meetingId, -1, "未识别到可用于总结的转录内容", 0);
return;
}
if (!asrText.isBlank()) {
log.info("[ASR-FLOW] 转写完成,准备派发章节与总结任务: meetingId={}, chapterTaskId={}, sumTaskId={}",
meetingId,
chapterTask == null ? null : chapterTask.getId(),
sumTask == null ? null : sumTask.getId());
meetingProgressService.markStage(meetingId, asrTask, 1, MeetingProgressStage.ASR_COMPLETED, 80, "转写完成,准备生成总结", 0);
scheduleQueuedAsrTasks();
self.dispatchChapterTask(meetingId, meeting.getTenantId(), meeting.getCreatorId());
@ -254,21 +282,26 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
}
reconcileMeetingStatus(meetingId);
} catch (Exception e) {
log.error("Meeting {} AI Task Flow failed", meetingId, e);
log.error("[ASR-FLOW] ASR任务流程执行异常标记会议失败: meetingId={}", meetingId, e);
failPendingSummaryTask(findLatestSummaryTask(meetingId), "转录失败,已跳过总结任务: " + e.getMessage());
updateMeetingStatus(meetingId, 4);
updateProgress(meetingId, -1, "分析失败: " + e.getMessage(), 0);
} finally {
meetingLockCache.releasePollingLock(meetingId);
scheduleQueuedAsrTasks();
log.info("[ASR-FLOW] ASR任务流程结束已释放轮询锁: meetingId={}, costMs={}",
meetingId, System.currentTimeMillis() - startMillis);
}
}
@Override
@Async("summaryDispatchExecutor")
public void dispatchChapterTask(Long meetingId, Long tenantId, Long userId) {
log.info("提交章节异步任务: meetingId={}, tenantId={}, userId={}, executorReady={}",
meetingId, tenantId, userId, summaryTaskExecutor != null);
Runnable task = () -> taskSecurityContextRunner.runAsTenantUser(tenantId, userId, () -> doDispatchChapterTask(meetingId));
if (summaryTaskExecutor == null) {
log.warn("总结线程池未配置,章节任务改为当前线程执行: meetingId={}, tenantId={}, userId={}",
meetingId, tenantId, userId);
task.run();
return;
}
@ -276,24 +309,36 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
}
private void doDispatchChapterTask(Long meetingId) {
long startMillis = System.currentTimeMillis();
log.info("[CHAPTER-FLOW] 开始执行章节任务: meetingId={}, thread={}", meetingId, Thread.currentThread().getName());
Meeting meeting = meetingMapper.selectById(meetingId);
if (meeting == null) {
log.warn("[CHAPTER-FLOW] 会议不存在,终止章节流程: meetingId={}", meetingId);
return;
}
AiTask chapterTask = findLatestTask(meetingId, "CHAPTER");
if (chapterTask == null || !canExecuteTask(chapterTask)) {
log.info("[CHAPTER-FLOW] 无可执行章节任务,跳过并对账会议状态: meetingId={}, chapterTaskId={}, status={}",
meetingId,
chapterTask == null ? null : chapterTask.getId(),
chapterTask == null ? null : chapterTask.getStatus());
reconcileMeetingStatus(meetingId);
return;
}
executeChapterFlow(meeting, chapterTask);
reconcileMeetingStatus(meetingId);
log.info("[CHAPTER-FLOW] 章节任务流程结束: meetingId={}, chapterTaskId={}, costMs={}",
meetingId, chapterTask.getId(), System.currentTimeMillis() - startMillis);
}
@Override
@Async("summaryDispatchExecutor")
public void dispatchSummaryTask(Long meetingId, Long tenantId, Long userId) {
log.info("提交总结异步任务: meetingId={}, tenantId={}, userId={}, executorReady={}",
meetingId, tenantId, userId, summaryTaskExecutor != null);
Runnable task = () -> taskSecurityContextRunner.runAsTenantUser(tenantId, userId, () -> doDispatchSummaryTask(meetingId));
if (summaryTaskExecutor == null) {
log.warn("总结线程池未配置,总结任务改为当前线程执行: meetingId={}, tenantId={}, userId={}",
meetingId, tenantId, userId);
task.run();
return;
}
@ -301,20 +346,33 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
}
private void doDispatchSummaryTask(Long meetingId) {
long startMillis = System.currentTimeMillis();
log.info("[SUMMARY-FLOW] 开始执行总结任务: meetingId={}, thread={}", meetingId, Thread.currentThread().getName());
Meeting meeting = meetingMapper.selectById(meetingId);
if (meeting == null) {
log.warn("[SUMMARY-FLOW] 会议不存在,终止总结流程: meetingId={}", meetingId);
return;
}
AiTask sumTask = findLatestTask(meetingId, "SUMMARY");
try {
if (sumTask != null && canExecuteTask(sumTask)) {
log.info("[SUMMARY-FLOW] 开始执行总结流程: meetingId={}, sumTaskId={}", meetingId, sumTask.getId());
executeSummaryFlow(meeting, sumTask);
} else {
log.info("[SUMMARY-FLOW] 无可执行总结任务,仅对账会议状态: meetingId={}, sumTaskId={}, status={}",
meetingId,
sumTask == null ? null : sumTask.getId(),
sumTask == null ? null : sumTask.getStatus());
}
reconcileMeetingStatus(meetingId);
} catch (Exception e) {
log.error("Re-summary failed for meeting {}", meetingId, e);
log.error("[SUMMARY-FLOW] 总结任务执行异常: meetingId={}, sumTaskId={}",
meetingId, sumTask == null ? null : sumTask.getId(), e);
failPendingSummaryTask(sumTask, e.getMessage());
reconcileMeetingStatus(meetingId);
} finally {
log.info("[SUMMARY-FLOW] 总结任务流程结束: meetingId={}, costMs={}",
meetingId, System.currentTimeMillis() - startMillis);
}
}
@ -501,6 +559,8 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
Long asrModelId = Long.valueOf(taskRecord.getTaskConfig().get("asrModelId").toString());
AiModelVO asrModel = aiModelService.getModelById(asrModelId, "ASR");
if (asrModel == null) throw new RuntimeException("ASR模型配置不存在");
log.info("[ASR-PROC] 解析ASR模型成功: meetingId={}, asrTaskId={}, asrModelId={}, baseUrl={}",
meeting.getId(), taskRecord.getId(), asrModelId, asrModel.getBaseUrl());
String submitUrl = appendPath(asrModel.getBaseUrl(), "api/v1/asr/transcriptions");
String taskId = taskRecord.getResponseData() != null
@ -508,8 +568,12 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
: "";
if (taskId != null && !taskId.isBlank()) {
log.info("[ASR-PROC] 检测到已有外部任务,尝试恢复轮询: meetingId={}, asrTaskId={}, externalTaskId={}",
meeting.getId(), taskRecord.getId(), taskId);
updateProgress(meeting.getId(), 5, "Resuming ASR polling...", 0);
if (!canResumeAsrTask(asrModel, meeting.getId(), taskId)) {
log.info("[ASR-PROC] 外部任务无法恢复,将重新提交: meetingId={}, asrTaskId={}, externalTaskId={}",
meeting.getId(), taskRecord.getId(), taskId);
clearAsrTaskId(taskRecord);
taskId = "";
}
@ -517,9 +581,13 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
if (taskId == null || taskId.isBlank()) {
taskId = submitAsrTask(meeting, taskRecord, asrModel, submitUrl);
log.info("[ASR-PROC] 已提交新的外部ASR任务: meetingId={}, asrTaskId={}, externalTaskId={}",
meeting.getId(), taskRecord.getId(), taskId);
}
this.updateById(taskRecord);
String queryUrl = appendPath(asrModel.getBaseUrl(), "api/v1/asr/transcriptions/" + taskId);
log.info("[ASR-PROC] 开始轮询ASR结果: meetingId={}, asrTaskId={}, externalTaskId={}, queryUrl={}",
meeting.getId(), taskRecord.getId(), taskId, queryUrl);
// 轮询逻辑(带防卡死防护)
JsonNode resultNode = null;
@ -532,6 +600,8 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
JsonNode statusNode = objectMapper.readTree(queryResp);
int code = statusNode.path("code").asInt(500);
if (code!=0){
log.warn("[ASR-PROC] ASR引擎返回错误码任务失败: meetingId={}, asrTaskId={}, externalTaskId={}, code={}, response={}",
meeting.getId(), taskRecord.getId(), taskId, code, queryResp);
updateAiTaskFail(taskRecord, "ASR 引擎返回失败:" + queryResp);
throw new RuntimeException("ASR引擎处理失败: " + statusNode.get("message").asText());
}
@ -539,10 +609,14 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
String status = data.path("status").asText();
if ("completed".equalsIgnoreCase(status)) {
log.info("[ASR-PROC] ASR识别完成: meetingId={}, asrTaskId={}, externalTaskId={}, polls={}",
meeting.getId(), taskRecord.getId(), taskId, i + 1);
resultNode = extractAsrResultNode(data);
updateAiTaskSuccess(taskRecord, statusNode);
break;
} else if ("failed".equalsIgnoreCase(status)) {
log.warn("[ASR-PROC] ASR引擎返回失败状态: meetingId={}, asrTaskId={}, externalTaskId={}, response={}",
meeting.getId(), taskRecord.getId(), taskId, queryResp);
updateAiTaskFail(taskRecord, "ASR 引擎返回失败:" + queryResp);
throw new RuntimeException("ASR引擎处理失败: " + data.path("message").asText());
} else {
@ -551,18 +625,32 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
updateProgress(meeting.getId(), (int) (currentPercent * 0.85), data.path("message").asText(), eta);
if (currentPercent > 0 && currentPercent == lastPercent) {
if (++unchangedCount > 300) throw new RuntimeException("识别任务长时间无进度增长,自动强制超时");
if (++unchangedCount > 300) {
log.warn("[ASR-PROC] ASR进度长时间无增长强制超时: meetingId={}, asrTaskId={}, externalTaskId={}, stuckPercent={}",
meeting.getId(), taskRecord.getId(), taskId, currentPercent);
throw new RuntimeException("识别任务长时间无进度增长,自动强制超时");
}
} else {
unchangedCount = 0;
}
if (currentPercent != lastPercent) {
log.debug("[ASR-PROC] ASR轮询进度更新: meetingId={}, asrTaskId={}, externalTaskId={}, status={}, percent={}, eta={}",
meeting.getId(), taskRecord.getId(), taskId, status, currentPercent, eta);
}
lastPercent = currentPercent;
}
}
if (resultNode == null) throw new RuntimeException("ASR轮询超时");
if (resultNode == null) {
log.warn("[ASR-PROC] ASR轮询超时达到最大轮询次数: meetingId={}, asrTaskId={}, externalTaskId={}",
meeting.getId(), taskRecord.getId(), taskId);
throw new RuntimeException("ASR轮询超时");
}
// 解析并入库(防御性清理旧数据)
String transcriptText = saveTranscripts(meeting, resultNode);
log.info("[ASR-PROC] 转录已入库: meetingId={}, asrTaskId={}, transcriptLength={}",
meeting.getId(), taskRecord.getId(), transcriptText == null ? 0 : transcriptText.length());
meetingPointsService.recordAsrSuccessCharge(meeting, taskRecord);
return transcriptText;
}
@ -1043,18 +1131,24 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
private void executeSummaryFlow(Meeting meeting, AiTask sumTask) throws Exception {
if (isExternalSummaryModeEnabled()) {
log.info("[SUMMARY-EXEC] 外部总结编排模式触发webhook: meetingId={}, sumTaskId={}",
meeting.getId(), sumTask == null ? null : sumTask.getId());
AiTask chapterTask = findLatestTask(meeting.getId(), "CHAPTER");
triggerExternalSummaryWebhook(meeting, sumTask, chapterTask, "AUTO_AFTER_TRANSCRIPT_READY", false);
return;
}
boolean acquired = meetingLockCache.tryAcquireSummaryLock(meeting.getId(), Duration.ofMinutes(30));
if (!acquired) {
log.warn("Meeting {} summary is already being processed", meeting.getId());
log.warn("[SUMMARY-EXEC] 获取总结锁失败,会议总结正在处理中,跳过: meetingId={}", meeting.getId());
return;
}
log.info("[SUMMARY-EXEC] 已获取总结锁,开始构建总结来源: meetingId={}, sumTaskId={}",
meeting.getId(), sumTask == null ? null : sumTask.getId());
try {
MeetingSummarySource summarySource = buildRawTranscriptSummarySource(meeting);
if (summarySource.getText() == null || summarySource.getText().isBlank()) {
log.warn("[SUMMARY-EXEC] 无转录内容,无法生成总结: meetingId={}, sumTaskId={}",
meeting.getId(), sumTask == null ? null : sumTask.getId());
failPendingSummaryTask(sumTask, "没有转录内容");
reconcileMeetingStatus(meeting.getId());
return;
@ -1063,6 +1157,7 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
reconcileMeetingStatus(meeting.getId());
} finally {
meetingLockCache.releaseSummaryLock(meeting.getId());
log.info("[SUMMARY-EXEC] 已释放总结锁: meetingId={}", meeting.getId());
}
}