diff --git a/backend/src/main/java/com/imeeting/common/MeetingProgressStage.java b/backend/src/main/java/com/imeeting/common/MeetingProgressStage.java new file mode 100644 index 0000000..7dd934c --- /dev/null +++ b/backend/src/main/java/com/imeeting/common/MeetingProgressStage.java @@ -0,0 +1,34 @@ +package com.imeeting.common; + +public enum MeetingProgressStage { + QUEUED("queued", 10, false), + ASR_SUBMITTED("asr_submitted", 20, false), + ASR_RUNNING("asr_running", 30, false), + ASR_COMPLETED("asr_completed", 40, false), + CHAPTER_RUNNING("chapter_running", 50, false), + SUMMARY_RUNNING("summary_running", 60, false), + COMPLETED("completed", 100, true), + FAILED("failed", 100, true); + + private final String code; + private final int order; + private final boolean terminal; + + MeetingProgressStage(String code, int order, boolean terminal) { + this.code = code; + this.order = order; + this.terminal = terminal; + } + + public String getCode() { + return code; + } + + public int getOrder() { + return order; + } + + public boolean isTerminal() { + return terminal; + } +} diff --git a/backend/src/main/java/com/imeeting/common/RedisKeys.java b/backend/src/main/java/com/imeeting/common/RedisKeys.java index aad9f48..d2f60a6 100644 --- a/backend/src/main/java/com/imeeting/common/RedisKeys.java +++ b/backend/src/main/java/com/imeeting/common/RedisKeys.java @@ -47,6 +47,22 @@ public final class RedisKeys { return "biz:meeting:summary:lock:" + meetingId; } + public static String meetingAsrScheduleLockKey() { + return "biz:meeting:asr:schedule:lock"; + } + + public static String meetingAsrPermitSetKey() { + return "biz:meeting:asr:permit:set"; + } + + public static String meetingAsrPermitSyncLockKey() { + return "biz:meeting:asr:permit:sync:lock"; + } + + public static String meetingAsrRefillLockKey() { + return "biz:meeting:asr:refill:lock"; + } + public static String realtimeMeetingSocketSessionKey(String sessionToken) { return "biz:meeting:realtime:socket:" + sessionToken; } diff --git a/backend/src/main/java/com/imeeting/common/SysParamKeys.java b/backend/src/main/java/com/imeeting/common/SysParamKeys.java index 1ab55a4..33330b2 100644 --- a/backend/src/main/java/com/imeeting/common/SysParamKeys.java +++ b/backend/src/main/java/com/imeeting/common/SysParamKeys.java @@ -10,4 +10,5 @@ public final class SysParamKeys { public static final String MEETING_OFFLINE_AUDIO_MAX_SIZE_MB = "meeting.offline_audio.max_size_mb"; public static final String MEETING_CREATE_OFFLINE_ENABLED = "meeting.create.offline_enabled"; public static final String MEETING_CREATE_REALTIME_ENABLED = "meeting.create.realtime_enabled"; + public static final String MEETING_ASR_MAX_CONCURRENT = "meeting.asr.max_concurrent"; } diff --git a/backend/src/main/java/com/imeeting/config/MeetingAsyncExecutorConfig.java b/backend/src/main/java/com/imeeting/config/MeetingAsyncExecutorConfig.java new file mode 100644 index 0000000..66103f5 --- /dev/null +++ b/backend/src/main/java/com/imeeting/config/MeetingAsyncExecutorConfig.java @@ -0,0 +1,60 @@ +package com.imeeting.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +@Configuration +public class MeetingAsyncExecutorConfig { + + @Bean("asrDispatchExecutor") + public Executor asrDispatchExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(8); + executor.setMaxPoolSize(16); + executor.setQueueCapacity(128); + executor.setThreadNamePrefix("imeeting-asr-dispatch-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } + + @Bean("asrTaskExecutor") + public Executor asrTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(8); + executor.setMaxPoolSize(16); + executor.setQueueCapacity(0); + executor.setThreadNamePrefix("imeeting-asr-worker-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } + + @Bean("summaryDispatchExecutor") + public Executor summaryDispatchExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(4); + executor.setMaxPoolSize(8); + executor.setQueueCapacity(128); + executor.setThreadNamePrefix("imeeting-summary-dispatch-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } + + @Bean("summaryTaskExecutor") + public Executor summaryTaskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(4); + executor.setMaxPoolSize(8); + executor.setQueueCapacity(256); + executor.setThreadNamePrefix("imeeting-summary-worker-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } +} diff --git a/backend/src/main/java/com/imeeting/controller/android/AndroidMeetingController.java b/backend/src/main/java/com/imeeting/controller/android/AndroidMeetingController.java index 010ab76..0060768 100644 --- a/backend/src/main/java/com/imeeting/controller/android/AndroidMeetingController.java +++ b/backend/src/main/java/com/imeeting/controller/android/AndroidMeetingController.java @@ -22,9 +22,11 @@ import com.imeeting.service.android.legacy.LegacyMeetingAdapterService; import com.imeeting.service.biz.AiTaskService; import com.imeeting.service.biz.MeetingAccessService; import com.imeeting.service.biz.MeetingCommandService; +import com.imeeting.service.biz.MeetingProgressService; import com.imeeting.service.biz.MeetingQueryService; import com.imeeting.service.biz.MeetingService; import com.imeeting.service.biz.PromptTemplateService; +import com.imeeting.service.biz.impl.RedisOnlyMeetingProgressServiceAdapter; import com.unisbase.common.ApiResponse; import com.unisbase.common.annotation.Log; import com.unisbase.dto.PageResult; @@ -38,7 +40,7 @@ import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.responses.ApiResponses; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.servlet.http.HttpServletRequest; -import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; @@ -63,7 +65,6 @@ import java.util.stream.Collectors; @Tag(name = "Android会议接口") @RestController @RequestMapping("/api/android/meetings") -@RequiredArgsConstructor public class AndroidMeetingController { private static final String STAGE_DATA_INITIALIZATION = "data_initialization"; @@ -80,9 +81,60 @@ public class AndroidMeetingController { private final AiTaskService aiTaskService; private final PromptTemplateService promptTemplateService; private final SysUserMapper sysUserMapper; - private final StringRedisTemplate redisTemplate; + private final MeetingProgressService meetingProgressService; private final ObjectMapper objectMapper; + @Autowired + public AndroidMeetingController(AndroidAuthService androidAuthService, + LegacyMeetingAdapterService legacyMeetingAdapterService, + MeetingQueryService meetingQueryService, + MeetingAccessService meetingAccessService, + MeetingCommandService meetingCommandService, + MeetingService meetingService, + AiTaskService aiTaskService, + PromptTemplateService promptTemplateService, + SysUserMapper sysUserMapper, + MeetingProgressService meetingProgressService, + ObjectMapper objectMapper) { + this.androidAuthService = androidAuthService; + this.legacyMeetingAdapterService = legacyMeetingAdapterService; + this.meetingQueryService = meetingQueryService; + this.meetingAccessService = meetingAccessService; + this.meetingCommandService = meetingCommandService; + this.meetingService = meetingService; + this.aiTaskService = aiTaskService; + this.promptTemplateService = promptTemplateService; + this.sysUserMapper = sysUserMapper; + this.meetingProgressService = meetingProgressService; + this.objectMapper = objectMapper; + } + + public AndroidMeetingController(AndroidAuthService androidAuthService, + LegacyMeetingAdapterService legacyMeetingAdapterService, + MeetingQueryService meetingQueryService, + MeetingAccessService meetingAccessService, + MeetingCommandService meetingCommandService, + MeetingService meetingService, + AiTaskService aiTaskService, + PromptTemplateService promptTemplateService, + SysUserMapper sysUserMapper, + StringRedisTemplate redisTemplate, + ObjectMapper objectMapper) { + this( + androidAuthService, + legacyMeetingAdapterService, + meetingQueryService, + meetingAccessService, + meetingCommandService, + meetingService, + aiTaskService, + promptTemplateService, + sysUserMapper, + new RedisOnlyMeetingProgressServiceAdapter(redisTemplate, objectMapper), + objectMapper + ); + } + @Operation(summary = "创建Android离线会议") @ApiResponses({ @io.swagger.v3.oas.annotations.responses.ApiResponse( @@ -254,6 +306,13 @@ public class AndroidMeetingController { } Integer realtimeProgress = resolveRealtimeProgress(meetingId); + if (asrTask != null && Integer.valueOf(0).equals(asrTask.getStatus()) && realtimeProgress != null && realtimeProgress <= 0) { + return new LegacyMeetingPreviewResult( + "400", + "会议正在处理中", + buildProcessingPreview(meeting, summaryTask, processingStatus("会议数据准备中", 25, STAGE_DATA_INITIALIZATION)) + ); + } if (realtimeProgress != null) { if (realtimeProgress >= 100) { MeetingVO completedDetail = detail != null ? detail : meetingQueryService.getDetail(meetingId); @@ -348,16 +407,7 @@ public class AndroidMeetingController { } private Integer resolveRealtimeProgress(Long meetingId) { - String rawProgress = redisTemplate.opsForValue().get(RedisKeys.meetingProgressKey(meetingId)); - if (rawProgress == null || rawProgress.isBlank()) { - return null; - } - try { - JsonNode progress = objectMapper.readTree(rawProgress); - return progress.hasNonNull("percent") ? progress.path("percent").asInt() : null; - } catch (Exception ignored) { - return null; - } + return meetingProgressService.resolvePercent(meetingId); } private String buildFailureMessage(AiTask failedTask, String stageName) { @@ -368,11 +418,11 @@ public class AndroidMeetingController { } private boolean isRunningAsr(AiTask task) { - return task != null && (Integer.valueOf(0).equals(task.getStatus()) || Integer.valueOf(1).equals(task.getStatus())); + return task != null && Integer.valueOf(1).equals(task.getStatus()); } private boolean isRunningSummary(AiTask task) { - return task != null && (Integer.valueOf(0).equals(task.getStatus()) || Integer.valueOf(1).equals(task.getStatus())); + return task != null && Integer.valueOf(1).equals(task.getStatus()); } private boolean isFailed(AiTask task) { @@ -492,9 +542,9 @@ public class AndroidMeetingController { } private boolean isAsrStage(Integer meetingStatus, AiTask asrTask, boolean hasAudio, boolean isSummaryStage) { - return Integer.valueOf(1).equals(meetingStatus) + return (Integer.valueOf(1).equals(meetingStatus) && (asrTask == null || !Integer.valueOf(0).equals(asrTask.getStatus()))) || isRunningAsr(asrTask) - || (hasAudio && !isSummaryStage); + || (asrTask == null && hasAudio && !isSummaryStage); } private String formatDateTime(LocalDateTime value) { diff --git a/backend/src/main/java/com/imeeting/controller/android/legacy/LegacyMeetingController.java b/backend/src/main/java/com/imeeting/controller/android/legacy/LegacyMeetingController.java index 5d8eedf..aff4848 100644 --- a/backend/src/main/java/com/imeeting/controller/android/legacy/LegacyMeetingController.java +++ b/backend/src/main/java/com/imeeting/controller/android/legacy/LegacyMeetingController.java @@ -26,9 +26,11 @@ import com.imeeting.service.android.legacy.LegacyMeetingAdapterService; import com.imeeting.service.biz.AiTaskService; import com.imeeting.service.biz.MeetingAccessService; import com.imeeting.service.biz.MeetingCommandService; +import com.imeeting.service.biz.MeetingProgressService; import com.imeeting.service.biz.MeetingQueryService; import com.imeeting.service.biz.MeetingService; import com.imeeting.service.biz.PromptTemplateService; +import com.imeeting.service.biz.impl.RedisOnlyMeetingProgressServiceAdapter; import com.unisbase.common.annotation.Log; import com.unisbase.dto.PageResult; import com.unisbase.entity.SysUser; @@ -80,7 +82,7 @@ public class LegacyMeetingController { private final PromptTemplateService promptTemplateService; private final MeetingTranscriptMapper meetingTranscriptMapper; private final SysUserMapper sysUserMapper; - private final StringRedisTemplate redisTemplate; + private final MeetingProgressService meetingProgressService; private final ObjectMapper objectMapper; public LegacyMeetingController(LegacyMeetingAdapterService legacyMeetingAdapterService, @@ -89,9 +91,9 @@ public class LegacyMeetingController { MeetingCommandService meetingCommandService, MeetingService meetingService, AiTaskService aiTaskService, - PromptTemplateService promptTemplateService, - MeetingTranscriptMapper meetingTranscriptMapper, - SysUserMapper sysUserMapper) { + PromptTemplateService promptTemplateService, + MeetingTranscriptMapper meetingTranscriptMapper, + SysUserMapper sysUserMapper) { this(legacyMeetingAdapterService, meetingQueryService, meetingAccessService, @@ -101,11 +103,10 @@ public class LegacyMeetingController { promptTemplateService, meetingTranscriptMapper, sysUserMapper, - null, + (MeetingProgressService) null, new ObjectMapper()); } - @Autowired public LegacyMeetingController(LegacyMeetingAdapterService legacyMeetingAdapterService, MeetingQueryService meetingQueryService, MeetingAccessService meetingAccessService, @@ -117,6 +118,33 @@ public class LegacyMeetingController { SysUserMapper sysUserMapper, StringRedisTemplate redisTemplate, ObjectMapper objectMapper) { + this( + legacyMeetingAdapterService, + meetingQueryService, + meetingAccessService, + meetingCommandService, + meetingService, + aiTaskService, + promptTemplateService, + meetingTranscriptMapper, + sysUserMapper, + new RedisOnlyMeetingProgressServiceAdapter(redisTemplate, objectMapper), + objectMapper + ); + } + + @Autowired + public LegacyMeetingController(LegacyMeetingAdapterService legacyMeetingAdapterService, + MeetingQueryService meetingQueryService, + MeetingAccessService meetingAccessService, + MeetingCommandService meetingCommandService, + MeetingService meetingService, + AiTaskService aiTaskService, + PromptTemplateService promptTemplateService, + MeetingTranscriptMapper meetingTranscriptMapper, + SysUserMapper sysUserMapper, + MeetingProgressService meetingProgressService, + ObjectMapper objectMapper) { this.legacyMeetingAdapterService = legacyMeetingAdapterService; this.meetingQueryService = meetingQueryService; this.meetingAccessService = meetingAccessService; @@ -126,7 +154,7 @@ public class LegacyMeetingController { this.promptTemplateService = promptTemplateService; this.meetingTranscriptMapper = meetingTranscriptMapper; this.sysUserMapper = sysUserMapper; - this.redisTemplate = redisTemplate; + this.meetingProgressService = meetingProgressService; this.objectMapper = objectMapper; } @@ -270,6 +298,13 @@ public class LegacyMeetingController { } Integer realtimeProgress = resolveRealtimeProgress(meetingId); + if (asrTask != null && Integer.valueOf(0).equals(asrTask.getStatus()) && realtimeProgress != null && realtimeProgress <= 0) { + return new LegacyMeetingPreviewResult( + "400", + "会议正在处理中", + buildProcessingPreview(meeting, summaryTask, processingStatus("会议数据准备中", 25, STAGE_DATA_INITIALIZATION)) + ); + } if (realtimeProgress != null) { if (realtimeProgress >= 100) { MeetingVO completedDetail = detail != null ? detail : meetingQueryService.getDetail(meetingId); @@ -389,19 +424,10 @@ public class LegacyMeetingController { } private Integer resolveRealtimeProgress(Long meetingId) { - if (redisTemplate == null) { - return null; - } - String rawProgress = redisTemplate.opsForValue().get(RedisKeys.meetingProgressKey(meetingId)); - if (rawProgress == null || rawProgress.isBlank()) { - return null; - } - try { - JsonNode progress = objectMapper.readTree(rawProgress); - return progress.hasNonNull("percent") ? progress.path("percent").asInt() : null; - } catch (Exception ignored) { + if (meetingProgressService == null) { return null; } + return meetingProgressService.resolvePercent(meetingId); } private LegacyMeetingProcessingStatusResponse buildListStatus(MeetingVO meeting) { @@ -440,11 +466,11 @@ public class LegacyMeetingController { } private boolean isRunningAsr(AiTask task) { - return task != null && (Integer.valueOf(0).equals(task.getStatus()) || Integer.valueOf(1).equals(task.getStatus())); + return task != null && Integer.valueOf(1).equals(task.getStatus()); } private boolean isRunningSummary(AiTask task) { - return task != null && (Integer.valueOf(0).equals(task.getStatus()) || Integer.valueOf(1).equals(task.getStatus())); + return task != null && Integer.valueOf(1).equals(task.getStatus()); } private boolean isFailed(AiTask task) { @@ -593,9 +619,9 @@ public class LegacyMeetingController { } private boolean isAsrStage(Integer meetingStatus, AiTask asrTask, boolean hasAudio, boolean isSummaryStage) { - return Integer.valueOf(1).equals(meetingStatus) + return (Integer.valueOf(1).equals(meetingStatus) && (asrTask == null || !Integer.valueOf(0).equals(asrTask.getStatus()))) || isRunningAsr(asrTask) - || (hasAudio && !isSummaryStage); + || (asrTask == null && hasAudio && !isSummaryStage); } private String formatDateTime(LocalDateTime value) { diff --git a/backend/src/main/java/com/imeeting/controller/biz/MeetingController.java b/backend/src/main/java/com/imeeting/controller/biz/MeetingController.java index 850d84f..9062656 100644 --- a/backend/src/main/java/com/imeeting/controller/biz/MeetingController.java +++ b/backend/src/main/java/com/imeeting/controller/biz/MeetingController.java @@ -1,5 +1,6 @@ package com.imeeting.controller.biz; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.imeeting.common.MeetingConstants; import com.imeeting.common.RedisKeys; import com.imeeting.common.SysParamKeys; @@ -22,16 +23,20 @@ import com.imeeting.dto.biz.UpdateMeetingBasicCommand; import com.imeeting.dto.biz.UpdateMeetingParticipantsCommand; import com.imeeting.dto.biz.UpdateMeetingSummaryCommand; import com.imeeting.dto.biz.UpdateMeetingTranscriptCommand; +import com.imeeting.entity.biz.AiTask; import com.imeeting.entity.biz.Meeting; +import com.imeeting.service.biz.AiTaskService; import com.imeeting.service.biz.MeetingAccessService; import com.imeeting.service.biz.MeetingCommandService; import com.imeeting.service.biz.MeetingExportService; +import com.imeeting.service.biz.MeetingProgressService; import com.imeeting.service.biz.MeetingQueryService; import com.imeeting.service.biz.MeetingTranscriptFileService; import com.imeeting.service.biz.PromptTemplateService; import com.imeeting.service.biz.RealtimeMeetingSessionStateService; import com.imeeting.service.biz.RealtimeMeetingSocketSessionService; import com.imeeting.service.biz.impl.MeetingAudioUploadSupport; +import com.imeeting.service.biz.impl.RedisOnlyMeetingProgressServiceAdapter; import com.unisbase.common.ApiResponse; import com.unisbase.common.annotation.Log; import com.unisbase.dto.PageResult; @@ -40,6 +45,7 @@ import com.unisbase.service.SysParamService; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.validation.Valid; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; @@ -79,9 +85,11 @@ public class MeetingController { private final RealtimeMeetingSocketSessionService realtimeMeetingSocketSessionService; private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; private final MeetingAudioUploadSupport meetingAudioUploadSupport; - private final StringRedisTemplate redisTemplate; + private final MeetingProgressService meetingProgressService; private final SysParamService sysParamService; + private AiTaskService compatibilityAiTaskService; + @Autowired public MeetingController(MeetingQueryService meetingQueryService, MeetingCommandService meetingCommandService, MeetingAccessService meetingAccessService, @@ -91,7 +99,7 @@ public class MeetingController { RealtimeMeetingSocketSessionService realtimeMeetingSocketSessionService, RealtimeMeetingSessionStateService realtimeMeetingSessionStateService, MeetingAudioUploadSupport meetingAudioUploadSupport, - StringRedisTemplate redisTemplate, + MeetingProgressService meetingProgressService, SysParamService sysParamService) { this.meetingQueryService = meetingQueryService; this.meetingCommandService = meetingCommandService; @@ -102,10 +110,38 @@ public class MeetingController { this.realtimeMeetingSocketSessionService = realtimeMeetingSocketSessionService; this.realtimeMeetingSessionStateService = realtimeMeetingSessionStateService; this.meetingAudioUploadSupport = meetingAudioUploadSupport; - this.redisTemplate = redisTemplate; + this.meetingProgressService = meetingProgressService; this.sysParamService = sysParamService; } + public MeetingController(MeetingQueryService meetingQueryService, + MeetingCommandService meetingCommandService, + MeetingAccessService meetingAccessService, + MeetingExportService meetingExportService, + MeetingTranscriptFileService meetingTranscriptFileService, + PromptTemplateService promptTemplateService, + RealtimeMeetingSocketSessionService realtimeMeetingSocketSessionService, + RealtimeMeetingSessionStateService realtimeMeetingSessionStateService, + AiTaskService unusedAiTaskService, + MeetingAudioUploadSupport meetingAudioUploadSupport, + StringRedisTemplate redisTemplate, + SysParamService sysParamService) { + this( + meetingQueryService, + meetingCommandService, + meetingAccessService, + meetingExportService, + meetingTranscriptFileService, + promptTemplateService, + realtimeMeetingSocketSessionService, + realtimeMeetingSessionStateService, + meetingAudioUploadSupport, + new RedisOnlyMeetingProgressServiceAdapter(redisTemplate, new com.fasterxml.jackson.databind.ObjectMapper()), + sysParamService + ); + this.compatibilityAiTaskService = unusedAiTaskService; + } + @Operation(summary = "查询会议处理进度") @GetMapping("/{id}/progress") @PreAuthorize("isAuthenticated()") @@ -113,29 +149,21 @@ public class MeetingController { LoginUser loginUser = currentLoginUser(); Meeting meeting = meetingAccessService.requireMeeting(id); meetingAccessService.assertCanViewMeeting(meeting, loginUser); - - String key = RedisKeys.meetingProgressKey(id); - String json = redisTemplate.opsForValue().get(key); - if (json != null) { - try { - return ApiResponse.ok(new com.fasterxml.jackson.databind.ObjectMapper().readValue(json, Map.class)); - } catch (Exception ex) { - return ApiResponse.error("进度解析失败"); + Map progress = meetingProgressService.getProgressMap(id); + if (compatibilityAiTaskService != null && "Waiting...".equals(progress.get("message"))) { + AiTask asrTask = compatibilityAiTaskService.getOne(new LambdaQueryWrapper() + .eq(AiTask::getMeetingId, id) + .eq(AiTask::getTaskType, "ASR") + .orderByDesc(AiTask::getId) + .last("LIMIT 1")); + if (asrTask != null && Integer.valueOf(0).equals(asrTask.getStatus())) { + return ApiResponse.ok(Map.of("percent", 0, "message", "排队中,等待 ASR 执行名额...")); + } + if (asrTask != null && Integer.valueOf(1).equals(asrTask.getStatus())) { + return ApiResponse.ok(Map.of("percent", 5, "message", "识别中,等待进度刷新...")); } } - - Map fallback = new HashMap<>(); - if (meeting.getStatus() == 3) { - fallback.put("percent", 100); - fallback.put("message", "Completed"); - } else if (meeting.getStatus() == 4) { - fallback.put("percent", -1); - fallback.put("message", "Failed"); - } else { - fallback.put("percent", 0); - fallback.put("message", "Waiting..."); - } - return ApiResponse.ok(fallback); + return ApiResponse.ok(progress); } @Operation(summary = "上传会议音频") diff --git a/backend/src/main/java/com/imeeting/dto/biz/MeetingCreateConfigVO.java b/backend/src/main/java/com/imeeting/dto/biz/MeetingCreateConfigVO.java new file mode 100644 index 0000000..cf7d4f5 --- /dev/null +++ b/backend/src/main/java/com/imeeting/dto/biz/MeetingCreateConfigVO.java @@ -0,0 +1,18 @@ +package com.imeeting.dto.biz; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +@Data +@Schema(description = "会议创建配置") +public class MeetingCreateConfigVO { + + @Schema(description = "是否启用离线上传") + private Boolean offlineEnabled; + + @Schema(description = "是否启用实时会议") + private Boolean realtimeEnabled; + + @Schema(description = "离线音频上传大小上限,单位 MB") + private Long offlineAudioMaxSizeMb; +} diff --git a/backend/src/main/java/com/imeeting/dto/biz/MeetingProgressSnapshot.java b/backend/src/main/java/com/imeeting/dto/biz/MeetingProgressSnapshot.java new file mode 100644 index 0000000..48661ec --- /dev/null +++ b/backend/src/main/java/com/imeeting/dto/biz/MeetingProgressSnapshot.java @@ -0,0 +1,30 @@ +package com.imeeting.dto.biz; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class MeetingProgressSnapshot { + private Long meetingId; + private Long taskId; + private String taskType; + private Integer taskStatus; + private Integer meetingStatus; + private String stage; + private Integer stageOrder; + private Integer percent; + private String message; + private Integer eta; + private String externalTaskId; + private LocalDateTime queuedAt; + private LocalDateTime startedAt; + private LocalDateTime completedAt; + private Long updateAt; +} diff --git a/backend/src/main/java/com/imeeting/entity/biz/AiTask.java b/backend/src/main/java/com/imeeting/entity/biz/AiTask.java index 078fa02..9cf9515 100644 --- a/backend/src/main/java/com/imeeting/entity/biz/AiTask.java +++ b/backend/src/main/java/com/imeeting/entity/biz/AiTask.java @@ -46,6 +46,9 @@ public class AiTask { @Schema(description = "错误信息") private String errorMsg; + @Schema(description = "排队时间") + private LocalDateTime queuedAt; + @Schema(description = "开始时间") private LocalDateTime startedAt; diff --git a/backend/src/main/java/com/imeeting/listener/MeetingTaskRecoveryListener.java b/backend/src/main/java/com/imeeting/listener/MeetingTaskRecoveryListener.java index d2261d5..31c247f 100644 --- a/backend/src/main/java/com/imeeting/listener/MeetingTaskRecoveryListener.java +++ b/backend/src/main/java/com/imeeting/listener/MeetingTaskRecoveryListener.java @@ -4,93 +4,93 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.imeeting.common.RedisKeys; import com.imeeting.entity.biz.Meeting; import com.imeeting.mapper.biz.MeetingMapper; -import com.imeeting.support.TaskSecurityContextRunner; import com.imeeting.service.biz.AiTaskService; -import lombok.RequiredArgsConstructor; +import com.imeeting.support.RedisValueSupport; +import com.imeeting.support.TaskSecurityContextRunner; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; -import java.time.LocalDateTime; import java.util.List; import java.util.concurrent.TimeUnit; -/** - * 任务自愈监听器:在系统重启后自动恢复挂起的 AI 任务 - */ @Component @Slf4j -@RequiredArgsConstructor public class MeetingTaskRecoveryListener implements ApplicationRunner { private final MeetingMapper meetingMapper; private final AiTaskService aiTaskService; - private final StringRedisTemplate redisTemplate; + private final RedisValueSupport redisValueSupport; private final TaskSecurityContextRunner taskSecurityContextRunner; + private StringRedisTemplate compatibilityRedisTemplate; + + @Autowired + public MeetingTaskRecoveryListener(MeetingMapper meetingMapper, + AiTaskService aiTaskService, + RedisValueSupport redisValueSupport, + TaskSecurityContextRunner taskSecurityContextRunner) { + this.meetingMapper = meetingMapper; + this.aiTaskService = aiTaskService; + this.redisValueSupport = redisValueSupport; + this.taskSecurityContextRunner = taskSecurityContextRunner; + } + + public MeetingTaskRecoveryListener(MeetingMapper meetingMapper, + AiTaskService aiTaskService, + StringRedisTemplate redisTemplate, + TaskSecurityContextRunner taskSecurityContextRunner) { + this(meetingMapper, aiTaskService, new RedisValueSupport(redisTemplate, new com.fasterxml.jackson.databind.ObjectMapper()), taskSecurityContextRunner); + this.compatibilityRedisTemplate = redisTemplate; + } @Override public void run(ApplicationArguments args) { - log.info("Starting meeting task self-healing check..."); - - // 1. 查询状态为 1(识别中) 或 2(总结中) 且未删除的会议 + log.info("Starting meeting task recovery check..."); + List pendingMeetings = taskSecurityContextRunner.callAsPlatformAdmin(() -> meetingMapper.selectList(new LambdaQueryWrapper() .in(Meeting::getStatus, 1, 2) .eq(Meeting::getIsDeleted, 0)) ); - if (pendingMeetings.isEmpty()) { - log.info("No pending tasks found. Recovery check completed."); + log.info("No pending meeting tasks found."); return; } - LocalDateTime oneHourAgo = LocalDateTime.now().minusHours(1); - for (Meeting meeting : pendingMeetings) { try { - // 处理 1 小时以上的死任务 - if (meeting.getUpdatedAt() != null && meeting.getUpdatedAt().isBefore(oneHourAgo)) { - log.warn("Meeting {} is stuck for more than 1 hour, marking as failed", meeting.getId()); - markAsFailed(meeting, "任务因系统维护超时,请重新发起分析"); - continue; - } + redisValueSupport.delete(RedisKeys.meetingPollingLockKey(meeting.getId())); + redisValueSupport.delete(RedisKeys.meetingSummaryLockKey(meeting.getId())); + clearLegacyRedisState(meeting.getId()); - // 2. 清理旧的 Redis 锁和进度缓存,确保恢复线程能拿到控制权 - redisTemplate.delete(RedisKeys.meetingPollingLockKey(meeting.getId())); - redisTemplate.delete(RedisKeys.meetingSummaryLockKey(meeting.getId())); - - // 3. 根据状态重新派发任务 (平滑拉起) - if (meeting.getStatus() == 1) { - log.info("Resuming ASR task for meeting {}", meeting.getId()); + if (Integer.valueOf(1).equals(meeting.getStatus())) { + log.info("Recovering ASR task for meeting {}", meeting.getId()); aiTaskService.dispatchTasks(meeting.getId(), meeting.getTenantId(), meeting.getCreatorId()); - } else if (meeting.getStatus() == 2) { - log.info("Resuming Summary task for meeting {}", meeting.getId()); + } else if (Integer.valueOf(2).equals(meeting.getStatus())) { + log.info("Recovering summary task for meeting {}", meeting.getId()); aiTaskService.dispatchSummaryTask(meeting.getId(), meeting.getTenantId(), meeting.getCreatorId()); } - - // 增加小延迟防止惊群效应 + TimeUnit.MILLISECONDS.sleep(200); - - } catch (Exception e) { - log.error("Failed to recover meeting task {}", meeting.getId(), e); + } catch (Exception ex) { + log.error("Failed to recover meeting task {}", meeting.getId(), ex); } } - - log.info("Successfully processed {} pending tasks for recovery.", pendingMeetings.size()); + + log.info("Meeting task recovery processed {} meetings.", pendingMeetings.size()); } - private void markAsFailed(Meeting m, String reason) { - Meeting update = new Meeting(); - update.setId(m.getId()); - update.setStatus(4); // 失败 - taskSecurityContextRunner.runAsTenantUser(m.getTenantId(), m.getCreatorId(), () -> meetingMapper.updateById(update)); - - // 同步 Redis 进度为失败 - String progressKey = RedisKeys.meetingProgressKey(m.getId()); - redisTemplate.opsForValue().set(progressKey, - "{\"percent\":-1, \"message\":\"" + reason + "\", \"updateAt\":" + System.currentTimeMillis() + "}", - 1, TimeUnit.HOURS); + private void clearLegacyRedisState(Long meetingId) { + if (compatibilityRedisTemplate == null || meetingId == null) { + return; + } + compatibilityRedisTemplate.delete(List.of( + RedisKeys.meetingAsrPermitSyncLockKey(), + RedisKeys.meetingAsrRefillLockKey() + )); + compatibilityRedisTemplate.opsForSet().remove(RedisKeys.meetingAsrPermitSetKey(), String.valueOf(meetingId)); } } diff --git a/backend/src/main/java/com/imeeting/service/android/legacy/impl/LegacyMeetingAdapterServiceImpl.java b/backend/src/main/java/com/imeeting/service/android/legacy/impl/LegacyMeetingAdapterServiceImpl.java index 4c672eb..0ed3570 100644 --- a/backend/src/main/java/com/imeeting/service/android/legacy/impl/LegacyMeetingAdapterServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/android/legacy/impl/LegacyMeetingAdapterServiceImpl.java @@ -284,6 +284,7 @@ public class LegacyMeetingAdapterServiceImpl implements LegacyMeetingAdapterServ task.setTaskType(taskType); } task.setStatus(0); + task.setQueuedAt(LocalDateTime.now()); task.setTaskConfig(taskConfig); task.setRequestData(null); task.setResponseData(null); diff --git a/backend/src/main/java/com/imeeting/service/biz/MeetingProgressService.java b/backend/src/main/java/com/imeeting/service/biz/MeetingProgressService.java new file mode 100644 index 0000000..8271798 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/biz/MeetingProgressService.java @@ -0,0 +1,27 @@ +package com.imeeting.service.biz; + +import com.imeeting.common.MeetingProgressStage; +import com.imeeting.dto.biz.MeetingProgressSnapshot; +import com.imeeting.entity.biz.AiTask; + +import java.util.Map; + +public interface MeetingProgressService { + void clear(Long meetingId); + + Map getProgressMap(Long meetingId); + + Integer resolvePercent(Long meetingId); + + void markQueued(Long meetingId, AiTask task, Integer meetingStatus, String message); + + void markQueuedAfterCommitOrNow(Long meetingId, AiTask task, Integer meetingStatus, String message); + + void markStage(Long meetingId, AiTask task, Integer meetingStatus, MeetingProgressStage stage, int percent, String message, int eta); + + void markStageAfterCommitOrNow(Long meetingId, AiTask task, Integer meetingStatus, MeetingProgressStage stage, int percent, String message, int eta); + + void syncFromDatabase(Long meetingId); + + void writeSnapshot(MeetingProgressSnapshot snapshot); +} diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java index e2b9d81..f52e368 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java @@ -1,10 +1,12 @@ package com.imeeting.service.biz.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.imeeting.common.MeetingProgressStage; import com.imeeting.common.RedisKeys; import com.imeeting.dto.biz.AiModelVO; import com.imeeting.dto.biz.MeetingSummarySource; @@ -19,16 +21,21 @@ import com.imeeting.support.TaskSecurityContextRunner; import com.imeeting.service.biz.AiModelService; import com.imeeting.service.biz.AiTaskService; import com.imeeting.service.biz.HotWordService; +import com.imeeting.service.biz.MeetingProgressService; import com.imeeting.service.biz.MeetingSummaryFileService; import com.imeeting.service.biz.MeetingTranscriptChapterService; import com.imeeting.service.biz.MeetingTranscriptFileService; import com.imeeting.service.biz.MeetingTranscriptRevisionService; +import com.imeeting.support.RedisValueSupport; import com.unisbase.entity.SysUser; import com.unisbase.mapper.SysUserMapper; -import lombok.RequiredArgsConstructor; +import com.unisbase.service.SysParamService; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Lazy; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @@ -46,12 +53,12 @@ import java.nio.file.Paths; import java.time.Duration; import java.time.LocalDateTime; import java.util.*; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Service @Slf4j -@RequiredArgsConstructor public class AiTaskServiceImpl extends ServiceImpl implements AiTaskService { private final MeetingMapper meetingMapper; @@ -60,7 +67,8 @@ public class AiTaskServiceImpl extends ServiceImpl impleme private final ObjectMapper objectMapper; private final SysUserMapper sysUserMapper; private final HotWordService hotWordService; - private final StringRedisTemplate redisTemplate; + private final RedisValueSupport redisValueSupport; + private final MeetingProgressService meetingProgressService; private final MeetingSummaryFileService meetingSummaryFileService; private final MeetingTranscriptFileService meetingTranscriptFileService; private final MeetingTranscriptRevisionService meetingTranscriptRevisionService; @@ -68,6 +76,19 @@ public class AiTaskServiceImpl extends ServiceImpl impleme private final MeetingSummaryPromptAssembler meetingSummaryPromptAssembler; private final TaskSecurityContextRunner taskSecurityContextRunner; private final MeetingExternalSummaryWebhookTrigger meetingExternalSummaryWebhookTrigger; + private final SysParamService sysParamService; + + @Autowired + @Qualifier("asrTaskExecutor") + private Executor asrTaskExecutor; + + @Autowired + @Qualifier("summaryTaskExecutor") + private Executor summaryTaskExecutor; + + @Autowired + @Lazy + private AiTaskService self; @Value("${unisbase.app.server-base-url}") private String serverBaseUrl; @@ -83,15 +104,89 @@ public class AiTaskServiceImpl extends ServiceImpl impleme .version(HttpClient.Version.HTTP_1_1) .build(); + @Autowired + public AiTaskServiceImpl(MeetingMapper meetingMapper, + MeetingTranscriptMapper transcriptMapper, + AiModelService aiModelService, + ObjectMapper objectMapper, + SysUserMapper sysUserMapper, + HotWordService hotWordService, + RedisValueSupport redisValueSupport, + MeetingProgressService meetingProgressService, + MeetingSummaryFileService meetingSummaryFileService, + MeetingTranscriptFileService meetingTranscriptFileService, + MeetingTranscriptRevisionService meetingTranscriptRevisionService, + MeetingTranscriptChapterService meetingTranscriptChapterService, + MeetingSummaryPromptAssembler meetingSummaryPromptAssembler, + TaskSecurityContextRunner taskSecurityContextRunner, + MeetingExternalSummaryWebhookTrigger meetingExternalSummaryWebhookTrigger, + SysParamService sysParamService) { + this.meetingMapper = meetingMapper; + this.transcriptMapper = transcriptMapper; + this.aiModelService = aiModelService; + this.objectMapper = objectMapper; + this.sysUserMapper = sysUserMapper; + this.hotWordService = hotWordService; + this.redisValueSupport = redisValueSupport; + this.meetingProgressService = meetingProgressService; + this.meetingSummaryFileService = meetingSummaryFileService; + this.meetingTranscriptFileService = meetingTranscriptFileService; + this.meetingTranscriptRevisionService = meetingTranscriptRevisionService; + this.meetingTranscriptChapterService = meetingTranscriptChapterService; + this.meetingSummaryPromptAssembler = meetingSummaryPromptAssembler; + this.taskSecurityContextRunner = taskSecurityContextRunner; + this.meetingExternalSummaryWebhookTrigger = meetingExternalSummaryWebhookTrigger; + this.sysParamService = sysParamService; + } + + public AiTaskServiceImpl(MeetingMapper meetingMapper, + MeetingTranscriptMapper transcriptMapper, + AiModelService aiModelService, + ObjectMapper objectMapper, + SysUserMapper sysUserMapper, + HotWordService hotWordService, + StringRedisTemplate redisTemplate, + MeetingSummaryFileService meetingSummaryFileService, + MeetingTranscriptFileService meetingTranscriptFileService, + MeetingTranscriptRevisionService meetingTranscriptRevisionService, + MeetingTranscriptChapterService meetingTranscriptChapterService, + MeetingSummaryPromptAssembler meetingSummaryPromptAssembler, + TaskSecurityContextRunner taskSecurityContextRunner, + MeetingExternalSummaryWebhookTrigger meetingExternalSummaryWebhookTrigger) { + this( + meetingMapper, + transcriptMapper, + aiModelService, + objectMapper, + sysUserMapper, + hotWordService, + new RedisValueSupport(redisTemplate, objectMapper), + new RedisOnlyMeetingProgressServiceAdapter(redisTemplate, objectMapper), + meetingSummaryFileService, + meetingTranscriptFileService, + meetingTranscriptRevisionService, + meetingTranscriptChapterService, + meetingSummaryPromptAssembler, + taskSecurityContextRunner, + meetingExternalSummaryWebhookTrigger, + null + ); + } + @Override - @Async + @Async("asrDispatchExecutor") public void dispatchTasks(Long meetingId, Long tenantId, Long userId) { - taskSecurityContextRunner.runAsTenantUser(tenantId, userId, () -> doDispatchTasks(meetingId)); + Runnable task = () -> taskSecurityContextRunner.runAsTenantUser(tenantId, userId, () -> doDispatchTasks(meetingId)); + if (asrTaskExecutor == null) { + task.run(); + return; + } + asrTaskExecutor.execute(task); } private void doDispatchTasks(Long meetingId) { String lockKey = RedisKeys.meetingPollingLockKey(meetingId); - Boolean acquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "locked", 30, TimeUnit.MINUTES); + Boolean acquired = redisValueSupport.setIfAbsent(lockKey, "locked", 30, TimeUnit.MINUTES); if (Boolean.FALSE.equals(acquired)) { log.warn("Meeting {} is already being processed", meetingId); return; @@ -107,6 +202,23 @@ public class AiTaskServiceImpl extends ServiceImpl impleme .orderByDesc(AiTask::getId) .last("limit 1")); + if (asrTask != null) { + if (Integer.valueOf(1).equals(asrTask.getStatus())) { + if (!prepareRunningAsrTaskForRecovery(meeting, asrTask)) { + return; + } + } else if (Integer.valueOf(0).equals(asrTask.getStatus())) { + if (asrTask.getQueuedAt() == null) { + asrTask.setQueuedAt(LocalDateTime.now()); + this.updateById(asrTask); + } + if (!claimQueuedAsrTask(asrTask)) { + meetingProgressService.markQueued(meetingId, asrTask, 1, "已进入 ASR 队列,等待执行"); + return; + } + } + } + String asrText = ""; if (asrTask != null && canExecuteTask(asrTask)) { asrText = processAsrTask(meeting, asrTask); @@ -135,6 +247,11 @@ public class AiTaskServiceImpl extends ServiceImpl impleme updateProgress(meetingId, -1, "未识别到可用于总结的转录内容", 0); return; } + if (!asrText.isBlank()) { + meetingProgressService.markStage(meetingId, asrTask, 1, MeetingProgressStage.ASR_COMPLETED, 80, "转写完成,准备生成总结", 0); + self.dispatchSummaryTask(meetingId, meeting.getTenantId(), meeting.getCreatorId()); + return; + } if (chapterTask != null && canExecuteTask(chapterTask)) { executeChapterFlow(meeting, chapterTask); } @@ -155,14 +272,20 @@ public class AiTaskServiceImpl extends ServiceImpl impleme updateMeetingStatus(meetingId, 4); updateProgress(meetingId, -1, "分析失败: " + e.getMessage(), 0); } finally { - redisTemplate.delete(lockKey); + redisValueSupport.delete(lockKey); + scheduleQueuedAsrTasks(); } } @Override - @Async + @Async("summaryDispatchExecutor") public void dispatchSummaryTask(Long meetingId, Long tenantId, Long userId) { - taskSecurityContextRunner.runAsTenantUser(tenantId, userId, () -> doDispatchSummaryTask(meetingId)); + Runnable task = () -> taskSecurityContextRunner.runAsTenantUser(tenantId, userId, () -> doDispatchSummaryTask(meetingId)); + if (summaryTaskExecutor == null) { + task.run(); + return; + } + summaryTaskExecutor.execute(task); } private void doDispatchSummaryTask(Long meetingId) { @@ -198,6 +321,164 @@ public class AiTaskServiceImpl extends ServiceImpl impleme } } + private boolean prepareRunningAsrTaskForRecovery(Meeting meeting, AiTask asrTask) { + if (meeting == null || asrTask == null) { + return false; + } + Long asrModelId = extractAsrModelId(asrTask); + String externalTaskId = extractExternalTaskId(asrTask); + if (asrModelId == null || externalTaskId == null || externalTaskId.isBlank()) { + requeueAsrTask(asrTask, "恢复时缺少有效外部 ASR 任务,已重新排队", true); + return false; + } + AiModelVO asrModel = aiModelService.getModelById(asrModelId, "ASR"); + if (asrModel == null || !canResumeAsrTask(asrModel, meeting.getId(), externalTaskId)) { + requeueAsrTask(asrTask, "外部 ASR 状态异常,已重新排队", true); + return false; + } + return true; + } + + private boolean claimQueuedAsrTask(AiTask task) { + if (task == null || task.getId() == null || !Integer.valueOf(0).equals(task.getStatus())) { + return false; + } + if (getBaseMapper() == null) { + task.setStatus(1); + task.setStartedAt(LocalDateTime.now()); + return true; + } + String scheduleLockKey = RedisKeys.meetingAsrScheduleLockKey(); + Boolean acquired = redisValueSupport.setIfAbsent(scheduleLockKey, "locked", 30, TimeUnit.SECONDS); + if (Boolean.FALSE.equals(acquired)) { + return false; + } + try { + int maxConcurrent = resolveAsrMaxConcurrent(); + long runningCount = count(new LambdaQueryWrapper() + .eq(AiTask::getTaskType, "ASR") + .eq(AiTask::getStatus, 1)); + if (runningCount >= maxConcurrent) { + return false; + } + int available = (int) Math.max(1, maxConcurrent - runningCount); + List dispatchable = list(new LambdaQueryWrapper() + .eq(AiTask::getTaskType, "ASR") + .eq(AiTask::getStatus, 0) + .orderByAsc(AiTask::getQueuedAt) + .orderByAsc(AiTask::getId) + .last("LIMIT " + available)); + boolean eligible = dispatchable.stream().anyMatch(item -> Objects.equals(item.getId(), task.getId())); + if (!eligible) { + return false; + } + LocalDateTime now = LocalDateTime.now(); + boolean claimed = update(new LambdaUpdateWrapper() + .eq(AiTask::getId, task.getId()) + .eq(AiTask::getStatus, 0) + .set(AiTask::getStatus, 1) + .set(AiTask::getStartedAt, now) + .set(AiTask::getCompletedAt, null) + .set(AiTask::getErrorMsg, null)); + if (claimed) { + task.setStatus(1); + task.setStartedAt(now); + meetingProgressService.markStage(task.getMeetingId(), task, 1, MeetingProgressStage.ASR_SUBMITTED, 5, "ASR 任务已开始执行", 0); + } + return claimed; + } finally { + redisValueSupport.delete(scheduleLockKey); + } + } + + private void scheduleQueuedAsrTasks() { + if (getBaseMapper() == null) { + return; + } + String scheduleLockKey = RedisKeys.meetingAsrScheduleLockKey(); + Boolean acquired = redisValueSupport.setIfAbsent(scheduleLockKey, "locked", 30, TimeUnit.SECONDS); + if (Boolean.FALSE.equals(acquired)) { + return; + } + try { + int maxConcurrent = resolveAsrMaxConcurrent(); + long runningCount = count(new LambdaQueryWrapper() + .eq(AiTask::getTaskType, "ASR") + .eq(AiTask::getStatus, 1)); + int available = (int) (maxConcurrent - runningCount); + if (available <= 0) { + return; + } + List queuedTasks = list(new LambdaQueryWrapper() + .eq(AiTask::getTaskType, "ASR") + .eq(AiTask::getStatus, 0) + .orderByAsc(AiTask::getQueuedAt) + .orderByAsc(AiTask::getId) + .last("LIMIT " + available)); + for (AiTask queuedTask : queuedTasks) { + Meeting queuedMeeting = meetingMapper.selectById(queuedTask.getMeetingId()); + if (queuedMeeting == null) { + continue; + } + self.dispatchTasks(queuedMeeting.getId(), queuedMeeting.getTenantId(), queuedMeeting.getCreatorId()); + } + } finally { + redisValueSupport.delete(scheduleLockKey); + } + } + + private int resolveAsrMaxConcurrent() { + if (sysParamService == null) { + return 2; + } + String configured = sysParamService.getCachedParamValue(com.imeeting.common.SysParamKeys.MEETING_ASR_MAX_CONCURRENT, "2"); + try { + return Math.max(1, Integer.parseInt(configured.trim())); + } catch (Exception ex) { + return 2; + } + } + + private void requeueAsrTask(AiTask task, String reason, boolean clearExternalTaskId) { + if (task == null || task.getId() == null) { + return; + } + task.setStatus(0); + task.setQueuedAt(LocalDateTime.now()); + task.setStartedAt(null); + task.setCompletedAt(null); + task.setErrorMsg(null); + if (clearExternalTaskId) { + clearAsrTaskId(task); + } + Map responseData = task.getResponseData() == null + ? new HashMap<>() + : new HashMap<>(task.getResponseData()); + responseData.put("requeueReason", reason); + responseData.put("requeuedAt", LocalDateTime.now().toString()); + task.setResponseData(responseData); + updateById(task); + meetingProgressService.markQueued(task.getMeetingId(), task, 1, reason == null || reason.isBlank() ? "已重新进入 ASR 队列" : reason); + } + + private Long extractAsrModelId(AiTask task) { + if (task == null || task.getTaskConfig() == null || task.getTaskConfig().get("asrModelId") == null) { + return null; + } + try { + return Long.parseLong(String.valueOf(task.getTaskConfig().get("asrModelId"))); + } catch (Exception ex) { + return null; + } + } + + private String extractExternalTaskId(AiTask task) { + if (task == null || task.getResponseData() == null || task.getResponseData().get("task_id") == null) { + return null; + } + return String.valueOf(task.getResponseData().get("task_id")); + } + private String processAsrTask(Meeting meeting, AiTask taskRecord) throws Exception { updateMeetingStatus(meeting.getId(), 1); @@ -698,7 +979,7 @@ public class AiTaskServiceImpl extends ServiceImpl impleme return; } String summaryLockKey = RedisKeys.meetingSummaryLockKey(meeting.getId()); - Boolean acquired = redisTemplate.opsForValue().setIfAbsent(summaryLockKey, "locked", 30, TimeUnit.MINUTES); + Boolean acquired = redisValueSupport.setIfAbsent(summaryLockKey, "locked", 30, TimeUnit.MINUTES); if (Boolean.FALSE.equals(acquired)) { log.warn("Meeting {} summary is already being processed", meeting.getId()); return; @@ -713,7 +994,7 @@ public class AiTaskServiceImpl extends ServiceImpl impleme } processSummaryTask(meeting, summarySource, sumTask); } finally { - redisTemplate.delete(summaryLockKey); + redisValueSupport.delete(summaryLockKey); } } @@ -725,6 +1006,18 @@ public class AiTaskServiceImpl extends ServiceImpl impleme .last("limit 1")); } + private AiTask findLatestTaskForProgress(Long meetingId) { + AiTask summaryTask = findLatestTask(meetingId, "SUMMARY"); + if (summaryTask != null && Integer.valueOf(1).equals(summaryTask.getStatus())) { + return summaryTask; + } + AiTask chapterTask = findLatestTask(meetingId, "CHAPTER"); + if (chapterTask != null && Integer.valueOf(1).equals(chapterTask.getStatus())) { + return chapterTask; + } + return findLatestTask(meetingId, "ASR"); + } + private boolean isExternalSummaryModeEnabled() { return "EXTERNAL_N8N".equalsIgnoreCase(summaryOrchestrationMode); } @@ -779,21 +1072,28 @@ public class AiTaskServiceImpl extends ServiceImpl impleme if (meetingId == null) { return; } - try { - Map progress = new HashMap<>(); - progress.put("percent", percent); - progress.put("message", msg); - progress.put("eta", eta); - progress.put("updateAt", System.currentTimeMillis()); - redisTemplate.opsForValue().set( - RedisKeys.meetingProgressKey(meetingId), - objectMapper.writeValueAsString(progress), - 1, - TimeUnit.HOURS - ); - } catch (Exception e) { - log.error("Redis progress update error", e); + MeetingProgressStage stage; + int meetingStatus; + if (percent < 0) { + stage = MeetingProgressStage.FAILED; + meetingStatus = 4; + } else if (percent >= 100) { + stage = MeetingProgressStage.COMPLETED; + meetingStatus = 3; + } else if (percent >= 90) { + stage = MeetingProgressStage.SUMMARY_RUNNING; + meetingStatus = 2; + } else if (percent >= 85) { + stage = MeetingProgressStage.CHAPTER_RUNNING; + meetingStatus = 2; + } else if (percent >= 5) { + stage = MeetingProgressStage.ASR_RUNNING; + meetingStatus = 1; + } else { + stage = MeetingProgressStage.QUEUED; + meetingStatus = 1; } + meetingProgressService.markStage(meetingId, findLatestTaskForProgress(meetingId), meetingStatus, stage, percent, msg, eta); } private String postJson(String url, Object body, String apiKey) throws Exception { diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java index fede1f7..b2b79dd 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java @@ -27,6 +27,7 @@ import com.imeeting.entity.biz.MeetingTranscriptChapterVersion; import com.imeeting.service.biz.AiTaskService; import com.imeeting.service.biz.HotWordService; import com.imeeting.service.biz.MeetingCommandService; +import com.imeeting.service.biz.MeetingProgressService; import com.imeeting.service.biz.MeetingRuntimeProfileResolver; import com.imeeting.service.biz.MeetingService; import com.imeeting.service.biz.MeetingSummaryFileService; @@ -34,9 +35,10 @@ import com.imeeting.service.biz.MeetingTranscriptChapterService; import com.imeeting.service.biz.MeetingTranscriptFileService; import com.imeeting.service.biz.MeetingTranscriptRevisionService; import com.imeeting.service.biz.RealtimeMeetingSessionStateService; +import com.imeeting.service.biz.impl.RedisOnlyMeetingProgressServiceAdapter; import com.imeeting.service.realtime.RealtimeMeetingAudioStorageService; import lombok.extern.slf4j.Slf4j; -import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; @@ -55,7 +57,6 @@ import java.util.stream.Collectors; @Slf4j @Service -@RequiredArgsConstructor public class MeetingCommandServiceImpl implements MeetingCommandService { private final MeetingService meetingService; @@ -70,13 +71,82 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { private final MeetingRuntimeProfileResolver meetingRuntimeProfileResolver; private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; private final RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService; - private final StringRedisTemplate redisTemplate; + private final MeetingProgressService meetingProgressService; private final ObjectMapper objectMapper; private final MeetingExternalSummaryWebhookTrigger meetingExternalSummaryWebhookTrigger; + private StringRedisTemplate compatibilityRedisTemplate; @Value("${imeeting.summary-orchestration.mode:INTERNAL_BUILTIN}") private String summaryOrchestrationMode; + @Autowired + public MeetingCommandServiceImpl(MeetingService meetingService, + AiTaskService aiTaskService, + HotWordService hotWordService, + com.imeeting.mapper.biz.MeetingTranscriptMapper transcriptMapper, + MeetingSummaryFileService meetingSummaryFileService, + MeetingTranscriptFileService meetingTranscriptFileService, + MeetingTranscriptRevisionService meetingTranscriptRevisionService, + MeetingTranscriptChapterService meetingTranscriptChapterService, + MeetingDomainSupport meetingDomainSupport, + MeetingRuntimeProfileResolver meetingRuntimeProfileResolver, + RealtimeMeetingSessionStateService realtimeMeetingSessionStateService, + RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService, + MeetingProgressService meetingProgressService, + ObjectMapper objectMapper, + MeetingExternalSummaryWebhookTrigger meetingExternalSummaryWebhookTrigger) { + this.meetingService = meetingService; + this.aiTaskService = aiTaskService; + this.hotWordService = hotWordService; + this.transcriptMapper = transcriptMapper; + this.meetingSummaryFileService = meetingSummaryFileService; + this.meetingTranscriptFileService = meetingTranscriptFileService; + this.meetingTranscriptRevisionService = meetingTranscriptRevisionService; + this.meetingTranscriptChapterService = meetingTranscriptChapterService; + this.meetingDomainSupport = meetingDomainSupport; + this.meetingRuntimeProfileResolver = meetingRuntimeProfileResolver; + this.realtimeMeetingSessionStateService = realtimeMeetingSessionStateService; + this.realtimeMeetingAudioStorageService = realtimeMeetingAudioStorageService; + this.meetingProgressService = meetingProgressService; + this.objectMapper = objectMapper; + this.meetingExternalSummaryWebhookTrigger = meetingExternalSummaryWebhookTrigger; + } + + public MeetingCommandServiceImpl(MeetingService meetingService, + AiTaskService aiTaskService, + HotWordService hotWordService, + com.imeeting.mapper.biz.MeetingTranscriptMapper transcriptMapper, + MeetingSummaryFileService meetingSummaryFileService, + MeetingTranscriptFileService meetingTranscriptFileService, + MeetingTranscriptRevisionService meetingTranscriptRevisionService, + MeetingTranscriptChapterService meetingTranscriptChapterService, + MeetingDomainSupport meetingDomainSupport, + MeetingRuntimeProfileResolver meetingRuntimeProfileResolver, + RealtimeMeetingSessionStateService realtimeMeetingSessionStateService, + RealtimeMeetingAudioStorageService realtimeMeetingAudioStorageService, + StringRedisTemplate redisTemplate, + ObjectMapper objectMapper, + MeetingExternalSummaryWebhookTrigger meetingExternalSummaryWebhookTrigger) { + this( + meetingService, + aiTaskService, + hotWordService, + transcriptMapper, + meetingSummaryFileService, + meetingTranscriptFileService, + meetingTranscriptRevisionService, + meetingTranscriptChapterService, + meetingDomainSupport, + meetingRuntimeProfileResolver, + realtimeMeetingSessionStateService, + realtimeMeetingAudioStorageService, + new RedisOnlyMeetingProgressServiceAdapter(redisTemplate, objectMapper), + objectMapper, + meetingExternalSummaryWebhookTrigger + ); + this.compatibilityRedisTemplate = redisTemplate; + } + @Override @Transactional(rollbackFor = Exception.class) public MeetingVO createMeeting(CreateMeetingCommand command, Long tenantId, Long creatorId, String creatorName, String meetingSource) { @@ -92,6 +162,7 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { asrTask.setMeetingId(meeting.getId()); asrTask.setTaskType("ASR"); asrTask.setStatus(0); + asrTask.setQueuedAt(java.time.LocalDateTime.now()); Map asrConfig = new HashMap<>(); asrConfig.put("asrModelId", runtimeProfile.getResolvedAsrModelId()); @@ -204,7 +275,7 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { .eq(AiTask::getMeetingId, id)); meetingService.removeById(id); realtimeMeetingSessionStateService.clear(id); - redisTemplate.delete(RedisKeys.meetingProgressKey(id)); + meetingProgressService.clear(id); deleteMeetingArtifactsAfterCommit(id); } @@ -434,6 +505,7 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { private void resetAiTask(AiTask task, Map taskConfig) { task.setStatus(0); + task.setQueuedAt(java.time.LocalDateTime.now()); task.setTaskConfig(taskConfig); task.setRequestData(null); task.setResponseData(null); @@ -878,6 +950,9 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { if (asrTask == null || asrTask.getTaskConfig() == null || asrTask.getTaskConfig().get("asrModelId") == null) { throw new RuntimeException("未找到可用的识别任务配置"); } + if (Integer.valueOf(1).equals(asrTask.getStatus()) && !Integer.valueOf(4).equals(meeting.getStatus())) { + throw new RuntimeException("当前会议转写任务仍在处理中,请勿重复重试"); + } AiTask summaryTask = aiTaskService.getOne(new LambdaQueryWrapper() .eq(AiTask::getMeetingId, meetingId) @@ -917,10 +992,20 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { meeting.setStatus(1); meetingService.updateById(meeting); + clearLegacyDispatchState(meetingId); updateMeetingProgress(meetingId, 0, "已重新提交识别任务,等待 ASR 处理...", 0); dispatchTasksAfterCommit(meetingId, meeting.getTenantId(), meeting.getCreatorId()); } + private void clearLegacyDispatchState(Long meetingId) { + if (compatibilityRedisTemplate == null || meetingId == null) { + return; + } + compatibilityRedisTemplate.delete(RedisKeys.meetingPollingLockKey(meetingId)); + compatibilityRedisTemplate.delete(RedisKeys.meetingSummaryLockKey(meetingId)); + compatibilityRedisTemplate.opsForSet().remove(RedisKeys.meetingAsrPermitSetKey(), String.valueOf(meetingId)); + } + private void ensureExternalSummaryModeEnabled() { if (!"EXTERNAL_N8N".equalsIgnoreCase(summaryOrchestrationMode)) { throw new RuntimeException("外部 n8n 总结编排模式未开启"); @@ -1055,21 +1140,28 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { } private void updateMeetingProgress(Long meetingId, int percent, String message, int eta) { - try { - Map progress = new HashMap<>(); - progress.put("percent", percent); - progress.put("message", message); - progress.put("eta", eta); - progress.put("updateAt", System.currentTimeMillis()); - redisTemplate.opsForValue().set( - RedisKeys.meetingProgressKey(meetingId), - objectMapper.writeValueAsString(progress), - 1, - TimeUnit.HOURS - ); - } catch (Exception ignored) { - // Ignore progress write failures. + com.imeeting.common.MeetingProgressStage stage; + int meetingStatus; + if (percent < 0) { + stage = com.imeeting.common.MeetingProgressStage.FAILED; + meetingStatus = 4; + } else if (percent >= 100) { + stage = com.imeeting.common.MeetingProgressStage.COMPLETED; + meetingStatus = 3; + } else if (percent >= 90) { + stage = com.imeeting.common.MeetingProgressStage.SUMMARY_RUNNING; + meetingStatus = 2; + } else if (percent >= 85) { + stage = com.imeeting.common.MeetingProgressStage.CHAPTER_RUNNING; + meetingStatus = 2; + } else if (percent >= 5) { + stage = com.imeeting.common.MeetingProgressStage.ASR_RUNNING; + meetingStatus = 1; + } else { + stage = com.imeeting.common.MeetingProgressStage.QUEUED; + meetingStatus = 1; } + meetingProgressService.markStageAfterCommitOrNow(meetingId, null, meetingStatus, stage, percent, message, eta); } private RealtimeMeetingResumeConfig buildRealtimeResumeConfig(CreateRealtimeMeetingCommand command, diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/MeetingDomainSupport.java b/backend/src/main/java/com/imeeting/service/biz/impl/MeetingDomainSupport.java index 702666d..9807839 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/MeetingDomainSupport.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/MeetingDomainSupport.java @@ -113,6 +113,7 @@ public class MeetingDomainSupport { chapterTask.setMeetingId(meetingId); chapterTask.setTaskType("CHAPTER"); chapterTask.setStatus(0); + chapterTask.setQueuedAt(LocalDateTime.now()); chapterTask.setTaskConfig(meetingSummaryPromptAssembler.buildTaskConfig( summaryModelId, chapterModelId, @@ -141,6 +142,7 @@ public class MeetingDomainSupport { sumTask.setMeetingId(meetingId); sumTask.setTaskType("SUMMARY"); sumTask.setStatus(0); + sumTask.setQueuedAt(LocalDateTime.now()); sumTask.setTaskConfig(meetingSummaryPromptAssembler.buildTaskConfig( summaryModelId, chapterModelId, @@ -393,11 +395,11 @@ public class MeetingDomainSupport { } else { vo.setParticipantIds(Collections.emptyList()); } + fillLatestTaskAttemptInfo(meeting, vo); if (includeSummary) { vo.setSummaryContent(meetingSummaryFileService.loadSummaryContent(meeting)); vo.setAnalysis(meetingSummaryFileService.loadSummaryAnalysis(meeting)); vo.setLastUserPrompt(resolveLastSummaryUserPrompt(meeting)); - fillLatestTaskAttemptInfo(meeting, vo); } } diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/MeetingProgressServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/MeetingProgressServiceImpl.java new file mode 100644 index 0000000..98479a1 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/biz/impl/MeetingProgressServiceImpl.java @@ -0,0 +1,254 @@ +package com.imeeting.service.biz.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.imeeting.common.MeetingProgressStage; +import com.imeeting.common.RedisKeys; +import com.imeeting.dto.biz.MeetingProgressSnapshot; +import com.imeeting.entity.biz.AiTask; +import com.imeeting.entity.biz.Meeting; +import com.imeeting.mapper.biz.AiTaskMapper; +import com.imeeting.mapper.biz.MeetingMapper; +import com.imeeting.service.biz.MeetingProgressService; +import com.imeeting.support.RedisValueSupport; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@Service +@RequiredArgsConstructor +public class MeetingProgressServiceImpl implements MeetingProgressService { + + private static final long PROGRESS_TTL_HOURS = 1L; + + private final MeetingMapper meetingMapper; + private final AiTaskMapper aiTaskMapper; + private final RedisValueSupport redisValueSupport; + private final ObjectMapper objectMapper; + + @Override + public void clear(Long meetingId) { + if (meetingId == null) { + return; + } + redisValueSupport.delete(RedisKeys.meetingProgressKey(meetingId)); + } + + @Override + public Map getProgressMap(Long meetingId) { + MeetingProgressSnapshot snapshot = redisValueSupport.getJson(RedisKeys.meetingProgressKey(meetingId), MeetingProgressSnapshot.class); + if (snapshot == null) { + snapshot = buildFallbackSnapshot(meetingId); + if (snapshot != null) { + writeSnapshot(snapshot); + } + } + if (snapshot == null) { + return Map.of("percent", 0, "message", "Waiting..."); + } + return objectMapper.convertValue(snapshot, Map.class); + } + + @Override + public Integer resolvePercent(Long meetingId) { + MeetingProgressSnapshot snapshot = redisValueSupport.getJson(RedisKeys.meetingProgressKey(meetingId), MeetingProgressSnapshot.class); + if (snapshot != null && snapshot.getPercent() != null) { + return snapshot.getPercent(); + } + MeetingProgressSnapshot fallback = buildFallbackSnapshot(meetingId); + return fallback == null ? null : fallback.getPercent(); + } + + @Override + public void markQueued(Long meetingId, AiTask task, Integer meetingStatus, String message) { + writeSnapshot(buildSnapshot(meetingId, task, meetingStatus, MeetingProgressStage.QUEUED, 0, message, 0)); + } + + @Override + public void markQueuedAfterCommitOrNow(Long meetingId, AiTask task, Integer meetingStatus, String message) { + afterCommitOrNow(() -> markQueued(meetingId, task, meetingStatus, message)); + } + + @Override + public void markStage(Long meetingId, AiTask task, Integer meetingStatus, MeetingProgressStage stage, int percent, String message, int eta) { + writeSnapshot(buildSnapshot(meetingId, task, meetingStatus, stage, percent, message, eta)); + } + + @Override + public void markStageAfterCommitOrNow(Long meetingId, AiTask task, Integer meetingStatus, MeetingProgressStage stage, int percent, String message, int eta) { + afterCommitOrNow(() -> markStage(meetingId, task, meetingStatus, stage, percent, message, eta)); + } + + @Override + public void syncFromDatabase(Long meetingId) { + MeetingProgressSnapshot snapshot = buildFallbackSnapshot(meetingId); + if (snapshot != null) { + writeSnapshot(snapshot); + } + } + + @Override + public void writeSnapshot(MeetingProgressSnapshot snapshot) { + if (snapshot == null || snapshot.getMeetingId() == null) { + return; + } + MeetingProgressSnapshot existing = redisValueSupport.getJson(RedisKeys.meetingProgressKey(snapshot.getMeetingId()), MeetingProgressSnapshot.class); + if (!shouldReplace(existing, snapshot)) { + return; + } + redisValueSupport.setJson(RedisKeys.meetingProgressKey(snapshot.getMeetingId()), snapshot, PROGRESS_TTL_HOURS, TimeUnit.HOURS); + } + + private MeetingProgressSnapshot buildSnapshot(Long meetingId, + AiTask task, + Integer meetingStatus, + MeetingProgressStage stage, + int percent, + String message, + int eta) { + String externalTaskId = null; + if (task != null && task.getResponseData() != null && task.getResponseData().get("task_id") != null) { + externalTaskId = String.valueOf(task.getResponseData().get("task_id")); + } + return MeetingProgressSnapshot.builder() + .meetingId(meetingId) + .taskId(task == null ? null : task.getId()) + .taskType(task == null ? null : task.getTaskType()) + .taskStatus(task == null ? null : task.getStatus()) + .meetingStatus(meetingStatus) + .stage(stage.getCode()) + .stageOrder(stage.getOrder()) + .percent(percent) + .message(message) + .eta(eta) + .externalTaskId(externalTaskId) + .queuedAt(task == null ? null : task.getQueuedAt()) + .startedAt(task == null ? null : task.getStartedAt()) + .completedAt(task == null ? null : task.getCompletedAt()) + .updateAt(System.currentTimeMillis()) + .build(); + } + + private MeetingProgressSnapshot buildFallbackSnapshot(Long meetingId) { + if (meetingId == null) { + return null; + } + Meeting meeting = meetingMapper.selectById(meetingId); + if (meeting == null) { + return null; + } + AiTask latestTask = aiTaskMapper.selectOne(new LambdaQueryWrapper() + .eq(AiTask::getMeetingId, meetingId) + .orderByDesc(AiTask::getId) + .last("LIMIT 1")); + if (meeting.getStatus() != null && meeting.getStatus() == 3) { + return buildSnapshot(meetingId, latestTask, meeting.getStatus(), MeetingProgressStage.COMPLETED, 100, "处理完成", 0); + } + if (meeting.getStatus() != null && meeting.getStatus() == 4) { + String message = latestTask != null && latestTask.getErrorMsg() != null && !latestTask.getErrorMsg().isBlank() + ? latestTask.getErrorMsg() + : "处理失败"; + return buildSnapshot(meetingId, latestTask, meeting.getStatus(), MeetingProgressStage.FAILED, -1, message, 0); + } + + AiTask latestSummary = findLatestTask(meetingId, "SUMMARY"); + if (latestSummary != null && Integer.valueOf(1).equals(latestSummary.getStatus())) { + return buildSnapshot(meetingId, latestSummary, meeting.getStatus(), MeetingProgressStage.SUMMARY_RUNNING, 90, "正在生成会议总结...", 0); + } + AiTask latestChapter = findLatestTask(meetingId, "CHAPTER"); + if (latestChapter != null && Integer.valueOf(1).equals(latestChapter.getStatus())) { + return buildSnapshot(meetingId, latestChapter, meeting.getStatus(), MeetingProgressStage.CHAPTER_RUNNING, 85, "正在生成会议章节...", 0); + } + AiTask latestAsr = findLatestTask(meetingId, "ASR"); + if (latestAsr != null) { + if (Integer.valueOf(1).equals(latestAsr.getStatus())) { + return buildSnapshot(meetingId, latestAsr, meeting.getStatus(), MeetingProgressStage.ASR_RUNNING, 10, "正在识别音频...", 0); + } + if (Integer.valueOf(0).equals(latestAsr.getStatus())) { + return buildSnapshot(meetingId, latestAsr, meeting.getStatus(), MeetingProgressStage.QUEUED, 0, "已进入 ASR 队列,等待执行", 0); + } + } + return buildSnapshot(meetingId, latestTask, meeting.getStatus(), MeetingProgressStage.QUEUED, 0, "Waiting...", 0); + } + + private AiTask findLatestTask(Long meetingId, String taskType) { + return aiTaskMapper.selectOne(new LambdaQueryWrapper() + .eq(AiTask::getMeetingId, meetingId) + .eq(AiTask::getTaskType, taskType) + .orderByDesc(AiTask::getId) + .last("LIMIT 1")); + } + + private boolean shouldReplace(MeetingProgressSnapshot existing, MeetingProgressSnapshot candidate) { + if (candidate == null) { + return false; + } + if (existing == null) { + return true; + } + + if (isRequeue(existing, candidate)) { + return true; + } + + if (isTerminal(existing) && !isTerminal(candidate)) { + return false; + } + if (isTerminal(candidate)) { + return true; + } + + int existingOrder = existing.getStageOrder() == null ? 0 : existing.getStageOrder(); + int candidateOrder = candidate.getStageOrder() == null ? 0 : candidate.getStageOrder(); + if (candidateOrder > existingOrder) { + return true; + } + if (candidateOrder < existingOrder) { + return false; + } + + int existingPercent = existing.getPercent() == null ? Integer.MIN_VALUE : existing.getPercent(); + int candidatePercent = candidate.getPercent() == null ? Integer.MIN_VALUE : candidate.getPercent(); + if (candidatePercent >= existingPercent) { + return true; + } + + Long existingUpdateAt = existing.getUpdateAt() == null ? 0L : existing.getUpdateAt(); + Long candidateUpdateAt = candidate.getUpdateAt() == null ? 0L : candidate.getUpdateAt(); + return candidateUpdateAt >= existingUpdateAt; + } + + private boolean isTerminal(MeetingProgressSnapshot snapshot) { + return snapshot != null + && snapshot.getStage() != null + && (MeetingProgressStage.COMPLETED.getCode().equals(snapshot.getStage()) + || MeetingProgressStage.FAILED.getCode().equals(snapshot.getStage())); + } + + private boolean isRequeue(MeetingProgressSnapshot existing, MeetingProgressSnapshot candidate) { + return candidate.getTaskStatus() != null + && candidate.getTaskStatus() == 0 + && MeetingProgressStage.QUEUED.getCode().equals(candidate.getStage()) + && candidate.getQueuedAt() != null + && (existing.getQueuedAt() == null || candidate.getQueuedAt().isAfter(existing.getQueuedAt())); + } + + private void afterCommitOrNow(Runnable runnable) { + if (!TransactionSynchronizationManager.isSynchronizationActive()) { + runnable.run(); + return; + } + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + runnable.run(); + } + }); + } +} diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/RedisOnlyMeetingProgressServiceAdapter.java b/backend/src/main/java/com/imeeting/service/biz/impl/RedisOnlyMeetingProgressServiceAdapter.java new file mode 100644 index 0000000..2851546 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/biz/impl/RedisOnlyMeetingProgressServiceAdapter.java @@ -0,0 +1,128 @@ +package com.imeeting.service.biz.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.imeeting.common.MeetingProgressStage; +import com.imeeting.common.RedisKeys; +import com.imeeting.dto.biz.MeetingProgressSnapshot; +import com.imeeting.entity.biz.AiTask; +import com.imeeting.service.biz.MeetingProgressService; +import org.springframework.data.redis.core.StringRedisTemplate; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class RedisOnlyMeetingProgressServiceAdapter implements MeetingProgressService { + + private final StringRedisTemplate redisTemplate; + private final ObjectMapper objectMapper; + + public RedisOnlyMeetingProgressServiceAdapter(StringRedisTemplate redisTemplate, ObjectMapper objectMapper) { + this.redisTemplate = redisTemplate; + this.objectMapper = objectMapper == null ? new ObjectMapper() : objectMapper; + } + + @Override + public void clear(Long meetingId) { + if (redisTemplate == null || meetingId == null) { + return; + } + redisTemplate.delete(RedisKeys.meetingProgressKey(meetingId)); + } + + @Override + public Map getProgressMap(Long meetingId) { + MeetingProgressSnapshot snapshot = readSnapshot(meetingId); + if (snapshot == null) { + return Map.of("percent", 0, "message", "Waiting..."); + } + return objectMapper.convertValue(snapshot, Map.class); + } + + @Override + public Integer resolvePercent(Long meetingId) { + MeetingProgressSnapshot snapshot = readSnapshot(meetingId); + return snapshot == null ? null : snapshot.getPercent(); + } + + @Override + public void markQueued(Long meetingId, AiTask task, Integer meetingStatus, String message) { + writeSnapshot(buildSnapshot(meetingId, task, meetingStatus, MeetingProgressStage.QUEUED, 0, message, 0)); + } + + @Override + public void markQueuedAfterCommitOrNow(Long meetingId, AiTask task, Integer meetingStatus, String message) { + markQueued(meetingId, task, meetingStatus, message); + } + + @Override + public void markStage(Long meetingId, AiTask task, Integer meetingStatus, MeetingProgressStage stage, int percent, String message, int eta) { + writeSnapshot(buildSnapshot(meetingId, task, meetingStatus, stage, percent, message, eta)); + } + + @Override + public void markStageAfterCommitOrNow(Long meetingId, AiTask task, Integer meetingStatus, MeetingProgressStage stage, int percent, String message, int eta) { + markStage(meetingId, task, meetingStatus, stage, percent, message, eta); + } + + @Override + public void syncFromDatabase(Long meetingId) { + // No-op for constructor compatibility in tests. + } + + @Override + public void writeSnapshot(MeetingProgressSnapshot snapshot) { + if (redisTemplate == null || snapshot == null || snapshot.getMeetingId() == null) { + return; + } + try { + redisTemplate.opsForValue().set( + RedisKeys.meetingProgressKey(snapshot.getMeetingId()), + objectMapper.writeValueAsString(snapshot), + 1, + TimeUnit.HOURS + ); + } catch (Exception ignored) { + // Compatibility adapter keeps test setup lightweight. + } + } + + private MeetingProgressSnapshot readSnapshot(Long meetingId) { + if (redisTemplate == null || meetingId == null) { + return null; + } + try { + String raw = redisTemplate.opsForValue().get(RedisKeys.meetingProgressKey(meetingId)); + if (raw == null || raw.isBlank()) { + return null; + } + return objectMapper.readValue(raw, MeetingProgressSnapshot.class); + } catch (Exception ignored) { + return null; + } + } + + private MeetingProgressSnapshot buildSnapshot(Long meetingId, + AiTask task, + Integer meetingStatus, + MeetingProgressStage stage, + int percent, + String message, + int eta) { + return MeetingProgressSnapshot.builder() + .meetingId(meetingId) + .taskId(task == null ? null : task.getId()) + .taskType(task == null ? null : task.getTaskType()) + .taskStatus(task == null ? null : task.getStatus()) + .meetingStatus(meetingStatus) + .stage(stage.getCode()) + .stageOrder(stage.getOrder()) + .percent(percent) + .message(message) + .eta(eta) + .queuedAt(task == null ? null : task.getQueuedAt()) + .startedAt(task == null ? null : task.getStartedAt()) + .completedAt(task == null ? null : task.getCompletedAt()) + .updateAt(System.currentTimeMillis()) + .build(); + } +} diff --git a/backend/src/main/java/com/imeeting/service/mcp/MeetingMcpToolService.java b/backend/src/main/java/com/imeeting/service/mcp/MeetingMcpToolService.java index 90f4b8b..4fd974a 100644 --- a/backend/src/main/java/com/imeeting/service/mcp/MeetingMcpToolService.java +++ b/backend/src/main/java/com/imeeting/service/mcp/MeetingMcpToolService.java @@ -19,16 +19,18 @@ import com.imeeting.entity.biz.Meeting; import com.imeeting.entity.biz.PromptTemplate; import com.imeeting.service.biz.AiTaskService; import com.imeeting.service.biz.MeetingAccessService; +import com.imeeting.service.biz.MeetingProgressService; import com.imeeting.service.biz.MeetingQueryService; import com.imeeting.service.biz.MeetingService; import com.imeeting.service.biz.MeetingTranscriptChapterService; import com.imeeting.service.biz.MeetingTranscriptFileService; import com.imeeting.service.biz.PromptTemplateService; +import com.imeeting.service.biz.impl.RedisOnlyMeetingProgressServiceAdapter; import com.unisbase.dto.PageResult; import com.unisbase.entity.SysUser; import com.unisbase.mapper.SysUserMapper; import com.unisbase.security.LoginUser; -import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.security.core.Authentication; @@ -44,7 +46,6 @@ import java.util.Objects; import java.util.stream.Collectors; @Service -@RequiredArgsConstructor public class MeetingMcpToolService { private static final String STAGE_DATA_INITIALIZATION = "data_initialization"; @@ -60,9 +61,56 @@ public class MeetingMcpToolService { private final MeetingTranscriptFileService meetingTranscriptFileService; private final MeetingTranscriptChapterService meetingTranscriptChapterService; private final SysUserMapper sysUserMapper; - private final StringRedisTemplate redisTemplate; + private final MeetingProgressService meetingProgressService; private final ObjectMapper objectMapper; + @Autowired + public MeetingMcpToolService(MeetingQueryService meetingQueryService, + MeetingAccessService meetingAccessService, + MeetingService meetingService, + AiTaskService aiTaskService, + PromptTemplateService promptTemplateService, + MeetingTranscriptFileService meetingTranscriptFileService, + MeetingTranscriptChapterService meetingTranscriptChapterService, + SysUserMapper sysUserMapper, + MeetingProgressService meetingProgressService, + ObjectMapper objectMapper) { + this.meetingQueryService = meetingQueryService; + this.meetingAccessService = meetingAccessService; + this.meetingService = meetingService; + this.aiTaskService = aiTaskService; + this.promptTemplateService = promptTemplateService; + this.meetingTranscriptFileService = meetingTranscriptFileService; + this.meetingTranscriptChapterService = meetingTranscriptChapterService; + this.sysUserMapper = sysUserMapper; + this.meetingProgressService = meetingProgressService; + this.objectMapper = objectMapper; + } + + public MeetingMcpToolService(MeetingQueryService meetingQueryService, + MeetingAccessService meetingAccessService, + MeetingService meetingService, + AiTaskService aiTaskService, + PromptTemplateService promptTemplateService, + MeetingTranscriptFileService meetingTranscriptFileService, + MeetingTranscriptChapterService meetingTranscriptChapterService, + SysUserMapper sysUserMapper, + StringRedisTemplate redisTemplate, + ObjectMapper objectMapper) { + this( + meetingQueryService, + meetingAccessService, + meetingService, + aiTaskService, + promptTemplateService, + meetingTranscriptFileService, + meetingTranscriptChapterService, + sysUserMapper, + new RedisOnlyMeetingProgressServiceAdapter(redisTemplate, objectMapper), + objectMapper + ); + } + @Value("${unisbase.app.server-base-url:}") private String serverBaseUrl; @@ -192,6 +240,13 @@ public class MeetingMcpToolService { } Integer realtimeProgress = resolveRealtimeProgress(meetingId); + if (asrTask != null && Integer.valueOf(0).equals(asrTask.getStatus()) && realtimeProgress != null && realtimeProgress <= 0) { + return new LegacyMeetingPreviewResult( + "400", + "会议正在处理中", + buildProcessingPreview(meeting, summaryTask, processingStatus("会议数据准备中", 25, STAGE_DATA_INITIALIZATION)) + ); + } if (realtimeProgress != null) { if (realtimeProgress >= 100) { MeetingVO completedDetail = detail != null ? detail : meetingQueryService.getDetail(meetingId); @@ -340,11 +395,11 @@ public class MeetingMcpToolService { } private boolean isRunningAsr(AiTask task) { - return task != null && (Integer.valueOf(0).equals(task.getStatus()) || Integer.valueOf(1).equals(task.getStatus())); + return task != null && Integer.valueOf(1).equals(task.getStatus()); } private boolean isRunningSummary(AiTask task) { - return task != null && (Integer.valueOf(0).equals(task.getStatus()) || Integer.valueOf(1).equals(task.getStatus())); + return task != null && Integer.valueOf(1).equals(task.getStatus()); } private boolean isFailed(AiTask task) { @@ -514,22 +569,13 @@ public class MeetingMcpToolService { } private boolean isAsrStage(Integer meetingStatus, AiTask asrTask, boolean hasAudio, boolean isSummaryStage) { - return Integer.valueOf(1).equals(meetingStatus) + return (Integer.valueOf(1).equals(meetingStatus) && (asrTask == null || !Integer.valueOf(0).equals(asrTask.getStatus()))) || isRunningAsr(asrTask) - || (hasAudio && !isSummaryStage); + || (asrTask == null && hasAudio && !isSummaryStage); } private Integer resolveRealtimeProgress(Long meetingId) { - String rawProgress = redisTemplate.opsForValue().get(RedisKeys.meetingProgressKey(meetingId)); - if (rawProgress == null || rawProgress.isBlank()) { - return null; - } - try { - JsonNode progress = objectMapper.readTree(rawProgress); - return progress.hasNonNull("percent") ? progress.path("percent").asInt() : null; - } catch (Exception ignored) { - return null; - } + return meetingProgressService.resolvePercent(meetingId); } private LegacyMeetingProcessingStatusResponse processingStatus(String overallStatus, int overallProgress, String currentStage) { diff --git a/backend/src/main/java/com/imeeting/support/RedisValueSupport.java b/backend/src/main/java/com/imeeting/support/RedisValueSupport.java new file mode 100644 index 0000000..adf408e --- /dev/null +++ b/backend/src/main/java/com/imeeting/support/RedisValueSupport.java @@ -0,0 +1,74 @@ +package com.imeeting.support; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Component; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +@Component +@Slf4j +@RequiredArgsConstructor +public class RedisValueSupport { + + private final StringRedisTemplate redisTemplate; + private final ObjectMapper objectMapper; + + public T getJson(String key, Class type) { + String raw = getString(key); + if (raw == null || raw.isBlank()) { + return null; + } + try { + return objectMapper.readValue(raw, type); + } catch (Exception ex) { + log.warn("Failed to parse redis json, key={}", key, ex); + return null; + } + } + + public String getString(String key) { + try { + return redisTemplate.opsForValue().get(key); + } catch (Exception ex) { + log.warn("Failed to get redis value, key={}", key, ex); + return null; + } + } + + public void setJson(String key, Object value, long ttl, TimeUnit unit) { + try { + redisTemplate.opsForValue().set(key, objectMapper.writeValueAsString(value), ttl, unit); + } catch (Exception ex) { + log.warn("Failed to write redis json, key={}", key, ex); + } + } + + public Boolean setIfAbsent(String key, String value, long ttl, TimeUnit unit) { + try { + return redisTemplate.opsForValue().setIfAbsent(key, value, ttl, unit); + } catch (Exception ex) { + log.warn("Failed to acquire redis lock, key={}", key, ex); + return false; + } + } + + public void delete(String key) { + try { + redisTemplate.delete(key); + } catch (Exception ex) { + log.warn("Failed to delete redis key, key={}", key, ex); + } + } + + public void delete(Collection keys) { + try { + redisTemplate.delete(keys); + } catch (Exception ex) { + log.warn("Failed to delete redis keys, keys={}", keys, ex); + } + } +} diff --git a/frontend/src/pages/business/MeetingDetail.tsx b/frontend/src/pages/business/MeetingDetail.tsx index d8516de..e1c3436 100644 --- a/frontend/src/pages/business/MeetingDetail.tsx +++ b/frontend/src/pages/business/MeetingDetail.tsx @@ -369,6 +369,16 @@ const MeetingProgressDisplay: React.FC<{ }> = ({ meetingId, onComplete, onProgressUpdate, onProgressChange, compact, inline }) => { const [progress, setProgress] = useState(null); + const onCompleteRef = useRef(onComplete); + const onProgressUpdateRef = useRef(onProgressUpdate); + const onProgressChangeRef = useRef(onProgressChange); + + useEffect(() => { + onCompleteRef.current = onComplete; + onProgressUpdateRef.current = onProgressUpdate; + onProgressChangeRef.current = onProgressChange; + }, [onComplete, onProgressUpdate, onProgressChange]); + useEffect(() => { let completed = false; @@ -383,10 +393,10 @@ const MeetingProgressDisplay: React.FC<{ ]); if (detailRes.data?.data) { - onProgressUpdate?.(detailRes.data.data); + onProgressUpdateRef.current?.(detailRes.data.data); if (detailRes.data.data.status !== 1 && detailRes.data.data.status !== 2) { completed = true; - onComplete(); + onCompleteRef.current?.(); return; } } @@ -394,10 +404,10 @@ const MeetingProgressDisplay: React.FC<{ if (progressRes.data?.data) { const nextProgress = progressRes.data.data; setProgress(nextProgress); - onProgressChange?.(nextProgress); + onProgressChangeRef.current?.(nextProgress); if (nextProgress.percent === 100 || nextProgress.percent < 0) { completed = true; - onComplete(); + onCompleteRef.current?.(); } } } catch { @@ -411,7 +421,7 @@ const MeetingProgressDisplay: React.FC<{ completed = true; clearInterval(timer); }; - }, [meetingId, onComplete, onProgressChange, onProgressUpdate]); + }, [meetingId]); const percent = progress?.percent || 0; const isError = percent < 0; @@ -2066,7 +2076,7 @@ const MeetingDetail: React.FC = () => { meetingId={meeting.id} onComplete={() => fetchData(meeting.id)} onProgressUpdate={(updated) => { - if (updated.status === 2 || updated.status !== meeting.status) { + if (updated.status !== meeting.status) { void fetchData(updated.id); } }}