diff --git a/app/products/calls/connection/connection.ts b/app/products/calls/connection/connection.ts index dcef6e454a..33fee60dca 100644 --- a/app/products/calls/connection/connection.ts +++ b/app/products/calls/connection/connection.ts @@ -14,11 +14,12 @@ import { import {CallsConnection} from '@calls/types/calls'; import {WebsocketEvents} from '@constants'; +import {getServerCredentials} from '@init/credentials'; import NetworkManager from '@managers/network_manager'; -import {logError} from '@utils/log'; +import {logError, logDebug} from '@utils/log'; import Peer from './simple-peer'; -import WebSocketClient from './websocket_client'; +import {WebSocketClient, wsReconnectionTimeoutErr} from './websocket_client'; const websocketConnectTimeout = 3000; @@ -46,15 +47,21 @@ export async function newConnection(serverUrl: string, channelID: string, closeC // getClient can throw an error, which will be handled by the caller. const client = NetworkManager.getClient(serverUrl); - const ws = new WebSocketClient(serverUrl, client.getWebSocketUrl()); + const credentials = await getServerCredentials(serverUrl); + + const ws = new WebSocketClient(serverUrl, client.getWebSocketUrl(), credentials?.token); // Throws an error, to be caught by caller. await ws.initialize(); const disconnect = () => { - if (!isClosed) { - ws.close(); + if (isClosed) { + return; } + isClosed = true; + + ws.send('leave'); + ws.close(); if (onCallEnd) { onCallEnd.remove(); @@ -143,13 +150,14 @@ export async function newConnection(serverUrl: string, channelID: string, closeC }; ws.on('error', (err: Event) => { - logError('WS (CALLS):', err); - ws.close(); + logDebug('calls: ws error', err); + if (err === wsReconnectionTimeoutErr) { + disconnect(); + } }); ws.on('close', () => { - isClosed = true; - disconnect(); + logDebug('calls: ws close'); }); ws.on('join', async () => { @@ -184,14 +192,30 @@ export async function newConnection(serverUrl: string, channelID: string, closeC }); peer.on('error', (err: any) => { - logError('FROM PEER:', err); + logError('calls: peer error:', err); + }); + + peer.on('close', () => { + logDebug('calls: peer closed'); + if (!isClosed) { + disconnect(); + } }); }); - ws.on('open', async () => { - ws.send('join', { - channelID, - }); + ws.on('open', (originalConnID: string, prevConnID: string, isReconnect: boolean) => { + if (isReconnect) { + logDebug('calls: ws reconnect, sending reconnect msg'); + ws.send('reconnect', { + channelID, + originalConnID, + prevConnID, + }); + } else { + ws.send('join', { + channelID, + }); + } }); ws.on('message', ({data}: { data: string }) => { diff --git a/app/products/calls/connection/websocket_client.ts b/app/products/calls/connection/websocket_client.ts index ca25f7de45..f632348f1a 100644 --- a/app/products/calls/connection/websocket_client.ts +++ b/app/products/calls/connection/websocket_client.ts @@ -10,21 +10,33 @@ import DatabaseManager from '@database/manager'; import {getCommonSystemValues} from '@queries/servers/system'; import {logError} from '@utils/log'; -export default class WebSocketClient extends EventEmitter { +const wsMinReconnectRetryTimeMs = 1000; // 1 second +const wsReconnectionTimeout = 30000; // 30 seconds +const wsReconnectTimeIncrement = 500; // 0.5 seconds +export const wsReconnectionTimeoutErr = new Error('max disconnected time reached'); + +export class WebSocketClient extends EventEmitter { private readonly serverUrl: string; private readonly wsPath: string; + private authToken: string; private ws: WebSocket | null = null; - private seqNo = 0; + private seqNo = 1; + private serverSeqNo = 0; private connID = ''; + private originalConnID = ''; + private lastDisconnect = 0; + private reconnectRetryTime = wsMinReconnectRetryTimeMs; + private closed = false; private eventPrefix = `custom_${Calls.PluginId}`; - constructor(serverUrl: string, wsPath: string) { + constructor(serverUrl: string, wsPath: string, authToken?: string) { super(); this.serverUrl = serverUrl; this.wsPath = wsPath; + this.authToken = authToken || ''; } - async initialize() { + private async init(isReconnect: boolean) { const database = DatabaseManager.serverDatabases[this.serverUrl]?.database; if (!database) { return; @@ -33,17 +45,25 @@ export default class WebSocketClient extends EventEmitter { const system = await getCommonSystemValues(database); const connectionUrl = (system.config.WebsocketURL || this.serverUrl) + this.wsPath; - this.ws = new WebSocket(connectionUrl); + this.ws = new WebSocket(`${connectionUrl}?connection_id=${this.connID}&sequence_number=${this.serverSeqNo}`, [], {headers: {authorization: `Bearer ${this.authToken}`}}); + + if (isReconnect) { + this.ws.onopen = () => { + this.lastDisconnect = 0; + this.reconnectRetryTime = wsMinReconnectRetryTimeMs; + this.emit('open', this.originalConnID, this.connID, true); + }; + } this.ws.onerror = (err) => { this.emit('error', err); - this.ws = null; - this.close(); }; this.ws.onclose = () => { this.ws = null; - this.close(); + if (!this.closed) { + this.close(); + } }; this.ws.onmessage = ({data}) => { @@ -54,7 +74,12 @@ export default class WebSocketClient extends EventEmitter { try { msg = JSON.parse(data); } catch (err) { - logError(err); + logError('calls: ws msg parse error', err); + return; + } + + if (msg) { + this.serverSeqNo = msg.seq + 1; } if (!msg || !msg.event || !msg.data) { @@ -62,14 +87,22 @@ export default class WebSocketClient extends EventEmitter { } if (msg.event === 'hello') { - this.connID = msg.data.connection_id; - this.emit('open'); + if (msg.data.connection_id !== this.connID) { + this.connID = msg.data.connection_id; + this.serverSeqNo = 0; + if (this.originalConnID === '') { + this.originalConnID = this.connID; + } + } + if (!isReconnect) { + this.emit('open', this.originalConnID, this.connID, false); + } return; } else if (!this.connID) { return; } - if (msg.data.connID !== this.connID) { + if (msg.data.connID !== this.connID && msg.data.connID !== this.originalConnID) { return; } @@ -87,6 +120,10 @@ export default class WebSocketClient extends EventEmitter { }; } + initialize() { + return this.init(false); + } + send(action: string, data?: Object, binary?: boolean) { const msg = { action: `${this.eventPrefix}_${action}`, @@ -104,12 +141,35 @@ export default class WebSocketClient extends EventEmitter { close() { if (this.ws) { + this.closed = true; this.ws.close(); this.ws = null; + this.seqNo = 1; + this.serverSeqNo = 0; + this.connID = ''; + this.originalConnID = ''; + } else { + this.emit('close'); + + const now = Date.now(); + if (this.lastDisconnect === 0) { + this.lastDisconnect = now; + } + + if ((now - this.lastDisconnect) >= wsReconnectionTimeout) { + this.closed = true; + this.emit('error', wsReconnectionTimeoutErr); + return; + } + + setTimeout(() => { + if (!this.ws && !this.closed) { + this.init(true); + } + }, this.reconnectRetryTime); + + this.reconnectRetryTime += wsReconnectTimeIncrement; } - this.seqNo = 0; - this.connID = ''; - this.emit('close'); } state() {