From b74203d70fbece4c985dc58ac263ba4d5d3c53dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Wed, 8 Jan 2020 19:47:44 +0100 Subject: [PATCH] 3.x: Fix Flowable.flatMap not canceling the inner sources on outer error --- .../operators/flowable/FlowableFlatMap.java | 5 +++ .../flowable/FlowableFlatMapTest.java | 34 ++++++++++++++++++ .../observable/ObservableFlatMapTest.java | 36 ++++++++++++++++++- 3 files changed, 74 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMap.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMap.java index ea6a4f76ed..0a671cf88a 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMap.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMap.java @@ -321,6 +321,11 @@ public void onError(Throwable t) { } if (errors.tryAddThrowableOrReport(t)) { done = true; + if (!delayErrors) { + for (InnerSubscriber a : subscribers.getAndSet(CANCELLED)) { + a.dispose(); + } + } drain(); } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapTest.java index 068f652db0..6df4dc7211 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapTest.java @@ -1116,4 +1116,38 @@ public Publisher apply(Integer v) throws Throwable { } }); } + + @Test + public void mainErrorsInnerCancelled() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + pp1 + .flatMap(v -> pp2) + .test(); + + pp1.onNext(1); + assertTrue("No subscribers?", pp2.hasSubscribers()); + + pp1.onError(new TestException()); + + assertFalse("Has subscribers?", pp2.hasSubscribers()); + } + + @Test + public void innerErrorsMainCancelled() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + pp1 + .flatMap(v -> pp2) + .test(); + + pp1.onNext(1); + assertTrue("No subscribers?", pp2.hasSubscribers()); + + pp2.onError(new TestException()); + + assertFalse("Has subscribers?", pp1.hasSubscribers()); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFlatMapTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFlatMapTest.java index 00f2523a33..8dca948c81 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFlatMapTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFlatMapTest.java @@ -26,7 +26,7 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Observer; -import io.reactivex.rxjava3.disposables.*; +import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.functions.*; import io.reactivex.rxjava3.internal.functions.Functions; @@ -1079,4 +1079,38 @@ public Observable apply(Integer v) throws Throwable { } }); } + + @Test + public void mainErrorsInnerCancelled() { + PublishSubject ps1 = PublishSubject.create(); + PublishSubject ps2 = PublishSubject.create(); + + ps1 + .flatMap(v -> ps2) + .test(); + + ps1.onNext(1); + assertTrue("No subscribers?", ps2.hasObservers()); + + ps1.onError(new TestException()); + + assertFalse("Has subscribers?", ps2.hasObservers()); + } + + @Test + public void innerErrorsMainCancelled() { + PublishSubject ps1 = PublishSubject.create(); + PublishSubject ps2 = PublishSubject.create(); + + ps1 + .flatMap(v -> ps2) + .test(); + + ps1.onNext(1); + assertTrue("No subscribers?", ps2.hasObservers()); + + ps2.onError(new TestException()); + + assertFalse("Has subscribers?", ps1.hasObservers()); + } }