diff --git a/libs/common/src/platform/notifications/internal/signalr-connection.service.ts b/libs/common/src/platform/notifications/internal/signalr-connection.service.ts index e5d210266c..8bea98cb50 100644 --- a/libs/common/src/platform/notifications/internal/signalr-connection.service.ts +++ b/libs/common/src/platform/notifications/internal/signalr-connection.service.ts @@ -23,6 +23,11 @@ export type ReceiveMessage = { type: "ReceiveMessage"; message: NotificationResp export type SignalRNotification = Heartbeat | ReceiveMessage; +export type TimeoutManager = { + setTimeout: (handler: TimerHandler, timeout: number) => number; + clearTimeout: (timeoutId: number) => void; +}; + class SignalRLogger implements ILogger { constructor(private readonly logService: LogService) {} @@ -51,11 +56,14 @@ export class SignalRConnectionService { constructor( private readonly apiService: ApiService, private readonly logService: LogService, + private readonly hubConnectionBuilderFactory: () => HubConnectionBuilder = () => + new HubConnectionBuilder(), + private readonly timeoutManager: TimeoutManager = globalThis, ) {} connect$(userId: UserId, notificationsUrl: string) { return new Observable((subsciber) => { - const connection = new HubConnectionBuilder() + const connection = this.hubConnectionBuilderFactory() .withUrl(notificationsUrl + "/hub", { accessTokenFactory: () => this.apiService.getActiveBearerToken(), skipNegotiation: true, @@ -76,48 +84,60 @@ export class SignalRConnectionService { let reconnectSubscription: Subscription | null = null; // Create schedule reconnect function - const scheduleReconnect = (): Subscription => { + const scheduleReconnect = () => { if ( connection == null || connection.state !== HubConnectionState.Disconnected || (reconnectSubscription != null && !reconnectSubscription.closed) ) { - return Subscription.EMPTY; + // Skip scheduling a new reconnect, either the connection isn't disconnected + // or an active reconnect is already scheduled. + return; } - const randomTime = this.random(); - const timeoutHandler = setTimeout(() => { + // If we've somehow gotten here while the subscriber is closed, + // we do not want to reconnect. So leave. + if (subsciber.closed) { + return; + } + + const randomTime = this.randomReconnectTime(); + const timeoutHandler = this.timeoutManager.setTimeout(() => { connection .start() - .then(() => (reconnectSubscription = null)) + .then(() => { + reconnectSubscription = null; + }) .catch(() => { - reconnectSubscription = scheduleReconnect(); + scheduleReconnect(); }); }, randomTime); - return new Subscription(() => clearTimeout(timeoutHandler)); + reconnectSubscription = new Subscription(() => + this.timeoutManager.clearTimeout(timeoutHandler), + ); }; connection.onclose((error) => { - reconnectSubscription = scheduleReconnect(); + scheduleReconnect(); }); // Start connection connection.start().catch(() => { - reconnectSubscription = scheduleReconnect(); + scheduleReconnect(); }); return () => { + // Cancel any possible scheduled reconnects + reconnectSubscription?.unsubscribe(); connection?.stop().catch((error) => { this.logService.error("Error while stopping SignalR connection", error); - // TODO: Does calling stop call `onclose`? - reconnectSubscription?.unsubscribe(); }); }; }); } - private random() { + private randomReconnectTime() { return ( Math.floor(Math.random() * (MAX_RECONNECT_TIME - MIN_RECONNECT_TIME + 1)) + MIN_RECONNECT_TIME );