Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/streams/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ describe('index', () => {
'MessagePortMultiplexer',
'MessagePortReader',
'MessagePortWriter',
'NodeWorkerDuplexStream',
'NodeWorkerMultiplexer',
'NodeWorkerReader',
'NodeWorkerWriter',
'PostMessageDuplexStream',
'PostMessageReader',
'PostMessageWriter',
Expand Down
6 changes: 6 additions & 0 deletions packages/streams/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ export {
PostMessageReader,
PostMessageWriter,
} from './browser/PostMessageStream.js';
export {
NodeWorkerReader,
NodeWorkerWriter,
NodeWorkerDuplexStream,
NodeWorkerMultiplexer,
} from './node/NodeWorkerStream.js';
export type {
PostMessageEnvelope,
PostMessageTarget,
Expand Down
212 changes: 212 additions & 0 deletions packages/streams/src/node/NodeWorkerStream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import { delay } from '@ocap/utils';
import { describe, it, expect, vi } from 'vitest';
import type { Mocked } from 'vitest';

import {
NodeWorkerDuplexStream,
NodeWorkerMultiplexer,
NodeWorkerReader,
NodeWorkerWriter,
} from './NodeWorkerStream.js';
import type { NodePort, OnMessage } from './NodeWorkerStream.js';
import { makeMultiplexEnvelope } from '../../test/stream-mocks.js';
import { makeAck } from '../BaseDuplexStream.js';
import type { ValidateInput } from '../BaseStream.js';
import {
makeDoneResult,
makePendingResult,
makeStreamDoneSignal,
} from '../utils.js';

const makeMockNodePort = (): Mocked<NodePort> & {
messageHandler?: OnMessage | undefined;
} => {
const port = {
on: vi.fn((_event, listener) => {
port.messageHandler = listener;
}),
postMessage: vi.fn(),
messageHandler: undefined,
};
return port;
};

describe('NodeWorkerReader', () => {
it('constructs a NodeWorkerReader', () => {
const port = makeMockNodePort();
const reader = new NodeWorkerReader(port);

expect(reader).toBeInstanceOf(NodeWorkerReader);
expect(reader[Symbol.asyncIterator]()).toBe(reader);
expect(port.on).toHaveBeenCalledOnce();
});

it('emits messages received from port', async () => {
const port = makeMockNodePort();
const reader = new NodeWorkerReader(port);

const message = { foo: 'bar' };
port.messageHandler?.(message);

expect(await reader.next()).toStrictEqual(makePendingResult(message));
});

it('calls validateInput with received input if specified', async () => {
const port = makeMockNodePort();
const validateInput = vi
.fn()
.mockReturnValue(true) as unknown as ValidateInput<number>;
const reader = new NodeWorkerReader(port, { validateInput });

const message = { foo: 'bar' };
port.messageHandler?.(message);

expect(await reader.next()).toStrictEqual(makePendingResult(message));
expect(validateInput).toHaveBeenCalledWith(message);
});

it('throws if validateInput throws', async () => {
const port = makeMockNodePort();
const validateInput = (() => {
throw new Error('foo');
}) as unknown as ValidateInput<number>;
const reader = new NodeWorkerReader(port, { validateInput });

port.messageHandler?.(42);
await expect(reader.next()).rejects.toThrow('foo');
expect(await reader.next()).toStrictEqual(makeDoneResult());
});

it('calls onEnd once when ending', async () => {
const port = makeMockNodePort();
const onEnd = vi.fn();
const reader = new NodeWorkerReader(port, { onEnd });

port.messageHandler?.(makeStreamDoneSignal());

expect(await reader.next()).toStrictEqual(makeDoneResult());
expect(onEnd).toHaveBeenCalledTimes(1);
expect(await reader.next()).toStrictEqual(makeDoneResult());
expect(onEnd).toHaveBeenCalledTimes(1);
});
});

describe('NodeWorkerWriter', () => {
it('constructs a NodeWorkerWriter', () => {
const port = makeMockNodePort();
const writer = new NodeWorkerWriter(port);

expect(writer).toBeInstanceOf(NodeWorkerWriter);
expect(writer[Symbol.asyncIterator]()).toBe(writer);
});

it('writes messages to the port', async () => {
const port = makeMockNodePort();
const writer = new NodeWorkerWriter(port);

const message = { foo: 'bar' };
const nextP = writer.next(message);

expect(await nextP).toStrictEqual(makePendingResult(undefined));
expect(port.postMessage).toHaveBeenCalledWith(message);
});

it('calls onEnd once when ending', async () => {
const port = makeMockNodePort();
const onEnd = vi.fn();
const writer = new NodeWorkerWriter(port, { onEnd });

expect(await writer.return()).toStrictEqual(makeDoneResult());
expect(onEnd).toHaveBeenCalledTimes(1);
expect(await writer.return()).toStrictEqual(makeDoneResult());
expect(onEnd).toHaveBeenCalledTimes(1);
});
});

describe('NodeWorkerDuplexStream', () => {
const makeDuplexStream = async (
port = makeMockNodePort(),
validateInput?: ValidateInput<number>,
): Promise<NodeWorkerDuplexStream<number>> => {
const duplexStreamP = NodeWorkerDuplexStream.make<number>(
port,
validateInput,
);
port.messageHandler?.(makeAck());
return await duplexStreamP;
};

it('constructs a NodeWorkerDuplexStream', async () => {
const duplexStream = await makeDuplexStream();

expect(duplexStream).toBeInstanceOf(NodeWorkerDuplexStream);
expect(duplexStream[Symbol.asyncIterator]()).toBe(duplexStream);
});

it('calls validateInput with received input if specified', async () => {
const validateInput = vi
.fn()
.mockReturnValue(true) as unknown as ValidateInput<number>;
const port = makeMockNodePort();
const duplexStream = await makeDuplexStream(port, validateInput);

port.messageHandler?.(42);

expect(await duplexStream.next()).toStrictEqual(makePendingResult(42));
expect(validateInput).toHaveBeenCalledWith(42);
});

it('ends the reader when the writer ends', async () => {
const port = makeMockNodePort();
port.postMessage
.mockImplementationOnce(() => undefined)
.mockImplementationOnce(() => {
throw new Error('foo');
});
const duplexStream = await makeDuplexStream(port);

await expect(duplexStream.write(42)).rejects.toThrow(
'NodeWorkerDuplexStream experienced a dispatch failure',
);
expect(await duplexStream.next()).toStrictEqual(makeDoneResult());
});

it('ends the writer when the reader ends', async () => {
const port = makeMockNodePort();
const duplexStream = await makeDuplexStream(port);

const readP = duplexStream.next();
port.messageHandler?.(makeStreamDoneSignal());
await delay(10);
expect(await duplexStream.write(42)).toStrictEqual(makeDoneResult());
expect(await readP).toStrictEqual(makeDoneResult());
});
});

describe('NodeWorkerMultiplexer', () => {
it('constructs a NodeWorkerMultiplexer', () => {
const port = makeMockNodePort();
const multiplexer = new NodeWorkerMultiplexer(port);

expect(multiplexer).toBeInstanceOf(NodeWorkerMultiplexer);
});

it('can create and drain channels', async () => {
const port = makeMockNodePort();
const multiplexer = new NodeWorkerMultiplexer(port);
const ch1Handler = vi.fn();
const ch1 = multiplexer.createChannel<number, number>(
'1',
(value: unknown): value is number => typeof value === 'number',
);

const drainP = Promise.all([multiplexer.start(), ch1.drain(ch1Handler)]);
port.messageHandler?.(makeAck());
port.messageHandler?.(makeMultiplexEnvelope('1', makeAck()));
port.messageHandler?.(makeMultiplexEnvelope('1', 42));
port.messageHandler?.(makeStreamDoneSignal());

await drainP;
expect(ch1Handler).toHaveBeenCalledWith(42);
});
});
122 changes: 122 additions & 0 deletions packages/streams/src/node/NodeWorkerStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/**
* @module Node Worker streams
*/

import {
BaseDuplexStream,
makeDuplexStreamInputValidator,
} from '../BaseDuplexStream.js';
import type {
BaseReaderArgs,
BaseWriterArgs,
ValidateInput,
} from '../BaseStream.js';
import { BaseReader, BaseWriter } from '../BaseStream.js';
import {
isMultiplexEnvelope,
StreamMultiplexer,
} from '../StreamMultiplexer.js';
import type { Dispatchable } from '../utils.js';

export type OnMessage = (message: unknown) => void;

export type NodePort = {
on: (event: 'message', listener: OnMessage) => void;
postMessage: (message: unknown) => void;
};

/**
* A readable stream over a {@link NodePort}.
*
* @see
* - {@link NodeWorkerWriter} for the corresponding writable stream.
* - The module-level documentation for more details.
*/
export class NodeWorkerReader<Read> extends BaseReader<Read> {
constructor(
port: NodePort,
{ validateInput, onEnd }: BaseReaderArgs<Read> = {},
) {
super({
validateInput,
onEnd: async () => await onEnd?.(),
});

const receiveInput = super.getReceiveInput();
port.on('message', (data) => {
receiveInput(data).catch(async (error) => this.throw(error));
});
harden(this);
}
}
harden(NodeWorkerReader);

/**
* A writable stream over a {@link NodeWorker}.
*
* @see
* - {@link NodeWorkerReader} for the corresponding readable stream.
* - The module-level documentation for more details.
*/
export class NodeWorkerWriter<Write> extends BaseWriter<Write> {
constructor(
port: NodePort,
{ name, onEnd }: Omit<BaseWriterArgs<Write>, 'onDispatch'> = {},
) {
super({
name,
onDispatch: (value: Dispatchable<Write>) => port.postMessage(value),
onEnd: async () => {
await onEnd?.();
},
});
harden(this);
}
}
harden(NodeWorkerWriter);

export class NodeWorkerDuplexStream<
Read,
Write = Read,
> extends BaseDuplexStream<
Read,
NodeWorkerReader<Read>,
Write,
NodeWorkerWriter<Write>
> {
constructor(port: NodePort, validateInput?: ValidateInput<Read>) {
let writer: NodeWorkerWriter<Write>; // eslint-disable-line prefer-const
const reader = new NodeWorkerReader<Read>(port, {
name: 'NodeWorkerDuplexStream',
validateInput: makeDuplexStreamInputValidator(validateInput),
onEnd: async () => {
await writer.return();
},
});
writer = new NodeWorkerWriter<Write>(port, {
name: 'NodeWorkerDuplexStream',
onEnd: async () => {
await reader.return();
},
});
super(reader, writer);
}

static async make<Read, Write = Read>(
port: NodePort,
validateInput?: ValidateInput<Read>,
): Promise<NodeWorkerDuplexStream<Read, Write>> {
const stream = new NodeWorkerDuplexStream<Read, Write>(port, validateInput);
await stream.synchronize();
return stream;
}
}
harden(NodeWorkerDuplexStream);

export class NodeWorkerMultiplexer extends StreamMultiplexer {
constructor(port: NodePort, name?: string) {
super(new NodeWorkerDuplexStream(port, isMultiplexEnvelope), name);
harden(this);
}
}
harden(NodeWorkerMultiplexer);
Loading