From 534abad46b4937c16067db1320e07798db48b43f Mon Sep 17 00:00:00 2001 From: Erik Marks Date: Thu, 7 Nov 2024 15:26:36 +0700 Subject: [PATCH 1/2] feat(streams): Add Node worker_thread stream --- packages/streams/src/index.test.ts | 4 + packages/streams/src/index.ts | 6 + packages/streams/src/node/NodeWorkerStream.ts | 122 ++++++++++++++++++ 3 files changed, 132 insertions(+) create mode 100644 packages/streams/src/node/NodeWorkerStream.ts diff --git a/packages/streams/src/index.test.ts b/packages/streams/src/index.test.ts index 4daecae1a..fcb8fc631 100644 --- a/packages/streams/src/index.test.ts +++ b/packages/streams/src/index.test.ts @@ -14,6 +14,10 @@ describe('index', () => { 'MessagePortMultiplexer', 'MessagePortReader', 'MessagePortWriter', + 'NodeWorkerDuplexStream', + 'NodeWorkerMultiplexer', + 'NodeWorkerReader', + 'NodeWorkerWriter', 'PostMessageDuplexStream', 'PostMessageReader', 'PostMessageWriter', diff --git a/packages/streams/src/index.ts b/packages/streams/src/index.ts index a3b18080f..540ee65a6 100644 --- a/packages/streams/src/index.ts +++ b/packages/streams/src/index.ts @@ -23,6 +23,12 @@ export { PostMessageReader, PostMessageWriter, } from './browser/PostMessageStream.js'; +export { + NodeWorkerReader, + NodeWorkerWriter, + NodeWorkerDuplexStream, + NodeWorkerMultiplexer, +} from './node/NodeWorkerStream.js'; export type { PostMessageEnvelope, PostMessageTarget, diff --git a/packages/streams/src/node/NodeWorkerStream.ts b/packages/streams/src/node/NodeWorkerStream.ts new file mode 100644 index 000000000..19f283407 --- /dev/null +++ b/packages/streams/src/node/NodeWorkerStream.ts @@ -0,0 +1,122 @@ +/** + * @module Node Worker streams + */ + +import { + BaseDuplexStream, + makeDuplexStreamInputValidator, +} from '../BaseDuplexStream.js'; +import type { + BaseReaderArgs, + BaseWriterArgs, + ValidateInput, +} from '../BaseStream.js'; +import { BaseReader, BaseWriter } from '../BaseStream.js'; +import { + isMultiplexEnvelope, + StreamMultiplexer, +} from '../StreamMultiplexer.js'; +import type { Dispatchable } from '../utils.js'; + +type OnMessage = (message: unknown) => void; + +type NodePort = { + postMessage: (message: unknown) => void; + on: (event: 'message', listener: OnMessage) => void; +}; + +/** + * A readable stream over a {@link NodePort}. + * + * @see + * - {@link NodeWorkerWriter} for the corresponding writable stream. + * - The module-level documentation for more details. + */ +export class NodeWorkerReader extends BaseReader { + constructor( + port: NodePort, + { validateInput, onEnd }: BaseReaderArgs = {}, + ) { + super({ + validateInput, + onEnd: async () => await onEnd?.(), + }); + + const receiveInput = super.getReceiveInput(); + port.on('message', (data) => { + receiveInput(data).catch(async (error) => this.throw(error)); + }); + harden(this); + } +} +harden(NodeWorkerReader); + +/** + * A writable stream over a {@link NodeWorker}. + * + * @see + * - {@link NodeWorkerReader} for the corresponding readable stream. + * - The module-level documentation for more details. + */ +export class NodeWorkerWriter extends BaseWriter { + constructor( + port: NodePort, + { name, onEnd }: Omit, 'onDispatch'> = {}, + ) { + super({ + name, + onDispatch: (value: Dispatchable) => port.postMessage(value), + onEnd: async () => { + await onEnd?.(); + }, + }); + harden(this); + } +} +harden(NodeWorkerWriter); + +export class NodeWorkerDuplexStream< + Read, + Write = Read, +> extends BaseDuplexStream< + Read, + NodeWorkerReader, + Write, + NodeWorkerWriter +> { + constructor(port: NodePort, validateInput?: ValidateInput) { + let writer: NodeWorkerWriter; // eslint-disable-line prefer-const + const reader = new NodeWorkerReader(port, { + name: 'NodeWorkerDuplexStream', + validateInput: makeDuplexStreamInputValidator(validateInput), + onEnd: async () => { + await writer.return(); + }, + }); + writer = new NodeWorkerWriter(port, { + name: 'NodeWorkerDuplexStream', + onEnd: async () => { + await reader.return(); + }, + }); + super(reader, writer); + } + + static async make( + port: NodePort, + validateInput?: ValidateInput, + ): Promise> { + const stream = new NodeWorkerDuplexStream(port, validateInput); + await stream.synchronize(); + return stream; + } +} +harden(NodeWorkerDuplexStream); + +export class NodeWorkerMultiplexer extends StreamMultiplexer { + constructor(port: NodePort, name?: string) { + super(new NodeWorkerDuplexStream(port, isMultiplexEnvelope), name); + harden(this); + } +} +harden(NodeWorkerMultiplexer); From 9f45fb8222688cd22a79d3d7df94ec45be00d81c Mon Sep 17 00:00:00 2001 From: Erik Marks Date: Tue, 10 Dec 2024 11:25:41 -0800 Subject: [PATCH 2/2] test(streams): Add NodeWorkerStream tests --- .../streams/src/node/NodeWorkerStream.test.ts | 212 ++++++++++++++++++ packages/streams/src/node/NodeWorkerStream.ts | 6 +- 2 files changed, 215 insertions(+), 3 deletions(-) create mode 100644 packages/streams/src/node/NodeWorkerStream.test.ts diff --git a/packages/streams/src/node/NodeWorkerStream.test.ts b/packages/streams/src/node/NodeWorkerStream.test.ts new file mode 100644 index 000000000..b8ac9a5ad --- /dev/null +++ b/packages/streams/src/node/NodeWorkerStream.test.ts @@ -0,0 +1,212 @@ +import { delay } from '@ocap/utils'; +import { describe, it, expect, vi } from 'vitest'; +import type { Mocked } from 'vitest'; + +import { + NodeWorkerDuplexStream, + NodeWorkerMultiplexer, + NodeWorkerReader, + NodeWorkerWriter, +} from './NodeWorkerStream.js'; +import type { NodePort, OnMessage } from './NodeWorkerStream.js'; +import { makeMultiplexEnvelope } from '../../test/stream-mocks.js'; +import { makeAck } from '../BaseDuplexStream.js'; +import type { ValidateInput } from '../BaseStream.js'; +import { + makeDoneResult, + makePendingResult, + makeStreamDoneSignal, +} from '../utils.js'; + +const makeMockNodePort = (): Mocked & { + messageHandler?: OnMessage | undefined; +} => { + const port = { + on: vi.fn((_event, listener) => { + port.messageHandler = listener; + }), + postMessage: vi.fn(), + messageHandler: undefined, + }; + return port; +}; + +describe('NodeWorkerReader', () => { + it('constructs a NodeWorkerReader', () => { + const port = makeMockNodePort(); + const reader = new NodeWorkerReader(port); + + expect(reader).toBeInstanceOf(NodeWorkerReader); + expect(reader[Symbol.asyncIterator]()).toBe(reader); + expect(port.on).toHaveBeenCalledOnce(); + }); + + it('emits messages received from port', async () => { + const port = makeMockNodePort(); + const reader = new NodeWorkerReader(port); + + const message = { foo: 'bar' }; + port.messageHandler?.(message); + + expect(await reader.next()).toStrictEqual(makePendingResult(message)); + }); + + it('calls validateInput with received input if specified', async () => { + const port = makeMockNodePort(); + const validateInput = vi + .fn() + .mockReturnValue(true) as unknown as ValidateInput; + const reader = new NodeWorkerReader(port, { validateInput }); + + const message = { foo: 'bar' }; + port.messageHandler?.(message); + + expect(await reader.next()).toStrictEqual(makePendingResult(message)); + expect(validateInput).toHaveBeenCalledWith(message); + }); + + it('throws if validateInput throws', async () => { + const port = makeMockNodePort(); + const validateInput = (() => { + throw new Error('foo'); + }) as unknown as ValidateInput; + const reader = new NodeWorkerReader(port, { validateInput }); + + port.messageHandler?.(42); + await expect(reader.next()).rejects.toThrow('foo'); + expect(await reader.next()).toStrictEqual(makeDoneResult()); + }); + + it('calls onEnd once when ending', async () => { + const port = makeMockNodePort(); + const onEnd = vi.fn(); + const reader = new NodeWorkerReader(port, { onEnd }); + + port.messageHandler?.(makeStreamDoneSignal()); + + expect(await reader.next()).toStrictEqual(makeDoneResult()); + expect(onEnd).toHaveBeenCalledTimes(1); + expect(await reader.next()).toStrictEqual(makeDoneResult()); + expect(onEnd).toHaveBeenCalledTimes(1); + }); +}); + +describe('NodeWorkerWriter', () => { + it('constructs a NodeWorkerWriter', () => { + const port = makeMockNodePort(); + const writer = new NodeWorkerWriter(port); + + expect(writer).toBeInstanceOf(NodeWorkerWriter); + expect(writer[Symbol.asyncIterator]()).toBe(writer); + }); + + it('writes messages to the port', async () => { + const port = makeMockNodePort(); + const writer = new NodeWorkerWriter(port); + + const message = { foo: 'bar' }; + const nextP = writer.next(message); + + expect(await nextP).toStrictEqual(makePendingResult(undefined)); + expect(port.postMessage).toHaveBeenCalledWith(message); + }); + + it('calls onEnd once when ending', async () => { + const port = makeMockNodePort(); + const onEnd = vi.fn(); + const writer = new NodeWorkerWriter(port, { onEnd }); + + expect(await writer.return()).toStrictEqual(makeDoneResult()); + expect(onEnd).toHaveBeenCalledTimes(1); + expect(await writer.return()).toStrictEqual(makeDoneResult()); + expect(onEnd).toHaveBeenCalledTimes(1); + }); +}); + +describe('NodeWorkerDuplexStream', () => { + const makeDuplexStream = async ( + port = makeMockNodePort(), + validateInput?: ValidateInput, + ): Promise> => { + const duplexStreamP = NodeWorkerDuplexStream.make( + port, + validateInput, + ); + port.messageHandler?.(makeAck()); + return await duplexStreamP; + }; + + it('constructs a NodeWorkerDuplexStream', async () => { + const duplexStream = await makeDuplexStream(); + + expect(duplexStream).toBeInstanceOf(NodeWorkerDuplexStream); + expect(duplexStream[Symbol.asyncIterator]()).toBe(duplexStream); + }); + + it('calls validateInput with received input if specified', async () => { + const validateInput = vi + .fn() + .mockReturnValue(true) as unknown as ValidateInput; + const port = makeMockNodePort(); + const duplexStream = await makeDuplexStream(port, validateInput); + + port.messageHandler?.(42); + + expect(await duplexStream.next()).toStrictEqual(makePendingResult(42)); + expect(validateInput).toHaveBeenCalledWith(42); + }); + + it('ends the reader when the writer ends', async () => { + const port = makeMockNodePort(); + port.postMessage + .mockImplementationOnce(() => undefined) + .mockImplementationOnce(() => { + throw new Error('foo'); + }); + const duplexStream = await makeDuplexStream(port); + + await expect(duplexStream.write(42)).rejects.toThrow( + 'NodeWorkerDuplexStream experienced a dispatch failure', + ); + expect(await duplexStream.next()).toStrictEqual(makeDoneResult()); + }); + + it('ends the writer when the reader ends', async () => { + const port = makeMockNodePort(); + const duplexStream = await makeDuplexStream(port); + + const readP = duplexStream.next(); + port.messageHandler?.(makeStreamDoneSignal()); + await delay(10); + expect(await duplexStream.write(42)).toStrictEqual(makeDoneResult()); + expect(await readP).toStrictEqual(makeDoneResult()); + }); +}); + +describe('NodeWorkerMultiplexer', () => { + it('constructs a NodeWorkerMultiplexer', () => { + const port = makeMockNodePort(); + const multiplexer = new NodeWorkerMultiplexer(port); + + expect(multiplexer).toBeInstanceOf(NodeWorkerMultiplexer); + }); + + it('can create and drain channels', async () => { + const port = makeMockNodePort(); + const multiplexer = new NodeWorkerMultiplexer(port); + const ch1Handler = vi.fn(); + const ch1 = multiplexer.createChannel( + '1', + (value: unknown): value is number => typeof value === 'number', + ); + + const drainP = Promise.all([multiplexer.start(), ch1.drain(ch1Handler)]); + port.messageHandler?.(makeAck()); + port.messageHandler?.(makeMultiplexEnvelope('1', makeAck())); + port.messageHandler?.(makeMultiplexEnvelope('1', 42)); + port.messageHandler?.(makeStreamDoneSignal()); + + await drainP; + expect(ch1Handler).toHaveBeenCalledWith(42); + }); +}); diff --git a/packages/streams/src/node/NodeWorkerStream.ts b/packages/streams/src/node/NodeWorkerStream.ts index 19f283407..e9f6c20e5 100644 --- a/packages/streams/src/node/NodeWorkerStream.ts +++ b/packages/streams/src/node/NodeWorkerStream.ts @@ -18,11 +18,11 @@ import { } from '../StreamMultiplexer.js'; import type { Dispatchable } from '../utils.js'; -type OnMessage = (message: unknown) => void; +export type OnMessage = (message: unknown) => void; -type NodePort = { - postMessage: (message: unknown) => void; +export type NodePort = { on: (event: 'message', listener: OnMessage) => void; + postMessage: (message: unknown) => void; }; /**