From c119fd0f4e7fd74ee3190dc3188767dc25e7c2c4 Mon Sep 17 00:00:00 2001 From: Andreas Coroiu Date: Fri, 24 Oct 2025 16:35:39 +0200 Subject: [PATCH] feat: working value fetching --- .../src/platform/services/sdk/rpc/client.ts | 35 ++++++ .../src/platform/services/sdk/rpc/protocol.ts | 12 ++ .../src/platform/services/sdk/rpc/proxies.ts | 105 ++++++++++++++++++ .../services/sdk/rpc/reference-store.ts | 24 ++++ .../src/platform/services/sdk/rpc/rpc.spec.ts | 53 +++++++++ .../src/platform/services/sdk/rpc/server.ts | 43 +++++++ 6 files changed, 272 insertions(+) create mode 100644 libs/common/src/platform/services/sdk/rpc/client.ts create mode 100644 libs/common/src/platform/services/sdk/rpc/protocol.ts create mode 100644 libs/common/src/platform/services/sdk/rpc/proxies.ts create mode 100644 libs/common/src/platform/services/sdk/rpc/reference-store.ts create mode 100644 libs/common/src/platform/services/sdk/rpc/rpc.spec.ts create mode 100644 libs/common/src/platform/services/sdk/rpc/server.ts diff --git a/libs/common/src/platform/services/sdk/rpc/client.ts b/libs/common/src/platform/services/sdk/rpc/client.ts new file mode 100644 index 00000000000..e08e402c932 --- /dev/null +++ b/libs/common/src/platform/services/sdk/rpc/client.ts @@ -0,0 +1,35 @@ +import { map, Observable } from "rxjs"; + +import { Remote } from "../remote"; + +import { Command, Response } from "./protocol"; +import { RpcObjectReference } from "./proxies"; + +export interface RpcRequestChannel { + sendCommand(command: Command): Promise; + subscribeToRoot(): Observable; +} + +export class RpcClient { + constructor(private channel: RpcRequestChannel) {} + + getRoot(): Observable> { + return this.channel.subscribeToRoot().pipe( + map((response) => { + if (response.status === "error") { + throw new Error(`RPC Error: ${response.error}`); + } + + if (response.result.type !== "reference") { + throw new Error(`Expected reference result for root object`); + } + + return RpcObjectReference.create( + this.channel, + response.result.referenceId, + response.result.objectType, + ) as Remote; + }), + ); + } +} diff --git a/libs/common/src/platform/services/sdk/rpc/protocol.ts b/libs/common/src/platform/services/sdk/rpc/protocol.ts new file mode 100644 index 00000000000..04fe7189161 --- /dev/null +++ b/libs/common/src/platform/services/sdk/rpc/protocol.ts @@ -0,0 +1,12 @@ +export type ReferenceId = number; + +export type Command = + | { method: "get"; referenceId: ReferenceId; propertyName: string } + | { method: "call"; referenceId: ReferenceId; propertyName: string; args: unknown[] } + | { method: "release"; referenceId: ReferenceId }; + +export type Response = { status: "success"; result: Result } | { status: "error"; error: unknown }; + +export type Result = + | { type: "value"; value: unknown } + | { type: "reference"; referenceId: ReferenceId; objectType?: string }; diff --git a/libs/common/src/platform/services/sdk/rpc/proxies.ts b/libs/common/src/platform/services/sdk/rpc/proxies.ts new file mode 100644 index 00000000000..88b99e58f37 --- /dev/null +++ b/libs/common/src/platform/services/sdk/rpc/proxies.ts @@ -0,0 +1,105 @@ +import { RpcRequestChannel } from "./client"; +import { Command, ReferenceId } from "./protocol"; + +/** + * A reference to a remote object. + */ +export class RpcObjectReference { + static create( + channel: RpcRequestChannel, + referenceId: ReferenceId, + objectType?: string, + ): RpcObjectReference { + return ProxiedReference(channel, new RpcObjectReference(referenceId, objectType)); + } + + private constructor( + public referenceId: ReferenceId, + public objectType?: string, + ) {} +} + +function ProxiedReference( + channel: RpcRequestChannel, + reference: RpcObjectReference, +): RpcObjectReference { + return new Proxy(reference, { + get(target, propertyName: string) { + if (propertyName === "then") { + // Allow awaiting the proxy itself + return undefined; + } + + // console.log(`Accessing ${reference.objectType}.${propertyName}`); + return RpcPropertyReference.create(channel, target, propertyName); + }, + }); +} + +/** + * A reference to a specific property on a remote object. + */ +export class RpcPropertyReference { + static create( + channel: RpcRequestChannel, + objectReference: RpcObjectReference, + propertyName: string, + ): RpcPropertyReference { + return ProxiedReferenceProperty( + channel, + new RpcPropertyReference(objectReference, propertyName), + ); + } + + private constructor( + public objectReference: RpcObjectReference, + public propertyName: string, + ) {} +} + +/** + * A sub-proxy for a specific property of a proxied reference + * This is because we need to handle property accesses differently than method calls + * but we don't know which type it is until it gets consumed. + * + * If this references a method then the `apply` trap will be called on this proxy. + * If this references a property then they'll try to await the value, triggering the `get` trap + * when they access the `then` property. + */ +function ProxiedReferenceProperty(channel: RpcRequestChannel, reference: RpcPropertyReference) { + return new Proxy(reference, { + get(_target, propertyName: string) { + // console.log( + // `Accessing ${reference.objectReference.objectType}.${reference.propertyName}.${propertyName}`, + // ); + + if (propertyName !== "then") { + throw new Error(`Cannot access property '${propertyName}' on remote proxy synchronously`); + } + + return (onFulfilled: (value: any) => void, onRejected: (error: any) => void) => { + (async () => { + // Handle property access + const command: Command = { + method: "get", + referenceId: reference.objectReference.referenceId, + propertyName: reference.propertyName, + }; + // Send the command over the channel + const result = await channel.sendCommand(command); + + if (result.status === "error") { + throw new Error(`RPC Error: ${result.error}`); + } + + if (result.result.type === "value") { + return result.result.value; + } else if (result.result.type === "reference") { + return RpcObjectReference.create(channel, result.result.referenceId); + } + })().then(onFulfilled, onRejected); + }; + }, + apply(_target, _thisArg, argArray: unknown[]) {}, + }); +} diff --git a/libs/common/src/platform/services/sdk/rpc/reference-store.ts b/libs/common/src/platform/services/sdk/rpc/reference-store.ts new file mode 100644 index 00000000000..b91304796aa --- /dev/null +++ b/libs/common/src/platform/services/sdk/rpc/reference-store.ts @@ -0,0 +1,24 @@ +import { ReferenceId } from "./protocol"; + +export class ReferenceStore { + private _store = new Map(); + private _nextId = 1; + + get(id: number): T | undefined { + return this._store.get(id); + } + + store(value: T): ReferenceId { + const id = this.generateId(); + this._store.set(id, value); + return id; + } + + release(id: ReferenceId): void { + this._store.delete(id); + } + + private generateId(): ReferenceId { + return this._nextId++; + } +} diff --git a/libs/common/src/platform/services/sdk/rpc/rpc.spec.ts b/libs/common/src/platform/services/sdk/rpc/rpc.spec.ts new file mode 100644 index 00000000000..8f96b29728f --- /dev/null +++ b/libs/common/src/platform/services/sdk/rpc/rpc.spec.ts @@ -0,0 +1,53 @@ +import { firstValueFrom, map, Observable } from "rxjs"; + +import { RpcClient, RpcRequestChannel } from "./client"; +import { Response } from "./protocol"; +import { RpcServer } from "./server"; + +describe("RpcServer", () => { + let server!: RpcServer; + let client!: RpcClient; + + beforeEach(() => { + server = new RpcServer(); + client = new RpcClient(new InMemoryChannel(server)); + + server.setValue(new TestClass()); + }); + + it("fetches property value", async () => { + const remoteInstance = await firstValueFrom(client.getRoot()); + + const value = await remoteInstance.value; + + expect(value).toBe(42); + }); + + it.skip("calls sync function and returns value", async () => { + const remoteInstance = await firstValueFrom(client.getRoot()); + + const result = await remoteInstance.greet("World"); + + expect(result).toBe("Hello, World!"); + }); +}); + +class TestClass { + value: number = 42; + + greet(name: string): string { + return `Hello, ${name}!`; + } +} + +class InMemoryChannel implements RpcRequestChannel { + constructor(private server: RpcServer) {} + + async sendCommand(command: any): Promise { + return this.server.handle(command); + } + + subscribeToRoot(): Observable { + return this.server.value$.pipe(map((result) => ({ status: "success", result }))); + } +} diff --git a/libs/common/src/platform/services/sdk/rpc/server.ts b/libs/common/src/platform/services/sdk/rpc/server.ts new file mode 100644 index 00000000000..f206d106989 --- /dev/null +++ b/libs/common/src/platform/services/sdk/rpc/server.ts @@ -0,0 +1,43 @@ +import { map, Observable, ReplaySubject } from "rxjs"; + +import { Command, Response, Result } from "./protocol"; +import { ReferenceStore } from "./reference-store"; + +export class RpcServer { + private references = new ReferenceStore(); + private _value$ = new ReplaySubject(1); + readonly value$: Observable = this._value$.pipe( + map((value) => { + const referenceId = this.references.store(value); + return { type: "reference", referenceId, objectType: value?.constructor?.name }; + }), + ); + + constructor() {} + + handle(command: Command): Response { + if (command.method === "get") { + const target = this.references.get(command.referenceId); + if (!target) { + return { status: "error", error: `[RPC] Reference ID ${command.referenceId} not found` }; + } + + try { + const propertyValue = target[command.propertyName]; + if (typeof propertyValue === "function") { + return { status: "error", error: `[RPC] Property ${command.propertyName} is a function` }; + } else { + return { status: "success", result: { type: "value", value: propertyValue } }; + } + } catch (error) { + return { status: "error", error }; + } + } + + return { status: "error", error: `Unknown command method: ${command.method}` }; + } + + setValue(value: T) { + this._value$.next(value); + } +}