mirror of
https://github.com/bitwarden/browser
synced 2025-12-15 07:43:35 +00:00
[PM-16792] [PM-16822] Encapsulate encryptor and state provision within UserStateSubject (#13195)
This commit is contained in:
@@ -1,11 +1,7 @@
|
||||
import { Observable } from "rxjs";
|
||||
|
||||
import { Policy } from "@bitwarden/common/admin-console/models/domain/policy";
|
||||
import { OrganizationId, UserId } from "@bitwarden/common/types/guid";
|
||||
|
||||
import { OrganizationEncryptor } from "./cryptography/organization-encryptor.abstraction";
|
||||
import { UserEncryptor } from "./cryptography/user-encryptor.abstraction";
|
||||
|
||||
/** error emitted when the `SingleUserDependency` changes Ids */
|
||||
export type UserChangedError = {
|
||||
/** the userId pinned by the single user dependency */
|
||||
@@ -22,22 +18,21 @@ export type OrganizationChangedError = {
|
||||
actualOrganizationId: OrganizationId;
|
||||
};
|
||||
|
||||
/** A pattern for types that depend upon a dynamic policy stream and return
|
||||
* an observable.
|
||||
/** A pattern for types that depend upon the lifetime of a fixed dependency.
|
||||
* The dependency's lifetime is tracked through the observable. The observable
|
||||
* emits the dependency once it becomes available and completes when the
|
||||
* dependency becomes unavailable.
|
||||
*
|
||||
* Consumers of this dependency should emit when `policy$`
|
||||
* emits, provided that the latest message materially
|
||||
* changes the output of the consumer. If `policy$` emits
|
||||
* an unrecoverable error, the consumer should continue using
|
||||
* the last-emitted policy. If `policy$` completes, the consumer
|
||||
* should continue using the last-emitted policy.
|
||||
* Consumers of this dependency should emit a `SequenceError` if the dependency emits
|
||||
* multiple times. When the dependency completes, the consumer should also
|
||||
* complete. When the dependency errors, the consumer should also error.
|
||||
*/
|
||||
export type PolicyDependency = {
|
||||
/** A stream that emits policies when subscribed and
|
||||
* when the policy changes. The stream should not
|
||||
* emit null or undefined.
|
||||
export type BoundDependency<Name extends string, T> = {
|
||||
/** A stream that emits a dependency once it becomes available
|
||||
* and completes when the dependency becomes unavailable. The stream emits
|
||||
* only once per subscription and never emits null or undefined.
|
||||
*/
|
||||
policy$: Observable<Policy[]>;
|
||||
[K in `${Name}$`]: Observable<T>;
|
||||
};
|
||||
|
||||
/** A pattern for types that depend upon a dynamic userid and return
|
||||
@@ -72,26 +67,6 @@ export type OrganizationBound<K extends keyof any, T> = { [P in K]: T } & {
|
||||
organizationId: OrganizationId;
|
||||
};
|
||||
|
||||
/** A pattern for types that depend upon a fixed-key encryptor and return
|
||||
* an observable.
|
||||
*
|
||||
* Consumers of this dependency should emit a `OrganizationChangedError` if
|
||||
* the bound OrganizationId changes or if the encryptor changes. If
|
||||
* `singleOrganizationEncryptor$` completes, the consumer should complete
|
||||
* once all events received prior to the completion event are
|
||||
* finished processing. The consumer should, where possible,
|
||||
* prioritize these events in order to complete as soon as possible.
|
||||
* If `singleOrganizationEncryptor$` emits an unrecoverable error, the consumer
|
||||
* should also emit the error.
|
||||
*/
|
||||
export type SingleOrganizationEncryptorDependency = {
|
||||
/** A stream that emits an encryptor when subscribed and the org key
|
||||
* is available, and completes when the org key is no longer available.
|
||||
* The stream should not emit null or undefined.
|
||||
*/
|
||||
singleOrgEncryptor$: Observable<OrganizationBound<"encryptor", OrganizationEncryptor>>;
|
||||
};
|
||||
|
||||
/** A pattern for types that depend upon a fixed-value organizationId and return
|
||||
* an observable.
|
||||
*
|
||||
@@ -112,26 +87,6 @@ export type SingleOrganizationDependency = {
|
||||
singleOrganizationId$: Observable<UserBound<"organizationId", OrganizationId>>;
|
||||
};
|
||||
|
||||
/** A pattern for types that depend upon a fixed-key encryptor and return
|
||||
* an observable.
|
||||
*
|
||||
* Consumers of this dependency should emit a `UserChangedError` if
|
||||
* the bound UserId changes or if the encryptor changes. If
|
||||
* `singleUserEncryptor$` completes, the consumer should complete
|
||||
* once all events received prior to the completion event are
|
||||
* finished processing. The consumer should, where possible,
|
||||
* prioritize these events in order to complete as soon as possible.
|
||||
* If `singleUserEncryptor$` emits an unrecoverable error, the consumer
|
||||
* should also emit the error.
|
||||
*/
|
||||
export type SingleUserEncryptorDependency = {
|
||||
/** A stream that emits an encryptor when subscribed and the user key
|
||||
* is available, and completes when the user key is no longer available.
|
||||
* The stream should not emit null or undefined.
|
||||
*/
|
||||
singleUserEncryptor$: Observable<UserBound<"encryptor", UserEncryptor>>;
|
||||
};
|
||||
|
||||
/** A pattern for types that depend upon a fixed-value userid and return
|
||||
* an observable.
|
||||
*
|
||||
@@ -182,22 +137,6 @@ export type WhenDependency = {
|
||||
when$: Observable<boolean>;
|
||||
};
|
||||
|
||||
/** A pattern for types that allow their managed settings to
|
||||
* be overridden.
|
||||
*
|
||||
* Consumers of this dependency should emit when `settings$`
|
||||
* change. If `settings$` completes, the consumer should also
|
||||
* complete. If `settings$` errors, the consumer should also
|
||||
* emit the error.
|
||||
*/
|
||||
export type SettingsDependency<Settings> = {
|
||||
/** A stream that emits settings when settings become available
|
||||
* and when they change. If the settings are not available, the
|
||||
* stream should wait to emit until they become available.
|
||||
*/
|
||||
settings$: Observable<Settings>;
|
||||
};
|
||||
|
||||
/** A pattern for types that accept an arbitrary dependency and
|
||||
* inject it into behavior-customizing functions.
|
||||
*
|
||||
|
||||
@@ -20,7 +20,7 @@ describe("DefaultSemanticLogger", () => {
|
||||
|
||||
expect(logger.write).toHaveBeenCalledWith(LogLevelType.Debug, {
|
||||
message: "this is a debug message",
|
||||
level: LogLevelType.Debug,
|
||||
level: "debug",
|
||||
});
|
||||
});
|
||||
|
||||
@@ -31,7 +31,7 @@ describe("DefaultSemanticLogger", () => {
|
||||
|
||||
expect(logger.write).toHaveBeenCalledWith(LogLevelType.Debug, {
|
||||
content: { example: "this is content" },
|
||||
level: LogLevelType.Debug,
|
||||
level: "debug",
|
||||
});
|
||||
});
|
||||
|
||||
@@ -43,7 +43,7 @@ describe("DefaultSemanticLogger", () => {
|
||||
expect(logger.write).toHaveBeenCalledWith(LogLevelType.Info, {
|
||||
content: { example: "this is content" },
|
||||
message: "this is a message",
|
||||
level: LogLevelType.Info,
|
||||
level: "information",
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -56,7 +56,7 @@ describe("DefaultSemanticLogger", () => {
|
||||
|
||||
expect(logger.write).toHaveBeenCalledWith(LogLevelType.Info, {
|
||||
message: "this is an info message",
|
||||
level: LogLevelType.Info,
|
||||
level: "information",
|
||||
});
|
||||
});
|
||||
|
||||
@@ -67,7 +67,7 @@ describe("DefaultSemanticLogger", () => {
|
||||
|
||||
expect(logger.write).toHaveBeenCalledWith(LogLevelType.Info, {
|
||||
content: { example: "this is content" },
|
||||
level: LogLevelType.Info,
|
||||
level: "information",
|
||||
});
|
||||
});
|
||||
|
||||
@@ -79,7 +79,7 @@ describe("DefaultSemanticLogger", () => {
|
||||
expect(logger.write).toHaveBeenCalledWith(LogLevelType.Info, {
|
||||
content: { example: "this is content" },
|
||||
message: "this is a message",
|
||||
level: LogLevelType.Info,
|
||||
level: "information",
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -92,7 +92,7 @@ describe("DefaultSemanticLogger", () => {
|
||||
|
||||
expect(logger.write).toHaveBeenCalledWith(LogLevelType.Warning, {
|
||||
message: "this is a warning message",
|
||||
level: LogLevelType.Warning,
|
||||
level: "warning",
|
||||
});
|
||||
});
|
||||
|
||||
@@ -103,7 +103,7 @@ describe("DefaultSemanticLogger", () => {
|
||||
|
||||
expect(logger.write).toHaveBeenCalledWith(LogLevelType.Warning, {
|
||||
content: { example: "this is content" },
|
||||
level: LogLevelType.Warning,
|
||||
level: "warning",
|
||||
});
|
||||
});
|
||||
|
||||
@@ -115,7 +115,7 @@ describe("DefaultSemanticLogger", () => {
|
||||
expect(logger.write).toHaveBeenCalledWith(LogLevelType.Warning, {
|
||||
content: { example: "this is content" },
|
||||
message: "this is a message",
|
||||
level: LogLevelType.Warning,
|
||||
level: "warning",
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -128,7 +128,7 @@ describe("DefaultSemanticLogger", () => {
|
||||
|
||||
expect(logger.write).toHaveBeenCalledWith(LogLevelType.Error, {
|
||||
message: "this is an error message",
|
||||
level: LogLevelType.Error,
|
||||
level: "error",
|
||||
});
|
||||
});
|
||||
|
||||
@@ -139,7 +139,7 @@ describe("DefaultSemanticLogger", () => {
|
||||
|
||||
expect(logger.write).toHaveBeenCalledWith(LogLevelType.Error, {
|
||||
content: { example: "this is content" },
|
||||
level: LogLevelType.Error,
|
||||
level: "error",
|
||||
});
|
||||
});
|
||||
|
||||
@@ -151,7 +151,7 @@ describe("DefaultSemanticLogger", () => {
|
||||
expect(logger.write).toHaveBeenCalledWith(LogLevelType.Error, {
|
||||
content: { example: "this is content" },
|
||||
message: "this is a message",
|
||||
level: LogLevelType.Error,
|
||||
level: "error",
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -164,7 +164,7 @@ describe("DefaultSemanticLogger", () => {
|
||||
|
||||
expect(logger.write).toHaveBeenCalledWith(LogLevelType.Error, {
|
||||
message: "this is an error message",
|
||||
level: LogLevelType.Error,
|
||||
level: "error",
|
||||
});
|
||||
});
|
||||
|
||||
@@ -178,7 +178,7 @@ describe("DefaultSemanticLogger", () => {
|
||||
expect(logger.write).toHaveBeenCalledWith(LogLevelType.Error, {
|
||||
content: { example: "this is content" },
|
||||
message: "this is an error message",
|
||||
level: LogLevelType.Error,
|
||||
level: "error",
|
||||
});
|
||||
});
|
||||
|
||||
@@ -192,7 +192,7 @@ describe("DefaultSemanticLogger", () => {
|
||||
expect(logger.write).toHaveBeenCalledWith(LogLevelType.Error, {
|
||||
content: "this is content",
|
||||
message: "this is an error message",
|
||||
level: LogLevelType.Error,
|
||||
level: "error",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -52,7 +52,7 @@ export class DefaultSemanticLogger<Context extends object> implements SemanticLo
|
||||
...this.context,
|
||||
message,
|
||||
content: content ?? undefined,
|
||||
level,
|
||||
level: stringifyLevel(level),
|
||||
};
|
||||
|
||||
if (typeof content === "string" && !message) {
|
||||
@@ -63,3 +63,18 @@ export class DefaultSemanticLogger<Context extends object> implements SemanticLo
|
||||
this.logger.write(level, log);
|
||||
}
|
||||
}
|
||||
|
||||
function stringifyLevel(level: LogLevelType) {
|
||||
switch (level) {
|
||||
case LogLevelType.Debug:
|
||||
return "debug";
|
||||
case LogLevelType.Info:
|
||||
return "information";
|
||||
case LogLevelType.Warning:
|
||||
return "warning";
|
||||
case LogLevelType.Error:
|
||||
return "error";
|
||||
default:
|
||||
return `${level}`;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,3 +28,22 @@ export function disabledSemanticLoggerProvider<Context extends object>(
|
||||
export function consoleSemanticLoggerProvider(logger: LogService): SemanticLogger {
|
||||
return new DefaultSemanticLogger(logger, {});
|
||||
}
|
||||
|
||||
/** Instantiates a semantic logger that emits logs to the console.
|
||||
* @param context a static payload that is cloned when the logger
|
||||
* logs a message. The `messages`, `level`, and `content` fields
|
||||
* are reserved for use by loggers.
|
||||
* @param settings specializes how the semantic logger functions.
|
||||
* If this is omitted, the logger suppresses debug messages.
|
||||
*/
|
||||
export function ifEnabledSemanticLoggerProvider<Context extends object>(
|
||||
enable: boolean,
|
||||
logger: LogService,
|
||||
context: Jsonify<Context>,
|
||||
) {
|
||||
if (enable) {
|
||||
return new DefaultSemanticLogger(logger, context);
|
||||
} else {
|
||||
return disabledSemanticLoggerProvider(context);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
export { disabledSemanticLoggerProvider, consoleSemanticLoggerProvider } from "./factory";
|
||||
export * from "./factory";
|
||||
export { SemanticLogger } from "./semantic-logger.abstraction";
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
* include structuredClone in test environment.
|
||||
* @jest-environment ../../../../shared/test.environment.ts
|
||||
*/
|
||||
// @ts-strict-ignore this file explicitly tests what happens when types are ignored
|
||||
import { of, firstValueFrom, Subject, tap, EmptyError } from "rxjs";
|
||||
|
||||
import { awaitAsync, trackEmissions } from "../../spec";
|
||||
@@ -14,6 +15,7 @@ import {
|
||||
ready,
|
||||
reduceCollection,
|
||||
withLatestReady,
|
||||
pin,
|
||||
} from "./rx";
|
||||
|
||||
describe("errorOnChange", () => {
|
||||
@@ -675,3 +677,72 @@ describe("on", () => {
|
||||
expect(error).toEqual(expected);
|
||||
});
|
||||
});
|
||||
|
||||
describe("pin", () => {
|
||||
it("emits the first value", async () => {
|
||||
const input = new Subject<unknown>();
|
||||
const result: unknown[] = [];
|
||||
|
||||
input.pipe(pin()).subscribe((v) => result.push(v));
|
||||
input.next(1);
|
||||
|
||||
expect(result).toEqual([1]);
|
||||
});
|
||||
|
||||
it("filters repeated emissions", async () => {
|
||||
const input = new Subject<unknown>();
|
||||
const result: unknown[] = [];
|
||||
|
||||
input.pipe(pin({ distinct: (p, c) => p == c })).subscribe((v) => result.push(v));
|
||||
input.next(1);
|
||||
input.next(1);
|
||||
|
||||
expect(result).toEqual([1]);
|
||||
});
|
||||
|
||||
it("errors if multiple emissions occur", async () => {
|
||||
const input = new Subject<unknown>();
|
||||
let error: any = null!;
|
||||
|
||||
input.pipe(pin()).subscribe({
|
||||
error: (e: unknown) => {
|
||||
error = e;
|
||||
},
|
||||
});
|
||||
input.next(1);
|
||||
input.next(1);
|
||||
|
||||
expect(error).toBeInstanceOf(Error);
|
||||
expect(error.message).toMatch(/^unknown/);
|
||||
});
|
||||
|
||||
it("names the pinned observables if multiple emissions occur", async () => {
|
||||
const input = new Subject<unknown>();
|
||||
let error: any = null!;
|
||||
|
||||
input.pipe(pin({ name: () => "example" })).subscribe({
|
||||
error: (e: unknown) => {
|
||||
error = e;
|
||||
},
|
||||
});
|
||||
input.next(1);
|
||||
input.next(1);
|
||||
|
||||
expect(error).toBeInstanceOf(Error);
|
||||
expect(error.message).toMatch(/^example/);
|
||||
});
|
||||
|
||||
it("errors if indistinct emissions occur", async () => {
|
||||
const input = new Subject<unknown>();
|
||||
let error: any = null!;
|
||||
|
||||
input
|
||||
.pipe(pin({ distinct: (p, c) => p == c }))
|
||||
.subscribe({ error: (e: unknown) => (error = e) });
|
||||
input.next(1);
|
||||
input.next(2);
|
||||
|
||||
expect(error).toBeInstanceOf(Error);
|
||||
expect(error.message).toMatch(/^unknown/);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -19,6 +19,7 @@ import {
|
||||
concatMap,
|
||||
startWith,
|
||||
pairwise,
|
||||
MonoTypeOperatorFunction,
|
||||
} from "rxjs";
|
||||
|
||||
/** Returns its input. */
|
||||
@@ -213,3 +214,27 @@ export function on<T>(watch$: Observable<any>) {
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
/** Create an observable that emits the first value from the source and
|
||||
* throws if the observable emits another value.
|
||||
* @param options.name names the pin to make discovering failing observables easier
|
||||
* @param options.distinct compares two emissions with each other to determine whether
|
||||
* the second emission is a duplicate. When this is specified, duplicates are ignored.
|
||||
* When this isn't specified, any emission after the first causes the pin to throw
|
||||
* an error.
|
||||
*/
|
||||
export function pin<T>(options?: {
|
||||
name?: () => string;
|
||||
distinct?: (previous: T, current: T) => boolean;
|
||||
}): MonoTypeOperatorFunction<T> {
|
||||
return pipe(
|
||||
options?.distinct ? distinctUntilChanged(options.distinct) : (i) => i,
|
||||
map((value, index) => {
|
||||
if (index > 0) {
|
||||
throw new Error(`${options?.name?.() ?? "unknown"} observable should only emit one value.`);
|
||||
} else {
|
||||
return value;
|
||||
}
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -10,9 +10,6 @@ import { Classifier } from "./classifier";
|
||||
* when you are performing your own encryption and decryption.
|
||||
* `classified` uses the `ClassifiedFormat` type as its format.
|
||||
* `secret-state` uses `Array<ClassifiedFormat>` with a length of 1.
|
||||
* @remarks - CAUTION! If your on-disk data is not in a correct format,
|
||||
* the storage system treats the data as corrupt and returns your initial
|
||||
* value.
|
||||
*/
|
||||
export type ObjectStorageFormat = "plain" | "classified" | "secret-state";
|
||||
|
||||
@@ -27,19 +24,54 @@ export type ObjectStorageFormat = "plain" | "classified" | "secret-state";
|
||||
// options. Also allow swap between "classifier" and "classification"; the
|
||||
// latter is a list of properties/arguments to the specific classifier in-use.
|
||||
export type ObjectKey<State, Secret = State, Disclosed = Record<string, never>> = {
|
||||
/** Type of data stored by this key; Object keys always use "object" targets.
|
||||
* "object" - a singleton value.
|
||||
* "list" - multiple values identified by their list index.
|
||||
* "record" - multiple values identified by a uuid.
|
||||
*/
|
||||
target: "object";
|
||||
|
||||
/** Identifies the stored state */
|
||||
key: string;
|
||||
|
||||
/** Defines the storage location and parameters for this state */
|
||||
state: StateDefinition;
|
||||
|
||||
/** Defines the visibility and encryption treatment for the stored state.
|
||||
* Disclosed data is written as plain-text. Secret data is protected with
|
||||
* the user key.
|
||||
*/
|
||||
classifier: Classifier<State, Disclosed, Secret>;
|
||||
|
||||
/** Specifies the format of data written to storage.
|
||||
* @remarks - CAUTION! If your on-disk data is not in a correct format,
|
||||
* the storage system treats the data as corrupt and returns your initial
|
||||
* value.
|
||||
*/
|
||||
format: ObjectStorageFormat;
|
||||
|
||||
/** customizes the behavior of the storage location */
|
||||
options: UserKeyDefinitionOptions<State>;
|
||||
|
||||
/** When this is defined, empty data is replaced with a copy of the initial data.
|
||||
* This causes the state to always be defined from the perspective of the
|
||||
* subject's consumer.
|
||||
*/
|
||||
initial?: State;
|
||||
|
||||
/** For encrypted outputs, determines how much padding is applied to
|
||||
* encoded inputs. When this isn't specified, each frame is 32 bytes
|
||||
* long.
|
||||
*/
|
||||
frame?: number;
|
||||
};
|
||||
|
||||
/** Performs a type inference that identifies object keys. */
|
||||
export function isObjectKey(key: any): key is ObjectKey<unknown> {
|
||||
return key.target === "object" && "format" in key && "classifier" in key;
|
||||
}
|
||||
|
||||
/** Converts an object key to a plaform-compatible `UserKeyDefinition`. */
|
||||
export function toUserKeyDefinition<State, Secret, Disclosed>(
|
||||
key: ObjectKey<State, Secret, Disclosed>,
|
||||
) {
|
||||
|
||||
@@ -1,20 +1,13 @@
|
||||
import { RequireExactlyOne, Simplify } from "type-fest";
|
||||
import { Simplify } from "type-fest";
|
||||
|
||||
import {
|
||||
Dependencies,
|
||||
SingleUserDependency,
|
||||
SingleUserEncryptorDependency,
|
||||
WhenDependency,
|
||||
} from "../dependencies";
|
||||
import { Account } from "../../auth/abstractions/account.service";
|
||||
import { Dependencies, BoundDependency, WhenDependency } from "../dependencies";
|
||||
|
||||
import { SubjectConstraintsDependency } from "./state-constraints-dependency";
|
||||
|
||||
/** dependencies accepted by the user state subject */
|
||||
export type UserStateSubjectDependencies<State, Dependency> = Simplify<
|
||||
RequireExactlyOne<
|
||||
SingleUserDependency & SingleUserEncryptorDependency,
|
||||
"singleUserEncryptor$" | "singleUserId$"
|
||||
> &
|
||||
BoundDependency<"account", Account> &
|
||||
Partial<WhenDependency> &
|
||||
Partial<Dependencies<Dependency>> &
|
||||
Partial<SubjectConstraintsDependency<State>> & {
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
import { Jsonify } from "type-fest";
|
||||
|
||||
import { StateProvider } from "../../platform/state";
|
||||
import { LegacyEncryptorProvider } from "../cryptography/legacy-encryptor-provider";
|
||||
import { SemanticLogger } from "../log";
|
||||
|
||||
/** Aggregates user state subject dependencies */
|
||||
export abstract class UserStateSubjectDependencyProvider {
|
||||
/** Provides objects that encrypt and decrypt user and organization data */
|
||||
abstract encryptor: LegacyEncryptorProvider;
|
||||
|
||||
/** Provides local object persistence */
|
||||
abstract state: StateProvider;
|
||||
|
||||
/** Provides semantic logging */
|
||||
abstract log: <Context extends object>(_context: Jsonify<Context>) => SemanticLogger;
|
||||
}
|
||||
@@ -1,33 +1,61 @@
|
||||
// FIXME: Update this file to be type safe and remove this and next line
|
||||
// @ts-strict-ignore
|
||||
import { BehaviorSubject, of, Subject } from "rxjs";
|
||||
|
||||
import { awaitAsync, FakeSingleUserState, ObservableTracker } from "../../../spec";
|
||||
import {
|
||||
awaitAsync,
|
||||
FakeAccountService,
|
||||
FakeStateProvider,
|
||||
ObservableTracker,
|
||||
} from "../../../spec";
|
||||
import { Account } from "../../auth/abstractions/account.service";
|
||||
import { GENERATOR_DISK, UserKeyDefinition } from "../../platform/state";
|
||||
import { UserId } from "../../types/guid";
|
||||
import { LegacyEncryptorProvider } from "../cryptography/legacy-encryptor-provider";
|
||||
import { UserEncryptor } from "../cryptography/user-encryptor.abstraction";
|
||||
import { UserBound } from "../dependencies";
|
||||
import { disabledSemanticLoggerProvider } from "../log";
|
||||
import { PrivateClassifier } from "../private-classifier";
|
||||
import { StateConstraints } from "../types";
|
||||
|
||||
import { ClassifiedFormat } from "./classified-format";
|
||||
import { ObjectKey } from "./object-key";
|
||||
import { UserStateSubject } from "./user-state-subject";
|
||||
|
||||
const SomeUser = "some user" as UserId;
|
||||
const SomeAccount = {
|
||||
id: SomeUser,
|
||||
email: "someone@example.com",
|
||||
emailVerified: true,
|
||||
name: "Someone",
|
||||
};
|
||||
const SomeAccount$ = new BehaviorSubject<Account>(SomeAccount);
|
||||
|
||||
const SomeOtherAccount = {
|
||||
id: "some other user" as UserId,
|
||||
email: "someone@example.com",
|
||||
emailVerified: true,
|
||||
name: "Someone",
|
||||
};
|
||||
|
||||
type TestType = { foo: string };
|
||||
const SomeKey = new UserKeyDefinition<TestType>(GENERATOR_DISK, "TestKey", {
|
||||
deserializer: (d) => d as TestType,
|
||||
clearOn: [],
|
||||
});
|
||||
|
||||
const SomeObjectKeyDefinition = new UserKeyDefinition<unknown>(GENERATOR_DISK, "TestKey", {
|
||||
deserializer: (d) => d as unknown,
|
||||
clearOn: ["logout"],
|
||||
});
|
||||
|
||||
const SomeObjectKey = {
|
||||
target: "object",
|
||||
key: "TestObjectKey",
|
||||
state: GENERATOR_DISK,
|
||||
key: SomeObjectKeyDefinition.key,
|
||||
state: SomeObjectKeyDefinition.stateDefinition,
|
||||
classifier: new PrivateClassifier(),
|
||||
format: "classified",
|
||||
options: {
|
||||
deserializer: (d) => d as TestType,
|
||||
clearOn: ["logout"],
|
||||
clearOn: SomeObjectKeyDefinition.clearOn,
|
||||
},
|
||||
} satisfies ObjectKey<TestType>;
|
||||
|
||||
@@ -45,6 +73,25 @@ const SomeEncryptor: UserEncryptor = {
|
||||
},
|
||||
};
|
||||
|
||||
const SomeAccountService = new FakeAccountService({
|
||||
[SomeUser]: SomeAccount,
|
||||
});
|
||||
|
||||
const SomeStateProvider = new FakeStateProvider(SomeAccountService);
|
||||
|
||||
const SomeProvider = {
|
||||
encryptor: {
|
||||
userEncryptor$: jest.fn(() => {
|
||||
return new BehaviorSubject({ encryptor: SomeEncryptor, userId: SomeUser }).asObservable();
|
||||
}),
|
||||
organizationEncryptor$() {
|
||||
throw new Error("`organizationEncryptor$` should never be invoked.");
|
||||
},
|
||||
} as LegacyEncryptorProvider,
|
||||
state: SomeStateProvider,
|
||||
log: disabledSemanticLoggerProvider,
|
||||
};
|
||||
|
||||
function fooMaxLength(maxLength: number): StateConstraints<TestType> {
|
||||
return Object.freeze({
|
||||
constraints: { foo: { maxLength } },
|
||||
@@ -68,18 +115,21 @@ const DynamicFooMaxLength = Object.freeze({
|
||||
},
|
||||
});
|
||||
|
||||
const SomeKeySomeUserInitialValue = Object.freeze({ foo: "init" });
|
||||
|
||||
describe("UserStateSubject", () => {
|
||||
beforeEach(async () => {
|
||||
await SomeStateProvider.setUserState(SomeKey, SomeKeySomeUserInitialValue, SomeUser);
|
||||
});
|
||||
|
||||
describe("dependencies", () => {
|
||||
it("ignores repeated when$ emissions", async () => {
|
||||
// this test looks for `nextValue` because a subscription isn't necessary for
|
||||
// the subject to update
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const nextValue = jest.fn((_, next) => next);
|
||||
const when$ = new BehaviorSubject(true);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, {
|
||||
singleUserId$,
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
nextValue,
|
||||
when$,
|
||||
});
|
||||
@@ -96,90 +146,64 @@ describe("UserStateSubject", () => {
|
||||
expect(nextValue).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("ignores repeated singleUserId$ emissions", async () => {
|
||||
// this test looks for `nextValue` because a subscription isn't necessary for
|
||||
// the subject to update
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const nextValue = jest.fn((_, next) => next);
|
||||
const when$ = new BehaviorSubject(true);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, {
|
||||
singleUserId$,
|
||||
nextValue,
|
||||
when$,
|
||||
it("errors when account$ changes accounts", async () => {
|
||||
const account$ = new BehaviorSubject<Account>(SomeAccount);
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$,
|
||||
});
|
||||
let error: any = null;
|
||||
subject.subscribe({
|
||||
error(e: unknown) {
|
||||
error = e;
|
||||
},
|
||||
});
|
||||
|
||||
// the interleaved await asyncs are only necessary b/c `nextValue` is called asynchronously
|
||||
subject.next({ foo: "next" });
|
||||
await awaitAsync();
|
||||
singleUserId$.next(SomeUser);
|
||||
await awaitAsync();
|
||||
singleUserId$.next(SomeUser);
|
||||
singleUserId$.next(SomeUser);
|
||||
account$.next(SomeOtherAccount);
|
||||
await awaitAsync();
|
||||
|
||||
expect(nextValue).toHaveBeenCalledTimes(1);
|
||||
expect(error).toBeInstanceOf(Error);
|
||||
expect(error.message).toMatch(/UserStateSubject\(generator, TestKey\) \{ account\$ \}/);
|
||||
});
|
||||
|
||||
it("ignores repeated singleUserEncryptor$ emissions", async () => {
|
||||
// this test looks for `nextValue` because a subscription isn't necessary for
|
||||
// the subject to update
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const nextValue = jest.fn((_, next) => next);
|
||||
const singleUserEncryptor$ = new BehaviorSubject({ userId: SomeUser, encryptor: null });
|
||||
const subject = new UserStateSubject(SomeKey, () => state, {
|
||||
nextValue,
|
||||
singleUserEncryptor$,
|
||||
});
|
||||
it("waits for account$", async () => {
|
||||
await SomeStateProvider.setUserState(
|
||||
SomeObjectKeyDefinition,
|
||||
{ id: null, secret: '{"foo":"init"}', disclosed: {} } as unknown,
|
||||
SomeUser,
|
||||
);
|
||||
const account$ = new Subject<Account>();
|
||||
const subject = new UserStateSubject(SomeObjectKey, SomeProvider, { account$ });
|
||||
|
||||
// the interleaved await asyncs are only necessary b/c `nextValue` is called asynchronously
|
||||
subject.next({ foo: "next" });
|
||||
await awaitAsync();
|
||||
singleUserEncryptor$.next({ userId: SomeUser, encryptor: null });
|
||||
await awaitAsync();
|
||||
singleUserEncryptor$.next({ userId: SomeUser, encryptor: null });
|
||||
singleUserEncryptor$.next({ userId: SomeUser, encryptor: null });
|
||||
const results = [] as any[];
|
||||
subject.subscribe((v) => results.push(v));
|
||||
// precondition: no immediate emission upon subscribe
|
||||
expect(results).toEqual([]);
|
||||
|
||||
account$.next(SomeAccount);
|
||||
await awaitAsync();
|
||||
|
||||
expect(nextValue).toHaveBeenCalledTimes(1);
|
||||
expect(results).toEqual([{ foo: "decrypt(init)" }]);
|
||||
});
|
||||
|
||||
it("waits for constraints$", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const constraints$ = new Subject<StateConstraints<TestType>>();
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const tracker = new ObservableTracker(subject);
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const results = [] as any[];
|
||||
subject.subscribe((v) => results.push(v));
|
||||
|
||||
constraints$.next(fooMaxLength(3));
|
||||
const [initResult] = await tracker.pauseUntilReceived(1);
|
||||
await awaitAsync();
|
||||
|
||||
expect(initResult).toEqual({ foo: "ini" });
|
||||
});
|
||||
|
||||
it("waits for singleUserEncryptor$", async () => {
|
||||
const state = new FakeSingleUserState<ClassifiedFormat<void, Record<string, never>>>(
|
||||
SomeUser,
|
||||
{ id: null, secret: '{"foo":"init"}', disclosed: {} },
|
||||
);
|
||||
const singleUserEncryptor$ = new Subject<UserBound<"encryptor", UserEncryptor>>();
|
||||
const subject = new UserStateSubject(SomeObjectKey, () => state, { singleUserEncryptor$ });
|
||||
const tracker = new ObservableTracker(subject);
|
||||
|
||||
singleUserEncryptor$.next({ userId: SomeUser, encryptor: SomeEncryptor });
|
||||
const [initResult] = await tracker.pauseUntilReceived(1);
|
||||
|
||||
expect(initResult).toEqual({ foo: "decrypt(init)" });
|
||||
expect(results).toEqual([{ foo: "ini" }]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("next", () => {
|
||||
it("emits the next value", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, { account$: SomeAccount$ });
|
||||
const expected: TestType = { foo: "next" };
|
||||
|
||||
let actual: TestType = null;
|
||||
@@ -193,44 +217,39 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("ceases emissions once complete", async () => {
|
||||
const initialState = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialState);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$ });
|
||||
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, { account$: SomeAccount$ });
|
||||
let actual: TestType = null;
|
||||
subject.subscribe((value) => {
|
||||
actual = value;
|
||||
});
|
||||
await awaitAsync();
|
||||
|
||||
subject.complete();
|
||||
subject.next({ foo: "ignored" });
|
||||
await awaitAsync();
|
||||
|
||||
expect(actual).toEqual(initialState);
|
||||
expect(actual).toEqual(SomeKeySomeUserInitialValue);
|
||||
});
|
||||
|
||||
it("evaluates shouldUpdate", async () => {
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const shouldUpdate = jest.fn(() => true);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, shouldUpdate });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
shouldUpdate,
|
||||
});
|
||||
|
||||
const nextVal: TestType = { foo: "next" };
|
||||
subject.next(nextVal);
|
||||
await awaitAsync();
|
||||
|
||||
expect(shouldUpdate).toHaveBeenCalledWith(initialValue, nextVal, null);
|
||||
expect(shouldUpdate).toHaveBeenCalledWith(SomeKeySomeUserInitialValue, nextVal, null);
|
||||
});
|
||||
|
||||
it("evaluates shouldUpdate with a dependency", async () => {
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const shouldUpdate = jest.fn(() => true);
|
||||
const dependencyValue = { bar: "dependency" };
|
||||
const subject = new UserStateSubject(SomeKey, () => state, {
|
||||
singleUserId$,
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
shouldUpdate,
|
||||
dependencies$: of(dependencyValue),
|
||||
});
|
||||
@@ -239,15 +258,19 @@ describe("UserStateSubject", () => {
|
||||
subject.next(nextVal);
|
||||
await awaitAsync();
|
||||
|
||||
expect(shouldUpdate).toHaveBeenCalledWith(initialValue, nextVal, dependencyValue);
|
||||
expect(shouldUpdate).toHaveBeenCalledWith(
|
||||
SomeKeySomeUserInitialValue,
|
||||
nextVal,
|
||||
dependencyValue,
|
||||
);
|
||||
});
|
||||
|
||||
it("emits a value when shouldUpdate returns `true`", async () => {
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const shouldUpdate = jest.fn(() => true);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, shouldUpdate });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
shouldUpdate,
|
||||
});
|
||||
const expected: TestType = { foo: "next" };
|
||||
|
||||
let actual: TestType = null;
|
||||
@@ -261,11 +284,11 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("retains the current value when shouldUpdate returns `false`", async () => {
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const shouldUpdate = jest.fn(() => false);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, shouldUpdate });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
shouldUpdate,
|
||||
});
|
||||
|
||||
subject.next({ foo: "next" });
|
||||
await awaitAsync();
|
||||
@@ -274,31 +297,28 @@ describe("UserStateSubject", () => {
|
||||
actual = value;
|
||||
});
|
||||
|
||||
expect(actual).toEqual(initialValue);
|
||||
expect(actual).toEqual(SomeKeySomeUserInitialValue);
|
||||
});
|
||||
|
||||
it("evaluates nextValue", async () => {
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const nextValue = jest.fn((_, next) => next);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, nextValue });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
nextValue,
|
||||
});
|
||||
|
||||
const nextVal: TestType = { foo: "next" };
|
||||
subject.next(nextVal);
|
||||
await awaitAsync();
|
||||
|
||||
expect(nextValue).toHaveBeenCalledWith(initialValue, nextVal, null);
|
||||
expect(nextValue).toHaveBeenCalledWith(SomeKeySomeUserInitialValue, nextVal, null);
|
||||
});
|
||||
|
||||
it("evaluates nextValue with a dependency", async () => {
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const nextValue = jest.fn((_, next) => next);
|
||||
const dependencyValue = { bar: "dependency" };
|
||||
const subject = new UserStateSubject(SomeKey, () => state, {
|
||||
singleUserId$,
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
nextValue,
|
||||
dependencies$: of(dependencyValue),
|
||||
});
|
||||
@@ -307,19 +327,16 @@ describe("UserStateSubject", () => {
|
||||
subject.next(nextVal);
|
||||
await awaitAsync();
|
||||
|
||||
expect(nextValue).toHaveBeenCalledWith(initialValue, nextVal, dependencyValue);
|
||||
expect(nextValue).toHaveBeenCalledWith(SomeKeySomeUserInitialValue, nextVal, dependencyValue);
|
||||
});
|
||||
|
||||
it("evaluates nextValue when when$ is true", async () => {
|
||||
// this test looks for `nextValue` because a subscription isn't necessary for
|
||||
// the subject to update
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const nextValue = jest.fn((_, next) => next);
|
||||
const when$ = new BehaviorSubject(true);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, {
|
||||
singleUserId$,
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
nextValue,
|
||||
when$,
|
||||
});
|
||||
@@ -334,13 +351,10 @@ describe("UserStateSubject", () => {
|
||||
it("waits to evaluate nextValue until when$ is true", async () => {
|
||||
// this test looks for `nextValue` because a subscription isn't necessary for
|
||||
// the subject to update.
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const nextValue = jest.fn((_, next) => next);
|
||||
const when$ = new BehaviorSubject(false);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, {
|
||||
singleUserId$,
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
nextValue,
|
||||
when$,
|
||||
});
|
||||
@@ -355,62 +369,31 @@ describe("UserStateSubject", () => {
|
||||
expect(nextValue).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("waits to evaluate `UserState.update` until singleUserId$ emits", async () => {
|
||||
// this test looks for `nextMock` because a subscription isn't necessary for
|
||||
it("waits to evaluate `UserState.update` until account$ emits", async () => {
|
||||
// this test looks for `nextValue` because a subscription isn't necessary for
|
||||
// the subject to update.
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new Subject<UserId>();
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$ });
|
||||
const account$ = new Subject<Account>();
|
||||
const nextValue = jest.fn((_, pending) => pending);
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, { account$, nextValue });
|
||||
|
||||
// precondition: subject doesn't update after `next`
|
||||
const nextVal: TestType = { foo: "next" };
|
||||
subject.next(nextVal);
|
||||
await awaitAsync();
|
||||
expect(state.nextMock).not.toHaveBeenCalled();
|
||||
expect(nextValue).not.toHaveBeenCalled();
|
||||
|
||||
singleUserId$.next(SomeUser);
|
||||
account$.next(SomeAccount);
|
||||
await awaitAsync();
|
||||
|
||||
expect(state.nextMock).toHaveBeenCalledWith({
|
||||
foo: "next",
|
||||
// FIXME: don't leak this detail into the test
|
||||
"$^$ALWAYS_UPDATE_KLUDGE_PROPERTY$^$": 0,
|
||||
});
|
||||
});
|
||||
|
||||
it("waits to evaluate `UserState.update` until singleUserEncryptor$ emits", async () => {
|
||||
const state = new FakeSingleUserState<ClassifiedFormat<void, Record<string, never>>>(
|
||||
SomeUser,
|
||||
{ id: null, secret: '{"foo":"init"}', disclosed: null },
|
||||
);
|
||||
const singleUserEncryptor$ = new Subject<UserBound<"encryptor", UserEncryptor>>();
|
||||
const subject = new UserStateSubject(SomeObjectKey, () => state, { singleUserEncryptor$ });
|
||||
|
||||
// precondition: subject doesn't update after `next`
|
||||
const nextVal: TestType = { foo: "next" };
|
||||
subject.next(nextVal);
|
||||
await awaitAsync();
|
||||
expect(state.nextMock).not.toHaveBeenCalled();
|
||||
|
||||
singleUserEncryptor$.next({ userId: SomeUser, encryptor: SomeEncryptor });
|
||||
await awaitAsync();
|
||||
|
||||
const encrypted = { foo: "encrypt(next)" };
|
||||
expect(state.nextMock).toHaveBeenCalledWith({
|
||||
id: null,
|
||||
secret: encrypted,
|
||||
disclosed: null,
|
||||
// FIXME: don't leak this detail into the test
|
||||
"$^$ALWAYS_UPDATE_KLUDGE_PROPERTY$^$": 0,
|
||||
});
|
||||
expect(nextValue).toHaveBeenCalledWith(SomeKeySomeUserInitialValue, { foo: "next" }, null);
|
||||
});
|
||||
|
||||
it("applies dynamic constraints", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const constraints$ = new BehaviorSubject(DynamicFooMaxLength);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const tracker = new ObservableTracker(subject);
|
||||
const expected: TestType = { foo: "next" };
|
||||
const emission = tracker.expectEmission();
|
||||
@@ -422,10 +405,11 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("applies constraints$ on next", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const constraints$ = new BehaviorSubject(fooMaxLength(2));
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const tracker = new ObservableTracker(subject);
|
||||
|
||||
subject.next({ foo: "next" });
|
||||
@@ -435,10 +419,11 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("applies latest constraints$ on next", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const constraints$ = new BehaviorSubject(fooMaxLength(2));
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const tracker = new ObservableTracker(subject);
|
||||
|
||||
constraints$.next(fooMaxLength(3));
|
||||
@@ -449,10 +434,11 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("waits for constraints$", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const constraints$ = new Subject<StateConstraints<TestType>>();
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const results: any[] = [];
|
||||
subject.subscribe((r) => {
|
||||
results.push(r);
|
||||
@@ -468,10 +454,11 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("uses the last-emitted value from constraints$ when constraints$ errors", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const constraints$ = new BehaviorSubject(fooMaxLength(3));
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const tracker = new ObservableTracker(subject);
|
||||
|
||||
constraints$.error({ some: "error" });
|
||||
@@ -482,10 +469,11 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("uses the last-emitted value from constraints$ when constraints$ completes", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const constraints$ = new BehaviorSubject(fooMaxLength(3));
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const tracker = new ObservableTracker(subject);
|
||||
|
||||
constraints$.complete();
|
||||
@@ -498,9 +486,7 @@ describe("UserStateSubject", () => {
|
||||
|
||||
describe("error", () => {
|
||||
it("emits errors", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, { account$: SomeAccount$ });
|
||||
const expected: TestType = { foo: "error" };
|
||||
|
||||
let actual: TestType = null;
|
||||
@@ -516,10 +502,7 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("ceases emissions once errored", async () => {
|
||||
const initialState = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialState);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, { account$: SomeAccount$ });
|
||||
|
||||
let actual: TestType = null;
|
||||
subject.subscribe({
|
||||
@@ -535,10 +518,7 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("ceases emissions once complete", async () => {
|
||||
const initialState = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialState);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, { account$: SomeAccount$ });
|
||||
|
||||
let shouldNotRun = false;
|
||||
subject.subscribe({
|
||||
@@ -556,9 +536,7 @@ describe("UserStateSubject", () => {
|
||||
|
||||
describe("complete", () => {
|
||||
it("emits completes", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, { account$: SomeAccount$ });
|
||||
|
||||
let actual = false;
|
||||
subject.subscribe({
|
||||
@@ -573,10 +551,7 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("ceases emissions once errored", async () => {
|
||||
const initialState = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialState);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, { account$: SomeAccount$ });
|
||||
|
||||
let shouldNotRun = false;
|
||||
subject.subscribe({
|
||||
@@ -594,10 +569,7 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("ceases emissions once complete", async () => {
|
||||
const initialState = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialState);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, { account$: SomeAccount$ });
|
||||
|
||||
let timesRun = 0;
|
||||
subject.subscribe({
|
||||
@@ -615,10 +587,11 @@ describe("UserStateSubject", () => {
|
||||
|
||||
describe("subscribe", () => {
|
||||
it("applies constraints$ on init", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const constraints$ = new BehaviorSubject(fooMaxLength(2));
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const tracker = new ObservableTracker(subject);
|
||||
|
||||
const [result] = await tracker.pauseUntilReceived(1);
|
||||
@@ -627,10 +600,11 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("applies constraints$ on constraints$ emission", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const constraints$ = new BehaviorSubject(fooMaxLength(2));
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const tracker = new ObservableTracker(subject);
|
||||
|
||||
constraints$.next(fooMaxLength(1));
|
||||
@@ -639,11 +613,9 @@ describe("UserStateSubject", () => {
|
||||
expect(result).toEqual({ foo: "i" });
|
||||
});
|
||||
|
||||
it("completes when singleUserId$ completes", async () => {
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$ });
|
||||
it("completes when account$ completes", async () => {
|
||||
const account$ = new BehaviorSubject(SomeAccount);
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, { account$ });
|
||||
|
||||
let actual = false;
|
||||
subject.subscribe({
|
||||
@@ -651,38 +623,18 @@ describe("UserStateSubject", () => {
|
||||
actual = true;
|
||||
},
|
||||
});
|
||||
singleUserId$.complete();
|
||||
await awaitAsync();
|
||||
|
||||
expect(actual).toBeTruthy();
|
||||
});
|
||||
|
||||
it("completes when singleUserId$ completes", async () => {
|
||||
const state = new FakeSingleUserState<ClassifiedFormat<void, Record<string, never>>>(
|
||||
SomeUser,
|
||||
{ id: null, secret: '{"foo":"init"}', disclosed: null },
|
||||
);
|
||||
const singleUserEncryptor$ = new Subject<UserBound<"encryptor", UserEncryptor>>();
|
||||
const subject = new UserStateSubject(SomeObjectKey, () => state, { singleUserEncryptor$ });
|
||||
|
||||
let actual = false;
|
||||
subject.subscribe({
|
||||
complete: () => {
|
||||
actual = true;
|
||||
},
|
||||
});
|
||||
singleUserEncryptor$.complete();
|
||||
account$.complete();
|
||||
await awaitAsync();
|
||||
|
||||
expect(actual).toBeTruthy();
|
||||
});
|
||||
|
||||
it("completes when when$ completes", async () => {
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const when$ = new BehaviorSubject(true);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, when$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
when$,
|
||||
});
|
||||
|
||||
let actual = false;
|
||||
subject.subscribe({
|
||||
@@ -699,12 +651,9 @@ describe("UserStateSubject", () => {
|
||||
// FIXME: add test for `this.state.catch` once `FakeSingleUserState` supports
|
||||
// simulated errors
|
||||
|
||||
it("errors when singleUserId$ changes", async () => {
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$ });
|
||||
const errorUserId = "error" as UserId;
|
||||
it("errors when account$ changes", async () => {
|
||||
const account$ = new BehaviorSubject(SomeAccount);
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, { account$ });
|
||||
|
||||
let error = false;
|
||||
subject.subscribe({
|
||||
@@ -712,39 +661,15 @@ describe("UserStateSubject", () => {
|
||||
error = e as any;
|
||||
},
|
||||
});
|
||||
singleUserId$.next(errorUserId);
|
||||
account$.next(SomeOtherAccount);
|
||||
await awaitAsync();
|
||||
|
||||
expect(error).toEqual({ expectedUserId: SomeUser, actualUserId: errorUserId });
|
||||
expect(error).toBeInstanceOf(Error);
|
||||
});
|
||||
|
||||
it("errors when singleUserEncryptor$ changes", async () => {
|
||||
const state = new FakeSingleUserState<ClassifiedFormat<void, Record<string, never>>>(
|
||||
SomeUser,
|
||||
{ id: null, secret: '{"foo":"init"}', disclosed: null },
|
||||
);
|
||||
const singleUserEncryptor$ = new Subject<UserBound<"encryptor", UserEncryptor>>();
|
||||
const subject = new UserStateSubject(SomeObjectKey, () => state, { singleUserEncryptor$ });
|
||||
const errorUserId = "error" as UserId;
|
||||
|
||||
let error = false;
|
||||
subject.subscribe({
|
||||
error: (e: unknown) => {
|
||||
error = e as any;
|
||||
},
|
||||
});
|
||||
singleUserEncryptor$.next({ userId: SomeUser, encryptor: SomeEncryptor });
|
||||
singleUserEncryptor$.next({ userId: errorUserId, encryptor: SomeEncryptor });
|
||||
await awaitAsync();
|
||||
|
||||
expect(error).toEqual({ expectedUserId: SomeUser, actualUserId: errorUserId });
|
||||
});
|
||||
|
||||
it("errors when singleUserId$ errors", async () => {
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$ });
|
||||
it("errors when account$ errors", async () => {
|
||||
const account$ = new BehaviorSubject(SomeAccount);
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, { account$ });
|
||||
const expected = { error: "description" };
|
||||
|
||||
let actual = false;
|
||||
@@ -753,37 +678,18 @@ describe("UserStateSubject", () => {
|
||||
actual = e as any;
|
||||
},
|
||||
});
|
||||
singleUserId$.error(expected);
|
||||
await awaitAsync();
|
||||
|
||||
expect(actual).toEqual(expected);
|
||||
});
|
||||
|
||||
it("errors when singleUserEncryptor$ errors", async () => {
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserEncryptor$ = new Subject<UserBound<"encryptor", UserEncryptor>>();
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserEncryptor$ });
|
||||
const expected = { error: "description" };
|
||||
|
||||
let actual = false;
|
||||
subject.subscribe({
|
||||
error: (e: unknown) => {
|
||||
actual = e as any;
|
||||
},
|
||||
});
|
||||
singleUserEncryptor$.error(expected);
|
||||
account$.error(expected);
|
||||
await awaitAsync();
|
||||
|
||||
expect(actual).toEqual(expected);
|
||||
});
|
||||
|
||||
it("errors when when$ errors", async () => {
|
||||
const initialValue: TestType = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialValue);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const when$ = new BehaviorSubject(true);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, when$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
when$,
|
||||
});
|
||||
const expected = { error: "description" };
|
||||
|
||||
let actual = false;
|
||||
@@ -799,21 +705,9 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("userId", () => {
|
||||
it("returns the userId to which the subject is bound", () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new Subject<UserId>();
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$ });
|
||||
|
||||
expect(subject.userId).toEqual(SomeUser);
|
||||
});
|
||||
});
|
||||
|
||||
describe("withConstraints$", () => {
|
||||
it("emits the next value with an empty constraint", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, { account$: SomeAccount$ });
|
||||
const tracker = new ObservableTracker(subject.withConstraints$);
|
||||
const expected: TestType = { foo: "next" };
|
||||
const emission = tracker.expectEmission();
|
||||
@@ -826,25 +720,23 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("ceases emissions once the subject completes", async () => {
|
||||
const initialState = { foo: "init" };
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, initialState);
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, { account$: SomeAccount$ });
|
||||
const tracker = new ObservableTracker(subject.withConstraints$);
|
||||
|
||||
subject.complete();
|
||||
subject.next({ foo: "ignored" });
|
||||
const [result] = await tracker.pauseUntilReceived(1);
|
||||
|
||||
expect(result.state).toEqual(initialState);
|
||||
expect(result.state).toEqual(SomeKeySomeUserInitialValue);
|
||||
expect(tracker.emissions.length).toEqual(1);
|
||||
});
|
||||
|
||||
it("emits constraints$ on constraints$ emission", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const constraints$ = new BehaviorSubject(fooMaxLength(2));
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const tracker = new ObservableTracker(subject.withConstraints$);
|
||||
const expected = fooMaxLength(1);
|
||||
const emission = tracker.expectEmission();
|
||||
@@ -857,10 +749,11 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("emits dynamic constraints", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const constraints$ = new BehaviorSubject(DynamicFooMaxLength);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const tracker = new ObservableTracker(subject.withConstraints$);
|
||||
const expected: TestType = { foo: "next" };
|
||||
const emission = tracker.expectEmission();
|
||||
@@ -873,11 +766,12 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("emits constraints$ on next", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const expected = fooMaxLength(2);
|
||||
const constraints$ = new BehaviorSubject(expected);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const tracker = new ObservableTracker(subject.withConstraints$);
|
||||
const emission = tracker.expectEmission();
|
||||
|
||||
@@ -889,10 +783,11 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("emits the latest constraints$ on next", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const constraints$ = new BehaviorSubject(fooMaxLength(2));
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const tracker = new ObservableTracker(subject.withConstraints$);
|
||||
const expected = fooMaxLength(3);
|
||||
constraints$.next(expected);
|
||||
@@ -906,10 +801,11 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("waits for constraints$", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const constraints$ = new Subject<StateConstraints<TestType>>();
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const tracker = new ObservableTracker(subject.withConstraints$);
|
||||
const expected = fooMaxLength(3);
|
||||
|
||||
@@ -923,11 +819,12 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("emits the last-emitted value from constraints$ when constraints$ errors", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const expected = fooMaxLength(3);
|
||||
const constraints$ = new BehaviorSubject(expected);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const tracker = new ObservableTracker(subject.withConstraints$);
|
||||
|
||||
constraints$.error({ some: "error" });
|
||||
@@ -939,11 +836,12 @@ describe("UserStateSubject", () => {
|
||||
});
|
||||
|
||||
it("emits the last-emitted value from constraints$ when constraints$ completes", async () => {
|
||||
const state = new FakeSingleUserState<TestType>(SomeUser, { foo: "init" });
|
||||
const singleUserId$ = new BehaviorSubject(SomeUser);
|
||||
const expected = fooMaxLength(3);
|
||||
const constraints$ = new BehaviorSubject(expected);
|
||||
const subject = new UserStateSubject(SomeKey, () => state, { singleUserId$, constraints$ });
|
||||
const subject = new UserStateSubject(SomeKey, SomeProvider, {
|
||||
account$: SomeAccount$,
|
||||
constraints$,
|
||||
});
|
||||
const tracker = new ObservableTracker(subject.withConstraints$);
|
||||
|
||||
constraints$.complete();
|
||||
|
||||
@@ -24,14 +24,17 @@ import {
|
||||
withLatestFrom,
|
||||
scan,
|
||||
skip,
|
||||
shareReplay,
|
||||
tap,
|
||||
switchMap,
|
||||
} from "rxjs";
|
||||
|
||||
import { Account } from "../../auth/abstractions/account.service";
|
||||
import { EncString } from "../../platform/models/domain/enc-string";
|
||||
import { SingleUserState, UserKeyDefinition } from "../../platform/state";
|
||||
import { UserId } from "../../types/guid";
|
||||
import { UserEncryptor } from "../cryptography/user-encryptor.abstraction";
|
||||
import { UserBound } from "../dependencies";
|
||||
import { anyComplete, errorOnChange, ready, withLatestReady } from "../rx";
|
||||
import { SemanticLogger } from "../log";
|
||||
import { anyComplete, pin, ready, withLatestReady } from "../rx";
|
||||
import { Constraints, SubjectConstraints, WithConstraints } from "../types";
|
||||
|
||||
import { ClassifiedFormat, isClassifiedFormat } from "./classified-format";
|
||||
@@ -39,6 +42,7 @@ import { unconstrained$ } from "./identity-state-constraint";
|
||||
import { isObjectKey, ObjectKey, toUserKeyDefinition } from "./object-key";
|
||||
import { isDynamic } from "./state-constraints-dependency";
|
||||
import { UserStateSubjectDependencies } from "./user-state-subject-dependencies";
|
||||
import { UserStateSubjectDependencyProvider } from "./user-state-subject-dependency-provider";
|
||||
|
||||
type Constrained<State> = { constraints: Readonly<Constraints<State>>; state: State };
|
||||
|
||||
@@ -59,6 +63,9 @@ type Constrained<State> = { constraints: Readonly<Constraints<State>>; state: St
|
||||
// update b/c their IVs change.
|
||||
const ALWAYS_UPDATE_KLUDGE = "$^$ALWAYS_UPDATE_KLUDGE_PROPERTY$^$";
|
||||
|
||||
/** Default frame size for data packing */
|
||||
const DEFAULT_FRAME_SIZE = 32;
|
||||
|
||||
/**
|
||||
* Adapt a state provider to an rxjs subject.
|
||||
*
|
||||
@@ -90,12 +97,12 @@ export class UserStateSubject<
|
||||
* this becomes true. When this occurs, only the last-received update
|
||||
* is applied. The blocked update is kept in memory. It does not persist
|
||||
* to disk.
|
||||
* @param dependencies.singleUserId$ writes block until the singleUserId$
|
||||
* @param dependencies.account$ writes block until the account$
|
||||
* is available.
|
||||
*/
|
||||
constructor(
|
||||
private key: UserKeyDefinition<State> | ObjectKey<State, Secret, Disclosed>,
|
||||
getState: (key: UserKeyDefinition<unknown>) => SingleUserState<unknown>,
|
||||
private providers: UserStateSubjectDependencyProvider,
|
||||
private context: UserStateSubjectDependencies<State, Dependencies>,
|
||||
) {
|
||||
super();
|
||||
@@ -104,42 +111,60 @@ export class UserStateSubject<
|
||||
// classification and encryption only supported with `ObjectKey`
|
||||
this.objectKey = this.key;
|
||||
this.stateKey = toUserKeyDefinition(this.key);
|
||||
this.state = getState(this.stateKey);
|
||||
} else {
|
||||
// raw state access granted with `UserKeyDefinition`
|
||||
this.objectKey = null;
|
||||
this.stateKey = this.key as UserKeyDefinition<State>;
|
||||
this.state = getState(this.stateKey);
|
||||
}
|
||||
|
||||
this.log = this.providers.log({
|
||||
contextId: this.contextId,
|
||||
type: "UserStateSubject",
|
||||
storage: {
|
||||
state: this.stateKey.stateDefinition.name,
|
||||
key: this.stateKey.key,
|
||||
},
|
||||
});
|
||||
|
||||
// normalize dependencies
|
||||
const when$ = (this.context.when$ ?? new BehaviorSubject(true)).pipe(distinctUntilChanged());
|
||||
const account$ = context.account$.pipe(
|
||||
pin({
|
||||
name: () => `${this.contextId} { account$ }`,
|
||||
distinct(prev, current) {
|
||||
return prev.id === current.id;
|
||||
},
|
||||
}),
|
||||
shareReplay({ refCount: true, bufferSize: 1 }),
|
||||
);
|
||||
const encryptor$ = this.encryptor(account$);
|
||||
const constraints$ = (this.context.constraints$ ?? unconstrained$<State>()).pipe(
|
||||
catchError((e: unknown) => {
|
||||
this.log.error(e as object, "constraints$ dependency failed; using last-known constraints");
|
||||
return EMPTY;
|
||||
}),
|
||||
shareReplay({ refCount: true, bufferSize: 1 }),
|
||||
);
|
||||
const dependencies$ = (
|
||||
this.context.dependencies$ ?? new BehaviorSubject<Dependencies>(null)
|
||||
).pipe(shareReplay({ refCount: true, bufferSize: 1 }));
|
||||
|
||||
// manage dependencies through replay subjects since `UserStateSubject`
|
||||
// reads them in multiple places
|
||||
const encryptor$ = new ReplaySubject<UserEncryptor>(1);
|
||||
const { singleUserId$, singleUserEncryptor$ } = this.context;
|
||||
this.encryptor(singleUserEncryptor$ ?? singleUserId$).subscribe(encryptor$);
|
||||
|
||||
const constraints$ = new ReplaySubject<SubjectConstraints<State>>(1);
|
||||
(this.context.constraints$ ?? unconstrained$<State>())
|
||||
.pipe(
|
||||
// FIXME: this should probably log that an error occurred
|
||||
catchError(() => EMPTY),
|
||||
)
|
||||
.subscribe(constraints$);
|
||||
|
||||
const dependencies$ = new ReplaySubject<Dependencies>(1);
|
||||
if (this.context.dependencies$) {
|
||||
this.context.dependencies$.subscribe(dependencies$);
|
||||
} else {
|
||||
dependencies$.next(null);
|
||||
}
|
||||
// load state once the account becomes available
|
||||
const userState$ = account$.pipe(
|
||||
tap((account) => this.log.debug({ accountId: account.id }, "loading user state")),
|
||||
map((account) => this.providers.state.getUser(account.id, this.stateKey)),
|
||||
shareReplay({ refCount: true, bufferSize: 1 }),
|
||||
);
|
||||
|
||||
// wire output before input so that output normalizes the current state
|
||||
// before any `next` value is processed
|
||||
this.outputSubscription = this.state.state$
|
||||
.pipe(this.declassify(encryptor$), this.adjust(combineLatestWith(constraints$)))
|
||||
this.outputSubscription = userState$
|
||||
.pipe(
|
||||
switchMap((userState) => userState.state$),
|
||||
this.declassify(encryptor$),
|
||||
this.adjust(combineLatestWith(constraints$)),
|
||||
takeUntil(anyComplete(account$)),
|
||||
)
|
||||
.subscribe(this.output);
|
||||
|
||||
const last$ = new ReplaySubject<State>(1);
|
||||
@@ -168,45 +193,42 @@ export class UserStateSubject<
|
||||
//
|
||||
// FIXME: this should probably timeout when a lock occurs
|
||||
this.inputSubscription = updates$
|
||||
.pipe(this.classify(encryptor$), takeUntil(anyComplete([when$, this.input, encryptor$])))
|
||||
.pipe(
|
||||
this.classify(encryptor$),
|
||||
withLatestFrom(userState$),
|
||||
takeUntil(anyComplete([when$, this.input, encryptor$])),
|
||||
)
|
||||
.subscribe({
|
||||
next: (state) => this.onNext(state),
|
||||
next: ([input, state]) => this.onNext(input, state),
|
||||
error: (e: unknown) => this.onError(e),
|
||||
complete: () => this.onComplete(),
|
||||
});
|
||||
}
|
||||
|
||||
private stateKey: UserKeyDefinition<unknown>;
|
||||
private objectKey: ObjectKey<State, Secret, Disclosed>;
|
||||
private get contextId() {
|
||||
return `UserStateSubject(${this.stateKey.stateDefinition.name}, ${this.stateKey.key})`;
|
||||
}
|
||||
|
||||
private encryptor(
|
||||
singleUserEncryptor$: Observable<UserBound<"encryptor", UserEncryptor> | UserId>,
|
||||
): Observable<UserEncryptor> {
|
||||
return singleUserEncryptor$.pipe(
|
||||
// normalize inputs
|
||||
map((maybe): UserBound<"encryptor", UserEncryptor> => {
|
||||
if (typeof maybe === "object" && "encryptor" in maybe) {
|
||||
return maybe;
|
||||
} else if (typeof maybe === "string") {
|
||||
return { encryptor: null, userId: maybe as UserId };
|
||||
} else {
|
||||
throw new Error(`Invalid encryptor input received for ${this.key.key}.`);
|
||||
}
|
||||
}),
|
||||
// fail the stream if the state desyncs from the bound userId
|
||||
errorOnChange(
|
||||
({ userId }) => userId,
|
||||
(expectedUserId, actualUserId) => ({ expectedUserId, actualUserId }),
|
||||
),
|
||||
// reduce emissions to when encryptor changes
|
||||
private readonly log: SemanticLogger;
|
||||
|
||||
private readonly stateKey: UserKeyDefinition<unknown>;
|
||||
private readonly objectKey: ObjectKey<State, Secret, Disclosed>;
|
||||
|
||||
private encryptor(account$: Observable<Account>): Observable<UserEncryptor> {
|
||||
const singleUserId$ = account$.pipe(map((account) => account.id));
|
||||
const frameSize = this.objectKey?.frame ?? DEFAULT_FRAME_SIZE;
|
||||
const encryptor$ = this.providers.encryptor.userEncryptor$(frameSize, { singleUserId$ }).pipe(
|
||||
tap(() => this.log.debug("encryptor constructed")),
|
||||
map(({ encryptor }) => encryptor),
|
||||
distinctUntilChanged(),
|
||||
shareReplay({ refCount: true, bufferSize: 1 }),
|
||||
);
|
||||
return encryptor$;
|
||||
}
|
||||
|
||||
private when(when$: Observable<boolean>): OperatorFunction<State, State> {
|
||||
return pipe(
|
||||
combineLatestWith(when$.pipe(distinctUntilChanged())),
|
||||
tap(([_, when]) => this.log.debug({ when }, "when status")),
|
||||
filter(([_, when]) => !!when),
|
||||
map(([input]) => input),
|
||||
);
|
||||
@@ -237,6 +259,7 @@ export class UserStateSubject<
|
||||
return [next, dependencies];
|
||||
} else {
|
||||
// false update
|
||||
this.log.debug("shouldUpdate prevented write");
|
||||
return [prev, null];
|
||||
}
|
||||
}),
|
||||
@@ -258,20 +281,22 @@ export class UserStateSubject<
|
||||
// * `input` needs to wait until a message flows through the pipe
|
||||
withConstraints,
|
||||
map(([loadedState, constraints]) => {
|
||||
// bypass nulls
|
||||
if (!loadedState && !this.objectKey?.initial) {
|
||||
this.log.debug("no value; bypassing adjustment");
|
||||
return {
|
||||
constraints: {} as Constraints<State>,
|
||||
state: null,
|
||||
} satisfies Constrained<State>;
|
||||
}
|
||||
|
||||
this.log.debug("adjusting");
|
||||
const unconstrained = loadedState ?? structuredClone(this.objectKey.initial);
|
||||
const calibration = isDynamic(constraints)
|
||||
? constraints.calibrate(unconstrained)
|
||||
: constraints;
|
||||
const adjusted = calibration.adjust(unconstrained);
|
||||
|
||||
this.log.debug("adjusted");
|
||||
return {
|
||||
constraints: calibration.constraints,
|
||||
state: adjusted,
|
||||
@@ -286,11 +311,14 @@ export class UserStateSubject<
|
||||
return pipe(
|
||||
combineLatestWith(constraints$),
|
||||
map(([loadedState, constraints]) => {
|
||||
this.log.debug("fixing");
|
||||
|
||||
const calibration = isDynamic(constraints)
|
||||
? constraints.calibrate(loadedState)
|
||||
: constraints;
|
||||
const fixed = calibration.fix(loadedState);
|
||||
|
||||
this.log.debug("fixed");
|
||||
return {
|
||||
constraints: calibration.constraints,
|
||||
state: fixed,
|
||||
@@ -302,6 +330,7 @@ export class UserStateSubject<
|
||||
private declassify(encryptor$: Observable<UserEncryptor>): OperatorFunction<unknown, State> {
|
||||
// short-circuit if they key lacks encryption support
|
||||
if (!this.objectKey || this.objectKey.format === "plain") {
|
||||
this.log.debug("key uses plain format; bypassing declassification");
|
||||
return (input$) => input$ as Observable<State>;
|
||||
}
|
||||
|
||||
@@ -312,9 +341,12 @@ export class UserStateSubject<
|
||||
concatMap(async ([input, encryptor]) => {
|
||||
// pass through null values
|
||||
if (input === null || input === undefined) {
|
||||
this.log.debug("no value; bypassing declassification");
|
||||
return null;
|
||||
}
|
||||
|
||||
this.log.debug("declassifying");
|
||||
|
||||
// decrypt classified data
|
||||
const { secret, disclosed } = input;
|
||||
const encrypted = EncString.fromJSON(secret);
|
||||
@@ -324,6 +356,7 @@ export class UserStateSubject<
|
||||
const declassified = this.objectKey.classifier.declassify(disclosed, decryptedSecret);
|
||||
const state = this.objectKey.options.deserializer(declassified);
|
||||
|
||||
this.log.debug("declassified");
|
||||
return state;
|
||||
}),
|
||||
);
|
||||
@@ -338,6 +371,7 @@ export class UserStateSubject<
|
||||
if (this.objectKey && this.objectKey.format === "classified") {
|
||||
return map((input) => {
|
||||
if (!isClassifiedFormat(input)) {
|
||||
this.log.warn("classified data must be in classified format; dropping");
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -349,11 +383,13 @@ export class UserStateSubject<
|
||||
if (this.objectKey && this.objectKey.format === "secret-state") {
|
||||
return map((input) => {
|
||||
if (!Array.isArray(input)) {
|
||||
this.log.warn("secret-state requires array formatting; dropping");
|
||||
return null;
|
||||
}
|
||||
|
||||
const [unwrapped] = input;
|
||||
if (!isClassifiedFormat(unwrapped)) {
|
||||
this.log.warn("unwrapped secret-state must be in classified format; dropping");
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -361,13 +397,14 @@ export class UserStateSubject<
|
||||
});
|
||||
}
|
||||
|
||||
throw new Error(`unsupported serialization format: ${this.objectKey.format}`);
|
||||
this.log.panic({ format: this.objectKey.format }, "unsupported serialization format");
|
||||
}
|
||||
|
||||
private classify(encryptor$: Observable<UserEncryptor>): OperatorFunction<State, unknown> {
|
||||
// short-circuit if they key lacks encryption support; `encryptor` is
|
||||
// readied to preserve `dependencies.singleUserId$` emission contract
|
||||
if (!this.objectKey || this.objectKey.format === "plain") {
|
||||
this.log.debug("key uses plain format; bypassing classification");
|
||||
return pipe(
|
||||
ready(encryptor$),
|
||||
map((input) => input as unknown),
|
||||
@@ -380,9 +417,12 @@ export class UserStateSubject<
|
||||
concatMap(async ([input, encryptor]) => {
|
||||
// fail fast if there's no value
|
||||
if (input === null || input === undefined) {
|
||||
this.log.debug("no value; bypassing classification");
|
||||
return null;
|
||||
}
|
||||
|
||||
this.log.debug("classifying");
|
||||
|
||||
// split data by classification level
|
||||
const serialized = JSON.parse(JSON.stringify(input));
|
||||
const classified = this.objectKey.classifier.classify(serialized);
|
||||
@@ -398,6 +438,7 @@ export class UserStateSubject<
|
||||
disclosed: classified.disclosed,
|
||||
} satisfies ClassifiedFormat<void, Disclosed>;
|
||||
|
||||
this.log.debug("classified");
|
||||
// deliberate type erasure; the type is restored during `declassify`
|
||||
return envelope as ClassifiedFormat<unknown, unknown>;
|
||||
}),
|
||||
@@ -416,13 +457,7 @@ export class UserStateSubject<
|
||||
return map((input) => [input] as unknown);
|
||||
}
|
||||
|
||||
throw new Error(`unsupported serialization format: ${this.objectKey.format}`);
|
||||
}
|
||||
|
||||
/** The userId to which the subject is bound.
|
||||
*/
|
||||
get userId() {
|
||||
return this.state.userId;
|
||||
this.log.panic({ format: this.objectKey.format }, "unsupported serialization format");
|
||||
}
|
||||
|
||||
next(value: State) {
|
||||
@@ -449,7 +484,6 @@ export class UserStateSubject<
|
||||
// if greater efficiency becomes desirable, consider implementing
|
||||
// `SubjectLike` directly
|
||||
private input = new ReplaySubject<State>(1);
|
||||
private state: SingleUserState<unknown>;
|
||||
private readonly output = new ReplaySubject<WithConstraints<State>>(1);
|
||||
|
||||
/** A stream containing settings and their last-applied constraints. */
|
||||
@@ -462,9 +496,11 @@ export class UserStateSubject<
|
||||
|
||||
private counter = 0;
|
||||
|
||||
private onNext(value: unknown) {
|
||||
this.state
|
||||
private onNext(value: unknown, state: SingleUserState<unknown>) {
|
||||
state
|
||||
.update(() => {
|
||||
this.log.debug("updating");
|
||||
|
||||
if (typeof value === "object") {
|
||||
// related: ALWAYS_UPDATE_KLUDGE FIXME
|
||||
const counter = this.counter++;
|
||||
@@ -472,13 +508,17 @@ export class UserStateSubject<
|
||||
this.counter = 0;
|
||||
}
|
||||
|
||||
const kludge = value as any;
|
||||
const kludge = { ...value } as any;
|
||||
kludge[ALWAYS_UPDATE_KLUDGE] = counter;
|
||||
}
|
||||
|
||||
this.log.debug("updated");
|
||||
return value;
|
||||
})
|
||||
.catch((e: any) => this.onError(e));
|
||||
.catch((e: any) => {
|
||||
this.log.error(e as object, "updating failed");
|
||||
this.onError(e);
|
||||
});
|
||||
}
|
||||
|
||||
private onError(value: any) {
|
||||
@@ -503,6 +543,8 @@ export class UserStateSubject<
|
||||
|
||||
private dispose() {
|
||||
if (!this.isDisposed) {
|
||||
this.log.debug("disposing");
|
||||
|
||||
// clean up internal subscriptions
|
||||
this.inputSubscription?.unsubscribe();
|
||||
this.outputSubscription?.unsubscribe();
|
||||
@@ -511,6 +553,8 @@ export class UserStateSubject<
|
||||
|
||||
// drop input to ensure its value is removed from memory
|
||||
this.input = null;
|
||||
|
||||
this.log.debug("disposed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user