mirror of
https://github.com/bitwarden/browser
synced 2025-12-12 14:23:32 +00:00
[PM-8285] Resolve app id race (#9501)
* Do not update appId if it is not null * Prefer linear transformations to side-effect-based changes This leaves us open to repeat emits due to updates, but distinct until changed stops those. Tracker improvements are due to passed in observables with replay causing immediate emits when `expectingEmission`s. This converts to a cold observable that only emits when the tracked observable does _after_ subscribing. * Prefer while * PR review
This commit is contained in:
@@ -1,10 +1,11 @@
|
||||
import { Observable, Subscription, firstValueFrom, throwError, timeout } from "rxjs";
|
||||
import { Observable, Subject, Subscription, firstValueFrom, throwError, timeout } from "rxjs";
|
||||
|
||||
/** Test class to enable async awaiting of observable emissions */
|
||||
export class ObservableTracker<T> {
|
||||
private subscription: Subscription;
|
||||
private emissionReceived = new Subject<T>();
|
||||
emissions: T[] = [];
|
||||
constructor(private observable: Observable<T>) {
|
||||
constructor(observable: Observable<T>) {
|
||||
this.emissions = this.trackEmissions(observable);
|
||||
}
|
||||
|
||||
@@ -21,7 +22,7 @@ export class ObservableTracker<T> {
|
||||
*/
|
||||
async expectEmission(msTimeout = 50): Promise<T> {
|
||||
return await firstValueFrom(
|
||||
this.observable.pipe(
|
||||
this.emissionReceived.pipe(
|
||||
timeout({
|
||||
first: msTimeout,
|
||||
with: () => throwError(() => new Error("Timeout exceeded waiting for another emission.")),
|
||||
@@ -34,40 +35,38 @@ export class ObservableTracker<T> {
|
||||
* @param count The number of emissions to wait for
|
||||
*/
|
||||
async pauseUntilReceived(count: number, msTimeout = 50): Promise<T[]> {
|
||||
for (let i = 0; i < count - this.emissions.length; i++) {
|
||||
while (this.emissions.length < count) {
|
||||
await this.expectEmission(msTimeout);
|
||||
}
|
||||
return this.emissions;
|
||||
}
|
||||
|
||||
private trackEmissions<T>(observable: Observable<T>): T[] {
|
||||
private trackEmissions(observable: Observable<T>): T[] {
|
||||
const emissions: T[] = [];
|
||||
this.subscription = observable.subscribe((value) => {
|
||||
switch (value) {
|
||||
case undefined:
|
||||
case null:
|
||||
emissions.push(value);
|
||||
return;
|
||||
default:
|
||||
// process by type
|
||||
break;
|
||||
if (value == null) {
|
||||
this.emissionReceived.next(null);
|
||||
return;
|
||||
}
|
||||
|
||||
switch (typeof value) {
|
||||
case "string":
|
||||
case "number":
|
||||
case "boolean":
|
||||
emissions.push(value);
|
||||
this.emissionReceived.next(value);
|
||||
break;
|
||||
case "symbol":
|
||||
// Cheating types to make symbols work at all
|
||||
emissions.push(value.toString() as T);
|
||||
this.emissionReceived.next(value as T);
|
||||
break;
|
||||
default: {
|
||||
emissions.push(clone(value));
|
||||
this.emissionReceived.next(clone(value));
|
||||
}
|
||||
}
|
||||
});
|
||||
this.emissionReceived.subscribe((value) => {
|
||||
emissions.push(value);
|
||||
});
|
||||
return emissions;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user