1
0
mirror of https://github.com/bitwarden/browser synced 2026-02-19 02:44:01 +00:00

Merge branch 'innovation/user-achievements/event-stream-prototype' of https://github.com/bitwarden/clients into innovation/user-achievements/event-stream-prototype

Merging from the base branch
This commit is contained in:
Tom
2025-03-20 12:31:36 -04:00
18 changed files with 399 additions and 66 deletions

View File

@@ -1,9 +1,92 @@
import { BehaviorSubject, ReplaySubject, Subject, firstValueFrom } from "rxjs";
import { ConsoleLogService } from "../../platform/services/console-log.service";
import { consoleSemanticLoggerProvider } from "../log";
import { AchievementHub } from "./achievement-hub";
import { ItemCreatedEarnedEvent } from "./examples/achievement-events";
import {
TotallyAttachedAchievement,
TotallyAttachedValidator,
} from "./examples/example-validators";
import { itemAdded$ } from "./examples/user-events";
import {
AchievementEarnedEvent,
AchievementEvent,
AchievementId,
AchievementProgressEvent,
AchievementValidator,
MetricId,
UserActionEvent,
} from "./types";
const testLog = consoleSemanticLoggerProvider(new ConsoleLogService(true), {});
describe("AchievementHub", () => {
describe("earned$", () => {});
describe("all$", () => {
it("emits achievements constructor emissions", async () => {
const validators$ = new Subject<AchievementValidator[]>();
const events$ = new Subject<UserActionEvent>();
const achievements$ = new Subject<AchievementEvent>();
const hub = new AchievementHub(validators$, events$, achievements$);
const results$ = new ReplaySubject<AchievementEvent>(3);
hub.all$().subscribe(results$);
describe("metrics$", () => {});
achievements$.next(ItemCreatedEarnedEvent);
describe("all$", () => {});
const result = firstValueFrom(results$);
await expect(result).resolves.toEqual(ItemCreatedEarnedEvent);
});
describe("named$", () => {});
it("emits achievements derived from events", async () => {
const validators$ = new BehaviorSubject<AchievementValidator[]>([TotallyAttachedValidator]);
const events$ = new Subject<UserActionEvent>();
const achievements$ = new Subject<AchievementEvent>();
const hub = new AchievementHub(validators$, events$, achievements$, 10, testLog);
const results$ = new ReplaySubject<AchievementEvent>(3);
hub.all$().subscribe(results$);
// hub starts listening when achievements$ completes
achievements$.complete();
itemAdded$.subscribe(events$);
const result = firstValueFrom(results$);
await expect(result).resolves.toMatchObject({
achievement: { type: "earned", name: TotallyAttachedAchievement },
});
});
});
describe("new$", () => {
it("", async () => {
const validators$ = new Subject<AchievementValidator[]>();
const events$ = new Subject<UserActionEvent>();
const achievements$ = new Subject<AchievementEvent>();
const hub = new AchievementHub(validators$, events$, achievements$);
const results$ = new ReplaySubject<AchievementEvent>(3);
hub.new$().subscribe(results$);
});
});
describe("earned$", () => {
it("", async () => {
const validators$ = new Subject<AchievementValidator[]>();
const events$ = new Subject<UserActionEvent>();
const achievements$ = new Subject<AchievementEvent>();
const hub = new AchievementHub(validators$, events$, achievements$);
const results$ = new ReplaySubject<Map<AchievementId, AchievementEarnedEvent>>(1);
hub.earned$().subscribe(results$);
});
});
describe("metrics$", () => {
it("", async () => {
const validators$ = new Subject<AchievementValidator[]>();
const events$ = new Subject<UserActionEvent>();
const achievements$ = new Subject<AchievementEvent>();
const hub = new AchievementHub(validators$, events$, achievements$);
const results$ = new ReplaySubject<Map<MetricId, AchievementProgressEvent>>(1);
hub.metrics$().subscribe(results$);
});
});
});

View File

@@ -2,13 +2,17 @@ import {
Observable,
ReplaySubject,
Subject,
concat,
debounceTime,
filter,
map,
share,
shareReplay,
startWith,
tap,
} from "rxjs";
import { SemanticLogger, disabledSemanticLoggerProvider } from "../log";
import { active } from "./achievement-manager";
import { achievements } from "./achievement-processor";
import { latestEarnedMetrics, latestProgressMetrics } from "./latest-metrics";
@@ -26,10 +30,26 @@ import {
const ACHIEVEMENT_INITIAL_DEBOUNCE_MS = 100;
export class AchievementHub {
/** Instantiates the achievement hub. A new achievement hub should be created
* per-user, and streams should be partitioned by user.
* @param validators$ emits the most recent achievement validator list and
* re-emits the full list when the validators change.
* @param events$ emits events captured from the system as they occur. THIS
* OBSERVABLE IS SUBSCRIBED DURING INITIALIZATION. It must emit a complete
* event to prevent the event hub from leaking the subscription.
* @param achievements$ emits the list of achievement events captured before
* initialization and then completes. THIS OBSERVABLE IS SUBSCRIBED DURING
* INITIALIZATION. Achievement processing begins once this observable
* completes.
* @param bufferSize the maximum number of achievement events retained by the
* achievement hub.
*/
constructor(
validators$: Observable<AchievementValidator[]>,
events$: Observable<UserActionEvent>,
achievements$: Observable<AchievementEvent>,
bufferSize: number = 1000,
private log: SemanticLogger = disabledSemanticLoggerProvider({}),
) {
this.achievements = new Subject<AchievementEvent>();
this.achievementLog = new ReplaySubject<AchievementEvent>(bufferSize);
@@ -37,36 +57,22 @@ export class AchievementHub {
const metrics$ = this.metrics$().pipe(
map((m) => new Map(Array.from(m.entries(), ([k, v]) => [k, v.achievement.value] as const))),
share(),
shareReplay({ bufferSize: 1, refCount: true }),
);
const earned$ = this.earned$().pipe(map((m) => new Set(m.keys())));
const active$ = validators$.pipe(active(metrics$, earned$));
events$.pipe(achievements(active$, metrics$)).subscribe(this.achievements);
// TODO: figure out how to to unsubscribe from the event stream;
// this likely requires accepting an account-bound observable, which
// would also let the hub maintain it's "one user" invariant.
concat(achievements$, events$.pipe(achievements(active$, metrics$))).subscribe(
this.achievements,
);
}
private readonly achievements: Subject<AchievementEvent>;
private readonly achievementLog: ReplaySubject<AchievementEvent>;
earned$(): Observable<Map<AchievementId, AchievementEarnedEvent>> {
return this.achievementLog.pipe(
filter((e) => isEarnedEvent(e)),
map((e) => e as AchievementEarnedEvent),
latestEarnedMetrics(),
startWith(new Map<AchievementId, AchievementEarnedEvent>()),
debounceTime(ACHIEVEMENT_INITIAL_DEBOUNCE_MS),
);
}
metrics$(): Observable<Map<MetricId, AchievementProgressEvent>> {
return this.achievementLog.pipe(
filter((e) => isProgressEvent(e)),
map((e) => e as AchievementProgressEvent),
latestProgressMetrics(),
startWith(new Map<MetricId, AchievementProgressEvent>()),
);
}
/** emit all achievement events */
all$(): Observable<AchievementEvent> {
return this.achievementLog.asObservable();
@@ -76,4 +82,26 @@ export class AchievementHub {
new$(): Observable<AchievementEvent> {
return this.achievements.asObservable();
}
earned$(): Observable<Map<AchievementId, AchievementEarnedEvent>> {
return this.achievementLog.pipe(
filter((e) => isEarnedEvent(e)),
map((e) => e as AchievementEarnedEvent),
latestEarnedMetrics(),
debounceTime(ACHIEVEMENT_INITIAL_DEBOUNCE_MS),
tap((m) => this.log.debug(m, "earned achievements update")),
startWith(new Map<AchievementId, AchievementEarnedEvent>()),
);
}
metrics$(): Observable<Map<MetricId, AchievementProgressEvent>> {
return this.achievementLog.pipe(
filter((e) => isProgressEvent(e)),
map((e) => e as AchievementProgressEvent),
latestProgressMetrics(),
debounceTime(ACHIEVEMENT_INITIAL_DEBOUNCE_MS),
tap((m) => this.log.debug(m, "achievement metrics update")),
startWith(new Map<MetricId, AchievementProgressEvent>()),
);
}
}

View File

@@ -21,7 +21,7 @@ const VaultItems_10_Added_Achievement: Achievement = {
const VaultItems_50_Added_Achievement: Achievement = {
achievement: "50-vault-items-added" as AchievementId,
name: "It's 50/50 Vault Items Added",
name: "It's 50/50",
description: "Saved your 50th item to Bitwarden",
validator: "Threshold",
active: { metric: VaultItemCreatedProgress, high: 50 },

View File

@@ -1,5 +1,18 @@
import { Primitive } from "type-fest";
export type EcsEventType =
| "access"
| "admin"
| "allowed"
| "creation"
| "deletion"
| "denied"
| "end"
| "error"
| "info"
| "start"
| "user";
/** Elastic Common Schema log format - core fields.
*/
export interface EcsFormat {
@@ -18,18 +31,7 @@ export interface EcsFormat {
event: {
kind?: "alert" | "enrichment" | "event" | "metric" | "state";
category?: "api" | "authentication" | "iam" | "process" | "session";
type?:
| "access"
| "admin"
| "allowed"
| "creation"
| "deletion"
| "denied"
| "end"
| "error"
| "info"
| "start"
| "user";
type?: EcsEventType;
outcome?: "failure" | "success" | "unknown";
};
}

View File

@@ -1,4 +1,4 @@
export { EcsFormat } from "./core";
export { EcsFormat, EcsEventType } from "./core";
export { ErrorFormat } from "./error";
export { EventFormat } from "./event";
export { LogFormat } from "./log";

View File

@@ -3,8 +3,10 @@ import { EcsFormat } from "./core";
export type ServiceFormat = EcsFormat & {
/** documents the program providing the log */
service: {
/** Which kind of client is it? */
name: "android" | "cli" | "desktop" | "extension" | "ios" | "web";
/** Which kind of client is it?
* @remarks this contains the output of `BrowserPlatformUtilsService.getDeviceString()` in practice.
*/
name: string;
/** identifies the service as a type of client device */
type: "client";
@@ -18,6 +20,6 @@ export type ServiceFormat = EcsFormat & {
environment: "production" | "testing" | "development" | "local";
/** the unique identifier(s) for this client installation */
version: "2025.3.1-innovation-sprint";
version: string;
};
};

View File

@@ -0,0 +1,107 @@
import { BehaviorSubject, SubjectLike, from, map, zip } from "rxjs";
import { Primitive } from "type-fest";
import { Account } from "../../auth/abstractions/account.service";
import { AppIdService } from "../../platform/abstractions/app-id.service";
import { PlatformUtilsService } from "../../platform/abstractions/platform-utils.service";
import { UserActionEvent } from "../achievements/types";
import { ServiceFormat, UserFormat, EcsEventType } from "./ecs-format";
import { disabledSemanticLoggerProvider } from "./factory";
import { SemanticLogger } from "./semantic-logger.abstraction";
export abstract class UserEventLogProvider {
abstract create: (account: Account) => UserEventLogger;
}
type BaselineType = Omit<ServiceFormat & UserFormat, "@timestamp">;
type EventInfo = {
action: string;
labels?: Record<string, Primitive>;
tags?: Array<string>;
};
export class UserEventLogger {
constructor(
idService: AppIdService,
utilService: PlatformUtilsService,
account: Account,
private now: () => number,
private events$: SubjectLike<UserActionEvent>,
private log: SemanticLogger = disabledSemanticLoggerProvider({}),
) {
zip(from(idService.getAppId()), from(utilService.getApplicationVersion()))
.pipe(
map(
([appId, version]) =>
({
event: {
kind: "event",
category: "session",
},
service: {
name: utilService.getDeviceString(),
type: "client",
node: {
name: appId,
},
environment: "local",
version,
},
user: {
// `account` verified not-null via `filter`
id: account!.id,
email: (account!.emailVerified && account!.email) || undefined,
},
}) satisfies BaselineType,
),
)
.subscribe((next) => this.baseline$.next(next));
}
private readonly baseline$ = new BehaviorSubject<BaselineType | null>(null);
creation(event: EventInfo) {
this.collect("creation", event);
}
deletion(event: EventInfo) {
this.collect("deletion", event);
}
info(event: EventInfo) {
this.collect("info", event);
}
access(event: EventInfo) {
this.collect("access", event);
}
private collect(type: EcsEventType, info: EventInfo) {
const { value: baseline } = this.baseline$;
if (!baseline) {
// TODO: buffer logs and stream them when `baseline$` becomes available.
this.log.error("baseline log not available; dropping user event");
return;
}
const event = structuredClone(this.baseline$.value) as UserActionEvent;
event["@timestamp"] = this.now();
event.event.type = type;
event.action = info.action;
event.tags = info.tags && info.tags.filter((t) => !!t);
if (info.labels) {
const entries = Object.keys(info.labels)
.filter((k) => !!info.labels![k])
.map((k) => [k, info.labels![k]] as const);
const labels = Object.fromEntries(entries);
event.labels = labels;
}
this.events$.next(event);
}
}

View File

@@ -2,6 +2,7 @@ import { PolicyService } from "../admin-console/abstractions/policy/policy.servi
import { ExtensionService } from "./extension/extension.service";
import { LogProvider } from "./log";
import { UserEventLogProvider } from "./log/logger";
/** Provides access to commonly-used cross-cutting services. */
export type SystemServiceProvider = {
@@ -13,4 +14,6 @@ export type SystemServiceProvider = {
/** Event monitoring and diagnostic interfaces */
readonly log: LogProvider;
readonly event: UserEventLogProvider;
};