mirror of
https://github.com/bitwarden/browser
synced 2025-12-20 10:13:31 +00:00
Feat PM-19877 System Notification Processing (#15611)
* feat(notification-processing): [PM-19877] System Notification Implementation - Minor changes to popup logic and removed content in login component. * docs(notification-processing): [PM-19877] System Notification Implementation - Added more docs. * docs(notification-processing): [PM-19877] System Notification Implementation - Added markdown document. * fix(notification-processing): [PM-19877] System Notification Implementation - Updated condition for if notification is supported. * fix(notification-processing): [PM-19877] System Notification Implementation - Updated services module with correct platform utils service.
This commit is contained in:
committed by
GitHub
parent
bcd73a9c00
commit
719a43d050
1
libs/common/src/platform/server-notifications/index.ts
Normal file
1
libs/common/src/platform/server-notifications/index.ts
Normal file
@@ -0,0 +1 @@
|
||||
export { ServerNotificationsService } from "./server-notifications.service";
|
||||
@@ -0,0 +1,323 @@
|
||||
import { mock, MockProxy } from "jest-mock-extended";
|
||||
import { BehaviorSubject, bufferCount, firstValueFrom, ObservedValueOf, Subject } from "rxjs";
|
||||
|
||||
// This import has been flagged as unallowed for this class. It may be involved in a circular dependency loop.
|
||||
// eslint-disable-next-line no-restricted-imports
|
||||
import { LogoutReason } from "@bitwarden/auth/common";
|
||||
|
||||
import { awaitAsync } from "../../../../spec";
|
||||
import { Matrix } from "../../../../spec/matrix";
|
||||
import { AccountService } from "../../../auth/abstractions/account.service";
|
||||
import { AuthService } from "../../../auth/abstractions/auth.service";
|
||||
import { AuthenticationStatus } from "../../../auth/enums/authentication-status";
|
||||
import { NotificationType } from "../../../enums";
|
||||
import { NotificationResponse } from "../../../models/response/notification.response";
|
||||
import { UserId } from "../../../types/guid";
|
||||
import { AppIdService } from "../../abstractions/app-id.service";
|
||||
import { Environment, EnvironmentService } from "../../abstractions/environment.service";
|
||||
import { LogService } from "../../abstractions/log.service";
|
||||
import { MessageSender } from "../../messaging";
|
||||
import { SupportStatus } from "../../misc/support-status";
|
||||
import { SyncService } from "../../sync";
|
||||
|
||||
import {
|
||||
DefaultServerNotificationsService,
|
||||
DISABLED_NOTIFICATIONS_URL,
|
||||
} from "./default-server-notifications.service";
|
||||
import { SignalRConnectionService, SignalRNotification } from "./signalr-connection.service";
|
||||
import { WebPushConnectionService, WebPushConnector } from "./webpush-connection.service";
|
||||
import { WorkerWebPushConnectionService } from "./worker-webpush-connection.service";
|
||||
|
||||
describe("NotificationsService", () => {
|
||||
let syncService: MockProxy<SyncService>;
|
||||
let appIdService: MockProxy<AppIdService>;
|
||||
let environmentService: MockProxy<EnvironmentService>;
|
||||
let logoutCallback: jest.Mock<Promise<void>, [logoutReason: LogoutReason]>;
|
||||
let messagingService: MockProxy<MessageSender>;
|
||||
let accountService: MockProxy<AccountService>;
|
||||
let signalRNotificationConnectionService: MockProxy<SignalRConnectionService>;
|
||||
let authService: MockProxy<AuthService>;
|
||||
let webPushNotificationConnectionService: MockProxy<WebPushConnectionService>;
|
||||
|
||||
let activeAccount: BehaviorSubject<ObservedValueOf<AccountService["activeAccount$"]>>;
|
||||
|
||||
let environment: BehaviorSubject<ObservedValueOf<EnvironmentService["environment$"]>>;
|
||||
|
||||
let authStatusGetter: (userId: UserId) => BehaviorSubject<AuthenticationStatus>;
|
||||
|
||||
let webPushSupportGetter: (userId: UserId) => BehaviorSubject<SupportStatus<WebPushConnector>>;
|
||||
|
||||
let signalrNotificationGetter: (
|
||||
userId: UserId,
|
||||
notificationsUrl: string,
|
||||
) => Subject<SignalRNotification>;
|
||||
|
||||
let sut: DefaultServerNotificationsService;
|
||||
|
||||
beforeEach(() => {
|
||||
syncService = mock<SyncService>();
|
||||
appIdService = mock<AppIdService>();
|
||||
environmentService = mock<EnvironmentService>();
|
||||
logoutCallback = jest.fn<Promise<void>, [logoutReason: LogoutReason]>();
|
||||
messagingService = mock<MessageSender>();
|
||||
accountService = mock<AccountService>();
|
||||
signalRNotificationConnectionService = mock<SignalRConnectionService>();
|
||||
authService = mock<AuthService>();
|
||||
webPushNotificationConnectionService = mock<WorkerWebPushConnectionService>();
|
||||
|
||||
activeAccount = new BehaviorSubject<ObservedValueOf<AccountService["activeAccount$"]>>(null);
|
||||
accountService.activeAccount$ = activeAccount.asObservable();
|
||||
|
||||
environment = new BehaviorSubject<ObservedValueOf<EnvironmentService["environment$"]>>({
|
||||
getNotificationsUrl: () => "https://notifications.bitwarden.com",
|
||||
} as Environment);
|
||||
|
||||
environmentService.environment$ = environment;
|
||||
|
||||
authStatusGetter = Matrix.autoMockMethod(
|
||||
authService.authStatusFor$,
|
||||
() => new BehaviorSubject<AuthenticationStatus>(AuthenticationStatus.LoggedOut),
|
||||
);
|
||||
|
||||
webPushSupportGetter = Matrix.autoMockMethod(
|
||||
webPushNotificationConnectionService.supportStatus$,
|
||||
() =>
|
||||
new BehaviorSubject<SupportStatus<WebPushConnector>>({
|
||||
type: "not-supported",
|
||||
reason: "test",
|
||||
}),
|
||||
);
|
||||
|
||||
signalrNotificationGetter = Matrix.autoMockMethod(
|
||||
signalRNotificationConnectionService.connect$,
|
||||
() => new Subject<SignalRNotification>(),
|
||||
);
|
||||
|
||||
sut = new DefaultServerNotificationsService(
|
||||
mock<LogService>(),
|
||||
syncService,
|
||||
appIdService,
|
||||
environmentService,
|
||||
logoutCallback,
|
||||
messagingService,
|
||||
accountService,
|
||||
signalRNotificationConnectionService,
|
||||
authService,
|
||||
webPushNotificationConnectionService,
|
||||
);
|
||||
});
|
||||
|
||||
const mockUser1 = "user1" as UserId;
|
||||
const mockUser2 = "user2" as UserId;
|
||||
|
||||
function emitActiveUser(userId: UserId) {
|
||||
if (userId == null) {
|
||||
activeAccount.next(null);
|
||||
} else {
|
||||
activeAccount.next({ id: userId, email: "email", name: "Test Name", emailVerified: true });
|
||||
}
|
||||
}
|
||||
|
||||
function emitNotificationUrl(url: string) {
|
||||
environment.next({
|
||||
getNotificationsUrl: () => url,
|
||||
} as Environment);
|
||||
}
|
||||
|
||||
const expectNotification = (
|
||||
notification: readonly [NotificationResponse, UserId],
|
||||
expectedUser: UserId,
|
||||
expectedType: NotificationType,
|
||||
) => {
|
||||
const [actualNotification, actualUser] = notification;
|
||||
expect(actualUser).toBe(expectedUser);
|
||||
expect(actualNotification.type).toBe(expectedType);
|
||||
};
|
||||
|
||||
it("emits server notifications through WebPush when supported", async () => {
|
||||
const notificationsPromise = firstValueFrom(sut.notifications$.pipe(bufferCount(2)));
|
||||
|
||||
emitActiveUser(mockUser1);
|
||||
emitNotificationUrl("http://test.example.com");
|
||||
authStatusGetter(mockUser1).next(AuthenticationStatus.Unlocked);
|
||||
|
||||
const webPush = mock<WebPushConnector>();
|
||||
const webPushSubject = new Subject<NotificationResponse>();
|
||||
webPush.notifications$ = webPushSubject;
|
||||
|
||||
webPushSupportGetter(mockUser1).next({ type: "supported", service: webPush });
|
||||
webPushSubject.next(new NotificationResponse({ type: NotificationType.SyncFolderCreate }));
|
||||
webPushSubject.next(new NotificationResponse({ type: NotificationType.SyncFolderDelete }));
|
||||
|
||||
const notifications = await notificationsPromise;
|
||||
expectNotification(notifications[0], mockUser1, NotificationType.SyncFolderCreate);
|
||||
expectNotification(notifications[1], mockUser1, NotificationType.SyncFolderDelete);
|
||||
});
|
||||
|
||||
it("switches to SignalR when web push is not supported.", async () => {
|
||||
const notificationsPromise = firstValueFrom(sut.notifications$.pipe(bufferCount(2)));
|
||||
|
||||
emitActiveUser(mockUser1);
|
||||
emitNotificationUrl("http://test.example.com");
|
||||
authStatusGetter(mockUser1).next(AuthenticationStatus.Unlocked);
|
||||
|
||||
const webPush = mock<WebPushConnector>();
|
||||
const webPushSubject = new Subject<NotificationResponse>();
|
||||
webPush.notifications$ = webPushSubject;
|
||||
|
||||
webPushSupportGetter(mockUser1).next({ type: "supported", service: webPush });
|
||||
webPushSubject.next(new NotificationResponse({ type: NotificationType.SyncFolderCreate }));
|
||||
|
||||
emitActiveUser(mockUser2);
|
||||
authStatusGetter(mockUser2).next(AuthenticationStatus.Unlocked);
|
||||
// Second user does not support web push
|
||||
webPushSupportGetter(mockUser2).next({ type: "not-supported", reason: "test" });
|
||||
|
||||
signalrNotificationGetter(mockUser2, "http://test.example.com").next({
|
||||
type: "ReceiveMessage",
|
||||
message: new NotificationResponse({ type: NotificationType.SyncCipherUpdate }),
|
||||
});
|
||||
|
||||
const notifications = await notificationsPromise;
|
||||
expectNotification(notifications[0], mockUser1, NotificationType.SyncFolderCreate);
|
||||
expectNotification(notifications[1], mockUser2, NotificationType.SyncCipherUpdate);
|
||||
});
|
||||
|
||||
it("switches to WebPush when it becomes supported.", async () => {
|
||||
const notificationsPromise = firstValueFrom(sut.notifications$.pipe(bufferCount(2)));
|
||||
|
||||
emitActiveUser(mockUser1);
|
||||
emitNotificationUrl("http://test.example.com");
|
||||
authStatusGetter(mockUser1).next(AuthenticationStatus.Unlocked);
|
||||
webPushSupportGetter(mockUser1).next({ type: "not-supported", reason: "test" });
|
||||
|
||||
signalrNotificationGetter(mockUser1, "http://test.example.com").next({
|
||||
type: "ReceiveMessage",
|
||||
message: new NotificationResponse({ type: NotificationType.AuthRequest }),
|
||||
});
|
||||
|
||||
const webPush = mock<WebPushConnector>();
|
||||
const webPushSubject = new Subject<NotificationResponse>();
|
||||
webPush.notifications$ = webPushSubject;
|
||||
|
||||
webPushSupportGetter(mockUser1).next({ type: "supported", service: webPush });
|
||||
webPushSubject.next(new NotificationResponse({ type: NotificationType.SyncLoginDelete }));
|
||||
|
||||
const notifications = await notificationsPromise;
|
||||
expectNotification(notifications[0], mockUser1, NotificationType.AuthRequest);
|
||||
expectNotification(notifications[1], mockUser1, NotificationType.SyncLoginDelete);
|
||||
});
|
||||
|
||||
it("does not emit SignalR heartbeats", async () => {
|
||||
const notificationsPromise = firstValueFrom(sut.notifications$.pipe(bufferCount(1)));
|
||||
|
||||
emitActiveUser(mockUser1);
|
||||
emitNotificationUrl("http://test.example.com");
|
||||
authStatusGetter(mockUser1).next(AuthenticationStatus.Unlocked);
|
||||
webPushSupportGetter(mockUser1).next({ type: "not-supported", reason: "test" });
|
||||
|
||||
signalrNotificationGetter(mockUser1, "http://test.example.com").next({ type: "Heartbeat" });
|
||||
signalrNotificationGetter(mockUser1, "http://test.example.com").next({
|
||||
type: "ReceiveMessage",
|
||||
message: new NotificationResponse({ type: NotificationType.AuthRequestResponse }),
|
||||
});
|
||||
|
||||
const notifications = await notificationsPromise;
|
||||
expectNotification(notifications[0], mockUser1, NotificationType.AuthRequestResponse);
|
||||
});
|
||||
|
||||
it.each([
|
||||
// Temporarily rolling back server notifications being connected while locked
|
||||
// { initialStatus: AuthenticationStatus.Locked, updatedStatus: AuthenticationStatus.Unlocked },
|
||||
// { initialStatus: AuthenticationStatus.Unlocked, updatedStatus: AuthenticationStatus.Locked },
|
||||
// { initialStatus: AuthenticationStatus.Locked, updatedStatus: AuthenticationStatus.Locked },
|
||||
{ initialStatus: AuthenticationStatus.Unlocked, updatedStatus: AuthenticationStatus.Unlocked },
|
||||
])(
|
||||
"does not re-connect when the user transitions from $initialStatus to $updatedStatus",
|
||||
async ({ initialStatus, updatedStatus }) => {
|
||||
emitActiveUser(mockUser1);
|
||||
emitNotificationUrl("http://test.example.com");
|
||||
authStatusGetter(mockUser1).next(initialStatus);
|
||||
webPushSupportGetter(mockUser1).next({ type: "not-supported", reason: "test" });
|
||||
|
||||
const notificationsSubscriptions = sut.notifications$.subscribe();
|
||||
await awaitAsync(1);
|
||||
|
||||
authStatusGetter(mockUser1).next(updatedStatus);
|
||||
await awaitAsync(1);
|
||||
|
||||
expect(signalRNotificationConnectionService.connect$).toHaveBeenCalledTimes(1);
|
||||
expect(signalRNotificationConnectionService.connect$).toHaveBeenCalledWith(
|
||||
mockUser1,
|
||||
"http://test.example.com",
|
||||
);
|
||||
notificationsSubscriptions.unsubscribe();
|
||||
},
|
||||
);
|
||||
|
||||
it.each([
|
||||
// Temporarily disabling server notifications connecting while in a locked state
|
||||
// AuthenticationStatus.Locked,
|
||||
AuthenticationStatus.Unlocked,
|
||||
])(
|
||||
"connects when a user transitions from logged out to %s",
|
||||
async (newStatus: AuthenticationStatus) => {
|
||||
emitActiveUser(mockUser1);
|
||||
emitNotificationUrl("http://test.example.com");
|
||||
authStatusGetter(mockUser1).next(AuthenticationStatus.LoggedOut);
|
||||
webPushSupportGetter(mockUser1).next({ type: "not-supported", reason: "test" });
|
||||
|
||||
const notificationsSubscriptions = sut.notifications$.subscribe();
|
||||
await awaitAsync(1);
|
||||
|
||||
authStatusGetter(mockUser1).next(newStatus);
|
||||
await awaitAsync(1);
|
||||
|
||||
expect(signalRNotificationConnectionService.connect$).toHaveBeenCalledTimes(1);
|
||||
expect(signalRNotificationConnectionService.connect$).toHaveBeenCalledWith(
|
||||
mockUser1,
|
||||
"http://test.example.com",
|
||||
);
|
||||
notificationsSubscriptions.unsubscribe();
|
||||
},
|
||||
);
|
||||
|
||||
it("does not connect to any notification stream when server notifications are disabled through special url", () => {
|
||||
const subscription = sut.notifications$.subscribe();
|
||||
emitActiveUser(mockUser1);
|
||||
emitNotificationUrl(DISABLED_NOTIFICATIONS_URL);
|
||||
|
||||
expect(signalRNotificationConnectionService.connect$).not.toHaveBeenCalled();
|
||||
expect(webPushNotificationConnectionService.supportStatus$).not.toHaveBeenCalled();
|
||||
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
|
||||
it("does not connect to any notification stream when there is no active user", () => {
|
||||
const subscription = sut.notifications$.subscribe();
|
||||
emitActiveUser(null);
|
||||
|
||||
expect(signalRNotificationConnectionService.connect$).not.toHaveBeenCalled();
|
||||
expect(webPushNotificationConnectionService.supportStatus$).not.toHaveBeenCalled();
|
||||
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
|
||||
it("does not reconnect if the same notification url is emitted", async () => {
|
||||
const subscription = sut.notifications$.subscribe();
|
||||
|
||||
emitActiveUser(mockUser1);
|
||||
emitNotificationUrl("http://test.example.com");
|
||||
authStatusGetter(mockUser1).next(AuthenticationStatus.Unlocked);
|
||||
|
||||
await awaitAsync(1);
|
||||
|
||||
expect(webPushNotificationConnectionService.supportStatus$).toHaveBeenCalledTimes(1);
|
||||
emitNotificationUrl("http://test.example.com");
|
||||
|
||||
await awaitAsync(1);
|
||||
|
||||
expect(webPushNotificationConnectionService.supportStatus$).toHaveBeenCalledTimes(1);
|
||||
subscription.unsubscribe();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,251 @@
|
||||
import {
|
||||
BehaviorSubject,
|
||||
catchError,
|
||||
distinctUntilChanged,
|
||||
EMPTY,
|
||||
filter,
|
||||
map,
|
||||
mergeMap,
|
||||
Observable,
|
||||
share,
|
||||
switchMap,
|
||||
} from "rxjs";
|
||||
|
||||
// This import has been flagged as unallowed for this class. It may be involved in a circular dependency loop.
|
||||
// eslint-disable-next-line no-restricted-imports
|
||||
import { LogoutReason } from "@bitwarden/auth/common";
|
||||
|
||||
import { AccountService } from "../../../auth/abstractions/account.service";
|
||||
import { AuthService } from "../../../auth/abstractions/auth.service";
|
||||
import { AuthenticationStatus } from "../../../auth/enums/authentication-status";
|
||||
import { NotificationType } from "../../../enums";
|
||||
import {
|
||||
NotificationResponse,
|
||||
SyncCipherNotification,
|
||||
SyncFolderNotification,
|
||||
SyncSendNotification,
|
||||
} from "../../../models/response/notification.response";
|
||||
import { UserId } from "../../../types/guid";
|
||||
import { SyncService } from "../../../vault/abstractions/sync/sync.service.abstraction";
|
||||
import { AppIdService } from "../../abstractions/app-id.service";
|
||||
import { EnvironmentService } from "../../abstractions/environment.service";
|
||||
import { LogService } from "../../abstractions/log.service";
|
||||
import { MessagingService } from "../../abstractions/messaging.service";
|
||||
import { supportSwitch } from "../../misc/support-status";
|
||||
import { ServerNotificationsService } from "../server-notifications.service";
|
||||
|
||||
import { ReceiveMessage, SignalRConnectionService } from "./signalr-connection.service";
|
||||
import { WebPushConnectionService } from "./webpush-connection.service";
|
||||
|
||||
export const DISABLED_NOTIFICATIONS_URL = "http://-";
|
||||
|
||||
export class DefaultServerNotificationsService implements ServerNotificationsService {
|
||||
notifications$: Observable<readonly [NotificationResponse, UserId]>;
|
||||
|
||||
private activitySubject = new BehaviorSubject<"active" | "inactive">("active");
|
||||
|
||||
constructor(
|
||||
private readonly logService: LogService,
|
||||
private syncService: SyncService,
|
||||
private appIdService: AppIdService,
|
||||
private environmentService: EnvironmentService,
|
||||
private logoutCallback: (logoutReason: LogoutReason, userId: UserId) => Promise<void>,
|
||||
private messagingService: MessagingService,
|
||||
private readonly accountService: AccountService,
|
||||
private readonly signalRConnectionService: SignalRConnectionService,
|
||||
private readonly authService: AuthService,
|
||||
private readonly webPushConnectionService: WebPushConnectionService,
|
||||
) {
|
||||
this.notifications$ = this.accountService.activeAccount$.pipe(
|
||||
map((account) => account?.id),
|
||||
distinctUntilChanged(),
|
||||
switchMap((activeAccountId) => {
|
||||
if (activeAccountId == null) {
|
||||
// We don't emit server-notifications for inactive accounts currently
|
||||
return EMPTY;
|
||||
}
|
||||
|
||||
return this.userNotifications$(activeAccountId).pipe(
|
||||
map((notification) => [notification, activeAccountId] as const),
|
||||
);
|
||||
}),
|
||||
share(), // Multiple subscribers should only create a single connection to the server
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves a stream of push server notifications for the given user.
|
||||
* @param userId The user id of the user to get the push server notifications for.
|
||||
*/
|
||||
private userNotifications$(userId: UserId) {
|
||||
return this.environmentService.environment$.pipe(
|
||||
map((env) => env.getNotificationsUrl()),
|
||||
distinctUntilChanged(),
|
||||
switchMap((notificationsUrl) => {
|
||||
if (notificationsUrl === DISABLED_NOTIFICATIONS_URL) {
|
||||
return EMPTY;
|
||||
}
|
||||
|
||||
return this.userNotificationsHelper$(userId, notificationsUrl);
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
private userNotificationsHelper$(userId: UserId, notificationsUrl: string) {
|
||||
return this.hasAccessToken$(userId).pipe(
|
||||
switchMap((hasAccessToken) => {
|
||||
if (!hasAccessToken) {
|
||||
return EMPTY;
|
||||
}
|
||||
|
||||
return this.activitySubject;
|
||||
}),
|
||||
switchMap((activityStatus) => {
|
||||
if (activityStatus === "inactive") {
|
||||
return EMPTY;
|
||||
}
|
||||
|
||||
return this.webPushConnectionService.supportStatus$(userId);
|
||||
}),
|
||||
supportSwitch({
|
||||
supported: (service) => {
|
||||
this.logService.info("Using WebPush for server notifications");
|
||||
return service.notifications$.pipe(
|
||||
catchError((err: unknown) => {
|
||||
this.logService.warning("Issue with web push, falling back to SignalR", err);
|
||||
return this.connectSignalR$(userId, notificationsUrl);
|
||||
}),
|
||||
);
|
||||
},
|
||||
notSupported: () => {
|
||||
this.logService.info("Using SignalR for server notifications");
|
||||
return this.connectSignalR$(userId, notificationsUrl);
|
||||
},
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
private connectSignalR$(userId: UserId, notificationsUrl: string) {
|
||||
return this.signalRConnectionService.connect$(userId, notificationsUrl).pipe(
|
||||
filter((n) => n.type === "ReceiveMessage"),
|
||||
map((n) => (n as ReceiveMessage).message),
|
||||
);
|
||||
}
|
||||
|
||||
// This method name is a lie currently as we also have an access token
|
||||
// when locked, this is eventually where we want to be but it increases load
|
||||
// on signalR so we are rolling back until we can move the load of browser to
|
||||
// web push.
|
||||
private hasAccessToken$(userId: UserId) {
|
||||
return this.authService.authStatusFor$(userId).pipe(
|
||||
map((authStatus) => authStatus === AuthenticationStatus.Unlocked),
|
||||
distinctUntilChanged(),
|
||||
);
|
||||
}
|
||||
|
||||
private async processNotification(notification: NotificationResponse, userId: UserId) {
|
||||
const appId = await this.appIdService.getAppId();
|
||||
if (notification == null || notification.contextId === appId) {
|
||||
return;
|
||||
}
|
||||
|
||||
const payloadUserId = notification.payload?.userId || notification.payload?.UserId;
|
||||
if (payloadUserId != null && payloadUserId !== userId) {
|
||||
return;
|
||||
}
|
||||
|
||||
switch (notification.type) {
|
||||
case NotificationType.SyncCipherCreate:
|
||||
case NotificationType.SyncCipherUpdate:
|
||||
await this.syncService.syncUpsertCipher(
|
||||
notification.payload as SyncCipherNotification,
|
||||
notification.type === NotificationType.SyncCipherUpdate,
|
||||
userId,
|
||||
);
|
||||
break;
|
||||
case NotificationType.SyncCipherDelete:
|
||||
case NotificationType.SyncLoginDelete:
|
||||
await this.syncService.syncDeleteCipher(
|
||||
notification.payload as SyncCipherNotification,
|
||||
userId,
|
||||
);
|
||||
break;
|
||||
case NotificationType.SyncFolderCreate:
|
||||
case NotificationType.SyncFolderUpdate:
|
||||
await this.syncService.syncUpsertFolder(
|
||||
notification.payload as SyncFolderNotification,
|
||||
notification.type === NotificationType.SyncFolderUpdate,
|
||||
userId,
|
||||
);
|
||||
break;
|
||||
case NotificationType.SyncFolderDelete:
|
||||
await this.syncService.syncDeleteFolder(
|
||||
notification.payload as SyncFolderNotification,
|
||||
userId,
|
||||
);
|
||||
break;
|
||||
case NotificationType.SyncVault:
|
||||
case NotificationType.SyncCiphers:
|
||||
case NotificationType.SyncSettings:
|
||||
await this.syncService.fullSync(false);
|
||||
break;
|
||||
case NotificationType.SyncOrganizations:
|
||||
// An organization update may not have bumped the user's account revision date, so force a sync
|
||||
await this.syncService.fullSync(true);
|
||||
break;
|
||||
case NotificationType.SyncOrgKeys:
|
||||
await this.syncService.fullSync(true);
|
||||
this.activitySubject.next("inactive"); // Force a disconnect
|
||||
this.activitySubject.next("active"); // Allow a reconnect
|
||||
break;
|
||||
case NotificationType.LogOut:
|
||||
this.logService.info("[Notifications Service] Received logout notification");
|
||||
await this.logoutCallback("logoutNotification", userId);
|
||||
break;
|
||||
case NotificationType.SyncSendCreate:
|
||||
case NotificationType.SyncSendUpdate:
|
||||
await this.syncService.syncUpsertSend(
|
||||
notification.payload as SyncSendNotification,
|
||||
notification.type === NotificationType.SyncSendUpdate,
|
||||
);
|
||||
break;
|
||||
case NotificationType.SyncSendDelete:
|
||||
await this.syncService.syncDeleteSend(notification.payload as SyncSendNotification);
|
||||
break;
|
||||
case NotificationType.AuthRequest:
|
||||
// create notification
|
||||
|
||||
this.messagingService.send("openLoginApproval", {
|
||||
notificationId: notification.payload.id,
|
||||
});
|
||||
break;
|
||||
case NotificationType.SyncOrganizationStatusChanged:
|
||||
await this.syncService.fullSync(true);
|
||||
break;
|
||||
case NotificationType.SyncOrganizationCollectionSettingChanged:
|
||||
await this.syncService.fullSync(true);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
startListening() {
|
||||
return this.notifications$
|
||||
.pipe(
|
||||
mergeMap(async ([notification, userId]) => this.processNotification(notification, userId)),
|
||||
)
|
||||
.subscribe({
|
||||
error: (e: unknown) =>
|
||||
this.logService.warning("Error in server notifications$ observable", e),
|
||||
});
|
||||
}
|
||||
|
||||
reconnectFromActivity(): void {
|
||||
this.activitySubject.next("active");
|
||||
}
|
||||
|
||||
disconnectFromInactivity(): void {
|
||||
this.activitySubject.next("inactive");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
export * from "./worker-webpush-connection.service";
|
||||
export * from "./signalr-connection.service";
|
||||
export * from "./default-server-notifications.service";
|
||||
export * from "./noop-server-notifications.service";
|
||||
export * from "./unsupported-webpush-connection.service";
|
||||
export * from "./webpush-connection.service";
|
||||
export * from "./websocket-webpush-connection.service";
|
||||
export * from "./web-push-notifications-api.service";
|
||||
@@ -0,0 +1,28 @@
|
||||
import { Observable, Subject, Subscription } from "rxjs";
|
||||
|
||||
import { NotificationResponse } from "@bitwarden/common/models/response/notification.response";
|
||||
import { UserId } from "@bitwarden/common/types/guid";
|
||||
|
||||
import { LogService } from "../../abstractions/log.service";
|
||||
import { ServerNotificationsService } from "../server-notifications.service";
|
||||
|
||||
export class NoopServerNotificationsService implements ServerNotificationsService {
|
||||
notifications$: Observable<readonly [NotificationResponse, UserId]> = new Subject();
|
||||
|
||||
constructor(private logService: LogService) {}
|
||||
|
||||
startListening(): Subscription {
|
||||
this.logService.info(
|
||||
"Initializing no-op notification service, no push server notifications will be received",
|
||||
);
|
||||
return Subscription.EMPTY;
|
||||
}
|
||||
|
||||
reconnectFromActivity(): void {
|
||||
this.logService.info("Reconnecting notification service from activity");
|
||||
}
|
||||
|
||||
disconnectFromInactivity(): void {
|
||||
this.logService.info("Disconnecting notification service from inactivity");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,158 @@
|
||||
import {
|
||||
HttpTransportType,
|
||||
HubConnectionBuilder,
|
||||
HubConnectionState,
|
||||
ILogger,
|
||||
LogLevel,
|
||||
} from "@microsoft/signalr";
|
||||
import { MessagePackHubProtocol } from "@microsoft/signalr-protocol-msgpack";
|
||||
import { Observable, Subscription } from "rxjs";
|
||||
|
||||
import { ApiService } from "../../../abstractions/api.service";
|
||||
import { NotificationResponse } from "../../../models/response/notification.response";
|
||||
import { UserId } from "../../../types/guid";
|
||||
import { LogService } from "../../abstractions/log.service";
|
||||
|
||||
// 2 Minutes
|
||||
const MIN_RECONNECT_TIME = 2 * 60 * 1000;
|
||||
// 5 Minutes
|
||||
const MAX_RECONNECT_TIME = 5 * 60 * 1000;
|
||||
|
||||
export type Heartbeat = { type: "Heartbeat" };
|
||||
export type ReceiveMessage = { type: "ReceiveMessage"; message: NotificationResponse };
|
||||
|
||||
export type SignalRNotification = Heartbeat | ReceiveMessage;
|
||||
|
||||
export type TimeoutManager = {
|
||||
setTimeout: (handler: TimerHandler, timeout: number) => number;
|
||||
clearTimeout: (timeoutId: number) => void;
|
||||
};
|
||||
|
||||
class SignalRLogger implements ILogger {
|
||||
constructor(private readonly logService: LogService) {}
|
||||
|
||||
redactMessage(message: string): string {
|
||||
const ACCESS_TOKEN_TEXT = "access_token=";
|
||||
// Redact the access token from the logs if it exists.
|
||||
const accessTokenIndex = message.indexOf(ACCESS_TOKEN_TEXT);
|
||||
if (accessTokenIndex !== -1) {
|
||||
return message.substring(0, accessTokenIndex + ACCESS_TOKEN_TEXT.length) + "[REDACTED]";
|
||||
}
|
||||
|
||||
return message;
|
||||
}
|
||||
|
||||
log(logLevel: LogLevel, message: string): void {
|
||||
const redactedMessage = `[SignalR] ${this.redactMessage(message)}`;
|
||||
|
||||
switch (logLevel) {
|
||||
case LogLevel.Critical:
|
||||
this.logService.error(redactedMessage);
|
||||
break;
|
||||
case LogLevel.Error:
|
||||
this.logService.error(redactedMessage);
|
||||
break;
|
||||
case LogLevel.Warning:
|
||||
this.logService.warning(redactedMessage);
|
||||
break;
|
||||
case LogLevel.Information:
|
||||
this.logService.info(redactedMessage);
|
||||
break;
|
||||
case LogLevel.Debug:
|
||||
this.logService.debug(redactedMessage);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class SignalRConnectionService {
|
||||
constructor(
|
||||
private readonly apiService: ApiService,
|
||||
private readonly logService: LogService,
|
||||
private readonly hubConnectionBuilderFactory: () => HubConnectionBuilder = () =>
|
||||
new HubConnectionBuilder(),
|
||||
private readonly timeoutManager: TimeoutManager = globalThis,
|
||||
) {}
|
||||
|
||||
connect$(userId: UserId, notificationsUrl: string) {
|
||||
return new Observable<SignalRNotification>((subsciber) => {
|
||||
const connection = this.hubConnectionBuilderFactory()
|
||||
.withUrl(notificationsUrl + "/hub", {
|
||||
accessTokenFactory: () => this.apiService.getActiveBearerToken(),
|
||||
skipNegotiation: true,
|
||||
transport: HttpTransportType.WebSockets,
|
||||
})
|
||||
.withHubProtocol(new MessagePackHubProtocol())
|
||||
.configureLogging(new SignalRLogger(this.logService))
|
||||
.build();
|
||||
|
||||
connection.on("ReceiveMessage", (data: any) => {
|
||||
subsciber.next({ type: "ReceiveMessage", message: new NotificationResponse(data) });
|
||||
});
|
||||
|
||||
connection.on("Heartbeat", () => {
|
||||
subsciber.next({ type: "Heartbeat" });
|
||||
});
|
||||
|
||||
let reconnectSubscription: Subscription | null = null;
|
||||
|
||||
// Create schedule reconnect function
|
||||
const scheduleReconnect = () => {
|
||||
if (
|
||||
connection == null ||
|
||||
connection.state !== HubConnectionState.Disconnected ||
|
||||
(reconnectSubscription != null && !reconnectSubscription.closed)
|
||||
) {
|
||||
// Skip scheduling a new reconnect, either the connection isn't disconnected
|
||||
// or an active reconnect is already scheduled.
|
||||
return;
|
||||
}
|
||||
|
||||
// If we've somehow gotten here while the subscriber is closed,
|
||||
// we do not want to reconnect. So leave.
|
||||
if (subsciber.closed) {
|
||||
return;
|
||||
}
|
||||
|
||||
const randomTime = this.randomReconnectTime();
|
||||
const timeoutHandler = this.timeoutManager.setTimeout(() => {
|
||||
connection
|
||||
.start()
|
||||
.then(() => {
|
||||
reconnectSubscription = null;
|
||||
})
|
||||
.catch(() => {
|
||||
scheduleReconnect();
|
||||
});
|
||||
}, randomTime);
|
||||
|
||||
reconnectSubscription = new Subscription(() =>
|
||||
this.timeoutManager.clearTimeout(timeoutHandler),
|
||||
);
|
||||
};
|
||||
|
||||
connection.onclose((error) => {
|
||||
scheduleReconnect();
|
||||
});
|
||||
|
||||
// Start connection
|
||||
connection.start().catch(() => {
|
||||
scheduleReconnect();
|
||||
});
|
||||
|
||||
return () => {
|
||||
// Cancel any possible scheduled reconnects
|
||||
reconnectSubscription?.unsubscribe();
|
||||
connection?.stop().catch((error) => {
|
||||
this.logService.error("Error while stopping SignalR connection", error);
|
||||
});
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
private randomReconnectTime() {
|
||||
return (
|
||||
Math.floor(Math.random() * (MAX_RECONNECT_TIME - MIN_RECONNECT_TIME + 1)) + MIN_RECONNECT_TIME
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
import { Observable, of } from "rxjs";
|
||||
|
||||
import { UserId } from "../../../types/guid";
|
||||
import { SupportStatus } from "../../misc/support-status";
|
||||
|
||||
import { WebPushConnectionService, WebPushConnector } from "./webpush-connection.service";
|
||||
|
||||
/**
|
||||
* An implementation of {@see WebPushConnectionService} for clients that do not have support for WebPush
|
||||
*/
|
||||
export class UnsupportedWebPushConnectionService implements WebPushConnectionService {
|
||||
supportStatus$(userId: UserId): Observable<SupportStatus<WebPushConnector>> {
|
||||
return of({ type: "not-supported", reason: "client-not-supported" });
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
import { ApiService } from "../../../abstractions/api.service";
|
||||
import { AppIdService } from "../../abstractions/app-id.service";
|
||||
|
||||
import { WebPushRequest } from "./web-push.request";
|
||||
|
||||
export class WebPushNotificationsApiService {
|
||||
constructor(
|
||||
private readonly apiService: ApiService,
|
||||
private readonly appIdService: AppIdService,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Posts a device-user association to the server and ensures it's installed for push server notifications
|
||||
*/
|
||||
async putSubscription(pushSubscription: PushSubscriptionJSON): Promise<void> {
|
||||
const request = WebPushRequest.from(pushSubscription);
|
||||
await this.apiService.send(
|
||||
"POST",
|
||||
`/devices/identifier/${await this.appIdService.getAppId()}/web-push-auth`,
|
||||
request,
|
||||
true,
|
||||
false,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
export class WebPushRequest {
|
||||
endpoint: string | undefined;
|
||||
p256dh: string | undefined;
|
||||
auth: string | undefined;
|
||||
|
||||
static from(pushSubscription: PushSubscriptionJSON): WebPushRequest {
|
||||
const result = new WebPushRequest();
|
||||
result.endpoint = pushSubscription.endpoint;
|
||||
result.p256dh = pushSubscription.keys?.p256dh;
|
||||
result.auth = pushSubscription.keys?.auth;
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
import { Observable } from "rxjs";
|
||||
|
||||
import { NotificationResponse } from "../../../models/response/notification.response";
|
||||
import { UserId } from "../../../types/guid";
|
||||
import { SupportStatus } from "../../misc/support-status";
|
||||
|
||||
export interface WebPushConnector {
|
||||
notifications$: Observable<NotificationResponse>;
|
||||
}
|
||||
|
||||
export abstract class WebPushConnectionService {
|
||||
abstract supportStatus$(userId: UserId): Observable<SupportStatus<WebPushConnector>>;
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
import { Observable, of } from "rxjs";
|
||||
|
||||
import { UserId } from "../../../types/guid";
|
||||
import { SupportStatus } from "../../misc/support-status";
|
||||
|
||||
import { WebPushConnectionService, WebPushConnector } from "./webpush-connection.service";
|
||||
|
||||
export class WebSocketWebPushConnectionService implements WebPushConnectionService {
|
||||
supportStatus$(userId: UserId): Observable<SupportStatus<WebPushConnector>> {
|
||||
return of({ type: "not-supported", reason: "work-in-progress" });
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,387 @@
|
||||
import { mock, MockProxy } from "jest-mock-extended";
|
||||
import { firstValueFrom, of } from "rxjs";
|
||||
|
||||
import {
|
||||
awaitAsync,
|
||||
FakeGlobalState,
|
||||
FakeStateProvider,
|
||||
mockAccountServiceWith,
|
||||
} from "../../../../spec";
|
||||
import { PushTechnology } from "../../../enums/push-technology.enum";
|
||||
import { UserId } from "../../../types/guid";
|
||||
import { ConfigService } from "../../abstractions/config/config.service";
|
||||
import { ServerConfig } from "../../abstractions/config/server-config";
|
||||
import { Supported } from "../../misc/support-status";
|
||||
import { Utils } from "../../misc/utils";
|
||||
import { ServerConfigData } from "../../models/data/server-config.data";
|
||||
import { PushSettingsConfigResponse } from "../../models/response/server-config.response";
|
||||
import { KeyDefinition } from "../../state";
|
||||
|
||||
import { WebPushNotificationsApiService } from "./web-push-notifications-api.service";
|
||||
import { WebPushConnector } from "./webpush-connection.service";
|
||||
import {
|
||||
WEB_PUSH_SUBSCRIPTION_USERS,
|
||||
WorkerWebPushConnectionService,
|
||||
} from "./worker-webpush-connection.service";
|
||||
|
||||
const mockUser1 = "testUser1" as UserId;
|
||||
|
||||
const createSub = (key: string) => {
|
||||
return {
|
||||
options: { applicationServerKey: Utils.fromUrlB64ToArray(key), userVisibleOnly: true },
|
||||
endpoint: `web.push.endpoint/?${Utils.newGuid()}`,
|
||||
expirationTime: 5,
|
||||
getKey: () => null,
|
||||
toJSON: () => ({ endpoint: "something", keys: {}, expirationTime: 5 }),
|
||||
unsubscribe: () => Promise.resolve(true),
|
||||
} satisfies PushSubscription;
|
||||
};
|
||||
|
||||
describe("WorkerWebpushConnectionService", () => {
|
||||
let configService: MockProxy<ConfigService>;
|
||||
let webPushApiService: MockProxy<WebPushNotificationsApiService>;
|
||||
let stateProvider: FakeStateProvider;
|
||||
let pushManager: MockProxy<PushManager>;
|
||||
const userId = "testUser1" as UserId;
|
||||
|
||||
let sut: WorkerWebPushConnectionService;
|
||||
|
||||
beforeEach(() => {
|
||||
configService = mock();
|
||||
webPushApiService = mock();
|
||||
stateProvider = new FakeStateProvider(mockAccountServiceWith(userId));
|
||||
pushManager = mock();
|
||||
|
||||
sut = new WorkerWebPushConnectionService(
|
||||
configService,
|
||||
webPushApiService,
|
||||
mock<ServiceWorkerRegistration>({ pushManager: pushManager }),
|
||||
stateProvider,
|
||||
);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
jest.resetAllMocks();
|
||||
});
|
||||
type ExtractKeyDefinitionType<T> = T extends KeyDefinition<infer U> ? U : never;
|
||||
describe("supportStatus$", () => {
|
||||
let fakeGlobalState: FakeGlobalState<
|
||||
ExtractKeyDefinitionType<typeof WEB_PUSH_SUBSCRIPTION_USERS>
|
||||
>;
|
||||
|
||||
beforeEach(() => {
|
||||
fakeGlobalState = stateProvider.getGlobal(WEB_PUSH_SUBSCRIPTION_USERS) as FakeGlobalState<
|
||||
ExtractKeyDefinitionType<typeof WEB_PUSH_SUBSCRIPTION_USERS>
|
||||
>;
|
||||
});
|
||||
|
||||
test("when web push is supported, have an existing subscription, and we've already registered the user, should not call API", async () => {
|
||||
configService.serverConfig$ = of(
|
||||
new ServerConfig(
|
||||
new ServerConfigData({
|
||||
push: new PushSettingsConfigResponse({
|
||||
pushTechnology: PushTechnology.WebPush,
|
||||
vapidPublicKey: "dGVzdA",
|
||||
}),
|
||||
}),
|
||||
),
|
||||
);
|
||||
const existingSubscription = createSub("dGVzdA");
|
||||
await fakeGlobalState.nextState({ [existingSubscription.endpoint]: [userId] });
|
||||
|
||||
pushManager.getSubscription.mockResolvedValue(existingSubscription);
|
||||
|
||||
const supportStatus = await firstValueFrom(sut.supportStatus$(mockUser1));
|
||||
expect(supportStatus.type).toBe("supported");
|
||||
const service = (supportStatus as Supported<WebPushConnector>).service;
|
||||
expect(service).not.toBeFalsy();
|
||||
|
||||
const notificationsSub = service.notifications$.subscribe();
|
||||
|
||||
await awaitAsync(2);
|
||||
|
||||
expect(pushManager.getSubscription).toHaveBeenCalledTimes(1);
|
||||
expect(webPushApiService.putSubscription).toHaveBeenCalledTimes(0);
|
||||
|
||||
expect(fakeGlobalState.nextMock).toHaveBeenCalledTimes(0);
|
||||
|
||||
notificationsSub.unsubscribe();
|
||||
});
|
||||
|
||||
test("when web push is supported, have an existing subscription, and we haven't registered the user, should call API", async () => {
|
||||
configService.serverConfig$ = of(
|
||||
new ServerConfig(
|
||||
new ServerConfigData({
|
||||
push: new PushSettingsConfigResponse({
|
||||
pushTechnology: PushTechnology.WebPush,
|
||||
vapidPublicKey: "dGVzdA",
|
||||
}),
|
||||
}),
|
||||
),
|
||||
);
|
||||
const existingSubscription = createSub("dGVzdA");
|
||||
await fakeGlobalState.nextState({
|
||||
[existingSubscription.endpoint]: ["otherUserId" as UserId],
|
||||
});
|
||||
|
||||
pushManager.getSubscription.mockResolvedValue(existingSubscription);
|
||||
|
||||
const supportStatus = await firstValueFrom(sut.supportStatus$(mockUser1));
|
||||
expect(supportStatus.type).toBe("supported");
|
||||
const service = (supportStatus as Supported<WebPushConnector>).service;
|
||||
expect(service).not.toBeFalsy();
|
||||
|
||||
const notificationsSub = service.notifications$.subscribe();
|
||||
|
||||
await awaitAsync(2);
|
||||
|
||||
expect(pushManager.getSubscription).toHaveBeenCalledTimes(1);
|
||||
expect(webPushApiService.putSubscription).toHaveBeenCalledTimes(1);
|
||||
|
||||
expect(fakeGlobalState.nextMock).toHaveBeenCalledTimes(1);
|
||||
expect(fakeGlobalState.nextMock).toHaveBeenCalledWith({
|
||||
[existingSubscription.endpoint]: ["otherUserId", mockUser1],
|
||||
});
|
||||
|
||||
notificationsSub.unsubscribe();
|
||||
});
|
||||
|
||||
test("when web push is supported, have an existing subscription, but it isn't in state, should call API and add to state", async () => {
|
||||
configService.serverConfig$ = of(
|
||||
new ServerConfig(
|
||||
new ServerConfigData({
|
||||
push: new PushSettingsConfigResponse({
|
||||
pushTechnology: PushTechnology.WebPush,
|
||||
vapidPublicKey: "dGVzdA",
|
||||
}),
|
||||
}),
|
||||
),
|
||||
);
|
||||
const existingSubscription = createSub("dGVzdA");
|
||||
await fakeGlobalState.nextState({
|
||||
[existingSubscription.endpoint]: null!,
|
||||
});
|
||||
|
||||
pushManager.getSubscription.mockResolvedValue(existingSubscription);
|
||||
|
||||
const supportStatus = await firstValueFrom(sut.supportStatus$(mockUser1));
|
||||
expect(supportStatus.type).toBe("supported");
|
||||
const service = (supportStatus as Supported<WebPushConnector>).service;
|
||||
expect(service).not.toBeFalsy();
|
||||
|
||||
const notificationsSub = service.notifications$.subscribe();
|
||||
|
||||
await awaitAsync(2);
|
||||
|
||||
expect(pushManager.getSubscription).toHaveBeenCalledTimes(1);
|
||||
expect(webPushApiService.putSubscription).toHaveBeenCalledTimes(1);
|
||||
|
||||
expect(fakeGlobalState.nextMock).toHaveBeenCalledTimes(1);
|
||||
expect(fakeGlobalState.nextMock).toHaveBeenCalledWith({
|
||||
[existingSubscription.endpoint]: [mockUser1],
|
||||
});
|
||||
|
||||
notificationsSub.unsubscribe();
|
||||
});
|
||||
|
||||
test("when web push is supported, have an existing subscription, but state array is null, should call API and add to state", async () => {
|
||||
configService.serverConfig$ = of(
|
||||
new ServerConfig(
|
||||
new ServerConfigData({
|
||||
push: new PushSettingsConfigResponse({
|
||||
pushTechnology: PushTechnology.WebPush,
|
||||
vapidPublicKey: "dGVzdA",
|
||||
}),
|
||||
}),
|
||||
),
|
||||
);
|
||||
const existingSubscription = createSub("dGVzdA");
|
||||
await fakeGlobalState.nextState({});
|
||||
|
||||
pushManager.getSubscription.mockResolvedValue(existingSubscription);
|
||||
|
||||
const supportStatus = await firstValueFrom(sut.supportStatus$(mockUser1));
|
||||
expect(supportStatus.type).toBe("supported");
|
||||
const service = (supportStatus as Supported<WebPushConnector>).service;
|
||||
expect(service).not.toBeFalsy();
|
||||
|
||||
const notificationsSub = service.notifications$.subscribe();
|
||||
|
||||
await awaitAsync(2);
|
||||
|
||||
expect(pushManager.getSubscription).toHaveBeenCalledTimes(1);
|
||||
expect(webPushApiService.putSubscription).toHaveBeenCalledTimes(1);
|
||||
|
||||
expect(fakeGlobalState.nextMock).toHaveBeenCalledTimes(1);
|
||||
expect(fakeGlobalState.nextMock).toHaveBeenCalledWith({
|
||||
[existingSubscription.endpoint]: [mockUser1],
|
||||
});
|
||||
|
||||
notificationsSub.unsubscribe();
|
||||
});
|
||||
|
||||
test("when web push is supported, but we don't have an existing subscription, should call the api and wipe out existing state", async () => {
|
||||
configService.serverConfig$ = of(
|
||||
new ServerConfig(
|
||||
new ServerConfigData({
|
||||
push: new PushSettingsConfigResponse({
|
||||
pushTechnology: PushTechnology.WebPush,
|
||||
vapidPublicKey: "dGVzdA",
|
||||
}),
|
||||
}),
|
||||
),
|
||||
);
|
||||
const existingState = createSub("dGVzdA");
|
||||
await fakeGlobalState.nextState({ [existingState.endpoint]: [userId] });
|
||||
|
||||
pushManager.getSubscription.mockResolvedValue(null);
|
||||
const newSubscription = createSub("dGVzdA");
|
||||
pushManager.subscribe.mockResolvedValue(newSubscription);
|
||||
|
||||
const supportStatus = await firstValueFrom(sut.supportStatus$(mockUser1));
|
||||
expect(supportStatus.type).toBe("supported");
|
||||
const service = (supportStatus as Supported<WebPushConnector>).service;
|
||||
expect(service).not.toBeFalsy();
|
||||
|
||||
const notificationsSub = service.notifications$.subscribe();
|
||||
|
||||
await awaitAsync(2);
|
||||
|
||||
expect(pushManager.getSubscription).toHaveBeenCalledTimes(1);
|
||||
expect(webPushApiService.putSubscription).toHaveBeenCalledTimes(1);
|
||||
|
||||
expect(fakeGlobalState.nextMock).toHaveBeenCalledTimes(1);
|
||||
expect(fakeGlobalState.nextMock).toHaveBeenCalledWith({
|
||||
[newSubscription.endpoint]: [mockUser1],
|
||||
});
|
||||
|
||||
notificationsSub.unsubscribe();
|
||||
});
|
||||
|
||||
test("when web push is supported and no existing subscription, should call API", async () => {
|
||||
configService.serverConfig$ = of(
|
||||
new ServerConfig(
|
||||
new ServerConfigData({
|
||||
push: new PushSettingsConfigResponse({
|
||||
pushTechnology: PushTechnology.WebPush,
|
||||
vapidPublicKey: "dGVzdA",
|
||||
}),
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
pushManager.getSubscription.mockResolvedValue(null);
|
||||
pushManager.subscribe.mockResolvedValue(createSub("dGVzdA"));
|
||||
|
||||
const supportStatus = await firstValueFrom(sut.supportStatus$(mockUser1));
|
||||
expect(supportStatus.type).toBe("supported");
|
||||
const service = (supportStatus as Supported<WebPushConnector>).service;
|
||||
expect(service).not.toBeFalsy();
|
||||
|
||||
const notificationsSub = service.notifications$.subscribe();
|
||||
|
||||
await awaitAsync(2);
|
||||
|
||||
expect(pushManager.getSubscription).toHaveBeenCalledTimes(1);
|
||||
expect(pushManager.subscribe).toHaveBeenCalledTimes(1);
|
||||
expect(webPushApiService.putSubscription).toHaveBeenCalledTimes(1);
|
||||
|
||||
notificationsSub.unsubscribe();
|
||||
});
|
||||
|
||||
test("when web push is supported and existing subscription with different key, should call API", async () => {
|
||||
configService.serverConfig$ = of(
|
||||
new ServerConfig(
|
||||
new ServerConfigData({
|
||||
push: new PushSettingsConfigResponse({
|
||||
pushTechnology: PushTechnology.WebPush,
|
||||
vapidPublicKey: "dGVzdA",
|
||||
}),
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
pushManager.getSubscription.mockResolvedValue(createSub("dGVzdF9hbHQ"));
|
||||
|
||||
pushManager.subscribe.mockResolvedValue(createSub("dGVzdA"));
|
||||
|
||||
const supportStatus = await firstValueFrom(sut.supportStatus$(mockUser1));
|
||||
expect(supportStatus.type).toBe("supported");
|
||||
const service = (supportStatus as Supported<WebPushConnector>).service;
|
||||
expect(service).not.toBeFalsy();
|
||||
|
||||
const notificationsSub = service.notifications$.subscribe();
|
||||
|
||||
await awaitAsync(2);
|
||||
|
||||
expect(pushManager.getSubscription).toHaveBeenCalledTimes(1);
|
||||
expect(pushManager.subscribe).toHaveBeenCalledTimes(1);
|
||||
expect(webPushApiService.putSubscription).toHaveBeenCalledTimes(1);
|
||||
|
||||
notificationsSub.unsubscribe();
|
||||
});
|
||||
|
||||
test("when server config emits multiple times quickly while api call takes a long time will only call API once", async () => {
|
||||
configService.serverConfig$ = of(
|
||||
new ServerConfig(
|
||||
new ServerConfigData({
|
||||
push: new PushSettingsConfigResponse({
|
||||
pushTechnology: PushTechnology.WebPush,
|
||||
vapidPublicKey: "dGVzdA",
|
||||
}),
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
pushManager.getSubscription.mockResolvedValue(createSub("dGVzdF9hbHQ"));
|
||||
|
||||
pushManager.subscribe.mockResolvedValue(createSub("dGVzdA"));
|
||||
|
||||
const supportStatus = await firstValueFrom(sut.supportStatus$(mockUser1));
|
||||
expect(supportStatus.type).toBe("supported");
|
||||
const service = (supportStatus as Supported<WebPushConnector>).service;
|
||||
expect(service).not.toBeFalsy();
|
||||
|
||||
const notificationsSub = service.notifications$.subscribe();
|
||||
|
||||
await awaitAsync(2);
|
||||
|
||||
expect(pushManager.getSubscription).toHaveBeenCalledTimes(1);
|
||||
expect(pushManager.subscribe).toHaveBeenCalledTimes(1);
|
||||
expect(webPushApiService.putSubscription).toHaveBeenCalledTimes(1);
|
||||
|
||||
notificationsSub.unsubscribe();
|
||||
});
|
||||
|
||||
it("server config shows SignalR support should return not-supported", async () => {
|
||||
configService.serverConfig$ = of(
|
||||
new ServerConfig(
|
||||
new ServerConfigData({
|
||||
push: new PushSettingsConfigResponse({
|
||||
pushTechnology: PushTechnology.SignalR,
|
||||
}),
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
const supportStatus = await firstValueFrom(sut.supportStatus$(mockUser1));
|
||||
expect(supportStatus.type).toBe("not-supported");
|
||||
});
|
||||
|
||||
it("server config shows web push but no public key support should return not-supported", async () => {
|
||||
configService.serverConfig$ = of(
|
||||
new ServerConfig(
|
||||
new ServerConfigData({
|
||||
push: new PushSettingsConfigResponse({
|
||||
pushTechnology: PushTechnology.WebPush,
|
||||
vapidPublicKey: null,
|
||||
}),
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
const supportStatus = await firstValueFrom(sut.supportStatus$(mockUser1));
|
||||
expect(supportStatus.type).toBe("not-supported");
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,208 @@
|
||||
import {
|
||||
concat,
|
||||
concatMap,
|
||||
defer,
|
||||
distinctUntilChanged,
|
||||
fromEvent,
|
||||
map,
|
||||
Observable,
|
||||
Subject,
|
||||
Subscription,
|
||||
switchMap,
|
||||
withLatestFrom,
|
||||
} from "rxjs";
|
||||
|
||||
import { PushTechnology } from "../../../enums/push-technology.enum";
|
||||
import { NotificationResponse } from "../../../models/response/notification.response";
|
||||
import { UserId } from "../../../types/guid";
|
||||
import { ConfigService } from "../../abstractions/config/config.service";
|
||||
import { SupportStatus } from "../../misc/support-status";
|
||||
import { Utils } from "../../misc/utils";
|
||||
import { KeyDefinition, StateProvider, WEB_PUSH_SUBSCRIPTION } from "../../state";
|
||||
|
||||
import { WebPushNotificationsApiService } from "./web-push-notifications-api.service";
|
||||
import { WebPushConnectionService, WebPushConnector } from "./webpush-connection.service";
|
||||
|
||||
// Ref: https://w3c.github.io/push-api/#the-pushsubscriptionchange-event
|
||||
interface PushSubscriptionChangeEvent {
|
||||
readonly newSubscription?: PushSubscription;
|
||||
readonly oldSubscription?: PushSubscription;
|
||||
}
|
||||
|
||||
// Ref: https://developer.mozilla.org/en-US/docs/Web/API/PushMessageData
|
||||
interface PushMessageData {
|
||||
json(): any;
|
||||
}
|
||||
|
||||
// Ref: https://developer.mozilla.org/en-US/docs/Web/API/PushEvent
|
||||
interface PushEvent {
|
||||
data: PushMessageData;
|
||||
}
|
||||
|
||||
/**
|
||||
* An implementation for connecting to web push based server notifications running in a Worker.
|
||||
*/
|
||||
export class WorkerWebPushConnectionService implements WebPushConnectionService {
|
||||
private pushEvent = new Subject<PushEvent>();
|
||||
private pushChangeEvent = new Subject<PushSubscriptionChangeEvent>();
|
||||
|
||||
constructor(
|
||||
private readonly configService: ConfigService,
|
||||
private readonly webPushApiService: WebPushNotificationsApiService,
|
||||
private readonly serviceWorkerRegistration: ServiceWorkerRegistration,
|
||||
private readonly stateProvider: StateProvider,
|
||||
) {}
|
||||
|
||||
start(): Subscription {
|
||||
const subscription = new Subscription(() => {
|
||||
this.pushEvent.complete();
|
||||
this.pushChangeEvent.complete();
|
||||
this.pushEvent = new Subject<PushEvent>();
|
||||
this.pushChangeEvent = new Subject<PushSubscriptionChangeEvent>();
|
||||
});
|
||||
|
||||
const pushEventSubscription = fromEvent<PushEvent>(self, "push").subscribe(this.pushEvent);
|
||||
|
||||
const pushChangeEventSubscription = fromEvent<PushSubscriptionChangeEvent>(
|
||||
self,
|
||||
"pushsubscriptionchange",
|
||||
).subscribe(this.pushChangeEvent);
|
||||
|
||||
subscription.add(pushEventSubscription);
|
||||
subscription.add(pushChangeEventSubscription);
|
||||
|
||||
return subscription;
|
||||
}
|
||||
|
||||
supportStatus$(userId: UserId): Observable<SupportStatus<WebPushConnector>> {
|
||||
// Check the server config to see if it supports sending WebPush server notifications
|
||||
// FIXME: get config of server for the specified userId, once ConfigService supports it
|
||||
return this.configService.serverConfig$.pipe(
|
||||
map((config) =>
|
||||
config?.push?.pushTechnology === PushTechnology.WebPush ? config.push.vapidPublicKey : null,
|
||||
),
|
||||
// No need to re-emit when there is new server config if the vapidPublicKey is still there and the exact same
|
||||
distinctUntilChanged(),
|
||||
map((publicKey) => {
|
||||
if (publicKey == null) {
|
||||
return {
|
||||
type: "not-supported",
|
||||
reason: "server-not-configured",
|
||||
} satisfies SupportStatus<WebPushConnector>;
|
||||
}
|
||||
|
||||
return {
|
||||
type: "supported",
|
||||
service: new MyWebPushConnector(
|
||||
publicKey,
|
||||
userId,
|
||||
this.webPushApiService,
|
||||
this.serviceWorkerRegistration,
|
||||
this.pushEvent,
|
||||
this.pushChangeEvent,
|
||||
this.stateProvider,
|
||||
),
|
||||
} satisfies SupportStatus<WebPushConnector>;
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
class MyWebPushConnector implements WebPushConnector {
|
||||
notifications$: Observable<NotificationResponse>;
|
||||
|
||||
constructor(
|
||||
private readonly vapidPublicKey: string,
|
||||
private readonly userId: UserId,
|
||||
private readonly webPushApiService: WebPushNotificationsApiService,
|
||||
private readonly serviceWorkerRegistration: ServiceWorkerRegistration,
|
||||
private readonly pushEvent$: Observable<PushEvent>,
|
||||
private readonly pushChangeEvent$: Observable<PushSubscriptionChangeEvent>,
|
||||
private readonly stateProvider: StateProvider,
|
||||
) {
|
||||
const subscriptionUsersState = this.stateProvider.getGlobal(WEB_PUSH_SUBSCRIPTION_USERS);
|
||||
this.notifications$ = this.getOrCreateSubscription$(this.vapidPublicKey).pipe(
|
||||
withLatestFrom(subscriptionUsersState.state$.pipe(map((x) => x ?? {}))),
|
||||
concatMap(async ([[isExistingSubscription, subscription], subscriptionUsers]) => {
|
||||
if (subscription == null) {
|
||||
throw new Error("Expected a non-null subscription.");
|
||||
}
|
||||
|
||||
// If this is a new subscription, we can clear state and start over
|
||||
if (!isExistingSubscription) {
|
||||
subscriptionUsers = {};
|
||||
}
|
||||
|
||||
// If the user is already subscribed, we don't need to do anything
|
||||
if (subscriptionUsers[subscription.endpoint]?.includes(this.userId)) {
|
||||
return;
|
||||
}
|
||||
subscriptionUsers[subscription.endpoint] ??= [];
|
||||
subscriptionUsers[subscription.endpoint].push(this.userId);
|
||||
// Update the state with the new subscription-user association
|
||||
await subscriptionUsersState.update(() => subscriptionUsers);
|
||||
|
||||
// Inform the server about the new subscription-user association
|
||||
await this.webPushApiService.putSubscription(subscription.toJSON());
|
||||
}),
|
||||
switchMap(() => this.pushEvent$),
|
||||
map((e) => {
|
||||
return new NotificationResponse(e.data.json().data);
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
private async pushManagerSubscribe(key: string) {
|
||||
return await this.serviceWorkerRegistration.pushManager.subscribe({
|
||||
userVisibleOnly: false,
|
||||
applicationServerKey: key,
|
||||
});
|
||||
}
|
||||
|
||||
private getOrCreateSubscription$(key: string) {
|
||||
return concat(
|
||||
defer(async () => {
|
||||
const existingSubscription =
|
||||
await this.serviceWorkerRegistration.pushManager.getSubscription();
|
||||
|
||||
if (existingSubscription == null) {
|
||||
return [false, await this.pushManagerSubscribe(key)] as const;
|
||||
}
|
||||
|
||||
const subscriptionKey = Utils.fromBufferToUrlB64(
|
||||
// REASON: `Utils.fromBufferToUrlB64` handles null by returning null back to it.
|
||||
// its annotation should be updated and then this assertion can be removed.
|
||||
// eslint-disable-next-line @typescript-eslint/no-non-null-asserted-optional-chain
|
||||
existingSubscription.options?.applicationServerKey!,
|
||||
);
|
||||
|
||||
if (subscriptionKey !== key) {
|
||||
// There is a subscription, but it's not for the current server, unsubscribe and then make a new one
|
||||
await existingSubscription.unsubscribe();
|
||||
return [false, await this.pushManagerSubscribe(key)] as const;
|
||||
}
|
||||
|
||||
return [true, existingSubscription] as const;
|
||||
}),
|
||||
this.pushChangeEvent$.pipe(map((event) => [false, event.newSubscription] as const)),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export const WEB_PUSH_SUBSCRIPTION_USERS = new KeyDefinition<Record<string, UserId[]>>(
|
||||
WEB_PUSH_SUBSCRIPTION,
|
||||
"subUsers",
|
||||
{
|
||||
deserializer: (obj) => {
|
||||
if (obj == null) {
|
||||
return {};
|
||||
}
|
||||
|
||||
const result: Record<string, UserId[]> = {};
|
||||
for (const [key, value] of Object.entries(obj)) {
|
||||
result[key] = Array.isArray(value) ? value : [];
|
||||
}
|
||||
return result;
|
||||
},
|
||||
},
|
||||
);
|
||||
@@ -0,0 +1,30 @@
|
||||
import { Observable, Subscription } from "rxjs";
|
||||
|
||||
import { NotificationResponse } from "@bitwarden/common/models/response/notification.response";
|
||||
import { UserId } from "@bitwarden/common/types/guid";
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars -- Needed to link to API
|
||||
import type { DefaultServerNotificationsService } from "./internal";
|
||||
|
||||
/**
|
||||
* A service offering abilities to interact with push notifications from the server.
|
||||
*/
|
||||
export abstract class ServerNotificationsService {
|
||||
/**
|
||||
* @deprecated This method should not be consumed, an observable to listen to server
|
||||
* notifications will be available one day but it is not ready to be consumed generally.
|
||||
* Please add code reacting to server notifications in {@link DefaultServerNotificationsService.processNotification}
|
||||
*/
|
||||
abstract notifications$: Observable<readonly [NotificationResponse, UserId]>;
|
||||
/**
|
||||
* Starts automatic listening and processing of server notifications, should only be called once per application,
|
||||
* or you will risk notifications being processed multiple times.
|
||||
*/
|
||||
abstract startListening(): Subscription;
|
||||
// TODO: Delete this method in favor of an `ActivityService` that notifications can depend on.
|
||||
// https://bitwarden.atlassian.net/browse/PM-14264
|
||||
abstract reconnectFromActivity(): void;
|
||||
// TODO: Delete this method in favor of an `ActivityService` that notifications can depend on.
|
||||
// https://bitwarden.atlassian.net/browse/PM-14264
|
||||
abstract disconnectFromInactivity(): void;
|
||||
}
|
||||
Reference in New Issue
Block a user