diff --git a/libs/common/src/platform/server-notifications/internal/default-server-notifications.multiuser.spec.ts b/libs/common/src/platform/server-notifications/internal/default-server-notifications.multiuser.spec.ts new file mode 100644 index 00000000000..7c0baef0318 --- /dev/null +++ b/libs/common/src/platform/server-notifications/internal/default-server-notifications.multiuser.spec.ts @@ -0,0 +1,310 @@ +import { mock, MockProxy } from "jest-mock-extended"; +import { BehaviorSubject, bufferCount, firstValueFrom, Subject, ObservedValueOf } from "rxjs"; + +// eslint-disable-next-line no-restricted-imports +import { LogoutReason } from "@bitwarden/auth/common"; +// TODO: When PM-14943 goes in, uncomment +// import { AuthRequestAnsweringServiceAbstraction } from "@bitwarden/common/auth/abstractions/auth-request-answering/auth-request-answering.service.abstraction"; + +import { AccountService } from "../../../auth/abstractions/account.service"; +import { AuthService } from "../../../auth/abstractions/auth.service"; +import { AuthenticationStatus } from "../../../auth/enums/authentication-status"; +import { NotificationType } from "../../../enums"; +import { NotificationResponse } from "../../../models/response/notification.response"; +import { UserId } from "../../../types/guid"; +import { AppIdService } from "../../abstractions/app-id.service"; +import { ConfigService } from "../../abstractions/config/config.service"; +import { Environment, EnvironmentService } from "../../abstractions/environment.service"; +import { LogService } from "../../abstractions/log.service"; +import { MessagingService } from "../../abstractions/messaging.service"; + +import { DefaultServerNotificationsService } from "./default-server-notifications.service"; +import { SignalRConnectionService } from "./signalr-connection.service"; +import { WebPushConnectionService, WebPushConnector } from "./webpush-connection.service"; + +describe("DefaultServerNotificationsService (multi-user)", () => { + let syncService: any; + let appIdService: MockProxy; + let environmentConfigurationService: MockProxy; + let userLogoutCallback: jest.Mock, [logoutReason: LogoutReason, userId: UserId]>; + let messagingService: MockProxy; + let accountService: MockProxy; + let signalRNotificationConnectionService: MockProxy; + let authService: MockProxy; + let webPushNotificationConnectionService: MockProxy; + // TODO: When PM-14943 goes in, uncomment + // let authRequestAnsweringService: MockProxy; + let configurationService: MockProxy; + + let activeUserAccount$: BehaviorSubject>; + let userAccounts$: BehaviorSubject>; + + let environmentConfiguration$: BehaviorSubject; + + let authenticationStatusByUser: Map>; + let webPushSupportStatusByUser: Map< + UserId, + BehaviorSubject< + { type: "supported"; service: WebPushConnector } | { type: "not-supported"; reason: string } + > + >; + + let connectionSubjectByUser: Map>; + let defaultServerNotificationsService: DefaultServerNotificationsService; + + const mockUserId1 = "user1" as UserId; + const mockUserId2 = "user2" as UserId; + + beforeEach(() => { + syncService = { + fullSync: jest.fn().mockResolvedValue(undefined), + syncUpsertCipher: jest.fn().mockResolvedValue(undefined), + syncDeleteCipher: jest.fn().mockResolvedValue(undefined), + syncUpsertFolder: jest.fn().mockResolvedValue(undefined), + syncDeleteFolder: jest.fn().mockResolvedValue(undefined), + syncUpsertSend: jest.fn().mockResolvedValue(undefined), + syncDeleteSend: jest.fn().mockResolvedValue(undefined), + }; + + appIdService = mock(); + appIdService.getAppId.mockResolvedValue("app-id"); + + environmentConfigurationService = mock(); + environmentConfiguration$ = new BehaviorSubject({ + getNotificationsUrl: () => "http://test.example.com", + } as Environment); + environmentConfigurationService.environment$ = environmentConfiguration$ as any; + + userLogoutCallback = jest.fn, [LogoutReason, UserId]>(); + + messagingService = mock(); + + accountService = mock(); + activeUserAccount$ = new BehaviorSubject>( + null, + ); + accountService.activeAccount$ = activeUserAccount$.asObservable(); + userAccounts$ = new BehaviorSubject>({} as any); + accountService.accounts$ = userAccounts$.asObservable(); + + signalRNotificationConnectionService = mock(); + connectionSubjectByUser = new Map(); + (signalRNotificationConnectionService.connect$ as unknown as jest.Mock).mockImplementation( + (userId: UserId, _url: string) => { + if (!connectionSubjectByUser.has(userId)) { + connectionSubjectByUser.set(userId, new Subject()); + } + return connectionSubjectByUser.get(userId)!.asObservable(); + }, + ); + + authService = mock(); + authenticationStatusByUser = new Map(); + (authService.authStatusFor$ as unknown as jest.Mock).mockImplementation((userId: UserId) => { + if (!authenticationStatusByUser.has(userId)) { + authenticationStatusByUser.set( + userId, + new BehaviorSubject(AuthenticationStatus.LoggedOut), + ); + } + return authenticationStatusByUser.get(userId)!.asObservable(); + }); + + webPushNotificationConnectionService = mock(); + webPushSupportStatusByUser = new Map(); + ( + webPushNotificationConnectionService.supportStatus$ as unknown as jest.Mock + ).mockImplementation((userId: UserId) => { + if (!webPushSupportStatusByUser.has(userId)) { + webPushSupportStatusByUser.set( + userId, + new BehaviorSubject({ type: "not-supported", reason: "init" } as any), + ); + } + return webPushSupportStatusByUser.get(userId)!.asObservable(); + }); + + // TODO: When PM-14943 goes in, uncomment + // authRequestAnsweringService = mock(); + + configurationService = mock(); + configurationService.getFeatureFlag$.mockReturnValue(new BehaviorSubject(true) as any); + + defaultServerNotificationsService = new DefaultServerNotificationsService( + mock(), + syncService, + appIdService, + environmentConfigurationService, + userLogoutCallback, + messagingService, + accountService, + signalRNotificationConnectionService, + authService, + webPushNotificationConnectionService, + // authRequestAnsweringService, + // configurationService, + ); + }); + + function setActiveUserAccount(userId: UserId | null) { + if (userId == null) { + activeUserAccount$.next(null); + } else { + activeUserAccount$.next({ + id: userId, + email: "email", + name: "Test Name", + emailVerified: true, + }); + } + } + + function addUserAccount(userId: UserId) { + const currentAccounts = (userAccounts$.getValue() as Record) ?? {}; + userAccounts$.next({ + ...currentAccounts, + [userId]: { email: "email", name: "Test Name", emailVerified: true }, + } as any); + } + + function setUserUnlocked(userId: UserId) { + if (!authenticationStatusByUser.has(userId)) { + authenticationStatusByUser.set( + userId, + new BehaviorSubject(AuthenticationStatus.LoggedOut), + ); + } + authenticationStatusByUser.get(userId)!.next(AuthenticationStatus.Unlocked); + } + + function setWebPushConnectorForUser(userId: UserId) { + const webPushConnector = mock(); + const notificationSubject = new Subject(); + webPushConnector.notifications$ = notificationSubject.asObservable(); + if (!webPushSupportStatusByUser.has(userId)) { + webPushSupportStatusByUser.set( + userId, + new BehaviorSubject({ type: "supported", service: webPushConnector } as any), + ); + } else { + webPushSupportStatusByUser + .get(userId)! + .next({ type: "supported", service: webPushConnector } as any); + } + return { webPushConnector, notificationSubject } as const; + } + + it("merges notification streams from multiple users", async () => { + addUserAccount(mockUserId1); + addUserAccount(mockUserId2); + setUserUnlocked(mockUserId1); + setUserUnlocked(mockUserId2); + setActiveUserAccount(mockUserId1); + + const user1WebPush = setWebPushConnectorForUser(mockUserId1); + const user2WebPush = setWebPushConnectorForUser(mockUserId2); + + const twoNotifications = firstValueFrom( + defaultServerNotificationsService.notifications$.pipe(bufferCount(2)), + ); + + user1WebPush.notificationSubject.next( + new NotificationResponse({ type: NotificationType.SyncFolderCreate }), + ); + user2WebPush.notificationSubject.next( + new NotificationResponse({ type: NotificationType.SyncFolderDelete }), + ); + + const notificationResults = await twoNotifications; + expect(notificationResults.length).toBe(2); + const [notification1, userA] = notificationResults[0]; + const [notification2, userB] = notificationResults[1]; + expect(userA === mockUserId1 || userA === mockUserId2).toBe(true); + expect(userB === mockUserId1 || userB === mockUserId2).toBe(true); + expect([NotificationType.SyncFolderCreate, NotificationType.SyncFolderDelete]).toContain( + notification1.type, + ); + expect([NotificationType.SyncFolderCreate, NotificationType.SyncFolderDelete]).toContain( + notification2.type, + ); + }); + + it("processes allowed multi-user notifications for non-active users (AuthRequest)", async () => { + addUserAccount(mockUserId1); + addUserAccount(mockUserId2); + setUserUnlocked(mockUserId1); + setUserUnlocked(mockUserId2); + setActiveUserAccount(mockUserId1); + + // Force SignalR path for user2 + if (!webPushSupportStatusByUser.has(mockUserId2)) { + webPushSupportStatusByUser.set( + mockUserId2, + new BehaviorSubject({ type: "not-supported", reason: "test" } as any), + ); + } else { + webPushSupportStatusByUser + .get(mockUserId2)! + .next({ type: "not-supported", reason: "test" } as any); + } + + // TODO: When PM-14943 goes in, uncomment + // authRequestAnsweringService.receivedPendingAuthRequest.mockResolvedValue(undefined as any); + + const subscription = defaultServerNotificationsService.startListening(); + + // Emit via SignalR connect$ for user2 + connectionSubjectByUser + .get(mockUserId2)! + .next({ + type: "ReceiveMessage", + message: new NotificationResponse({ + type: NotificationType.AuthRequest, + payload: { id: "auth-id-2", userId: mockUserId2 }, + }), + }); + + // allow async queue to drain + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(messagingService.send).toHaveBeenCalledWith("openLoginApproval", { + notificationId: "auth-id-2", + }); + + // TODO: When PM-14943 goes in, uncomment + // expect(authRequestAnsweringService.receivedPendingAuthRequest).toHaveBeenCalledWith( + // mockUserId2, + // "auth-id-2", + // ); + + subscription.unsubscribe(); + }); + + it("does not process restricted notification types for non-active users", async () => { + addUserAccount(mockUserId1); + addUserAccount(mockUserId2); + setUserUnlocked(mockUserId1); + setUserUnlocked(mockUserId2); + setActiveUserAccount(mockUserId1); + + const user1WebPush = setWebPushConnectorForUser(mockUserId1); + const user2WebPush = setWebPushConnectorForUser(mockUserId2); + + const subscription = defaultServerNotificationsService.startListening(); + + // Emit a folder create for non-active user (should be ignored) + user2WebPush.notificationSubject.next( + new NotificationResponse({ type: NotificationType.SyncFolderCreate }), + ); + // Emit a folder create for active user (should be processed) + user1WebPush.notificationSubject.next( + new NotificationResponse({ type: NotificationType.SyncFolderCreate }), + ); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(syncService.syncUpsertFolder).toHaveBeenCalledTimes(1); + + subscription.unsubscribe(); + }); +}); diff --git a/libs/common/src/platform/server-notifications/internal/default-notifications.service.spec.ts b/libs/common/src/platform/server-notifications/internal/default-server-notifications.service.spec.ts similarity index 96% rename from libs/common/src/platform/server-notifications/internal/default-notifications.service.spec.ts rename to libs/common/src/platform/server-notifications/internal/default-server-notifications.service.spec.ts index 567e0fbfc3d..c0ea893d539 100644 --- a/libs/common/src/platform/server-notifications/internal/default-notifications.service.spec.ts +++ b/libs/common/src/platform/server-notifications/internal/default-server-notifications.service.spec.ts @@ -40,6 +40,7 @@ describe("NotificationsService", () => { let webPushNotificationConnectionService: MockProxy; let activeAccount: BehaviorSubject>; + let accounts: BehaviorSubject>; let environment: BehaviorSubject>; @@ -68,6 +69,9 @@ describe("NotificationsService", () => { activeAccount = new BehaviorSubject>(null); accountService.activeAccount$ = activeAccount.asObservable(); + accounts = new BehaviorSubject>({} as any); + accountService.accounts$ = accounts.asObservable(); + environment = new BehaviorSubject>({ getNotificationsUrl: () => "https://notifications.bitwarden.com", } as Environment); @@ -113,8 +117,14 @@ describe("NotificationsService", () => { function emitActiveUser(userId: UserId) { if (userId == null) { activeAccount.next(null); + accounts.next({} as any); } else { activeAccount.next({ id: userId, email: "email", name: "Test Name", emailVerified: true }); + const current = (accounts.getValue() as Record) ?? {}; + accounts.next({ + ...current, + [userId]: { email: "email", name: "Test Name", emailVerified: true }, + } as any); } } diff --git a/libs/common/src/platform/server-notifications/internal/default-server-notifications.service.ts b/libs/common/src/platform/server-notifications/internal/default-server-notifications.service.ts index 4502d9663a3..4758c06cf73 100644 --- a/libs/common/src/platform/server-notifications/internal/default-server-notifications.service.ts +++ b/libs/common/src/platform/server-notifications/internal/default-server-notifications.service.ts @@ -4,7 +4,9 @@ import { distinctUntilChanged, EMPTY, filter, + firstValueFrom, map, + merge, mergeMap, Observable, share, @@ -56,20 +58,20 @@ export class DefaultServerNotificationsService implements ServerNotificationsSer private readonly authService: AuthService, private readonly webPushConnectionService: WebPushConnectionService, ) { - this.notifications$ = this.accountService.activeAccount$.pipe( - map((account) => account?.id), - distinctUntilChanged(), - switchMap((activeAccountId) => { - if (activeAccountId == null) { - // We don't emit server-notifications for inactive accounts currently + this.notifications$ = this.accountService.accounts$.pipe( + map((accounts) => Object.keys(accounts) as UserId[]), + switchMap((userIds) => { + if (userIds.length === 0) { return EMPTY; } - return this.userNotifications$(activeAccountId).pipe( - map((notification) => [notification, activeAccountId] as const), + const streams = userIds.map((id) => + this.userNotifications$(id).pipe(map((notification) => [notification, id] as const)), ); + + return merge(...streams); }), - share(), // Multiple subscribers should only create a single connection to the server + share(), // Multiple subscribers should only create a single connection to the server per subscriber ); } @@ -154,6 +156,18 @@ export class DefaultServerNotificationsService implements ServerNotificationsSer return; } + // Allow-list of notification types that are safe to process for non-active users + const multiUserNotificationTypes = new Set([NotificationType.AuthRequest]); + + const activeAccountId = await firstValueFrom( + this.accountService.activeAccount$.pipe(map((a) => a?.id)), + ); + + const isActiveUser = activeAccountId === userId; + if (!isActiveUser && !multiUserNotificationTypes.has(notification.type)) { + return; + } + switch (notification.type) { case NotificationType.SyncCipherCreate: case NotificationType.SyncCipherUpdate: