From b2e2f2c46a71395bdc112717bcb68bfbaae665b4 Mon Sep 17 00:00:00 2001 From: chenhao Date: Wed, 8 Apr 2026 19:15:03 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=BC=BA=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E4=BC=9A=E8=AE=AEgRPC=E6=9C=8D=E5=8A=A1=E5=92=8C=E4=BC=9A?= =?UTF-8?q?=E8=AF=9D=E7=8A=B6=E6=80=81=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 `MeetingCommandServiceImpl` 中更新 `saveRealtimeTranscriptSnapshot` 方法,仅保存最终结果 - 在 `GrpcServerLifecycle` 中添加 `GrpcExceptionLoggingInterceptor` - 在 `RealtimeMeetingSessionStateServiceImpl` 中添加终端状态处理逻辑 - 在 `RealtimeMeetingGrpcService` 中增强错误处理和流关闭逻辑 - 添加 `saveRealtimeTranscriptSnapshotShouldIgnoreNonFinalTranscript` 测试用例 - 在 `MeetingAuthorizationServiceImpl` 中添加匿名访问支持 - 在 `RealtimeMeetingGrpcSessionServiceImpl` 中添加异常处理和清理逻辑 --- .gitignore | 13 ++- .../config/grpc/GrpcServerLifecycle.java | 4 +- .../realtime/RealtimeMeetingGrpcService.java | 82 ++++++++++++++++--- .../impl/MeetingAuthorizationServiceImpl.java | 13 +++ .../biz/impl/MeetingCommandServiceImpl.java | 2 +- ...ealtimeMeetingSessionStateServiceImpl.java | 33 +++++++- ...RealtimeMeetingGrpcSessionServiceImpl.java | 73 ++++++++++++++--- .../impl/MeetingCommandServiceImplTest.java | 37 +++++++++ 8 files changed, 227 insertions(+), 30 deletions(-) diff --git a/.gitignore b/.gitignore index 48abcaa..452e9a5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,11 @@ -backend/target/ -node_modules/ +# Local environment files +.env +.env.* +backend/.env +backend/.env.* +backend/src/main/resources/application-local.yml + +# Logs +*.log +!backend/.env.example +.omx/ diff --git a/backend/src/main/java/com/imeeting/config/grpc/GrpcServerLifecycle.java b/backend/src/main/java/com/imeeting/config/grpc/GrpcServerLifecycle.java index 95f9d7a..0e18844 100644 --- a/backend/src/main/java/com/imeeting/config/grpc/GrpcServerLifecycle.java +++ b/backend/src/main/java/com/imeeting/config/grpc/GrpcServerLifecycle.java @@ -21,6 +21,7 @@ import java.util.List; public class GrpcServerLifecycle { private final GrpcServerProperties properties; + private final GrpcExceptionLoggingInterceptor grpcExceptionLoggingInterceptor; private final List bindableServices; private Server server; @@ -32,7 +33,8 @@ public class GrpcServerLifecycle { } NettyServerBuilder builder = NettyServerBuilder.forPort(properties.getPort()) - .maxInboundMessageSize(properties.getMaxInboundMessageSize()); + .maxInboundMessageSize(properties.getMaxInboundMessageSize()) + .intercept(grpcExceptionLoggingInterceptor); bindableServices.forEach(builder::addService); if (properties.isReflectionEnabled()) { builder.addService(ProtoReflectionService.newInstance()); diff --git a/backend/src/main/java/com/imeeting/grpc/realtime/RealtimeMeetingGrpcService.java b/backend/src/main/java/com/imeeting/grpc/realtime/RealtimeMeetingGrpcService.java index 3a0d385..1ffaaed 100644 --- a/backend/src/main/java/com/imeeting/grpc/realtime/RealtimeMeetingGrpcService.java +++ b/backend/src/main/java/com/imeeting/grpc/realtime/RealtimeMeetingGrpcService.java @@ -1,13 +1,16 @@ package com.imeeting.grpc.realtime; import com.imeeting.dto.android.AndroidAuthContext; +import com.imeeting.grpc.common.ErrorEvent; import com.imeeting.service.android.AndroidAuthService; import com.imeeting.service.realtime.RealtimeMeetingGrpcSessionService; import io.grpc.BindableService; import io.grpc.stub.StreamObserver; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +@Slf4j @Service @RequiredArgsConstructor public class RealtimeMeetingGrpcService extends RealtimeMeetingServiceGrpc.RealtimeMeetingServiceImplBase implements BindableService { @@ -20,31 +23,40 @@ public class RealtimeMeetingGrpcService extends RealtimeMeetingServiceGrpc.Realt return new StreamObserver<>() { private String connectionId; private AndroidAuthContext authContext; + private boolean completed; @Override public void onNext(RealtimeClientPacket packet) { - switch (packet.getBodyCase()) { - case OPEN -> handleOpen(packet); - case AUDIO -> handleAudio(packet.getAudio()); - case CONTROL -> handleControl(packet.getControl()); - case BODY_NOT_SET -> { + if (completed) { + return; + } + try { + switch (packet.getBodyCase()) { + case OPEN -> handleOpen(packet); + case AUDIO -> handleAudio(packet.getAudio()); + case CONTROL -> handleControl(packet.getControl()); + case BODY_NOT_SET -> { + } } + } catch (Exception ex) { + handleProcessingError(packet, ex); } } @Override public void onError(Throwable t) { - if (connectionId != null) { - realtimeMeetingGrpcSessionService.closeStream(connectionId, "client_error", false); - } + completed = true; + log.warn("Realtime meeting gRPC client stream failed, connectionId={}", connectionId, t); + safeCloseStream("client_error", false); } @Override public void onCompleted() { + completed = true; if (connectionId != null) { - realtimeMeetingGrpcSessionService.closeStream(connectionId, "client_completed", false); + safeCloseStream("client_completed", false); } else { - responseObserver.onCompleted(); + safeCompleteResponse(); } } @@ -71,11 +83,57 @@ public class RealtimeMeetingGrpcService extends RealtimeMeetingServiceGrpc.Realt } switch (control.getType()) { case STOP_SPEAKING, END_INPUT -> realtimeMeetingGrpcSessionService.onStopSpeaking(connectionId); - case CLOSE_STREAM -> realtimeMeetingGrpcSessionService.closeStream(connectionId, "client_close_stream", true); + case CLOSE_STREAM -> safeCloseStream("client_close_stream", true); case START, CONTROL_TYPE_UNSPECIFIED -> { } } } + + private void handleProcessingError(RealtimeClientPacket packet, Exception ex) { + String requestId = packet == null ? "" : packet.getRequestId(); + log.error("Realtime meeting gRPC packet processing failed, requestId={}, connectionId={}", requestId, connectionId, ex); + safeSendError(requestId, ex.getMessage()); + completed = true; + if (connectionId != null) { + safeCloseStream("grpc_processing_error", true); + } else { + safeCompleteResponse(); + } + } + + private void safeSendError(String requestId, String message) { + try { + responseObserver.onNext(RealtimeServerPacket.newBuilder() + .setRequestId(requestId == null ? "" : requestId) + .setError(ErrorEvent.newBuilder() + .setCode("REALTIME_GRPC_ERROR") + .setMessage(message == null || message.isBlank() ? "Realtime meeting gRPC processing failed" : message) + .setRetryable(false) + .build()) + .build()); + } catch (Exception observerEx) { + log.warn("Failed to deliver realtime gRPC error packet, requestId={}, connectionId={}", requestId, connectionId, observerEx); + } + } + + private void safeCloseStream(String reason, boolean notifyClient) { + if (connectionId == null) { + return; + } + try { + realtimeMeetingGrpcSessionService.closeStream(connectionId, reason, notifyClient); + } catch (Exception closeEx) { + log.error("Failed to close realtime gRPC stream, connectionId={}, reason={}", connectionId, reason, closeEx); + } + } + + private void safeCompleteResponse() { + try { + responseObserver.onCompleted(); + } catch (Exception observerEx) { + log.warn("Failed to complete realtime gRPC response, connectionId={}", connectionId, observerEx); + } + } }; } -} +} \ No newline at end of file diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/MeetingAuthorizationServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/MeetingAuthorizationServiceImpl.java index 3807caa..65163e2 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/MeetingAuthorizationServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/MeetingAuthorizationServiceImpl.java @@ -16,19 +16,32 @@ public class MeetingAuthorizationServiceImpl implements MeetingAuthorizationServ @Override public void assertCanCreateMeeting(AndroidAuthContext authContext) { + if (allowAnonymous(authContext)) { + return; + } requireUser(authContext); } @Override public void assertCanViewMeeting(Meeting meeting, AndroidAuthContext authContext) { + if (allowAnonymous(authContext)) { + return; + } meetingAccessService.assertCanViewMeeting(meeting, requireUser(authContext)); } @Override public void assertCanManageRealtimeMeeting(Meeting meeting, AndroidAuthContext authContext) { + if (allowAnonymous(authContext)) { + return; + } meetingAccessService.assertCanManageRealtimeMeeting(meeting, requireUser(authContext)); } + private boolean allowAnonymous(AndroidAuthContext authContext) { + return authContext != null && authContext.isAnonymous(); + } + private LoginUser requireUser(AndroidAuthContext authContext) { if (authContext == null || authContext.isAnonymous() || authContext.getUserId() == null || authContext.getTenantId() == null) { throw new RuntimeException("安卓用户未登录或认证无效"); diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java index 7dec192..47b8243 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/MeetingCommandServiceImpl.java @@ -174,7 +174,7 @@ public class MeetingCommandServiceImpl implements MeetingCommandService { @Override @Transactional(rollbackFor = Exception.class) public void saveRealtimeTranscriptSnapshot(Long meetingId, RealtimeTranscriptItemDTO item, boolean finalResult) { - if (item == null || item.getContent() == null || item.getContent().isBlank()) { + if (!finalResult || item == null || item.getContent() == null || item.getContent().isBlank()) { return; } diff --git a/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java b/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java index 40b769c..8267d61 100644 --- a/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/biz/impl/RealtimeMeetingSessionStateServiceImpl.java @@ -124,6 +124,15 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe if (state == null) { return buildFallbackStatus(meetingId); } + if (isRedisTerminalStatus(state.getStatus())) { + return toStatusVO(state); + } + + RealtimeMeetingSessionStatusVO terminalFallback = terminalFallbackIfDatabaseTerminal(meetingId); + if (terminalFallback != null) { + clear(meetingId); + return terminalFallback; + } return toStatusVO(state); } @@ -247,6 +256,7 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe redisTemplate.delete(RedisKeys.realtimeMeetingSessionStateKey(meetingId)); redisTemplate.delete(RedisKeys.realtimeMeetingResumeTimeoutKey(meetingId)); redisTemplate.delete(RedisKeys.realtimeMeetingEmptyTimeoutKey(meetingId)); + redisTemplate.delete(RedisKeys.realtimeMeetingEventSeqKey(meetingId)); } private RealtimeMeetingSessionStatusVO pauseState(Long meetingId, RealtimeMeetingSessionState state) { @@ -279,9 +289,12 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe } private RealtimeMeetingSessionStatusVO buildFallbackStatus(Long meetingId) { + return buildFallbackStatus(meetingId, meetingMapper.selectById(meetingId)); + } + + private RealtimeMeetingSessionStatusVO buildFallbackStatus(Long meetingId, Meeting meeting) { RealtimeMeetingSessionStatusVO vo = new RealtimeMeetingSessionStatusVO(); vo.setMeetingId(meetingId); - Meeting meeting = meetingMapper.selectById(meetingId); if (meeting == null) { vo.setStatus("IDLE"); vo.setHasTranscript(false); @@ -293,7 +306,7 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe if (Integer.valueOf(2).equals(meeting.getStatus())) { vo.setStatus("COMPLETING"); - } else if (Integer.valueOf(3).equals(meeting.getStatus()) || Integer.valueOf(4).equals(meeting.getStatus())) { + } else if (isDatabaseTerminalStatus(meeting.getStatus())) { vo.setStatus("COMPLETED"); } else { vo.setStatus("IDLE"); @@ -305,6 +318,22 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe return vo; } + private RealtimeMeetingSessionStatusVO terminalFallbackIfDatabaseTerminal(Long meetingId) { + Meeting meeting = meetingMapper.selectById(meetingId); + if (meeting == null || !isDatabaseTerminalStatus(meeting.getStatus())) { + return null; + } + return buildFallbackStatus(meetingId, meeting); + } + + private boolean isDatabaseTerminalStatus(Integer status) { + return Integer.valueOf(3).equals(status) || Integer.valueOf(4).equals(status); + } + + private boolean isRedisTerminalStatus(String status) { + return "COMPLETED".equals(status); + } + private RealtimeMeetingSessionStatusVO toStatusVO(RealtimeMeetingSessionState state) { RealtimeMeetingSessionStatusVO vo = new RealtimeMeetingSessionStatusVO(); vo.setMeetingId(state.getMeetingId()); diff --git a/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingGrpcSessionServiceImpl.java b/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingGrpcSessionServiceImpl.java index e5e4ee8..c3f2eba 100644 --- a/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingGrpcSessionServiceImpl.java +++ b/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingGrpcSessionServiceImpl.java @@ -72,10 +72,16 @@ public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrp throw new RuntimeException("Duplicate realtime gRPC connectionId"); } - writeConnectionState(runtime); - realtimeMeetingAudioStorageService.openSession(sessionData.getMeetingId(), connectionId); - runtime.upstreamSession = asrUpstreamBridgeService.openSession(sessionData, connectionId, new UpstreamCallback(runtime)); - return connectionId; + try { + writeConnectionState(runtime); + realtimeMeetingAudioStorageService.openSession(sessionData.getMeetingId(), connectionId); + runtime.upstreamSession = asrUpstreamBridgeService.openSession(sessionData, connectionId, new UpstreamCallback(runtime)); + return connectionId; + } catch (Exception ex) { + sessions.remove(connectionId); + cleanupFailedOpen(runtime); + throw ex; + } } @Override @@ -120,9 +126,24 @@ public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrp log.warn("Failed to close upstream realtime session, connectionId={}", connectionId, ex); } - redisTemplate.delete(RedisKeys.realtimeMeetingGrpcConnectionKey(connectionId)); - realtimeMeetingAudioStorageService.closeSession(connectionId); - realtimeMeetingSessionStateService.pauseByDisconnect(runtime.sessionData.getMeetingId(), connectionId); + try { + redisTemplate.delete(RedisKeys.realtimeMeetingGrpcConnectionKey(connectionId)); + } catch (Exception ex) { + log.error("Failed to delete realtime gRPC connection state, connectionId={}", connectionId, ex); + } + + try { + realtimeMeetingAudioStorageService.closeSession(connectionId); + } catch (Exception ex) { + log.error("Failed to close realtime gRPC audio session, connectionId={}", connectionId, ex); + } + + try { + realtimeMeetingSessionStateService.pauseByDisconnect(runtime.sessionData.getMeetingId(), connectionId); + } catch (Exception ex) { + log.error("Failed to pause realtime meeting by disconnect, meetingId={}, connectionId={}", + runtime.sessionData.getMeetingId(), connectionId, ex); + } if (notifyClient) { runtime.send(RealtimeServerPacket.newBuilder() @@ -135,6 +156,26 @@ public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrp runtime.complete(); } + private void cleanupFailedOpen(SessionRuntime runtime) { + try { + redisTemplate.delete(RedisKeys.realtimeMeetingGrpcConnectionKey(runtime.connectionId)); + } catch (Exception ex) { + log.warn("Failed to rollback realtime gRPC connection state, connectionId={}", runtime.connectionId, ex); + } + try { + realtimeMeetingAudioStorageService.closeSession(runtime.connectionId); + } catch (Exception ex) { + log.warn("Failed to rollback realtime gRPC audio session, connectionId={}", runtime.connectionId, ex); + } + try { + if (runtime.upstreamSession != null) { + runtime.upstreamSession.close("open_failed"); + } + } catch (Exception ex) { + log.warn("Failed to rollback realtime gRPC upstream session, connectionId={}", runtime.connectionId, ex); + } + } + private void writeConnectionState(SessionRuntime runtime) { Duration ttl = Duration.ofSeconds(grpcServerProperties.getRealtime().getConnectionTtlSeconds()); String value = runtime.sessionData.getMeetingId() + ":" + runtime.sessionData.getDeviceId() + ":" + runtime.streamToken; @@ -203,7 +244,7 @@ public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrp @Override public void onTranscript(AsrUpstreamBridgeService.AsrTranscriptResult result) { - if (runtime.closed.get() || result == null || result.getText() == null || result.getText().isBlank()) { + if (runtime.closed.get() || result == null || !result.isFinalResult() || result.getText() == null || result.getText().isBlank()) { return; } RealtimeTranscriptItemDTO item = new RealtimeTranscriptItemDTO(); @@ -263,8 +304,12 @@ public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrp synchronized (responseObserver) { responseObserver.onNext(packet); } - } catch (Exception ignored) { - // ignore downstream delivery failure + } catch (Exception ex) { + RealtimeMeetingGrpcSessionServiceImpl.log.debug( + "Ignore downstream realtime gRPC delivery failure, connectionId={}", + connectionId, + ex + ); } } @@ -281,8 +326,12 @@ public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrp private void complete() { try { responseObserver.onCompleted(); - } catch (Exception ignored) { - // ignore observer completion failure + } catch (Exception ex) { + RealtimeMeetingGrpcSessionServiceImpl.log.debug( + "Ignore downstream realtime gRPC completion failure, connectionId={}", + connectionId, + ex + ); } } } diff --git a/backend/src/test/java/com/imeeting/service/biz/impl/MeetingCommandServiceImplTest.java b/backend/src/test/java/com/imeeting/service/biz/impl/MeetingCommandServiceImplTest.java index c340d2d..464f04a 100644 --- a/backend/src/test/java/com/imeeting/service/biz/impl/MeetingCommandServiceImplTest.java +++ b/backend/src/test/java/com/imeeting/service/biz/impl/MeetingCommandServiceImplTest.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.imeeting.dto.biz.CreateMeetingCommand; import com.imeeting.dto.biz.CreateRealtimeMeetingCommand; import com.imeeting.dto.biz.MeetingVO; +import com.imeeting.dto.biz.RealtimeTranscriptItemDTO; import com.imeeting.entity.biz.Meeting; import com.imeeting.service.biz.AiTaskService; import com.imeeting.service.biz.HotWordService; @@ -22,6 +23,7 @@ import java.time.LocalDateTime; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doAnswer; @@ -246,6 +248,41 @@ class MeetingCommandServiceImplTest { verify(audioStorageService, never()).finalizeMeetingAudio(203L); } + @Test + void saveRealtimeTranscriptSnapshotShouldIgnoreNonFinalTranscript() { + MeetingService meetingService = mock(MeetingService.class); + MeetingDomainSupport meetingDomainSupport = mock(MeetingDomainSupport.class); + com.imeeting.mapper.biz.MeetingTranscriptMapper transcriptMapper = mock(com.imeeting.mapper.biz.MeetingTranscriptMapper.class); + RealtimeMeetingSessionStateService sessionStateService = mock(RealtimeMeetingSessionStateService.class); + + MeetingCommandServiceImpl service = new MeetingCommandServiceImpl( + meetingService, + mock(AiTaskService.class), + mock(HotWordService.class), + transcriptMapper, + mock(MeetingSummaryFileService.class), + meetingDomainSupport, + sessionStateService, + mock(RealtimeMeetingAudioStorageService.class), + mock(StringRedisTemplate.class), + new ObjectMapper() + ); + + RealtimeTranscriptItemDTO item = new RealtimeTranscriptItemDTO(); + item.setSpeakerId("spk-1"); + item.setSpeakerName("Speaker 1"); + item.setContent("partial transcript"); + item.setStartTime(100); + item.setEndTime(500); + + service.saveRealtimeTranscriptSnapshot(1001L, item, false); + + verify(transcriptMapper, never()).selectOne(any()); + verify(transcriptMapper, never()).insert(any()); + verify(transcriptMapper, never()).update(any(), any()); + verify(sessionStateService, never()).refreshAfterTranscript(anyLong()); + } + @Test void reSummaryShouldDispatchAfterTransactionCommit() { MeetingService meetingService = mock(MeetingService.class);