import { useLoadingStore } from "@/stores/loadingStore";
import { sendInput } from "@/api/api";

// ============ Type definitions ============

/** Options accepted by `Socket`; every omitted field falls back to `DEFAULT_OPTIONS`. */
interface SocketOptions {
  /** Milliseconds between heartbeat frames; a falsy value disables the heartbeat. */
  heartbeatInterval?: number;
  /** Base delay in milliseconds for the exponential reconnect backoff. */
  reconnectInterval?: number;
  /** Maximum number of automatic reconnect attempts; a falsy value removes the limit. */
  maxReconnectAttempts?: number;
  /**
   * Age thresholds in milliseconds, measured from message creation, at which an
   * unacknowledged message is re-sent. Once every entry has been used the
   * message is handed to the HTTP fallback.
   */
  retryIntervals?: number[];
  /** Accepted for compatibility; not read anywhere in this file. */
  forceClose?: boolean;
  /** Accepted for compatibility; not read anywhere in this file. */
  timeOut?: boolean;
}

/** A message that is queued or awaiting an "ack" from the server. */
interface PendingMessage {
  /** Identifier also embedded in `data` as `messageId`. */
  id: string;
  /** Serialized JSON payload, including `messageId` and `timestamp`. */
  data: string;
  /** Number of re-sends performed so far. */
  retries: number;
  /** `Date.now()` at the moment the message was created. */
  timestamp: number;
}

// ============ Default configuration ============

const DEFAULT_OPTIONS: Required<SocketOptions> = {
  heartbeatInterval: 2000,
  reconnectInterval: 1000,
  maxReconnectAttempts: 10,
  retryIntervals: [2000, 3000, 5000], // retry at 2s, 3s and 5s
  forceClose: false,
  timeOut: false,
};

/** Upper bound in milliseconds for the reconnect backoff delay. */
const MAX_RECONNECT_INTERVAL = 30000;
/** Heartbeat ticks without a server heartbeat before the socket is closed. */
const MAX_HEARTBEAT_MISS = 3;
/** Milliseconds between scans of the unacknowledged-message table. */
const RETRY_CHECK_INTERVAL = 1000;

// ============ Socket class ============

/**
 * WebSocket wrapper with heartbeat, exponential-backoff reconnect, an outgoing
 * queue, ack-based re-sending and an HTTP fallback via `sendInput`.
 *
 * The connection is opened from the constructor. Call `close()` to stop
 * reconnecting and release timers and browser listeners.
 */
class Socket {
  private url: string;
  private ws: WebSocket | null = null;
  private opts: Required<SocketOptions>;

  // Connection management
  private reconnectAttempts = 0;
  private reconnectTimeoutId: number | null = null;
  /** Set by `close()`; blocks every further (re)connect. */
  private disposed = false;

  // Heartbeat management
  private heartbeatIntervalId: number | null = null;
  private heartbeatMissCount = 0;

  // Message management
  private sendQueue: PendingMessage[] = [];
  private pendingConfirmations = new Map<string, PendingMessage>();
  private retryCheckerId: number | null = null;

  // Event management
  private listeners: Record<string, Function[]> = {};

  // Browser listeners are kept as fields so that close() can remove them.
  private readonly handleVisibilityChange = (): void => {
    if (document.visibilityState === "visible") {
      this.resetAndReconnect();
    }
  };
  private readonly handleOnline = (): void => {
    this.resetAndReconnect();
  };

  /**
   * @param url  WebSocket endpoint to connect to.
   * @param opts Overrides merged on top of `DEFAULT_OPTIONS`.
   */
  constructor(url: string, opts: SocketOptions = {}) {
    this.url = url;
    this.opts = { ...DEFAULT_OPTIONS, ...opts };
    this.init();
    this.setupBrowserListeners();
  }

  // ============ Initialization and connection ============

  /** Opens a new WebSocket unless one is already connecting/open or the instance was closed. */
  private init(): void {
    if (this.disposed || this.isConnectingOrOpen()) return;
    // The previous socket may still be CLOSING; make sure its late events
    // cannot tear down the timers of the connection created below.
    this.detachSocket();
    this.heartbeatMissCount = 0;
    this.ws = new WebSocket(this.url);
    this.bindWebSocketEvents();
  }

  /** Unhooks the current socket's handlers and releases the timers tied to it. */
  private detachSocket(): void {
    const stale = this.ws;
    if (!stale) return;
    stale.onopen = null;
    stale.onmessage = null;
    stale.onerror = null;
    stale.onclose = null;
    // handleClose will no longer run for this socket, so stop its timers here.
    this.stopHeartbeat();
    this.stopRetryChecker();
    this.ws = null;
  }

  /** Routes the native WebSocket events to the handlers of this instance. */
  private bindWebSocketEvents(): void {
    if (!this.ws) return;
    this.ws.onopen = this.handleOpen.bind(this);
    this.ws.onmessage = this.handleMessage.bind(this);
    this.ws.onerror = this.handleError.bind(this);
    this.ws.onclose = this.handleClose.bind(this);
  }

  // ============ WebSocket event handling ============

  /** Resets reconnect state, starts the timers, emits "open" and flushes the queue. */
  private handleOpen(event: Event): void {
    this.reconnectAttempts = 0;
    this.clearReconnectTimeout();
    this.startHeartbeat();
    this.startRetryChecker();
    this.emit("open", event);
    this.flushSendQueue();
  }

  /**
   * Handles "heartbeat" and "ack" frames internally and forwards every other
   * payload (including payloads that are not valid JSON) as a "message" event.
   */
  private handleMessage(event: MessageEvent): void {
    let data: any;
    try {
      data = JSON.parse(event.data);
    } catch {
      this.emit("message", event.data);
      return;
    }
    // Dispatch happens outside the try block so that an exception thrown by a
    // listener is not mistaken for a parse failure and delivered twice.
    switch (data?.event) {
      case "heartbeat":
        this.heartbeatMissCount = 0;
        break;
      case "ack":
        this.handleAck(data.messageId);
        break;
      default:
        this.emit("message", event.data);
    }
  }

  /** Forwards the native error event to "error" listeners. */
  private handleError(event: Event): void {
    this.emit("error", event);
  }

  /** Stops the timers, emits "close" and schedules a reconnect when allowed. */
  private handleClose(event: CloseEvent): void {
    this.stopHeartbeat();
    this.stopRetryChecker();
    this.emit("close", event);
    this.scheduleReconnect();
  }

  /** Drops the acknowledged message from the pending-confirmation table. */
  private handleAck(messageId: string): void {
    if (messageId && this.pendingConfirmations.has(messageId)) {
      this.pendingConfirmations.delete(messageId);
    }
  }

  // ============ Connection state ============

  private isConnectingOrOpen(): boolean {
    return this.ws?.readyState === WebSocket.CONNECTING || this.ws?.readyState === WebSocket.OPEN;
  }

  private isConnected(): boolean {
    return this.ws?.readyState === WebSocket.OPEN;
  }

  private isClosed(): boolean {
    return this.ws?.readyState === WebSocket.CLOSED;
  }

  // ============ Reconnect ============

  /**
   * Arms a single reconnect timer whose delay doubles with each attempt,
   * capped at `MAX_RECONNECT_INTERVAL`.
   */
  private scheduleReconnect(): void {
    if (!this.canReconnect() || this.reconnectTimeoutId !== null) return;

    const timeout = Math.min(
      this.opts.reconnectInterval * Math.pow(2, this.reconnectAttempts),
      MAX_RECONNECT_INTERVAL
    );

    this.reconnectTimeoutId = window.setTimeout(() => {
      this.reconnectAttempts++;
      this.reconnectTimeoutId = null;
      if (this.isClosed()) {
        this.init();
      }
    }, timeout);
  }

  /** True while the instance is not closed and the attempt limit (if any) is not reached. */
  private canReconnect(): boolean {
    if (this.disposed) return false;
    return !this.opts.maxReconnectAttempts || this.reconnectAttempts < this.opts.maxReconnectAttempts;
  }

  private clearReconnectTimeout(): void {
    if (this.reconnectTimeoutId !== null) {
      clearTimeout(this.reconnectTimeoutId);
      this.reconnectTimeoutId = null;
    }
  }

  /** Reconnects immediately when the socket is fully closed and reconnecting is allowed. */
  private reconnectIfNeeded(): void {
    if (this.isClosed() && this.canReconnect() && !this.isConnectingOrOpen()) {
      this.init();
    }
  }

  // ============ Heartbeat ============

  /**
   * Starts the heartbeat timer. Each tick counts one miss and sends a
   * heartbeat frame; the counter is reset when the server answers. When the
   * counter has reached `MAX_HEARTBEAT_MISS` the socket is closed instead.
   */
  private startHeartbeat(): void {
    // Never keep two heartbeat timers alive for one instance.
    this.stopHeartbeat();
    if (!this.opts.heartbeatInterval) return;

    this.heartbeatIntervalId = window.setInterval(() => {
      if (this.heartbeatMissCount >= MAX_HEARTBEAT_MISS) {
        this.ws?.close();
        return;
      }

      this.heartbeatMissCount++;
      if (this.isConnected()) {
        this.ws?.send(JSON.stringify({ event: "heartbeat", content: { tag: "user" } }));
      }
    }, this.opts.heartbeatInterval);
  }

  private stopHeartbeat(): void {
    if (this.heartbeatIntervalId !== null) {
      clearInterval(this.heartbeatIntervalId);
      this.heartbeatIntervalId = null;
    }
  }

  // ============ Sending ============

  /**
   * Sends a JSON string.
   *
   * - Socket open: sent directly and tracked until acknowledged.
   * - Socket neither connecting nor open, and reconnecting allowed: queued and
   *   flushed after the next successful open.
   * - Otherwise: handed to the HTTP fallback.
   *
   * The returned promise never rejects; failures are logged.
   *
   * @param data JSON text of the message to send.
   */
  async send(data: string): Promise<void> {
    let message: any;
    try {
      message = JSON.parse(data);
    } catch {
      console.error("[WebSocket] Invalid message format. Must be valid JSON.");
      return;
    }

    const pendingMsg = this.createPendingMessage(message);

    if (this.isConnected()) {
      // Connection is healthy: send right away.
      this.sendDirect(pendingMsg);
      return;
    }

    if (this.canReconnect() && !this.isConnectingOrOpen()) {
      // Reconnect is possible: queue the message until the socket is open again.
      this.enqueue(pendingMsg);
      this.reconnectIfNeeded();
      return;
    }

    try {
      await this.sendViaHttp(message);
    } catch {
      // sendViaHttp has already logged the failure; keep send() non-rejecting.
    }
  }

  /**
   * Delivers a message through `sendInput`. Messages whose `event` is
   * "input_text" are skipped.
   *
   * @throws Re-throws whatever `sendInput` rejects with, after logging it.
   */
  private async sendViaHttp(message: any): Promise<void> {
    try {
      if (message.event !== "input_text") {
        await sendInput(message);
      }
    } catch (error) {
      console.error("[WebSocket] HTTP fallback failed:", error);
      throw error;
    }
  }

  /** Wraps a parsed message with a generated id and creation timestamp. */
  private createPendingMessage(message: any): PendingMessage {
    const id = this.generateMessageId();
    const timestamp = Date.now();

    return {
      id,
      data: JSON.stringify({ ...message, messageId: id, timestamp }),
      retries: 0,
      timestamp,
    };
  }

  /** Writes the message to the socket and records it as awaiting an ack. */
  private sendDirect(pendingMsg: PendingMessage): void {
    this.ws?.send(pendingMsg.data);
    this.pendingConfirmations.set(pendingMsg.id, pendingMsg);
  }

  private enqueue(pendingMsg: PendingMessage): void {
    this.sendQueue.push(pendingMsg);
  }

  /** Sends queued messages in FIFO order for as long as the socket stays open. */
  private flushSendQueue(): void {
    while (this.sendQueue.length > 0 && this.isConnected()) {
      const pendingMsg = this.sendQueue.shift();
      if (pendingMsg) {
        this.sendDirect(pendingMsg);
      }
    }
  }

  // ============ Message retry ============

  /** (Re)starts the periodic scan of unacknowledged messages. */
  private startRetryChecker(): void {
    this.stopRetryChecker();
    this.retryCheckerId = window.setInterval(() => {
      this.checkPendingMessages();
    }, RETRY_CHECK_INTERVAL);
  }

  private stopRetryChecker(): void {
    if (this.retryCheckerId !== null) {
      clearInterval(this.retryCheckerId);
      this.retryCheckerId = null;
    }
  }

  /**
   * Re-sends unacknowledged messages whose age (since creation) has reached
   * the threshold for their current retry count. A message that has used up
   * every retry is removed from the table and handed to the HTTP fallback; if
   * that fails too it is put back on the send queue.
   */
  private checkPendingMessages(): void {
    const now = Date.now();
    const { retryIntervals } = this.opts;

    for (const [id, msg] of this.pendingConfirmations.entries()) {
      const age = now - msg.timestamp;
      const waitTime = retryIntervals[msg.retries] ?? retryIntervals[0];

      // Not yet due for a retry.
      if (age < waitTime) continue;

      // Retries exhausted: degrade to HTTP.
      if (msg.retries >= retryIntervals.length) {
        this.pendingConfirmations.delete(id);

        try {
          const message = JSON.parse(msg.data);
          void this.sendViaHttp(message).catch((error) => {
            console.error("[WebSocket] HTTP fallback failed, re-queuing message:", error);
            this.sendQueue.push(msg);
          });
        } catch (error) {
          console.error("[WebSocket] Failed to parse message for HTTP fallback:", error);
          this.sendQueue.push(msg);
        }

        useLoadingStore().setLoading(false);
        continue;
      }

      // Retry over the socket.
      if (this.isConnected()) {
        this.ws?.send(msg.data);
        msg.retries++;
      }
    }
  }

  // ============ Utilities ============

  /** Builds an id from the current time plus a random base-36 suffix. */
  private generateMessageId(): string {
    return `${Date.now()}-${Math.random().toString(36).substring(2, 11)}`;
  }

  // ============ Event system ============

  /**
   * Registers a listener. Events emitted by this class: "open", "message",
   * "error" and "close".
   */
  on(event: string, callback: Function): void {
    const registered = this.listeners[event];
    if (registered) {
      registered.push(callback);
    } else {
      this.listeners[event] = [callback];
    }
  }

  /** Removes every listener registered for `event`. */
  off(event: string): void {
    delete this.listeners[event];
  }

  private emit(event: string, data: any): void {
    this.listeners[event]?.forEach((callback) => callback(data));
  }

  // ============ Lifecycle ============

  /**
   * Permanently shuts the instance down: cancels the reconnect timer, stops
   * the heartbeat and retry checker, removes the browser listeners and closes
   * the underlying socket. No reconnect is attempted afterwards; later
   * `send()` calls go to the HTTP fallback.
   */
  close(): void {
    this.disposed = true;
    this.clearReconnectTimeout();
    this.stopHeartbeat();
    this.stopRetryChecker();
    document.removeEventListener("visibilitychange", this.handleVisibilityChange);
    window.removeEventListener("online", this.handleOnline);
    this.ws?.close();
  }

  // ============ Browser event listeners ============

  /** Reconnects when the page becomes visible again or the browser goes back online. */
  private setupBrowserListeners(): void {
    // Page visibility changes
    document.addEventListener("visibilitychange", this.handleVisibilityChange);
    // Network status changes
    window.addEventListener("online", this.handleOnline);
  }

  /** Clears the attempt counter and reconnects unless a socket is connecting or open. */
  private resetAndReconnect(): void {
    this.reconnectAttempts = 0;
    if (!this.isConnectingOrOpen()) {
      this.init();
    }
  }
}

// ============ Exports ============

/**
 * Creates a `Socket` and returns it together with pre-bound `send`, `on` and
 * `off` helpers.
 *
 * @param url  WebSocket endpoint to connect to.
 * @param opts Optional overrides for the default socket options.
 */
function useSocket(url: string, opts?: SocketOptions) {
  const socket = new Socket(url, opts);

  return {
    socket,
    send: socket.send.bind(socket),
    on: socket.on.bind(socket),
    off: socket.off.bind(socket),
  };
}

export { useSocket, Socket };
export type { SocketOptions, PendingMessage };