feat: 添加 Redis 支持和优化会议进度管理

- 新增 `RedisValueSupport` 类,提供 Redis 操作支持
- 实现 `MeetingProgressServiceImpl` 服务,管理会议进度的读写操作
- 更新 `MeetingMcpToolService` 和 `MeetingCommandServiceImpl` 以使用新的进度服务
- 优化会议进度解析和状态更新逻辑
dev_na
chenhao 2026-05-19 17:49:41 +08:00
parent 7989b6aa11
commit 188809305e
21 changed files with 1378 additions and 178 deletions

View File

@ -0,0 +1,34 @@
package com.imeeting.common;
public enum MeetingProgressStage {
QUEUED("queued", 10, false),
ASR_SUBMITTED("asr_submitted", 20, false),
ASR_RUNNING("asr_running", 30, false),
ASR_COMPLETED("asr_completed", 40, false),
CHAPTER_RUNNING("chapter_running", 50, false),
SUMMARY_RUNNING("summary_running", 60, false),
COMPLETED("completed", 100, true),
FAILED("failed", 100, true);
private final String code;
private final int order;
private final boolean terminal;
MeetingProgressStage(String code, int order, boolean terminal) {
this.code = code;
this.order = order;
this.terminal = terminal;
}
public String getCode() {
return code;
}
public int getOrder() {
return order;
}
public boolean isTerminal() {
return terminal;
}
}

View File

@ -47,6 +47,22 @@ public final class RedisKeys {
return "biz:meeting:summary:lock:" + meetingId;
}
public static String meetingAsrScheduleLockKey() {
return "biz:meeting:asr:schedule:lock";
}
public static String meetingAsrPermitSetKey() {
return "biz:meeting:asr:permit:set";
}
public static String meetingAsrPermitSyncLockKey() {
return "biz:meeting:asr:permit:sync:lock";
}
public static String meetingAsrRefillLockKey() {
return "biz:meeting:asr:refill:lock";
}
public static String realtimeMeetingSocketSessionKey(String sessionToken) {
return "biz:meeting:realtime:socket:" + sessionToken;
}

View File

@ -10,4 +10,5 @@ public final class SysParamKeys {
public static final String MEETING_OFFLINE_AUDIO_MAX_SIZE_MB = "meeting.offline_audio.max_size_mb";
public static final String MEETING_CREATE_OFFLINE_ENABLED = "meeting.create.offline_enabled";
public static final String MEETING_CREATE_REALTIME_ENABLED = "meeting.create.realtime_enabled";
public static final String MEETING_ASR_MAX_CONCURRENT = "meeting.asr.max_concurrent";
}

View File

@ -0,0 +1,60 @@
package com.imeeting.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class MeetingAsyncExecutorConfig {
@Bean("asrDispatchExecutor")
public Executor asrDispatchExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(128);
executor.setThreadNamePrefix("imeeting-asr-dispatch-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean("asrTaskExecutor")
public Executor asrTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(0);
executor.setThreadNamePrefix("imeeting-asr-worker-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean("summaryDispatchExecutor")
public Executor summaryDispatchExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(128);
executor.setThreadNamePrefix("imeeting-summary-dispatch-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean("summaryTaskExecutor")
public Executor summaryTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(256);
executor.setThreadNamePrefix("imeeting-summary-worker-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}

View File

@ -22,9 +22,11 @@ import com.imeeting.service.android.legacy.LegacyMeetingAdapterService;
import com.imeeting.service.biz.AiTaskService;
import com.imeeting.service.biz.MeetingAccessService;
import com.imeeting.service.biz.MeetingCommandService;
import com.imeeting.service.biz.MeetingProgressService;
import com.imeeting.service.biz.MeetingQueryService;
import com.imeeting.service.biz.MeetingService;
import com.imeeting.service.biz.PromptTemplateService;
import com.imeeting.service.biz.impl.RedisOnlyMeetingProgressServiceAdapter;
import com.unisbase.common.ApiResponse;
import com.unisbase.common.annotation.Log;
import com.unisbase.dto.PageResult;
@ -38,7 +40,7 @@ import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
@ -63,7 +65,6 @@ import java.util.stream.Collectors;
@Tag(name = "Android会议接口")
@RestController
@RequestMapping("/api/android/meetings")
@RequiredArgsConstructor
public class AndroidMeetingController {
private static final String STAGE_DATA_INITIALIZATION = "data_initialization";
@ -80,9 +81,60 @@ public class AndroidMeetingController {
private final AiTaskService aiTaskService;
private final PromptTemplateService promptTemplateService;
private final SysUserMapper sysUserMapper;
private final StringRedisTemplate redisTemplate;
private final MeetingProgressService meetingProgressService;
private final ObjectMapper objectMapper;
@Autowired
public AndroidMeetingController(AndroidAuthService androidAuthService,
LegacyMeetingAdapterService legacyMeetingAdapterService,
MeetingQueryService meetingQueryService,
MeetingAccessService meetingAccessService,
MeetingCommandService meetingCommandService,
MeetingService meetingService,
AiTaskService aiTaskService,
PromptTemplateService promptTemplateService,
SysUserMapper sysUserMapper,
MeetingProgressService meetingProgressService,
ObjectMapper objectMapper) {
this.androidAuthService = androidAuthService;
this.legacyMeetingAdapterService = legacyMeetingAdapterService;
this.meetingQueryService = meetingQueryService;
this.meetingAccessService = meetingAccessService;
this.meetingCommandService = meetingCommandService;
this.meetingService = meetingService;
this.aiTaskService = aiTaskService;
this.promptTemplateService = promptTemplateService;
this.sysUserMapper = sysUserMapper;
this.meetingProgressService = meetingProgressService;
this.objectMapper = objectMapper;
}
public AndroidMeetingController(AndroidAuthService androidAuthService,
LegacyMeetingAdapterService legacyMeetingAdapterService,
MeetingQueryService meetingQueryService,
MeetingAccessService meetingAccessService,
MeetingCommandService meetingCommandService,
MeetingService meetingService,
AiTaskService aiTaskService,
PromptTemplateService promptTemplateService,
SysUserMapper sysUserMapper,
StringRedisTemplate redisTemplate,
ObjectMapper objectMapper) {
this(
androidAuthService,
legacyMeetingAdapterService,
meetingQueryService,
meetingAccessService,
meetingCommandService,
meetingService,
aiTaskService,
promptTemplateService,
sysUserMapper,
new RedisOnlyMeetingProgressServiceAdapter(redisTemplate, objectMapper),
objectMapper
);
}
@Operation(summary = "创建Android离线会议")
@ApiResponses({
@io.swagger.v3.oas.annotations.responses.ApiResponse(
@ -254,6 +306,13 @@ public class AndroidMeetingController {
}
Integer realtimeProgress = resolveRealtimeProgress(meetingId);
if (asrTask != null && Integer.valueOf(0).equals(asrTask.getStatus()) && realtimeProgress != null && realtimeProgress <= 0) {
return new LegacyMeetingPreviewResult(
"400",
"会议正在处理中",
buildProcessingPreview(meeting, summaryTask, processingStatus("会议数据准备中", 25, STAGE_DATA_INITIALIZATION))
);
}
if (realtimeProgress != null) {
if (realtimeProgress >= 100) {
MeetingVO completedDetail = detail != null ? detail : meetingQueryService.getDetail(meetingId);
@ -348,16 +407,7 @@ public class AndroidMeetingController {
}
private Integer resolveRealtimeProgress(Long meetingId) {
String rawProgress = redisTemplate.opsForValue().get(RedisKeys.meetingProgressKey(meetingId));
if (rawProgress == null || rawProgress.isBlank()) {
return null;
}
try {
JsonNode progress = objectMapper.readTree(rawProgress);
return progress.hasNonNull("percent") ? progress.path("percent").asInt() : null;
} catch (Exception ignored) {
return null;
}
return meetingProgressService.resolvePercent(meetingId);
}
private String buildFailureMessage(AiTask failedTask, String stageName) {
@ -368,11 +418,11 @@ public class AndroidMeetingController {
}
private boolean isRunningAsr(AiTask task) {
return task != null && (Integer.valueOf(0).equals(task.getStatus()) || Integer.valueOf(1).equals(task.getStatus()));
return task != null && Integer.valueOf(1).equals(task.getStatus());
}
private boolean isRunningSummary(AiTask task) {
return task != null && (Integer.valueOf(0).equals(task.getStatus()) || Integer.valueOf(1).equals(task.getStatus()));
return task != null && Integer.valueOf(1).equals(task.getStatus());
}
private boolean isFailed(AiTask task) {
@ -492,9 +542,9 @@ public class AndroidMeetingController {
}
private boolean isAsrStage(Integer meetingStatus, AiTask asrTask, boolean hasAudio, boolean isSummaryStage) {
return Integer.valueOf(1).equals(meetingStatus)
return (Integer.valueOf(1).equals(meetingStatus) && (asrTask == null || !Integer.valueOf(0).equals(asrTask.getStatus())))
|| isRunningAsr(asrTask)
|| (hasAudio && !isSummaryStage);
|| (asrTask == null && hasAudio && !isSummaryStage);
}
private String formatDateTime(LocalDateTime value) {

View File

@ -26,9 +26,11 @@ import com.imeeting.service.android.legacy.LegacyMeetingAdapterService;
import com.imeeting.service.biz.AiTaskService;
import com.imeeting.service.biz.MeetingAccessService;
import com.imeeting.service.biz.MeetingCommandService;
import com.imeeting.service.biz.MeetingProgressService;
import com.imeeting.service.biz.MeetingQueryService;
import com.imeeting.service.biz.MeetingService;
import com.imeeting.service.biz.PromptTemplateService;
import com.imeeting.service.biz.impl.RedisOnlyMeetingProgressServiceAdapter;
import com.unisbase.common.annotation.Log;
import com.unisbase.dto.PageResult;
import com.unisbase.entity.SysUser;
@ -80,7 +82,7 @@ public class LegacyMeetingController {
private final PromptTemplateService promptTemplateService;
private final MeetingTranscriptMapper meetingTranscriptMapper;
private final SysUserMapper sysUserMapper;
private final StringRedisTemplate redisTemplate;
private final MeetingProgressService meetingProgressService;
private final ObjectMapper objectMapper;
public LegacyMeetingController(LegacyMeetingAdapterService legacyMeetingAdapterService,
@ -101,10 +103,36 @@ public class LegacyMeetingController {
promptTemplateService,
meetingTranscriptMapper,
sysUserMapper,
null,
(MeetingProgressService) null,
new ObjectMapper());
}
public LegacyMeetingController(LegacyMeetingAdapterService legacyMeetingAdapterService,
MeetingQueryService meetingQueryService,
MeetingAccessService meetingAccessService,
MeetingCommandService meetingCommandService,
MeetingService meetingService,
AiTaskService aiTaskService,
PromptTemplateService promptTemplateService,
MeetingTranscriptMapper meetingTranscriptMapper,
SysUserMapper sysUserMapper,
StringRedisTemplate redisTemplate,
ObjectMapper objectMapper) {
this(
legacyMeetingAdapterService,
meetingQueryService,
meetingAccessService,
meetingCommandService,
meetingService,
aiTaskService,
promptTemplateService,
meetingTranscriptMapper,
sysUserMapper,
new RedisOnlyMeetingProgressServiceAdapter(redisTemplate, objectMapper),
objectMapper
);
}
@Autowired
public LegacyMeetingController(LegacyMeetingAdapterService legacyMeetingAdapterService,
MeetingQueryService meetingQueryService,
@ -115,7 +143,7 @@ public class LegacyMeetingController {
PromptTemplateService promptTemplateService,
MeetingTranscriptMapper meetingTranscriptMapper,
SysUserMapper sysUserMapper,
StringRedisTemplate redisTemplate,
MeetingProgressService meetingProgressService,
ObjectMapper objectMapper) {
this.legacyMeetingAdapterService = legacyMeetingAdapterService;
this.meetingQueryService = meetingQueryService;
@ -126,7 +154,7 @@ public class LegacyMeetingController {
this.promptTemplateService = promptTemplateService;
this.meetingTranscriptMapper = meetingTranscriptMapper;
this.sysUserMapper = sysUserMapper;
this.redisTemplate = redisTemplate;
this.meetingProgressService = meetingProgressService;
this.objectMapper = objectMapper;
}
@ -270,6 +298,13 @@ public class LegacyMeetingController {
}
Integer realtimeProgress = resolveRealtimeProgress(meetingId);
if (asrTask != null && Integer.valueOf(0).equals(asrTask.getStatus()) && realtimeProgress != null && realtimeProgress <= 0) {
return new LegacyMeetingPreviewResult(
"400",
"会议正在处理中",
buildProcessingPreview(meeting, summaryTask, processingStatus("会议数据准备中", 25, STAGE_DATA_INITIALIZATION))
);
}
if (realtimeProgress != null) {
if (realtimeProgress >= 100) {
MeetingVO completedDetail = detail != null ? detail : meetingQueryService.getDetail(meetingId);
@ -389,19 +424,10 @@ public class LegacyMeetingController {
}
private Integer resolveRealtimeProgress(Long meetingId) {
if (redisTemplate == null) {
return null;
}
String rawProgress = redisTemplate.opsForValue().get(RedisKeys.meetingProgressKey(meetingId));
if (rawProgress == null || rawProgress.isBlank()) {
return null;
}
try {
JsonNode progress = objectMapper.readTree(rawProgress);
return progress.hasNonNull("percent") ? progress.path("percent").asInt() : null;
} catch (Exception ignored) {
if (meetingProgressService == null) {
return null;
}
return meetingProgressService.resolvePercent(meetingId);
}
private LegacyMeetingProcessingStatusResponse buildListStatus(MeetingVO meeting) {
@ -440,11 +466,11 @@ public class LegacyMeetingController {
}
private boolean isRunningAsr(AiTask task) {
return task != null && (Integer.valueOf(0).equals(task.getStatus()) || Integer.valueOf(1).equals(task.getStatus()));
return task != null && Integer.valueOf(1).equals(task.getStatus());
}
private boolean isRunningSummary(AiTask task) {
return task != null && (Integer.valueOf(0).equals(task.getStatus()) || Integer.valueOf(1).equals(task.getStatus()));
return task != null && Integer.valueOf(1).equals(task.getStatus());
}
private boolean isFailed(AiTask task) {
@ -593,9 +619,9 @@ public class LegacyMeetingController {
}
private boolean isAsrStage(Integer meetingStatus, AiTask asrTask, boolean hasAudio, boolean isSummaryStage) {
return Integer.valueOf(1).equals(meetingStatus)
return (Integer.valueOf(1).equals(meetingStatus) && (asrTask == null || !Integer.valueOf(0).equals(asrTask.getStatus())))
|| isRunningAsr(asrTask)
|| (hasAudio && !isSummaryStage);
|| (asrTask == null && hasAudio && !isSummaryStage);
}
private String formatDateTime(LocalDateTime value) {

View File

@ -1,5 +1,6 @@
package com.imeeting.controller.biz;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.imeeting.common.MeetingConstants;
import com.imeeting.common.RedisKeys;
import com.imeeting.common.SysParamKeys;
@ -22,16 +23,20 @@ import com.imeeting.dto.biz.UpdateMeetingBasicCommand;
import com.imeeting.dto.biz.UpdateMeetingParticipantsCommand;
import com.imeeting.dto.biz.UpdateMeetingSummaryCommand;
import com.imeeting.dto.biz.UpdateMeetingTranscriptCommand;
import com.imeeting.entity.biz.AiTask;
import com.imeeting.entity.biz.Meeting;
import com.imeeting.service.biz.AiTaskService;
import com.imeeting.service.biz.MeetingAccessService;
import com.imeeting.service.biz.MeetingCommandService;
import com.imeeting.service.biz.MeetingExportService;
import com.imeeting.service.biz.MeetingProgressService;
import com.imeeting.service.biz.MeetingQueryService;
import com.imeeting.service.biz.MeetingTranscriptFileService;
import com.imeeting.service.biz.PromptTemplateService;
import com.imeeting.service.biz.RealtimeMeetingSessionStateService;
import com.imeeting.service.biz.RealtimeMeetingSocketSessionService;
import com.imeeting.service.biz.impl.MeetingAudioUploadSupport;
import com.imeeting.service.biz.impl.RedisOnlyMeetingProgressServiceAdapter;
import com.unisbase.common.ApiResponse;
import com.unisbase.common.annotation.Log;
import com.unisbase.dto.PageResult;
@ -40,6 +45,7 @@ import com.unisbase.service.SysParamService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.validation.Valid;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
@ -79,9 +85,11 @@ public class MeetingController {
private final RealtimeMeetingSocketSessionService realtimeMeetingSocketSessionService;
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final MeetingAudioUploadSupport meetingAudioUploadSupport;
private final StringRedisTemplate redisTemplate;
private final MeetingProgressService meetingProgressService;
private final SysParamService sysParamService;
private AiTaskService compatibilityAiTaskService;
@Autowired
public MeetingController(MeetingQueryService meetingQueryService,
MeetingCommandService meetingCommandService,
MeetingAccessService meetingAccessService,
@ -91,7 +99,7 @@ public class MeetingController {
RealtimeMeetingSocketSessionService realtimeMeetingSocketSessionService,
RealtimeMeetingSessionStateService realtimeMeetingSessionStateService,
MeetingAudioUploadSupport meetingAudioUploadSupport,
StringRedisTemplate redisTemplate,
MeetingProgressService meetingProgressService,
SysParamService sysParamService) {
this.meetingQueryService = meetingQueryService;
this.meetingCommandService = meetingCommandService;
@ -102,10 +110,38 @@ public class MeetingController {
this.realtimeMeetingSocketSessionService = realtimeMeetingSocketSessionService;
this.realtimeMeetingSessionStateService = realtimeMeetingSessionStateService;
this.meetingAudioUploadSupport = meetingAudioUploadSupport;
this.redisTemplate = redisTemplate;
this.meetingProgressService = meetingProgressService;
this.sysParamService = sysParamService;
}
public MeetingController(MeetingQueryService meetingQueryService,
MeetingCommandService meetingCommandService,
MeetingAccessService meetingAccessService,
MeetingExportService meetingExportService,
MeetingTranscriptFileService meetingTranscriptFileService,
PromptTemplateService promptTemplateService,
RealtimeMeetingSocketSessionService realtimeMeetingSocketSessionService,
RealtimeMeetingSessionStateService realtimeMeetingSessionStateService,
AiTaskService unusedAiTaskService,
MeetingAudioUploadSupport meetingAudioUploadSupport,
StringRedisTemplate redisTemplate,
SysParamService sysParamService) {
this(
meetingQueryService,
meetingCommandService,
meetingAccessService,
meetingExportService,
meetingTranscriptFileService,
promptTemplateService,
realtimeMeetingSocketSessionService,
realtimeMeetingSessionStateService,
meetingAudioUploadSupport,
new RedisOnlyMeetingProgressServiceAdapter(redisTemplate, new com.fasterxml.jackson.databind.ObjectMapper()),
sysParamService
);
this.compatibilityAiTaskService = unusedAiTaskService;
}
@Operation(summary = "查询会议处理进度")
@GetMapping("/{id}/progress")
@PreAuthorize("isAuthenticated()")
@ -113,29 +149,21 @@ public class MeetingController {
LoginUser loginUser = currentLoginUser();
Meeting meeting = meetingAccessService.requireMeeting(id);
meetingAccessService.assertCanViewMeeting(meeting, loginUser);
String key = RedisKeys.meetingProgressKey(id);
String json = redisTemplate.opsForValue().get(key);
if (json != null) {
try {
return ApiResponse.ok(new com.fasterxml.jackson.databind.ObjectMapper().readValue(json, Map.class));
} catch (Exception ex) {
return ApiResponse.error("进度解析失败");
Map<String, Object> progress = meetingProgressService.getProgressMap(id);
if (compatibilityAiTaskService != null && "Waiting...".equals(progress.get("message"))) {
AiTask asrTask = compatibilityAiTaskService.getOne(new LambdaQueryWrapper<AiTask>()
.eq(AiTask::getMeetingId, id)
.eq(AiTask::getTaskType, "ASR")
.orderByDesc(AiTask::getId)
.last("LIMIT 1"));
if (asrTask != null && Integer.valueOf(0).equals(asrTask.getStatus())) {
return ApiResponse.ok(Map.of("percent", 0, "message", "排队中,等待 ASR 执行名额..."));
}
if (asrTask != null && Integer.valueOf(1).equals(asrTask.getStatus())) {
return ApiResponse.ok(Map.of("percent", 5, "message", "识别中,等待进度刷新..."));
}
}
Map<String, Object> fallback = new HashMap<>();
if (meeting.getStatus() == 3) {
fallback.put("percent", 100);
fallback.put("message", "Completed");
} else if (meeting.getStatus() == 4) {
fallback.put("percent", -1);
fallback.put("message", "Failed");
} else {
fallback.put("percent", 0);
fallback.put("message", "Waiting...");
}
return ApiResponse.ok(fallback);
return ApiResponse.ok(progress);
}
@Operation(summary = "上传会议音频")

View File

@ -0,0 +1,18 @@
package com.imeeting.dto.biz;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@Data
@Schema(description = "会议创建配置")
public class MeetingCreateConfigVO {
@Schema(description = "是否启用离线上传")
private Boolean offlineEnabled;
@Schema(description = "是否启用实时会议")
private Boolean realtimeEnabled;
@Schema(description = "离线音频上传大小上限,单位 MB")
private Long offlineAudioMaxSizeMb;
}

View File

@ -0,0 +1,30 @@
package com.imeeting.dto.biz;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MeetingProgressSnapshot {
private Long meetingId;
private Long taskId;
private String taskType;
private Integer taskStatus;
private Integer meetingStatus;
private String stage;
private Integer stageOrder;
private Integer percent;
private String message;
private Integer eta;
private String externalTaskId;
private LocalDateTime queuedAt;
private LocalDateTime startedAt;
private LocalDateTime completedAt;
private Long updateAt;
}

View File

@ -46,6 +46,9 @@ public class AiTask {
@Schema(description = "错误信息")
private String errorMsg;
@Schema(description = "排队时间")
private LocalDateTime queuedAt;
@Schema(description = "开始时间")
private LocalDateTime startedAt;

View File

@ -4,93 +4,93 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.imeeting.common.RedisKeys;
import com.imeeting.entity.biz.Meeting;
import com.imeeting.mapper.biz.MeetingMapper;
import com.imeeting.support.TaskSecurityContextRunner;
import com.imeeting.service.biz.AiTaskService;
import lombok.RequiredArgsConstructor;
import com.imeeting.support.RedisValueSupport;
import com.imeeting.support.TaskSecurityContextRunner;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* AI
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class MeetingTaskRecoveryListener implements ApplicationRunner {
private final MeetingMapper meetingMapper;
private final AiTaskService aiTaskService;
private final StringRedisTemplate redisTemplate;
private final RedisValueSupport redisValueSupport;
private final TaskSecurityContextRunner taskSecurityContextRunner;
private StringRedisTemplate compatibilityRedisTemplate;
@Autowired
public MeetingTaskRecoveryListener(MeetingMapper meetingMapper,
AiTaskService aiTaskService,
RedisValueSupport redisValueSupport,
TaskSecurityContextRunner taskSecurityContextRunner) {
this.meetingMapper = meetingMapper;
this.aiTaskService = aiTaskService;
this.redisValueSupport = redisValueSupport;
this.taskSecurityContextRunner = taskSecurityContextRunner;
}
public MeetingTaskRecoveryListener(MeetingMapper meetingMapper,
AiTaskService aiTaskService,
StringRedisTemplate redisTemplate,
TaskSecurityContextRunner taskSecurityContextRunner) {
this(meetingMapper, aiTaskService, new RedisValueSupport(redisTemplate, new com.fasterxml.jackson.databind.ObjectMapper()), taskSecurityContextRunner);
this.compatibilityRedisTemplate = redisTemplate;
}
@Override
public void run(ApplicationArguments args) {
log.info("Starting meeting task self-healing check...");
log.info("Starting meeting task recovery check...");
// 1. 查询状态为 1(识别中) 或 2(总结中) 且未删除的会议
List<Meeting> pendingMeetings = taskSecurityContextRunner.callAsPlatformAdmin(() ->
meetingMapper.selectList(new LambdaQueryWrapper<Meeting>()
.in(Meeting::getStatus, 1, 2)
.eq(Meeting::getIsDeleted, 0))
);
if (pendingMeetings.isEmpty()) {
log.info("No pending tasks found. Recovery check completed.");
log.info("No pending meeting tasks found.");
return;
}
LocalDateTime oneHourAgo = LocalDateTime.now().minusHours(1);
for (Meeting meeting : pendingMeetings) {
try {
// 处理 1 小时以上的死任务
if (meeting.getUpdatedAt() != null && meeting.getUpdatedAt().isBefore(oneHourAgo)) {
log.warn("Meeting {} is stuck for more than 1 hour, marking as failed", meeting.getId());
markAsFailed(meeting, "任务因系统维护超时,请重新发起分析");
continue;
}
redisValueSupport.delete(RedisKeys.meetingPollingLockKey(meeting.getId()));
redisValueSupport.delete(RedisKeys.meetingSummaryLockKey(meeting.getId()));
clearLegacyRedisState(meeting.getId());
// 2. 清理旧的 Redis 锁和进度缓存,确保恢复线程能拿到控制权
redisTemplate.delete(RedisKeys.meetingPollingLockKey(meeting.getId()));
redisTemplate.delete(RedisKeys.meetingSummaryLockKey(meeting.getId()));
// 3. 根据状态重新派发任务 (平滑拉起)
if (meeting.getStatus() == 1) {
log.info("Resuming ASR task for meeting {}", meeting.getId());
if (Integer.valueOf(1).equals(meeting.getStatus())) {
log.info("Recovering ASR task for meeting {}", meeting.getId());
aiTaskService.dispatchTasks(meeting.getId(), meeting.getTenantId(), meeting.getCreatorId());
} else if (meeting.getStatus() == 2) {
log.info("Resuming Summary task for meeting {}", meeting.getId());
} else if (Integer.valueOf(2).equals(meeting.getStatus())) {
log.info("Recovering summary task for meeting {}", meeting.getId());
aiTaskService.dispatchSummaryTask(meeting.getId(), meeting.getTenantId(), meeting.getCreatorId());
}
// 增加小延迟防止惊群效应
TimeUnit.MILLISECONDS.sleep(200);
} catch (Exception e) {
log.error("Failed to recover meeting task {}", meeting.getId(), e);
} catch (Exception ex) {
log.error("Failed to recover meeting task {}", meeting.getId(), ex);
}
}
log.info("Successfully processed {} pending tasks for recovery.", pendingMeetings.size());
log.info("Meeting task recovery processed {} meetings.", pendingMeetings.size());
}
private void markAsFailed(Meeting m, String reason) {
Meeting update = new Meeting();
update.setId(m.getId());
update.setStatus(4); // 失败
taskSecurityContextRunner.runAsTenantUser(m.getTenantId(), m.getCreatorId(), () -> meetingMapper.updateById(update));
// 同步 Redis 进度为失败
String progressKey = RedisKeys.meetingProgressKey(m.getId());
redisTemplate.opsForValue().set(progressKey,
"{\"percent\":-1, \"message\":\"" + reason + "\", \"updateAt\":" + System.currentTimeMillis() + "}",
1, TimeUnit.HOURS);
private void clearLegacyRedisState(Long meetingId) {
if (compatibilityRedisTemplate == null || meetingId == null) {
return;
}
compatibilityRedisTemplate.delete(List.of(
RedisKeys.meetingAsrPermitSyncLockKey(),
RedisKeys.meetingAsrRefillLockKey()
));
compatibilityRedisTemplate.opsForSet().remove(RedisKeys.meetingAsrPermitSetKey(), String.valueOf(meetingId));
}
}

View File

@ -284,6 +284,7 @@ public class LegacyMeetingAdapterServiceImpl implements LegacyMeetingAdapterServ
task.setTaskType(taskType);
}
task.setStatus(0);
task.setQueuedAt(LocalDateTime.now());
task.setTaskConfig(taskConfig);
task.setRequestData(null);
task.setResponseData(null);

View File

@ -0,0 +1,27 @@
package com.imeeting.service.biz;
import com.imeeting.common.MeetingProgressStage;
import com.imeeting.dto.biz.MeetingProgressSnapshot;
import com.imeeting.entity.biz.AiTask;
import java.util.Map;
public interface MeetingProgressService {
void clear(Long meetingId);
Map<String, Object> getProgressMap(Long meetingId);
Integer resolvePercent(Long meetingId);
void markQueued(Long meetingId, AiTask task, Integer meetingStatus, String message);
void markQueuedAfterCommitOrNow(Long meetingId, AiTask task, Integer meetingStatus, String message);
void markStage(Long meetingId, AiTask task, Integer meetingStatus, MeetingProgressStage stage, int percent, String message, int eta);
void markStageAfterCommitOrNow(Long meetingId, AiTask task, Integer meetingStatus, MeetingProgressStage stage, int percent, String message, int eta);
void syncFromDatabase(Long meetingId);
void writeSnapshot(MeetingProgressSnapshot snapshot);
}

View File

@ -1,10 +1,12 @@
package com.imeeting.service.biz.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.imeeting.common.MeetingProgressStage;
import com.imeeting.common.RedisKeys;
import com.imeeting.dto.biz.AiModelVO;
import com.imeeting.dto.biz.MeetingSummarySource;
@ -19,16 +21,21 @@ import com.imeeting.support.TaskSecurityContextRunner;
import com.imeeting.service.biz.AiModelService;
import com.imeeting.service.biz.AiTaskService;
import com.imeeting.service.biz.HotWordService;
import com.imeeting.service.biz.MeetingProgressService;
import com.imeeting.service.biz.MeetingSummaryFileService;
import com.imeeting.service.biz.MeetingTranscriptChapterService;
import com.imeeting.service.biz.MeetingTranscriptFileService;
import com.imeeting.service.biz.MeetingTranscriptRevisionService;
import com.imeeting.support.RedisValueSupport;
import com.unisbase.entity.SysUser;
import com.unisbase.mapper.SysUserMapper;
import lombok.RequiredArgsConstructor;
import com.unisbase.service.SysParamService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@ -46,12 +53,12 @@ import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Service
@Slf4j
@RequiredArgsConstructor
public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> implements AiTaskService {
private final MeetingMapper meetingMapper;
@ -60,7 +67,8 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
private final ObjectMapper objectMapper;
private final SysUserMapper sysUserMapper;
private final HotWordService hotWordService;
private final StringRedisTemplate redisTemplate;
private final RedisValueSupport redisValueSupport;
private final MeetingProgressService meetingProgressService;
private final MeetingSummaryFileService meetingSummaryFileService;
private final MeetingTranscriptFileService meetingTranscriptFileService;
private final MeetingTranscriptRevisionService meetingTranscriptRevisionService;
@ -68,6 +76,19 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
private final MeetingSummaryPromptAssembler meetingSummaryPromptAssembler;
private final TaskSecurityContextRunner taskSecurityContextRunner;
private final MeetingExternalSummaryWebhookTrigger meetingExternalSummaryWebhookTrigger;
private final SysParamService sysParamService;
@Autowired
@Qualifier("asrTaskExecutor")
private Executor asrTaskExecutor;
@Autowired
@Qualifier("summaryTaskExecutor")
private Executor summaryTaskExecutor;
@Autowired
@Lazy
private AiTaskService self;
@Value("${unisbase.app.server-base-url}")
private String serverBaseUrl;
@ -83,15 +104,89 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
.version(HttpClient.Version.HTTP_1_1)
.build();
@Autowired
public AiTaskServiceImpl(MeetingMapper meetingMapper,
MeetingTranscriptMapper transcriptMapper,
AiModelService aiModelService,
ObjectMapper objectMapper,
SysUserMapper sysUserMapper,
HotWordService hotWordService,
RedisValueSupport redisValueSupport,
MeetingProgressService meetingProgressService,
MeetingSummaryFileService meetingSummaryFileService,
MeetingTranscriptFileService meetingTranscriptFileService,
MeetingTranscriptRevisionService meetingTranscriptRevisionService,
MeetingTranscriptChapterService meetingTranscriptChapterService,
MeetingSummaryPromptAssembler meetingSummaryPromptAssembler,
TaskSecurityContextRunner taskSecurityContextRunner,
MeetingExternalSummaryWebhookTrigger meetingExternalSummaryWebhookTrigger,
SysParamService sysParamService) {
this.meetingMapper = meetingMapper;
this.transcriptMapper = transcriptMapper;
this.aiModelService = aiModelService;
this.objectMapper = objectMapper;
this.sysUserMapper = sysUserMapper;
this.hotWordService = hotWordService;
this.redisValueSupport = redisValueSupport;
this.meetingProgressService = meetingProgressService;
this.meetingSummaryFileService = meetingSummaryFileService;
this.meetingTranscriptFileService = meetingTranscriptFileService;
this.meetingTranscriptRevisionService = meetingTranscriptRevisionService;
this.meetingTranscriptChapterService = meetingTranscriptChapterService;
this.meetingSummaryPromptAssembler = meetingSummaryPromptAssembler;
this.taskSecurityContextRunner = taskSecurityContextRunner;
this.meetingExternalSummaryWebhookTrigger = meetingExternalSummaryWebhookTrigger;
this.sysParamService = sysParamService;
}
public AiTaskServiceImpl(MeetingMapper meetingMapper,
MeetingTranscriptMapper transcriptMapper,
AiModelService aiModelService,
ObjectMapper objectMapper,
SysUserMapper sysUserMapper,
HotWordService hotWordService,
StringRedisTemplate redisTemplate,
MeetingSummaryFileService meetingSummaryFileService,
MeetingTranscriptFileService meetingTranscriptFileService,
MeetingTranscriptRevisionService meetingTranscriptRevisionService,
MeetingTranscriptChapterService meetingTranscriptChapterService,
MeetingSummaryPromptAssembler meetingSummaryPromptAssembler,
TaskSecurityContextRunner taskSecurityContextRunner,
MeetingExternalSummaryWebhookTrigger meetingExternalSummaryWebhookTrigger) {
this(
meetingMapper,
transcriptMapper,
aiModelService,
objectMapper,
sysUserMapper,
hotWordService,
new RedisValueSupport(redisTemplate, objectMapper),
new RedisOnlyMeetingProgressServiceAdapter(redisTemplate, objectMapper),
meetingSummaryFileService,
meetingTranscriptFileService,
meetingTranscriptRevisionService,
meetingTranscriptChapterService,
meetingSummaryPromptAssembler,
taskSecurityContextRunner,
meetingExternalSummaryWebhookTrigger,
null
);
}
@Override
@Async
@Async("asrDispatchExecutor")
public void dispatchTasks(Long meetingId, Long tenantId, Long userId) {
taskSecurityContextRunner.runAsTenantUser(tenantId, userId, () -> doDispatchTasks(meetingId));
Runnable task = () -> taskSecurityContextRunner.runAsTenantUser(tenantId, userId, () -> doDispatchTasks(meetingId));
if (asrTaskExecutor == null) {
task.run();
return;
}
asrTaskExecutor.execute(task);
}
private void doDispatchTasks(Long meetingId) {
String lockKey = RedisKeys.meetingPollingLockKey(meetingId);
Boolean acquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "locked", 30, TimeUnit.MINUTES);
Boolean acquired = redisValueSupport.setIfAbsent(lockKey, "locked", 30, TimeUnit.MINUTES);
if (Boolean.FALSE.equals(acquired)) {
log.warn("Meeting {} is already being processed", meetingId);
return;
@ -107,6 +202,23 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
.orderByDesc(AiTask::getId)
.last("limit 1"));
if (asrTask != null) {
if (Integer.valueOf(1).equals(asrTask.getStatus())) {
if (!prepareRunningAsrTaskForRecovery(meeting, asrTask)) {
return;
}
} else if (Integer.valueOf(0).equals(asrTask.getStatus())) {
if (asrTask.getQueuedAt() == null) {
asrTask.setQueuedAt(LocalDateTime.now());
this.updateById(asrTask);
}
if (!claimQueuedAsrTask(asrTask)) {
meetingProgressService.markQueued(meetingId, asrTask, 1, "已进入 ASR 队列,等待执行");
return;
}
}
}
String asrText = "";
if (asrTask != null && canExecuteTask(asrTask)) {
asrText = processAsrTask(meeting, asrTask);
@ -135,6 +247,11 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
updateProgress(meetingId, -1, "未识别到可用于总结的转录内容", 0);
return;
}
if (!asrText.isBlank()) {
meetingProgressService.markStage(meetingId, asrTask, 1, MeetingProgressStage.ASR_COMPLETED, 80, "转写完成,准备生成总结", 0);
self.dispatchSummaryTask(meetingId, meeting.getTenantId(), meeting.getCreatorId());
return;
}
if (chapterTask != null && canExecuteTask(chapterTask)) {
executeChapterFlow(meeting, chapterTask);
}
@ -155,14 +272,20 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
updateMeetingStatus(meetingId, 4);
updateProgress(meetingId, -1, "分析失败: " + e.getMessage(), 0);
} finally {
redisTemplate.delete(lockKey);
redisValueSupport.delete(lockKey);
scheduleQueuedAsrTasks();
}
}
@Override
@Async
@Async("summaryDispatchExecutor")
public void dispatchSummaryTask(Long meetingId, Long tenantId, Long userId) {
taskSecurityContextRunner.runAsTenantUser(tenantId, userId, () -> doDispatchSummaryTask(meetingId));
Runnable task = () -> taskSecurityContextRunner.runAsTenantUser(tenantId, userId, () -> doDispatchSummaryTask(meetingId));
if (summaryTaskExecutor == null) {
task.run();
return;
}
summaryTaskExecutor.execute(task);
}
private void doDispatchSummaryTask(Long meetingId) {
@ -198,6 +321,164 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
}
}
private boolean prepareRunningAsrTaskForRecovery(Meeting meeting, AiTask asrTask) {
if (meeting == null || asrTask == null) {
return false;
}
Long asrModelId = extractAsrModelId(asrTask);
String externalTaskId = extractExternalTaskId(asrTask);
if (asrModelId == null || externalTaskId == null || externalTaskId.isBlank()) {
requeueAsrTask(asrTask, "恢复时缺少有效外部 ASR 任务,已重新排队", true);
return false;
}
AiModelVO asrModel = aiModelService.getModelById(asrModelId, "ASR");
if (asrModel == null || !canResumeAsrTask(asrModel, meeting.getId(), externalTaskId)) {
requeueAsrTask(asrTask, "外部 ASR 状态异常,已重新排队", true);
return false;
}
return true;
}
private boolean claimQueuedAsrTask(AiTask task) {
if (task == null || task.getId() == null || !Integer.valueOf(0).equals(task.getStatus())) {
return false;
}
if (getBaseMapper() == null) {
task.setStatus(1);
task.setStartedAt(LocalDateTime.now());
return true;
}
String scheduleLockKey = RedisKeys.meetingAsrScheduleLockKey();
Boolean acquired = redisValueSupport.setIfAbsent(scheduleLockKey, "locked", 30, TimeUnit.SECONDS);
if (Boolean.FALSE.equals(acquired)) {
return false;
}
try {
int maxConcurrent = resolveAsrMaxConcurrent();
long runningCount = count(new LambdaQueryWrapper<AiTask>()
.eq(AiTask::getTaskType, "ASR")
.eq(AiTask::getStatus, 1));
if (runningCount >= maxConcurrent) {
return false;
}
int available = (int) Math.max(1, maxConcurrent - runningCount);
List<AiTask> dispatchable = list(new LambdaQueryWrapper<AiTask>()
.eq(AiTask::getTaskType, "ASR")
.eq(AiTask::getStatus, 0)
.orderByAsc(AiTask::getQueuedAt)
.orderByAsc(AiTask::getId)
.last("LIMIT " + available));
boolean eligible = dispatchable.stream().anyMatch(item -> Objects.equals(item.getId(), task.getId()));
if (!eligible) {
return false;
}
LocalDateTime now = LocalDateTime.now();
boolean claimed = update(new LambdaUpdateWrapper<AiTask>()
.eq(AiTask::getId, task.getId())
.eq(AiTask::getStatus, 0)
.set(AiTask::getStatus, 1)
.set(AiTask::getStartedAt, now)
.set(AiTask::getCompletedAt, null)
.set(AiTask::getErrorMsg, null));
if (claimed) {
task.setStatus(1);
task.setStartedAt(now);
meetingProgressService.markStage(task.getMeetingId(), task, 1, MeetingProgressStage.ASR_SUBMITTED, 5, "ASR 任务已开始执行", 0);
}
return claimed;
} finally {
redisValueSupport.delete(scheduleLockKey);
}
}
private void scheduleQueuedAsrTasks() {
if (getBaseMapper() == null) {
return;
}
String scheduleLockKey = RedisKeys.meetingAsrScheduleLockKey();
Boolean acquired = redisValueSupport.setIfAbsent(scheduleLockKey, "locked", 30, TimeUnit.SECONDS);
if (Boolean.FALSE.equals(acquired)) {
return;
}
try {
int maxConcurrent = resolveAsrMaxConcurrent();
long runningCount = count(new LambdaQueryWrapper<AiTask>()
.eq(AiTask::getTaskType, "ASR")
.eq(AiTask::getStatus, 1));
int available = (int) (maxConcurrent - runningCount);
if (available <= 0) {
return;
}
List<AiTask> queuedTasks = list(new LambdaQueryWrapper<AiTask>()
.eq(AiTask::getTaskType, "ASR")
.eq(AiTask::getStatus, 0)
.orderByAsc(AiTask::getQueuedAt)
.orderByAsc(AiTask::getId)
.last("LIMIT " + available));
for (AiTask queuedTask : queuedTasks) {
Meeting queuedMeeting = meetingMapper.selectById(queuedTask.getMeetingId());
if (queuedMeeting == null) {
continue;
}
self.dispatchTasks(queuedMeeting.getId(), queuedMeeting.getTenantId(), queuedMeeting.getCreatorId());
}
} finally {
redisValueSupport.delete(scheduleLockKey);
}
}
private int resolveAsrMaxConcurrent() {
if (sysParamService == null) {
return 2;
}
String configured = sysParamService.getCachedParamValue(com.imeeting.common.SysParamKeys.MEETING_ASR_MAX_CONCURRENT, "2");
try {
return Math.max(1, Integer.parseInt(configured.trim()));
} catch (Exception ex) {
return 2;
}
}
private void requeueAsrTask(AiTask task, String reason, boolean clearExternalTaskId) {
if (task == null || task.getId() == null) {
return;
}
task.setStatus(0);
task.setQueuedAt(LocalDateTime.now());
task.setStartedAt(null);
task.setCompletedAt(null);
task.setErrorMsg(null);
if (clearExternalTaskId) {
clearAsrTaskId(task);
}
Map<String, Object> responseData = task.getResponseData() == null
? new HashMap<>()
: new HashMap<>(task.getResponseData());
responseData.put("requeueReason", reason);
responseData.put("requeuedAt", LocalDateTime.now().toString());
task.setResponseData(responseData);
updateById(task);
meetingProgressService.markQueued(task.getMeetingId(), task, 1, reason == null || reason.isBlank() ? "已重新进入 ASR 队列" : reason);
}
private Long extractAsrModelId(AiTask task) {
if (task == null || task.getTaskConfig() == null || task.getTaskConfig().get("asrModelId") == null) {
return null;
}
try {
return Long.parseLong(String.valueOf(task.getTaskConfig().get("asrModelId")));
} catch (Exception ex) {
return null;
}
}
private String extractExternalTaskId(AiTask task) {
if (task == null || task.getResponseData() == null || task.getResponseData().get("task_id") == null) {
return null;
}
return String.valueOf(task.getResponseData().get("task_id"));
}
private String processAsrTask(Meeting meeting, AiTask taskRecord) throws Exception {
updateMeetingStatus(meeting.getId(), 1);
@ -698,7 +979,7 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
return;
}
String summaryLockKey = RedisKeys.meetingSummaryLockKey(meeting.getId());
Boolean acquired = redisTemplate.opsForValue().setIfAbsent(summaryLockKey, "locked", 30, TimeUnit.MINUTES);
Boolean acquired = redisValueSupport.setIfAbsent(summaryLockKey, "locked", 30, TimeUnit.MINUTES);
if (Boolean.FALSE.equals(acquired)) {
log.warn("Meeting {} summary is already being processed", meeting.getId());
return;
@ -713,7 +994,7 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
}
processSummaryTask(meeting, summarySource, sumTask);
} finally {
redisTemplate.delete(summaryLockKey);
redisValueSupport.delete(summaryLockKey);
}
}
@ -725,6 +1006,18 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
.last("limit 1"));
}
private AiTask findLatestTaskForProgress(Long meetingId) {
AiTask summaryTask = findLatestTask(meetingId, "SUMMARY");
if (summaryTask != null && Integer.valueOf(1).equals(summaryTask.getStatus())) {
return summaryTask;
}
AiTask chapterTask = findLatestTask(meetingId, "CHAPTER");
if (chapterTask != null && Integer.valueOf(1).equals(chapterTask.getStatus())) {
return chapterTask;
}
return findLatestTask(meetingId, "ASR");
}
private boolean isExternalSummaryModeEnabled() {
return "EXTERNAL_N8N".equalsIgnoreCase(summaryOrchestrationMode);
}
@ -779,21 +1072,28 @@ public class AiTaskServiceImpl extends ServiceImpl<AiTaskMapper, AiTask> impleme
if (meetingId == null) {
return;
}
try {
Map<String, Object> progress = new HashMap<>();
progress.put("percent", percent);
progress.put("message", msg);
progress.put("eta", eta);
progress.put("updateAt", System.currentTimeMillis());
redisTemplate.opsForValue().set(
RedisKeys.meetingProgressKey(meetingId),
objectMapper.writeValueAsString(progress),
1,
TimeUnit.HOURS
);
} catch (Exception e) {
log.error("Redis progress update error", e);
MeetingProgressStage stage;
int meetingStatus;
if (percent < 0) {
stage = MeetingProgressStage.FAILED;
meetingStatus = 4;
} else if (percent >= 100) {
stage = MeetingProgressStage.COMPLETED;
meetingStatus = 3;
} else if (percent >= 90) {
stage = MeetingProgressStage.SUMMARY_RUNNING;
meetingStatus = 2;
} else if (percent >= 85) {
stage = MeetingProgressStage.CHAPTER_RUNNING;
meetingStatus = 2;
} else if (percent >= 5) {
stage = MeetingProgressStage.ASR_RUNNING;
meetingStatus = 1;
} else {
stage = MeetingProgressStage.QUEUED;
meetingStatus = 1;
}
meetingProgressService.markStage(meetingId, findLatestTaskForProgress(meetingId), meetingStatus, stage, percent, msg, eta);
}
private String postJson(String url, Object body, String apiKey) throws Exception {

View File

@ -27,6 +27,7 @@ import com.imeeting.entity.biz.MeetingTranscriptChapterVersion;
import com.imeeting.service.biz.AiTaskService;
import com.imeeting.service.biz.HotWordService;
import com.imeeting.service.biz.MeetingCommandService;
import com.imeeting.service.biz.MeetingProgressService;
import com.imeeting.service.biz.MeetingRuntimeProfileResolver;
import com.imeeting.service.biz.MeetingService;
import com.imeeting.service.biz.MeetingSummaryFileService;
@ -34,9 +35,10 @@ import com.imeeting.service.biz.MeetingTranscriptChapterService;
import com.imeeting.service.biz.MeetingTranscriptFileService;
import com.imeeting.service.biz.MeetingTranscriptRevisionService;
import com.imeeting.service.biz.RealtimeMeetingSessionStateService;
import com.imeeting.service.biz.impl.RedisOnlyMeetingProgressServiceAdapter;
import com.imeeting.service.realtime.RealtimeMeetingAudioStorageService;
import lombok.extern.slf4j.Slf4j;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
@ -55,7 +57,6 @@ import java.util.stream.Collectors;
@Slf4j
@Service
@RequiredArgsConstructor
public class MeetingCommandServiceImpl implements MeetingCommandService {
private final MeetingService meetingService;
@ -70,13 +71,82 @@ public class MeetingCommandServiceImpl implements MeetingCommandService {
private final MeetingRuntimeProfileResolver meetingRuntimeProfileResolver;
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService;
private final StringRedisTemplate redisTemplate;
private final MeetingProgressService meetingProgressService;
private final ObjectMapper objectMapper;
private final MeetingExternalSummaryWebhookTrigger meetingExternalSummaryWebhookTrigger;
private StringRedisTemplate compatibilityRedisTemplate;
@Value("${imeeting.summary-orchestration.mode:INTERNAL_BUILTIN}")
private String summaryOrchestrationMode;
@Autowired
public MeetingCommandServiceImpl(MeetingService meetingService,
AiTaskService aiTaskService,
HotWordService hotWordService,
com.imeeting.mapper.biz.MeetingTranscriptMapper transcriptMapper,
MeetingSummaryFileService meetingSummaryFileService,
MeetingTranscriptFileService meetingTranscriptFileService,
MeetingTranscriptRevisionService meetingTranscriptRevisionService,
MeetingTranscriptChapterService meetingTranscriptChapterService,
MeetingDomainSupport meetingDomainSupport,
MeetingRuntimeProfileResolver meetingRuntimeProfileResolver,
RealtimeMeetingSessionStateService realtimeMeetingSessionStateService,
RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService,
MeetingProgressService meetingProgressService,
ObjectMapper objectMapper,
MeetingExternalSummaryWebhookTrigger meetingExternalSummaryWebhookTrigger) {
this.meetingService = meetingService;
this.aiTaskService = aiTaskService;
this.hotWordService = hotWordService;
this.transcriptMapper = transcriptMapper;
this.meetingSummaryFileService = meetingSummaryFileService;
this.meetingTranscriptFileService = meetingTranscriptFileService;
this.meetingTranscriptRevisionService = meetingTranscriptRevisionService;
this.meetingTranscriptChapterService = meetingTranscriptChapterService;
this.meetingDomainSupport = meetingDomainSupport;
this.meetingRuntimeProfileResolver = meetingRuntimeProfileResolver;
this.realtimeMeetingSessionStateService = realtimeMeetingSessionStateService;
this.realtimeMeetingAudioStorageService = realtimeMeetingAudioStorageService;
this.meetingProgressService = meetingProgressService;
this.objectMapper = objectMapper;
this.meetingExternalSummaryWebhookTrigger = meetingExternalSummaryWebhookTrigger;
}
public MeetingCommandServiceImpl(MeetingService meetingService,
AiTaskService aiTaskService,
HotWordService hotWordService,
com.imeeting.mapper.biz.MeetingTranscriptMapper transcriptMapper,
MeetingSummaryFileService meetingSummaryFileService,
MeetingTranscriptFileService meetingTranscriptFileService,
MeetingTranscriptRevisionService meetingTranscriptRevisionService,
MeetingTranscriptChapterService meetingTranscriptChapterService,
MeetingDomainSupport meetingDomainSupport,
MeetingRuntimeProfileResolver meetingRuntimeProfileResolver,
RealtimeMeetingSessionStateService realtimeMeetingSessionStateService,
RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService,
StringRedisTemplate redisTemplate,
ObjectMapper objectMapper,
MeetingExternalSummaryWebhookTrigger meetingExternalSummaryWebhookTrigger) {
this(
meetingService,
aiTaskService,
hotWordService,
transcriptMapper,
meetingSummaryFileService,
meetingTranscriptFileService,
meetingTranscriptRevisionService,
meetingTranscriptChapterService,
meetingDomainSupport,
meetingRuntimeProfileResolver,
realtimeMeetingSessionStateService,
realtimeMeetingAudioStorageService,
new RedisOnlyMeetingProgressServiceAdapter(redisTemplate, objectMapper),
objectMapper,
meetingExternalSummaryWebhookTrigger
);
this.compatibilityRedisTemplate = redisTemplate;
}
@Override
@Transactional(rollbackFor = Exception.class)
public MeetingVO createMeeting(CreateMeetingCommand command, Long tenantId, Long creatorId, String creatorName, String meetingSource) {
@ -92,6 +162,7 @@ public class MeetingCommandServiceImpl implements MeetingCommandService {
asrTask.setMeetingId(meeting.getId());
asrTask.setTaskType("ASR");
asrTask.setStatus(0);
asrTask.setQueuedAt(java.time.LocalDateTime.now());
Map<String, Object> asrConfig = new HashMap<>();
asrConfig.put("asrModelId", runtimeProfile.getResolvedAsrModelId());
@ -204,7 +275,7 @@ public class MeetingCommandServiceImpl implements MeetingCommandService {
.eq(AiTask::getMeetingId, id));
meetingService.removeById(id);
realtimeMeetingSessionStateService.clear(id);
redisTemplate.delete(RedisKeys.meetingProgressKey(id));
meetingProgressService.clear(id);
deleteMeetingArtifactsAfterCommit(id);
}
@ -434,6 +505,7 @@ public class MeetingCommandServiceImpl implements MeetingCommandService {
private void resetAiTask(AiTask task, Map<String, Object> taskConfig) {
task.setStatus(0);
task.setQueuedAt(java.time.LocalDateTime.now());
task.setTaskConfig(taskConfig);
task.setRequestData(null);
task.setResponseData(null);
@ -878,6 +950,9 @@ public class MeetingCommandServiceImpl implements MeetingCommandService {
if (asrTask == null || asrTask.getTaskConfig() == null || asrTask.getTaskConfig().get("asrModelId") == null) {
throw new RuntimeException("未找到可用的识别任务配置");
}
if (Integer.valueOf(1).equals(asrTask.getStatus()) && !Integer.valueOf(4).equals(meeting.getStatus())) {
throw new RuntimeException("当前会议转写任务仍在处理中,请勿重复重试");
}
AiTask summaryTask = aiTaskService.getOne(new LambdaQueryWrapper<AiTask>()
.eq(AiTask::getMeetingId, meetingId)
@ -917,10 +992,20 @@ public class MeetingCommandServiceImpl implements MeetingCommandService {
meeting.setStatus(1);
meetingService.updateById(meeting);
clearLegacyDispatchState(meetingId);
updateMeetingProgress(meetingId, 0, "已重新提交识别任务,等待 ASR 处理...", 0);
dispatchTasksAfterCommit(meetingId, meeting.getTenantId(), meeting.getCreatorId());
}
private void clearLegacyDispatchState(Long meetingId) {
if (compatibilityRedisTemplate == null || meetingId == null) {
return;
}
compatibilityRedisTemplate.delete(RedisKeys.meetingPollingLockKey(meetingId));
compatibilityRedisTemplate.delete(RedisKeys.meetingSummaryLockKey(meetingId));
compatibilityRedisTemplate.opsForSet().remove(RedisKeys.meetingAsrPermitSetKey(), String.valueOf(meetingId));
}
private void ensureExternalSummaryModeEnabled() {
if (!"EXTERNAL_N8N".equalsIgnoreCase(summaryOrchestrationMode)) {
throw new RuntimeException("外部 n8n 总结编排模式未开启");
@ -1055,21 +1140,28 @@ public class MeetingCommandServiceImpl implements MeetingCommandService {
}
private void updateMeetingProgress(Long meetingId, int percent, String message, int eta) {
try {
Map<String, Object> progress = new HashMap<>();
progress.put("percent", percent);
progress.put("message", message);
progress.put("eta", eta);
progress.put("updateAt", System.currentTimeMillis());
redisTemplate.opsForValue().set(
RedisKeys.meetingProgressKey(meetingId),
objectMapper.writeValueAsString(progress),
1,
TimeUnit.HOURS
);
} catch (Exception ignored) {
// Ignore progress write failures.
com.imeeting.common.MeetingProgressStage stage;
int meetingStatus;
if (percent < 0) {
stage = com.imeeting.common.MeetingProgressStage.FAILED;
meetingStatus = 4;
} else if (percent >= 100) {
stage = com.imeeting.common.MeetingProgressStage.COMPLETED;
meetingStatus = 3;
} else if (percent >= 90) {
stage = com.imeeting.common.MeetingProgressStage.SUMMARY_RUNNING;
meetingStatus = 2;
} else if (percent >= 85) {
stage = com.imeeting.common.MeetingProgressStage.CHAPTER_RUNNING;
meetingStatus = 2;
} else if (percent >= 5) {
stage = com.imeeting.common.MeetingProgressStage.ASR_RUNNING;
meetingStatus = 1;
} else {
stage = com.imeeting.common.MeetingProgressStage.QUEUED;
meetingStatus = 1;
}
meetingProgressService.markStageAfterCommitOrNow(meetingId, null, meetingStatus, stage, percent, message, eta);
}
private RealtimeMeetingResumeConfig buildRealtimeResumeConfig(CreateRealtimeMeetingCommand command,

View File

@ -113,6 +113,7 @@ public class MeetingDomainSupport {
chapterTask.setMeetingId(meetingId);
chapterTask.setTaskType("CHAPTER");
chapterTask.setStatus(0);
chapterTask.setQueuedAt(LocalDateTime.now());
chapterTask.setTaskConfig(meetingSummaryPromptAssembler.buildTaskConfig(
summaryModelId,
chapterModelId,
@ -141,6 +142,7 @@ public class MeetingDomainSupport {
sumTask.setMeetingId(meetingId);
sumTask.setTaskType("SUMMARY");
sumTask.setStatus(0);
sumTask.setQueuedAt(LocalDateTime.now());
sumTask.setTaskConfig(meetingSummaryPromptAssembler.buildTaskConfig(
summaryModelId,
chapterModelId,
@ -393,11 +395,11 @@ public class MeetingDomainSupport {
} else {
vo.setParticipantIds(Collections.emptyList());
}
fillLatestTaskAttemptInfo(meeting, vo);
if (includeSummary) {
vo.setSummaryContent(meetingSummaryFileService.loadSummaryContent(meeting));
vo.setAnalysis(meetingSummaryFileService.loadSummaryAnalysis(meeting));
vo.setLastUserPrompt(resolveLastSummaryUserPrompt(meeting));
fillLatestTaskAttemptInfo(meeting, vo);
}
}

View File

@ -0,0 +1,254 @@
package com.imeeting.service.biz.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.imeeting.common.MeetingProgressStage;
import com.imeeting.common.RedisKeys;
import com.imeeting.dto.biz.MeetingProgressSnapshot;
import com.imeeting.entity.biz.AiTask;
import com.imeeting.entity.biz.Meeting;
import com.imeeting.mapper.biz.AiTaskMapper;
import com.imeeting.mapper.biz.MeetingMapper;
import com.imeeting.service.biz.MeetingProgressService;
import com.imeeting.support.RedisValueSupport;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Service
@RequiredArgsConstructor
public class MeetingProgressServiceImpl implements MeetingProgressService {
private static final long PROGRESS_TTL_HOURS = 1L;
private final MeetingMapper meetingMapper;
private final AiTaskMapper aiTaskMapper;
private final RedisValueSupport redisValueSupport;
private final ObjectMapper objectMapper;
@Override
public void clear(Long meetingId) {
if (meetingId == null) {
return;
}
redisValueSupport.delete(RedisKeys.meetingProgressKey(meetingId));
}
@Override
public Map<String, Object> getProgressMap(Long meetingId) {
MeetingProgressSnapshot snapshot = redisValueSupport.getJson(RedisKeys.meetingProgressKey(meetingId), MeetingProgressSnapshot.class);
if (snapshot == null) {
snapshot = buildFallbackSnapshot(meetingId);
if (snapshot != null) {
writeSnapshot(snapshot);
}
}
if (snapshot == null) {
return Map.of("percent", 0, "message", "Waiting...");
}
return objectMapper.convertValue(snapshot, Map.class);
}
@Override
public Integer resolvePercent(Long meetingId) {
MeetingProgressSnapshot snapshot = redisValueSupport.getJson(RedisKeys.meetingProgressKey(meetingId), MeetingProgressSnapshot.class);
if (snapshot != null && snapshot.getPercent() != null) {
return snapshot.getPercent();
}
MeetingProgressSnapshot fallback = buildFallbackSnapshot(meetingId);
return fallback == null ? null : fallback.getPercent();
}
@Override
public void markQueued(Long meetingId, AiTask task, Integer meetingStatus, String message) {
writeSnapshot(buildSnapshot(meetingId, task, meetingStatus, MeetingProgressStage.QUEUED, 0, message, 0));
}
@Override
public void markQueuedAfterCommitOrNow(Long meetingId, AiTask task, Integer meetingStatus, String message) {
afterCommitOrNow(() -> markQueued(meetingId, task, meetingStatus, message));
}
@Override
public void markStage(Long meetingId, AiTask task, Integer meetingStatus, MeetingProgressStage stage, int percent, String message, int eta) {
writeSnapshot(buildSnapshot(meetingId, task, meetingStatus, stage, percent, message, eta));
}
@Override
public void markStageAfterCommitOrNow(Long meetingId, AiTask task, Integer meetingStatus, MeetingProgressStage stage, int percent, String message, int eta) {
afterCommitOrNow(() -> markStage(meetingId, task, meetingStatus, stage, percent, message, eta));
}
@Override
public void syncFromDatabase(Long meetingId) {
MeetingProgressSnapshot snapshot = buildFallbackSnapshot(meetingId);
if (snapshot != null) {
writeSnapshot(snapshot);
}
}
@Override
public void writeSnapshot(MeetingProgressSnapshot snapshot) {
if (snapshot == null || snapshot.getMeetingId() == null) {
return;
}
MeetingProgressSnapshot existing = redisValueSupport.getJson(RedisKeys.meetingProgressKey(snapshot.getMeetingId()), MeetingProgressSnapshot.class);
if (!shouldReplace(existing, snapshot)) {
return;
}
redisValueSupport.setJson(RedisKeys.meetingProgressKey(snapshot.getMeetingId()), snapshot, PROGRESS_TTL_HOURS, TimeUnit.HOURS);
}
private MeetingProgressSnapshot buildSnapshot(Long meetingId,
AiTask task,
Integer meetingStatus,
MeetingProgressStage stage,
int percent,
String message,
int eta) {
String externalTaskId = null;
if (task != null && task.getResponseData() != null && task.getResponseData().get("task_id") != null) {
externalTaskId = String.valueOf(task.getResponseData().get("task_id"));
}
return MeetingProgressSnapshot.builder()
.meetingId(meetingId)
.taskId(task == null ? null : task.getId())
.taskType(task == null ? null : task.getTaskType())
.taskStatus(task == null ? null : task.getStatus())
.meetingStatus(meetingStatus)
.stage(stage.getCode())
.stageOrder(stage.getOrder())
.percent(percent)
.message(message)
.eta(eta)
.externalTaskId(externalTaskId)
.queuedAt(task == null ? null : task.getQueuedAt())
.startedAt(task == null ? null : task.getStartedAt())
.completedAt(task == null ? null : task.getCompletedAt())
.updateAt(System.currentTimeMillis())
.build();
}
private MeetingProgressSnapshot buildFallbackSnapshot(Long meetingId) {
if (meetingId == null) {
return null;
}
Meeting meeting = meetingMapper.selectById(meetingId);
if (meeting == null) {
return null;
}
AiTask latestTask = aiTaskMapper.selectOne(new LambdaQueryWrapper<AiTask>()
.eq(AiTask::getMeetingId, meetingId)
.orderByDesc(AiTask::getId)
.last("LIMIT 1"));
if (meeting.getStatus() != null && meeting.getStatus() == 3) {
return buildSnapshot(meetingId, latestTask, meeting.getStatus(), MeetingProgressStage.COMPLETED, 100, "处理完成", 0);
}
if (meeting.getStatus() != null && meeting.getStatus() == 4) {
String message = latestTask != null && latestTask.getErrorMsg() != null && !latestTask.getErrorMsg().isBlank()
? latestTask.getErrorMsg()
: "处理失败";
return buildSnapshot(meetingId, latestTask, meeting.getStatus(), MeetingProgressStage.FAILED, -1, message, 0);
}
AiTask latestSummary = findLatestTask(meetingId, "SUMMARY");
if (latestSummary != null && Integer.valueOf(1).equals(latestSummary.getStatus())) {
return buildSnapshot(meetingId, latestSummary, meeting.getStatus(), MeetingProgressStage.SUMMARY_RUNNING, 90, "正在生成会议总结...", 0);
}
AiTask latestChapter = findLatestTask(meetingId, "CHAPTER");
if (latestChapter != null && Integer.valueOf(1).equals(latestChapter.getStatus())) {
return buildSnapshot(meetingId, latestChapter, meeting.getStatus(), MeetingProgressStage.CHAPTER_RUNNING, 85, "正在生成会议章节...", 0);
}
AiTask latestAsr = findLatestTask(meetingId, "ASR");
if (latestAsr != null) {
if (Integer.valueOf(1).equals(latestAsr.getStatus())) {
return buildSnapshot(meetingId, latestAsr, meeting.getStatus(), MeetingProgressStage.ASR_RUNNING, 10, "正在识别音频...", 0);
}
if (Integer.valueOf(0).equals(latestAsr.getStatus())) {
return buildSnapshot(meetingId, latestAsr, meeting.getStatus(), MeetingProgressStage.QUEUED, 0, "已进入 ASR 队列,等待执行", 0);
}
}
return buildSnapshot(meetingId, latestTask, meeting.getStatus(), MeetingProgressStage.QUEUED, 0, "Waiting...", 0);
}
private AiTask findLatestTask(Long meetingId, String taskType) {
return aiTaskMapper.selectOne(new LambdaQueryWrapper<AiTask>()
.eq(AiTask::getMeetingId, meetingId)
.eq(AiTask::getTaskType, taskType)
.orderByDesc(AiTask::getId)
.last("LIMIT 1"));
}
private boolean shouldReplace(MeetingProgressSnapshot existing, MeetingProgressSnapshot candidate) {
if (candidate == null) {
return false;
}
if (existing == null) {
return true;
}
if (isRequeue(existing, candidate)) {
return true;
}
if (isTerminal(existing) && !isTerminal(candidate)) {
return false;
}
if (isTerminal(candidate)) {
return true;
}
int existingOrder = existing.getStageOrder() == null ? 0 : existing.getStageOrder();
int candidateOrder = candidate.getStageOrder() == null ? 0 : candidate.getStageOrder();
if (candidateOrder > existingOrder) {
return true;
}
if (candidateOrder < existingOrder) {
return false;
}
int existingPercent = existing.getPercent() == null ? Integer.MIN_VALUE : existing.getPercent();
int candidatePercent = candidate.getPercent() == null ? Integer.MIN_VALUE : candidate.getPercent();
if (candidatePercent >= existingPercent) {
return true;
}
Long existingUpdateAt = existing.getUpdateAt() == null ? 0L : existing.getUpdateAt();
Long candidateUpdateAt = candidate.getUpdateAt() == null ? 0L : candidate.getUpdateAt();
return candidateUpdateAt >= existingUpdateAt;
}
private boolean isTerminal(MeetingProgressSnapshot snapshot) {
return snapshot != null
&& snapshot.getStage() != null
&& (MeetingProgressStage.COMPLETED.getCode().equals(snapshot.getStage())
|| MeetingProgressStage.FAILED.getCode().equals(snapshot.getStage()));
}
private boolean isRequeue(MeetingProgressSnapshot existing, MeetingProgressSnapshot candidate) {
return candidate.getTaskStatus() != null
&& candidate.getTaskStatus() == 0
&& MeetingProgressStage.QUEUED.getCode().equals(candidate.getStage())
&& candidate.getQueuedAt() != null
&& (existing.getQueuedAt() == null || candidate.getQueuedAt().isAfter(existing.getQueuedAt()));
}
private void afterCommitOrNow(Runnable runnable) {
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
runnable.run();
return;
}
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
runnable.run();
}
});
}
}

View File

@ -0,0 +1,128 @@
package com.imeeting.service.biz.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.imeeting.common.MeetingProgressStage;
import com.imeeting.common.RedisKeys;
import com.imeeting.dto.biz.MeetingProgressSnapshot;
import com.imeeting.entity.biz.AiTask;
import com.imeeting.service.biz.MeetingProgressService;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class RedisOnlyMeetingProgressServiceAdapter implements MeetingProgressService {
private final StringRedisTemplate redisTemplate;
private final ObjectMapper objectMapper;
public RedisOnlyMeetingProgressServiceAdapter(StringRedisTemplate redisTemplate, ObjectMapper objectMapper) {
this.redisTemplate = redisTemplate;
this.objectMapper = objectMapper == null ? new ObjectMapper() : objectMapper;
}
@Override
public void clear(Long meetingId) {
if (redisTemplate == null || meetingId == null) {
return;
}
redisTemplate.delete(RedisKeys.meetingProgressKey(meetingId));
}
@Override
public Map<String, Object> getProgressMap(Long meetingId) {
MeetingProgressSnapshot snapshot = readSnapshot(meetingId);
if (snapshot == null) {
return Map.of("percent", 0, "message", "Waiting...");
}
return objectMapper.convertValue(snapshot, Map.class);
}
@Override
public Integer resolvePercent(Long meetingId) {
MeetingProgressSnapshot snapshot = readSnapshot(meetingId);
return snapshot == null ? null : snapshot.getPercent();
}
@Override
public void markQueued(Long meetingId, AiTask task, Integer meetingStatus, String message) {
writeSnapshot(buildSnapshot(meetingId, task, meetingStatus, MeetingProgressStage.QUEUED, 0, message, 0));
}
@Override
public void markQueuedAfterCommitOrNow(Long meetingId, AiTask task, Integer meetingStatus, String message) {
markQueued(meetingId, task, meetingStatus, message);
}
@Override
public void markStage(Long meetingId, AiTask task, Integer meetingStatus, MeetingProgressStage stage, int percent, String message, int eta) {
writeSnapshot(buildSnapshot(meetingId, task, meetingStatus, stage, percent, message, eta));
}
@Override
public void markStageAfterCommitOrNow(Long meetingId, AiTask task, Integer meetingStatus, MeetingProgressStage stage, int percent, String message, int eta) {
markStage(meetingId, task, meetingStatus, stage, percent, message, eta);
}
@Override
public void syncFromDatabase(Long meetingId) {
// No-op for constructor compatibility in tests.
}
@Override
public void writeSnapshot(MeetingProgressSnapshot snapshot) {
if (redisTemplate == null || snapshot == null || snapshot.getMeetingId() == null) {
return;
}
try {
redisTemplate.opsForValue().set(
RedisKeys.meetingProgressKey(snapshot.getMeetingId()),
objectMapper.writeValueAsString(snapshot),
1,
TimeUnit.HOURS
);
} catch (Exception ignored) {
// Compatibility adapter keeps test setup lightweight.
}
}
private MeetingProgressSnapshot readSnapshot(Long meetingId) {
if (redisTemplate == null || meetingId == null) {
return null;
}
try {
String raw = redisTemplate.opsForValue().get(RedisKeys.meetingProgressKey(meetingId));
if (raw == null || raw.isBlank()) {
return null;
}
return objectMapper.readValue(raw, MeetingProgressSnapshot.class);
} catch (Exception ignored) {
return null;
}
}
private MeetingProgressSnapshot buildSnapshot(Long meetingId,
AiTask task,
Integer meetingStatus,
MeetingProgressStage stage,
int percent,
String message,
int eta) {
return MeetingProgressSnapshot.builder()
.meetingId(meetingId)
.taskId(task == null ? null : task.getId())
.taskType(task == null ? null : task.getTaskType())
.taskStatus(task == null ? null : task.getStatus())
.meetingStatus(meetingStatus)
.stage(stage.getCode())
.stageOrder(stage.getOrder())
.percent(percent)
.message(message)
.eta(eta)
.queuedAt(task == null ? null : task.getQueuedAt())
.startedAt(task == null ? null : task.getStartedAt())
.completedAt(task == null ? null : task.getCompletedAt())
.updateAt(System.currentTimeMillis())
.build();
}
}

View File

@ -19,16 +19,18 @@ import com.imeeting.entity.biz.Meeting;
import com.imeeting.entity.biz.PromptTemplate;
import com.imeeting.service.biz.AiTaskService;
import com.imeeting.service.biz.MeetingAccessService;
import com.imeeting.service.biz.MeetingProgressService;
import com.imeeting.service.biz.MeetingQueryService;
import com.imeeting.service.biz.MeetingService;
import com.imeeting.service.biz.MeetingTranscriptChapterService;
import com.imeeting.service.biz.MeetingTranscriptFileService;
import com.imeeting.service.biz.PromptTemplateService;
import com.imeeting.service.biz.impl.RedisOnlyMeetingProgressServiceAdapter;
import com.unisbase.dto.PageResult;
import com.unisbase.entity.SysUser;
import com.unisbase.mapper.SysUserMapper;
import com.unisbase.security.LoginUser;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.security.core.Authentication;
@ -44,7 +46,6 @@ import java.util.Objects;
import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
public class MeetingMcpToolService {
private static final String STAGE_DATA_INITIALIZATION = "data_initialization";
@ -60,9 +61,56 @@ public class MeetingMcpToolService {
private final MeetingTranscriptFileService meetingTranscriptFileService;
private final MeetingTranscriptChapterService meetingTranscriptChapterService;
private final SysUserMapper sysUserMapper;
private final StringRedisTemplate redisTemplate;
private final MeetingProgressService meetingProgressService;
private final ObjectMapper objectMapper;
@Autowired
public MeetingMcpToolService(MeetingQueryService meetingQueryService,
MeetingAccessService meetingAccessService,
MeetingService meetingService,
AiTaskService aiTaskService,
PromptTemplateService promptTemplateService,
MeetingTranscriptFileService meetingTranscriptFileService,
MeetingTranscriptChapterService meetingTranscriptChapterService,
SysUserMapper sysUserMapper,
MeetingProgressService meetingProgressService,
ObjectMapper objectMapper) {
this.meetingQueryService = meetingQueryService;
this.meetingAccessService = meetingAccessService;
this.meetingService = meetingService;
this.aiTaskService = aiTaskService;
this.promptTemplateService = promptTemplateService;
this.meetingTranscriptFileService = meetingTranscriptFileService;
this.meetingTranscriptChapterService = meetingTranscriptChapterService;
this.sysUserMapper = sysUserMapper;
this.meetingProgressService = meetingProgressService;
this.objectMapper = objectMapper;
}
public MeetingMcpToolService(MeetingQueryService meetingQueryService,
MeetingAccessService meetingAccessService,
MeetingService meetingService,
AiTaskService aiTaskService,
PromptTemplateService promptTemplateService,
MeetingTranscriptFileService meetingTranscriptFileService,
MeetingTranscriptChapterService meetingTranscriptChapterService,
SysUserMapper sysUserMapper,
StringRedisTemplate redisTemplate,
ObjectMapper objectMapper) {
this(
meetingQueryService,
meetingAccessService,
meetingService,
aiTaskService,
promptTemplateService,
meetingTranscriptFileService,
meetingTranscriptChapterService,
sysUserMapper,
new RedisOnlyMeetingProgressServiceAdapter(redisTemplate, objectMapper),
objectMapper
);
}
@Value("${unisbase.app.server-base-url:}")
private String serverBaseUrl;
@ -192,6 +240,13 @@ public class MeetingMcpToolService {
}
Integer realtimeProgress = resolveRealtimeProgress(meetingId);
if (asrTask != null && Integer.valueOf(0).equals(asrTask.getStatus()) && realtimeProgress != null && realtimeProgress <= 0) {
return new LegacyMeetingPreviewResult(
"400",
"会议正在处理中",
buildProcessingPreview(meeting, summaryTask, processingStatus("会议数据准备中", 25, STAGE_DATA_INITIALIZATION))
);
}
if (realtimeProgress != null) {
if (realtimeProgress >= 100) {
MeetingVO completedDetail = detail != null ? detail : meetingQueryService.getDetail(meetingId);
@ -340,11 +395,11 @@ public class MeetingMcpToolService {
}
private boolean isRunningAsr(AiTask task) {
return task != null && (Integer.valueOf(0).equals(task.getStatus()) || Integer.valueOf(1).equals(task.getStatus()));
return task != null && Integer.valueOf(1).equals(task.getStatus());
}
private boolean isRunningSummary(AiTask task) {
return task != null && (Integer.valueOf(0).equals(task.getStatus()) || Integer.valueOf(1).equals(task.getStatus()));
return task != null && Integer.valueOf(1).equals(task.getStatus());
}
private boolean isFailed(AiTask task) {
@ -514,22 +569,13 @@ public class MeetingMcpToolService {
}
private boolean isAsrStage(Integer meetingStatus, AiTask asrTask, boolean hasAudio, boolean isSummaryStage) {
return Integer.valueOf(1).equals(meetingStatus)
return (Integer.valueOf(1).equals(meetingStatus) && (asrTask == null || !Integer.valueOf(0).equals(asrTask.getStatus())))
|| isRunningAsr(asrTask)
|| (hasAudio && !isSummaryStage);
|| (asrTask == null && hasAudio && !isSummaryStage);
}
private Integer resolveRealtimeProgress(Long meetingId) {
String rawProgress = redisTemplate.opsForValue().get(RedisKeys.meetingProgressKey(meetingId));
if (rawProgress == null || rawProgress.isBlank()) {
return null;
}
try {
JsonNode progress = objectMapper.readTree(rawProgress);
return progress.hasNonNull("percent") ? progress.path("percent").asInt() : null;
} catch (Exception ignored) {
return null;
}
return meetingProgressService.resolvePercent(meetingId);
}
private LegacyMeetingProcessingStatusResponse processingStatus(String overallStatus, int overallProgress, String currentStage) {

View File

@ -0,0 +1,74 @@
package com.imeeting.support;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
@RequiredArgsConstructor
public class RedisValueSupport {
private final StringRedisTemplate redisTemplate;
private final ObjectMapper objectMapper;
public <T> T getJson(String key, Class<T> type) {
String raw = getString(key);
if (raw == null || raw.isBlank()) {
return null;
}
try {
return objectMapper.readValue(raw, type);
} catch (Exception ex) {
log.warn("Failed to parse redis json, key={}", key, ex);
return null;
}
}
public String getString(String key) {
try {
return redisTemplate.opsForValue().get(key);
} catch (Exception ex) {
log.warn("Failed to get redis value, key={}", key, ex);
return null;
}
}
public void setJson(String key, Object value, long ttl, TimeUnit unit) {
try {
redisTemplate.opsForValue().set(key, objectMapper.writeValueAsString(value), ttl, unit);
} catch (Exception ex) {
log.warn("Failed to write redis json, key={}", key, ex);
}
}
public Boolean setIfAbsent(String key, String value, long ttl, TimeUnit unit) {
try {
return redisTemplate.opsForValue().setIfAbsent(key, value, ttl, unit);
} catch (Exception ex) {
log.warn("Failed to acquire redis lock, key={}", key, ex);
return false;
}
}
public void delete(String key) {
try {
redisTemplate.delete(key);
} catch (Exception ex) {
log.warn("Failed to delete redis key, key={}", key, ex);
}
}
public void delete(Collection<String> keys) {
try {
redisTemplate.delete(keys);
} catch (Exception ex) {
log.warn("Failed to delete redis keys, keys={}", keys, ex);
}
}
}

View File

@ -369,6 +369,16 @@ const MeetingProgressDisplay: React.FC<{
}> = ({ meetingId, onComplete, onProgressUpdate, onProgressChange, compact, inline }) => {
const [progress, setProgress] = useState<MeetingProgress | null>(null);
const onCompleteRef = useRef(onComplete);
const onProgressUpdateRef = useRef(onProgressUpdate);
const onProgressChangeRef = useRef(onProgressChange);
useEffect(() => {
onCompleteRef.current = onComplete;
onProgressUpdateRef.current = onProgressUpdate;
onProgressChangeRef.current = onProgressChange;
}, [onComplete, onProgressUpdate, onProgressChange]);
useEffect(() => {
let completed = false;
@ -383,10 +393,10 @@ const MeetingProgressDisplay: React.FC<{
]);
if (detailRes.data?.data) {
onProgressUpdate?.(detailRes.data.data);
onProgressUpdateRef.current?.(detailRes.data.data);
if (detailRes.data.data.status !== 1 && detailRes.data.data.status !== 2) {
completed = true;
onComplete();
onCompleteRef.current?.();
return;
}
}
@ -394,10 +404,10 @@ const MeetingProgressDisplay: React.FC<{
if (progressRes.data?.data) {
const nextProgress = progressRes.data.data;
setProgress(nextProgress);
onProgressChange?.(nextProgress);
onProgressChangeRef.current?.(nextProgress);
if (nextProgress.percent === 100 || nextProgress.percent < 0) {
completed = true;
onComplete();
onCompleteRef.current?.();
}
}
} catch {
@ -411,7 +421,7 @@ const MeetingProgressDisplay: React.FC<{
completed = true;
clearInterval(timer);
};
}, [meetingId, onComplete, onProgressChange, onProgressUpdate]);
}, [meetingId]);
const percent = progress?.percent || 0;
const isError = percent < 0;
@ -2066,7 +2076,7 @@ const MeetingDetail: React.FC = () => {
meetingId={meeting.id}
onComplete={() => fetchData(meeting.id)}
onProgressUpdate={(updated) => {
if (updated.status === 2 || updated.status !== meeting.status) {
if (updated.status !== meeting.status) {
void fetchData(updated.id);
}
}}