Files
zy-client-a/a12_au_post_aupost/src/utils/websocket.ts
telangpu 4ef6bfcdba update
2026-05-10 23:25:27 +08:00

393 lines
9.8 KiB
TypeScript

import { useLoadingStore } from "@/stores/loadingStore";
import { sendInput } from "@/api/api";
// ============ Type definitions ============
/**
 * Optional settings accepted by the Socket constructor. Any field left out
 * takes its value from DEFAULT_OPTIONS.
 */
interface SocketOptions {
// Milliseconds between heartbeat frames; a falsy value disables the heartbeat timer.
heartbeatInterval?: number;
// Base reconnect delay in milliseconds; it is doubled per attempt and capped at MAX_RECONNECT_INTERVAL.
reconnectInterval?: number;
// Maximum number of reconnect attempts; a falsy value (e.g. 0) removes the limit.
maxReconnectAttempts?: number;
// Age thresholds (ms) at which an unacknowledged message is resent; once all are used the message goes out over HTTP.
retryIntervals?: number[];
// NOTE(review): not read anywhere in the visible Socket implementation — intended meaning unconfirmed.
forceClose?: boolean;
// NOTE(review): not read anywhere in the visible Socket implementation — intended meaning unconfirmed.
timeOut?: boolean;
}
/**
 * Bookkeeping record for an outgoing message that is either queued for
 * sending or waiting for an "ack" frame from the server.
 */
interface PendingMessage {
// Generated message id; also embedded in `data` as `messageId` and matched against incoming acks.
id: string;
// Serialized JSON payload (the caller's message plus `messageId` and `timestamp`).
data: string;
// Number of times the message has been resent over the WebSocket so far.
retries: number;
// Date.now() at creation; the retry checker measures message age from this value.
timestamp: number;
}
// ============ Default configuration ============
// Values used for every SocketOptions field the caller does not supply.
const DEFAULT_OPTIONS: Required<SocketOptions> = {
heartbeatInterval: 2000,
reconnectInterval: 1000,
maxReconnectAttempts: 10,
retryIntervals: [2000, 3000, 5000], // resend thresholds of 2 s, 3 s and 5 s (compared against the message's age)
forceClose: false,
timeOut: false,
};
// Upper bound (ms) for the exponentially growing reconnect delay.
const MAX_RECONNECT_INTERVAL = 30000;
// Number of heartbeat ticks without a server heartbeat reply before the socket is closed.
const MAX_HEARTBEAT_MISS = 3;
// Period (ms) of the timer that scans unacknowledged messages for retry.
const RETRY_CHECK_INTERVAL = 1000;
// ============ Socket class ============
/**
 * Auto-reconnecting WebSocket client.
 *
 * Features:
 * - exponential-backoff reconnects (capped at MAX_RECONNECT_INTERVAL),
 * - a periodic heartbeat that closes the socket after MAX_HEARTBEAT_MISS
 *   unanswered beats,
 * - per-message acknowledgement tracking with timed resends,
 * - an HTTP fallback (`sendInput`) when the WebSocket cannot deliver,
 * - a small event emitter ("open", "message", "error", "close").
 *
 * The constructor connects immediately and registers `visibilitychange` /
 * `online` listeners; call `destroy()` to release them.
 */
class Socket {
  private url: string;
  private ws: WebSocket | null = null;
  private opts: Required<SocketOptions>;
  // Set by destroy(); blocks every further connection attempt.
  private destroyed = false;

  // Connection management
  private reconnectAttempts = 0;
  private reconnectTimeoutId: number | null = null;

  // Heartbeat management
  private heartbeatIntervalId: number | null = null;
  private heartbeatMissCount = 0;

  // Message management
  private sendQueue: PendingMessage[] = [];
  private pendingConfirmations = new Map<string, PendingMessage>();
  private retryCheckerId: number | null = null;

  // Event management
  private listeners: Record<string, Function[]> = {};

  // Browser-level handlers are kept as fields so destroy() can unregister
  // exactly the references that were registered.
  private readonly onVisibilityChange = (): void => {
    if (document.visibilityState === "visible") {
      this.resumeConnection();
    }
  };
  private readonly onOnline = (): void => {
    this.resumeConnection();
  };

  /**
   * @param url  WebSocket endpoint to connect to.
   * @param opts Overrides merged on top of DEFAULT_OPTIONS.
   */
  constructor(url: string, opts: SocketOptions = {}) {
    this.url = url;
    this.opts = { ...DEFAULT_OPTIONS, ...opts };
    this.init();
    this.setupBrowserListeners();
  }

  // ============ Initialisation and connection ============

  /** Opens a new WebSocket unless one is already connecting/open or the instance was destroyed. */
  private init(): void {
    if (this.destroyed || this.isConnectingOrOpen()) return;
    // The previous socket may still be CLOSING; silence it so its late
    // events cannot tear down the timers of the connection created below.
    this.releaseCurrentSocket();
    this.heartbeatMissCount = 0;
    this.ws = new WebSocket(this.url);
    this.bindWebSocketEvents();
  }

  /** Wires the current socket's callbacks to this instance. */
  private bindWebSocketEvents(): void {
    if (!this.ws) return;
    this.ws.onopen = this.handleOpen.bind(this);
    this.ws.onmessage = this.handleMessage.bind(this);
    this.ws.onerror = this.handleError.bind(this);
    this.ws.onclose = this.handleClose.bind(this);
  }

  /** Stops connection-scoped timers and detaches all callbacks from the current socket. */
  private releaseCurrentSocket(): void {
    this.stopHeartbeat();
    this.stopRetryChecker();
    if (!this.ws) return;
    this.ws.onopen = null;
    this.ws.onmessage = null;
    this.ws.onerror = null;
    this.ws.onclose = null;
  }

  // ============ WebSocket event handlers ============

  /** Resets reconnect state, starts timers, notifies listeners and flushes queued messages. */
  private handleOpen(event: Event): void {
    this.reconnectAttempts = 0;
    this.clearReconnectTimeout();
    this.startHeartbeat();
    this.startRetryChecker();
    this.emit("open", event);
    this.flushSendQueue();
  }

  /**
   * Routes an incoming frame: "heartbeat" and "ack" frames are consumed
   * internally, everything else (including non-JSON data) is forwarded to
   * "message" listeners as the raw `event.data`.
   */
  private handleMessage(event: MessageEvent): void {
    let parsed: unknown;
    try {
      parsed = JSON.parse(event.data);
    } catch {
      // Not JSON: hand the raw payload to listeners.
      this.emit("message", event.data);
      return;
    }
    // Parsing is kept separate from dispatch so that an error raised while
    // dispatching can never cause the same frame to be emitted twice.
    const frame = (typeof parsed === "object" && parsed !== null ? parsed : {}) as {
      event?: unknown;
      messageId?: unknown;
    };
    switch (frame.event) {
      case "heartbeat":
        this.heartbeatMissCount = 0;
        break;
      case "ack":
        this.handleAck(frame.messageId);
        break;
      default:
        this.emit("message", event.data);
    }
  }

  /** Forwards socket errors to "error" listeners. */
  private handleError(event: Event): void {
    this.emit("error", event);
  }

  /** Stops timers, notifies "close" listeners and schedules a reconnect. */
  private handleClose(event: CloseEvent): void {
    this.stopHeartbeat();
    this.stopRetryChecker();
    this.emit("close", event);
    this.scheduleReconnect();
  }

  /** Drops the acknowledged message from the pending map; unknown or non-string ids are ignored. */
  private handleAck(messageId: unknown): void {
    if (typeof messageId === "string" && messageId) {
      this.pendingConfirmations.delete(messageId);
    }
  }

  // ============ Connection state ============

  private isConnectingOrOpen(): boolean {
    return this.ws?.readyState === WebSocket.CONNECTING
      || this.ws?.readyState === WebSocket.OPEN;
  }

  private isConnected(): boolean {
    return this.ws?.readyState === WebSocket.OPEN;
  }

  private isClosed(): boolean {
    return this.ws?.readyState === WebSocket.CLOSED;
  }

  // ============ Reconnect ============

  /** Arms a single backoff timer (reconnectInterval * 2^attempts, capped) that reconnects if still closed. */
  private scheduleReconnect(): void {
    if (!this.canReconnect() || this.reconnectTimeoutId !== null) return;
    const timeout = Math.min(
      this.opts.reconnectInterval * Math.pow(2, this.reconnectAttempts),
      MAX_RECONNECT_INTERVAL
    );
    this.reconnectTimeoutId = window.setTimeout(() => {
      this.reconnectAttempts++;
      this.reconnectTimeoutId = null;
      if (this.isClosed()) {
        this.init();
      }
    }, timeout);
  }

  /** True while reconnects are allowed: not destroyed and under the attempt limit (a falsy limit means unlimited). */
  private canReconnect(): boolean {
    if (this.destroyed) return false;
    return !this.opts.maxReconnectAttempts
      || this.reconnectAttempts < this.opts.maxReconnectAttempts;
  }

  private clearReconnectTimeout(): void {
    if (this.reconnectTimeoutId !== null) {
      clearTimeout(this.reconnectTimeoutId);
      this.reconnectTimeoutId = null;
    }
  }

  /** Reconnects right away when the socket is fully closed and reconnecting is still permitted. */
  private reconnectIfNeeded(): void {
    if (this.isClosed() && this.canReconnect()) {
      this.init();
    }
  }

  /** Shared reaction to "tab became visible" / "network back online": reset the backoff and reconnect now. */
  private resumeConnection(): void {
    this.reconnectAttempts = 0;
    if (!this.isConnectingOrOpen()) {
      // A pending backoff timer is obsolete once we reconnect immediately.
      this.clearReconnectTimeout();
      this.init();
    }
  }

  // ============ Heartbeat ============

  /** (Re)starts the heartbeat timer; does nothing when heartbeatInterval is falsy. */
  private startHeartbeat(): void {
    // Clear any previous timer first so repeated opens never stack intervals.
    this.stopHeartbeat();
    if (!this.opts.heartbeatInterval) return;
    this.heartbeatIntervalId = window.setInterval(() => {
      if (this.heartbeatMissCount >= MAX_HEARTBEAT_MISS) {
        // Too many beats without a reply: close and let handleClose reconnect.
        this.ws?.close();
        return;
      }
      // Counted as missed until the server's heartbeat reply resets it.
      this.heartbeatMissCount++;
      if (this.isConnected()) {
        this.ws?.send(JSON.stringify({
          event: "heartbeat",
          content: { tag: "user" }
        }));
      }
    }, this.opts.heartbeatInterval);
  }

  private stopHeartbeat(): void {
    if (this.heartbeatIntervalId !== null) {
      clearInterval(this.heartbeatIntervalId);
      this.heartbeatIntervalId = null;
    }
  }

  // ============ Sending ============

  /**
   * Sends a JSON-encoded message.
   *
   * - Socket open: sent directly and tracked until acknowledged.
   * - Socket closed/closing and reconnect allowed: queued, and a reconnect is triggered if needed.
   * - Otherwise (still connecting, attempts exhausted, or destroyed): sent through the HTTP fallback.
   *
   * Never rejects; failures are logged.
   *
   * @param data JSON string. Invalid JSON is logged and dropped.
   */
  async send(data: string): Promise<void> {
    let message: any;
    try {
      message = JSON.parse(data);
    } catch {
      console.error("[WebSocket] Invalid message format. Must be valid JSON.");
      return;
    }

    try {
      const pendingMsg = this.createPendingMessage(message);
      if (this.isConnected()) {
        this.sendDirect(pendingMsg);
        return;
      }
      if (this.canReconnect() && !this.isConnectingOrOpen()) {
        this.enqueue(pendingMsg);
        this.reconnectIfNeeded();
        return;
      }
    } catch (error) {
      console.error("[WebSocket] Failed to send message:", error);
      return;
    }

    try {
      await this.sendViaHttp(message);
    } catch {
      // sendViaHttp has already logged the failure; swallow it so send() keeps
      // its never-rejecting contract.
    }
  }

  /**
   * Delivers a message through the HTTP API. Messages whose `event` is
   * "input_text" are intentionally not sent over HTTP.
   *
   * @throws Re-throws whatever `sendInput` throws, after logging it.
   */
  private async sendViaHttp(message: any): Promise<void> {
    try {
      if (message.event !== "input_text") {
        await sendInput(message);
      }
    } catch (error) {
      console.error("[WebSocket] HTTP fallback failed:", error);
      throw error;
    }
  }

  /** Wraps a message with a generated id and creation timestamp, both also embedded in the payload. */
  private createPendingMessage(message: any): PendingMessage {
    const id = this.generateMessageId();
    const timestamp = Date.now();
    return {
      id,
      data: JSON.stringify({ ...message, messageId: id, timestamp }),
      retries: 0,
      timestamp,
    };
  }

  /** Writes the payload to the socket and starts tracking it for acknowledgement. */
  private sendDirect(pendingMsg: PendingMessage): void {
    this.ws?.send(pendingMsg.data);
    this.pendingConfirmations.set(pendingMsg.id, pendingMsg);
  }

  private enqueue(pendingMsg: PendingMessage): void {
    this.sendQueue.push(pendingMsg);
  }

  /** Sends queued messages in FIFO order for as long as the socket stays open. */
  private flushSendQueue(): void {
    while (this.sendQueue.length > 0 && this.isConnected()) {
      const pendingMsg = this.sendQueue.shift();
      if (pendingMsg) {
        this.sendDirect(pendingMsg);
      }
    }
  }

  // ============ Retry ============

  private startRetryChecker(): void {
    this.stopRetryChecker();
    this.retryCheckerId = window.setInterval(() => {
      this.checkPendingMessages();
    }, RETRY_CHECK_INTERVAL);
  }

  private stopRetryChecker(): void {
    if (this.retryCheckerId !== null) {
      clearInterval(this.retryCheckerId);
      this.retryCheckerId = null;
    }
  }

  /**
   * Scans unacknowledged messages. A message whose age has passed the
   * threshold for its current retry count is resent; once every threshold
   * is used up it is removed and handed to the HTTP fallback (re-queued if
   * that fails), and the global loading flag is cleared.
   */
  private checkPendingMessages(): void {
    const now = Date.now();
    const { retryIntervals } = this.opts;
    for (const [id, msg] of this.pendingConfirmations.entries()) {
      const age = now - msg.timestamp;
      const waitTime = retryIntervals[msg.retries] ?? retryIntervals[0] ?? 0;
      // Not due yet.
      if (age < waitTime) continue;

      // Retries exhausted: downgrade to HTTP.
      if (msg.retries >= retryIntervals.length) {
        this.pendingConfirmations.delete(id);
        try {
          const message = JSON.parse(msg.data);
          this.sendViaHttp(message).catch((error) => {
            console.error("[WebSocket] HTTP fallback failed, re-queuing message:", error);
            this.sendQueue.push(msg);
          });
        } catch (error) {
          console.error("[WebSocket] Failed to parse message for HTTP fallback:", error);
          this.sendQueue.push(msg);
        }
        useLoadingStore().setLoading(false);
        continue;
      }

      // Resend over the WebSocket.
      if (this.isConnected()) {
        this.ws?.send(msg.data);
        msg.retries++;
      }
    }
  }

  // ============ Utilities ============

  private generateMessageId(): string {
    return `${Date.now()}-${Math.random().toString(36).substring(2, 11)}`;
  }

  // ============ Event system ============

  /** Registers `callback` for `event` ("open", "message", "error" or "close"). */
  on(event: string, callback: Function): void {
    if (!this.listeners[event]) {
      this.listeners[event] = [];
    }
    this.listeners[event].push(callback);
  }

  /**
   * Removes listeners for `event`.
   *
   * @param event    Event name.
   * @param callback When given, only this callback is removed; when omitted,
   *                 every listener of the event is removed.
   */
  off(event: string, callback?: Function): void {
    if (!callback) {
      delete this.listeners[event];
      return;
    }
    const remaining = (this.listeners[event] ?? []).filter((cb) => cb !== callback);
    if (remaining.length > 0) {
      this.listeners[event] = remaining;
    } else {
      delete this.listeners[event];
    }
  }

  /**
   * Invokes every listener of `event`. Each listener runs in isolation so a
   * throwing listener cannot skip the others or abort the caller (for
   * example the reconnect scheduling that follows a "close" emit).
   */
  private emit(event: string, data: any): void {
    const callbacks = this.listeners[event];
    if (!callbacks) return;
    for (const callback of [...callbacks]) {
      try {
        callback(data);
      } catch (error) {
        console.error(`[WebSocket] Listener for "${event}" threw:`, error);
      }
    }
  }

  // ============ Browser event listeners ============

  /** Reconnects when the page becomes visible again or the network comes back online. */
  private setupBrowserListeners(): void {
    document.addEventListener("visibilitychange", this.onVisibilityChange);
    window.addEventListener("online", this.onOnline);
  }

  // ============ Teardown ============

  /**
   * Permanently shuts the client down: removes the browser listeners, stops
   * all timers, detaches the socket callbacks and closes the socket. No
   * reconnect is attempted afterwards; later `send()` calls use the HTTP
   * fallback.
   */
  destroy(): void {
    this.destroyed = true;
    document.removeEventListener("visibilitychange", this.onVisibilityChange);
    window.removeEventListener("online", this.onOnline);
    this.clearReconnectTimeout();
    const current = this.ws;
    this.releaseCurrentSocket();
    if (
      current
      && (current.readyState === WebSocket.CONNECTING || current.readyState === WebSocket.OPEN)
    ) {
      current.close();
    }
  }
}
// ============ Exports ============
/**
 * Creates a Socket for `url` and returns it together with its `send`, `on`
 * and `off` methods pre-bound to that instance, so they can be passed
 * around or destructured without losing `this`.
 *
 * @param url  WebSocket endpoint.
 * @param opts Optional socket settings forwarded to the Socket constructor.
 */
function useSocket(url: string, opts?: SocketOptions) {
  const instance = new Socket(url, opts);
  const { send, on, off } = instance;
  return {
    socket: instance,
    send: send.bind(instance),
    on: on.bind(instance),
    off: off.bind(instance),
  };
}
// Public API: the factory helper and the class itself, plus their supporting types.
export { useSocket, Socket };
export type { SocketOptions, PendingMessage };