From 281918a7b9911688dad536a93b392bea33b169a7 Mon Sep 17 00:00:00 2001 From: Patrick-Pimentel-Bitwarden Date: Wed, 3 Sep 2025 17:01:45 -0400 Subject: [PATCH] feat(inactive-user-server-notification): [PM-25130] Inactive User Server Notify (#16151) * feat(inactive-user-server-notification): [PM-25130] Inactive User Server Notify - Adds in tests and feature for notifying inactive users. * feat(inactive-user-server-notification): [PM-25130] Inactive User Server Notify - Added feature flag. * fix(inactive-user-server-notification): [PM-25130] Inactive User Server Notify - Implemented trackedMerge. --- libs/common/src/enums/feature-flag.enum.ts | 2 + .../src/platform/misc/rxjs-operators.ts | 89 ++++- ...ult-server-notifications.multiuser.spec.ts | 313 ++++++++++++++++++ ...ault-server-notifications.service.spec.ts} | 24 +- .../default-server-notifications.service.ts | 79 ++++- 5 files changed, 482 insertions(+), 25 deletions(-) create mode 100644 libs/common/src/platform/server-notifications/internal/default-server-notifications.multiuser.spec.ts rename libs/common/src/platform/server-notifications/internal/{default-notifications.service.spec.ts => default-server-notifications.service.spec.ts} (94%) diff --git a/libs/common/src/enums/feature-flag.enum.ts b/libs/common/src/enums/feature-flag.enum.ts index 00bd8c4c207..d5f1f5f4fd7 100644 --- a/libs/common/src/enums/feature-flag.enum.ts +++ b/libs/common/src/enums/feature-flag.enum.ts @@ -50,6 +50,7 @@ export enum FeatureFlag { /* Platform */ IpcChannelFramework = "ipc-channel-framework", + InactiveUserServerNotification = "pm-25130-receive-push-notifications-for-inactive-users", PushNotificationsWhenLocked = "pm-19388-push-notifications-when-locked", } @@ -107,6 +108,7 @@ export const DefaultFeatureFlagValue = { /* Platform */ [FeatureFlag.IpcChannelFramework]: FALSE, + [FeatureFlag.InactiveUserServerNotification]: FALSE, [FeatureFlag.PushNotificationsWhenLocked]: FALSE, } satisfies Record; diff --git a/libs/common/src/platform/misc/rxjs-operators.ts b/libs/common/src/platform/misc/rxjs-operators.ts index b3c4423c36f..6c767d3458d 100644 --- a/libs/common/src/platform/misc/rxjs-operators.ts +++ b/libs/common/src/platform/misc/rxjs-operators.ts @@ -1,4 +1,4 @@ -import { map } from "rxjs"; +import { map, Observable, OperatorFunction, Subscription } from "rxjs"; /** * An rxjs operator that extracts an object by ID from an array of objects. @@ -19,3 +19,90 @@ export const getByIds = (ids: TId[]) => { return objects.filter((o) => o.id && idSet.has(o.id)); }); }; + +/** + * A merge-like operator that takes a Set of primitives and tracks if they've been + * seen before. + * + * An emitted set that looks like `["1", "2"]` will call selector and subscribe to the resulting + * observable for both `"1"` and `"2"` but if the next emission contains just `["1"]` then the + * subscription created for `"2"` will be unsubscribed from and the observable for `"1"` will be + * left alone. If the following emission a set like `["1", "2", "3"]` then the subscription for + * `"1"` is still left alone, `"2"` has a selector called for it again, and `"3"` has a selector + * called for it the first time. If an empty set is emitted then all items are unsubscribed from. + * + * Since this operator will keep track of an observable for `n` number of items given to it. It is + * smartest to only use this on sets that you know will only get so large. + * + * *IMPORTANT NOTE* + * This observable may not be super friendly to very quick emissions/near parallel execution. + */ +export function trackedMerge( + selector: (value: T) => Observable, +): OperatorFunction, E> { + return (source: Observable>) => { + // Setup a Map to track all inner subscriptions + const tracked: Map = new Map(); + + const cleanupTracked = () => { + for (const [, trackedSub] of tracked.entries()) { + trackedSub.unsubscribe(); + } + tracked.clear(); + }; + + return new Observable((subscriber) => { + const sourceSub = source.subscribe({ + next: (values) => { + // Loop through the subscriptions we are tracking, if the new list + // doesn't have any of those values, we should clean them up. + for (const value of tracked.keys()) { + if (!values.has(value)) { + // Tracked item is no longer in the list, cleanup + tracked.get(value)?.unsubscribe(); + tracked.delete(value); + continue; + } + + // We are already tracking something for this key, remove it + values.delete(value); + } + + for (const newKey of values.keys()) { + // These are new entries, create and track subscription for them + tracked.set( + newKey, + /* eslint-disable-next-line rxjs/no-nested-subscribe */ + selector(newKey).subscribe({ + next: (innerValue) => { + subscriber.next(innerValue); + }, + error: (err: unknown) => { + // TODO: Do I need to call cleanupTracked or will calling error run my teardown logic below? + subscriber.error(err); + }, + complete: () => { + tracked.delete(newKey); + }, + }), + ); + } + }, + error: (err: unknown) => { + // TODO: Do I need to call cleanupTracked or will calling error run my teardown logic below? + subscriber.error(err); + }, + complete: () => { + // TODO: Do I need to call cleanupTracked or will calling complete run my teardown logic below? + cleanupTracked(); + subscriber.complete(); + }, + }); + + return () => { + cleanupTracked(); + sourceSub.unsubscribe(); + }; + }); + }; +} diff --git a/libs/common/src/platform/server-notifications/internal/default-server-notifications.multiuser.spec.ts b/libs/common/src/platform/server-notifications/internal/default-server-notifications.multiuser.spec.ts new file mode 100644 index 00000000000..a70623783dc --- /dev/null +++ b/libs/common/src/platform/server-notifications/internal/default-server-notifications.multiuser.spec.ts @@ -0,0 +1,313 @@ +import { mock, MockProxy } from "jest-mock-extended"; +import { BehaviorSubject, bufferCount, firstValueFrom, Subject, ObservedValueOf } from "rxjs"; + +// eslint-disable-next-line no-restricted-imports +import { LogoutReason } from "@bitwarden/auth/common"; +import { AuthRequestAnsweringServiceAbstraction } from "@bitwarden/common/auth/abstractions/auth-request-answering/auth-request-answering.service.abstraction"; +import { FeatureFlag } from "@bitwarden/common/enums/feature-flag.enum"; + +import { AccountService } from "../../../auth/abstractions/account.service"; +import { AuthService } from "../../../auth/abstractions/auth.service"; +import { AuthenticationStatus } from "../../../auth/enums/authentication-status"; +import { NotificationType } from "../../../enums"; +import { NotificationResponse } from "../../../models/response/notification.response"; +import { UserId } from "../../../types/guid"; +import { AppIdService } from "../../abstractions/app-id.service"; +import { ConfigService } from "../../abstractions/config/config.service"; +import { Environment, EnvironmentService } from "../../abstractions/environment.service"; +import { LogService } from "../../abstractions/log.service"; +import { MessagingService } from "../../abstractions/messaging.service"; + +import { DefaultServerNotificationsService } from "./default-server-notifications.service"; +import { SignalRConnectionService } from "./signalr-connection.service"; +import { WebPushConnectionService, WebPushConnector } from "./webpush-connection.service"; + +describe("DefaultServerNotificationsService (multi-user)", () => { + let syncService: any; + let appIdService: MockProxy; + let environmentConfigurationService: MockProxy; + let userLogoutCallback: jest.Mock, [logoutReason: LogoutReason, userId: UserId]>; + let messagingService: MockProxy; + let accountService: MockProxy; + let signalRNotificationConnectionService: MockProxy; + let authService: MockProxy; + let webPushNotificationConnectionService: MockProxy; + let authRequestAnsweringService: MockProxy; + let configService: MockProxy; + + let activeUserAccount$: BehaviorSubject>; + let userAccounts$: BehaviorSubject>; + + let environmentConfiguration$: BehaviorSubject; + + let authenticationStatusByUser: Map>; + let webPushSupportStatusByUser: Map< + UserId, + BehaviorSubject< + { type: "supported"; service: WebPushConnector } | { type: "not-supported"; reason: string } + > + >; + + let connectionSubjectByUser: Map>; + let defaultServerNotificationsService: DefaultServerNotificationsService; + + const mockUserId1 = "user1" as UserId; + const mockUserId2 = "user2" as UserId; + + beforeEach(() => { + syncService = { + fullSync: jest.fn().mockResolvedValue(undefined), + syncUpsertCipher: jest.fn().mockResolvedValue(undefined), + syncDeleteCipher: jest.fn().mockResolvedValue(undefined), + syncUpsertFolder: jest.fn().mockResolvedValue(undefined), + syncDeleteFolder: jest.fn().mockResolvedValue(undefined), + syncUpsertSend: jest.fn().mockResolvedValue(undefined), + syncDeleteSend: jest.fn().mockResolvedValue(undefined), + }; + + appIdService = mock(); + appIdService.getAppId.mockResolvedValue("app-id"); + + environmentConfigurationService = mock(); + environmentConfiguration$ = new BehaviorSubject({ + getNotificationsUrl: () => "http://test.example.com", + } as Environment); + environmentConfigurationService.environment$ = environmentConfiguration$ as any; + // Ensure user-scoped environment lookups return the same test environment stream + environmentConfigurationService.getEnvironment$.mockImplementation( + (_userId: UserId) => environmentConfiguration$.asObservable() as any, + ); + + userLogoutCallback = jest.fn, [LogoutReason, UserId]>(); + + messagingService = mock(); + + accountService = mock(); + activeUserAccount$ = new BehaviorSubject>( + null, + ); + accountService.activeAccount$ = activeUserAccount$.asObservable(); + userAccounts$ = new BehaviorSubject>({} as any); + accountService.accounts$ = userAccounts$.asObservable(); + + signalRNotificationConnectionService = mock(); + connectionSubjectByUser = new Map(); + signalRNotificationConnectionService.connect$.mockImplementation( + (userId: UserId, _url: string) => { + if (!connectionSubjectByUser.has(userId)) { + connectionSubjectByUser.set(userId, new Subject()); + } + return connectionSubjectByUser.get(userId)!.asObservable(); + }, + ); + + authService = mock(); + authenticationStatusByUser = new Map(); + authService.authStatusFor$.mockImplementation((userId: UserId) => { + if (!authenticationStatusByUser.has(userId)) { + authenticationStatusByUser.set( + userId, + new BehaviorSubject(AuthenticationStatus.LoggedOut), + ); + } + return authenticationStatusByUser.get(userId)!.asObservable(); + }); + + webPushNotificationConnectionService = mock(); + webPushSupportStatusByUser = new Map(); + webPushNotificationConnectionService.supportStatus$.mockImplementation((userId: UserId) => { + if (!webPushSupportStatusByUser.has(userId)) { + webPushSupportStatusByUser.set( + userId, + new BehaviorSubject({ type: "not-supported", reason: "init" } as any), + ); + } + return webPushSupportStatusByUser.get(userId)!.asObservable(); + }); + + authRequestAnsweringService = mock(); + + configService = mock(); + configService.getFeatureFlag$.mockImplementation((flag: FeatureFlag) => { + const flagValueByFlag: Partial> = { + [FeatureFlag.InactiveUserServerNotification]: true, + }; + return new BehaviorSubject(flagValueByFlag[flag] ?? false) as any; + }); + + defaultServerNotificationsService = new DefaultServerNotificationsService( + mock(), + syncService, + appIdService, + environmentConfigurationService, + userLogoutCallback, + messagingService, + accountService, + signalRNotificationConnectionService, + authService, + webPushNotificationConnectionService, + authRequestAnsweringService, + configService, + ); + }); + + function setActiveUserAccount(userId: UserId | null) { + if (userId == null) { + activeUserAccount$.next(null); + } else { + activeUserAccount$.next({ + id: userId, + email: "email", + name: "Test Name", + emailVerified: true, + }); + } + } + + function addUserAccount(userId: UserId) { + const currentAccounts = (userAccounts$.getValue() as Record) ?? {}; + userAccounts$.next({ + ...currentAccounts, + [userId]: { email: "email", name: "Test Name", emailVerified: true }, + } as any); + } + + function setUserUnlocked(userId: UserId) { + if (!authenticationStatusByUser.has(userId)) { + authenticationStatusByUser.set( + userId, + new BehaviorSubject(AuthenticationStatus.LoggedOut), + ); + } + authenticationStatusByUser.get(userId)!.next(AuthenticationStatus.Unlocked); + } + + function setWebPushConnectorForUser(userId: UserId) { + const webPushConnector = mock(); + const notificationSubject = new Subject(); + webPushConnector.notifications$ = notificationSubject.asObservable(); + if (!webPushSupportStatusByUser.has(userId)) { + webPushSupportStatusByUser.set( + userId, + new BehaviorSubject({ type: "supported", service: webPushConnector } as any), + ); + } else { + webPushSupportStatusByUser + .get(userId)! + .next({ type: "supported", service: webPushConnector } as any); + } + return { webPushConnector, notificationSubject } as const; + } + + it("merges notification streams from multiple users", async () => { + addUserAccount(mockUserId1); + addUserAccount(mockUserId2); + setUserUnlocked(mockUserId1); + setUserUnlocked(mockUserId2); + setActiveUserAccount(mockUserId1); + + const user1WebPush = setWebPushConnectorForUser(mockUserId1); + const user2WebPush = setWebPushConnectorForUser(mockUserId2); + + const twoNotifications = firstValueFrom( + defaultServerNotificationsService.notifications$.pipe(bufferCount(2)), + ); + + user1WebPush.notificationSubject.next( + new NotificationResponse({ type: NotificationType.SyncFolderCreate }), + ); + user2WebPush.notificationSubject.next( + new NotificationResponse({ type: NotificationType.SyncFolderDelete }), + ); + + const notificationResults = await twoNotifications; + expect(notificationResults.length).toBe(2); + const [notification1, userA] = notificationResults[0]; + const [notification2, userB] = notificationResults[1]; + expect(userA === mockUserId1 || userA === mockUserId2).toBe(true); + expect(userB === mockUserId1 || userB === mockUserId2).toBe(true); + expect([NotificationType.SyncFolderCreate, NotificationType.SyncFolderDelete]).toContain( + notification1.type, + ); + expect([NotificationType.SyncFolderCreate, NotificationType.SyncFolderDelete]).toContain( + notification2.type, + ); + }); + + it("processes allowed multi-user notifications for non-active users (AuthRequest)", async () => { + addUserAccount(mockUserId1); + addUserAccount(mockUserId2); + setUserUnlocked(mockUserId1); + setUserUnlocked(mockUserId2); + setActiveUserAccount(mockUserId1); + + // Force SignalR path for user2 + if (!webPushSupportStatusByUser.has(mockUserId2)) { + webPushSupportStatusByUser.set( + mockUserId2, + new BehaviorSubject({ type: "not-supported", reason: "test" } as any), + ); + } else { + webPushSupportStatusByUser + .get(mockUserId2)! + .next({ type: "not-supported", reason: "test" } as any); + } + + // TODO: When PM-14943 goes in, uncomment + // authRequestAnsweringService.receivedPendingAuthRequest.mockResolvedValue(undefined as any); + + const subscription = defaultServerNotificationsService.startListening(); + + // Emit via SignalR connect$ for user2 + connectionSubjectByUser.get(mockUserId2)!.next({ + type: "ReceiveMessage", + message: new NotificationResponse({ + type: NotificationType.AuthRequest, + payload: { id: "auth-id-2", userId: mockUserId2 }, + }), + }); + + // allow async queue to drain + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(messagingService.send).toHaveBeenCalledWith("openLoginApproval", { + notificationId: "auth-id-2", + }); + + // TODO: When PM-14943 goes in, uncomment + // expect(authRequestAnsweringService.receivedPendingAuthRequest).toHaveBeenCalledWith( + // mockUserId2, + // "auth-id-2", + // ); + + subscription.unsubscribe(); + }); + + it("does not process restricted notification types for non-active users", async () => { + addUserAccount(mockUserId1); + addUserAccount(mockUserId2); + setUserUnlocked(mockUserId1); + setUserUnlocked(mockUserId2); + setActiveUserAccount(mockUserId1); + + const user1WebPush = setWebPushConnectorForUser(mockUserId1); + const user2WebPush = setWebPushConnectorForUser(mockUserId2); + + const subscription = defaultServerNotificationsService.startListening(); + + // Emit a folder create for non-active user (should be ignored) + user2WebPush.notificationSubject.next( + new NotificationResponse({ type: NotificationType.SyncFolderCreate }), + ); + // Emit a folder create for active user (should be processed) + user1WebPush.notificationSubject.next( + new NotificationResponse({ type: NotificationType.SyncFolderCreate }), + ); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(syncService.syncUpsertFolder).toHaveBeenCalledTimes(1); + + subscription.unsubscribe(); + }); +}); diff --git a/libs/common/src/platform/server-notifications/internal/default-notifications.service.spec.ts b/libs/common/src/platform/server-notifications/internal/default-server-notifications.service.spec.ts similarity index 94% rename from libs/common/src/platform/server-notifications/internal/default-notifications.service.spec.ts rename to libs/common/src/platform/server-notifications/internal/default-server-notifications.service.spec.ts index 2d12027e19f..a7b608f5b56 100644 --- a/libs/common/src/platform/server-notifications/internal/default-notifications.service.spec.ts +++ b/libs/common/src/platform/server-notifications/internal/default-server-notifications.service.spec.ts @@ -1,11 +1,10 @@ import { mock, MockProxy } from "jest-mock-extended"; -import { BehaviorSubject, bufferCount, firstValueFrom, ObservedValueOf, Subject } from "rxjs"; +import { BehaviorSubject, bufferCount, firstValueFrom, ObservedValueOf, of, Subject } from "rxjs"; // This import has been flagged as unallowed for this class. It may be involved in a circular dependency loop. // eslint-disable-next-line no-restricted-imports import { LogoutReason } from "@bitwarden/auth/common"; import { AuthRequestAnsweringServiceAbstraction } from "@bitwarden/common/auth/abstractions/auth-request-answering/auth-request-answering.service.abstraction"; -import { FeatureFlag } from "@bitwarden/common/enums/feature-flag.enum"; import { awaitAsync } from "../../../../spec"; import { Matrix } from "../../../../spec/matrix"; @@ -45,6 +44,7 @@ describe("NotificationsService", () => { let configService: MockProxy; let activeAccount: BehaviorSubject>; + let accounts: BehaviorSubject>; let environment: BehaviorSubject>; @@ -73,21 +73,23 @@ describe("NotificationsService", () => { configService = mock(); // For these tests, use the active-user implementation (feature flag disabled) - configService.getFeatureFlag$.mockImplementation((flag: FeatureFlag) => { - const flagValueByFlag: Partial> = { - [FeatureFlag.PushNotificationsWhenLocked]: true, - }; - return new BehaviorSubject(flagValueByFlag[flag] ?? false) as any; - }); + configService.getFeatureFlag$.mockImplementation(() => of(true)); activeAccount = new BehaviorSubject>(null); accountService.activeAccount$ = activeAccount.asObservable(); + accounts = new BehaviorSubject>({} as any); + accountService.accounts$ = accounts.asObservable(); + environment = new BehaviorSubject>({ getNotificationsUrl: () => "https://notifications.bitwarden.com", } as Environment); environmentService.environment$ = environment; + // Ensure user-scoped environment lookups return the same test environment stream + environmentService.getEnvironment$.mockImplementation( + (_userId: UserId) => environment.asObservable() as any, + ); authStatusGetter = Matrix.autoMockMethod( authService.authStatusFor$, @@ -130,8 +132,14 @@ describe("NotificationsService", () => { function emitActiveUser(userId: UserId | null) { if (userId == null) { activeAccount.next(null); + accounts.next({} as any); } else { activeAccount.next({ id: userId, email: "email", name: "Test Name", emailVerified: true }); + const current = (accounts.getValue() as Record) ?? {}; + accounts.next({ + ...current, + [userId]: { email: "email", name: "Test Name", emailVerified: true }, + } as any); } } diff --git a/libs/common/src/platform/server-notifications/internal/default-server-notifications.service.ts b/libs/common/src/platform/server-notifications/internal/default-server-notifications.service.ts index 89e88d645c6..e8ac93dc61f 100644 --- a/libs/common/src/platform/server-notifications/internal/default-server-notifications.service.ts +++ b/libs/common/src/platform/server-notifications/internal/default-server-notifications.service.ts @@ -17,8 +17,9 @@ import { import { LogoutReason } from "@bitwarden/auth/common"; import { AuthRequestAnsweringServiceAbstraction } from "@bitwarden/common/auth/abstractions/auth-request-answering/auth-request-answering.service.abstraction"; import { FeatureFlag } from "@bitwarden/common/enums/feature-flag.enum"; +import { trackedMerge } from "@bitwarden/common/platform/misc"; -import { AccountService } from "../../../auth/abstractions/account.service"; +import { AccountInfo, AccountService } from "../../../auth/abstractions/account.service"; import { AuthService } from "../../../auth/abstractions/auth.service"; import { AuthenticationStatus } from "../../../auth/enums/authentication-status"; import { NotificationType } from "../../../enums"; @@ -43,6 +44,10 @@ import { WebPushConnectionService } from "./webpush-connection.service"; export const DISABLED_NOTIFICATIONS_URL = "http://-"; +export const AllowedMultiUserNotificationTypes = new Set([ + NotificationType.AuthRequest, +]); + export class DefaultServerNotificationsService implements ServerNotificationsService { notifications$: Observable; @@ -62,21 +67,48 @@ export class DefaultServerNotificationsService implements ServerNotificationsSer private readonly authRequestAnsweringService: AuthRequestAnsweringServiceAbstraction, private readonly configService: ConfigService, ) { - this.notifications$ = this.accountService.activeAccount$.pipe( - map((account) => account?.id), - distinctUntilChanged(), - switchMap((activeAccountId) => { - if (activeAccountId == null) { - // We don't emit server-notifications for inactive accounts currently - return EMPTY; - } + this.notifications$ = this.configService + .getFeatureFlag$(FeatureFlag.InactiveUserServerNotification) + .pipe( + distinctUntilChanged(), + switchMap((inactiveUserServerNotificationEnabled) => { + if (inactiveUserServerNotificationEnabled) { + return this.accountService.accounts$.pipe( + map((accounts: Record): Set => { + const validUserIds = Object.entries(accounts) + .filter( + ([_, accountInfo]) => accountInfo.email !== "" || accountInfo.emailVerified, + ) + .map(([userId, _]) => userId as UserId); + return new Set(validUserIds); + }), + trackedMerge((id: UserId) => { + return this.userNotifications$(id as UserId).pipe( + map( + (notification: NotificationResponse) => [notification, id as UserId] as const, + ), + ); + }), + ); + } - return this.userNotifications$(activeAccountId).pipe( - map((notification) => [notification, activeAccountId] as const), - ); - }), - share(), // Multiple subscribers should only create a single connection to the server - ); + return this.accountService.activeAccount$.pipe( + map((account) => account?.id), + distinctUntilChanged(), + switchMap((activeAccountId) => { + if (activeAccountId == null) { + // We don't emit server-notifications for inactive accounts currently + return EMPTY; + } + + return this.userNotifications$(activeAccountId).pipe( + map((notification) => [notification, activeAccountId] as const), + ); + }), + ); + }), + share(), // Multiple subscribers should only create a single connection to the server + ); } /** @@ -84,7 +116,7 @@ export class DefaultServerNotificationsService implements ServerNotificationsSer * @param userId The user id of the user to get the push server notifications for. */ private userNotifications$(userId: UserId) { - return this.environmentService.environment$.pipe( + return this.environmentService.getEnvironment$(userId).pipe( map((env) => env.getNotificationsUrl()), distinctUntilChanged(), switchMap((notificationsUrl) => { @@ -171,6 +203,21 @@ export class DefaultServerNotificationsService implements ServerNotificationsSer return; } + if ( + await firstValueFrom( + this.configService.getFeatureFlag$(FeatureFlag.InactiveUserServerNotification), + ) + ) { + const activeAccountId = await firstValueFrom( + this.accountService.activeAccount$.pipe(map((a) => a?.id)), + ); + + const isActiveUser = activeAccountId === userId; + if (!isActiveUser && !AllowedMultiUserNotificationTypes.has(notification.type)) { + return; + } + } + switch (notification.type) { case NotificationType.SyncCipherCreate: case NotificationType.SyncCipherUpdate: