Publishers cleanup and new functions#127
Conversation
#### Problem `Publishers.retry()` had the following bugs: - Retry subscription was sending duplicate `onSubscribe` callbacks to the downstream subscribers. This is illegal according to spec. - Flow control was not properly managed if the error happens after a few emissions. Since we have removed retry functionality from the `ClientBuilder` there is no reason to maintain this code. `Publishers` was not wiring the downstream subscriber cancellation to the cancellation of the `Subscriber`. This means that a cancel followed by `onNext` would not have stopped the emission, which although not necessary (because of the inherent race between cancel and emissions) but is good to have. #### Modification - Removed `RetrySocket` and `Publishers.retry()` since they are not used. - Added (moved from `PublisherUtils`) `.empty()`, `.just()` and `.error()` to `Publishers` - Added `concatEmpty()` to `Publishers`. This is required in places where we concat `Publisher<Void>` in custom crafter code. (I will send another PR which is to use this operator in `ReactiveSocket.close()` - Added tests for functions in `Publishers`.
|
@stevegury @yschimke I am merging this as I need these changes for another fix. Feel free to review and comment and I will send another PR to address the comments. There isn't anything controversial in this PR I believe. |
| } | ||
|
|
||
| @Override | ||
| public void onError(Throwable t) { |
There was a problem hiding this comment.
nit: for this sort of class making the AtomicBooleans private and methods that access them final seems prudent.
There was a problem hiding this comment.
That is a good point. I changed this error prone approach to remove inheritance and use lambdas for callbacks. PR #137 does this cleanup, thanks!
#### Problem `Publishers` was using different implementations of `CancellableSubscriber` to provide different callbacks using inheritance. eg: by providing methods like `doOnNext`, `doOnCancel`, etc. This is problematic as it requires calls to `super.do*` by implementator. Also, it created unnecessary class hierarchy with little benefit. #### Modification Rolled up all implementations into a single class that takes various callbacks as different lambdas. This is easier to use and there are less chances of errors when the implementor forgot to call `super` methods. Thanks @yschimke for the comment in PR rsocket#127 which led me to change the approach.
| super.onSubscribe(s); | ||
| } else { | ||
| onError(new IllegalStateException("Duplicate subscription.")); | ||
| } |
There was a problem hiding this comment.
this is violates the Reactive Streams specification Rule 2.5:
A
SubscriberMUST callSubscription.cancel()on the givenSubscriptionafter anonSubscribesignal if it already has an activeSubscription.
Please use the TCK each time you implement a custom Subscriber / Publisher, as there's plenty cases like this which are well defined in the spec but easy to forget if one does not test for them. The TCK covers the vast majority of such cases: https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck
There was a problem hiding this comment.
Thanks @ktoso for the review!
Yes, one of my intentions behind this refactor was to centralize the creation of Subscriber and Publisher in the project so that we can easily verify the behaviors. Currently we create lambdas all over the code base which are hard to test and have bugs related to violating the spec (eg: #136 ). I intend to add tck validation tests for these subscribers and publishers to easily validate these conditions.
There was a problem hiding this comment.
Very glad to hear that 👍
I'll keep my eye on the repo, hopefully being able to provide some useful feedback :)
Problem Publishers was using different implementations of CancellableSubscriber to provide different callbacks using inheritance. eg: by providing methods like doOnNext, doOnCancel, etc. This is problematic as it requires calls to super.do* by implementator. Also, it created unnecessary class hierarchy with little benefit. (Edit: Adding more context from the comments below) There were 3 classes CancellableSubscriber, SafeCancellableSubscriber and SafeCancellableSubscriberProxy, each of them would override some of the methods from Subscriber. eg: doOnCancel was something that was to be used by SafeCancellableSubscriber and CancellableSubscriber in a way that SafeCancellableSubscriber had to implement doOnCancel and provide yet another method doOnCancel0 which if implemented by SafeCancellableSubscriberProxy will have to provide yet another protected method. So, I figured not only the approach was error prone it was painful to code and maintain. Thus this approach where in specific callbacks can be overridden and controlled by the caller. Modification Rolled up all implementations into a single class that takes various callbacks as different lambdas. This is easier to use and there are less chances of errors when the implementor forgot to call super methods. Thanks @yschimke for the comment in PR #127 which led me to change the approach.
* Pull out framing into own section. Move StreamID to start RS Header. Reduce size of frame length, frame type, flags, and metadata length. Remove REQUEST_SUB. Add resume fields to SETUP and KEEPALIVE. * remove topic subscription from FAQ and Motivations. * update number of interaction models. * incorporate comments. * h2 merging of data frames means framing must be preserved.
Problem
Publishers.retry()had the following bugs:onSubscribecallbacks to the downstream subscribers. This is illegal according to spec.Since we have removed retry functionality from the
ClientBuilderthere is no reason to maintain this code.Apart from the above,
Publisherswas not wiring the downstream subscriber cancellation to the cancellation of theSubscriber. This means that a cancel followed byonNextwould not have stopped the emission, which although not necessary (because of the inherent race between cancel and emissions) but is good to have.Modification
RetrySocketandPublishers.retry()since they are not used.PublisherUtils).empty(),.just()and.error()toPublishersconcatEmpty()toPublishers. This is required in places where we concatPublisher<Void>in custom crafter code. (I will send another PR which is to use this operator inReactiveSocket.close()Publishers.