-
Notifications
You must be signed in to change notification settings - Fork 6
feat(streams): Add stream multiplexer #221
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Also tidy up various asynchronous operations in base stream classes. These were causing promises returned from the stream multiplexer to fulfill erratically.
Get rid of the `makeChannels()` contrivance in favor of exposing a singular `addChannel()`. Also expose `start()` function in order to enable either "manually" draining channels or using the `drainAll()` method.
Also make `synchronize()` method of duplex streams optional.
Also fix issues in tests following rebase.
b0b0d32 to
bac8db2
Compare
sirtimid
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Neat! 👌
| // eslint-disable-next-line @typescript-eslint/await-thenable | ||
| await null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
didn't we have a rule to allow this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like it didn't get merged, and I think it's fine to disable it when necessary? If someone feels strongly about it (cc: @FUDCo) I'm down to just disable it in our lint config.
Ref: #218
Adds a new class
StreamMultiplexerto@ocap/streams. A multiplexer is not a stream itself, but rather a wrapper around a duplex stream. The multiplexer provides methods for creating "channels" over the underlying stream, which are themselves duplex streams and may have a different message type and validation logic.The multiplexer is constructed in an idle state, and must be explicitly "started" via the
start()ordrainAll()methods. All channels must be added before the multiplexer is started.Starting the multiplexer will synchronize the underlying duplex stream, if it is synchronizable. Therefore, in order to prevent message loss, callers should not synchronize the underlying duplex stream before passing it to the multiplexer. For the same reason, the multiplexer will throw if any channels are added after it has started. We could enable "dynamic" channel creation by synchronization the channels themselves, but only if there's a clear need.
See tests for example usage.