From 1e3955ad9f7418c3cae4a8af2ebd988b9d8dfd55 Mon Sep 17 00:00:00 2001 From: David Moten Date: Fri, 16 Aug 2019 14:31:52 +1000 Subject: [PATCH 1/5] constrain upstream requests by FlowableElementAtMaybe --- src/main/java/io/reactivex/Flowable.java | 2 +- .../flowable/FlowableElementAtMaybe.java | 2 +- .../flowable/FlowableElementAtTest.java | 17 +++++++++++++++++ 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 574eb37a50..c81c031865 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -9507,7 +9507,7 @@ public final Flowable doOnTerminate(final Action onTerminate) { * @see ReactiveX operators documentation: ElementAt */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Maybe elementAt(long index) { if (index < 0) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtMaybe.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtMaybe.java index 4d411990ac..3ef874d5b5 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtMaybe.java @@ -63,7 +63,7 @@ public void onSubscribe(Subscription s) { if (SubscriptionHelper.validate(this.upstream, s)) { this.upstream = s; downstream.onSubscribe(this); - s.request(Long.MAX_VALUE); + s.request(index + 1); } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java index 27b967d15a..98e3b15230 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java @@ -26,6 +26,7 @@ import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; +import io.reactivex.functions.LongConsumer; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; @@ -69,6 +70,22 @@ public void elementAt() { assertEquals(2, Flowable.fromArray(1, 2).elementAt(1).blockingGet() .intValue()); } + + @Test + public void elementAtConstrainsUpstreamRequests() { + final List requests = new ArrayList(); + Flowable.fromArray(1, 2, 3, 4) + .doOnRequest(new LongConsumer() { + @Override + public void accept(long n) throws Throwable { + requests.add(n); + } + }) + .elementAt(2) + .blockingGet() + .intValue(); + assertEquals(Arrays.asList(3L), requests); + } @Test(expected = IndexOutOfBoundsException.class) public void elementAtWithMinusIndex() { From eb41d6f0d6883dc670994cc4bbf56916eaf79454 Mon Sep 17 00:00:00 2001 From: David Moten Date: Fri, 16 Aug 2019 14:39:23 +1000 Subject: [PATCH 2/5] constrain upstream requests by FlowableElementAtSingle --- src/main/java/io/reactivex/Flowable.java | 2 +- .../flowable/FlowableElementAtSingle.java | 2 +- .../flowable/FlowableElementAtTest.java | 16 ++++++++++++++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index c81c031865..8b8b5252a4 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -9541,7 +9541,7 @@ public final Maybe elementAt(long index) { */ @CheckReturnValue @NonNull - @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Single elementAt(long index, T defaultItem) { if (index < 0) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java index 7cd542d497..e488724dac 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java @@ -70,7 +70,7 @@ public void onSubscribe(Subscription s) { if (SubscriptionHelper.validate(this.upstream, s)) { this.upstream = s; downstream.onSubscribe(this); - s.request(Long.MAX_VALUE); + s.request(index + 1); } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java index 98e3b15230..06530b329b 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java @@ -86,6 +86,22 @@ public void accept(long n) throws Throwable { .intValue(); assertEquals(Arrays.asList(3L), requests); } + + @Test + public void elementAtWithDefaultConstrainsUpstreamRequests() { + final List requests = new ArrayList(); + Flowable.fromArray(1, 2, 3, 4) + .doOnRequest(new LongConsumer() { + @Override + public void accept(long n) throws Throwable { + requests.add(n); + } + }) + .elementAt(2, 100) + .blockingGet() + .intValue(); + assertEquals(Arrays.asList(3L), requests); + } @Test(expected = IndexOutOfBoundsException.class) public void elementAtWithMinusIndex() { From a995bd3dee4f3028ff2f157f1ebfc9bffdc606d2 Mon Sep 17 00:00:00 2001 From: David Moten Date: Fri, 16 Aug 2019 14:43:13 +1000 Subject: [PATCH 3/5] update first(default) and firstElement BackpressureSupport type to reflect upstream request constraints --- src/main/java/io/reactivex/Flowable.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 8b8b5252a4..73259dceab 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -9627,7 +9627,7 @@ public final Flowable filter(Predicate predicate) { * @see ReactiveX operators documentation: First */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN + @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Maybe firstElement() { return elementAt(0); @@ -9653,7 +9653,7 @@ public final Maybe firstElement() { * @see ReactiveX operators documentation: First */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN + @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Single first(T defaultItem) { return elementAt(0, defaultItem); From 2cc2068728f2f854118a9ac12bb6bd11b34ae62a Mon Sep 17 00:00:00 2001 From: David Moten Date: Fri, 16 Aug 2019 14:44:17 +1000 Subject: [PATCH 4/5] set BackpressureSupport for Flowable.firstOrError to FULL --- src/main/java/io/reactivex/Flowable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 73259dceab..ff062625fd 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -9676,7 +9676,7 @@ public final Single first(T defaultItem) { * @see ReactiveX operators documentation: First */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN + @BackpressureSupport(BackpressureKind.FULL) // take may trigger UNBOUNDED_IN @SchedulerSupport(SchedulerSupport.NONE) public final Single firstOrError() { return elementAtOrError(0); From 7f120bfc51d978cc083c71c00ad4eb352a2af080 Mon Sep 17 00:00:00 2001 From: David Moten Date: Fri, 16 Aug 2019 15:14:28 +1000 Subject: [PATCH 5/5] update javadoc on backpressure for elementAt and first --- src/main/java/io/reactivex/Flowable.java | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index ff062625fd..9f45a1a610 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -9494,8 +9494,7 @@ public final Flowable doOnTerminate(final Action onTerminate) { * *
*
Backpressure:
- *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner - * (i.e., no backpressure applied to it).
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.
*
Scheduler:
*
{@code elementAt} does not operate by default on a particular {@link Scheduler}.
*
@@ -9523,8 +9522,7 @@ public final Maybe elementAt(long index) { * *
*
Backpressure:
- *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner - * (i.e., no backpressure applied to it).
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.
*
Scheduler:
*
{@code elementAt} does not operate by default on a particular {@link Scheduler}.
*
@@ -9558,8 +9556,7 @@ public final Single elementAt(long index, T defaultItem) { * *
*
Backpressure:
- *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner - * (i.e., no backpressure applied to it).
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.
*
Scheduler:
*
{@code elementAtOrError} does not operate by default on a particular {@link Scheduler}.
*
@@ -9573,7 +9570,7 @@ public final Single elementAt(long index, T defaultItem) { * @see ReactiveX operators documentation: ElementAt */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Single elementAtOrError(long index) { if (index < 0) { @@ -9617,8 +9614,7 @@ public final Flowable filter(Predicate predicate) { * *
*
Backpressure:
- *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an - * unbounded manner (i.e., without applying backpressure).
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.
*
Scheduler:
*
{@code firstElement} does not operate by default on a particular {@link Scheduler}.
*
@@ -9640,8 +9636,7 @@ public final Maybe firstElement() { * *
*
Backpressure:
- *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an - * unbounded manner (i.e., without applying backpressure).
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.
*
Scheduler:
*
{@code first} does not operate by default on a particular {@link Scheduler}.
*
@@ -9666,8 +9661,7 @@ public final Single first(T defaultItem) { * *
*
Backpressure:
- *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in an - * unbounded manner (i.e., without applying backpressure).
+ *
The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.
*
Scheduler:
*
{@code firstOrError} does not operate by default on a particular {@link Scheduler}.
*