diff --git a/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java b/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java index 1d50161..2365b8f 100644 --- a/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java +++ b/backend/src/main/java/com/imeeting/websocket/RealtimeMeetingProxyWebSocketHandler.java @@ -22,7 +22,9 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Supplier; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -81,10 +83,12 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl Thread.currentThread().interrupt(); log.error("Realtime websocket upstream connect interrupted: meetingId={}, sessionId={}", sessionData.getMeetingId(), session.getId(), ex); + sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_INTERRUPTED", "连接第三方识别服务时被中断"); frontendSession.close(CloseStatus.SERVER_ERROR.withReason("Interrupted while connecting upstream")); return; } catch (ExecutionException | CompletionException ex) { log.warn("Failed to connect upstream websocket, meetingId={}, target={}", sessionData.getMeetingId(), sessionData.getTargetWsUrl(), ex); + sendFrontendError(frontendSession, "REALTIME_UPSTREAM_CONNECT_FAILED", "连接第三方识别服务失败,请检查模型 WebSocket 配置或服务状态"); frontendSession.close(CloseStatus.SERVER_ERROR.withReason("Failed to connect ASR websocket")); return; } @@ -253,6 +257,21 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl return normalized.contains("\"type\":\"start\""); } + private void sendFrontendError(ConcurrentWebSocketSessionDecorator frontendSession, String code, String message) { + try { + if (!frontendSession.isOpen()) { + return; + } + Map payload = new HashMap<>(); + payload.put("type", "error"); + payload.put("code", code); + payload.put("message", message); + frontendSession.sendMessage(new TextMessage(new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(payload))); + } catch (Exception ex) { + log.warn("Failed to send realtime proxy error to frontend: code={}", code, ex); + } + } + @SuppressWarnings("unchecked") private void queuePendingAudioFrame(WebSocketSession session, byte[] payload) { synchronized (session) { @@ -304,6 +323,15 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl public void onOpen(java.net.http.WebSocket webSocket) { log.info("Upstream websocket opened: meetingId={}, sessionId={}, upstream={}", meetingId, rawSession.getId(), targetWsUrl); + try { + if (frontendSession.isOpen()) { + frontendSession.sendMessage(new TextMessage("{\"type\":\"proxy_ready\"}")); + } + } catch (Exception ex) { + log.error("Failed to notify frontend that upstream websocket is ready: meetingId={}, sessionId={}", meetingId, rawSession.getId(), ex); + closeFrontend(CloseStatus.SERVER_ERROR); + return; + } webSocket.request(1); } @@ -391,6 +419,7 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl public java.util.concurrent.CompletionStage onClose(java.net.http.WebSocket webSocket, int statusCode, String reason) { log.info("Upstream websocket closed: meetingId={}, sessionId={}, code={}, reason={}", meetingId, rawSession.getId(), statusCode, reason); + sendFrontendError("REALTIME_UPSTREAM_CLOSED", reason == null || reason.isBlank() ? "第三方识别服务已断开连接" : "第三方识别服务已断开: " + reason); closeFrontend(new CloseStatus(statusCode, reason)); return COMPLETED; } @@ -399,9 +428,34 @@ public class RealtimeMeetingProxyWebSocketHandler extends AbstractWebSocketHandl public void onError(java.net.http.WebSocket webSocket, Throwable error) { log.error("Upstream websocket error: meetingId={}, sessionId={}, upstream={}", meetingId, rawSession.getId(), targetWsUrl, error); + sendFrontendError("REALTIME_UPSTREAM_ERROR", error == null || error.getMessage() == null || error.getMessage().isBlank() + ? "第三方识别服务连接异常" + : "第三方识别服务连接异常: " + error.getMessage()); closeFrontend(CloseStatus.SERVER_ERROR); } + private void sendFrontendError(String code, String message) { + try { + if (!frontendSession.isOpen()) { + return; + } + frontendSession.sendMessage(new TextMessage("{\"type\":\"error\",\"code\":\"" + code + "\",\"message\":\"" + escapeJson(message) + "\"}")); + } catch (Exception ex) { + log.warn("Failed to send upstream error to frontend: meetingId={}, sessionId={}, code={}", meetingId, rawSession.getId(), code, ex); + } + } + + private String escapeJson(String value) { + if (value == null) { + return ""; + } + return value + .replace("\\", "\\\\") + .replace("\"", "\\\"") + .replace("\r", "\\r") + .replace("\n", "\\n"); + } + private void closeFrontend(CloseStatus status) { try { if (rawSession.isOpen()) { diff --git a/frontend/src/pages/business/RealtimeAsrSession.tsx b/frontend/src/pages/business/RealtimeAsrSession.tsx index 973db7b..cabc4d3 100644 --- a/frontend/src/pages/business/RealtimeAsrSession.tsx +++ b/frontend/src/pages/business/RealtimeAsrSession.tsx @@ -45,7 +45,7 @@ const CHUNK_SIZE = 1280; type WsSpeaker = string | { name?: string; user_id?: string | number } | undefined; type WsMessage = { type?: string; - code?: number; + code?: number | string; message?: string; data?: { text?: string; @@ -203,6 +203,7 @@ export default function RealtimeAsrSession() { const audioBufferRef = useRef([]); const completeOnceRef = useRef(false); const startedAtRef = useRef(null); + const sessionStartedRef = useRef(false); const finalTranscriptCount = transcripts.length; const totalTranscriptChars = useMemo( @@ -306,6 +307,18 @@ export default function RealtimeAsrSession() { setAudioLevel(0); }; + const handleFatalRealtimeError = async (errorMessage: string) => { + setConnecting(false); + setRecording(false); + setStatusText("连接失败"); + sessionStartedRef.current = false; + wsRef.current?.close(); + wsRef.current = null; + await shutdownAudioPipeline(); + startedAtRef.current = null; + message.error(errorMessage); + }; + const startAudioPipeline = async () => { if (!window.isSecureContext || !navigator.mediaDevices?.getUserMedia) { throw new Error("当前浏览器环境不支持麦克风访问。请使用 localhost 或 HTTPS 域名访问系统。"); @@ -384,6 +397,7 @@ export default function RealtimeAsrSession() { setConnecting(true); setStatusText("连接识别服务..."); + sessionStartedRef.current = false; try { const socketSessionRes = await openRealtimeMeetingSocketSession(meetingId, { asrModelId: sessionDraft.asrModelId, @@ -401,21 +415,36 @@ export default function RealtimeAsrSession() { socket.binaryType = "arraybuffer"; wsRef.current = socket; - socket.onopen = async () => { - socket.send(JSON.stringify(socketSession.startMessage || {})); - await startAudioPipeline(); - startedAtRef.current = Date.now(); - setConnecting(false); - setRecording(true); - setStatusText("实时识别中"); + socket.onopen = () => { + setStatusText("识别服务连接中,等待第三方服务就绪..."); }; socket.onmessage = (event) => { try { const payload = JSON.parse(event.data) as WsMessage; - if (payload.code && payload.message) { + if (payload.type === "proxy_ready") { + if (sessionStartedRef.current) { + return; + } + sessionStartedRef.current = true; + setStatusText("启动音频采集中..."); + socket.send(JSON.stringify(socketSession.startMessage || {})); + void startAudioPipeline() + .then(() => { + startedAtRef.current = Date.now(); + setConnecting(false); + setRecording(true); + setStatusText("实时识别中"); + }) + .catch((error) => { + void handleFatalRealtimeError(error instanceof Error ? error.message : "启动麦克风失败"); + }); + return; + } + + if ((payload.code || payload.type === "error") && payload.message) { setStatusText(payload.message); - message.error(payload.message); + void handleFatalRealtimeError(payload.message); return; } @@ -451,19 +480,18 @@ export default function RealtimeAsrSession() { }; socket.onerror = () => { - setConnecting(false); - setRecording(false); - setStatusText("连接失败"); - message.error("实时识别 WebSocket 连接失败"); + void handleFatalRealtimeError("实时识别 WebSocket 连接失败"); }; socket.onclose = () => { setConnecting(false); setRecording(false); + sessionStartedRef.current = false; }; } catch (error) { setConnecting(false); setStatusText("启动失败"); + sessionStartedRef.current = false; message.error(error instanceof Error ? error.message : "启动实时识别失败"); } }; @@ -482,6 +510,7 @@ export default function RealtimeAsrSession() { } wsRef.current?.close(); wsRef.current = null; + sessionStartedRef.current = false; await shutdownAudioPipeline(); @@ -500,6 +529,7 @@ export default function RealtimeAsrSession() { setRecording(false); setFinishing(false); startedAtRef.current = null; + sessionStartedRef.current = false; } };