From 3a7baa0341ecef25e87e7eca692062265f55cbc9 Mon Sep 17 00:00:00 2001 From: chenhao Date: Tue, 31 Mar 2026 14:44:36 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E4=BC=9A=E8=AE=AE=E4=BC=9A=E8=AF=9D=E7=8A=B6=E6=80=81=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E5=92=8CWebSocket=E6=8E=A7=E5=88=B6=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 创建 `RealtimeMeetingSessionStateService` 及其实现类,管理实时会议会话状态 - 添加 `RealtimeMeetingSessionExpirationListener` 监听器,处理会话过期事件 - 更新前端API和组件,支持获取和控制实时会议会话状态 - 在 `RealtimeAsrSession` 组件中添加暂停和恢复识别的功能 --- .../java/com/imeeting/common/RedisKeys.java | 24 ++ .../config/RedisKeyExpirationConfig.java | 17 + .../controller/biz/MeetingController.java | 23 ++ .../dto/biz/RealtimeMeetingResumeConfig.java | 19 + .../dto/biz/RealtimeMeetingSessionState.java | 20 + .../biz/RealtimeMeetingSessionStatusVO.java | 15 + ...ltimeMeetingSessionExpirationListener.java | 76 ++++ .../RealtimeMeetingSessionStateService.java | 28 ++ .../service/biz/impl/AiTaskServiceImpl.java | 10 + .../biz/impl/MeetingCommandServiceImpl.java | 20 +- ...ealtimeMeetingSessionStateServiceImpl.java | 379 ++++++++++++++++++ ...altimeMeetingSocketSessionServiceImpl.java | 18 + .../RealtimeMeetingProxyWebSocketHandler.java | 27 +- backend/src/main/resources/application.yml | 8 +- frontend/src/api/business/meeting.ts | 40 +- frontend/src/pages/business/Meetings.tsx | 109 ++++- .../src/pages/business/RealtimeAsrSession.tsx | 128 +++++- frontend/src/routes/routes.tsx | 4 +- 18 files changed, 912 insertions(+), 53 deletions(-) create mode 100644 backend/src/main/java/com/imeeting/config/RedisKeyExpirationConfig.java create mode 100644 backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingResumeConfig.java create mode 100644 backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingSessionState.java create mode 100644 backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingSessionStatusVO.java create mode 100644 backend/src/main/java/com/imeeting/listener/RealtimeMeetingSessionExpirationListener.java create mode 100644 backend/src/main/java/com/imeeting/service/biz/RealtimeMeetingSessionStateService.java create mode 100644 backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java diff --git a/backend/src/main/java/com/imeeting/common/RedisKeys.java b/backend/src/main/java/com/imeeting/common/RedisKeys.java index e2b60d3..71c3cc5 100644 --- a/backend/src/main/java/com/imeeting/common/RedisKeys.java +++ b/backend/src/main/java/com/imeeting/common/RedisKeys.java @@ -47,6 +47,30 @@ public final class RedisKeys { return "biz:meeting:realtime:socket:" + sessionToken; } + public static String realtimeMeetingSessionStateKey(Long meetingId) { + return "biz:meeting:realtime:state:" + meetingId; + } + + public static String realtimeMeetingResumeTimeoutKey(Long meetingId) { + return realtimeMeetingResumeTimeoutPrefix() + meetingId; + } + + public static String realtimeMeetingEmptyTimeoutKey(Long meetingId) { + return realtimeMeetingEmptyTimeoutPrefix() + meetingId; + } + + public static String realtimeMeetingTimeoutLockKey(Long meetingId) { + return "biz:meeting:realtime:timeout:lock:" + meetingId; + } + + public static String realtimeMeetingResumeTimeoutPrefix() { + return "biz:meeting:realtime:resume-timeout:"; + } + + public static String realtimeMeetingEmptyTimeoutPrefix() { + return "biz:meeting:realtime:empty-timeout:"; + } + public static final String CACHE_EMPTY_MARKER = "EMPTY_MARKER"; public static final String SYS_PARAM_FIELD_VALUE = "value"; public static final String SYS_PARAM_FIELD_TYPE = "type"; diff --git a/backend/src/main/java/com/imeeting/config/RedisKeyExpirationConfig.java b/backend/src/main/java/com/imeeting/config/RedisKeyExpirationConfig.java new file mode 100644 index 0000000..17dcfb6 --- /dev/null +++ b/backend/src/main/java/com/imeeting/config/RedisKeyExpirationConfig.java @@ -0,0 +1,17 @@ +package com.imeeting.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; + +@Configuration +public class RedisKeyExpirationConfig { + + @Bean + public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) { + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + return container; + } +} diff --git a/backend/src/main/java/com/imeeting/controller/biz/MeetingController.java b/backend/src/main/java/com/imeeting/controller/biz/MeetingController.java index 1f77697..a5aa612 100644 --- a/backend/src/main/java/com/imeeting/controller/biz/MeetingController.java +++ b/backend/src/main/java/com/imeeting/controller/biz/MeetingController.java @@ -10,6 +10,7 @@ import com.imeeting.dto.biz.MeetingTranscriptVO; import com.imeeting.dto.biz.MeetingVO; import com.imeeting.dto.biz.OpenRealtimeSocketSessionCommand; import com.imeeting.dto.biz.RealtimeMeetingCompleteDTO; +import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO; import com.imeeting.dto.biz.RealtimeSocketSessionVO; import com.imeeting.dto.biz.RealtimeTranscriptItemDTO; import com.imeeting.dto.biz.UpdateMeetingBasicCommand; @@ -22,6 +23,7 @@ import com.imeeting.service.biz.MeetingCommandService; import com.imeeting.service.biz.MeetingExportService; import com.imeeting.service.biz.MeetingQueryService; import com.imeeting.service.biz.PromptTemplateService; +import com.imeeting.service.biz.RealtimeMeetingSessionStateService; import com.imeeting.service.biz.RealtimeMeetingSocketSessionService; import com.unisbase.common.ApiResponse; import com.unisbase.dto.PageResult; @@ -63,6 +65,7 @@ public class MeetingController { private final MeetingExportService meetingExportService; private final PromptTemplateService promptTemplateService; private final RealtimeMeetingSocketSessionService realtimeMeetingSocketSessionService; + private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; private final StringRedisTemplate redisTemplate; private final String uploadPath; private final String resourcePrefix; @@ -73,6 +76,7 @@ public class MeetingController { MeetingExportService meetingExportService, PromptTemplateService promptTemplateService, RealtimeMeetingSocketSessionService realtimeMeetingSocketSessionService, + RealtimeMeetingSessionStateService realtimeMeetingSessionStateService, StringRedisTemplate redisTemplate, @Value("${unisbase.app.upload-path}") String uploadPath, @Value("${unisbase.app.resource-prefix}") String resourcePrefix) { @@ -82,6 +86,7 @@ public class MeetingController { this.meetingExportService = meetingExportService; this.promptTemplateService = promptTemplateService; this.realtimeMeetingSocketSessionService = realtimeMeetingSocketSessionService; + this.realtimeMeetingSessionStateService = realtimeMeetingSessionStateService; this.redisTemplate = redisTemplate; this.uploadPath = uploadPath; this.resourcePrefix = resourcePrefix; @@ -221,6 +226,15 @@ public class MeetingController { return ApiResponse.ok(meetingQueryService.getTranscripts(id)); } + @GetMapping("/{id}/realtime/session-status") + @PreAuthorize("isAuthenticated()") + public ApiResponse getRealtimeSessionStatus(@PathVariable Long id) { + LoginUser loginUser = currentLoginUser(); + Meeting meeting = meetingAccessService.requireMeeting(id); + meetingAccessService.assertCanManageRealtimeMeeting(meeting, loginUser); + return ApiResponse.ok(realtimeMeetingSessionStateService.getStatus(id)); + } + @PostMapping("/{id}/realtime/transcripts") @PreAuthorize("isAuthenticated()") public ApiResponse appendRealtimeTranscripts(@PathVariable Long id, @RequestBody List items) { @@ -231,6 +245,15 @@ public class MeetingController { return ApiResponse.ok(true); } + @PostMapping("/{id}/realtime/pause") + @PreAuthorize("isAuthenticated()") + public ApiResponse pauseRealtimeMeeting(@PathVariable Long id) { + LoginUser loginUser = currentLoginUser(); + Meeting meeting = meetingAccessService.requireMeeting(id); + meetingAccessService.assertCanManageRealtimeMeeting(meeting, loginUser); + return ApiResponse.ok(realtimeMeetingSessionStateService.pause(id)); + } + @PostMapping("/{id}/realtime/socket-session") @PreAuthorize("isAuthenticated()") public ApiResponse openRealtimeSocketSession(@PathVariable Long id, diff --git a/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingResumeConfig.java b/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingResumeConfig.java new file mode 100644 index 0000000..4a55d86 --- /dev/null +++ b/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingResumeConfig.java @@ -0,0 +1,19 @@ +package com.imeeting.dto.biz; + +import lombok.Data; + +import java.util.List; +import java.util.Map; + +@Data +public class RealtimeMeetingResumeConfig { + private Long asrModelId; + private String mode; + private String language; + private Integer useSpkId; + private Boolean enablePunctuation; + private Boolean enableItn; + private Boolean enableTextRefine; + private Boolean saveAudio; + private List> hotwords; +} diff --git a/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingSessionState.java b/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingSessionState.java new file mode 100644 index 0000000..aaa3ab0 --- /dev/null +++ b/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingSessionState.java @@ -0,0 +1,20 @@ +package com.imeeting.dto.biz; + +import lombok.Data; + +@Data +public class RealtimeMeetingSessionState { + private Long meetingId; + private Long tenantId; + private Long userId; + private String status; + private Boolean hasTranscript; + private Long transcriptCountSnapshot; + private Long lastTranscriptAt; + private Long pauseAt; + private Long resumeExpireAt; + private Long lastResumeAt; + private String activeConnectionId; + private Long updatedAt; + private RealtimeMeetingResumeConfig resumeConfig; +} diff --git a/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingSessionStatusVO.java b/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingSessionStatusVO.java new file mode 100644 index 0000000..b802e7d --- /dev/null +++ b/backend/src/main/java/com/imeeting/dto/biz/RealtimeMeetingSessionStatusVO.java @@ -0,0 +1,15 @@ +package com.imeeting.dto.biz; + +import lombok.Data; + +@Data +public class RealtimeMeetingSessionStatusVO { + private Long meetingId; + private String status; + private Boolean hasTranscript; + private Boolean canResume; + private Long remainingSeconds; + private Long resumeExpireAt; + private Boolean activeConnection; + private RealtimeMeetingResumeConfig resumeConfig; +} diff --git a/backend/src/main/java/com/imeeting/listener/RealtimeMeetingSessionExpirationListener.java b/backend/src/main/java/com/imeeting/listener/RealtimeMeetingSessionExpirationListener.java new file mode 100644 index 0000000..2e2f9ff --- /dev/null +++ b/backend/src/main/java/com/imeeting/listener/RealtimeMeetingSessionExpirationListener.java @@ -0,0 +1,76 @@ +package com.imeeting.listener; + +import com.imeeting.common.RedisKeys; +import com.imeeting.service.biz.MeetingCommandService; +import com.imeeting.service.biz.RealtimeMeetingSessionStateService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; + +@Component +@Slf4j +public class RealtimeMeetingSessionExpirationListener extends KeyExpirationEventMessageListener { + + private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; + private final MeetingCommandService meetingCommandService; + private final boolean listenerEnabled; + + public RealtimeMeetingSessionExpirationListener( + RedisMessageListenerContainer listenerContainer, + RealtimeMeetingSessionStateService realtimeMeetingSessionStateService, + MeetingCommandService meetingCommandService, + @Value("${imeeting.realtime.redis-expire-listener-enabled:true}") String listenerEnabledRaw + ) { + super(listenerContainer); + this.realtimeMeetingSessionStateService = realtimeMeetingSessionStateService; + this.meetingCommandService = meetingCommandService; + this.listenerEnabled = parseBooleanOrDefault(listenerEnabledRaw, true); + } + + @Override + public void onMessage(Message message, byte[] pattern) { + super.onMessage(message, pattern); + if (!listenerEnabled || message == null || message.getBody() == null) { + return; + } + + String expiredKey = new String(message.getBody(), StandardCharsets.UTF_8); + try { + if (expiredKey.startsWith(RedisKeys.realtimeMeetingResumeTimeoutPrefix())) { + Long meetingId = parseMeetingId(expiredKey, RedisKeys.realtimeMeetingResumeTimeoutPrefix()); + if (meetingId != null && realtimeMeetingSessionStateService.markCompletingIfResumeExpired(meetingId)) { + meetingCommandService.completeRealtimeMeeting(meetingId, null); + } + return; + } + if (expiredKey.startsWith(RedisKeys.realtimeMeetingEmptyTimeoutPrefix())) { + Long meetingId = parseMeetingId(expiredKey, RedisKeys.realtimeMeetingEmptyTimeoutPrefix()); + if (meetingId != null) { + realtimeMeetingSessionStateService.expireEmptySession(meetingId); + } + } + } catch (Exception ex) { + log.error("Handle realtime meeting expiration failed, key={}", expiredKey, ex); + } + } + + private Long parseMeetingId(String key, String prefix) { + String raw = key.substring(prefix.length()); + if (raw.isBlank()) { + return null; + } + return Long.parseLong(raw); + } + + private boolean parseBooleanOrDefault(String raw, boolean defaultValue) { + if (raw == null || raw.isBlank()) { + return defaultValue; + } + return Boolean.parseBoolean(raw.trim()); + } +} diff --git a/backend/src/main/java/com/imeeting/service/biz/RealtimeMeetingSessionStateService.java b/backend/src/main/java/com/imeeting/service/biz/RealtimeMeetingSessionStateService.java new file mode 100644 index 0000000..51aa4d0 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/biz/RealtimeMeetingSessionStateService.java @@ -0,0 +1,28 @@ +package com.imeeting.service.biz; + +import com.imeeting.dto.biz.RealtimeMeetingResumeConfig; +import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO; + +public interface RealtimeMeetingSessionStateService { + void initSessionIfAbsent(Long meetingId, Long tenantId, Long userId); + + void rememberResumeConfig(Long meetingId, RealtimeMeetingResumeConfig resumeConfig); + + void assertCanOpenSession(Long meetingId); + + boolean activate(Long meetingId, String connectionId); + + RealtimeMeetingSessionStatusVO getStatus(Long meetingId); + + RealtimeMeetingSessionStatusVO pause(Long meetingId); + + void pauseByDisconnect(Long meetingId, String connectionId); + + void refreshAfterTranscript(Long meetingId); + + boolean markCompletingIfResumeExpired(Long meetingId); + + void expireEmptySession(Long meetingId); + + void clear(Long meetingId); +} diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java index 54e174f..4b3a85d 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/AiTaskServiceImpl.java @@ -101,6 +101,16 @@ public class AiTaskServiceImpl extends ServiceImpl impleme .collect(Collectors.joining("\n")); } + // Real-time meetings are created without audio files and without ASR tasks. + // If they have no transcripts yet, they must stay resumable instead of being + // pushed into summary flow and accidentally marked completed. + if ((meeting.getAudioUrl() == null || meeting.getAudioUrl().isBlank()) + && asrTask == null + && (asrText == null || asrText.isBlank())) { + updateProgress(meetingId, 0, "等待实时识别开始...", 0); + return; + } + AiTask sumTask = this.getOne(new LambdaQueryWrapper() .eq(AiTask::getMeetingId, meetingId) .eq(AiTask::getTaskType, "SUMMARY") diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java index 6eb967c..b9fbfa1 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java @@ -19,6 +19,7 @@ import com.imeeting.service.biz.HotWordService; import com.imeeting.service.biz.MeetingCommandService; import com.imeeting.service.biz.MeetingService; import com.imeeting.service.biz.MeetingSummaryFileService; +import com.imeeting.service.biz.RealtimeMeetingSessionStateService; import lombok.RequiredArgsConstructor; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; @@ -40,6 +41,7 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { private final com.imeeting.mapper.biz.MeetingTranscriptMapper transcriptMapper; private final MeetingSummaryFileService meetingSummaryFileService; private final MeetingDomainSupport meetingDomainSupport; + private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; private final StringRedisTemplate redisTemplate; private final ObjectMapper objectMapper; @@ -90,6 +92,7 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { null, tenantId, creatorId, creatorName, 1); meetingService.save(meeting); meetingDomainSupport.createSummaryTask(meeting.getId(), command.getSummaryModelId(), command.getPromptId()); + realtimeMeetingSessionStateService.initSessionIfAbsent(meeting.getId(), tenantId, creatorId); MeetingVO vo = new MeetingVO(); meetingDomainSupport.fillMeetingVO(meeting, vo, false); @@ -100,6 +103,7 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { @Transactional(rollbackFor = Exception.class) public void deleteMeeting(Long id) { meetingService.removeById(id); + realtimeMeetingSessionStateService.clear(id); } @Override @@ -119,6 +123,7 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { .orElse(0); int nextSortOrder = maxSortOrder == null ? 0 : maxSortOrder + 1; + boolean inserted = false; for (RealtimeTranscriptItemDTO item : items) { if (item.getContent() == null || item.getContent().isBlank()) { continue; @@ -144,6 +149,11 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { transcript.setEndTime(item.getEndTime()); transcript.setSortOrder(nextSortOrder++); transcriptMapper.insert(transcript); + inserted = true; + } + + if (inserted) { + realtimeMeetingSessionStateService.refreshAfterTranscript(meetingId); } } @@ -163,14 +173,14 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { long transcriptCount = transcriptMapper.selectCount(new LambdaQueryWrapper() .eq(MeetingTranscript::getMeetingId, meetingId)); if (transcriptCount <= 0) { - meeting.setStatus(4); - meetingService.updateById(meeting); - throw new RuntimeException("当前会议还没有可用的转录文本,无法生成总结"); + realtimeMeetingSessionStateService.pause(meetingId); + throw new RuntimeException("当前还没有转录内容,无法结束会议。请先开始识别,或直接离开页面稍后继续。"); } + realtimeMeetingSessionStateService.clear(meetingId); meeting.setStatus(2); meetingService.updateById(meeting); - updateMeetingProgress(meetingId, 90, "正在生成智能总结纪要...", 0); + updateMeetingProgress(meetingId, 90, "正在生成会议总结...", 0); aiTaskService.dispatchSummaryTask(meetingId); } @@ -260,4 +270,4 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { // Ignore progress write failures. } } -} +} \ No newline at end of file diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java new file mode 100644 index 0000000..bd9dc4e --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java @@ -0,0 +1,379 @@ +package com.imeeting.service.biz.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.imeeting.common.RedisKeys; +import com.imeeting.dto.biz.RealtimeMeetingResumeConfig; +import com.imeeting.dto.biz.RealtimeMeetingSessionState; +import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO; +import com.imeeting.entity.biz.Meeting; +import com.imeeting.entity.biz.MeetingTranscript; +import com.imeeting.mapper.biz.MeetingMapper; +import com.imeeting.mapper.biz.MeetingTranscriptMapper; +import com.imeeting.service.biz.RealtimeMeetingSessionStateService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import java.time.Duration; + +@Service +@Slf4j +@RequiredArgsConstructor +public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSessionStateService { + + private final StringRedisTemplate redisTemplate; + private final ObjectMapper objectMapper; + private final MeetingTranscriptMapper transcriptMapper; + private final MeetingMapper meetingMapper; + + @Value("${imeeting.realtime.resume-window-minutes:30}") + private String resumeWindowMinutesValue; + + @Value("${imeeting.realtime.empty-session-retention-minutes:720}") + private String emptySessionRetentionMinutesValue; + + @Override + public void initSessionIfAbsent(Long meetingId, Long tenantId, Long userId) { + RealtimeMeetingSessionState state = readState(meetingId); + if (state != null) { + return; + } + RealtimeMeetingSessionState next = new RealtimeMeetingSessionState(); + next.setMeetingId(meetingId); + next.setTenantId(tenantId); + next.setUserId(userId); + next.setStatus("IDLE"); + next.setHasTranscript(countTranscripts(meetingId) > 0); + next.setTranscriptCountSnapshot(countTranscripts(meetingId)); + next.setUpdatedAt(System.currentTimeMillis()); + writeState(next); + } + + @Override + public void rememberResumeConfig(Long meetingId, RealtimeMeetingResumeConfig resumeConfig) { + RealtimeMeetingSessionState state = getOrCreateState(meetingId); + state.setResumeConfig(resumeConfig); + state.setUpdatedAt(System.currentTimeMillis()); + writeState(state); + } + + @Override + public void assertCanOpenSession(Long meetingId) { + RealtimeMeetingSessionStatusVO status = getStatus(meetingId); + if (status == null) { + return; + } + + String currentStatus = status.getStatus(); + if ("COMPLETING".equals(currentStatus)) { + throw new RuntimeException("Realtime meeting is completing"); + } + if ("COMPLETED".equals(currentStatus)) { + throw new RuntimeException("Realtime meeting has completed"); + } + if ("ACTIVE".equals(currentStatus) || Boolean.TRUE.equals(status.getActiveConnection())) { + throw new RuntimeException("Realtime meeting already has an active connection"); + } + if ("PAUSED_RESUMABLE".equals(currentStatus) && !Boolean.TRUE.equals(status.getCanResume())) { + throw new RuntimeException("Realtime meeting resume window has expired"); + } + } + + @Override + public boolean activate(Long meetingId, String connectionId) { + if (meetingId == null || connectionId == null || connectionId.isBlank()) { + return false; + } + + RealtimeMeetingSessionState state = getOrCreateState(meetingId); + if ("COMPLETING".equals(state.getStatus()) || "COMPLETED".equals(state.getStatus())) { + return false; + } + + String activeConnectionId = state.getActiveConnectionId(); + if (activeConnectionId != null && !activeConnectionId.isBlank() && !activeConnectionId.equals(connectionId)) { + return false; + } + + long now = System.currentTimeMillis(); + long transcriptCount = countTranscripts(meetingId); + state.setStatus("ACTIVE"); + state.setHasTranscript(transcriptCount > 0); + state.setTranscriptCountSnapshot(transcriptCount); + state.setActiveConnectionId(connectionId); + state.setLastResumeAt(now); + state.setPauseAt(null); + state.setResumeExpireAt(null); + state.setUpdatedAt(now); + writeState(state); + + redisTemplate.delete(RedisKeys.realtimeMeetingResumeTimeoutKey(meetingId)); + redisTemplate.delete(RedisKeys.realtimeMeetingEmptyTimeoutKey(meetingId)); + return true; + } + + @Override + public RealtimeMeetingSessionStatusVO getStatus(Long meetingId) { + RealtimeMeetingSessionState state = readState(meetingId); + if (state == null) { + return buildFallbackStatus(meetingId); + } + return toStatusVO(state); + } + + @Override + public RealtimeMeetingSessionStatusVO pause(Long meetingId) { + RealtimeMeetingSessionState state = getOrCreateState(meetingId); + if ("COMPLETING".equals(state.getStatus()) || "COMPLETED".equals(state.getStatus())) { + return toStatusVO(state); + } + return pauseState(meetingId, state); + } + + @Override + public void pauseByDisconnect(Long meetingId, String connectionId) { + if (meetingId == null || connectionId == null || connectionId.isBlank()) { + return; + } + + RealtimeMeetingSessionState state = readState(meetingId); + if (state == null) { + return; + } + if (!"ACTIVE".equals(state.getStatus())) { + return; + } + if (state.getActiveConnectionId() == null || !connectionId.equals(state.getActiveConnectionId())) { + return; + } + + pauseState(meetingId, state); + } + + @Override + public void refreshAfterTranscript(Long meetingId) { + RealtimeMeetingSessionState state = getOrCreateState(meetingId); + long now = System.currentTimeMillis(); + long transcriptCount = countTranscripts(meetingId); + + state.setHasTranscript(transcriptCount > 0); + state.setTranscriptCountSnapshot(transcriptCount); + state.setLastTranscriptAt(now); + state.setUpdatedAt(now); + + if ("PAUSED_EMPTY".equals(state.getStatus()) || "PAUSED_RESUMABLE".equals(state.getStatus())) { + state.setStatus("PAUSED_RESUMABLE"); + state.setResumeExpireAt(now + Duration.ofMinutes(getResumeWindowMinutes()).toMillis()); + ensureTimeoutKey(meetingId, RedisKeys.realtimeMeetingResumeTimeoutKey(meetingId), Duration.ofMinutes(getResumeWindowMinutes())); + redisTemplate.delete(RedisKeys.realtimeMeetingEmptyTimeoutKey(meetingId)); + } + + writeState(state); + } + + @Override + public boolean markCompletingIfResumeExpired(Long meetingId) { + String lockKey = RedisKeys.realtimeMeetingTimeoutLockKey(meetingId); + Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", Duration.ofMinutes(1)); + if (Boolean.FALSE.equals(locked)) { + return false; + } + + try { + RealtimeMeetingSessionState state = readState(meetingId); + if (state == null || !"PAUSED_RESUMABLE".equals(state.getStatus())) { + return false; + } + + long transcriptCount = countTranscripts(meetingId); + if (transcriptCount <= 0) { + clear(meetingId); + return false; + } + + if (state.getTranscriptCountSnapshot() != null && transcriptCount > state.getTranscriptCountSnapshot()) { + long now = System.currentTimeMillis(); + state.setTranscriptCountSnapshot(transcriptCount); + state.setLastTranscriptAt(now); + state.setResumeExpireAt(now + Duration.ofMinutes(getResumeWindowMinutes()).toMillis()); + writeState(state); + ensureTimeoutKey(meetingId, RedisKeys.realtimeMeetingResumeTimeoutKey(meetingId), Duration.ofMinutes(getResumeWindowMinutes())); + return false; + } + + state.setStatus("COMPLETING"); + state.setUpdatedAt(System.currentTimeMillis()); + writeState(state); + return true; + } finally { + redisTemplate.delete(lockKey); + } + } + + @Override + public void expireEmptySession(Long meetingId) { + RealtimeMeetingSessionState state = readState(meetingId); + if (state == null) { + return; + } + if ("PAUSED_EMPTY".equals(state.getStatus())) { + clear(meetingId); + } + } + + @Override + public void clear(Long meetingId) { + redisTemplate.delete(RedisKeys.realtimeMeetingSessionStateKey(meetingId)); + redisTemplate.delete(RedisKeys.realtimeMeetingResumeTimeoutKey(meetingId)); + redisTemplate.delete(RedisKeys.realtimeMeetingEmptyTimeoutKey(meetingId)); + } + + private RealtimeMeetingSessionStatusVO pauseState(Long meetingId, RealtimeMeetingSessionState state) { + long transcriptCount = countTranscripts(meetingId); + long now = System.currentTimeMillis(); + + state.setHasTranscript(transcriptCount > 0); + state.setTranscriptCountSnapshot(transcriptCount); + state.setPauseAt(now); + state.setActiveConnectionId(null); + state.setUpdatedAt(now); + + if (transcriptCount > 0) { + state.setStatus("PAUSED_RESUMABLE"); + state.setResumeExpireAt(now + Duration.ofMinutes(getResumeWindowMinutes()).toMillis()); + ensureTimeoutKey(meetingId, RedisKeys.realtimeMeetingResumeTimeoutKey(meetingId), Duration.ofMinutes(getResumeWindowMinutes())); + redisTemplate.delete(RedisKeys.realtimeMeetingEmptyTimeoutKey(meetingId)); + if (state.getLastTranscriptAt() == null) { + state.setLastTranscriptAt(now); + } + } else { + state.setStatus("PAUSED_EMPTY"); + state.setResumeExpireAt(null); + redisTemplate.delete(RedisKeys.realtimeMeetingResumeTimeoutKey(meetingId)); + ensureTimeoutKey(meetingId, RedisKeys.realtimeMeetingEmptyTimeoutKey(meetingId), Duration.ofMinutes(getEmptySessionRetentionMinutes())); + } + + writeState(state); + return toStatusVO(state); + } + + private RealtimeMeetingSessionStatusVO buildFallbackStatus(Long meetingId) { + RealtimeMeetingSessionStatusVO vo = new RealtimeMeetingSessionStatusVO(); + vo.setMeetingId(meetingId); + Meeting meeting = meetingMapper.selectById(meetingId); + if (meeting == null) { + vo.setStatus("IDLE"); + vo.setHasTranscript(false); + vo.setCanResume(false); + vo.setRemainingSeconds(0L); + vo.setActiveConnection(false); + return vo; + } + + if (Integer.valueOf(2).equals(meeting.getStatus())) { + vo.setStatus("COMPLETING"); + } else if (Integer.valueOf(3).equals(meeting.getStatus()) || Integer.valueOf(4).equals(meeting.getStatus())) { + vo.setStatus("COMPLETED"); + } else { + vo.setStatus("IDLE"); + } + vo.setHasTranscript(countTranscripts(meetingId) > 0); + vo.setCanResume(false); + vo.setRemainingSeconds(0L); + vo.setActiveConnection(false); + return vo; + } + + private RealtimeMeetingSessionStatusVO toStatusVO(RealtimeMeetingSessionState state) { + RealtimeMeetingSessionStatusVO vo = new RealtimeMeetingSessionStatusVO(); + vo.setMeetingId(state.getMeetingId()); + vo.setStatus(state.getStatus()); + vo.setHasTranscript(Boolean.TRUE.equals(state.getHasTranscript())); + vo.setResumeExpireAt(state.getResumeExpireAt()); + vo.setResumeConfig(state.getResumeConfig()); + vo.setActiveConnection(state.getActiveConnectionId() != null && !state.getActiveConnectionId().isBlank()); + + long now = System.currentTimeMillis(); + long remainingSeconds = 0L; + if (state.getResumeExpireAt() != null) { + remainingSeconds = Math.max(0L, (state.getResumeExpireAt() - now) / 1000); + } + vo.setRemainingSeconds(remainingSeconds); + vo.setCanResume( + "PAUSED_EMPTY".equals(state.getStatus()) + || ("PAUSED_RESUMABLE".equals(state.getStatus()) && remainingSeconds > 0) + || "IDLE".equals(state.getStatus()) + ); + return vo; + } + + private RealtimeMeetingSessionState getOrCreateState(Long meetingId) { + RealtimeMeetingSessionState state = readState(meetingId); + if (state != null) { + return state; + } + RealtimeMeetingSessionState next = new RealtimeMeetingSessionState(); + next.setMeetingId(meetingId); + next.setStatus("IDLE"); + next.setHasTranscript(countTranscripts(meetingId) > 0); + next.setTranscriptCountSnapshot(countTranscripts(meetingId)); + next.setUpdatedAt(System.currentTimeMillis()); + return next; + } + + private RealtimeMeetingSessionState readState(Long meetingId) { + String raw = redisTemplate.opsForValue().get(RedisKeys.realtimeMeetingSessionStateKey(meetingId)); + if (raw == null || raw.isBlank()) { + return null; + } + try { + return objectMapper.readValue(raw, RealtimeMeetingSessionState.class); + } catch (Exception ex) { + log.warn("Failed to read realtime meeting session state, meetingId={}", meetingId, ex); + return null; + } + } + + private void writeState(RealtimeMeetingSessionState state) { + try { + redisTemplate.opsForValue().set( + RedisKeys.realtimeMeetingSessionStateKey(state.getMeetingId()), + objectMapper.writeValueAsString(state) + ); + } catch (Exception ex) { + throw new RuntimeException("Failed to write realtime meeting session state", ex); + } + } + + private void ensureTimeoutKey(Long meetingId, String key, Duration ttl) { + redisTemplate.opsForValue().set(key, String.valueOf(meetingId), ttl); + } + + private long getResumeWindowMinutes() { + return parseLongOrDefault(resumeWindowMinutesValue, 30L, "resume-window-minutes"); + } + + private long getEmptySessionRetentionMinutes() { + return parseLongOrDefault(emptySessionRetentionMinutesValue, 720L, "empty-session-retention-minutes"); + } + + private long parseLongOrDefault(String raw, long defaultValue, String configName) { + if (raw == null || raw.isBlank()) { + return defaultValue; + } + try { + return Long.parseLong(raw.trim()); + } catch (NumberFormatException ex) { + log.warn("Invalid realtime meeting config {}, rawValue={}, use default={}", configName, raw, defaultValue); + return defaultValue; + } + } + + private long countTranscripts(Long meetingId) { + return transcriptMapper.selectCount(new LambdaQueryWrapper() + .eq(MeetingTranscript::getMeetingId, meetingId)); + } +} diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSocketSessionServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSocketSessionServiceImpl.java index badd32e..cdb3b59 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSocketSessionServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSocketSessionServiceImpl.java @@ -3,11 +3,13 @@ package com.imeeting.service.biz.impl; import com.fasterxml.jackson.databind.ObjectMapper; import com.imeeting.common.RedisKeys; import com.imeeting.dto.biz.AiModelVO; +import com.imeeting.dto.biz.RealtimeMeetingResumeConfig; import com.imeeting.dto.biz.RealtimeSocketSessionData; import com.imeeting.dto.biz.RealtimeSocketSessionVO; import com.imeeting.entity.biz.Meeting; import com.imeeting.service.biz.AiModelService; import com.imeeting.service.biz.MeetingAccessService; +import com.imeeting.service.biz.RealtimeMeetingSessionStateService; import com.imeeting.service.biz.RealtimeMeetingSocketSessionService; import com.unisbase.security.LoginUser; import lombok.RequiredArgsConstructor; @@ -31,6 +33,7 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS private final StringRedisTemplate redisTemplate; private final MeetingAccessService meetingAccessService; private final AiModelService aiModelService; + private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; @Override public RealtimeSocketSessionVO createSession(Long meetingId, Long asrModelId, String mode, String language, @@ -47,6 +50,9 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS Meeting meeting = meetingAccessService.requireMeeting(meetingId); meetingAccessService.assertCanManageRealtimeMeeting(meeting, loginUser); + realtimeMeetingSessionStateService.initSessionIfAbsent(meetingId, loginUser.getTenantId(), loginUser.getUserId()); + realtimeMeetingSessionStateService.assertCanOpenSession(meetingId); + AiModelVO asrModel = aiModelService.getModelById(asrModelId, "ASR"); if (asrModel == null) { throw new RuntimeException("ASR model not found"); @@ -57,6 +63,18 @@ public class RealtimeMeetingSocketSessionServiceImpl implements RealtimeMeetingS throw new RuntimeException("ASR model WebSocket is not configured"); } + RealtimeMeetingResumeConfig resumeConfig = new RealtimeMeetingResumeConfig(); + resumeConfig.setAsrModelId(asrModelId); + resumeConfig.setMode(mode); + resumeConfig.setLanguage(language); + resumeConfig.setUseSpkId(useSpkId); + resumeConfig.setEnablePunctuation(enablePunctuation); + resumeConfig.setEnableItn(enableItn); + resumeConfig.setEnableTextRefine(enableTextRefine); + resumeConfig.setSaveAudio(saveAudio); + resumeConfig.setHotwords(hotwords); + realtimeMeetingSessionStateService.rememberResumeConfig(meetingId, resumeConfig); + RealtimeSocketSessionData sessionData = new RealtimeSocketSessionData(); sessionData.setMeetingId(meetingId); sessionData.setUserId(loginUser.getUserId()); diff --git a/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java b/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java index 2365b8f..e517a50 100644 --- a/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java +++ b/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java @@ -1,6 +1,7 @@ package com.imeeting.websocket; import com.imeeting.dto.biz.RealtimeSocketSessionData; +import com.imeeting.service.biz.RealtimeMeetingSessionStateService; import com.imeeting.service.biz.RealtimeMeetingSocketSessionService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -25,11 +26,11 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.Supplier; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; @Slf4j @Component @@ -48,6 +49,7 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl private static final CompletableFuture COMPLETED = CompletableFuture.completedFuture(null); private final RealtimeMeetingSocketSessionService realtimeMeetingSocketSessionService; + private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { @@ -77,7 +79,13 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl upstreamSocket = java.net.http.HttpClient.newHttpClient() .newWebSocketBuilder() .buildAsync(URI.create(sessionData.getTargetWsUrl()), - new UpstreamListener(frontendSession, session, sessionData.getMeetingId(), sessionData.getTargetWsUrl())) + new UpstreamListener( + frontendSession, + session, + sessionData.getMeetingId(), + sessionData.getTargetWsUrl(), + realtimeMeetingSessionStateService + )) .get(); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); @@ -163,6 +171,10 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { log.info("Realtime websocket closed: meetingId={}, sessionId={}, code={}, reason={}", session.getAttributes().get(ATTR_MEETING_ID), session.getId(), status.getCode(), status.getReason()); + Object meetingIdValue = session.getAttributes().get(ATTR_MEETING_ID); + if (meetingIdValue instanceof Long meetingId) { + realtimeMeetingSessionStateService.pauseByDisconnect(meetingId, session.getId()); + } closeUpstreamSocket(session, status); } @@ -306,23 +318,32 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl private final WebSocketSession rawSession; private final Long meetingId; private final String targetWsUrl; + private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; private final StringBuilder textBuffer = new StringBuilder(); private final ByteArrayOutputStream binaryBuffer = new ByteArrayOutputStream(); private final AtomicInteger upstreamTextCount = new AtomicInteger(); private final AtomicInteger upstreamBinaryCount = new AtomicInteger(); private UpstreamListener(ConcurrentWebSocketSessionDecorator frontendSession, WebSocketSession rawSession, - Long meetingId, String targetWsUrl) { + Long meetingId, String targetWsUrl, + RealtimeMeetingSessionStateService realtimeMeetingSessionStateService) { this.frontendSession = frontendSession; this.rawSession = rawSession; this.meetingId = meetingId; this.targetWsUrl = targetWsUrl; + this.realtimeMeetingSessionStateService = realtimeMeetingSessionStateService; } @Override public void onOpen(java.net.http.WebSocket webSocket) { log.info("Upstream websocket opened: meetingId={}, sessionId={}, upstream={}", meetingId, rawSession.getId(), targetWsUrl); + if (!realtimeMeetingSessionStateService.activate(meetingId, rawSession.getId())) { + sendFrontendError("REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议已有活跃实时连接,请先关闭旧连接后再继续"); + webSocket.sendClose(CloseStatus.POLICY_VIOLATION.getCode(), "Active realtime connection already exists"); + closeFrontend(CloseStatus.POLICY_VIOLATION.withReason("Active realtime connection already exists")); + return; + } try { if (frontendSession.isOpen()) { frontendSession.sendMessage(new TextMessage("{\"type\":\"proxy_ready\"}")); diff --git a/backend/src/main/resources/application.yml b/backend/src/main/resources/application.yml index 9192048..ea259be 100644 --- a/backend/src/main/resources/application.yml +++ b/backend/src/main/resources/application.yml @@ -52,4 +52,10 @@ unisbase: max-attempts: 5 token: access-default-minutes: 30 - refresh-default-days: 7 \ No newline at end of file + refresh-default-days: 7 + +imeeting: + realtime: + resume-window-minutes: 30 + empty-session-retention-minutes: 720 + redis-expire-listener-enabled: true diff --git a/frontend/src/api/business/meeting.ts b/frontend/src/api/business/meeting.ts index 2292228..20d2a9d 100644 --- a/frontend/src/api/business/meeting.ts +++ b/frontend/src/api/business/meeting.ts @@ -1,4 +1,4 @@ -import http from "../http"; +import http from "../http"; import axios from "axios"; export interface MeetingVO { @@ -22,6 +22,8 @@ export interface MeetingVO { todos?: string[]; }; status: number; + displayStatus?: number; + realtimeSessionStatus?: RealtimeMeetingSessionStatus["status"]; createdAt: string; } @@ -58,11 +60,11 @@ export interface UpdateMeetingSummaryCommand { export type MeetingUpdateSummaryDTO = UpdateMeetingSummaryCommand; -export const getMeetingPage = (params: { - current: number; - size: number; +export const getMeetingPage = (params: { + current: number; + size: number; title?: string; - viewType?: 'all' | 'created' | 'involved'; + viewType?: "all" | "created" | "involved"; }) => { return http.get( "/api/biz/meeting/page", @@ -104,6 +106,17 @@ export interface RealtimeSocketSessionRequest { hotwords?: Array<{ hotword: string; weight: number }>; } +export interface RealtimeMeetingSessionStatus { + meetingId: number; + status: "IDLE" | "ACTIVE" | "PAUSED_EMPTY" | "PAUSED_RESUMABLE" | "COMPLETING" | "COMPLETED"; + hasTranscript: boolean; + canResume: boolean; + remainingSeconds: number; + resumeExpireAt?: number; + activeConnection: boolean; + resumeConfig?: RealtimeSocketSessionRequest; +} + export const createRealtimeMeeting = (data: CreateMeetingCommand) => { return http.post( "/api/biz/meeting/realtime/start", @@ -118,6 +131,19 @@ export const appendRealtimeTranscripts = (meetingId: number, data: RealtimeTrans ); }; +export const getRealtimeMeetingSessionStatus = (meetingId: number) => { + return http.get( + `/api/biz/meeting/${meetingId}/realtime/session-status` + ); +}; + +export const pauseRealtimeMeeting = (meetingId: number) => { + return http.post( + `/api/biz/meeting/${meetingId}/realtime/pause`, + {} + ); +}; + export const openRealtimeMeetingSocketSession = ( meetingId: number, data: RealtimeSocketSessionRequest, @@ -253,11 +279,11 @@ export const getMeetingProgress = (id: number) => { ); }; -export const downloadMeetingSummary = (id: number, format: 'pdf' | 'word') => { +export const downloadMeetingSummary = (id: number, format: "pdf" | "word") => { const token = localStorage.getItem("accessToken"); return axios.get(`/api/biz/meeting/${id}/summary/export`, { params: { format }, - responseType: 'blob', + responseType: "blob", headers: token ? { Authorization: `Bearer ${token}` } : {} }); }; diff --git a/frontend/src/pages/business/Meetings.tsx b/frontend/src/pages/business/Meetings.tsx index 964159c..2328869 100644 --- a/frontend/src/pages/business/Meetings.tsx +++ b/frontend/src/pages/business/Meetings.tsx @@ -6,11 +6,11 @@ import { TeamOutlined, ClockCircleOutlined, EditOutlined, RightOutlined, SyncOutlined, InfoCircleOutlined, CloudUploadOutlined, SettingOutlined, QuestionCircleOutlined, FileTextOutlined, CheckOutlined, RocketOutlined, - AudioOutlined + AudioOutlined, PauseCircleOutlined } from '@ant-design/icons'; import { useNavigate, useSearchParams } from 'react-router-dom'; import { usePermission } from '../../hooks/usePermission'; -import { getMeetingPage, deleteMeeting, MeetingVO, getMeetingProgress, MeetingProgress, createMeeting, uploadAudio, updateMeetingParticipants } from '../../api/business/meeting'; +import { getMeetingPage, deleteMeeting, MeetingVO, getMeetingProgress, MeetingProgress, createMeeting, uploadAudio, updateMeetingParticipants, getRealtimeMeetingSessionStatus } from '../../api/business/meeting'; import { getAiModelPage, getAiModelDefault, AiModelVO } from '../../api/business/aimodel'; import { getPromptPage, PromptTemplateVO } from '../../api/business/prompt'; import { getHotWordPage, HotWordVO } from '../../api/business/hotword'; @@ -22,12 +22,14 @@ import { useTranslation } from 'react-i18next'; const { Text, Title } = Typography; const { Dragger } = Upload; const { Option } = Select; +const PAUSED_DISPLAY_STATUS = 5; // --- 进度感知 Hook --- const useMeetingProgress = (meeting: MeetingVO, onComplete?: () => void) => { const [progress, setProgress] = useState(null); useEffect(() => { - if (meeting.status !== 1 && meeting.status !== 2) return; + const effectiveStatus = meeting.displayStatus ?? meeting.status; + if (effectiveStatus !== 1 && effectiveStatus !== 2) return; const fetchProgress = async () => { try { const res = await getMeetingProgress(meeting.id); @@ -43,27 +45,29 @@ const useMeetingProgress = (meeting: MeetingVO, onComplete?: () => void) => { fetchProgress(); const timer = setInterval(fetchProgress, 3000); return () => clearInterval(timer); - }, [meeting.id, meeting.status]); + }, [meeting.id, meeting.status, meeting.displayStatus]); return progress; }; // --- 状态标签组件 --- const IntegratedStatusTag: React.FC<{ meeting: MeetingVO, progress: MeetingProgress | null }> = ({ meeting, progress }) => { + const effectiveStatus = meeting.displayStatus ?? meeting.status; const statusConfig: Record = { 0: { text: '排队中', color: '#8c8c8c', bgColor: '#f5f5f5' }, 1: { text: '识别中', color: '#1890ff', bgColor: '#e6f7ff' }, 2: { text: '总结中', color: '#faad14', bgColor: '#fff7e6' }, 3: { text: '已完成', color: '#52c41a', bgColor: '#f6ffed' }, - 4: { text: '失败', color: '#ff4d4f', bgColor: '#fff1f0' } + 4: { text: '失败', color: '#ff4d4f', bgColor: '#fff1f0' }, + 5: { text: '会议暂停', color: '#d48806', bgColor: '#fff7e6' } }; - const config = statusConfig[meeting.status] || statusConfig[0]; + const config = statusConfig[effectiveStatus] || statusConfig[0]; const percent = progress?.percent || 0; - const isProcessing = meeting.status === 1 || meeting.status === 2; + const isProcessing = effectiveStatus === 1 || effectiveStatus === 2; return (
{/* 进度填充背景 */} {isProcessing && percent > 0 && ( -
+
)} {isProcessing ? : null} @@ -259,15 +263,16 @@ const MeetingCreateForm: React.FC<{ }; // --- 卡片项组件 --- -const MeetingCardItem: React.FC<{ item: MeetingVO, config: any, fetchData: () => void, t: any, onEditParticipants: (meeting: MeetingVO) => void }> = ({ item, config, fetchData, t, onEditParticipants }) => { - const navigate = useNavigate(); +const MeetingCardItem: React.FC<{ item: MeetingVO, config: any, fetchData: () => void, t: any, onEditParticipants: (meeting: MeetingVO) => void, onOpenMeeting: (meeting: MeetingVO) => void }> = ({ item, config, fetchData, t, onEditParticipants, onOpenMeeting }) => { // 注入自动刷新回调 const progress = useMeetingProgress(item, () => fetchData()); - const isProcessing = item.status === 1 || item.status === 2; + const effectiveStatus = item.displayStatus ?? item.status; + const isProcessing = effectiveStatus === 1 || effectiveStatus === 2; + const isPaused = effectiveStatus === PAUSED_DISPLAY_STATUS; return ( - navigate(`/meetings/${item.id}`)} className="meeting-card" style={{ borderRadius: 16, border: '1px solid var(--app-border-color)', background: 'var(--app-bg-card)', backdropFilter: 'blur(16px)', height: '220px', position: 'relative', boxShadow: 'var(--app-shadow)', transition: 'all 0.3s cubic-bezier(0.25, 0.8, 0.25, 1)' }} bodyStyle={{ padding: 0, display: 'flex', height: '100%' }}> + onOpenMeeting(item)} className="meeting-card" style={{ borderRadius: 16, border: '1px solid var(--app-border-color)', background: 'var(--app-bg-card)', backdropFilter: 'blur(16px)', height: '220px', position: 'relative', boxShadow: 'var(--app-shadow)', transition: 'all 0.3s cubic-bezier(0.25, 0.8, 0.25, 1)' }} bodyStyle={{ padding: 0, display: 'flex', height: '100%' }}>
e.stopPropagation()}> @@ -294,10 +299,10 @@ const MeetingCardItem: React.FC<{ item: MeetingVO, config: any, fetchData: () => {isProcessing ? (
{progress?.message || '等待引擎调度...'}
+ ) : isPaused ? ( +
+ + + 会议已暂停,可继续识别 + +
) : (
{item.participants || '无参与人员'}
@@ -375,7 +400,10 @@ const Meetings: React.FC = () => { const [editingMeeting, setEditingMeeting] = useState(null); const [participantsEditLoading, setParticipantsEditLoading] = useState(false); const [participantsEditForm] = Form.useForm(); - const hasRunningTasks = data.some(item => item.status === 0 || item.status === 1 || item.status === 2); + const hasRunningTasks = data.some(item => { + const effectiveStatus = item.displayStatus ?? item.status; + return effectiveStatus === 0 || effectiveStatus === 1 || effectiveStatus === 2; + }); useEffect(() => { fetchData(); }, [current, size, searchTitle, viewType]); useEffect(() => { @@ -391,10 +419,52 @@ const Meetings: React.FC = () => { if (!silent) setLoading(true); try { const res = await getMeetingPage({ current, size, title: searchTitle, viewType }); - if (res.data && res.data.data) { setData(res.data.data.records); setTotal(res.data.data.total); } + if (res.data && res.data.data) { + const records = res.data.data.records || []; + const withDisplayStatus = await Promise.all(records.map(async (item) => { + try { + const sessionRes = await getRealtimeMeetingSessionStatus(item.id); + const sessionStatus = sessionRes.data?.data; + if (sessionStatus?.status === 'PAUSED_EMPTY' || sessionStatus?.status === 'PAUSED_RESUMABLE') { + return { + ...item, + displayStatus: PAUSED_DISPLAY_STATUS, + realtimeSessionStatus: sessionStatus.status + }; + } + if (sessionStatus?.status === 'ACTIVE') { + return { + ...item, + displayStatus: 1, + realtimeSessionStatus: sessionStatus.status + }; + } + return { + ...item, + realtimeSessionStatus: sessionStatus?.status + }; + } catch { + return item; + } + })); + setData(withDisplayStatus); + setTotal(res.data.data.total); + } } catch (err) {} finally { if (!silent) setLoading(false); } }; + const handleOpenMeeting = async (meeting: MeetingVO) => { + try { + const res = await getRealtimeMeetingSessionStatus(meeting.id); + const sessionStatus = res.data?.data; + if (sessionStatus && (sessionStatus.status === 'PAUSED_EMPTY' || sessionStatus.status === 'PAUSED_RESUMABLE' || sessionStatus.status === 'ACTIVE')) { + navigate(`/meeting-live-session/${meeting.id}`); + return; + } + } catch (error) {} + navigate(`/meetings/${meeting.id}`); + }; + const handleCreateSubmit = async () => { if (!audioUrl) { message.error('请先上传录音文件'); @@ -451,7 +521,8 @@ const Meetings: React.FC = () => { 1: { text: '识别中', color: '#1890ff', bgColor: '#e6f7ff' }, 2: { text: '总结中', color: '#faad14', bgColor: '#fff7e6' }, 3: { text: '已完成', color: '#52c41a', bgColor: '#f6ffed' }, - 4: { text: '失败', color: '#ff4d4f', bgColor: '#fff1f0' } + 4: { text: '失败', color: '#ff4d4f', bgColor: '#fff1f0' }, + 5: { text: '会议暂停', color: '#d48806', bgColor: '#fff7e6' } }; return ( @@ -482,8 +553,8 @@ const Meetings: React.FC = () => {
{ - const config = statusConfig[item.status] || statusConfig[0]; - return ; + const config = statusConfig[item.displayStatus ?? item.status] || statusConfig[0]; + return ; }} locale={{ emptyText: }} />
diff --git a/frontend/src/pages/business/RealtimeAsrSession.tsx b/frontend/src/pages/business/RealtimeAsrSession.tsx index cabc4d3..752f038 100644 --- a/frontend/src/pages/business/RealtimeAsrSession.tsx +++ b/frontend/src/pages/business/RealtimeAsrSession.tsx @@ -1,4 +1,4 @@ -import { useEffect, useMemo, useRef, useState } from "react"; +import { useEffect, useMemo, useRef, useState } from "react"; import { Alert, Avatar, @@ -30,14 +30,16 @@ import { appendRealtimeTranscripts, completeRealtimeMeeting, getMeetingDetail, + getRealtimeMeetingSessionStatus, getTranscripts, openRealtimeMeetingSocketSession, + pauseRealtimeMeeting, type MeetingTranscriptVO, type MeetingVO, + type RealtimeMeetingSessionStatus, type RealtimeTranscriptItemDTO, type RealtimeSocketSessionVO, } from "../../api/business/meeting"; - const { Text, Title } = Typography; const SAMPLE_RATE = 16000; const CHUNK_SIZE = 1280; @@ -92,6 +94,28 @@ function getSessionKey(meetingId: number) { return `realtimeMeetingSession:${meetingId}`; } +function buildDraftFromStatus(meetingId: number, meeting: MeetingVO | null, status?: RealtimeMeetingSessionStatus | null): RealtimeMeetingSessionDraft | null { + const config = status?.resumeConfig; + if (!config?.asrModelId) { + return null; + } + return { + meetingId, + meetingTitle: meeting?.title || `实时会议 ${meetingId}`, + asrModelName: "ASR", + summaryModelName: "LLM", + asrModelId: config.asrModelId, + mode: config.mode || "2pass", + language: config.language || "auto", + useSpkId: config.useSpkId ? 1 : 0, + enablePunctuation: config.enablePunctuation !== false, + enableItn: config.enableItn !== false, + enableTextRefine: !!config.enableTextRefine, + saveAudio: !!config.saveAudio, + hotwords: config.hotwords || [], + }; +} + function floatTo16BitPCM(input: Float32Array) { const buffer = new ArrayBuffer(input.length * 2); const view = new DataView(buffer); @@ -176,7 +200,7 @@ function normalizeWsMessage(payload: WsMessage) { }; } -export default function RealtimeAsrSession() { +export function RealtimeAsrSession() { const navigate = useNavigate(); const { id } = useParams<{ id: string }>(); const meetingId = Number(id); @@ -187,12 +211,14 @@ export default function RealtimeAsrSession() { const [recording, setRecording] = useState(false); const [connecting, setConnecting] = useState(false); const [finishing, setFinishing] = useState(false); + const [pausing, setPausing] = useState(false); const [statusText, setStatusText] = useState("待开始"); const [streamingText, setStreamingText] = useState(""); const [streamingSpeaker, setStreamingSpeaker] = useState("Unknown"); const [transcripts, setTranscripts] = useState([]); const [audioLevel, setAudioLevel] = useState(0); const [elapsedSeconds, setElapsedSeconds] = useState(0); + const [sessionStatus, setSessionStatus] = useState(null); const transcriptRef = useRef(null); const wsRef = useRef(null); @@ -211,6 +237,7 @@ export default function RealtimeAsrSession() { [streamingText, transcripts], ); const statusColor = recording ? "#1677ff" : connecting || finishing ? "#faad14" : "#94a3b8"; + const hasRemoteActiveConnection = Boolean(sessionStatus?.activeConnection) && !recording && !connecting; useEffect(() => { if (!meetingId || Number.isNaN(meetingId)) { @@ -221,10 +248,32 @@ export default function RealtimeAsrSession() { setLoading(true); try { const stored = sessionStorage.getItem(getSessionKey(meetingId)); - setSessionDraft(stored ? JSON.parse(stored) : null); + const parsedDraft = stored ? JSON.parse(stored) : null; - const [detailRes, transcriptRes] = await Promise.all([getMeetingDetail(meetingId), getTranscripts(meetingId)]); - setMeeting(detailRes.data.data); + const [detailRes, transcriptRes, statusRes] = await Promise.all([ + getMeetingDetail(meetingId), + getTranscripts(meetingId), + getRealtimeMeetingSessionStatus(meetingId), + ]); + const detail = detailRes.data.data; + const realtimeStatus = statusRes.data.data; + setMeeting(detail); + setSessionStatus(realtimeStatus); + const fallbackDraft = buildDraftFromStatus(meetingId, detail, realtimeStatus); + const resolvedDraft = parsedDraft || fallbackDraft; + setSessionDraft(resolvedDraft); + if (resolvedDraft) { + sessionStorage.setItem(getSessionKey(meetingId), JSON.stringify(resolvedDraft)); + } + if (realtimeStatus?.status === "PAUSED_RESUMABLE") { + setStatusText(`已暂停,可在 ${Math.max(1, Math.ceil((realtimeStatus.remainingSeconds || 0) / 60))} 分钟内继续`); + } else if (realtimeStatus?.status === "PAUSED_EMPTY") { + setStatusText("已暂停,可继续识别"); + } else if (realtimeStatus?.status === "ACTIVE" && realtimeStatus?.activeConnection) { + setStatusText("当前会议已有活跃实时连接"); + } else if (realtimeStatus?.status === "COMPLETING") { + setStatusText("正在生成总结"); + } setTranscripts( (transcriptRes.data.data || []).map((item: MeetingTranscriptVO) => ({ id: String(item.id), @@ -271,11 +320,10 @@ export default function RealtimeAsrSession() { return; } const token = localStorage.getItem("accessToken"); - completeOnceRef.current = true; if (wsRef.current?.readyState === WebSocket.OPEN) { wsRef.current.send(JSON.stringify({ is_speaking: false })); } - fetch(`/api/biz/meeting/${meetingId}/realtime/complete`, { + fetch(`/api/biz/meeting/${meetingId}/realtime/pause`, { method: "POST", keepalive: true, headers: { @@ -386,6 +434,36 @@ export default function RealtimeAsrSession() { await appendRealtimeTranscripts(meetingId, [item]); }; + const handlePause = async () => { + if (!meetingId || pausing || finishing || (!recording && !connecting)) { + return; + } + + setPausing(true); + setStatusText("暂停识别中..."); + try { + if (wsRef.current?.readyState === WebSocket.OPEN) { + wsRef.current.send(JSON.stringify({ is_speaking: false })); + } + wsRef.current?.close(); + wsRef.current = null; + sessionStartedRef.current = false; + await shutdownAudioPipeline(); + const pauseRes = await pauseRealtimeMeeting(meetingId); + setSessionStatus(pauseRes.data.data); + setRecording(false); + setConnecting(false); + startedAtRef.current = null; + setStatusText(pauseRes.data.data?.hasTranscript ? "已暂停,可继续识别" : "已暂停,当前还没有转录内容"); + message.success("实时识别已暂停"); + } catch (error) { + setStatusText("暂停失败"); + message.error(error instanceof Error ? error.message : "暂停实时识别失败"); + } finally { + setPausing(false); + } + }; + const handleStart = async () => { if (!sessionDraft?.asrModelId) { message.error("未找到实时识别配置,请返回创建页重新进入"); @@ -434,6 +512,7 @@ export default function RealtimeAsrSession() { startedAtRef.current = Date.now(); setConnecting(false); setRecording(true); + setSessionStatus((prev) => prev ? { ...prev, status: "ACTIVE", activeConnection: true } : prev); setStatusText("实时识别中"); }) .catch((error) => { @@ -487,6 +566,7 @@ export default function RealtimeAsrSession() { setConnecting(false); setRecording(false); sessionStartedRef.current = false; + setSessionStatus((prev) => prev ? { ...prev, activeConnection: false } : prev); }; } catch (error) { setConnecting(false); @@ -517,14 +597,26 @@ export default function RealtimeAsrSession() { try { await completeRealtimeMeeting(meetingId, {}); sessionStorage.removeItem(getSessionKey(meetingId)); + setSessionStatus((prev) => prev ? { ...prev, status: "COMPLETING", canResume: false, activeConnection: false } : prev); setStatusText("已提交总结任务"); message.success("实时会议已结束,正在生成总结"); if (navigateAfterStop) { navigate(`/meetings/${meetingId}`); } - } catch { + } catch (error) { completeOnceRef.current = false; - setStatusText("结束失败"); + const errorMessage = error instanceof Error ? error.message : "结束会议失败"; + if (errorMessage.includes("当前还没有转录内容")) { + try { + const statusRes = await getRealtimeMeetingSessionStatus(meetingId); + setSessionStatus(statusRes.data.data); + } catch { + // ignore status refresh failure + } + setStatusText("当前还没有转录内容,可继续识别"); + } else { + setStatusText("结束失败"); + } } finally { setRecording(false); setFinishing(false); @@ -666,10 +758,13 @@ export default function RealtimeAsrSession() {
- - + @@ -697,7 +792,7 @@ export default function RealtimeAsrSession() {
- +
@@ -710,7 +805,7 @@ export default function RealtimeAsrSession() { 优先展示最终片段,流式草稿保留在底部作为当前正在识别的内容。
- } color={recording ? "processing" : "default"}>{recording ? "采集中" : connecting ? "连接中" : "待命"} + } color={recording ? "processing" : sessionStatus?.status === "ACTIVE" && hasRemoteActiveConnection ? "processing" : sessionStatus?.status === "PAUSED_RESUMABLE" || sessionStatus?.status === "PAUSED_EMPTY" ? "warning" : "default"}>{recording ? "采集中" : connecting ? "连接中" : sessionStatus?.status === "ACTIVE" && hasRemoteActiveConnection ? "连接占用中" : sessionStatus?.status === "PAUSED_RESUMABLE" || sessionStatus?.status === "PAUSED_EMPTY" ? "已暂停" : "待命"} {sessionDraft.asrModelName}
@@ -718,7 +813,7 @@ export default function RealtimeAsrSession() {
{transcripts.length === 0 && !streamingText ? (
- +
) : ( @@ -762,3 +857,4 @@ export default function RealtimeAsrSession() { ); } +export default RealtimeAsrSession; diff --git a/frontend/src/routes/routes.tsx b/frontend/src/routes/routes.tsx index 1ed098d..4162f83 100644 --- a/frontend/src/routes/routes.tsx +++ b/frontend/src/routes/routes.tsx @@ -1,4 +1,4 @@ -import { Spin } from "antd"; +import { Spin } from "antd"; import { Suspense, lazy } from "react"; import type { MenuRoute } from "@/types"; @@ -20,7 +20,7 @@ const RolePermissionBinding = lazy(() => import("@/pages/bindings/role-permissio import SpeakerReg from "../pages/business/SpeakerReg"; import RealtimeAsr from "../pages/business/RealtimeAsr"; -import RealtimeAsrSession from "../pages/business/RealtimeAsrSession"; +const RealtimeAsrSession = lazy(async () => { const mod = await import("../pages/business/RealtimeAsrSession"); return { default: mod.default ?? mod.RealtimeAsrSession }; }); import HotWords from "../pages/business/HotWords"; import PromptTemplates from "../pages/business/PromptTemplates"; import AiModels from "../pages/business/AiModels";