Calls: WebSocket reconnection support (#6543)

Co-authored-by: Christopher Poile <cpoile@gmail.com>
This commit is contained in:
Claudio Costa
2022-08-10 17:12:39 +02:00
committed by GitHub
parent d575b536bb
commit a2a3f4940d
2 changed files with 113 additions and 29 deletions

View File

@@ -14,11 +14,12 @@ import {
import {CallsConnection} from '@calls/types/calls';
import {WebsocketEvents} from '@constants';
import {getServerCredentials} from '@init/credentials';
import NetworkManager from '@managers/network_manager';
import {logError} from '@utils/log';
import {logError, logDebug} from '@utils/log';
import Peer from './simple-peer';
import WebSocketClient from './websocket_client';
import {WebSocketClient, wsReconnectionTimeoutErr} from './websocket_client';
const websocketConnectTimeout = 3000;
@@ -46,15 +47,21 @@ export async function newConnection(serverUrl: string, channelID: string, closeC
// getClient can throw an error, which will be handled by the caller.
const client = NetworkManager.getClient(serverUrl);
const ws = new WebSocketClient(serverUrl, client.getWebSocketUrl());
const credentials = await getServerCredentials(serverUrl);
const ws = new WebSocketClient(serverUrl, client.getWebSocketUrl(), credentials?.token);
// Throws an error, to be caught by caller.
await ws.initialize();
const disconnect = () => {
if (!isClosed) {
ws.close();
if (isClosed) {
return;
}
isClosed = true;
ws.send('leave');
ws.close();
if (onCallEnd) {
onCallEnd.remove();
@@ -143,13 +150,14 @@ export async function newConnection(serverUrl: string, channelID: string, closeC
};
ws.on('error', (err: Event) => {
logError('WS (CALLS):', err);
ws.close();
logDebug('calls: ws error', err);
if (err === wsReconnectionTimeoutErr) {
disconnect();
}
});
ws.on('close', () => {
isClosed = true;
disconnect();
logDebug('calls: ws close');
});
ws.on('join', async () => {
@@ -184,14 +192,30 @@ export async function newConnection(serverUrl: string, channelID: string, closeC
});
peer.on('error', (err: any) => {
logError('FROM PEER:', err);
logError('calls: peer error:', err);
});
peer.on('close', () => {
logDebug('calls: peer closed');
if (!isClosed) {
disconnect();
}
});
});
ws.on('open', async () => {
ws.send('join', {
channelID,
});
ws.on('open', (originalConnID: string, prevConnID: string, isReconnect: boolean) => {
if (isReconnect) {
logDebug('calls: ws reconnect, sending reconnect msg');
ws.send('reconnect', {
channelID,
originalConnID,
prevConnID,
});
} else {
ws.send('join', {
channelID,
});
}
});
ws.on('message', ({data}: { data: string }) => {

View File

@@ -10,21 +10,33 @@ import DatabaseManager from '@database/manager';
import {getCommonSystemValues} from '@queries/servers/system';
import {logError} from '@utils/log';
export default class WebSocketClient extends EventEmitter {
const wsMinReconnectRetryTimeMs = 1000; // 1 second
const wsReconnectionTimeout = 30000; // 30 seconds
const wsReconnectTimeIncrement = 500; // 0.5 seconds
export const wsReconnectionTimeoutErr = new Error('max disconnected time reached');
export class WebSocketClient extends EventEmitter {
private readonly serverUrl: string;
private readonly wsPath: string;
private authToken: string;
private ws: WebSocket | null = null;
private seqNo = 0;
private seqNo = 1;
private serverSeqNo = 0;
private connID = '';
private originalConnID = '';
private lastDisconnect = 0;
private reconnectRetryTime = wsMinReconnectRetryTimeMs;
private closed = false;
private eventPrefix = `custom_${Calls.PluginId}`;
constructor(serverUrl: string, wsPath: string) {
constructor(serverUrl: string, wsPath: string, authToken?: string) {
super();
this.serverUrl = serverUrl;
this.wsPath = wsPath;
this.authToken = authToken || '';
}
async initialize() {
private async init(isReconnect: boolean) {
const database = DatabaseManager.serverDatabases[this.serverUrl]?.database;
if (!database) {
return;
@@ -33,17 +45,25 @@ export default class WebSocketClient extends EventEmitter {
const system = await getCommonSystemValues(database);
const connectionUrl = (system.config.WebsocketURL || this.serverUrl) + this.wsPath;
this.ws = new WebSocket(connectionUrl);
this.ws = new WebSocket(`${connectionUrl}?connection_id=${this.connID}&sequence_number=${this.serverSeqNo}`, [], {headers: {authorization: `Bearer ${this.authToken}`}});
if (isReconnect) {
this.ws.onopen = () => {
this.lastDisconnect = 0;
this.reconnectRetryTime = wsMinReconnectRetryTimeMs;
this.emit('open', this.originalConnID, this.connID, true);
};
}
this.ws.onerror = (err) => {
this.emit('error', err);
this.ws = null;
this.close();
};
this.ws.onclose = () => {
this.ws = null;
this.close();
if (!this.closed) {
this.close();
}
};
this.ws.onmessage = ({data}) => {
@@ -54,7 +74,12 @@ export default class WebSocketClient extends EventEmitter {
try {
msg = JSON.parse(data);
} catch (err) {
logError(err);
logError('calls: ws msg parse error', err);
return;
}
if (msg) {
this.serverSeqNo = msg.seq + 1;
}
if (!msg || !msg.event || !msg.data) {
@@ -62,14 +87,22 @@ export default class WebSocketClient extends EventEmitter {
}
if (msg.event === 'hello') {
this.connID = msg.data.connection_id;
this.emit('open');
if (msg.data.connection_id !== this.connID) {
this.connID = msg.data.connection_id;
this.serverSeqNo = 0;
if (this.originalConnID === '') {
this.originalConnID = this.connID;
}
}
if (!isReconnect) {
this.emit('open', this.originalConnID, this.connID, false);
}
return;
} else if (!this.connID) {
return;
}
if (msg.data.connID !== this.connID) {
if (msg.data.connID !== this.connID && msg.data.connID !== this.originalConnID) {
return;
}
@@ -87,6 +120,10 @@ export default class WebSocketClient extends EventEmitter {
};
}
initialize() {
return this.init(false);
}
send(action: string, data?: Object, binary?: boolean) {
const msg = {
action: `${this.eventPrefix}_${action}`,
@@ -104,12 +141,35 @@ export default class WebSocketClient extends EventEmitter {
close() {
if (this.ws) {
this.closed = true;
this.ws.close();
this.ws = null;
this.seqNo = 1;
this.serverSeqNo = 0;
this.connID = '';
this.originalConnID = '';
} else {
this.emit('close');
const now = Date.now();
if (this.lastDisconnect === 0) {
this.lastDisconnect = now;
}
if ((now - this.lastDisconnect) >= wsReconnectionTimeout) {
this.closed = true;
this.emit('error', wsReconnectionTimeoutErr);
return;
}
setTimeout(() => {
if (!this.ws && !this.closed) {
this.init(true);
}
}, this.reconnectRetryTime);
this.reconnectRetryTime += wsReconnectTimeIncrement;
}
this.seqNo = 0;
this.connID = '';
this.emit('close');
}
state() {