mirror of
https://github.com/bitwarden/browser
synced 2025-12-13 14:53:33 +00:00
[PM-7489] Introduce MessageSender & MessageListener (#8709)
* Introduce MessageSender * Update `messageSenderFactory` * Remove Comment * Use BrowserApi * Update Comment * Rename to CommandDefinition * Add More Documentation to MessageSender * Add `EMPTY` helpers and remove NoopMessageSender * Calm Down Logging * Limit Logging On Known Errors * Use `messageStream` Parameter Co-authored-by: Matt Gibson <mgibson@bitwarden.com> * Add eslint rules * Update Error Handling Co-authored-by: Cesar Gonzalez <cesar.a.gonzalezcs@gmail.com> * Delete Lazy Classes In Favor of Observable Factories * Remove Fido Messages --------- Co-authored-by: Matt Gibson <mgibson@bitwarden.com> Co-authored-by: Cesar Gonzalez <cesar.a.gonzalezcs@gmail.com>
This commit is contained in:
@@ -1,6 +0,0 @@
|
||||
import { Injectable } from "@angular/core";
|
||||
|
||||
import { BroadcasterService as BaseBroadcasterService } from "@bitwarden/common/platform/services/broadcaster.service";
|
||||
|
||||
@Injectable()
|
||||
export class BroadcasterService extends BaseBroadcasterService {}
|
||||
@@ -1,5 +1,5 @@
|
||||
import { InjectionToken } from "@angular/core";
|
||||
import { Observable } from "rxjs";
|
||||
import { Observable, Subject } from "rxjs";
|
||||
|
||||
import {
|
||||
AbstractMemoryStorageService,
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
} from "@bitwarden/common/platform/abstractions/storage.service";
|
||||
import { ThemeType } from "@bitwarden/common/platform/enums";
|
||||
import { StateFactory } from "@bitwarden/common/platform/factories/state-factory";
|
||||
import { Message } from "@bitwarden/common/platform/messaging";
|
||||
|
||||
declare const tag: unique symbol;
|
||||
/**
|
||||
@@ -49,3 +50,6 @@ export const LOG_MAC_FAILURES = new SafeInjectionToken<boolean>("LOG_MAC_FAILURE
|
||||
export const SYSTEM_THEME_OBSERVABLE = new SafeInjectionToken<Observable<ThemeType>>(
|
||||
"SYSTEM_THEME_OBSERVABLE",
|
||||
);
|
||||
export const INTRAPROCESS_MESSAGING_SUBJECT = new SafeInjectionToken<Subject<Message<object>>>(
|
||||
"INTRAPROCESS_MESSAGING_SUBJECT",
|
||||
);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { ErrorHandler, LOCALE_ID, NgModule } from "@angular/core";
|
||||
import { Subject } from "rxjs";
|
||||
|
||||
import {
|
||||
AuthRequestServiceAbstraction,
|
||||
@@ -116,7 +117,7 @@ import { BillingApiService } from "@bitwarden/common/billing/services/billing-ap
|
||||
import { OrganizationBillingService } from "@bitwarden/common/billing/services/organization-billing.service";
|
||||
import { PaymentMethodWarningsService } from "@bitwarden/common/billing/services/payment-method-warnings.service";
|
||||
import { AppIdService as AppIdServiceAbstraction } from "@bitwarden/common/platform/abstractions/app-id.service";
|
||||
import { BroadcasterService as BroadcasterServiceAbstraction } from "@bitwarden/common/platform/abstractions/broadcaster.service";
|
||||
import { BroadcasterService } from "@bitwarden/common/platform/abstractions/broadcaster.service";
|
||||
import { ConfigApiServiceAbstraction } from "@bitwarden/common/platform/abstractions/config/config-api.service.abstraction";
|
||||
import { ConfigService } from "@bitwarden/common/platform/abstractions/config/config.service";
|
||||
import { CryptoFunctionService as CryptoFunctionServiceAbstraction } from "@bitwarden/common/platform/abstractions/crypto-function.service";
|
||||
@@ -137,6 +138,9 @@ import {
|
||||
DefaultBiometricStateService,
|
||||
} from "@bitwarden/common/platform/biometrics/biometric-state.service";
|
||||
import { StateFactory } from "@bitwarden/common/platform/factories/state-factory";
|
||||
import { Message, MessageListener, MessageSender } from "@bitwarden/common/platform/messaging";
|
||||
// eslint-disable-next-line no-restricted-imports -- Used for dependency injection
|
||||
import { SubjectMessageSender } from "@bitwarden/common/platform/messaging/internal";
|
||||
import { devFlagEnabled, flagEnabled } from "@bitwarden/common/platform/misc/flags";
|
||||
import { Account } from "@bitwarden/common/platform/models/domain/account";
|
||||
import { GlobalState } from "@bitwarden/common/platform/models/domain/global-state";
|
||||
@@ -147,6 +151,7 @@ import { ConsoleLogService } from "@bitwarden/common/platform/services/console-l
|
||||
import { CryptoService } from "@bitwarden/common/platform/services/crypto.service";
|
||||
import { EncryptServiceImplementation } from "@bitwarden/common/platform/services/cryptography/encrypt.service.implementation";
|
||||
import { MultithreadEncryptServiceImplementation } from "@bitwarden/common/platform/services/cryptography/multithread-encrypt.service.implementation";
|
||||
import { DefaultBroadcasterService } from "@bitwarden/common/platform/services/default-broadcaster.service";
|
||||
import { DefaultEnvironmentService } from "@bitwarden/common/platform/services/default-environment.service";
|
||||
import { FileUploadService } from "@bitwarden/common/platform/services/file-upload/file-upload.service";
|
||||
import { KeyGenerationService } from "@bitwarden/common/platform/services/key-generation.service";
|
||||
@@ -247,7 +252,6 @@ import {
|
||||
import { AuthGuard } from "../auth/guards/auth.guard";
|
||||
import { UnauthGuard } from "../auth/guards/unauth.guard";
|
||||
import { FormValidationErrorsService as FormValidationErrorsServiceAbstraction } from "../platform/abstractions/form-validation-errors.service";
|
||||
import { BroadcasterService } from "../platform/services/broadcaster.service";
|
||||
import { FormValidationErrorsService } from "../platform/services/form-validation-errors.service";
|
||||
import { LoggingErrorHandler } from "../platform/services/logging-error-handler";
|
||||
import { AngularThemingService } from "../platform/services/theming/angular-theming.service";
|
||||
@@ -270,6 +274,7 @@ import {
|
||||
SYSTEM_LANGUAGE,
|
||||
SYSTEM_THEME_OBSERVABLE,
|
||||
WINDOW,
|
||||
INTRAPROCESS_MESSAGING_SUBJECT,
|
||||
} from "./injection-tokens";
|
||||
import { ModalService } from "./modal.service";
|
||||
|
||||
@@ -625,7 +630,11 @@ const safeProviders: SafeProvider[] = [
|
||||
BillingAccountProfileStateService,
|
||||
],
|
||||
}),
|
||||
safeProvider({ provide: BroadcasterServiceAbstraction, useClass: BroadcasterService, deps: [] }),
|
||||
safeProvider({
|
||||
provide: BroadcasterService,
|
||||
useClass: DefaultBroadcasterService,
|
||||
deps: [MessageSender, MessageListener],
|
||||
}),
|
||||
safeProvider({
|
||||
provide: VaultTimeoutSettingsServiceAbstraction,
|
||||
useClass: VaultTimeoutSettingsService,
|
||||
@@ -1127,6 +1136,21 @@ const safeProviders: SafeProvider[] = [
|
||||
useClass: LoggingErrorHandler,
|
||||
deps: [],
|
||||
}),
|
||||
safeProvider({
|
||||
provide: INTRAPROCESS_MESSAGING_SUBJECT,
|
||||
useFactory: () => new Subject<Message<object>>(),
|
||||
deps: [],
|
||||
}),
|
||||
safeProvider({
|
||||
provide: MessageListener,
|
||||
useFactory: (subject: Subject<Message<object>>) => new MessageListener(subject.asObservable()),
|
||||
deps: [INTRAPROCESS_MESSAGING_SUBJECT],
|
||||
}),
|
||||
safeProvider({
|
||||
provide: MessageSender,
|
||||
useFactory: (subject: Subject<Message<object>>) => new SubjectMessageSender(subject),
|
||||
deps: [INTRAPROCESS_MESSAGING_SUBJECT],
|
||||
}),
|
||||
safeProvider({
|
||||
provide: ProviderApiServiceAbstraction,
|
||||
useClass: ProviderApiService,
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
export abstract class MessagingService {
|
||||
abstract send(subscriber: string, arg?: any): void;
|
||||
}
|
||||
// Export the new message sender as the legacy MessagingService to minimize changes in the initial PR,
|
||||
// team specific PR's will come after.
|
||||
export { MessageSender as MessagingService } from "../messaging/message.sender";
|
||||
|
||||
46
libs/common/src/platform/messaging/helpers.spec.ts
Normal file
46
libs/common/src/platform/messaging/helpers.spec.ts
Normal file
@@ -0,0 +1,46 @@
|
||||
import { Subject, firstValueFrom } from "rxjs";
|
||||
|
||||
import { getCommand, isExternalMessage, tagAsExternal } from "./helpers";
|
||||
import { Message, CommandDefinition } from "./types";
|
||||
|
||||
describe("helpers", () => {
|
||||
describe("getCommand", () => {
|
||||
it("can get the command from just a string", () => {
|
||||
const command = getCommand("myCommand");
|
||||
|
||||
expect(command).toEqual("myCommand");
|
||||
});
|
||||
|
||||
it("can get the command from a message definition", () => {
|
||||
const commandDefinition = new CommandDefinition<object>("myCommand");
|
||||
|
||||
const command = getCommand(commandDefinition);
|
||||
|
||||
expect(command).toEqual("myCommand");
|
||||
});
|
||||
});
|
||||
|
||||
describe("tag integration", () => {
|
||||
it("can tag and identify as tagged", async () => {
|
||||
const messagesSubject = new Subject<Message<object>>();
|
||||
|
||||
const taggedMessages = messagesSubject.asObservable().pipe(tagAsExternal);
|
||||
|
||||
const firstValuePromise = firstValueFrom(taggedMessages);
|
||||
|
||||
messagesSubject.next({ command: "test" });
|
||||
|
||||
const result = await firstValuePromise;
|
||||
|
||||
expect(isExternalMessage(result)).toEqual(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("isExternalMessage", () => {
|
||||
it.each([null, { command: "myCommand", test: "object" }, undefined] as Message<
|
||||
Record<string, unknown>
|
||||
>[])("returns false when value is %s", (value: Message<object>) => {
|
||||
expect(isExternalMessage(value)).toBe(false);
|
||||
});
|
||||
});
|
||||
});
|
||||
23
libs/common/src/platform/messaging/helpers.ts
Normal file
23
libs/common/src/platform/messaging/helpers.ts
Normal file
@@ -0,0 +1,23 @@
|
||||
import { MonoTypeOperatorFunction, map } from "rxjs";
|
||||
|
||||
import { Message, CommandDefinition } from "./types";
|
||||
|
||||
export const getCommand = (commandDefinition: CommandDefinition<object> | string) => {
|
||||
if (typeof commandDefinition === "string") {
|
||||
return commandDefinition;
|
||||
} else {
|
||||
return commandDefinition.command;
|
||||
}
|
||||
};
|
||||
|
||||
export const EXTERNAL_SOURCE_TAG = Symbol("externalSource");
|
||||
|
||||
export const isExternalMessage = (message: Message<object>) => {
|
||||
return (message as Record<PropertyKey, unknown>)?.[EXTERNAL_SOURCE_TAG] === true;
|
||||
};
|
||||
|
||||
export const tagAsExternal: MonoTypeOperatorFunction<Message<object>> = map(
|
||||
(message: Message<object>) => {
|
||||
return Object.assign(message, { [EXTERNAL_SOURCE_TAG]: true });
|
||||
},
|
||||
);
|
||||
4
libs/common/src/platform/messaging/index.ts
Normal file
4
libs/common/src/platform/messaging/index.ts
Normal file
@@ -0,0 +1,4 @@
|
||||
export { MessageListener } from "./message.listener";
|
||||
export { MessageSender } from "./message.sender";
|
||||
export { Message, CommandDefinition } from "./types";
|
||||
export { isExternalMessage } from "./helpers";
|
||||
5
libs/common/src/platform/messaging/internal.ts
Normal file
5
libs/common/src/platform/messaging/internal.ts
Normal file
@@ -0,0 +1,5 @@
|
||||
// Built in implementations
|
||||
export { SubjectMessageSender } from "./subject-message.sender";
|
||||
|
||||
// Helpers meant to be used only by other implementations
|
||||
export { tagAsExternal, getCommand } from "./helpers";
|
||||
47
libs/common/src/platform/messaging/message.listener.spec.ts
Normal file
47
libs/common/src/platform/messaging/message.listener.spec.ts
Normal file
@@ -0,0 +1,47 @@
|
||||
import { Subject } from "rxjs";
|
||||
|
||||
import { subscribeTo } from "../../../spec/observable-tracker";
|
||||
|
||||
import { MessageListener } from "./message.listener";
|
||||
import { Message, CommandDefinition } from "./types";
|
||||
|
||||
describe("MessageListener", () => {
|
||||
const subject = new Subject<Message<{ test: number }>>();
|
||||
const sut = new MessageListener(subject.asObservable());
|
||||
|
||||
const testCommandDefinition = new CommandDefinition<{ test: number }>("myCommand");
|
||||
|
||||
describe("allMessages$", () => {
|
||||
it("runs on all nexts", async () => {
|
||||
const tracker = subscribeTo(sut.allMessages$);
|
||||
|
||||
const pausePromise = tracker.pauseUntilReceived(2);
|
||||
|
||||
subject.next({ command: "command1", test: 1 });
|
||||
subject.next({ command: "command2", test: 2 });
|
||||
|
||||
await pausePromise;
|
||||
|
||||
expect(tracker.emissions[0]).toEqual({ command: "command1", test: 1 });
|
||||
expect(tracker.emissions[1]).toEqual({ command: "command2", test: 2 });
|
||||
});
|
||||
});
|
||||
|
||||
describe("messages$", () => {
|
||||
it("runs on only my commands", async () => {
|
||||
const tracker = subscribeTo(sut.messages$(testCommandDefinition));
|
||||
|
||||
const pausePromise = tracker.pauseUntilReceived(2);
|
||||
|
||||
subject.next({ command: "notMyCommand", test: 1 });
|
||||
subject.next({ command: "myCommand", test: 2 });
|
||||
subject.next({ command: "myCommand", test: 3 });
|
||||
subject.next({ command: "notMyCommand", test: 4 });
|
||||
|
||||
await pausePromise;
|
||||
|
||||
expect(tracker.emissions[0]).toEqual({ command: "myCommand", test: 2 });
|
||||
expect(tracker.emissions[1]).toEqual({ command: "myCommand", test: 3 });
|
||||
});
|
||||
});
|
||||
});
|
||||
41
libs/common/src/platform/messaging/message.listener.ts
Normal file
41
libs/common/src/platform/messaging/message.listener.ts
Normal file
@@ -0,0 +1,41 @@
|
||||
import { EMPTY, Observable, filter } from "rxjs";
|
||||
|
||||
import { Message, CommandDefinition } from "./types";
|
||||
|
||||
/**
|
||||
* A class that allows for listening to messages coming through the application,
|
||||
* allows for listening of all messages or just the messages you care about.
|
||||
*
|
||||
* @note Consider NOT using messaging at all if you can. State Providers offer an observable stream of
|
||||
* data that is persisted. This can serve messages that might have been used to notify of settings changes
|
||||
* or vault data changes and those observables should be preferred over messaging.
|
||||
*/
|
||||
export class MessageListener {
|
||||
constructor(private readonly messageStream: Observable<Message<object>>) {}
|
||||
|
||||
/**
|
||||
* A stream of all messages sent through the application. It does not contain type information for the
|
||||
* other properties on the messages. You are encouraged to instead subscribe to an individual message
|
||||
* through {@link messages$}.
|
||||
*/
|
||||
allMessages$ = this.messageStream;
|
||||
|
||||
/**
|
||||
* Creates an observable stream filtered to just the command given via the {@link CommandDefinition} and typed
|
||||
* to the generic contained in the CommandDefinition. Be careful using this method unless all your messages are being
|
||||
* sent through `MessageSender.send`, if that isn't the case you should have lower confidence in the message
|
||||
* payload being the expected type.
|
||||
*
|
||||
* @param commandDefinition The CommandDefinition containing the information about the message type you care about.
|
||||
*/
|
||||
messages$<T extends object>(commandDefinition: CommandDefinition<T>): Observable<T> {
|
||||
return this.allMessages$.pipe(
|
||||
filter((msg) => msg?.command === commandDefinition.command),
|
||||
) as Observable<T>;
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper property for returning a MessageListener that will never emit any messages and will immediately complete.
|
||||
*/
|
||||
static readonly EMPTY = new MessageListener(EMPTY);
|
||||
}
|
||||
62
libs/common/src/platform/messaging/message.sender.ts
Normal file
62
libs/common/src/platform/messaging/message.sender.ts
Normal file
@@ -0,0 +1,62 @@
|
||||
import { CommandDefinition } from "./types";
|
||||
|
||||
class MultiMessageSender implements MessageSender {
|
||||
constructor(private readonly innerMessageSenders: MessageSender[]) {}
|
||||
|
||||
send<T extends object>(
|
||||
commandDefinition: string | CommandDefinition<T>,
|
||||
payload: object | T = {},
|
||||
): void {
|
||||
for (const messageSender of this.innerMessageSenders) {
|
||||
messageSender.send(commandDefinition, payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export abstract class MessageSender {
|
||||
/**
|
||||
* A method for sending messages in a type safe manner. The passed in command definition
|
||||
* will require you to provide a compatible type in the payload parameter.
|
||||
*
|
||||
* @example
|
||||
* const MY_COMMAND = new CommandDefinition<{ test: number }>("myCommand");
|
||||
*
|
||||
* this.messageSender.send(MY_COMMAND, { test: 14 });
|
||||
*
|
||||
* @param commandDefinition
|
||||
* @param payload
|
||||
*/
|
||||
abstract send<T extends object>(commandDefinition: CommandDefinition<T>, payload: T): void;
|
||||
|
||||
/**
|
||||
* A legacy method for sending messages in a non-type safe way.
|
||||
*
|
||||
* @remarks Consider defining a {@link CommandDefinition} and passing that in for the first parameter to
|
||||
* get compilation errors when defining an incompatible payload.
|
||||
*
|
||||
* @param command The string based command of your message.
|
||||
* @param payload Extra contextual information regarding the message. Be aware that this payload may
|
||||
* be serialized and lose all prototype information.
|
||||
*/
|
||||
abstract send(command: string, payload?: object): void;
|
||||
|
||||
/** Implementation of the other two overloads, read their docs instead. */
|
||||
abstract send<T extends object>(
|
||||
commandDefinition: CommandDefinition<T> | string,
|
||||
payload: T | object,
|
||||
): void;
|
||||
|
||||
/**
|
||||
* A helper method for combine multiple {@link MessageSender}'s.
|
||||
* @param messageSenders The message senders that should be combined.
|
||||
* @returns A message sender that will relay all messages to the given message senders.
|
||||
*/
|
||||
static combine(...messageSenders: MessageSender[]) {
|
||||
return new MultiMessageSender(messageSenders);
|
||||
}
|
||||
|
||||
/**
|
||||
* A helper property for creating a {@link MessageSender} that sends to nowhere.
|
||||
*/
|
||||
static readonly EMPTY: MessageSender = new MultiMessageSender([]);
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
import { Subject } from "rxjs";
|
||||
|
||||
import { subscribeTo } from "../../../spec/observable-tracker";
|
||||
|
||||
import { SubjectMessageSender } from "./internal";
|
||||
import { MessageSender } from "./message.sender";
|
||||
import { Message, CommandDefinition } from "./types";
|
||||
|
||||
describe("SubjectMessageSender", () => {
|
||||
const subject = new Subject<Message<{ test: number }>>();
|
||||
const subjectObservable = subject.asObservable();
|
||||
|
||||
const sut: MessageSender = new SubjectMessageSender(subject);
|
||||
|
||||
describe("send", () => {
|
||||
it("will send message with command from message definition", async () => {
|
||||
const commandDefinition = new CommandDefinition<{ test: number }>("myCommand");
|
||||
|
||||
const tracker = subscribeTo(subjectObservable);
|
||||
const pausePromise = tracker.pauseUntilReceived(1);
|
||||
|
||||
sut.send(commandDefinition, { test: 1 });
|
||||
|
||||
await pausePromise;
|
||||
|
||||
expect(tracker.emissions[0]).toEqual({ command: "myCommand", test: 1 });
|
||||
});
|
||||
|
||||
it("will send message with command from normal string", async () => {
|
||||
const tracker = subscribeTo(subjectObservable);
|
||||
const pausePromise = tracker.pauseUntilReceived(1);
|
||||
|
||||
sut.send("myCommand", { test: 1 });
|
||||
|
||||
await pausePromise;
|
||||
|
||||
expect(tracker.emissions[0]).toEqual({ command: "myCommand", test: 1 });
|
||||
});
|
||||
|
||||
it("will send message with object even if payload not given", async () => {
|
||||
const tracker = subscribeTo(subjectObservable);
|
||||
const pausePromise = tracker.pauseUntilReceived(1);
|
||||
|
||||
sut.send("myCommand");
|
||||
|
||||
await pausePromise;
|
||||
|
||||
expect(tracker.emissions[0]).toEqual({ command: "myCommand" });
|
||||
});
|
||||
|
||||
it.each([null, undefined])(
|
||||
"will send message with object even if payload is null-ish (%s)",
|
||||
async (payloadValue) => {
|
||||
const tracker = subscribeTo(subjectObservable);
|
||||
const pausePromise = tracker.pauseUntilReceived(1);
|
||||
|
||||
sut.send("myCommand", payloadValue);
|
||||
|
||||
await pausePromise;
|
||||
|
||||
expect(tracker.emissions[0]).toEqual({ command: "myCommand" });
|
||||
},
|
||||
);
|
||||
});
|
||||
});
|
||||
17
libs/common/src/platform/messaging/subject-message.sender.ts
Normal file
17
libs/common/src/platform/messaging/subject-message.sender.ts
Normal file
@@ -0,0 +1,17 @@
|
||||
import { Subject } from "rxjs";
|
||||
|
||||
import { getCommand } from "./internal";
|
||||
import { MessageSender } from "./message.sender";
|
||||
import { Message, CommandDefinition } from "./types";
|
||||
|
||||
export class SubjectMessageSender implements MessageSender {
|
||||
constructor(private readonly messagesSubject: Subject<Message<object>>) {}
|
||||
|
||||
send<T extends object>(
|
||||
commandDefinition: string | CommandDefinition<T>,
|
||||
payload: object | T = {},
|
||||
): void {
|
||||
const command = getCommand(commandDefinition);
|
||||
this.messagesSubject.next(Object.assign(payload ?? {}, { command: command }));
|
||||
}
|
||||
}
|
||||
13
libs/common/src/platform/messaging/types.ts
Normal file
13
libs/common/src/platform/messaging/types.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
declare const tag: unique symbol;
|
||||
|
||||
/**
|
||||
* A class for defining information about a message, this is helpful
|
||||
* alonside `MessageSender` and `MessageListener` for providing a type
|
||||
* safe(-ish) way of sending and receiving messages.
|
||||
*/
|
||||
export class CommandDefinition<T extends object> {
|
||||
[tag]: T;
|
||||
constructor(readonly command: string) {}
|
||||
}
|
||||
|
||||
export type Message<T extends object> = { command: string } & T;
|
||||
@@ -1,34 +0,0 @@
|
||||
import {
|
||||
BroadcasterService as BroadcasterServiceAbstraction,
|
||||
MessageBase,
|
||||
} from "../abstractions/broadcaster.service";
|
||||
|
||||
export class BroadcasterService implements BroadcasterServiceAbstraction {
|
||||
subscribers: Map<string, (message: MessageBase) => void> = new Map<
|
||||
string,
|
||||
(message: MessageBase) => void
|
||||
>();
|
||||
|
||||
send(message: MessageBase, id?: string) {
|
||||
if (id != null) {
|
||||
if (this.subscribers.has(id)) {
|
||||
this.subscribers.get(id)(message);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
this.subscribers.forEach((value) => {
|
||||
value(message);
|
||||
});
|
||||
}
|
||||
|
||||
subscribe(id: string, messageCallback: (message: MessageBase) => void) {
|
||||
this.subscribers.set(id, messageCallback);
|
||||
}
|
||||
|
||||
unsubscribe(id: string) {
|
||||
if (this.subscribers.has(id)) {
|
||||
this.subscribers.delete(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
import { Subscription } from "rxjs";
|
||||
|
||||
import { BroadcasterService, MessageBase } from "../abstractions/broadcaster.service";
|
||||
import { MessageListener, MessageSender } from "../messaging";
|
||||
|
||||
/**
|
||||
* Temporary implementation that just delegates to the message sender and message listener
|
||||
* and manages their subscriptions.
|
||||
*/
|
||||
export class DefaultBroadcasterService implements BroadcasterService {
|
||||
subscriptions = new Map<string, Subscription>();
|
||||
|
||||
constructor(
|
||||
private readonly messageSender: MessageSender,
|
||||
private readonly messageListener: MessageListener,
|
||||
) {}
|
||||
|
||||
send(message: MessageBase, id?: string) {
|
||||
this.messageSender.send(message?.command, message);
|
||||
}
|
||||
|
||||
subscribe(id: string, messageCallback: (message: MessageBase) => void) {
|
||||
this.subscriptions.set(
|
||||
id,
|
||||
this.messageListener.allMessages$.subscribe((message) => {
|
||||
messageCallback(message);
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
unsubscribe(id: string) {
|
||||
const subscription = this.subscriptions.get(id);
|
||||
subscription?.unsubscribe();
|
||||
this.subscriptions.delete(id);
|
||||
}
|
||||
}
|
||||
@@ -1,7 +0,0 @@
|
||||
import { MessagingService } from "../abstractions/messaging.service";
|
||||
|
||||
export class NoopMessagingService implements MessagingService {
|
||||
send(subscriber: string, arg: any = {}) {
|
||||
// Do nothing...
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user