mirror of
https://github.com/bitwarden/browser
synced 2026-02-11 05:53:42 +00:00
fix engine-settings desync error
This commit is contained in:
13
libs/common/src/tools/rx.rxjs.ts
Normal file
13
libs/common/src/tools/rx.rxjs.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
import { Observable } from "rxjs";
|
||||
|
||||
/**
|
||||
* Used to infer types from arguments to functions like {@link withLatestReady}.
|
||||
* So that you can have `forkJoin([Observable<A>, PromiseLike<B>]): Observable<[A, B]>`
|
||||
* et al.
|
||||
* @remarks this type definition is derived from rxjs' {@link ObservableInputTuple}.
|
||||
* The difference is it *only* works with observables, while the rx version works
|
||||
* with any thing that can become an observable.
|
||||
*/
|
||||
export type ObservableTuple<T> = {
|
||||
[K in keyof T]: Observable<T[K]>;
|
||||
};
|
||||
@@ -20,8 +20,11 @@ import {
|
||||
startWith,
|
||||
pairwise,
|
||||
MonoTypeOperatorFunction,
|
||||
Cons,
|
||||
} from "rxjs";
|
||||
|
||||
import { ObservableTuple } from "./rx.rxjs";
|
||||
|
||||
/** Returns its input. */
|
||||
function identity(value: any): any {
|
||||
return value;
|
||||
@@ -164,26 +167,30 @@ export function ready<T>(watch$: Observable<any> | Observable<any>[]) {
|
||||
);
|
||||
}
|
||||
|
||||
export function withLatestReady<Source, Watch>(
|
||||
watch$: Observable<Watch>,
|
||||
): OperatorFunction<Source, [Source, Watch]> {
|
||||
export function withLatestReady<Source, Watch extends readonly unknown[]>(
|
||||
...watches$: [...ObservableTuple<Watch>]
|
||||
): OperatorFunction<Source, Cons<Source, Watch>> {
|
||||
return connect((source$) => {
|
||||
// these subscriptions are safe because `source$` connects only after there
|
||||
// is an external subscriber.
|
||||
const source = new ReplaySubject<Source>(1);
|
||||
source$.subscribe(source);
|
||||
const watch = new ReplaySubject<Watch>(1);
|
||||
watch$.subscribe(watch);
|
||||
|
||||
const watches = watches$.map((w) => {
|
||||
const watch$ = new ReplaySubject<unknown>(1);
|
||||
w.subscribe(watch$);
|
||||
return watch$;
|
||||
}) as [...ObservableTuple<Watch>];
|
||||
|
||||
// `concat` is subscribed immediately after it's returned, at which point
|
||||
// `zip` blocks until all items in `watching$` are ready. If that occurs
|
||||
// `zip` blocks until all items in `watches` are ready. If that occurs
|
||||
// after `source$` is hot, then the replay subject sends the last-captured
|
||||
// emission through immediately. Otherwise, `ready` waits for the next
|
||||
// emission
|
||||
return concat(zip(watch).pipe(first(), ignoreElements()), source).pipe(
|
||||
withLatestFrom(watch),
|
||||
// emission through immediately. Otherwise, `withLatestFrom` waits for the
|
||||
// next emission
|
||||
return concat(zip(watches).pipe(first(), ignoreElements()), source).pipe(
|
||||
withLatestFrom(...watches),
|
||||
takeUntil(anyComplete(source)),
|
||||
);
|
||||
) as Observable<Cons<Source, Watch>>;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user