feat: 添加Android实时会议gRPC服务和相关实现

- 定义 `common.proto` 和 `realtime_meeting.proto`,包含客户端认证、错误事件和实时会议相关的gRPC消息
- 实现 `AndroidGatewayPushServiceImpl`,处理设备注册、注销和消息推送
- 实现 `AndroidDeviceSessionServiceImpl`,管理设备会话状态和心跳更新
- 实现 `AsrUpstreamBridgeServiceImpl`,桥接ASR上游服务并处理音频流
- 实现 `AndroidRealtimeSessionTicketServiceImpl`,创建和获取实时会议会话票据
- 定义 `gateway.proto`,包含网关服务的消息定义
dev_na
chenhao 2026-04-01 09:27:30 +08:00
parent 3a7baa0341
commit f0d63c97a3
30 changed files with 1965 additions and 6 deletions

View File

@ -1,4 +1,4 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@ -20,6 +20,10 @@
<mybatis-plus.version>3.5.6</mybatis-plus.version>
<jjwt.version>0.11.5</jjwt.version>
<easycaptcha.version>1.6.2</easycaptcha.version>
<grpc.version>1.76.1</grpc.version>
<protobuf.version>3.25.8</protobuf.version>
<protobuf.plugin.version>0.6.1</protobuf.plugin.version>
<os.maven.plugin.version>1.7.1</os.maven.plugin.version>
</properties>
<dependencies>
@ -88,6 +92,26 @@
<artifactId>pinyin4j</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
@ -133,14 +157,39 @@
<artifactId>unisbase-spring-boot-starter</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>${os.maven.plugin.version}</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf.plugin.version}</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>

View File

@ -71,6 +71,42 @@ public final class RedisKeys {
return "biz:meeting:realtime:empty-timeout:";
}
public static String androidDeviceOnlineKey(String deviceId) {
return "biz:android:device:online:" + deviceId;
}
public static String androidDeviceActiveConnectionKey(String deviceId) {
return "biz:android:device:active-conn:" + deviceId;
}
public static String androidDeviceConnectionKey(String connectionId) {
return "biz:android:device:conn:" + connectionId;
}
public static String androidDeviceTopicsKey(String deviceId) {
return "biz:android:device:topics:" + deviceId;
}
public static String androidDeviceOutboxKey(String deviceId) {
return "biz:android:device:outbox:" + deviceId;
}
public static String androidDeviceMessageSeqKey(String deviceId) {
return "biz:android:device:message-seq:" + deviceId;
}
public static String realtimeMeetingGrpcSessionKey(String streamToken) {
return "biz:meeting:realtime:grpc-session:" + streamToken;
}
public static String realtimeMeetingGrpcConnectionKey(String connectionId) {
return "biz:meeting:realtime:grpc-conn:" + connectionId;
}
public static String realtimeMeetingEventSeqKey(Long meetingId) {
return "biz:meeting:realtime:event-seq:" + meetingId;
}
public static final String CACHE_EMPTY_MARKER = "EMPTY_MARKER";
public static final String SYS_PARAM_FIELD_VALUE = "value";
public static final String SYS_PARAM_FIELD_TYPE = "type";

View File

@ -0,0 +1,21 @@
package com.imeeting.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configurers.AbstractHttpConfigurer;
import org.springframework.security.web.SecurityFilterChain;
@Configuration
public class AndroidApiSecurityConfig {
@Bean
@Order(1)
public SecurityFilterChain androidApiSecurityFilterChain(HttpSecurity http) throws Exception {
http.securityMatcher("/api/android/**")
.csrf(AbstractHttpConfigurer::disable)
.authorizeHttpRequests(authorize -> authorize.anyRequest().permitAll());
return http.build();
}
}

View File

@ -0,0 +1,12 @@
package com.imeeting.config.grpc;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
@Data
@ConfigurationProperties(prefix = "imeeting.grpc.auth")
public class AndroidGrpcAuthProperties {
private boolean enabled = false;
private boolean allowAnonymous = true;
}

View File

@ -0,0 +1,54 @@
package com.imeeting.config.grpc;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.protobuf.services.ProtoReflectionService;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
@Slf4j
@Component
@RequiredArgsConstructor
@EnableConfigurationProperties({GrpcServerProperties.class, AndroidGrpcAuthProperties.class})
public class GrpcServerLifecycle {
private final GrpcServerProperties properties;
private final List<BindableService> bindableServices;
private Server server;
@PostConstruct
public void start() throws IOException {
if (!properties.isEnabled()) {
log.info("gRPC server is disabled by configuration");
return;
}
NettyServerBuilder builder = NettyServerBuilder.forPort(properties.getPort())
.maxInboundMessageSize(properties.getMaxInboundMessageSize());
bindableServices.forEach(builder::addService);
if (properties.isReflectionEnabled()) {
builder.addService(ProtoReflectionService.newInstance());
}
server = builder.build();
server.start();
log.info("gRPC server started on port {} with {} services", properties.getPort(), bindableServices.size());
}
@PreDestroy
public void stop() {
if (server == null) {
return;
}
log.info("Stopping gRPC server");
server.shutdown();
}
}

View File

@ -0,0 +1,31 @@
package com.imeeting.config.grpc;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
@Data
@ConfigurationProperties(prefix = "imeeting.grpc")
public class GrpcServerProperties {
private boolean enabled = true;
private int port = 19090;
private int maxInboundMessageSize = 4194304;
private boolean reflectionEnabled = true;
private Gateway gateway = new Gateway();
private Realtime realtime = new Realtime();
@Data
public static class Gateway {
private long heartbeatIntervalSeconds = 15;
private long heartbeatTimeoutSeconds = 45;
}
@Data
public static class Realtime {
private long sessionTtlSeconds = 600;
private int sampleRate = 16000;
private int channels = 1;
private String encoding = "PCM16LE";
private long connectionTtlSeconds = 1800;
}
}

View File

@ -0,0 +1,75 @@
package com.imeeting.controller.android;
import com.imeeting.dto.android.AndroidOpenRealtimeGrpcSessionCommand;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionVO;
import com.imeeting.dto.biz.MeetingTranscriptVO;
import com.imeeting.dto.biz.RealtimeMeetingCompleteDTO;
import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO;
import com.imeeting.service.android.AndroidAuthService;
import com.imeeting.service.biz.MeetingAccessService;
import com.imeeting.service.biz.MeetingCommandService;
import com.imeeting.service.biz.MeetingQueryService;
import com.imeeting.service.biz.RealtimeMeetingSessionStateService;
import com.imeeting.service.realtime.AndroidRealtimeSessionTicketService;
import com.unisbase.common.ApiResponse;
import jakarta.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
@RequestMapping("/api/android/meeting")
@RequiredArgsConstructor
public class AndroidMeetingRealtimeController {
private final AndroidAuthService androidAuthService;
private final MeetingAccessService meetingAccessService;
private final MeetingQueryService meetingQueryService;
private final MeetingCommandService meetingCommandService;
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final AndroidRealtimeSessionTicketService androidRealtimeSessionTicketService;
@GetMapping("/{id}/realtime/session-status")
public ApiResponse<RealtimeMeetingSessionStatusVO> getRealtimeSessionStatus(@PathVariable Long id, HttpServletRequest request) {
androidAuthService.authenticateHttp(request);
meetingAccessService.requireMeeting(id);
return ApiResponse.ok(realtimeMeetingSessionStateService.getStatus(id));
}
@GetMapping("/{id}/transcripts")
public ApiResponse<List<MeetingTranscriptVO>> getTranscripts(@PathVariable Long id, HttpServletRequest request) {
androidAuthService.authenticateHttp(request);
meetingAccessService.requireMeeting(id);
return ApiResponse.ok(meetingQueryService.getTranscripts(id));
}
@PostMapping("/{id}/realtime/pause")
public ApiResponse<RealtimeMeetingSessionStatusVO> pauseRealtimeMeeting(@PathVariable Long id, HttpServletRequest request) {
androidAuthService.authenticateHttp(request);
meetingAccessService.requireMeeting(id);
return ApiResponse.ok(realtimeMeetingSessionStateService.pause(id));
}
@PostMapping("/{id}/realtime/complete")
public ApiResponse<Boolean> completeRealtimeMeeting(@PathVariable Long id,
HttpServletRequest request,
@RequestBody(required = false) RealtimeMeetingCompleteDTO dto) {
androidAuthService.authenticateHttp(request);
meetingAccessService.requireMeeting(id);
meetingCommandService.completeRealtimeMeeting(id, dto != null ? dto.getAudioUrl() : null);
return ApiResponse.ok(true);
}
@PostMapping("/{id}/realtime/grpc-session")
public ApiResponse<AndroidRealtimeGrpcSessionVO> openRealtimeGrpcSession(@PathVariable Long id,
HttpServletRequest request,
@RequestBody(required = false) AndroidOpenRealtimeGrpcSessionCommand command) {
return ApiResponse.ok(androidRealtimeSessionTicketService.createSession(id, command, androidAuthService.authenticateHttp(request)));
}
}

View File

@ -0,0 +1,15 @@
package com.imeeting.dto.android;
import lombok.Data;
@Data
public class AndroidAuthContext {
private String authMode;
private String deviceId;
private String tenantCode;
private String appId;
private String appVersion;
private String platform;
private String accessToken;
private boolean anonymous;
}

View File

@ -0,0 +1,14 @@
package com.imeeting.dto.android;
import lombok.Data;
@Data
public class AndroidDeviceSessionState {
private String connectionId;
private String deviceId;
private String status;
private Long lastSeenAt;
private String appVersion;
private String platform;
private String tenantCode;
}

View File

@ -0,0 +1,19 @@
package com.imeeting.dto.android;
import lombok.Data;
import java.util.List;
import java.util.Map;
@Data
public class AndroidOpenRealtimeGrpcSessionCommand {
private Long asrModelId;
private String mode;
private String language;
private Integer useSpkId;
private Boolean enablePunctuation;
private Boolean enableItn;
private Boolean enableTextRefine;
private Boolean saveAudio;
private List<Map<String, Object>> hotwords;
}

View File

@ -0,0 +1,16 @@
package com.imeeting.dto.android;
import com.imeeting.dto.biz.RealtimeMeetingResumeConfig;
import lombok.Data;
@Data
public class AndroidRealtimeGrpcSessionData {
private Long meetingId;
private Long tenantId;
private Long userId;
private String deviceId;
private Long asrModelId;
private String targetWsUrl;
private String startMessageJson;
private RealtimeMeetingResumeConfig resumeConfig;
}

View File

@ -0,0 +1,17 @@
package com.imeeting.dto.android;
import com.imeeting.dto.biz.RealtimeMeetingResumeConfig;
import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO;
import lombok.Data;
@Data
public class AndroidRealtimeGrpcSessionVO {
private Long meetingId;
private String streamToken;
private Long expiresInSeconds;
private Integer sampleRate;
private Integer channels;
private String encoding;
private RealtimeMeetingResumeConfig resumeConfig;
private RealtimeMeetingSessionStatusVO status;
}

View File

@ -0,0 +1,116 @@
package com.imeeting.grpc.gateway;
import com.imeeting.config.grpc.GrpcServerProperties;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidDeviceSessionState;
import com.imeeting.grpc.common.ErrorEvent;
import com.imeeting.service.android.AndroidAuthService;
import com.imeeting.service.android.AndroidDeviceSessionService;
import com.imeeting.service.android.AndroidGatewayPushService;
import io.grpc.BindableService;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class AndroidGatewayGrpcService extends AndroidGatewayServiceGrpc.AndroidGatewayServiceImplBase implements BindableService {
private final AndroidAuthService androidAuthService;
private final AndroidDeviceSessionService androidDeviceSessionService;
private final AndroidGatewayPushService androidGatewayPushService;
private final GrpcServerProperties grpcServerProperties;
@Override
public StreamObserver<GatewayClientPacket> connect(StreamObserver<GatewayServerPacket> responseObserver) {
return new StreamObserver<>() {
private String connectionId;
private String deviceId;
private AndroidAuthContext authContext;
@Override
public void onNext(GatewayClientPacket packet) {
try {
switch (packet.getBodyCase()) {
case HELLO -> handleHello(packet);
case HEARTBEAT -> handleHeartbeat(packet.getHeartbeat());
case SUBSCRIBE -> handleSubscribe(packet.getSubscribe());
case ACK, BODY_NOT_SET -> {
}
}
} catch (Exception ex) {
sendError(responseObserver, ex.getMessage());
}
}
@Override
public void onError(Throwable t) {
cleanup();
}
@Override
public void onCompleted() {
cleanup();
responseObserver.onCompleted();
}
private void handleHello(GatewayClientPacket packet) {
authContext = androidAuthService.authenticateGrpc(packet.getAuth(), packet.getHello().getDeviceId());
AndroidDeviceSessionState sessionState = androidDeviceSessionService.openSession(authContext, packet.getHello());
connectionId = sessionState.getConnectionId();
deviceId = sessionState.getDeviceId();
androidGatewayPushService.register(connectionId, deviceId, responseObserver);
responseObserver.onNext(GatewayServerPacket.newBuilder()
.setRequestId(packet.getRequestId())
.setHelloAck(HelloAck.newBuilder()
.setConnectionId(connectionId)
.setAuthMode(authContext.getAuthMode())
.setServerTime(System.currentTimeMillis())
.setHeartbeatIntervalSeconds(grpcServerProperties.getGateway().getHeartbeatIntervalSeconds())
.build())
.build());
}
private void handleHeartbeat(Heartbeat heartbeat) {
if (heartbeat == null || connectionId == null) {
return;
}
AndroidDeviceSessionState state = androidDeviceSessionService.refreshHeartbeat(connectionId, heartbeat.getClientTime());
if (state == null) {
sendError(responseObserver, "Android device session not found");
return;
}
responseObserver.onNext(GatewayServerPacket.newBuilder()
.setPong(Pong.newBuilder()
.setConnectionId(connectionId)
.setServerTime(System.currentTimeMillis())
.build())
.build());
}
private void handleSubscribe(Subscribe subscribe) {
if (deviceId == null) {
return;
}
androidDeviceSessionService.updateTopics(deviceId, subscribe.getTopicsList());
}
private void cleanup() {
if (connectionId != null) {
androidGatewayPushService.unregister(connectionId);
androidDeviceSessionService.closeSession(connectionId);
}
}
};
}
private void sendError(StreamObserver<GatewayServerPacket> responseObserver, String message) {
responseObserver.onNext(GatewayServerPacket.newBuilder()
.setError(ErrorEvent.newBuilder()
.setCode("ANDROID_GATEWAY_ERROR")
.setMessage(message == null ? "网关处理失败" : message)
.setRetryable(false)
.build())
.build());
}
}

View File

@ -0,0 +1,76 @@
package com.imeeting.grpc.realtime;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.service.android.AndroidAuthService;
import com.imeeting.service.realtime.RealtimeMeetingGrpcSessionService;
import io.grpc.BindableService;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class RealtimeMeetingGrpcService extends RealtimeMeetingServiceGrpc.RealtimeMeetingServiceImplBase implements BindableService {
private final AndroidAuthService androidAuthService;
private final RealtimeMeetingGrpcSessionService realtimeMeetingGrpcSessionService;
@Override
public StreamObserver<RealtimeClientPacket> streamMeetingAudio(StreamObserver<RealtimeServerPacket> responseObserver) {
return new StreamObserver<>() {
private String connectionId;
private AndroidAuthContext authContext;
@Override
public void onNext(RealtimeClientPacket packet) {
switch (packet.getBodyCase()) {
case OPEN -> handleOpen(packet);
case AUDIO -> handleAudio(packet.getAudio());
case CONTROL -> handleControl(packet.getControl());
case BODY_NOT_SET -> {
}
}
}
@Override
public void onError(Throwable t) {
if (connectionId != null) {
realtimeMeetingGrpcSessionService.closeStream(connectionId, "client_error", false);
}
}
@Override
public void onCompleted() {
if (connectionId != null) {
realtimeMeetingGrpcSessionService.closeStream(connectionId, "client_completed", false);
} else {
responseObserver.onCompleted();
}
}
private void handleOpen(RealtimeClientPacket packet) {
authContext = androidAuthService.authenticateGrpc(packet.getAuth(), null);
connectionId = realtimeMeetingGrpcSessionService.openStream(packet.getOpen().getStreamToken(), authContext, responseObserver);
}
private void handleAudio(AudioChunk audioChunk) {
if (connectionId == null) {
throw new RuntimeException("Realtime gRPC stream is not opened");
}
realtimeMeetingGrpcSessionService.onAudio(connectionId, audioChunk.getPcm16().toByteArray(), audioChunk.getSeq(), audioChunk.getLastChunk());
}
private void handleControl(RealtimeControl control) {
if (connectionId == null) {
return;
}
switch (control.getType()) {
case STOP_SPEAKING, END_INPUT -> realtimeMeetingGrpcSessionService.onStopSpeaking(connectionId);
case CLOSE_STREAM -> realtimeMeetingGrpcSessionService.closeStream(connectionId, "client_close_stream", true);
case START, CONTROL_TYPE_UNSPECIFIED -> {
}
}
}
};
}
}

View File

@ -0,0 +1,11 @@
package com.imeeting.service.android;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.grpc.common.ClientAuth;
import jakarta.servlet.http.HttpServletRequest;
public interface AndroidAuthService {
AndroidAuthContext authenticateGrpc(ClientAuth auth, String fallbackDeviceId);
AndroidAuthContext authenticateHttp(HttpServletRequest request);
}

View File

@ -0,0 +1,19 @@
package com.imeeting.service.android;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidDeviceSessionState;
import com.imeeting.grpc.gateway.DeviceHello;
import java.util.List;
public interface AndroidDeviceSessionService {
AndroidDeviceSessionState openSession(AndroidAuthContext authContext, DeviceHello hello);
AndroidDeviceSessionState refreshHeartbeat(String connectionId, long clientTime);
AndroidDeviceSessionState getByConnectionId(String connectionId);
void updateTopics(String deviceId, List<String> topics);
void closeSession(String connectionId);
}

View File

@ -0,0 +1,14 @@
package com.imeeting.service.android;
import com.imeeting.grpc.gateway.GatewayServerPacket;
import io.grpc.stub.StreamObserver;
public interface AndroidGatewayPushService {
void register(String connectionId, String deviceId, StreamObserver<GatewayServerPacket> observer);
void unregister(String connectionId);
boolean pushToConnection(String connectionId, GatewayServerPacket packet);
int pushToDevice(String deviceId, GatewayServerPacket packet);
}

View File

@ -0,0 +1,89 @@
package com.imeeting.service.android.impl;
import com.imeeting.config.grpc.AndroidGrpcAuthProperties;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.grpc.common.ClientAuth;
import com.imeeting.service.android.AndroidAuthService;
import jakarta.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
@Service
@RequiredArgsConstructor
public class AndroidAuthServiceImpl implements AndroidAuthService {
private static final String HEADER_DEVICE_ID = "X-Android-Device-Id";
private static final String HEADER_TENANT_CODE = "X-Android-Tenant-Code";
private static final String HEADER_ACCESS_TOKEN = "X-Android-Access-Token";
private static final String HEADER_APP_ID = "X-Android-App-Id";
private static final String HEADER_APP_VERSION = "X-Android-App-Version";
private static final String HEADER_PLATFORM = "X-Android-Platform";
private final AndroidGrpcAuthProperties properties;
@Override
public AndroidAuthContext authenticateGrpc(ClientAuth auth, String fallbackDeviceId) {
ClientAuth.AuthType authType = auth == null ? ClientAuth.AuthType.AUTH_TYPE_UNSPECIFIED : auth.getAuthType();
if (authType == ClientAuth.AuthType.DEVICE_TOKEN || authType == ClientAuth.AuthType.USER_JWT) {
return buildContext(authType.name(), false, auth.getDeviceId(), auth.getTenantCode(), auth.getAppId(),
auth.getAppVersion(), auth.getPlatform(), auth.getAccessToken(), fallbackDeviceId);
}
if (properties.isEnabled() && !properties.isAllowAnonymous()) {
throw new RuntimeException("Android gRPC auth is required");
}
return buildContext("NONE", true,
auth == null ? null : auth.getDeviceId(),
auth == null ? null : auth.getTenantCode(),
auth == null ? null : auth.getAppId(),
auth == null ? null : auth.getAppVersion(),
auth == null ? null : auth.getPlatform(),
auth == null ? null : auth.getAccessToken(),
fallbackDeviceId);
}
@Override
public AndroidAuthContext authenticateHttp(HttpServletRequest request) {
if (properties.isEnabled() && !properties.isAllowAnonymous()) {
String token = request.getHeader(HEADER_ACCESS_TOKEN);
if (!StringUtils.hasText(token)) {
throw new RuntimeException("Android HTTP auth is required");
}
return buildContext("DEVICE_TOKEN", false,
request.getHeader(HEADER_DEVICE_ID),
request.getHeader(HEADER_TENANT_CODE),
request.getHeader(HEADER_APP_ID),
request.getHeader(HEADER_APP_VERSION),
request.getHeader(HEADER_PLATFORM),
token,
null);
}
return buildContext("NONE", true,
request.getHeader(HEADER_DEVICE_ID),
request.getHeader(HEADER_TENANT_CODE),
request.getHeader(HEADER_APP_ID),
request.getHeader(HEADER_APP_VERSION),
request.getHeader(HEADER_PLATFORM),
request.getHeader(HEADER_ACCESS_TOKEN),
null);
}
private AndroidAuthContext buildContext(String authMode, boolean anonymous, String deviceId, String tenantCode,
String appId, String appVersion, String platform, String accessToken,
String fallbackDeviceId) {
String resolvedDeviceId = StringUtils.hasText(deviceId) ? deviceId : fallbackDeviceId;
if (!StringUtils.hasText(resolvedDeviceId)) {
throw new RuntimeException("Android deviceId is required");
}
AndroidAuthContext context = new AndroidAuthContext();
context.setAuthMode(authMode);
context.setAnonymous(anonymous);
context.setDeviceId(resolvedDeviceId.trim());
context.setTenantCode(StringUtils.hasText(tenantCode) ? tenantCode.trim() : null);
context.setAppId(StringUtils.hasText(appId) ? appId.trim() : null);
context.setAppVersion(StringUtils.hasText(appVersion) ? appVersion.trim() : null);
context.setPlatform(StringUtils.hasText(platform) ? platform.trim() : "android");
context.setAccessToken(StringUtils.hasText(accessToken) ? accessToken.trim() : null);
return context;
}
}

View File

@ -0,0 +1,110 @@
package com.imeeting.service.android.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.imeeting.common.RedisKeys;
import com.imeeting.config.grpc.GrpcServerProperties;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidDeviceSessionState;
import com.imeeting.grpc.gateway.DeviceHello;
import com.imeeting.service.android.AndroidDeviceSessionService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
@Slf4j
@Service
@RequiredArgsConstructor
public class AndroidDeviceSessionServiceImpl implements AndroidDeviceSessionService {
private final StringRedisTemplate redisTemplate;
private final ObjectMapper objectMapper;
private final GrpcServerProperties grpcServerProperties;
@Override
public AndroidDeviceSessionState openSession(AndroidAuthContext authContext, DeviceHello hello) {
AndroidDeviceSessionState state = new AndroidDeviceSessionState();
state.setConnectionId("android_" + UUID.randomUUID().toString().replace("-", ""));
state.setDeviceId(authContext.getDeviceId());
state.setStatus("ONLINE");
state.setLastSeenAt(System.currentTimeMillis());
state.setAppVersion(nonBlank(hello.getAppVersion(), authContext.getAppVersion()));
state.setPlatform(nonBlank(authContext.getPlatform(), "android"));
state.setTenantCode(authContext.getTenantCode());
writeState(state);
return state;
}
@Override
public AndroidDeviceSessionState refreshHeartbeat(String connectionId, long clientTime) {
AndroidDeviceSessionState state = getByConnectionId(connectionId);
if (state == null) {
return null;
}
state.setStatus("ONLINE");
state.setLastSeenAt(clientTime > 0 ? clientTime : System.currentTimeMillis());
writeState(state);
return state;
}
@Override
public AndroidDeviceSessionState getByConnectionId(String connectionId) {
String raw = redisTemplate.opsForValue().get(RedisKeys.androidDeviceConnectionKey(connectionId));
if (raw == null || raw.isBlank()) {
return null;
}
try {
return objectMapper.readValue(raw, AndroidDeviceSessionState.class);
} catch (Exception ex) {
log.warn("Failed to read android device session, connectionId={}", connectionId, ex);
return null;
}
}
@Override
public void updateTopics(String deviceId, List<String> topics) {
try {
redisTemplate.opsForValue().set(
RedisKeys.androidDeviceTopicsKey(deviceId),
objectMapper.writeValueAsString(topics == null ? List.of() : topics)
);
} catch (Exception ex) {
throw new RuntimeException("Failed to update android device topics", ex);
}
}
@Override
public void closeSession(String connectionId) {
AndroidDeviceSessionState state = getByConnectionId(connectionId);
if (state == null) {
redisTemplate.delete(RedisKeys.androidDeviceConnectionKey(connectionId));
return;
}
String activeConn = redisTemplate.opsForValue().get(RedisKeys.androidDeviceActiveConnectionKey(state.getDeviceId()));
if (connectionId.equals(activeConn)) {
redisTemplate.delete(RedisKeys.androidDeviceActiveConnectionKey(state.getDeviceId()));
}
redisTemplate.delete(RedisKeys.androidDeviceOnlineKey(state.getDeviceId()));
redisTemplate.delete(RedisKeys.androidDeviceConnectionKey(connectionId));
}
private void writeState(AndroidDeviceSessionState state) {
Duration ttl = Duration.ofSeconds(grpcServerProperties.getGateway().getHeartbeatTimeoutSeconds());
try {
String json = objectMapper.writeValueAsString(state);
redisTemplate.opsForValue().set(RedisKeys.androidDeviceOnlineKey(state.getDeviceId()), json, ttl);
redisTemplate.opsForValue().set(RedisKeys.androidDeviceActiveConnectionKey(state.getDeviceId()), state.getConnectionId(), ttl);
redisTemplate.opsForValue().set(RedisKeys.androidDeviceConnectionKey(state.getConnectionId()), json, ttl);
} catch (Exception ex) {
throw new RuntimeException("Failed to write android device session state", ex);
}
}
private String nonBlank(String value, String defaultValue) {
return value != null && !value.isBlank() ? value : defaultValue;
}
}

View File

@ -0,0 +1,69 @@
package com.imeeting.service.android.impl;
import com.imeeting.grpc.gateway.GatewayServerPacket;
import com.imeeting.service.android.AndroidGatewayPushService;
import io.grpc.stub.StreamObserver;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class AndroidGatewayPushServiceImpl implements AndroidGatewayPushService {
private final Map<String, Binding> byConnectionId = new ConcurrentHashMap<>();
private final Map<String, Set<String>> byDeviceId = new ConcurrentHashMap<>();
@Override
public void register(String connectionId, String deviceId, StreamObserver<GatewayServerPacket> observer) {
byConnectionId.put(connectionId, new Binding(deviceId, observer));
byDeviceId.computeIfAbsent(deviceId, key -> ConcurrentHashMap.newKeySet()).add(connectionId);
}
@Override
public void unregister(String connectionId) {
Binding binding = byConnectionId.remove(connectionId);
if (binding == null) {
return;
}
Set<String> connectionIds = byDeviceId.get(binding.deviceId());
if (connectionIds == null) {
return;
}
connectionIds.remove(connectionId);
if (connectionIds.isEmpty()) {
byDeviceId.remove(binding.deviceId());
}
}
@Override
public boolean pushToConnection(String connectionId, GatewayServerPacket packet) {
Binding binding = byConnectionId.get(connectionId);
if (binding == null) {
return false;
}
synchronized (binding) {
binding.observer().onNext(packet);
}
return true;
}
@Override
public int pushToDevice(String deviceId, GatewayServerPacket packet) {
Set<String> connectionIds = byDeviceId.get(deviceId);
if (connectionIds == null || connectionIds.isEmpty()) {
return 0;
}
int pushed = 0;
for (String connectionId : connectionIds) {
if (pushToConnection(connectionId, packet)) {
pushed++;
}
}
return pushed;
}
private record Binding(String deviceId, StreamObserver<GatewayServerPacket> observer) {
}
}

View File

@ -0,0 +1,12 @@
package com.imeeting.service.realtime;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidOpenRealtimeGrpcSessionCommand;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionVO;
public interface AndroidRealtimeSessionTicketService {
AndroidRealtimeGrpcSessionVO createSession(Long meetingId, AndroidOpenRealtimeGrpcSessionCommand command, AndroidAuthContext authContext);
AndroidRealtimeGrpcSessionData getSessionData(String streamToken);
}

View File

@ -0,0 +1,71 @@
package com.imeeting.service.realtime;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData;
public interface AsrUpstreamBridgeService {
AsrUpstreamSession openSession(AndroidRealtimeGrpcSessionData sessionData, String connectionId, AsrUpstreamEventListener listener);
interface AsrUpstreamSession {
boolean isReady();
void sendAudio(byte[] payload);
void sendStopSpeaking();
void close(String reason);
}
interface AsrUpstreamEventListener {
void onReady();
void onTranscript(AsrTranscriptResult result);
void onError(String code, String message, boolean retryable);
void onClosed(String reason);
}
class AsrTranscriptResult {
private final boolean finalResult;
private final String text;
private final String speakerId;
private final String speakerName;
private final Integer startTime;
private final Integer endTime;
public AsrTranscriptResult(boolean finalResult, String text, String speakerId, String speakerName,
Integer startTime, Integer endTime) {
this.finalResult = finalResult;
this.text = text;
this.speakerId = speakerId;
this.speakerName = speakerName;
this.startTime = startTime;
this.endTime = endTime;
}
public boolean isFinalResult() {
return finalResult;
}
public String getText() {
return text;
}
public String getSpeakerId() {
return speakerId;
}
public String getSpeakerName() {
return speakerName;
}
public Integer getStartTime() {
return startTime;
}
public Integer getEndTime() {
return endTime;
}
}
}

View File

@ -0,0 +1,15 @@
package com.imeeting.service.realtime;
import com.imeeting.dto.android.AndroidAuthContext;
import io.grpc.stub.StreamObserver;
import com.imeeting.grpc.realtime.RealtimeServerPacket;
public interface RealtimeMeetingGrpcSessionService {
String openStream(String streamToken, AndroidAuthContext authContext, StreamObserver<RealtimeServerPacket> responseObserver);
void onAudio(String connectionId, byte[] payload, long seq, boolean lastChunk);
void onStopSpeaking(String connectionId);
void closeStream(String connectionId, String reason, boolean notifyClient);
}

View File

@ -0,0 +1,219 @@
package com.imeeting.service.realtime.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.imeeting.common.RedisKeys;
import com.imeeting.config.grpc.GrpcServerProperties;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidOpenRealtimeGrpcSessionCommand;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionVO;
import com.imeeting.dto.biz.AiModelVO;
import com.imeeting.dto.biz.RealtimeMeetingResumeConfig;
import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO;
import com.imeeting.entity.biz.Meeting;
import com.imeeting.service.biz.AiModelService;
import com.imeeting.service.biz.MeetingAccessService;
import com.imeeting.service.biz.RealtimeMeetingSessionStateService;
import com.imeeting.service.realtime.AndroidRealtimeSessionTicketService;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@Service
@RequiredArgsConstructor
public class AndroidRealtimeSessionTicketServiceImpl implements AndroidRealtimeSessionTicketService {
private final ObjectMapper objectMapper;
private final StringRedisTemplate redisTemplate;
private final MeetingAccessService meetingAccessService;
private final AiModelService aiModelService;
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final GrpcServerProperties grpcServerProperties;
@Override
public AndroidRealtimeGrpcSessionVO createSession(Long meetingId, AndroidOpenRealtimeGrpcSessionCommand command, AndroidAuthContext authContext) {
if (meetingId == null) {
throw new RuntimeException("Meeting ID is required");
}
Meeting meeting = meetingAccessService.requireMeeting(meetingId);
realtimeMeetingSessionStateService.initSessionIfAbsent(meetingId, meeting.getTenantId(), meeting.getCreatorId());
RealtimeMeetingSessionStatusVO currentStatus = realtimeMeetingSessionStateService.getStatus(meetingId);
RealtimeMeetingResumeConfig currentResumeConfig = currentStatus == null ? null : currentStatus.getResumeConfig();
Long asrModelId = firstNonNull(command == null ? null : command.getAsrModelId(), currentResumeConfig == null ? null : currentResumeConfig.getAsrModelId());
if (asrModelId == null) {
throw new RuntimeException("ASR model ID is required");
}
realtimeMeetingSessionStateService.assertCanOpenSession(meetingId);
AiModelVO asrModel = aiModelService.getModelById(asrModelId, "ASR");
if (asrModel == null) {
throw new RuntimeException("ASR model not found");
}
String targetWsUrl = resolveWsUrl(asrModel);
if (targetWsUrl == null || targetWsUrl.isBlank()) {
throw new RuntimeException("ASR model WebSocket is not configured");
}
RealtimeMeetingResumeConfig resumeConfig = buildResumeConfig(command, currentResumeConfig, asrModelId);
realtimeMeetingSessionStateService.rememberResumeConfig(meetingId, resumeConfig);
currentStatus = realtimeMeetingSessionStateService.getStatus(meetingId);
Map<String, Object> startMessage = buildStartMessage(asrModel, meetingId, resumeConfig);
AndroidRealtimeGrpcSessionData sessionData = new AndroidRealtimeGrpcSessionData();
sessionData.setMeetingId(meetingId);
sessionData.setTenantId(meeting.getTenantId());
sessionData.setUserId(meeting.getCreatorId());
sessionData.setDeviceId(authContext.getDeviceId());
sessionData.setAsrModelId(asrModelId);
sessionData.setTargetWsUrl(targetWsUrl);
sessionData.setResumeConfig(resumeConfig);
try {
sessionData.setStartMessageJson(objectMapper.writeValueAsString(startMessage));
} catch (Exception ex) {
throw new RuntimeException("Failed to serialize realtime start message", ex);
}
String streamToken = UUID.randomUUID().toString().replace("-", "");
Duration ttl = Duration.ofSeconds(grpcServerProperties.getRealtime().getSessionTtlSeconds());
try {
redisTemplate.opsForValue().set(
RedisKeys.realtimeMeetingGrpcSessionKey(streamToken),
objectMapper.writeValueAsString(sessionData),
ttl
);
} catch (Exception ex) {
throw new RuntimeException("Failed to create realtime gRPC session", ex);
}
AndroidRealtimeGrpcSessionVO vo = new AndroidRealtimeGrpcSessionVO();
vo.setMeetingId(meetingId);
vo.setStreamToken(streamToken);
vo.setExpiresInSeconds(ttl.toSeconds());
vo.setSampleRate(grpcServerProperties.getRealtime().getSampleRate());
vo.setChannels(grpcServerProperties.getRealtime().getChannels());
vo.setEncoding(grpcServerProperties.getRealtime().getEncoding());
vo.setResumeConfig(resumeConfig);
vo.setStatus(currentStatus);
return vo;
}
@Override
public AndroidRealtimeGrpcSessionData getSessionData(String streamToken) {
if (streamToken == null || streamToken.isBlank()) {
return null;
}
String raw = redisTemplate.opsForValue().get(RedisKeys.realtimeMeetingGrpcSessionKey(streamToken));
if (raw == null || raw.isBlank()) {
return null;
}
try {
return objectMapper.readValue(raw, AndroidRealtimeGrpcSessionData.class);
} catch (Exception ex) {
throw new RuntimeException("Failed to read realtime gRPC session", ex);
}
}
private RealtimeMeetingResumeConfig buildResumeConfig(AndroidOpenRealtimeGrpcSessionCommand command,
RealtimeMeetingResumeConfig currentResumeConfig,
Long asrModelId) {
RealtimeMeetingResumeConfig config = new RealtimeMeetingResumeConfig();
config.setAsrModelId(asrModelId);
config.setMode(nonBlank(command == null ? null : command.getMode(), currentResumeConfig == null ? null : currentResumeConfig.getMode(), "2pass"));
config.setLanguage(nonBlank(command == null ? null : command.getLanguage(), currentResumeConfig == null ? null : currentResumeConfig.getLanguage(), "auto"));
config.setUseSpkId(firstNonNull(command == null ? null : command.getUseSpkId(), currentResumeConfig == null ? null : currentResumeConfig.getUseSpkId(), 1));
config.setEnablePunctuation(firstNonNull(command == null ? null : command.getEnablePunctuation(), currentResumeConfig == null ? null : currentResumeConfig.getEnablePunctuation(), Boolean.TRUE));
config.setEnableItn(firstNonNull(command == null ? null : command.getEnableItn(), currentResumeConfig == null ? null : currentResumeConfig.getEnableItn(), Boolean.TRUE));
config.setEnableTextRefine(firstNonNull(command == null ? null : command.getEnableTextRefine(), currentResumeConfig == null ? null : currentResumeConfig.getEnableTextRefine(), Boolean.FALSE));
config.setSaveAudio(firstNonNull(command == null ? null : command.getSaveAudio(), currentResumeConfig == null ? null : currentResumeConfig.getSaveAudio(), Boolean.FALSE));
config.setHotwords(command != null && command.getHotwords() != null ? command.getHotwords() : currentResumeConfig == null ? List.of() : currentResumeConfig.getHotwords());
return config;
}
private String resolveWsUrl(AiModelVO model) {
if (model.getWsUrl() != null && !model.getWsUrl().isBlank()) {
return model.getWsUrl();
}
if (model.getBaseUrl() == null || model.getBaseUrl().isBlank()) {
return "";
}
return model.getBaseUrl()
.replaceFirst("^http://", "ws://")
.replaceFirst("^https://", "wss://");
}
private Map<String, Object> buildStartMessage(AiModelVO model, Long meetingId, RealtimeMeetingResumeConfig resumeConfig) {
Map<String, Object> root = new HashMap<>();
root.put("type", "start");
root.put("request_id", "android_" + System.currentTimeMillis() + "_" + meetingId);
root.put("authorization", buildAuthorization(model.getApiKey()));
Map<String, Object> config = new HashMap<>();
Map<String, Object> audio = new HashMap<>();
audio.put("format", "pcm");
audio.put("sample_rate", grpcServerProperties.getRealtime().getSampleRate());
audio.put("channels", grpcServerProperties.getRealtime().getChannels());
config.put("audio", audio);
Map<String, Object> recognition = new HashMap<>();
recognition.put("language", nonBlank(resumeConfig.getLanguage(), "auto"));
recognition.put("enable_punctuation", boolOrDefault(resumeConfig.getEnablePunctuation(), true));
recognition.put("enable_itn", boolOrDefault(resumeConfig.getEnableItn(), true));
recognition.put("enable_speaker", Integer.valueOf(1).equals(resumeConfig.getUseSpkId()));
recognition.put("enable_two_pass", !"online".equalsIgnoreCase(resumeConfig.getMode()));
recognition.put("enable_text_refine", boolOrDefault(resumeConfig.getEnableTextRefine(), false));
recognition.put("speaker_threshold", readSpeakerThreshold(model.getMediaConfig()));
recognition.put("hotwords", resumeConfig.getHotwords() == null ? List.of() : resumeConfig.getHotwords());
config.put("recognition", recognition);
config.put("model", model.getModelCode());
config.put("save_audio", boolOrDefault(resumeConfig.getSaveAudio(), false));
root.put("config", config);
return root;
}
private String buildAuthorization(String apiKey) {
if (apiKey == null || apiKey.isBlank()) {
return "";
}
return apiKey.startsWith("Bearer ") ? apiKey : "Bearer " + apiKey;
}
private Object readSpeakerThreshold(Map<String, Object> mediaConfig) {
return mediaConfig == null ? null : mediaConfig.get("svThreshold");
}
private boolean boolOrDefault(Boolean value, boolean defaultValue) {
return value != null ? value : defaultValue;
}
@SafeVarargs
private <T> T firstNonNull(T... values) {
for (T value : values) {
if (value != null) {
return value;
}
}
return null;
}
private String nonBlank(String value, String defaultValue) {
return nonBlank(value, defaultValue, null);
}
private String nonBlank(String value, String fallbackValue, String defaultValue) {
if (value != null && !value.isBlank()) {
return value.trim();
}
if (fallbackValue != null && !fallbackValue.isBlank()) {
return fallbackValue.trim();
}
return defaultValue;
}
}

View File

@ -0,0 +1,262 @@
package com.imeeting.service.realtime.impl;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData;
import com.imeeting.service.realtime.AsrUpstreamBridgeService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Service
@RequiredArgsConstructor
public class AsrUpstreamBridgeServiceImpl implements AsrUpstreamBridgeService {
private final ObjectMapper objectMapper;
@Override
public AsrUpstreamSession openSession(AndroidRealtimeGrpcSessionData sessionData, String connectionId,
AsrUpstreamEventListener listener) {
BridgeSession session = new BridgeSession(sessionData, connectionId, listener, objectMapper);
session.connect();
return session;
}
private static final class BridgeSession implements AsrUpstreamSession {
private final AndroidRealtimeGrpcSessionData sessionData;
private final String connectionId;
private final AsrUpstreamEventListener listener;
private final ObjectMapper objectMapper;
private final HttpClient httpClient = HttpClient.newHttpClient();
private final Queue<byte[]> pendingAudio = new ConcurrentLinkedQueue<>();
private final AtomicBoolean ready = new AtomicBoolean(false);
private final AtomicBoolean closed = new AtomicBoolean(false);
private final StringBuilder textBuffer = new StringBuilder();
private volatile WebSocket webSocket;
private CompletableFuture<Void> sendChain = CompletableFuture.completedFuture(null);
private BridgeSession(AndroidRealtimeGrpcSessionData sessionData, String connectionId,
AsrUpstreamEventListener listener, ObjectMapper objectMapper) {
this.sessionData = sessionData;
this.connectionId = connectionId;
this.listener = listener;
this.objectMapper = objectMapper;
}
private void connect() {
try {
WebSocket socket = httpClient.newWebSocketBuilder()
.buildAsync(URI.create(sessionData.getTargetWsUrl()), new ListenerImpl())
.get();
this.webSocket = socket;
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
listener.onError("REALTIME_UPSTREAM_CONNECT_INTERRUPTED", "连接第三方识别服务时被中断", true);
} catch (ExecutionException ex) {
listener.onError("REALTIME_UPSTREAM_CONNECT_FAILED", "连接第三方识别服务失败,请检查模型 WebSocket 配置或服务状态", true);
}
}
@Override
public boolean isReady() {
return ready.get();
}
@Override
public void sendAudio(byte[] payload) {
if (payload == null || payload.length == 0 || closed.get()) {
return;
}
if (!ready.get() || webSocket == null) {
pendingAudio.add(payload);
return;
}
sendOrdered(() -> webSocket.sendBinary(ByteBuffer.wrap(payload), true));
}
@Override
public void sendStopSpeaking() {
if (closed.get() || webSocket == null) {
return;
}
sendOrdered(() -> webSocket.sendText("{\"is_speaking\":false}", true));
}
@Override
public void close(String reason) {
if (!closed.compareAndSet(false, true)) {
return;
}
if (webSocket != null) {
webSocket.sendClose(1000, reason == null ? "" : reason);
}
}
private void sendOrdered(java.util.function.Supplier<CompletableFuture<WebSocket>> action) {
synchronized (this) {
sendChain = sendChain.exceptionally(ex -> null)
.thenCompose(ignored -> action.get().thenApply(ws -> null));
}
}
private void flushPendingAudio() {
byte[] payload;
while ((payload = pendingAudio.poll()) != null) {
sendAudio(payload);
}
}
private void handleTextMessage(String payload) {
try {
JsonNode root = objectMapper.readTree(payload);
if ((root.hasNonNull("code") || "error".equals(root.path("type").asText())) && root.hasNonNull("message")) {
listener.onError(
root.path("code").asText("REALTIME_UPSTREAM_ERROR"),
root.path("message").asText("第三方识别服务连接异常"),
true
);
return;
}
AsrTranscriptResult result = normalizeTranscript(root);
if (result != null) {
listener.onTranscript(result);
}
} catch (Exception ex) {
log.debug("Ignore invalid upstream ASR payload, connectionId={}, payload={}", connectionId, payload, ex);
}
}
private AsrTranscriptResult normalizeTranscript(JsonNode root) {
String type = root.path("type").asText();
if ("partial".equals(type) || "segment".equals(type)) {
JsonNode data = root.path("data");
String text = data.path("text").asText("");
if (text.isBlank()) {
return null;
}
boolean isFinal = "segment".equals(type) || data.path("is_final").asBoolean(false);
String speakerId = textValue(data, "speaker_id");
String speakerName = textValue(data, "speaker_name");
if (speakerName == null) {
speakerName = speakerId;
}
Integer startTime = toMs(data.path("start"));
Integer endTime = toMs(data.path("end"));
return new AsrTranscriptResult(isFinal, text, speakerId, speakerName, startTime, endTime);
}
String text = root.path("text").asText("");
if (text.isBlank()) {
return null;
}
boolean isFinal = root.path("is_final").asBoolean(false);
JsonNode speaker = root.path("speaker");
String speakerId = null;
String speakerName = null;
if (speaker.isTextual()) {
speakerId = speaker.asText();
speakerName = speaker.asText();
} else if (speaker.isObject()) {
speakerId = textValue(speaker, "user_id");
if (speakerId == null) {
speakerId = textValue(speaker, "speaker_id");
}
speakerName = textValue(speaker, "name");
if (speakerName == null) {
speakerName = speakerId;
}
}
Integer startTime = null;
Integer endTime = null;
JsonNode timestamp = root.path("timestamp");
if (timestamp.isArray() && timestamp.size() > 0) {
JsonNode first = timestamp.get(0);
JsonNode last = timestamp.get(timestamp.size() - 1);
if (first.isArray() && first.size() > 0) {
startTime = first.get(0).isInt() ? first.get(0).asInt() : null;
}
if (last.isArray() && last.size() > 1) {
endTime = last.get(1).isInt() ? last.get(1).asInt() : null;
}
}
return new AsrTranscriptResult(isFinal, text, speakerId, speakerName, startTime, endTime);
}
private String textValue(JsonNode node, String fieldName) {
JsonNode target = node.path(fieldName);
if (target.isMissingNode() || target.isNull()) {
return null;
}
String value = target.asText();
return value == null || value.isBlank() ? null : value;
}
private Integer toMs(JsonNode node) {
if (!node.isNumber()) {
return null;
}
return (int) Math.round(node.asDouble() * 1000D);
}
private final class ListenerImpl implements WebSocket.Listener {
@Override
public void onOpen(WebSocket webSocket) {
sendOrdered(() -> webSocket.sendText(sessionData.getStartMessageJson(), true));
ready.set(true);
flushPendingAudio();
listener.onReady();
webSocket.request(1);
}
@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
textBuffer.append(data);
if (last) {
handleTextMessage(textBuffer.toString());
textBuffer.setLength(0);
}
webSocket.request(1);
return CompletableFuture.completedFuture(null);
}
@Override
public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
webSocket.request(1);
return CompletableFuture.completedFuture(null);
}
@Override
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
if (closed.compareAndSet(false, true)) {
listener.onClosed(reason == null || reason.isBlank() ? "upstream_closed" : reason);
}
return CompletableFuture.completedFuture(null);
}
@Override
public void onError(WebSocket webSocket, Throwable error) {
if (closed.compareAndSet(false, true)) {
listener.onError(
"REALTIME_UPSTREAM_ERROR",
error == null || error.getMessage() == null || error.getMessage().isBlank()
? "第三方识别服务连接异常"
: "第三方识别服务连接异常: " + error.getMessage(),
true
);
}
}
}
}
}

View File

@ -0,0 +1,278 @@
package com.imeeting.service.realtime.impl;
import com.imeeting.common.RedisKeys;
import com.imeeting.config.grpc.GrpcServerProperties;
import com.imeeting.dto.android.AndroidAuthContext;
import com.imeeting.dto.android.AndroidRealtimeGrpcSessionData;
import com.imeeting.dto.biz.RealtimeMeetingSessionStatusVO;
import com.imeeting.dto.biz.RealtimeTranscriptItemDTO;
import com.imeeting.grpc.common.ErrorEvent;
import com.imeeting.grpc.realtime.RealtimeServerPacket;
import com.imeeting.grpc.realtime.SessionStatusEvent;
import com.imeeting.grpc.realtime.StreamClosed;
import com.imeeting.grpc.realtime.StreamReady;
import com.imeeting.grpc.realtime.TranscriptEvent;
import com.imeeting.service.biz.MeetingCommandService;
import com.imeeting.service.biz.RealtimeMeetingSessionStateService;
import com.imeeting.service.realtime.AndroidRealtimeSessionTicketService;
import com.imeeting.service.realtime.AsrUpstreamBridgeService;
import com.imeeting.service.realtime.RealtimeMeetingGrpcSessionService;
import io.grpc.stub.StreamObserver;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Service
@RequiredArgsConstructor
public class RealtimeMeetingGrpcSessionServiceImpl implements RealtimeMeetingGrpcSessionService {
private final AndroidRealtimeSessionTicketService ticketService;
private final AsrUpstreamBridgeService asrUpstreamBridgeService;
private final MeetingCommandService meetingCommandService;
private final RealtimeMeetingSessionStateService realtimeMeetingSessionStateService;
private final StringRedisTemplate redisTemplate;
private final GrpcServerProperties grpcServerProperties;
private final ConcurrentMap<String, SessionRuntime> sessions = new ConcurrentHashMap<>();
@Override
public String openStream(String streamToken, AndroidAuthContext authContext, StreamObserver<RealtimeServerPacket> responseObserver) {
AndroidRealtimeGrpcSessionData sessionData = ticketService.getSessionData(streamToken);
if (sessionData == null) {
throw new RuntimeException("Invalid realtime gRPC session token");
}
if (sessionData.getDeviceId() != null && !sessionData.getDeviceId().isBlank()
&& authContext.getDeviceId() != null
&& !sessionData.getDeviceId().equals(authContext.getDeviceId())) {
throw new RuntimeException("Realtime gRPC session token does not match deviceId");
}
String connectionId = "grpc_" + java.util.UUID.randomUUID().toString().replace("-", "");
SessionRuntime runtime = new SessionRuntime(connectionId, streamToken, sessionData, responseObserver);
SessionRuntime previous = sessions.putIfAbsent(connectionId, runtime);
if (previous != null) {
throw new RuntimeException("Duplicate realtime gRPC connectionId");
}
writeConnectionState(runtime);
runtime.upstreamSession = asrUpstreamBridgeService.openSession(sessionData, connectionId, new UpstreamCallback(runtime));
return connectionId;
}
@Override
public void onAudio(String connectionId, byte[] payload, long seq, boolean lastChunk) {
SessionRuntime runtime = sessions.get(connectionId);
if (runtime == null || runtime.closed.get()) {
return;
}
touchConnectionState(runtime);
runtime.upstreamSession.sendAudio(payload);
if (lastChunk) {
runtime.upstreamSession.sendStopSpeaking();
}
}
@Override
public void onStopSpeaking(String connectionId) {
SessionRuntime runtime = sessions.get(connectionId);
if (runtime == null || runtime.closed.get()) {
return;
}
touchConnectionState(runtime);
runtime.upstreamSession.sendStopSpeaking();
}
@Override
public void closeStream(String connectionId, String reason, boolean notifyClient) {
SessionRuntime runtime = sessions.remove(connectionId);
if (runtime == null) {
return;
}
if (!runtime.closed.compareAndSet(false, true)) {
return;
}
try {
if (runtime.upstreamSession != null) {
runtime.upstreamSession.close(reason == null ? "closed" : reason);
}
} catch (Exception ex) {
log.warn("Failed to close upstream realtime session, connectionId={}", connectionId, ex);
}
redisTemplate.delete(RedisKeys.realtimeMeetingGrpcConnectionKey(connectionId));
realtimeMeetingSessionStateService.pauseByDisconnect(runtime.sessionData.getMeetingId(), connectionId);
if (notifyClient) {
runtime.send(RealtimeServerPacket.newBuilder()
.setClosed(StreamClosed.newBuilder()
.setMeetingId(runtime.sessionData.getMeetingId())
.setReason(reason == null ? "closed" : reason)
.build())
.build());
}
runtime.complete();
}
private void writeConnectionState(SessionRuntime runtime) {
Duration ttl = Duration.ofSeconds(grpcServerProperties.getRealtime().getConnectionTtlSeconds());
String value = runtime.sessionData.getMeetingId() + ":" + runtime.sessionData.getDeviceId() + ":" + runtime.streamToken;
redisTemplate.opsForValue().set(RedisKeys.realtimeMeetingGrpcConnectionKey(runtime.connectionId), value, ttl);
}
private void touchConnectionState(SessionRuntime runtime) {
writeConnectionState(runtime);
}
private long nextEventSeq(Long meetingId) {
Long value = redisTemplate.opsForValue().increment(RedisKeys.realtimeMeetingEventSeqKey(meetingId));
return value == null ? 1L : value;
}
private TranscriptEvent.TranscriptType toTranscriptType(boolean finalResult) {
return finalResult ? TranscriptEvent.TranscriptType.FINAL : TranscriptEvent.TranscriptType.PARTIAL;
}
private SessionStatusEvent buildStatusEvent(Long meetingId) {
RealtimeMeetingSessionStatusVO status = realtimeMeetingSessionStateService.getStatus(meetingId);
if (status == null) {
return SessionStatusEvent.newBuilder().setMeetingId(meetingId).build();
}
return SessionStatusEvent.newBuilder()
.setMeetingId(meetingId)
.setStatus(nullToEmpty(status.getStatus()))
.setHasTranscript(Boolean.TRUE.equals(status.getHasTranscript()))
.setCanResume(Boolean.TRUE.equals(status.getCanResume()))
.setRemainingSeconds(status.getRemainingSeconds() == null ? 0L : status.getRemainingSeconds())
.setActiveConnection(Boolean.TRUE.equals(status.getActiveConnection()))
.build();
}
private String nullToEmpty(String value) {
return value == null ? "" : value;
}
private final class UpstreamCallback implements AsrUpstreamBridgeService.AsrUpstreamEventListener {
private final SessionRuntime runtime;
private UpstreamCallback(SessionRuntime runtime) {
this.runtime = runtime;
}
@Override
public void onReady() {
if (runtime.closed.get()) {
return;
}
if (!realtimeMeetingSessionStateService.activate(runtime.sessionData.getMeetingId(), runtime.connectionId)) {
runtime.sendError("REALTIME_ACTIVE_CONNECTION_EXISTS", "当前会议已有活跃实时连接,请先关闭旧连接后再继续", false);
closeStream(runtime.connectionId, "active_connection_exists", true);
return;
}
touchConnectionState(runtime);
runtime.send(RealtimeServerPacket.newBuilder()
.setReady(StreamReady.newBuilder()
.setConnectionId(runtime.connectionId)
.setMeetingId(runtime.sessionData.getMeetingId())
.setServerTime(System.currentTimeMillis())
.build())
.build());
runtime.send(RealtimeServerPacket.newBuilder().setStatus(buildStatusEvent(runtime.sessionData.getMeetingId())).build());
}
@Override
public void onTranscript(AsrUpstreamBridgeService.AsrTranscriptResult result) {
if (runtime.closed.get() || result == null || result.getText() == null || result.getText().isBlank()) {
return;
}
if (result.isFinalResult()) {
RealtimeTranscriptItemDTO item = new RealtimeTranscriptItemDTO();
item.setSpeakerId(result.getSpeakerId());
item.setSpeakerName(result.getSpeakerName());
item.setContent(result.getText());
item.setStartTime(result.getStartTime());
item.setEndTime(result.getEndTime());
meetingCommandService.appendRealtimeTranscripts(runtime.sessionData.getMeetingId(), List.of(item));
}
runtime.send(RealtimeServerPacket.newBuilder()
.setTranscript(TranscriptEvent.newBuilder()
.setMeetingId(runtime.sessionData.getMeetingId())
.setEventSeq(nextEventSeq(runtime.sessionData.getMeetingId()))
.setType(toTranscriptType(result.isFinalResult()))
.setText(result.getText())
.setSpeakerId(nullToEmpty(result.getSpeakerId()))
.setSpeakerName(nullToEmpty(result.getSpeakerName()))
.setStartTime(result.getStartTime() == null ? 0 : result.getStartTime())
.setEndTime(result.getEndTime() == null ? 0 : result.getEndTime())
.build())
.build());
}
@Override
public void onError(String code, String message, boolean retryable) {
if (runtime.closed.get()) {
return;
}
runtime.sendError(code, message, retryable);
closeStream(runtime.connectionId, code == null ? "upstream_error" : code, true);
}
@Override
public void onClosed(String reason) {
closeStream(runtime.connectionId, reason == null ? "upstream_closed" : reason, true);
}
}
private static final class SessionRuntime {
private final String connectionId;
private final String streamToken;
private final AndroidRealtimeGrpcSessionData sessionData;
private final StreamObserver<RealtimeServerPacket> responseObserver;
private final AtomicBoolean closed = new AtomicBoolean(false);
private AsrUpstreamBridgeService.AsrUpstreamSession upstreamSession;
private SessionRuntime(String connectionId, String streamToken, AndroidRealtimeGrpcSessionData sessionData,
StreamObserver<RealtimeServerPacket> responseObserver) {
this.connectionId = connectionId;
this.streamToken = streamToken;
this.sessionData = sessionData;
this.responseObserver = responseObserver;
}
private void send(RealtimeServerPacket packet) {
try {
synchronized (responseObserver) {
responseObserver.onNext(packet);
}
} catch (Exception ignored) {
// ignore downstream delivery failure
}
}
private void sendError(String code, String message, boolean retryable) {
send(RealtimeServerPacket.newBuilder()
.setError(ErrorEvent.newBuilder()
.setCode(code == null ? "REALTIME_ERROR" : code)
.setMessage(message == null ? "未知错误" : message)
.setRetryable(retryable)
.build())
.build());
}
private void complete() {
try {
responseObserver.onCompleted();
} catch (Exception ignored) {
// ignore observer completion failure
}
}
}
}

View File

@ -0,0 +1,36 @@
syntax = "proto3";
package imeeting.android.common;
option java_multiple_files = true;
option java_package = "com.imeeting.grpc.common";
option java_outer_classname = "AndroidCommonProto";
message ClientAuth {
enum AuthType {
AUTH_TYPE_UNSPECIFIED = 0;
NONE = 1;
DEVICE_TOKEN = 2;
USER_JWT = 3;
STREAM_TOKEN = 4;
}
AuthType auth_type = 1;
string access_token = 2;
string device_id = 3;
string tenant_code = 4;
string app_id = 5;
string app_version = 6;
string platform = 7;
}
message ErrorEvent {
string code = 1;
string message = 2;
bool retryable = 3;
}
message JsonPayload {
string topic = 1;
string json = 2;
}

View File

@ -0,0 +1,85 @@
syntax = "proto3";
package imeeting.android.gateway;
option java_multiple_files = true;
option java_package = "com.imeeting.grpc.gateway";
option java_outer_classname = "AndroidGatewayProto";
import "android/common.proto";
service AndroidGatewayService {
rpc Connect(stream GatewayClientPacket) returns (stream GatewayServerPacket);
}
message GatewayClientPacket {
string request_id = 1;
imeeting.android.common.ClientAuth auth = 2;
oneof body {
DeviceHello hello = 10;
Heartbeat heartbeat = 11;
Ack ack = 12;
Subscribe subscribe = 13;
}
}
message GatewayServerPacket {
string request_id = 1;
oneof body {
HelloAck hello_ack = 10;
ServerPush push = 11;
DevicePresenceEvent device_presence = 12;
imeeting.android.common.ErrorEvent error = 13;
Pong pong = 14;
}
}
message DeviceHello {
string device_id = 1;
string device_name = 2;
string device_model = 3;
string os_version = 4;
string app_version = 5;
string network_type = 6;
}
message HelloAck {
string connection_id = 1;
string auth_mode = 2;
int64 server_time = 3;
int64 heartbeat_interval_seconds = 4;
}
message Heartbeat {
string connection_id = 1;
int64 client_time = 2;
}
message Pong {
string connection_id = 1;
int64 server_time = 2;
}
message Ack {
string message_id = 1;
}
message Subscribe {
repeated string topics = 1;
}
message ServerPush {
string message_id = 1;
string topic = 2;
string type = 3;
string json = 4;
int64 server_time = 5;
}
message DevicePresenceEvent {
string device_id = 1;
string status = 2;
int64 last_seen_time = 3;
}

View File

@ -0,0 +1,100 @@
syntax = "proto3";
package imeeting.android.realtime;
option java_multiple_files = true;
option java_package = "com.imeeting.grpc.realtime";
option java_outer_classname = "RealtimeMeetingProto";
import "android/common.proto";
service RealtimeMeetingService {
rpc StreamMeetingAudio(stream RealtimeClientPacket) returns (stream RealtimeServerPacket);
}
message RealtimeClientPacket {
string request_id = 1;
imeeting.android.common.ClientAuth auth = 2;
oneof body {
OpenMeetingStream open = 10;
AudioChunk audio = 11;
RealtimeControl control = 12;
}
}
message RealtimeServerPacket {
string request_id = 1;
oneof body {
StreamReady ready = 10;
TranscriptEvent transcript = 11;
SessionStatusEvent status = 12;
imeeting.android.common.ErrorEvent error = 13;
StreamClosed closed = 14;
}
}
message OpenMeetingStream {
string stream_token = 1;
int64 meeting_id = 2;
int32 sample_rate = 3;
int32 channels = 4;
string encoding = 5;
}
message AudioChunk {
bytes pcm16 = 1;
int64 seq = 2;
int64 client_time = 3;
bool last_chunk = 4;
}
message RealtimeControl {
enum ControlType {
CONTROL_TYPE_UNSPECIFIED = 0;
START = 1;
STOP_SPEAKING = 2;
END_INPUT = 3;
CLOSE_STREAM = 4;
}
ControlType type = 1;
}
message StreamReady {
string connection_id = 1;
int64 meeting_id = 2;
int64 server_time = 3;
}
message TranscriptEvent {
enum TranscriptType {
TRANSCRIPT_TYPE_UNSPECIFIED = 0;
PARTIAL = 1;
FINAL = 2;
}
TranscriptType type = 1;
int64 meeting_id = 2;
int64 event_seq = 3;
string text = 4;
string speaker_id = 5;
string speaker_name = 6;
int32 start_time = 7;
int32 end_time = 8;
}
message SessionStatusEvent {
int64 meeting_id = 1;
string status = 2;
bool has_transcript = 3;
bool can_resume = 4;
int64 remaining_seconds = 5;
bool active_connection = 6;
}
message StreamClosed {
int64 meeting_id = 1;
string reason = 2;
}

View File

@ -1,4 +1,4 @@
server:
server:
port: ${SERVER_PORT:8080}
spring:
@ -59,3 +59,21 @@ imeeting:
resume-window-minutes: 30
empty-session-retention-minutes: 720
redis-expire-listener-enabled: true
grpc:
enabled: true
port: 19090
max-inbound-message-size: 4194304
reflection-enabled: true
gateway:
heartbeat-interval-seconds: 15
heartbeat-timeout-seconds: 45
realtime:
session-ttl-seconds: 600
sample-rate: 16000
channels: 1
encoding: PCM16LE
connection-ttl-seconds: 1800
auth:
enabled: false
allow-anonymous: true