From f0d63c97a3b6888185c93b8b64fde00f5d519135 Mon Sep 17 00:00:00 2001 From: chenhao Date: Wed, 1 Apr 2026 09:27:30 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0Android=E5=AE=9E?= =?UTF-8?q?=E6=97=B6=E4=BC=9A=E8=AE=AEgRPC=E6=9C=8D=E5=8A=A1=E5=92=8C?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 定义 `common.proto` 和 `realtime_meeting.proto`,包含客户端认证、错误事件和实时会议相关的gRPC消息 - 实现 `AndroidGatewayPushServiceImpl`,处理设备注册、注销和消息推送 - 实现 `AndroidDeviceSessionServiceImpl`,管理设备会话状态和心跳更新 - 实现 `AsrUpstreamBridgeServiceImpl`,桥接ASR上游服务并处理音频流 - 实现 `AndroidRealtimeSessionTicketServiceImpl`,创建和获取实时会议会话票据 - 定义 `gateway.proto`,包含网关服务的消息定义 --- backend/pom.xml | 59 +++- .../java/com/imeeting/common/RedisKeys.java | 36 +++ .../config/AndroidApiSecurityConfig.java | 21 ++ .../grpc/AndroidGrpcAuthProperties.java | 12 + .../config/grpc/GrpcServerLifecycle.java | 54 ++++ .../config/grpc/GrpcServerProperties.java | 31 ++ .../AndroidMeetingRealtimeController.java | 75 +++++ .../dto/android/AndroidAuthContext.java | 15 + .../android/AndroidDeviceSessionState.java | 14 + ...AndroidOpenRealtimeGrpcSessionCommand.java | 19 ++ .../AndroidRealtimeGrpcSessionData.java | 16 + .../android/AndroidRealtimeGrpcSessionVO.java | 17 ++ .../gateway/AndroidGatewayGrpcService.java | 116 ++++++++ .../realtime/RealtimeMeetingGrpcService.java | 76 +++++ .../service/android/AndroidAuthService.java | 11 + .../android/AndroidDeviceSessionService.java | 19 ++ .../android/AndroidGatewayPushService.java | 14 + .../android/impl/AndroidAuthServiceImpl.java | 89 ++++++ .../impl/AndroidDeviceSessionServiceImpl.java | 110 +++++++ .../impl/AndroidGatewayPushServiceImpl.java | 69 +++++ .../AndroidRealtimeSessionTicketService.java | 12 + .../realtime/AsrUpstreamBridgeService.java | 71 +++++ .../RealtimeMeetingGrpcSessionService.java | 15 + ...droidRealtimeSessionTicketServiceImpl.java | 219 ++++++++++++++ .../impl/AsrUpstreamBridgeServiceImpl.java | 262 +++++++++++++++++ ...RealtimeMeetingGrpcSessionServiceImpl.java | 278 ++++++++++++++++++ backend/src/main/proto/android/common.proto | 36 +++ backend/src/main/proto/android/gateway.proto | 85 ++++++ .../main/proto/android/realtime_meeting.proto | 100 +++++++ backend/src/main/resources/application.yml | 20 +- 30 files changed, 1965 insertions(+), 6 deletions(-) create mode 100644 backend/src/main/java/com/imeeting/config/AndroidApiSecurityConfig.java create mode 100644 backend/src/main/java/com/imeeting/config/grpc/AndroidGrpcAuthProperties.java create mode 100644 backend/src/main/java/com/imeeting/config/grpc/GrpcServerLifecycle.java create mode 100644 backend/src/main/java/com/imeeting/config/grpc/GrpcServerProperties.java create mode 100644 backend/src/main/java/com/imeeting/controller/android/AndroidMeetingRealtimeController.java create mode 100644 backend/src/main/java/com/imeeting/dto/android/AndroidAuthContext.java create mode 100644 backend/src/main/java/com/imeeting/dto/android/AndroidDeviceSessionState.java create mode 100644 backend/src/main/java/com/imeeting/dto/android/AndroidOpenRealtimeGrpcSessionCommand.java create mode 100644 backend/src/main/java/com/imeeting/dto/android/AndroidRealtimeGrpcSessionData.java create mode 100644 backend/src/main/java/com/imeeting/dto/android/AndroidRealtimeGrpcSessionVO.java create mode 100644 backend/src/main/java/com/imeeting/grpc/gateway/AndroidGatewayGrpcService.java create mode 100644 backend/src/main/java/com/imeeting/grpc/realtime/RealtimeMeetingGrpcService.java create mode 100644 backend/src/main/java/com/imeeting/service/android/AndroidAuthService.java create mode 100644 backend/src/main/java/com/imeeting/service/android/AndroidDeviceSessionService.java create mode 100644 backend/src/main/java/com/imeeting/service/android/AndroidGatewayPushService.java create mode 100644 backend/src/main/java/com/imeeting/service/android/impl/AndroidAuthServiceImpl.java create mode 100644 backend/src/main/java/com/imeeting/service/android/impl/AndroidDeviceSessionServiceImpl.java create mode 100644 backend/src/main/java/com/imeeting/service/android/impl/AndroidGatewayPushServiceImpl.java create mode 100644 backend/src/main/java/com/imeeting/service/realtime/AndroidRealtimeSessionTicketService.java create mode 100644 backend/src/main/java/com/imeeting/service/realtime/AsrUpstreamBridgeService.java create mode 100644 backend/src/main/java/com/imeeting/service/realtime/RealtimeMeetingGrpcSessionService.java create mode 100644 backend/src/main/java/com/imeeting/service/realtime/impl/AndroidRealtimeSessionTicketServiceImpl.java create mode 100644 backend/src/main/java/com/imeeting/service/realtime/impl/AsrUpstreamBridgeServiceImpl.java create mode 100644 backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingGrpcSessionServiceImpl.java create mode 100644 backend/src/main/proto/android/common.proto create mode 100644 backend/src/main/proto/android/gateway.proto create mode 100644 backend/src/main/proto/android/realtime_meeting.proto diff --git a/backend/pom.xml b/backend/pom.xml index 1a3188c..e3bbb83 100644 --- a/backend/pom.xml +++ b/backend/pom.xml @@ -1,4 +1,4 @@ - 4.0.0 @@ -20,6 +20,10 @@ 3.5.6 0.11.5 1.6.2 + 1.76.1 + 3.25.8 + 0.6.1 + 1.7.1 @@ -88,6 +92,26 @@ pinyin4j 2.5.1 + + io.grpc + grpc-netty-shaded + ${grpc.version} + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + io.grpc + grpc-services + ${grpc.version} + org.projectlombok lombok @@ -133,14 +157,39 @@ unisbase-spring-boot-starter 0.1.0 - - org.springframework.boot - spring-boot-starter-actuator - + + org.springframework.boot + spring-boot-starter-actuator + + + + kr.motd.maven + os-maven-plugin + ${os.maven.plugin.version} + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + ${protobuf.plugin.version} + + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + org.springframework.boot spring-boot-maven-plugin diff --git a/backend/src/main/java/com/imeeting/common/RedisKeys.java b/backend/src/main/java/com/imeeting/common/RedisKeys.java index 71c3cc5..789a06e 100644 --- a/backend/src/main/java/com/imeeting/common/RedisKeys.java +++ b/backend/src/main/java/com/imeeting/common/RedisKeys.java @@ -71,6 +71,42 @@ public final class RedisKeys { return "biz:meeting:realtime:empty-timeout:"; } + public static String androidDeviceOnlineKey(String deviceId) { + return "biz:android:device:online:" + deviceId; + } + + public static String androidDeviceActiveConnectionKey(String deviceId) { + return "biz:android:device:active-conn:" + deviceId; + } + + public static String androidDeviceConnectionKey(String connectionId) { + return "biz:android:device:conn:" + connectionId; + } + + public static String androidDeviceTopicsKey(String deviceId) { + return "biz:android:device:topics:" + deviceId; + } + + public static String androidDeviceOutboxKey(String deviceId) { + return "biz:android:device:outbox:" + deviceId; + } + + public static String androidDeviceMessageSeqKey(String deviceId) { + return "biz:android:device:message-seq:" + deviceId; + } + + public static String realtimeMeetingGrpcSessionKey(String streamToken) { + return "biz:meeting:realtime:grpc-session:" + streamToken; + } + + public static String realtimeMeetingGrpcConnectionKey(String connectionId) { + return "biz:meeting:realtime:grpc-conn:" + connectionId; + } + + public static String realtimeMeetingEventSeqKey(Long meetingId) { + return "biz:meeting:realtime:event-seq:" + meetingId; + } + public static final String CACHE_EMPTY_MARKER = "EMPTY_MARKER"; public static final String SYS_PARAM_FIELD_VALUE = "value"; public static final String SYS_PARAM_FIELD_TYPE = "type"; diff --git a/backend/src/main/java/com/imeeting/config/AndroidApiSecurityConfig.java b/backend/src/main/java/com/imeeting/config/AndroidApiSecurityConfig.java new file mode 100644 index 0000000..126f060 --- /dev/null +++ b/backend/src/main/java/com/imeeting/config/AndroidApiSecurityConfig.java @@ -0,0 +1,21 @@ +package com.imeeting.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.annotation.Order; +import org.springframework.security.config.annotation.web.builders.HttpSecurity; +import org.springframework.security.config.annotation.web.configurers.AbstractHttpConfigurer; +import org.springframework.security.web.SecurityFilterChain; + +@Configuration +public class AndroidApiSecurityConfig { + + @Bean + @Order(1) + public SecurityFilterChain androidApiSecurityFilterChain(HttpSecurity http) throws Exception { + http.securityMatcher("/api/android/**") + .csrf(AbstractHttpConfigurer::disable) + .authorizeHttpRequests(authorize -> authorize.anyRequest().permitAll()); + return http.build(); + } +} diff --git a/backend/src/main/java/com/imeeting/config/grpc/AndroidGrpcAuthProperties.java b/backend/src/main/java/com/imeeting/config/grpc/AndroidGrpcAuthProperties.java new file mode 100644 index 0000000..fa43080 --- /dev/null +++ b/backend/src/main/java/com/imeeting/config/grpc/AndroidGrpcAuthProperties.java @@ -0,0 +1,12 @@ +package com.imeeting.config.grpc; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +@Data +@ConfigurationProperties(prefix = "imeeting.grpc.auth") +public class AndroidGrpcAuthProperties { + + private boolean enabled = false; + private boolean allowAnonymous = true; +} diff --git a/backend/src/main/java/com/imeeting/config/grpc/GrpcServerLifecycle.java b/backend/src/main/java/com/imeeting/config/grpc/GrpcServerLifecycle.java new file mode 100644 index 0000000..95f9d7a --- /dev/null +++ b/backend/src/main/java/com/imeeting/config/grpc/GrpcServerLifecycle.java @@ -0,0 +1,54 @@ +package com.imeeting.config.grpc; + +import io.grpc.BindableService; +import io.grpc.Server; +import io.grpc.protobuf.services.ProtoReflectionService; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.stereotype.Component; + +import java.io.IOException; +import java.util.List; + +@Slf4j +@Component +@RequiredArgsConstructor +@EnableConfigurationProperties({GrpcServerProperties.class, AndroidGrpcAuthProperties.class}) +public class GrpcServerLifecycle { + + private final GrpcServerProperties properties; + private final List bindableServices; + private Server server; + + @PostConstruct + public void start() throws IOException { + if (!properties.isEnabled()) { + log.info("gRPC server is disabled by configuration"); + return; + } + + NettyServerBuilder builder = NettyServerBuilder.forPort(properties.getPort()) + .maxInboundMessageSize(properties.getMaxInboundMessageSize()); + bindableServices.forEach(builder::addService); + if (properties.isReflectionEnabled()) { + builder.addService(ProtoReflectionService.newInstance()); + } + + server = builder.build(); + server.start(); + log.info("gRPC server started on port {} with {} services", properties.getPort(), bindableServices.size()); + } + + @PreDestroy + public void stop() { + if (server == null) { + return; + } + log.info("Stopping gRPC server"); + server.shutdown(); + } +} diff --git a/backend/src/main/java/com/imeeting/config/grpc/GrpcServerProperties.java b/backend/src/main/java/com/imeeting/config/grpc/GrpcServerProperties.java new file mode 100644 index 0000000..58b1a67 --- /dev/null +++ b/backend/src/main/java/com/imeeting/config/grpc/GrpcServerProperties.java @@ -0,0 +1,31 @@ +package com.imeeting.config.grpc; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +@Data +@ConfigurationProperties(prefix = "imeeting.grpc") +public class GrpcServerProperties { + + private boolean enabled = true; + private int port = 19090; + private int maxInboundMessageSize = 4194304; + private boolean reflectionEnabled = true; + private Gateway gateway = new Gateway(); + private Realtime realtime = new Realtime(); + + @Data + public static class Gateway { + private long heartbeatIntervalSeconds = 15; + private long heartbeatTimeoutSeconds = 45; + } + + @Data + public static class Realtime { + private long sessionTtlSeconds = 600; + private int sampleRate = 16000; + private int channels = 1; + private String encoding = "PCM16LE"; + private long connectionTtlSeconds = 1800; + } +} diff --git a/backend/src/main/java/com/imeeting/controller/android/AndroidMeetingRealtimeController.java b/backend/src/main/java/com/imeeting/controller/android/AndroidMeetingRealtimeController.java new file mode 100644 index 0000000..b0f55ed --- /dev/null +++ b/backend/src/main/java/com/imeeting/controller/android/AndroidMeetingRealtimeController.java @@ -0,0 +1,75 @@ +package com.imeeting.controller.android; + +import com.imeeting.dto.android.AndroidOpenRealtimeGrpcSessionCommand; +import com.imeeting.dto.android.AndroidRealtimeGrpcSessionVO; +import com.imeeting.dto.biz.MeetingTranscriptVO; +import com.imeeting.dto.biz.RealtimeMeetingCompleteDTO; +import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO; +import com.imeeting.service.android.AndroidAuthService; +import com.imeeting.service.biz.MeetingAccessService; +import com.imeeting.service.biz.MeetingCommandService; +import com.imeeting.service.biz.MeetingQueryService; +import com.imeeting.service.biz.RealtimeMeetingSessionStateService; +import com.imeeting.service.realtime.AndroidRealtimeSessionTicketService; +import com.unisbase.common.ApiResponse; +import jakarta.servlet.http.HttpServletRequest; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +@RestController +@RequestMapping("/api/android/meeting") +@RequiredArgsConstructor +public class AndroidMeetingRealtimeController { + + private final AndroidAuthService androidAuthService; + private final MeetingAccessService meetingAccessService; + private final MeetingQueryService meetingQueryService; + private final MeetingCommandService meetingCommandService; + private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; + private final AndroidRealtimeSessionTicketService androidRealtimeSessionTicketService; + + @GetMapping("/{id}/realtime/session-status") + public ApiResponse getRealtimeSessionStatus(@PathVariable Long id, HttpServletRequest request) { + androidAuthService.authenticateHttp(request); + meetingAccessService.requireMeeting(id); + return ApiResponse.ok(realtimeMeetingSessionStateService.getStatus(id)); + } + + @GetMapping("/{id}/transcripts") + public ApiResponse> getTranscripts(@PathVariable Long id, HttpServletRequest request) { + androidAuthService.authenticateHttp(request); + meetingAccessService.requireMeeting(id); + return ApiResponse.ok(meetingQueryService.getTranscripts(id)); + } + + @PostMapping("/{id}/realtime/pause") + public ApiResponse pauseRealtimeMeeting(@PathVariable Long id, HttpServletRequest request) { + androidAuthService.authenticateHttp(request); + meetingAccessService.requireMeeting(id); + return ApiResponse.ok(realtimeMeetingSessionStateService.pause(id)); + } + + @PostMapping("/{id}/realtime/complete") + public ApiResponse completeRealtimeMeeting(@PathVariable Long id, + HttpServletRequest request, + @RequestBody(required = false) RealtimeMeetingCompleteDTO dto) { + androidAuthService.authenticateHttp(request); + meetingAccessService.requireMeeting(id); + meetingCommandService.completeRealtimeMeeting(id, dto != null ? dto.getAudioUrl() : null); + return ApiResponse.ok(true); + } + + @PostMapping("/{id}/realtime/grpc-session") + public ApiResponse openRealtimeGrpcSession(@PathVariable Long id, + HttpServletRequest request, + @RequestBody(required = false) AndroidOpenRealtimeGrpcSessionCommand command) { + return ApiResponse.ok(androidRealtimeSessionTicketService.createSession(id, command, androidAuthService.authenticateHttp(request))); + } +} diff --git a/backend/src/main/java/com/imeeting/dto/android/AndroidAuthContext.java b/backend/src/main/java/com/imeeting/dto/android/AndroidAuthContext.java new file mode 100644 index 0000000..537de0a --- /dev/null +++ b/backend/src/main/java/com/imeeting/dto/android/AndroidAuthContext.java @@ -0,0 +1,15 @@ +package com.imeeting.dto.android; + +import lombok.Data; + +@Data +public class AndroidAuthContext { + private String authMode; + private String deviceId; + private String tenantCode; + private String appId; + private String appVersion; + private String platform; + private String accessToken; + private boolean anonymous; +} diff --git a/backend/src/main/java/com/imeeting/dto/android/AndroidDeviceSessionState.java b/backend/src/main/java/com/imeeting/dto/android/AndroidDeviceSessionState.java new file mode 100644 index 0000000..2cf3ad0 --- /dev/null +++ b/backend/src/main/java/com/imeeting/dto/android/AndroidDeviceSessionState.java @@ -0,0 +1,14 @@ +package com.imeeting.dto.android; + +import lombok.Data; + +@Data +public class AndroidDeviceSessionState { + private String connectionId; + private String deviceId; + private String status; + private Long lastSeenAt; + private String appVersion; + private String platform; + private String tenantCode; +} diff --git a/backend/src/main/java/com/imeeting/dto/android/AndroidOpenRealtimeGrpcSessionCommand.java b/backend/src/main/java/com/imeeting/dto/android/AndroidOpenRealtimeGrpcSessionCommand.java new file mode 100644 index 0000000..b583fe4 --- /dev/null +++ b/backend/src/main/java/com/imeeting/dto/android/AndroidOpenRealtimeGrpcSessionCommand.java @@ -0,0 +1,19 @@ +package com.imeeting.dto.android; + +import lombok.Data; + +import java.util.List; +import java.util.Map; + +@Data +public class AndroidOpenRealtimeGrpcSessionCommand { + private Long asrModelId; + private String mode; + private String language; + private Integer useSpkId; + private Boolean enablePunctuation; + private Boolean enableItn; + private Boolean enableTextRefine; + private Boolean saveAudio; + private List> hotwords; +} diff --git a/backend/src/main/java/com/imeeting/dto/android/AndroidRealtimeGrpcSessionData.java b/backend/src/main/java/com/imeeting/dto/android/AndroidRealtimeGrpcSessionData.java new file mode 100644 index 0000000..da12ec9 --- /dev/null +++ b/backend/src/main/java/com/imeeting/dto/android/AndroidRealtimeGrpcSessionData.java @@ -0,0 +1,16 @@ +package com.imeeting.dto.android; + +import com.imeeting.dto.biz.RealtimeMeetingResumeConfig; +import lombok.Data; + +@Data +public class AndroidRealtimeGrpcSessionData { + private Long meetingId; + private Long tenantId; + private Long userId; + private String deviceId; + private Long asrModelId; + private String targetWsUrl; + private String startMessageJson; + private RealtimeMeetingResumeConfig resumeConfig; +} diff --git a/backend/src/main/java/com/imeeting/dto/android/AndroidRealtimeGrpcSessionVO.java b/backend/src/main/java/com/imeeting/dto/android/AndroidRealtimeGrpcSessionVO.java new file mode 100644 index 0000000..e1dce32 --- /dev/null +++ b/backend/src/main/java/com/imeeting/dto/android/AndroidRealtimeGrpcSessionVO.java @@ -0,0 +1,17 @@ +package com.imeeting.dto.android; + +import com.imeeting.dto.biz.RealtimeMeetingResumeConfig; +import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO; +import lombok.Data; + +@Data +public class AndroidRealtimeGrpcSessionVO { + private Long meetingId; + private String streamToken; + private Long expiresInSeconds; + private Integer sampleRate; + private Integer channels; + private String encoding; + private RealtimeMeetingResumeConfig resumeConfig; + private RealtimeMeetingSessionStatusVO status; +} diff --git a/backend/src/main/java/com/imeeting/grpc/gateway/AndroidGatewayGrpcService.java b/backend/src/main/java/com/imeeting/grpc/gateway/AndroidGatewayGrpcService.java new file mode 100644 index 0000000..1a09b02 --- /dev/null +++ b/backend/src/main/java/com/imeeting/grpc/gateway/AndroidGatewayGrpcService.java @@ -0,0 +1,116 @@ +package com.imeeting.grpc.gateway; + +import com.imeeting.config.grpc.GrpcServerProperties; +import com.imeeting.dto.android.AndroidAuthContext; +import com.imeeting.dto.android.AndroidDeviceSessionState; +import com.imeeting.grpc.common.ErrorEvent; +import com.imeeting.service.android.AndroidAuthService; +import com.imeeting.service.android.AndroidDeviceSessionService; +import com.imeeting.service.android.AndroidGatewayPushService; +import io.grpc.BindableService; +import io.grpc.stub.StreamObserver; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +public class AndroidGatewayGrpcService extends AndroidGatewayServiceGrpc.AndroidGatewayServiceImplBase implements BindableService { + + private final AndroidAuthService androidAuthService; + private final AndroidDeviceSessionService androidDeviceSessionService; + private final AndroidGatewayPushService androidGatewayPushService; + private final GrpcServerProperties grpcServerProperties; + + @Override + public StreamObserver connect(StreamObserver responseObserver) { + return new StreamObserver<>() { + private String connectionId; + private String deviceId; + private AndroidAuthContext authContext; + + @Override + public void onNext(GatewayClientPacket packet) { + try { + switch (packet.getBodyCase()) { + case HELLO -> handleHello(packet); + case HEARTBEAT -> handleHeartbeat(packet.getHeartbeat()); + case SUBSCRIBE -> handleSubscribe(packet.getSubscribe()); + case ACK, BODY_NOT_SET -> { + } + } + } catch (Exception ex) { + sendError(responseObserver, ex.getMessage()); + } + } + + @Override + public void onError(Throwable t) { + cleanup(); + } + + @Override + public void onCompleted() { + cleanup(); + responseObserver.onCompleted(); + } + + private void handleHello(GatewayClientPacket packet) { + authContext = androidAuthService.authenticateGrpc(packet.getAuth(), packet.getHello().getDeviceId()); + AndroidDeviceSessionState sessionState = androidDeviceSessionService.openSession(authContext, packet.getHello()); + connectionId = sessionState.getConnectionId(); + deviceId = sessionState.getDeviceId(); + androidGatewayPushService.register(connectionId, deviceId, responseObserver); + responseObserver.onNext(GatewayServerPacket.newBuilder() + .setRequestId(packet.getRequestId()) + .setHelloAck(HelloAck.newBuilder() + .setConnectionId(connectionId) + .setAuthMode(authContext.getAuthMode()) + .setServerTime(System.currentTimeMillis()) + .setHeartbeatIntervalSeconds(grpcServerProperties.getGateway().getHeartbeatIntervalSeconds()) + .build()) + .build()); + } + + private void handleHeartbeat(Heartbeat heartbeat) { + if (heartbeat == null || connectionId == null) { + return; + } + AndroidDeviceSessionState state = androidDeviceSessionService.refreshHeartbeat(connectionId, heartbeat.getClientTime()); + if (state == null) { + sendError(responseObserver, "Android device session not found"); + return; + } + responseObserver.onNext(GatewayServerPacket.newBuilder() + .setPong(Pong.newBuilder() + .setConnectionId(connectionId) + .setServerTime(System.currentTimeMillis()) + .build()) + .build()); + } + + private void handleSubscribe(Subscribe subscribe) { + if (deviceId == null) { + return; + } + androidDeviceSessionService.updateTopics(deviceId, subscribe.getTopicsList()); + } + + private void cleanup() { + if (connectionId != null) { + androidGatewayPushService.unregister(connectionId); + androidDeviceSessionService.closeSession(connectionId); + } + } + }; + } + + private void sendError(StreamObserver responseObserver, String message) { + responseObserver.onNext(GatewayServerPacket.newBuilder() + .setError(ErrorEvent.newBuilder() + .setCode("ANDROID_GATEWAY_ERROR") + .setMessage(message == null ? "网关处理失败" : message) + .setRetryable(false) + .build()) + .build()); + } +} diff --git a/backend/src/main/java/com/imeeting/grpc/realtime/RealtimeMeetingGrpcService.java b/backend/src/main/java/com/imeeting/grpc/realtime/RealtimeMeetingGrpcService.java new file mode 100644 index 0000000..6e3f9c5 --- /dev/null +++ b/backend/src/main/java/com/imeeting/grpc/realtime/RealtimeMeetingGrpcService.java @@ -0,0 +1,76 @@ +package com.imeeting.grpc.realtime; + +import com.imeeting.dto.android.AndroidAuthContext; +import com.imeeting.service.android.AndroidAuthService; +import com.imeeting.service.realtime.RealtimeMeetingGrpcSessionService; +import io.grpc.BindableService; +import io.grpc.stub.StreamObserver; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +public class RealtimeMeetingGrpcService extends RealtimeMeetingServiceGrpc.RealtimeMeetingServiceImplBase implements BindableService { + + private final AndroidAuthService androidAuthService; + private final RealtimeMeetingGrpcSessionService realtimeMeetingGrpcSessionService; + + @Override + public StreamObserver streamMeetingAudio(StreamObserver responseObserver) { + return new StreamObserver<>() { + private String connectionId; + private AndroidAuthContext authContext; + + @Override + public void onNext(RealtimeClientPacket packet) { + switch (packet.getBodyCase()) { + case OPEN -> handleOpen(packet); + case AUDIO -> handleAudio(packet.getAudio()); + case CONTROL -> handleControl(packet.getControl()); + case BODY_NOT_SET -> { + } + } + } + + @Override + public void onError(Throwable t) { + if (connectionId != null) { + realtimeMeetingGrpcSessionService.closeStream(connectionId, "client_error", false); + } + } + + @Override + public void onCompleted() { + if (connectionId != null) { + realtimeMeetingGrpcSessionService.closeStream(connectionId, "client_completed", false); + } else { + responseObserver.onCompleted(); + } + } + + private void handleOpen(RealtimeClientPacket packet) { + authContext = androidAuthService.authenticateGrpc(packet.getAuth(), null); + connectionId = realtimeMeetingGrpcSessionService.openStream(packet.getOpen().getStreamToken(), authContext, responseObserver); + } + + private void handleAudio(AudioChunk audioChunk) { + if (connectionId == null) { + throw new RuntimeException("Realtime gRPC stream is not opened"); + } + realtimeMeetingGrpcSessionService.onAudio(connectionId, audioChunk.getPcm16().toByteArray(), audioChunk.getSeq(), audioChunk.getLastChunk()); + } + + private void handleControl(RealtimeControl control) { + if (connectionId == null) { + return; + } + switch (control.getType()) { + case STOP_SPEAKING, END_INPUT -> realtimeMeetingGrpcSessionService.onStopSpeaking(connectionId); + case CLOSE_STREAM -> realtimeMeetingGrpcSessionService.closeStream(connectionId, "client_close_stream", true); + case START, CONTROL_TYPE_UNSPECIFIED -> { + } + } + } + }; + } +} diff --git a/backend/src/main/java/com/imeeting/service/android/AndroidAuthService.java b/backend/src/main/java/com/imeeting/service/android/AndroidAuthService.java new file mode 100644 index 0000000..5ec8ba5 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/android/AndroidAuthService.java @@ -0,0 +1,11 @@ +package com.imeeting.service.android; + +import com.imeeting.dto.android.AndroidAuthContext; +import com.imeeting.grpc.common.ClientAuth; +import jakarta.servlet.http.HttpServletRequest; + +public interface AndroidAuthService { + AndroidAuthContext authenticateGrpc(ClientAuth auth, String fallbackDeviceId); + + AndroidAuthContext authenticateHttp(HttpServletRequest request); +} diff --git a/backend/src/main/java/com/imeeting/service/android/AndroidDeviceSessionService.java b/backend/src/main/java/com/imeeting/service/android/AndroidDeviceSessionService.java new file mode 100644 index 0000000..2743453 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/android/AndroidDeviceSessionService.java @@ -0,0 +1,19 @@ +package com.imeeting.service.android; + +import com.imeeting.dto.android.AndroidAuthContext; +import com.imeeting.dto.android.AndroidDeviceSessionState; +import com.imeeting.grpc.gateway.DeviceHello; + +import java.util.List; + +public interface AndroidDeviceSessionService { + AndroidDeviceSessionState openSession(AndroidAuthContext authContext, DeviceHello hello); + + AndroidDeviceSessionState refreshHeartbeat(String connectionId, long clientTime); + + AndroidDeviceSessionState getByConnectionId(String connectionId); + + void updateTopics(String deviceId, List topics); + + void closeSession(String connectionId); +} diff --git a/backend/src/main/java/com/imeeting/service/android/AndroidGatewayPushService.java b/backend/src/main/java/com/imeeting/service/android/AndroidGatewayPushService.java new file mode 100644 index 0000000..471d341 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/android/AndroidGatewayPushService.java @@ -0,0 +1,14 @@ +package com.imeeting.service.android; + +import com.imeeting.grpc.gateway.GatewayServerPacket; +import io.grpc.stub.StreamObserver; + +public interface AndroidGatewayPushService { + void register(String connectionId, String deviceId, StreamObserver observer); + + void unregister(String connectionId); + + boolean pushToConnection(String connectionId, GatewayServerPacket packet); + + int pushToDevice(String deviceId, GatewayServerPacket packet); +} diff --git a/backend/src/main/java/com/imeeting/service/android/impl/AndroidAuthServiceImpl.java b/backend/src/main/java/com/imeeting/service/android/impl/AndroidAuthServiceImpl.java new file mode 100644 index 0000000..e0e4f3a --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/android/impl/AndroidAuthServiceImpl.java @@ -0,0 +1,89 @@ +package com.imeeting.service.android.impl; + +import com.imeeting.config.grpc.AndroidGrpcAuthProperties; +import com.imeeting.dto.android.AndroidAuthContext; +import com.imeeting.grpc.common.ClientAuth; +import com.imeeting.service.android.AndroidAuthService; +import jakarta.servlet.http.HttpServletRequest; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +@Service +@RequiredArgsConstructor +public class AndroidAuthServiceImpl implements AndroidAuthService { + + private static final String HEADER_DEVICE_ID = "X-Android-Device-Id"; + private static final String HEADER_TENANT_CODE = "X-Android-Tenant-Code"; + private static final String HEADER_ACCESS_TOKEN = "X-Android-Access-Token"; + private static final String HEADER_APP_ID = "X-Android-App-Id"; + private static final String HEADER_APP_VERSION = "X-Android-App-Version"; + private static final String HEADER_PLATFORM = "X-Android-Platform"; + + private final AndroidGrpcAuthProperties properties; + + @Override + public AndroidAuthContext authenticateGrpc(ClientAuth auth, String fallbackDeviceId) { + ClientAuth.AuthType authType = auth == null ? ClientAuth.AuthType.AUTH_TYPE_UNSPECIFIED : auth.getAuthType(); + if (authType == ClientAuth.AuthType.DEVICE_TOKEN || authType == ClientAuth.AuthType.USER_JWT) { + return buildContext(authType.name(), false, auth.getDeviceId(), auth.getTenantCode(), auth.getAppId(), + auth.getAppVersion(), auth.getPlatform(), auth.getAccessToken(), fallbackDeviceId); + } + if (properties.isEnabled() && !properties.isAllowAnonymous()) { + throw new RuntimeException("Android gRPC auth is required"); + } + return buildContext("NONE", true, + auth == null ? null : auth.getDeviceId(), + auth == null ? null : auth.getTenantCode(), + auth == null ? null : auth.getAppId(), + auth == null ? null : auth.getAppVersion(), + auth == null ? null : auth.getPlatform(), + auth == null ? null : auth.getAccessToken(), + fallbackDeviceId); + } + + @Override + public AndroidAuthContext authenticateHttp(HttpServletRequest request) { + if (properties.isEnabled() && !properties.isAllowAnonymous()) { + String token = request.getHeader(HEADER_ACCESS_TOKEN); + if (!StringUtils.hasText(token)) { + throw new RuntimeException("Android HTTP auth is required"); + } + return buildContext("DEVICE_TOKEN", false, + request.getHeader(HEADER_DEVICE_ID), + request.getHeader(HEADER_TENANT_CODE), + request.getHeader(HEADER_APP_ID), + request.getHeader(HEADER_APP_VERSION), + request.getHeader(HEADER_PLATFORM), + token, + null); + } + return buildContext("NONE", true, + request.getHeader(HEADER_DEVICE_ID), + request.getHeader(HEADER_TENANT_CODE), + request.getHeader(HEADER_APP_ID), + request.getHeader(HEADER_APP_VERSION), + request.getHeader(HEADER_PLATFORM), + request.getHeader(HEADER_ACCESS_TOKEN), + null); + } + + private AndroidAuthContext buildContext(String authMode, boolean anonymous, String deviceId, String tenantCode, + String appId, String appVersion, String platform, String accessToken, + String fallbackDeviceId) { + String resolvedDeviceId = StringUtils.hasText(deviceId) ? deviceId : fallbackDeviceId; + if (!StringUtils.hasText(resolvedDeviceId)) { + throw new RuntimeException("Android deviceId is required"); + } + AndroidAuthContext context = new AndroidAuthContext(); + context.setAuthMode(authMode); + context.setAnonymous(anonymous); + context.setDeviceId(resolvedDeviceId.trim()); + context.setTenantCode(StringUtils.hasText(tenantCode) ? tenantCode.trim() : null); + context.setAppId(StringUtils.hasText(appId) ? appId.trim() : null); + context.setAppVersion(StringUtils.hasText(appVersion) ? appVersion.trim() : null); + context.setPlatform(StringUtils.hasText(platform) ? platform.trim() : "android"); + context.setAccessToken(StringUtils.hasText(accessToken) ? accessToken.trim() : null); + return context; + } +} diff --git a/backend/src/main/java/com/imeeting/service/android/impl/AndroidDeviceSessionServiceImpl.java b/backend/src/main/java/com/imeeting/service/android/impl/AndroidDeviceSessionServiceImpl.java new file mode 100644 index 0000000..ffc7f06 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/android/impl/AndroidDeviceSessionServiceImpl.java @@ -0,0 +1,110 @@ +package com.imeeting.service.android.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.imeeting.common.RedisKeys; +import com.imeeting.config.grpc.GrpcServerProperties; +import com.imeeting.dto.android.AndroidAuthContext; +import com.imeeting.dto.android.AndroidDeviceSessionState; +import com.imeeting.grpc.gateway.DeviceHello; +import com.imeeting.service.android.AndroidDeviceSessionService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; + +@Slf4j +@Service +@RequiredArgsConstructor +public class AndroidDeviceSessionServiceImpl implements AndroidDeviceSessionService { + + private final StringRedisTemplate redisTemplate; + private final ObjectMapper objectMapper; + private final GrpcServerProperties grpcServerProperties; + + @Override + public AndroidDeviceSessionState openSession(AndroidAuthContext authContext, DeviceHello hello) { + AndroidDeviceSessionState state = new AndroidDeviceSessionState(); + state.setConnectionId("android_" + UUID.randomUUID().toString().replace("-", "")); + state.setDeviceId(authContext.getDeviceId()); + state.setStatus("ONLINE"); + state.setLastSeenAt(System.currentTimeMillis()); + state.setAppVersion(nonBlank(hello.getAppVersion(), authContext.getAppVersion())); + state.setPlatform(nonBlank(authContext.getPlatform(), "android")); + state.setTenantCode(authContext.getTenantCode()); + writeState(state); + return state; + } + + @Override + public AndroidDeviceSessionState refreshHeartbeat(String connectionId, long clientTime) { + AndroidDeviceSessionState state = getByConnectionId(connectionId); + if (state == null) { + return null; + } + state.setStatus("ONLINE"); + state.setLastSeenAt(clientTime > 0 ? clientTime : System.currentTimeMillis()); + writeState(state); + return state; + } + + @Override + public AndroidDeviceSessionState getByConnectionId(String connectionId) { + String raw = redisTemplate.opsForValue().get(RedisKeys.androidDeviceConnectionKey(connectionId)); + if (raw == null || raw.isBlank()) { + return null; + } + try { + return objectMapper.readValue(raw, AndroidDeviceSessionState.class); + } catch (Exception ex) { + log.warn("Failed to read android device session, connectionId={}", connectionId, ex); + return null; + } + } + + @Override + public void updateTopics(String deviceId, List topics) { + try { + redisTemplate.opsForValue().set( + RedisKeys.androidDeviceTopicsKey(deviceId), + objectMapper.writeValueAsString(topics == null ? List.of() : topics) + ); + } catch (Exception ex) { + throw new RuntimeException("Failed to update android device topics", ex); + } + } + + @Override + public void closeSession(String connectionId) { + AndroidDeviceSessionState state = getByConnectionId(connectionId); + if (state == null) { + redisTemplate.delete(RedisKeys.androidDeviceConnectionKey(connectionId)); + return; + } + String activeConn = redisTemplate.opsForValue().get(RedisKeys.androidDeviceActiveConnectionKey(state.getDeviceId())); + if (connectionId.equals(activeConn)) { + redisTemplate.delete(RedisKeys.androidDeviceActiveConnectionKey(state.getDeviceId())); + } + redisTemplate.delete(RedisKeys.androidDeviceOnlineKey(state.getDeviceId())); + redisTemplate.delete(RedisKeys.androidDeviceConnectionKey(connectionId)); + } + + private void writeState(AndroidDeviceSessionState state) { + Duration ttl = Duration.ofSeconds(grpcServerProperties.getGateway().getHeartbeatTimeoutSeconds()); + try { + String json = objectMapper.writeValueAsString(state); + redisTemplate.opsForValue().set(RedisKeys.androidDeviceOnlineKey(state.getDeviceId()), json, ttl); + redisTemplate.opsForValue().set(RedisKeys.androidDeviceActiveConnectionKey(state.getDeviceId()), state.getConnectionId(), ttl); + redisTemplate.opsForValue().set(RedisKeys.androidDeviceConnectionKey(state.getConnectionId()), json, ttl); + } catch (Exception ex) { + throw new RuntimeException("Failed to write android device session state", ex); + } + } + + private String nonBlank(String value, String defaultValue) { + return value != null && !value.isBlank() ? value : defaultValue; + } +} diff --git a/backend/src/main/java/com/imeeting/service/android/impl/AndroidGatewayPushServiceImpl.java b/backend/src/main/java/com/imeeting/service/android/impl/AndroidGatewayPushServiceImpl.java new file mode 100644 index 0000000..ed27d63 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/android/impl/AndroidGatewayPushServiceImpl.java @@ -0,0 +1,69 @@ +package com.imeeting.service.android.impl; + +import com.imeeting.grpc.gateway.GatewayServerPacket; +import com.imeeting.service.android.AndroidGatewayPushService; +import io.grpc.stub.StreamObserver; +import org.springframework.stereotype.Service; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +@Service +public class AndroidGatewayPushServiceImpl implements AndroidGatewayPushService { + + private final Map byConnectionId = new ConcurrentHashMap<>(); + private final Map> byDeviceId = new ConcurrentHashMap<>(); + + @Override + public void register(String connectionId, String deviceId, StreamObserver observer) { + byConnectionId.put(connectionId, new Binding(deviceId, observer)); + byDeviceId.computeIfAbsent(deviceId, key -> ConcurrentHashMap.newKeySet()).add(connectionId); + } + + @Override + public void unregister(String connectionId) { + Binding binding = byConnectionId.remove(connectionId); + if (binding == null) { + return; + } + Set connectionIds = byDeviceId.get(binding.deviceId()); + if (connectionIds == null) { + return; + } + connectionIds.remove(connectionId); + if (connectionIds.isEmpty()) { + byDeviceId.remove(binding.deviceId()); + } + } + + @Override + public boolean pushToConnection(String connectionId, GatewayServerPacket packet) { + Binding binding = byConnectionId.get(connectionId); + if (binding == null) { + return false; + } + synchronized (binding) { + binding.observer().onNext(packet); + } + return true; + } + + @Override + public int pushToDevice(String deviceId, GatewayServerPacket packet) { + Set connectionIds = byDeviceId.get(deviceId); + if (connectionIds == null || connectionIds.isEmpty()) { + return 0; + } + int pushed = 0; + for (String connectionId : connectionIds) { + if (pushToConnection(connectionId, packet)) { + pushed++; + } + } + return pushed; + } + + private record Binding(String deviceId, StreamObserver observer) { + } +} diff --git a/backend/src/main/java/com/imeeting/service/realtime/AndroidRealtimeSessionTicketService.java b/backend/src/main/java/com/imeeting/service/realtime/AndroidRealtimeSessionTicketService.java new file mode 100644 index 0000000..095e94e --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/realtime/AndroidRealtimeSessionTicketService.java @@ -0,0 +1,12 @@ +package com.imeeting.service.realtime; + +import com.imeeting.dto.android.AndroidAuthContext; +import com.imeeting.dto.android.AndroidOpenRealtimeGrpcSessionCommand; +import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData; +import com.imeeting.dto.android.AndroidRealtimeGrpcSessionVO; + +public interface AndroidRealtimeSessionTicketService { + AndroidRealtimeGrpcSessionVO createSession(Long meetingId, AndroidOpenRealtimeGrpcSessionCommand command, AndroidAuthContext authContext); + + AndroidRealtimeGrpcSessionData getSessionData(String streamToken); +} diff --git a/backend/src/main/java/com/imeeting/service/realtime/AsrUpstreamBridgeService.java b/backend/src/main/java/com/imeeting/service/realtime/AsrUpstreamBridgeService.java new file mode 100644 index 0000000..21a0265 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/realtime/AsrUpstreamBridgeService.java @@ -0,0 +1,71 @@ +package com.imeeting.service.realtime; + +import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData; + +public interface AsrUpstreamBridgeService { + + AsrUpstreamSession openSession(AndroidRealtimeGrpcSessionData sessionData, String connectionId, AsrUpstreamEventListener listener); + + interface AsrUpstreamSession { + boolean isReady(); + + void sendAudio(byte[] payload); + + void sendStopSpeaking(); + + void close(String reason); + } + + interface AsrUpstreamEventListener { + void onReady(); + + void onTranscript(AsrTranscriptResult result); + + void onError(String code, String message, boolean retryable); + + void onClosed(String reason); + } + + class AsrTranscriptResult { + private final boolean finalResult; + private final String text; + private final String speakerId; + private final String speakerName; + private final Integer startTime; + private final Integer endTime; + + public AsrTranscriptResult(boolean finalResult, String text, String speakerId, String speakerName, + Integer startTime, Integer endTime) { + this.finalResult = finalResult; + this.text = text; + this.speakerId = speakerId; + this.speakerName = speakerName; + this.startTime = startTime; + this.endTime = endTime; + } + + public boolean isFinalResult() { + return finalResult; + } + + public String getText() { + return text; + } + + public String getSpeakerId() { + return speakerId; + } + + public String getSpeakerName() { + return speakerName; + } + + public Integer getStartTime() { + return startTime; + } + + public Integer getEndTime() { + return endTime; + } + } +} diff --git a/backend/src/main/java/com/imeeting/service/realtime/RealtimeMeetingGrpcSessionService.java b/backend/src/main/java/com/imeeting/service/realtime/RealtimeMeetingGrpcSessionService.java new file mode 100644 index 0000000..b79d6b8 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/realtime/RealtimeMeetingGrpcSessionService.java @@ -0,0 +1,15 @@ +package com.imeeting.service.realtime; + +import com.imeeting.dto.android.AndroidAuthContext; +import io.grpc.stub.StreamObserver; +import com.imeeting.grpc.realtime.RealtimeServerPacket; + +public interface RealtimeMeetingGrpcSessionService { + String openStream(String streamToken, AndroidAuthContext authContext, StreamObserver responseObserver); + + void onAudio(String connectionId, byte[] payload, long seq, boolean lastChunk); + + void onStopSpeaking(String connectionId); + + void closeStream(String connectionId, String reason, boolean notifyClient); +} diff --git a/backend/src/main/java/com/imeeting/service/realtime/impl/AndroidRealtimeSessionTicketServiceImpl.java b/backend/src/main/java/com/imeeting/service/realtime/impl/AndroidRealtimeSessionTicketServiceImpl.java new file mode 100644 index 0000000..52d200a --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/realtime/impl/AndroidRealtimeSessionTicketServiceImpl.java @@ -0,0 +1,219 @@ +package com.imeeting.service.realtime.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.imeeting.common.RedisKeys; +import com.imeeting.config.grpc.GrpcServerProperties; +import com.imeeting.dto.android.AndroidAuthContext; +import com.imeeting.dto.android.AndroidOpenRealtimeGrpcSessionCommand; +import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData; +import com.imeeting.dto.android.AndroidRealtimeGrpcSessionVO; +import com.imeeting.dto.biz.AiModelVO; +import com.imeeting.dto.biz.RealtimeMeetingResumeConfig; +import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO; +import com.imeeting.entity.biz.Meeting; +import com.imeeting.service.biz.AiModelService; +import com.imeeting.service.biz.MeetingAccessService; +import com.imeeting.service.biz.RealtimeMeetingSessionStateService; +import com.imeeting.service.realtime.AndroidRealtimeSessionTicketService; +import lombok.RequiredArgsConstructor; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +@Service +@RequiredArgsConstructor +public class AndroidRealtimeSessionTicketServiceImpl implements AndroidRealtimeSessionTicketService { + + private final ObjectMapper objectMapper; + private final StringRedisTemplate redisTemplate; + private final MeetingAccessService meetingAccessService; + private final AiModelService aiModelService; + private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; + private final GrpcServerProperties grpcServerProperties; + + @Override + public AndroidRealtimeGrpcSessionVO createSession(Long meetingId, AndroidOpenRealtimeGrpcSessionCommand command, AndroidAuthContext authContext) { + if (meetingId == null) { + throw new RuntimeException("Meeting ID is required"); + } + Meeting meeting = meetingAccessService.requireMeeting(meetingId); + realtimeMeetingSessionStateService.initSessionIfAbsent(meetingId, meeting.getTenantId(), meeting.getCreatorId()); + RealtimeMeetingSessionStatusVO currentStatus = realtimeMeetingSessionStateService.getStatus(meetingId); + RealtimeMeetingResumeConfig currentResumeConfig = currentStatus == null ? null : currentStatus.getResumeConfig(); + + Long asrModelId = firstNonNull(command == null ? null : command.getAsrModelId(), currentResumeConfig == null ? null : currentResumeConfig.getAsrModelId()); + if (asrModelId == null) { + throw new RuntimeException("ASR model ID is required"); + } + + realtimeMeetingSessionStateService.assertCanOpenSession(meetingId); + AiModelVO asrModel = aiModelService.getModelById(asrModelId, "ASR"); + if (asrModel == null) { + throw new RuntimeException("ASR model not found"); + } + String targetWsUrl = resolveWsUrl(asrModel); + if (targetWsUrl == null || targetWsUrl.isBlank()) { + throw new RuntimeException("ASR model WebSocket is not configured"); + } + + RealtimeMeetingResumeConfig resumeConfig = buildResumeConfig(command, currentResumeConfig, asrModelId); + realtimeMeetingSessionStateService.rememberResumeConfig(meetingId, resumeConfig); + currentStatus = realtimeMeetingSessionStateService.getStatus(meetingId); + + Map startMessage = buildStartMessage(asrModel, meetingId, resumeConfig); + AndroidRealtimeGrpcSessionData sessionData = new AndroidRealtimeGrpcSessionData(); + sessionData.setMeetingId(meetingId); + sessionData.setTenantId(meeting.getTenantId()); + sessionData.setUserId(meeting.getCreatorId()); + sessionData.setDeviceId(authContext.getDeviceId()); + sessionData.setAsrModelId(asrModelId); + sessionData.setTargetWsUrl(targetWsUrl); + sessionData.setResumeConfig(resumeConfig); + try { + sessionData.setStartMessageJson(objectMapper.writeValueAsString(startMessage)); + } catch (Exception ex) { + throw new RuntimeException("Failed to serialize realtime start message", ex); + } + + String streamToken = UUID.randomUUID().toString().replace("-", ""); + Duration ttl = Duration.ofSeconds(grpcServerProperties.getRealtime().getSessionTtlSeconds()); + try { + redisTemplate.opsForValue().set( + RedisKeys.realtimeMeetingGrpcSessionKey(streamToken), + objectMapper.writeValueAsString(sessionData), + ttl + ); + } catch (Exception ex) { + throw new RuntimeException("Failed to create realtime gRPC session", ex); + } + + AndroidRealtimeGrpcSessionVO vo = new AndroidRealtimeGrpcSessionVO(); + vo.setMeetingId(meetingId); + vo.setStreamToken(streamToken); + vo.setExpiresInSeconds(ttl.toSeconds()); + vo.setSampleRate(grpcServerProperties.getRealtime().getSampleRate()); + vo.setChannels(grpcServerProperties.getRealtime().getChannels()); + vo.setEncoding(grpcServerProperties.getRealtime().getEncoding()); + vo.setResumeConfig(resumeConfig); + vo.setStatus(currentStatus); + return vo; + } + + @Override + public AndroidRealtimeGrpcSessionData getSessionData(String streamToken) { + if (streamToken == null || streamToken.isBlank()) { + return null; + } + String raw = redisTemplate.opsForValue().get(RedisKeys.realtimeMeetingGrpcSessionKey(streamToken)); + if (raw == null || raw.isBlank()) { + return null; + } + try { + return objectMapper.readValue(raw, AndroidRealtimeGrpcSessionData.class); + } catch (Exception ex) { + throw new RuntimeException("Failed to read realtime gRPC session", ex); + } + } + + private RealtimeMeetingResumeConfig buildResumeConfig(AndroidOpenRealtimeGrpcSessionCommand command, + RealtimeMeetingResumeConfig currentResumeConfig, + Long asrModelId) { + RealtimeMeetingResumeConfig config = new RealtimeMeetingResumeConfig(); + config.setAsrModelId(asrModelId); + config.setMode(nonBlank(command == null ? null : command.getMode(), currentResumeConfig == null ? null : currentResumeConfig.getMode(), "2pass")); + config.setLanguage(nonBlank(command == null ? null : command.getLanguage(), currentResumeConfig == null ? null : currentResumeConfig.getLanguage(), "auto")); + config.setUseSpkId(firstNonNull(command == null ? null : command.getUseSpkId(), currentResumeConfig == null ? null : currentResumeConfig.getUseSpkId(), 1)); + config.setEnablePunctuation(firstNonNull(command == null ? null : command.getEnablePunctuation(), currentResumeConfig == null ? null : currentResumeConfig.getEnablePunctuation(), Boolean.TRUE)); + config.setEnableItn(firstNonNull(command == null ? null : command.getEnableItn(), currentResumeConfig == null ? null : currentResumeConfig.getEnableItn(), Boolean.TRUE)); + config.setEnableTextRefine(firstNonNull(command == null ? null : command.getEnableTextRefine(), currentResumeConfig == null ? null : currentResumeConfig.getEnableTextRefine(), Boolean.FALSE)); + config.setSaveAudio(firstNonNull(command == null ? null : command.getSaveAudio(), currentResumeConfig == null ? null : currentResumeConfig.getSaveAudio(), Boolean.FALSE)); + config.setHotwords(command != null && command.getHotwords() != null ? command.getHotwords() : currentResumeConfig == null ? List.of() : currentResumeConfig.getHotwords()); + return config; + } + + private String resolveWsUrl(AiModelVO model) { + if (model.getWsUrl() != null && !model.getWsUrl().isBlank()) { + return model.getWsUrl(); + } + if (model.getBaseUrl() == null || model.getBaseUrl().isBlank()) { + return ""; + } + return model.getBaseUrl() + .replaceFirst("^http://", "ws://") + .replaceFirst("^https://", "wss://"); + } + + private Map buildStartMessage(AiModelVO model, Long meetingId, RealtimeMeetingResumeConfig resumeConfig) { + Map root = new HashMap<>(); + root.put("type", "start"); + root.put("request_id", "android_" + System.currentTimeMillis() + "_" + meetingId); + root.put("authorization", buildAuthorization(model.getApiKey())); + + Map config = new HashMap<>(); + Map audio = new HashMap<>(); + audio.put("format", "pcm"); + audio.put("sample_rate", grpcServerProperties.getRealtime().getSampleRate()); + audio.put("channels", grpcServerProperties.getRealtime().getChannels()); + config.put("audio", audio); + + Map recognition = new HashMap<>(); + recognition.put("language", nonBlank(resumeConfig.getLanguage(), "auto")); + recognition.put("enable_punctuation", boolOrDefault(resumeConfig.getEnablePunctuation(), true)); + recognition.put("enable_itn", boolOrDefault(resumeConfig.getEnableItn(), true)); + recognition.put("enable_speaker", Integer.valueOf(1).equals(resumeConfig.getUseSpkId())); + recognition.put("enable_two_pass", !"online".equalsIgnoreCase(resumeConfig.getMode())); + recognition.put("enable_text_refine", boolOrDefault(resumeConfig.getEnableTextRefine(), false)); + recognition.put("speaker_threshold", readSpeakerThreshold(model.getMediaConfig())); + recognition.put("hotwords", resumeConfig.getHotwords() == null ? List.of() : resumeConfig.getHotwords()); + config.put("recognition", recognition); + + config.put("model", model.getModelCode()); + config.put("save_audio", boolOrDefault(resumeConfig.getSaveAudio(), false)); + root.put("config", config); + return root; + } + + private String buildAuthorization(String apiKey) { + if (apiKey == null || apiKey.isBlank()) { + return ""; + } + return apiKey.startsWith("Bearer ") ? apiKey : "Bearer " + apiKey; + } + + private Object readSpeakerThreshold(Map mediaConfig) { + return mediaConfig == null ? null : mediaConfig.get("svThreshold"); + } + + private boolean boolOrDefault(Boolean value, boolean defaultValue) { + return value != null ? value : defaultValue; + } + + @SafeVarargs + private T firstNonNull(T... values) { + for (T value : values) { + if (value != null) { + return value; + } + } + return null; + } + + private String nonBlank(String value, String defaultValue) { + return nonBlank(value, defaultValue, null); + } + + private String nonBlank(String value, String fallbackValue, String defaultValue) { + if (value != null && !value.isBlank()) { + return value.trim(); + } + if (fallbackValue != null && !fallbackValue.isBlank()) { + return fallbackValue.trim(); + } + return defaultValue; + } +} diff --git a/backend/src/main/java/com/imeeting/service/realtime/impl/AsrUpstreamBridgeServiceImpl.java b/backend/src/main/java/com/imeeting/service/realtime/impl/AsrUpstreamBridgeServiceImpl.java new file mode 100644 index 0000000..a599452 --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/realtime/impl/AsrUpstreamBridgeServiceImpl.java @@ -0,0 +1,262 @@ +package com.imeeting.service.realtime.impl; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData; +import com.imeeting.service.realtime.AsrUpstreamBridgeService; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.WebSocket; +import java.nio.ByteBuffer; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +@Slf4j +@Service +@RequiredArgsConstructor +public class AsrUpstreamBridgeServiceImpl implements AsrUpstreamBridgeService { + + private final ObjectMapper objectMapper; + + @Override + public AsrUpstreamSession openSession(AndroidRealtimeGrpcSessionData sessionData, String connectionId, + AsrUpstreamEventListener listener) { + BridgeSession session = new BridgeSession(sessionData, connectionId, listener, objectMapper); + session.connect(); + return session; + } + + private static final class BridgeSession implements AsrUpstreamSession { + private final AndroidRealtimeGrpcSessionData sessionData; + private final String connectionId; + private final AsrUpstreamEventListener listener; + private final ObjectMapper objectMapper; + private final HttpClient httpClient = HttpClient.newHttpClient(); + private final Queue pendingAudio = new ConcurrentLinkedQueue<>(); + private final AtomicBoolean ready = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final StringBuilder textBuffer = new StringBuilder(); + private volatile WebSocket webSocket; + private CompletableFuture sendChain = CompletableFuture.completedFuture(null); + + private BridgeSession(AndroidRealtimeGrpcSessionData sessionData, String connectionId, + AsrUpstreamEventListener listener, ObjectMapper objectMapper) { + this.sessionData = sessionData; + this.connectionId = connectionId; + this.listener = listener; + this.objectMapper = objectMapper; + } + + private void connect() { + try { + WebSocket socket = httpClient.newWebSocketBuilder() + .buildAsync(URI.create(sessionData.getTargetWsUrl()), new ListenerImpl()) + .get(); + this.webSocket = socket; + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + listener.onError("REALTIME_UPSTREAM_CONNECT_INTERRUPTED", "连接第三方识别服务时被中断", true); + } catch (ExecutionException ex) { + listener.onError("REALTIME_UPSTREAM_CONNECT_FAILED", "连接第三方识别服务失败,请检查模型 WebSocket 配置或服务状态", true); + } + } + + @Override + public boolean isReady() { + return ready.get(); + } + + @Override + public void sendAudio(byte[] payload) { + if (payload == null || payload.length == 0 || closed.get()) { + return; + } + if (!ready.get() || webSocket == null) { + pendingAudio.add(payload); + return; + } + sendOrdered(() -> webSocket.sendBinary(ByteBuffer.wrap(payload), true)); + } + + @Override + public void sendStopSpeaking() { + if (closed.get() || webSocket == null) { + return; + } + sendOrdered(() -> webSocket.sendText("{\"is_speaking\":false}", true)); + } + + @Override + public void close(String reason) { + if (!closed.compareAndSet(false, true)) { + return; + } + if (webSocket != null) { + webSocket.sendClose(1000, reason == null ? "" : reason); + } + } + + private void sendOrdered(java.util.function.Supplier> action) { + synchronized (this) { + sendChain = sendChain.exceptionally(ex -> null) + .thenCompose(ignored -> action.get().thenApply(ws -> null)); + } + } + + private void flushPendingAudio() { + byte[] payload; + while ((payload = pendingAudio.poll()) != null) { + sendAudio(payload); + } + } + + private void handleTextMessage(String payload) { + try { + JsonNode root = objectMapper.readTree(payload); + if ((root.hasNonNull("code") || "error".equals(root.path("type").asText())) && root.hasNonNull("message")) { + listener.onError( + root.path("code").asText("REALTIME_UPSTREAM_ERROR"), + root.path("message").asText("第三方识别服务连接异常"), + true + ); + return; + } + AsrTranscriptResult result = normalizeTranscript(root); + if (result != null) { + listener.onTranscript(result); + } + } catch (Exception ex) { + log.debug("Ignore invalid upstream ASR payload, connectionId={}, payload={}", connectionId, payload, ex); + } + } + + private AsrTranscriptResult normalizeTranscript(JsonNode root) { + String type = root.path("type").asText(); + if ("partial".equals(type) || "segment".equals(type)) { + JsonNode data = root.path("data"); + String text = data.path("text").asText(""); + if (text.isBlank()) { + return null; + } + boolean isFinal = "segment".equals(type) || data.path("is_final").asBoolean(false); + String speakerId = textValue(data, "speaker_id"); + String speakerName = textValue(data, "speaker_name"); + if (speakerName == null) { + speakerName = speakerId; + } + Integer startTime = toMs(data.path("start")); + Integer endTime = toMs(data.path("end")); + return new AsrTranscriptResult(isFinal, text, speakerId, speakerName, startTime, endTime); + } + + String text = root.path("text").asText(""); + if (text.isBlank()) { + return null; + } + boolean isFinal = root.path("is_final").asBoolean(false); + JsonNode speaker = root.path("speaker"); + String speakerId = null; + String speakerName = null; + if (speaker.isTextual()) { + speakerId = speaker.asText(); + speakerName = speaker.asText(); + } else if (speaker.isObject()) { + speakerId = textValue(speaker, "user_id"); + if (speakerId == null) { + speakerId = textValue(speaker, "speaker_id"); + } + speakerName = textValue(speaker, "name"); + if (speakerName == null) { + speakerName = speakerId; + } + } + Integer startTime = null; + Integer endTime = null; + JsonNode timestamp = root.path("timestamp"); + if (timestamp.isArray() && timestamp.size() > 0) { + JsonNode first = timestamp.get(0); + JsonNode last = timestamp.get(timestamp.size() - 1); + if (first.isArray() && first.size() > 0) { + startTime = first.get(0).isInt() ? first.get(0).asInt() : null; + } + if (last.isArray() && last.size() > 1) { + endTime = last.get(1).isInt() ? last.get(1).asInt() : null; + } + } + return new AsrTranscriptResult(isFinal, text, speakerId, speakerName, startTime, endTime); + } + + private String textValue(JsonNode node, String fieldName) { + JsonNode target = node.path(fieldName); + if (target.isMissingNode() || target.isNull()) { + return null; + } + String value = target.asText(); + return value == null || value.isBlank() ? null : value; + } + + private Integer toMs(JsonNode node) { + if (!node.isNumber()) { + return null; + } + return (int) Math.round(node.asDouble() * 1000D); + } + + private final class ListenerImpl implements WebSocket.Listener { + @Override + public void onOpen(WebSocket webSocket) { + sendOrdered(() -> webSocket.sendText(sessionData.getStartMessageJson(), true)); + ready.set(true); + flushPendingAudio(); + listener.onReady(); + webSocket.request(1); + } + + @Override + public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { + textBuffer.append(data); + if (last) { + handleTextMessage(textBuffer.toString()); + textBuffer.setLength(0); + } + webSocket.request(1); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletionStage onBinary(WebSocket webSocket, ByteBuffer data, boolean last) { + webSocket.request(1); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletionStage onClose(WebSocket webSocket, int statusCode, String reason) { + if (closed.compareAndSet(false, true)) { + listener.onClosed(reason == null || reason.isBlank() ? "upstream_closed" : reason); + } + return CompletableFuture.completedFuture(null); + } + + @Override + public void onError(WebSocket webSocket, Throwable error) { + if (closed.compareAndSet(false, true)) { + listener.onError( + "REALTIME_UPSTREAM_ERROR", + error == null || error.getMessage() == null || error.getMessage().isBlank() + ? "第三方识别服务连接异常" + : "第三方识别服务连接异常: " + error.getMessage(), + true + ); + } + } + } + } +} diff --git a/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingGrpcSessionServiceImpl.java b/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingGrpcSessionServiceImpl.java new file mode 100644 index 0000000..28e0e3d --- /dev/null +++ b/backend/src/main/java/com/imeeting/service/realtime/impl/RealtimeMeetingGrpcSessionServiceImpl.java @@ -0,0 +1,278 @@ +package com.imeeting.service.realtime.impl; + +import com.imeeting.common.RedisKeys; +import com.imeeting.config.grpc.GrpcServerProperties; +import com.imeeting.dto.android.AndroidAuthContext; +import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData; +import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO; +import com.imeeting.dto.biz.RealtimeTranscriptItemDTO; +import com.imeeting.grpc.common.ErrorEvent; +import com.imeeting.grpc.realtime.RealtimeServerPacket; +import com.imeeting.grpc.realtime.SessionStatusEvent; +import com.imeeting.grpc.realtime.StreamClosed; +import com.imeeting.grpc.realtime.StreamReady; +import com.imeeting.grpc.realtime.TranscriptEvent; +import com.imeeting.service.biz.MeetingCommandService; +import com.imeeting.service.biz.RealtimeMeetingSessionStateService; +import com.imeeting.service.realtime.AndroidRealtimeSessionTicketService; +import com.imeeting.service.realtime.AsrUpstreamBridgeService; +import com.imeeting.service.realtime.RealtimeMeetingGrpcSessionService; +import io.grpc.stub.StreamObserver; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; + +@Slf4j +@Service +@RequiredArgsConstructor +public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrpcSessionService { + + private final AndroidRealtimeSessionTicketService ticketService; + private final AsrUpstreamBridgeService asrUpstreamBridgeService; + private final MeetingCommandService meetingCommandService; + private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService; + private final StringRedisTemplate redisTemplate; + private final GrpcServerProperties grpcServerProperties; + + private final ConcurrentMap sessions = new ConcurrentHashMap<>(); + + @Override + public String openStream(String streamToken, AndroidAuthContext authContext, StreamObserver responseObserver) { + AndroidRealtimeGrpcSessionData sessionData = ticketService.getSessionData(streamToken); + if (sessionData == null) { + throw new RuntimeException("Invalid realtime gRPC session token"); + } + if (sessionData.getDeviceId() != null && !sessionData.getDeviceId().isBlank() + && authContext.getDeviceId() != null + && !sessionData.getDeviceId().equals(authContext.getDeviceId())) { + throw new RuntimeException("Realtime gRPC session token does not match deviceId"); + } + + String connectionId = "grpc_" + java.util.UUID.randomUUID().toString().replace("-", ""); + SessionRuntime runtime = new SessionRuntime(connectionId, streamToken, sessionData, responseObserver); + SessionRuntime previous = sessions.putIfAbsent(connectionId, runtime); + if (previous != null) { + throw new RuntimeException("Duplicate realtime gRPC connectionId"); + } + + writeConnectionState(runtime); + runtime.upstreamSession = asrUpstreamBridgeService.openSession(sessionData, connectionId, new UpstreamCallback(runtime)); + return connectionId; + } + + @Override + public void onAudio(String connectionId, byte[] payload, long seq, boolean lastChunk) { + SessionRuntime runtime = sessions.get(connectionId); + if (runtime == null || runtime.closed.get()) { + return; + } + touchConnectionState(runtime); + runtime.upstreamSession.sendAudio(payload); + if (lastChunk) { + runtime.upstreamSession.sendStopSpeaking(); + } + } + + @Override + public void onStopSpeaking(String connectionId) { + SessionRuntime runtime = sessions.get(connectionId); + if (runtime == null || runtime.closed.get()) { + return; + } + touchConnectionState(runtime); + runtime.upstreamSession.sendStopSpeaking(); + } + + @Override + public void closeStream(String connectionId, String reason, boolean notifyClient) { + SessionRuntime runtime = sessions.remove(connectionId); + if (runtime == null) { + return; + } + if (!runtime.closed.compareAndSet(false, true)) { + return; + } + + try { + if (runtime.upstreamSession != null) { + runtime.upstreamSession.close(reason == null ? "closed" : reason); + } + } catch (Exception ex) { + log.warn("Failed to close upstream realtime session, connectionId={}", connectionId, ex); + } + + redisTemplate.delete(RedisKeys.realtimeMeetingGrpcConnectionKey(connectionId)); + realtimeMeetingSessionStateService.pauseByDisconnect(runtime.sessionData.getMeetingId(), connectionId); + + if (notifyClient) { + runtime.send(RealtimeServerPacket.newBuilder() + .setClosed(StreamClosed.newBuilder() + .setMeetingId(runtime.sessionData.getMeetingId()) + .setReason(reason == null ? "closed" : reason) + .build()) + .build()); + } + runtime.complete(); + } + + private void writeConnectionState(SessionRuntime runtime) { + Duration ttl = Duration.ofSeconds(grpcServerProperties.getRealtime().getConnectionTtlSeconds()); + String value = runtime.sessionData.getMeetingId() + ":" + runtime.sessionData.getDeviceId() + ":" + runtime.streamToken; + redisTemplate.opsForValue().set(RedisKeys.realtimeMeetingGrpcConnectionKey(runtime.connectionId), value, ttl); + } + + private void touchConnectionState(SessionRuntime runtime) { + writeConnectionState(runtime); + } + + private long nextEventSeq(Long meetingId) { + Long value = redisTemplate.opsForValue().increment(RedisKeys.realtimeMeetingEventSeqKey(meetingId)); + return value == null ? 1L : value; + } + + private TranscriptEvent.TranscriptType toTranscriptType(boolean finalResult) { + return finalResult ? TranscriptEvent.TranscriptType.FINAL : TranscriptEvent.TranscriptType.PARTIAL; + } + + private SessionStatusEvent buildStatusEvent(Long meetingId) { + RealtimeMeetingSessionStatusVO status = realtimeMeetingSessionStateService.getStatus(meetingId); + if (status == null) { + return SessionStatusEvent.newBuilder().setMeetingId(meetingId).build(); + } + return SessionStatusEvent.newBuilder() + .setMeetingId(meetingId) + .setStatus(nullToEmpty(status.getStatus())) + .setHasTranscript(Boolean.TRUE.equals(status.getHasTranscript())) + .setCanResume(Boolean.TRUE.equals(status.getCanResume())) + .setRemainingSeconds(status.getRemainingSeconds() == null ? 0L : status.getRemainingSeconds()) + .setActiveConnection(Boolean.TRUE.equals(status.getActiveConnection())) + .build(); + } + + private String nullToEmpty(String value) { + return value == null ? "" : value; + } + + private final class UpstreamCallback implements AsrUpstreamBridgeService.AsrUpstreamEventListener { + private final SessionRuntime runtime; + + private UpstreamCallback(SessionRuntime runtime) { + this.runtime = runtime; + } + + @Override + public void onReady() { + if (runtime.closed.get()) { + return; + } + if (!realtimeMeetingSessionStateService.activate(runtime.sessionData.getMeetingId(), runtime.connectionId)) { + runtime.sendError("REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议已有活跃实时连接,请先关闭旧连接后再继续", false); + closeStream(runtime.connectionId, "active_connection_exists", true); + return; + } + touchConnectionState(runtime); + runtime.send(RealtimeServerPacket.newBuilder() + .setReady(StreamReady.newBuilder() + .setConnectionId(runtime.connectionId) + .setMeetingId(runtime.sessionData.getMeetingId()) + .setServerTime(System.currentTimeMillis()) + .build()) + .build()); + runtime.send(RealtimeServerPacket.newBuilder().setStatus(buildStatusEvent(runtime.sessionData.getMeetingId())).build()); + } + + @Override + public void onTranscript(AsrUpstreamBridgeService.AsrTranscriptResult result) { + if (runtime.closed.get() || result == null || result.getText() == null || result.getText().isBlank()) { + return; + } + if (result.isFinalResult()) { + RealtimeTranscriptItemDTO item = new RealtimeTranscriptItemDTO(); + item.setSpeakerId(result.getSpeakerId()); + item.setSpeakerName(result.getSpeakerName()); + item.setContent(result.getText()); + item.setStartTime(result.getStartTime()); + item.setEndTime(result.getEndTime()); + meetingCommandService.appendRealtimeTranscripts(runtime.sessionData.getMeetingId(), List.of(item)); + } + runtime.send(RealtimeServerPacket.newBuilder() + .setTranscript(TranscriptEvent.newBuilder() + .setMeetingId(runtime.sessionData.getMeetingId()) + .setEventSeq(nextEventSeq(runtime.sessionData.getMeetingId())) + .setType(toTranscriptType(result.isFinalResult())) + .setText(result.getText()) + .setSpeakerId(nullToEmpty(result.getSpeakerId())) + .setSpeakerName(nullToEmpty(result.getSpeakerName())) + .setStartTime(result.getStartTime() == null ? 0 : result.getStartTime()) + .setEndTime(result.getEndTime() == null ? 0 : result.getEndTime()) + .build()) + .build()); + } + + @Override + public void onError(String code, String message, boolean retryable) { + if (runtime.closed.get()) { + return; + } + runtime.sendError(code, message, retryable); + closeStream(runtime.connectionId, code == null ? "upstream_error" : code, true); + } + + @Override + public void onClosed(String reason) { + closeStream(runtime.connectionId, reason == null ? "upstream_closed" : reason, true); + } + } + + private static final class SessionRuntime { + private final String connectionId; + private final String streamToken; + private final AndroidRealtimeGrpcSessionData sessionData; + private final StreamObserver responseObserver; + private final AtomicBoolean closed = new AtomicBoolean(false); + private AsrUpstreamBridgeService.AsrUpstreamSession upstreamSession; + + private SessionRuntime(String connectionId, String streamToken, AndroidRealtimeGrpcSessionData sessionData, + StreamObserver responseObserver) { + this.connectionId = connectionId; + this.streamToken = streamToken; + this.sessionData = sessionData; + this.responseObserver = responseObserver; + } + + private void send(RealtimeServerPacket packet) { + try { + synchronized (responseObserver) { + responseObserver.onNext(packet); + } + } catch (Exception ignored) { + // ignore downstream delivery failure + } + } + + private void sendError(String code, String message, boolean retryable) { + send(RealtimeServerPacket.newBuilder() + .setError(ErrorEvent.newBuilder() + .setCode(code == null ? "REALTIME_ERROR" : code) + .setMessage(message == null ? "未知错误" : message) + .setRetryable(retryable) + .build()) + .build()); + } + + private void complete() { + try { + responseObserver.onCompleted(); + } catch (Exception ignored) { + // ignore observer completion failure + } + } + } +} diff --git a/backend/src/main/proto/android/common.proto b/backend/src/main/proto/android/common.proto new file mode 100644 index 0000000..20ae3ac --- /dev/null +++ b/backend/src/main/proto/android/common.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; + +package imeeting.android.common; + +option java_multiple_files = true; +option java_package = "com.imeeting.grpc.common"; +option java_outer_classname = "AndroidCommonProto"; + +message ClientAuth { + enum AuthType { + AUTH_TYPE_UNSPECIFIED = 0; + NONE = 1; + DEVICE_TOKEN = 2; + USER_JWT = 3; + STREAM_TOKEN = 4; + } + + AuthType auth_type = 1; + string access_token = 2; + string device_id = 3; + string tenant_code = 4; + string app_id = 5; + string app_version = 6; + string platform = 7; +} + +message ErrorEvent { + string code = 1; + string message = 2; + bool retryable = 3; +} + +message JsonPayload { + string topic = 1; + string json = 2; +} diff --git a/backend/src/main/proto/android/gateway.proto b/backend/src/main/proto/android/gateway.proto new file mode 100644 index 0000000..10de1b8 --- /dev/null +++ b/backend/src/main/proto/android/gateway.proto @@ -0,0 +1,85 @@ +syntax = "proto3"; + +package imeeting.android.gateway; + +option java_multiple_files = true; +option java_package = "com.imeeting.grpc.gateway"; +option java_outer_classname = "AndroidGatewayProto"; + +import "android/common.proto"; + +service AndroidGatewayService { + rpc Connect(stream GatewayClientPacket) returns (stream GatewayServerPacket); +} + +message GatewayClientPacket { + string request_id = 1; + imeeting.android.common.ClientAuth auth = 2; + + oneof body { + DeviceHello hello = 10; + Heartbeat heartbeat = 11; + Ack ack = 12; + Subscribe subscribe = 13; + } +} + +message GatewayServerPacket { + string request_id = 1; + + oneof body { + HelloAck hello_ack = 10; + ServerPush push = 11; + DevicePresenceEvent device_presence = 12; + imeeting.android.common.ErrorEvent error = 13; + Pong pong = 14; + } +} + +message DeviceHello { + string device_id = 1; + string device_name = 2; + string device_model = 3; + string os_version = 4; + string app_version = 5; + string network_type = 6; +} + +message HelloAck { + string connection_id = 1; + string auth_mode = 2; + int64 server_time = 3; + int64 heartbeat_interval_seconds = 4; +} + +message Heartbeat { + string connection_id = 1; + int64 client_time = 2; +} + +message Pong { + string connection_id = 1; + int64 server_time = 2; +} + +message Ack { + string message_id = 1; +} + +message Subscribe { + repeated string topics = 1; +} + +message ServerPush { + string message_id = 1; + string topic = 2; + string type = 3; + string json = 4; + int64 server_time = 5; +} + +message DevicePresenceEvent { + string device_id = 1; + string status = 2; + int64 last_seen_time = 3; +} diff --git a/backend/src/main/proto/android/realtime_meeting.proto b/backend/src/main/proto/android/realtime_meeting.proto new file mode 100644 index 0000000..9f6f8ec --- /dev/null +++ b/backend/src/main/proto/android/realtime_meeting.proto @@ -0,0 +1,100 @@ +syntax = "proto3"; + +package imeeting.android.realtime; + +option java_multiple_files = true; +option java_package = "com.imeeting.grpc.realtime"; +option java_outer_classname = "RealtimeMeetingProto"; + +import "android/common.proto"; + +service RealtimeMeetingService { + rpc StreamMeetingAudio(stream RealtimeClientPacket) returns (stream RealtimeServerPacket); +} + +message RealtimeClientPacket { + string request_id = 1; + imeeting.android.common.ClientAuth auth = 2; + + oneof body { + OpenMeetingStream open = 10; + AudioChunk audio = 11; + RealtimeControl control = 12; + } +} + +message RealtimeServerPacket { + string request_id = 1; + + oneof body { + StreamReady ready = 10; + TranscriptEvent transcript = 11; + SessionStatusEvent status = 12; + imeeting.android.common.ErrorEvent error = 13; + StreamClosed closed = 14; + } +} + +message OpenMeetingStream { + string stream_token = 1; + int64 meeting_id = 2; + int32 sample_rate = 3; + int32 channels = 4; + string encoding = 5; +} + +message AudioChunk { + bytes pcm16 = 1; + int64 seq = 2; + int64 client_time = 3; + bool last_chunk = 4; +} + +message RealtimeControl { + enum ControlType { + CONTROL_TYPE_UNSPECIFIED = 0; + START = 1; + STOP_SPEAKING = 2; + END_INPUT = 3; + CLOSE_STREAM = 4; + } + + ControlType type = 1; +} + +message StreamReady { + string connection_id = 1; + int64 meeting_id = 2; + int64 server_time = 3; +} + +message TranscriptEvent { + enum TranscriptType { + TRANSCRIPT_TYPE_UNSPECIFIED = 0; + PARTIAL = 1; + FINAL = 2; + } + + TranscriptType type = 1; + int64 meeting_id = 2; + int64 event_seq = 3; + string text = 4; + string speaker_id = 5; + string speaker_name = 6; + int32 start_time = 7; + int32 end_time = 8; +} + +message SessionStatusEvent { + int64 meeting_id = 1; + string status = 2; + bool has_transcript = 3; + bool can_resume = 4; + int64 remaining_seconds = 5; + bool active_connection = 6; +} + +message StreamClosed { + int64 meeting_id = 1; + string reason = 2; +} diff --git a/backend/src/main/resources/application.yml b/backend/src/main/resources/application.yml index ea259be..132fd57 100644 --- a/backend/src/main/resources/application.yml +++ b/backend/src/main/resources/application.yml @@ -1,4 +1,4 @@ -server: +server: port: ${SERVER_PORT:8080} spring: @@ -59,3 +59,21 @@ imeeting: resume-window-minutes: 30 empty-session-retention-minutes: 720 redis-expire-listener-enabled: true + grpc: + enabled: true + port: 19090 + max-inbound-message-size: 4194304 + reflection-enabled: true + gateway: + heartbeat-interval-seconds: 15 + heartbeat-timeout-seconds: 45 + realtime: + session-ttl-seconds: 600 + sample-rate: 16000 + channels: 1 + encoding: PCM16LE + connection-ttl-seconds: 1800 + auth: + enabled: false + allow-anonymous: true +