feat: 增强实时会议gRPC服务和会话状态管理

- 在 `MeetingCommandServiceImpl` 中更新 `saveRealtimeTranscriptSnapshot` 方法,仅保存最终结果
- 在 `GrpcServerLifecycle` 中添加 `GrpcExceptionLoggingInterceptor`
- 在 `RealtimeMeetingSessionStateServiceImpl` 中添加终端状态处理逻辑
- 在 `RealtimeMeetingGrpcService` 中增强错误处理和流关闭逻辑
- 添加 `saveRealtimeTranscriptSnapshotShouldIgnoreNonFinalTranscript` 测试用例
- 在 `MeetingAuthorizationServiceImpl` 中添加匿名访问支持
- 在 `RealtimeMeetingGrpcSessionServiceImpl` 中添加异常处理和清理逻辑
dev_na
chenhao 2026-04-08 19:15:03 +08:00
parent e83a0ece32
commit b2e2f2c46a
8 changed files with 227 additions and 30 deletions

13
.gitignore vendored
View File

@ -1,2 +1,11 @@
backend/target/
node_modules/
# Local environment files
.env
.env.*
backend/.env
backend/.env.*
backend/src/main/resources/application-local.yml
# Logs
*.log
!backend/.env.example
.omx/

View File

@ -21,6 +21,7 @@ import java.util.List;
public class GrpcServerLifecycle {
private final GrpcServerProperties properties;
private final GrpcExceptionLoggingInterceptor grpcExceptionLoggingInterceptor;
private final List<BindableService> bindableServices;
private Server server;
@ -32,7 +33,8 @@ public class GrpcServerLifecycle {
}
NettyServerBuilder builder = NettyServerBuilder.forPort(properties.getPort())
.maxInboundMessageSize(properties.getMaxInboundMessageSize());
.maxInboundMessageSize(properties.getMaxInboundMessageSize())
.intercept(grpcExceptionLoggingInterceptor);
bindableServices.forEach(builder::addService);
if (properties.isReflectionEnabled()) {
builder.addService(ProtoReflectionService.newInstance());

View File

@ -1,13 +1,16 @@
package com.imeeting.grpc.realtime;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.grpc.common.ErrorEvent;
import com.imeeting.service.android.AndroidAuthService;
import com.imeeting.service.realtime.RealtimeMeetingGrpcSessionService;
import io.grpc.BindableService;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class RealtimeMeetingGrpcService extends RealtimeMeetingServiceGrpc.RealtimeMeetingServiceImplBase implements BindableService {
@ -20,9 +23,14 @@ public class RealtimeMeetingGrpcService extends RealtimeMeetingServiceGrpc.Realt
return new StreamObserver<>() {
private String connectionId;
private AndroidAuthContext authContext;
private boolean completed;
@Override
public void onNext(RealtimeClientPacket packet) {
if (completed) {
return;
}
try {
switch (packet.getBodyCase()) {
case OPEN -> handleOpen(packet);
case AUDIO -> handleAudio(packet.getAudio());
@ -30,21 +38,25 @@ public class RealtimeMeetingGrpcService extends RealtimeMeetingServiceGrpc.Realt
case BODY_NOT_SET -> {
}
}
} catch (Exception ex) {
handleProcessingError(packet, ex);
}
}
@Override
public void onError(Throwable t) {
if (connectionId != null) {
realtimeMeetingGrpcSessionService.closeStream(connectionId, "client_error", false);
}
completed = true;
log.warn("Realtime meeting gRPC client stream failed, connectionId={}", connectionId, t);
safeCloseStream("client_error", false);
}
@Override
public void onCompleted() {
completed = true;
if (connectionId != null) {
realtimeMeetingGrpcSessionService.closeStream(connectionId, "client_completed", false);
safeCloseStream("client_completed", false);
} else {
responseObserver.onCompleted();
safeCompleteResponse();
}
}
@ -71,11 +83,57 @@ public class RealtimeMeetingGrpcService extends RealtimeMeetingServiceGrpc.Realt
}
switch (control.getType()) {
case STOP_SPEAKING, END_INPUT -> realtimeMeetingGrpcSessionService.onStopSpeaking(connectionId);
case CLOSE_STREAM -> realtimeMeetingGrpcSessionService.closeStream(connectionId, "client_close_stream", true);
case CLOSE_STREAM -> safeCloseStream("client_close_stream", true);
case START, CONTROL_TYPE_UNSPECIFIED -> {
}
}
}
private void handleProcessingError(RealtimeClientPacket packet, Exception ex) {
String requestId = packet == null ? "" : packet.getRequestId();
log.error("Realtime meeting gRPC packet processing failed, requestId={}, connectionId={}", requestId, connectionId, ex);
safeSendError(requestId, ex.getMessage());
completed = true;
if (connectionId != null) {
safeCloseStream("grpc_processing_error", true);
} else {
safeCompleteResponse();
}
}
private void safeSendError(String requestId, String message) {
try {
responseObserver.onNext(RealtimeServerPacket.newBuilder()
.setRequestId(requestId == null ? "" : requestId)
.setError(ErrorEvent.newBuilder()
.setCode("REALTIME_GRPC_ERROR")
.setMessage(message == null || message.isBlank() ? "Realtime meeting gRPC processing failed" : message)
.setRetryable(false)
.build())
.build());
} catch (Exception observerEx) {
log.warn("Failed to deliver realtime gRPC error packet, requestId={}, connectionId={}", requestId, connectionId, observerEx);
}
}
private void safeCloseStream(String reason, boolean notifyClient) {
if (connectionId == null) {
return;
}
try {
realtimeMeetingGrpcSessionService.closeStream(connectionId, reason, notifyClient);
} catch (Exception closeEx) {
log.error("Failed to close realtime gRPC stream, connectionId={}, reason={}", connectionId, reason, closeEx);
}
}
private void safeCompleteResponse() {
try {
responseObserver.onCompleted();
} catch (Exception observerEx) {
log.warn("Failed to complete realtime gRPC response, connectionId={}", connectionId, observerEx);
}
}
};
}
}

View File

@ -16,19 +16,32 @@ public class MeetingAuthorizationServiceImpl implements MeetingAuthorizationServ
@Override
public void assertCanCreateMeeting(AndroidAuthContext authContext) {
if (allowAnonymous(authContext)) {
return;
}
requireUser(authContext);
}
@Override
public void assertCanViewMeeting(Meeting meeting, AndroidAuthContext authContext) {
if (allowAnonymous(authContext)) {
return;
}
meetingAccessService.assertCanViewMeeting(meeting, requireUser(authContext));
}
@Override
public void assertCanManageRealtimeMeeting(Meeting meeting, AndroidAuthContext authContext) {
if (allowAnonymous(authContext)) {
return;
}
meetingAccessService.assertCanManageRealtimeMeeting(meeting, requireUser(authContext));
}
private boolean allowAnonymous(AndroidAuthContext authContext) {
return authContext != null && authContext.isAnonymous();
}
private LoginUser requireUser(AndroidAuthContext authContext) {
if (authContext == null || authContext.isAnonymous() || authContext.getUserId() == null || authContext.getTenantId() == null) {
throw new RuntimeException("安卓用户未登录或认证无效");

View File

@ -174,7 +174,7 @@ public class MeetingCommandServiceImpl implements MeetingCommandService {
@Override
@Transactional(rollbackFor = Exception.class)
public void saveRealtimeTranscriptSnapshot(Long meetingId, RealtimeTranscriptItemDTO item, boolean finalResult) {
if (item == null || item.getContent() == null || item.getContent().isBlank()) {
if (!finalResult || item == null || item.getContent() == null || item.getContent().isBlank()) {
return;
}

View File

@ -124,6 +124,15 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe
if (state == null) {
return buildFallbackStatus(meetingId);
}
if (isRedisTerminalStatus(state.getStatus())) {
return toStatusVO(state);
}
RealtimeMeetingSessionStatusVO terminalFallback = terminalFallbackIfDatabaseTerminal(meetingId);
if (terminalFallback != null) {
clear(meetingId);
return terminalFallback;
}
return toStatusVO(state);
}
@ -247,6 +256,7 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe
redisTemplate.delete(RedisKeys.realtimeMeetingSessionStateKey(meetingId));
redisTemplate.delete(RedisKeys.realtimeMeetingResumeTimeoutKey(meetingId));
redisTemplate.delete(RedisKeys.realtimeMeetingEmptyTimeoutKey(meetingId));
redisTemplate.delete(RedisKeys.realtimeMeetingEventSeqKey(meetingId));
}
private RealtimeMeetingSessionStatusVO pauseState(Long meetingId, RealtimeMeetingSessionState state) {
@ -279,9 +289,12 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe
}
private RealtimeMeetingSessionStatusVO buildFallbackStatus(Long meetingId) {
return buildFallbackStatus(meetingId, meetingMapper.selectById(meetingId));
}
private RealtimeMeetingSessionStatusVO buildFallbackStatus(Long meetingId, Meeting meeting) {
RealtimeMeetingSessionStatusVO vo = new RealtimeMeetingSessionStatusVO();
vo.setMeetingId(meetingId);
Meeting meeting = meetingMapper.selectById(meetingId);
if (meeting == null) {
vo.setStatus("IDLE");
vo.setHasTranscript(false);
@ -293,7 +306,7 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe
if (Integer.valueOf(2).equals(meeting.getStatus())) {
vo.setStatus("COMPLETING");
} else if (Integer.valueOf(3).equals(meeting.getStatus()) || Integer.valueOf(4).equals(meeting.getStatus())) {
} else if (isDatabaseTerminalStatus(meeting.getStatus())) {
vo.setStatus("COMPLETED");
} else {
vo.setStatus("IDLE");
@ -305,6 +318,22 @@ public class RealtimeMeetingSessionStateServiceImpl implements RealtimeMeetingSe
return vo;
}
private RealtimeMeetingSessionStatusVO terminalFallbackIfDatabaseTerminal(Long meetingId) {
Meeting meeting = meetingMapper.selectById(meetingId);
if (meeting == null || !isDatabaseTerminalStatus(meeting.getStatus())) {
return null;
}
return buildFallbackStatus(meetingId, meeting);
}
private boolean isDatabaseTerminalStatus(Integer status) {
return Integer.valueOf(3).equals(status) || Integer.valueOf(4).equals(status);
}
private boolean isRedisTerminalStatus(String status) {
return "COMPLETED".equals(status);
}
private RealtimeMeetingSessionStatusVO toStatusVO(RealtimeMeetingSessionState state) {
RealtimeMeetingSessionStatusVO vo = new RealtimeMeetingSessionStatusVO();
vo.setMeetingId(state.getMeetingId());

View File

@ -72,10 +72,16 @@ public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrp
throw new RuntimeException("Duplicate realtime gRPC connectionId");
}
try {
writeConnectionState(runtime);
realtimeMeetingAudioStorageService.openSession(sessionData.getMeetingId(), connectionId);
runtime.upstreamSession = asrUpstreamBridgeService.openSession(sessionData, connectionId, new UpstreamCallback(runtime));
return connectionId;
} catch (Exception ex) {
sessions.remove(connectionId);
cleanupFailedOpen(runtime);
throw ex;
}
}
@Override
@ -120,9 +126,24 @@ public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrp
log.warn("Failed to close upstream realtime session, connectionId={}", connectionId, ex);
}
try {
redisTemplate.delete(RedisKeys.realtimeMeetingGrpcConnectionKey(connectionId));
} catch (Exception ex) {
log.error("Failed to delete realtime gRPC connection state, connectionId={}", connectionId, ex);
}
try {
realtimeMeetingAudioStorageService.closeSession(connectionId);
} catch (Exception ex) {
log.error("Failed to close realtime gRPC audio session, connectionId={}", connectionId, ex);
}
try {
realtimeMeetingSessionStateService.pauseByDisconnect(runtime.sessionData.getMeetingId(), connectionId);
} catch (Exception ex) {
log.error("Failed to pause realtime meeting by disconnect, meetingId={}, connectionId={}",
runtime.sessionData.getMeetingId(), connectionId, ex);
}
if (notifyClient) {
runtime.send(RealtimeServerPacket.newBuilder()
@ -135,6 +156,26 @@ public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrp
runtime.complete();
}
private void cleanupFailedOpen(SessionRuntime runtime) {
try {
redisTemplate.delete(RedisKeys.realtimeMeetingGrpcConnectionKey(runtime.connectionId));
} catch (Exception ex) {
log.warn("Failed to rollback realtime gRPC connection state, connectionId={}", runtime.connectionId, ex);
}
try {
realtimeMeetingAudioStorageService.closeSession(runtime.connectionId);
} catch (Exception ex) {
log.warn("Failed to rollback realtime gRPC audio session, connectionId={}", runtime.connectionId, ex);
}
try {
if (runtime.upstreamSession != null) {
runtime.upstreamSession.close("open_failed");
}
} catch (Exception ex) {
log.warn("Failed to rollback realtime gRPC upstream session, connectionId={}", runtime.connectionId, ex);
}
}
private void writeConnectionState(SessionRuntime runtime) {
Duration ttl = Duration.ofSeconds(grpcServerProperties.getRealtime().getConnectionTtlSeconds());
String value = runtime.sessionData.getMeetingId() + ":" + runtime.sessionData.getDeviceId() + ":" + runtime.streamToken;
@ -203,7 +244,7 @@ public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrp
@Override
public void onTranscript(AsrUpstreamBridgeService.AsrTranscriptResult result) {
if (runtime.closed.get() || result == null || result.getText() == null || result.getText().isBlank()) {
if (runtime.closed.get() || result == null || !result.isFinalResult() || result.getText() == null || result.getText().isBlank()) {
return;
}
RealtimeTranscriptItemDTO item = new RealtimeTranscriptItemDTO();
@ -263,8 +304,12 @@ public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrp
synchronized (responseObserver) {
responseObserver.onNext(packet);
}
} catch (Exception ignored) {
// ignore downstream delivery failure
} catch (Exception ex) {
RealtimeMeetingGrpcSessionServiceImpl.log.debug(
"Ignore downstream realtime gRPC delivery failure, connectionId={}",
connectionId,
ex
);
}
}
@ -281,8 +326,12 @@ public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrp
private void complete() {
try {
responseObserver.onCompleted();
} catch (Exception ignored) {
// ignore observer completion failure
} catch (Exception ex) {
RealtimeMeetingGrpcSessionServiceImpl.log.debug(
"Ignore downstream realtime gRPC completion failure, connectionId={}",
connectionId,
ex
);
}
}
}

View File

@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.imeeting.dto.biz.CreateMeetingCommand;
import com.imeeting.dto.biz.CreateRealtimeMeetingCommand;
import com.imeeting.dto.biz.MeetingVO;
import com.imeeting.dto.biz.RealtimeTranscriptItemDTO;
import com.imeeting.entity.biz.Meeting;
import com.imeeting.service.biz.AiTaskService;
import com.imeeting.service.biz.HotWordService;
@ -22,6 +23,7 @@ import java.time.LocalDateTime;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doAnswer;
@ -246,6 +248,41 @@ class MeetingCommandServiceImplTest {
verify(audioStorageService, never()).finalizeMeetingAudio(203L);
}
@Test
void saveRealtimeTranscriptSnapshotShouldIgnoreNonFinalTranscript() {
MeetingService meetingService = mock(MeetingService.class);
MeetingDomainSupport meetingDomainSupport = mock(MeetingDomainSupport.class);
com.imeeting.mapper.biz.MeetingTranscriptMapper transcriptMapper = mock(com.imeeting.mapper.biz.MeetingTranscriptMapper.class);
RealtimeMeetingSessionStateService sessionStateService = mock(RealtimeMeetingSessionStateService.class);
MeetingCommandServiceImpl service = new MeetingCommandServiceImpl(
meetingService,
mock(AiTaskService.class),
mock(HotWordService.class),
transcriptMapper,
mock(MeetingSummaryFileService.class),
meetingDomainSupport,
sessionStateService,
mock(RealtimeMeetingAudioStorageService.class),
mock(StringRedisTemplate.class),
new ObjectMapper()
);
RealtimeTranscriptItemDTO item = new RealtimeTranscriptItemDTO();
item.setSpeakerId("spk-1");
item.setSpeakerName("Speaker 1");
item.setContent("partial transcript");
item.setStartTime(100);
item.setEndTime(500);
service.saveRealtimeTranscriptSnapshot(1001L, item, false);
verify(transcriptMapper, never()).selectOne(any());
verify(transcriptMapper, never()).insert(any());
verify(transcriptMapper, never()).update(any(), any());
verify(sessionStateService, never()).refreshAfterTranscript(anyLong());
}
@Test
void reSummaryShouldDispatchAfterTransactionCommit() {
MeetingService meetingService = mock(MeetingService.class);