1
0
mirror of https://github.com/bitwarden/browser synced 2025-12-11 13:53:34 +00:00

[PM-18903] Desktop sync issues (#13681)

* [PM-18707] Use different BroadcasterSubscriptionId in base view component to avoid collision with desktop view component

* [PM-18707] Use userId instead of payloadUserId for cipher notification syncs

* [PM-19032] Live Sync on Desktop (#13851)

* migrate the vault-items to an observables rather than async/promises

- this helps keep data in sync with the service state and avoids race conditions

* migrate the view component to an observables rather than async/promises

- this helps keep data in sync with the service state and avoids race conditions

* decrypt saved cipher from server

* bump timeout for upserting ciphers

* mark `go` as async in desktop vault

- previously it was a floating promise

* Revert "mark `go` as async in desktop vault"

This reverts commit fd28f40b18.

* Revert "bump timeout for upserting ciphers"

This reverts commit e963acc377.

* move vault utilities to `common` rather than `lib` to avoid circular dependencies

* use `perUserCache$` for `cipherViews$` to avoid new subscriptions from being created

* use userId from observable rather than locally set to be the most up to date

* [PM-18707] Add clearBuffer$ input to perUserCache$ helper so that  the internal share replay buffers can be cleared

* [PM-18707] Rework forceCipherViews$ to clearBuffer$ refactor

- Add dependency for cipherDecryptionKeys$ for the cipherViews so that decryption is never attempted without keys

* [PM-18707] Add overload to perUserCache to satisfy type checker

* [PM-18707] Fix overloads

* [PM-18707] Add check for empty failed to decrypt ciphers

* [PM-18707] Mark vault component for check after observable emits.

The cipherViews$ observable now persists between subscriptions, meaning that updates via the sync push notifications can occur outside the AngularZone causing delays in updating the view.

---------

Co-authored-by: Nick Krantz <125900171+nick-livefront@users.noreply.github.com>
Co-authored-by: Nick Krantz <nick@livefront.com>
This commit is contained in:
Shane Melton
2025-04-15 12:17:41 -07:00
committed by GitHub
parent 4cddc40828
commit 8258ea39b0
10 changed files with 239 additions and 168 deletions

View File

@@ -27,9 +27,6 @@ export class VaultItemsComponent extends BaseVaultItemsComponent {
// eslint-disable-next-line rxjs-angular/prefer-takeuntil
searchBarService.searchText$.pipe(distinctUntilChanged()).subscribe((searchText) => {
this.searchText = searchText;
// FIXME: Verify that this floating promise is intentional. If it is, add an explanatory comment and ensure there is proper error handling.
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.search(200);
});
}

View File

@@ -126,9 +126,7 @@ export class ViewComponent extends BaseViewComponent implements OnInit, OnDestro
}
async ngOnChanges() {
await super.load();
if (this.cipher.decryptionFailure) {
if (this.cipher?.decryptionFailure) {
DecryptionFailureDialogComponent.open(this.dialogService, {
cipherIds: [this.cipherId as CipherId],
});

View File

@@ -13,6 +13,7 @@ import {
Subject,
} from "rxjs";
import {
catchError,
concatMap,
debounceTime,
filter,
@@ -23,7 +24,6 @@ import {
take,
takeUntil,
tap,
catchError,
} from "rxjs/operators";
import {
@@ -64,6 +64,7 @@ import { CipherRepromptType } from "@bitwarden/common/vault/enums/cipher-repromp
import { TreeNode } from "@bitwarden/common/vault/models/domain/tree-node";
import { CipherView } from "@bitwarden/common/vault/models/view/cipher.view";
import { ServiceUtils } from "@bitwarden/common/vault/service-utils";
import { filterOutNullish } from "@bitwarden/common/vault/utils/observable-utilities";
import { DialogRef, DialogService, Icons, ToastService } from "@bitwarden/components";
import {
AddEditFolderDialogComponent,
@@ -138,7 +139,6 @@ const SearchTextDebounceInterval = 200;
VaultFilterModule,
VaultItemsModule,
SharedModule,
DecryptionFailureDialogComponent,
],
providers: [
RoutedVaultFilterService,
@@ -348,9 +348,8 @@ export class VaultComponent implements OnInit, OnDestroy {
]).pipe(
filter(([ciphers, filter]) => ciphers != undefined && filter != undefined),
concatMap(async ([ciphers, filter, searchText]) => {
const failedCiphers = await firstValueFrom(
this.cipherService.failedToDecryptCiphers$(activeUserId),
);
const failedCiphers =
(await firstValueFrom(this.cipherService.failedToDecryptCiphers$(activeUserId))) ?? [];
const filterFunction = createFilterFunction(filter);
// Append any failed to decrypt ciphers to the top of the cipher list
const allCiphers = [...failedCiphers, ...ciphers];
@@ -472,6 +471,7 @@ export class VaultComponent implements OnInit, OnDestroy {
firstSetup$
.pipe(
switchMap(() => this.cipherService.failedToDecryptCiphers$(activeUserId)),
filterOutNullish(),
map((ciphers) => ciphers.filter((c) => !c.isDeleted)),
filter((ciphers) => ciphers.length > 0),
take(1),
@@ -528,6 +528,10 @@ export class VaultComponent implements OnInit, OnDestroy {
this.isEmpty = collections?.length === 0 && ciphers?.length === 0;
this.performingInitialLoad = false;
this.refreshing = false;
// Explicitly mark for check to ensure the view is updated
// Some sources are not always emitted within the Angular zone (e.g. ciphers updated via WS notifications)
this.changeDetectorRef.markForCheck();
},
);
}

View File

@@ -422,10 +422,15 @@ export class AddEditComponent implements OnInit, OnDestroy {
const activeUserId = await firstValueFrom(this.accountService.activeAccount$.pipe(getUserId));
const cipher = await this.encryptCipher(activeUserId);
try {
this.formPromise = this.saveCipher(cipher);
await this.formPromise;
this.cipher.id = cipher.id;
const savedCipher = await this.formPromise;
// Reset local cipher from the saved cipher returned from the server
this.cipher = await savedCipher.decrypt(
await this.cipherService.getKeyForCipherKeyDecryption(savedCipher, activeUserId),
);
this.toastService.showToast({
variant: "success",
title: null,

View File

@@ -1,13 +1,22 @@
// FIXME: Update this file to be type safe and remove this and next line
// @ts-strict-ignore
import { Directive, EventEmitter, Input, OnDestroy, OnInit, Output } from "@angular/core";
import { BehaviorSubject, Subject, firstValueFrom, from, switchMap, takeUntil } from "rxjs";
import { takeUntilDestroyed } from "@angular/core/rxjs-interop";
import {
BehaviorSubject,
Subject,
combineLatest,
filter,
from,
of,
switchMap,
takeUntil,
} from "rxjs";
import { SearchService } from "@bitwarden/common/abstractions/search.service";
import { Organization } from "@bitwarden/common/admin-console/models/domain/organization";
import { AccountService } from "@bitwarden/common/auth/abstractions/account.service";
import { getUserId } from "@bitwarden/common/auth/services/account.service";
import { UserId } from "@bitwarden/common/types/guid";
import { CipherService } from "@bitwarden/common/vault/abstractions/cipher.service";
import { CipherView } from "@bitwarden/common/vault/models/view/cipher.view";
@@ -21,17 +30,17 @@ export class VaultItemsComponent implements OnInit, OnDestroy {
loaded = false;
ciphers: CipherView[] = [];
filter: (cipher: CipherView) => boolean = null;
deleted = false;
organization: Organization;
protected searchPending = false;
private userId: UserId;
/** Construct filters as an observable so it can be appended to the cipher stream. */
private _filter$ = new BehaviorSubject<(cipher: CipherView) => boolean | null>(null);
private destroy$ = new Subject<void>();
private searchTimeout: any = null;
private isSearchable: boolean = false;
private _searchText$ = new BehaviorSubject<string>("");
get searchText() {
return this._searchText$.value;
}
@@ -39,18 +48,28 @@ export class VaultItemsComponent implements OnInit, OnDestroy {
this._searchText$.next(value);
}
get filter() {
return this._filter$.value;
}
set filter(value: (cipher: CipherView) => boolean | null) {
this._filter$.next(value);
}
constructor(
protected searchService: SearchService,
protected cipherService: CipherService,
protected accountService: AccountService,
) {}
) {
this.subscribeToCiphers();
}
async ngOnInit() {
this.userId = await firstValueFrom(getUserId(this.accountService.activeAccount$));
this._searchText$
combineLatest([getUserId(this.accountService.activeAccount$), this._searchText$])
.pipe(
switchMap((searchText) => from(this.searchService.isSearchable(this.userId, searchText))),
switchMap(([userId, searchText]) =>
from(this.searchService.isSearchable(userId, searchText)),
),
takeUntil(this.destroy$),
)
.subscribe((isSearchable) => {
@@ -80,23 +99,6 @@ export class VaultItemsComponent implements OnInit, OnDestroy {
async applyFilter(filter: (cipher: CipherView) => boolean = null) {
this.filter = filter;
await this.search(null);
}
async search(timeout: number = null, indexedCiphers?: CipherView[]) {
this.searchPending = false;
if (this.searchTimeout != null) {
clearTimeout(this.searchTimeout);
}
if (timeout == null) {
await this.doSearch(indexedCiphers);
return;
}
this.searchPending = true;
this.searchTimeout = setTimeout(async () => {
await this.doSearch(indexedCiphers);
this.searchPending = false;
}, timeout);
}
selectCipher(cipher: CipherView) {
@@ -121,25 +123,44 @@ export class VaultItemsComponent implements OnInit, OnDestroy {
protected deletedFilter: (cipher: CipherView) => boolean = (c) => c.isDeleted === this.deleted;
protected async doSearch(indexedCiphers?: CipherView[], userId?: UserId) {
// Get userId from activeAccount if not provided from parent stream
if (!userId) {
userId = await firstValueFrom(getUserId(this.accountService.activeAccount$));
}
/**
* Creates stream of dependencies that results in the list of ciphers to display
* within the vault list.
*
* Note: This previously used promises but race conditions with how the ciphers were
* stored in electron. Using observables is more reliable as fresh values will always
* cascade through the components.
*/
private subscribeToCiphers() {
getUserId(this.accountService.activeAccount$)
.pipe(
switchMap((userId) =>
combineLatest([
this.cipherService.cipherViews$(userId).pipe(filter((ciphers) => ciphers != null)),
this.cipherService.failedToDecryptCiphers$(userId),
this._searchText$,
this._filter$,
of(userId),
]),
),
switchMap(([indexedCiphers, failedCiphers, searchText, filter, userId]) => {
let allCiphers = indexedCiphers ?? [];
const _failedCiphers = failedCiphers ?? [];
indexedCiphers =
indexedCiphers ?? (await firstValueFrom(this.cipherService.cipherViews$(userId)));
allCiphers = [..._failedCiphers, ...allCiphers];
const failedCiphers = await firstValueFrom(this.cipherService.failedToDecryptCiphers$(userId));
if (failedCiphers != null && failedCiphers.length > 0) {
indexedCiphers = [...failedCiphers, ...indexedCiphers];
}
this.ciphers = await this.searchService.searchCiphers(
this.userId,
this.searchText,
[this.filter, this.deletedFilter],
indexedCiphers,
return this.searchService.searchCiphers(
userId,
searchText,
[filter, this.deletedFilter],
allCiphers,
);
}),
takeUntilDestroyed(),
)
.subscribe((ciphers) => {
this.ciphers = ciphers;
this.loaded = true;
});
}
}

View File

@@ -11,7 +11,17 @@ import {
OnInit,
Output,
} from "@angular/core";
import { filter, firstValueFrom, map, Observable } from "rxjs";
import {
BehaviorSubject,
combineLatest,
filter,
firstValueFrom,
map,
Observable,
of,
switchMap,
tap,
} from "rxjs";
import { ApiService } from "@bitwarden/common/abstractions/api.service";
import { AuditService } from "@bitwarden/common/abstractions/audit.service";
@@ -46,11 +56,22 @@ import { DialogService, ToastService } from "@bitwarden/components";
import { KeyService } from "@bitwarden/key-management";
import { PasswordRepromptService } from "@bitwarden/vault";
const BroadcasterSubscriptionId = "ViewComponent";
const BroadcasterSubscriptionId = "BaseViewComponent";
@Directive()
export class ViewComponent implements OnDestroy, OnInit {
@Input() cipherId: string;
/** Observable of cipherId$ that will update each time the `Input` updates */
private _cipherId$ = new BehaviorSubject<string>(null);
@Input()
set cipherId(value: string) {
this._cipherId$.next(value);
}
get cipherId(): string {
return this._cipherId$.getValue();
}
@Input() collectionId: string;
@Output() onEditCipher = new EventEmitter<CipherView>();
@Output() onCloneCipher = new EventEmitter<CipherView>();
@@ -126,13 +147,30 @@ export class ViewComponent implements OnDestroy, OnInit {
switch (message.command) {
case "syncCompleted":
if (message.successfully) {
await this.load();
this.changeDetectorRef.detectChanges();
}
break;
}
});
});
// Set up the subscription to the activeAccount$ and cipherId$ observables
combineLatest([this.accountService.activeAccount$.pipe(getUserId), this._cipherId$])
.pipe(
tap(() => this.cleanUp()),
switchMap(([userId, cipherId]) => {
const cipher$ = this.cipherService.cipherViews$(userId).pipe(
map((ciphers) => ciphers?.find((c) => c.id === cipherId)),
filter((cipher) => !!cipher),
);
return combineLatest([of(userId), cipher$]);
}),
)
.subscribe(([userId, cipher]) => {
this.cipher = cipher;
void this.constructCipherDetails(userId);
});
}
ngOnDestroy() {
@@ -140,70 +178,6 @@ export class ViewComponent implements OnDestroy, OnInit {
this.cleanUp();
}
async load() {
this.cleanUp();
// Grab individual cipher from `cipherViews$` for the most up-to-date information
const activeUserId = await firstValueFrom(this.accountService.activeAccount$.pipe(getUserId));
this.cipher = await firstValueFrom(
this.cipherService.cipherViews$(activeUserId).pipe(
map((ciphers) => ciphers?.find((c) => c.id === this.cipherId)),
filter((cipher) => !!cipher),
),
);
this.canAccessPremium = await firstValueFrom(
this.billingAccountProfileStateService.hasPremiumFromAnySource$(activeUserId),
);
this.showPremiumRequiredTotp =
this.cipher.login.totp && !this.canAccessPremium && !this.cipher.organizationUseTotp;
this.canDeleteCipher$ = this.cipherAuthorizationService.canDeleteCipher$(this.cipher, [
this.collectionId as CollectionId,
]);
this.canRestoreCipher$ = this.cipherAuthorizationService.canRestoreCipher$(this.cipher);
if (this.cipher.folderId) {
this.folder = await (
await firstValueFrom(this.folderService.folderViews$(activeUserId))
).find((f) => f.id == this.cipher.folderId);
}
const canGenerateTotp =
this.cipher.type === CipherType.Login &&
this.cipher.login.totp &&
(this.cipher.organizationUseTotp || this.canAccessPremium);
this.totpInfo$ = canGenerateTotp
? this.totpService.getCode$(this.cipher.login.totp).pipe(
map((response) => {
const epoch = Math.round(new Date().getTime() / 1000.0);
const mod = epoch % response.period;
// Format code
const totpCodeFormatted =
response.code.length > 4
? `${response.code.slice(0, Math.floor(response.code.length / 2))} ${response.code.slice(Math.floor(response.code.length / 2))}`
: response.code;
return {
totpCode: response.code,
totpCodeFormatted,
totpDash: +(Math.round(((78.6 / response.period) * mod + "e+2") as any) + "e-2"),
totpSec: response.period - mod,
totpLow: response.period - mod <= 7,
} as TotpInfo;
}),
)
: undefined;
if (this.previousCipherId !== this.cipherId) {
// FIXME: Verify that this floating promise is intentional. If it is, add an explanatory comment and ensure there is proper error handling.
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.eventCollectionService.collect(EventType.Cipher_ClientViewed, this.cipherId);
}
this.previousCipherId = this.cipherId;
}
async edit() {
this.onEditCipher.emit(this.cipher);
}
@@ -533,4 +507,61 @@ export class ViewComponent implements OnDestroy, OnInit {
this.showCardCode = false;
this.passwordReprompted = false;
}
/**
* When a cipher is viewed, construct all details for the view that are not directly
* available from the cipher object itself.
*/
private async constructCipherDetails(userId: UserId) {
this.canAccessPremium = await firstValueFrom(
this.billingAccountProfileStateService.hasPremiumFromAnySource$(userId),
);
this.showPremiumRequiredTotp =
this.cipher.login.totp && !this.canAccessPremium && !this.cipher.organizationUseTotp;
this.canDeleteCipher$ = this.cipherAuthorizationService.canDeleteCipher$(this.cipher, [
this.collectionId as CollectionId,
]);
this.canRestoreCipher$ = this.cipherAuthorizationService.canRestoreCipher$(this.cipher);
if (this.cipher.folderId) {
this.folder = await (
await firstValueFrom(this.folderService.folderViews$(userId))
).find((f) => f.id == this.cipher.folderId);
}
const canGenerateTotp =
this.cipher.type === CipherType.Login &&
this.cipher.login.totp &&
(this.cipher.organizationUseTotp || this.canAccessPremium);
this.totpInfo$ = canGenerateTotp
? this.totpService.getCode$(this.cipher.login.totp).pipe(
map((response) => {
const epoch = Math.round(new Date().getTime() / 1000.0);
const mod = epoch % response.period;
// Format code
const totpCodeFormatted =
response.code.length > 4
? `${response.code.slice(0, Math.floor(response.code.length / 2))} ${response.code.slice(Math.floor(response.code.length / 2))}`
: response.code;
return {
totpCode: response.code,
totpCodeFormatted,
totpDash: +(Math.round(((78.6 / response.period) * mod + "e+2") as any) + "e-2"),
totpSec: response.period - mod,
totpLow: response.period - mod <= 7,
} as TotpInfo;
}),
)
: undefined;
if (this.previousCipherId !== this.cipherId) {
// FIXME: Verify that this floating promise is intentional. If it is, add an explanatory comment and ensure there is proper error handling.
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.eventCollectionService.collect(EventType.Cipher_ClientViewed, this.cipherId);
}
this.previousCipherId = this.cipherId;
}
}

View File

@@ -153,14 +153,14 @@ export class DefaultNotificationsService implements NotificationsServiceAbstract
await this.syncService.syncUpsertCipher(
notification.payload as SyncCipherNotification,
notification.type === NotificationType.SyncCipherUpdate,
payloadUserId,
userId,
);
break;
case NotificationType.SyncCipherDelete:
case NotificationType.SyncLoginDelete:
await this.syncService.syncDeleteCipher(
notification.payload as SyncCipherNotification,
payloadUserId,
userId,
);
break;
case NotificationType.SyncFolderCreate:

View File

@@ -1,17 +1,6 @@
// FIXME: Update this file to be type safe and remove this and next line
// @ts-strict-ignore
import {
combineLatest,
filter,
firstValueFrom,
map,
merge,
Observable,
of,
shareReplay,
Subject,
switchMap,
} from "rxjs";
import { combineLatest, filter, firstValueFrom, map, Observable, Subject, switchMap } from "rxjs";
import { SemVer } from "semver";
import { LogService } from "@bitwarden/common/platform/abstractions/log.service";
@@ -40,6 +29,7 @@ import { SymmetricCryptoKey } from "../../platform/models/domain/symmetric-crypt
import { StateProvider } from "../../platform/state";
import { CipherId, CollectionId, OrganizationId, UserId } from "../../types/guid";
import { OrgKey, UserKey } from "../../types/key";
import { perUserCache$ } from "../../vault/utils/observable-utilities";
import { CipherService as CipherServiceAbstraction } from "../abstractions/cipher.service";
import { CipherFileUploadService } from "../abstractions/file-upload/cipher-file-upload.service";
import { FieldType } from "../enums";
@@ -91,11 +81,12 @@ export class CipherService implements CipherServiceAbstraction {
this.sortCiphersByLastUsed,
);
/**
* Observable that forces the `cipherViews$` observable to re-emit with the provided value.
* Used to let subscribers of `cipherViews$` know that the decrypted ciphers have been cleared for the active user.
* Observable that forces the `cipherViews$` observable for the given user to emit a null value.
* Used to let subscribers of `cipherViews$` know that the decrypted ciphers have been cleared for the user and to
* clear them from the shareReplay buffer created in perUserCache$().
* @private
*/
private forceCipherViews$: Subject<CipherView[]> = new Subject<CipherView[]>();
private clearCipherViewsForUser$: Subject<UserId> = new Subject<UserId>();
constructor(
private keyService: KeyService,
@@ -132,13 +123,16 @@ export class CipherService implements CipherServiceAbstraction {
* A `null` value indicates that the latest encrypted ciphers have not been decrypted yet and that
* decryption is in progress. The latest decrypted ciphers will be emitted once decryption is complete.
*/
cipherViews$(userId: UserId): Observable<CipherView[] | null> {
return combineLatest([this.encryptedCiphersState(userId).state$, this.localData$(userId)]).pipe(
filter(([ciphers]) => ciphers != null), // Skip if ciphers haven't been loaded yor synced yet
switchMap(() => merge(this.forceCipherViews$, this.getAllDecrypted(userId))),
shareReplay({ bufferSize: 1, refCount: true }),
cipherViews$ = perUserCache$((userId: UserId): Observable<CipherView[] | null> => {
return combineLatest([
this.encryptedCiphersState(userId).state$,
this.localData$(userId),
this.keyService.cipherDecryptionKeys$(userId, true),
]).pipe(
filter(([ciphers, keys]) => ciphers != null && keys != null), // Skip if ciphers haven't been loaded yor synced yet
switchMap(() => this.getAllDecrypted(userId)),
);
}
}, this.clearCipherViewsForUser$);
addEditCipherInfo$(userId: UserId): Observable<AddEditCipherInfo> {
return this.addEditCipherInfoState(userId).state$;
@@ -149,13 +143,11 @@ export class CipherService implements CipherServiceAbstraction {
*
* An empty array indicates that all ciphers were successfully decrypted.
*/
failedToDecryptCiphers$(userId: UserId): Observable<CipherView[]> {
failedToDecryptCiphers$ = perUserCache$((userId: UserId): Observable<CipherView[]> => {
return this.failedToDecryptCiphersState(userId).state$.pipe(
filter((ciphers) => ciphers != null),
switchMap((ciphers) => merge(this.forceCipherViews$, of(ciphers))),
shareReplay({ bufferSize: 1, refCount: true }),
);
}
}, this.clearCipherViewsForUser$);
async setDecryptedCipherCache(value: CipherView[], userId: UserId) {
// Sometimes we might prematurely decrypt the vault and that will result in no ciphers
@@ -190,10 +182,8 @@ export class CipherService implements CipherServiceAbstraction {
userId ??= activeUserId;
await this.clearDecryptedCiphersState(userId);
// Force the cipherView$ observable (which always tracks the active user) to re-emit
if (userId == activeUserId) {
this.forceCipherViews$.next(null);
}
// Force the cached cipherView$ observable(s) to emit a null value
this.clearCipherViewsForUser$.next(userId);
}
async encrypt(
@@ -402,10 +392,14 @@ export class CipherService implements CipherServiceAbstraction {
return await this.getDecryptedCiphers(userId);
}
const [newDecCiphers, failedCiphers] = await this.decryptCiphers(
await this.getAll(userId),
userId,
);
const decrypted = await this.decryptCiphers(await this.getAll(userId), userId);
// We failed to decrypt, return empty array but do not cache
if (decrypted == null) {
return [];
}
const [newDecCiphers, failedCiphers] = decrypted;
await this.setDecryptedCipherCache(newDecCiphers, userId);
await this.setFailedDecryptedCiphers(failedCiphers, userId);
@@ -429,12 +423,12 @@ export class CipherService implements CipherServiceAbstraction {
private async decryptCiphers(
ciphers: Cipher[],
userId: UserId,
): Promise<[CipherView[], CipherView[]]> {
): Promise<[CipherView[], CipherView[]] | null> {
const keys = await firstValueFrom(this.keyService.cipherDecryptionKeys$(userId, true));
if (keys == null || (keys.userKey == null && Object.keys(keys.orgKeys).length === 0)) {
// return early if there are no keys to decrypt with
return [[], []];
return null;
}
// Group ciphers by orgId or under 'null' for the user's ciphers

View File

@@ -12,8 +12,11 @@ import { MessageListener } from "@bitwarden/common/platform/messaging";
import { NotificationsService } from "@bitwarden/common/platform/notifications";
import { StateProvider } from "@bitwarden/common/platform/state";
import { SecurityTaskId, UserId } from "@bitwarden/common/types/guid";
import {
filterOutNullish,
perUserCache$,
} from "@bitwarden/common/vault/utils/observable-utilities";
import { filterOutNullish, perUserCache$ } from "../../utils/observable-utilities";
import { TaskService } from "../abstractions/task.service";
import { SecurityTaskStatus } from "../enums";
import { SecurityTask, SecurityTaskData, SecurityTaskResponse } from "../models";

View File

@@ -1,20 +1,38 @@
import { filter, Observable, OperatorFunction, shareReplay } from "rxjs";
import { EMPTY, filter, map, merge, Observable, OperatorFunction, shareReplay } from "rxjs";
import { UserId } from "@bitwarden/common/types/guid";
/**
* Builds an observable once per userId and caches it for future requests.
* The built observables are shared among subscribers with a replay buffer size of 1.
*
* Optionally, a clearBuffer$ observable can be provided to clear the replay buffer for a specific or all userIds.
* @param create - A function that creates an observable for a given userId.
* @param clearBuffer$ - An observable that, when emitted, clears the buffer for the emitted userId. When null is emitted, all caches are cleared.
*/
export function perUserCache$<TValue>(
create: (userId: UserId) => Observable<TValue>,
): (userId: UserId) => Observable<TValue> {
const cache = new Map<UserId, Observable<TValue>>();
clearBuffer$: Observable<UserId | null>,
): (userId: UserId) => Observable<TValue | null>;
export function perUserCache$<TValue>(
create: (userId: UserId) => Observable<TValue>,
): (userId: UserId) => Observable<TValue>;
export function perUserCache$<TValue>(
create: (userId: UserId) => Observable<TValue>,
clearBuffer$: Observable<UserId | null> | undefined = undefined,
): (userId: UserId) => Observable<TValue | null> {
const cache = new Map<UserId, Observable<TValue | null>>();
return (userId: UserId) => {
let observable = cache.get(userId);
if (!observable) {
observable = create(userId).pipe(shareReplay({ bufferSize: 1, refCount: false }));
clearBuffer$ ??= EMPTY;
observable = merge(
create(userId),
clearBuffer$.pipe(
filter((clearId) => clearId === userId || clearId === null),
map(() => null),
),
).pipe(shareReplay({ bufferSize: 1, refCount: false }));
cache.set(userId, observable);
}
return observable;