From fc6eb1b74914d8358f940fae788c2ac6e0bc79a6 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 29 Jul 2019 16:55:48 +0200 Subject: [PATCH 1/2] 3.x: Fix mergeWith not cancelling the other source if the main errors --- .../FlowableMergeWithCompletable.java | 2 +- .../flowable/FlowableMergeWithMaybe.java | 2 +- .../flowable/FlowableMergeWithSingle.java | 2 +- .../ObservableMergeWithCompletable.java | 2 +- .../observable/ObservableMergeWithMaybe.java | 2 +- .../observable/ObservableMergeWithSingle.java | 2 +- .../FlowableMergeWithCompletableTest.java | 40 ++++++++++++++++++- .../flowable/FlowableMergeWithMaybeTest.java | 38 +++++++++++++++++- .../flowable/FlowableMergeWithSingleTest.java | 36 +++++++++++++++++ .../ObservableMergeWithCompletableTest.java | 36 +++++++++++++++++ .../ObservableMergeWithMaybeTest.java | 35 ++++++++++++++++ .../ObservableMergeWithSingleTest.java | 36 +++++++++++++++++ 12 files changed, 224 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletable.java index 271bd9c50d..c65386bbb1 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletable.java @@ -86,7 +86,7 @@ public void onNext(T t) { @Override public void onError(Throwable ex) { - SubscriptionHelper.cancel(mainSubscription); + DisposableHelper.dispose(otherObserver); HalfSerializer.onError(downstream, ex, this, error); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybe.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybe.java index a32a0c92fc..1787d5fce3 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybe.java @@ -143,7 +143,7 @@ public void onNext(T t) { @Override public void onError(Throwable ex) { if (error.addThrowable(ex)) { - SubscriptionHelper.cancel(mainSubscription); + DisposableHelper.dispose(otherObserver); drain(); } else { RxJavaPlugins.onError(ex); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingle.java index 586bc07c07..486cb73f8c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingle.java @@ -143,7 +143,7 @@ public void onNext(T t) { @Override public void onError(Throwable ex) { if (error.addThrowable(ex)) { - SubscriptionHelper.cancel(mainSubscription); + DisposableHelper.dispose(otherObserver); drain(); } else { RxJavaPlugins.onError(ex); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletable.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletable.java index fa020b6ae4..3b9e649062 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletable.java @@ -80,7 +80,7 @@ public void onNext(T t) { @Override public void onError(Throwable ex) { - DisposableHelper.dispose(mainDisposable); + DisposableHelper.dispose(otherObserver); HalfSerializer.onError(downstream, ex, this, error); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybe.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybe.java index 23b2532d9b..e7caad3b21 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybe.java @@ -106,7 +106,7 @@ public void onNext(T t) { @Override public void onError(Throwable ex) { if (error.addThrowable(ex)) { - DisposableHelper.dispose(mainDisposable); + DisposableHelper.dispose(otherObserver); drain(); } else { RxJavaPlugins.onError(ex); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingle.java index 20c4d21b5c..7332a29a25 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingle.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingle.java @@ -106,7 +106,7 @@ public void onNext(T t) { @Override public void onError(Throwable ex) { if (error.addThrowable(ex)) { - DisposableHelper.dispose(mainDisposable); + DisposableHelper.dispose(otherObserver); drain(); } else { RxJavaPlugins.onError(ex); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletableTest.java index eab8439e0b..9861ecfaf1 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletableTest.java @@ -13,10 +13,10 @@ package io.reactivex.internal.operators.flowable; -import org.junit.Test; - import static org.junit.Assert.*; +import org.junit.Test; + import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Action; @@ -137,4 +137,40 @@ public void run() { ts.assertResult(1); } } + + @Test + public void cancelOtherOnMainError() { + PublishProcessor pp = PublishProcessor.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestSubscriber ts = pp.mergeWith(cs).test(); + + assertTrue(pp.hasSubscribers()); + assertTrue(cs.hasObservers()); + + pp.onError(new TestException()); + + ts.assertFailure(TestException.class); + + assertFalse("main has observers!", pp.hasSubscribers()); + assertFalse("other has observers", cs.hasObservers()); + } + + @Test + public void cancelMainOnOhterError() { + PublishProcessor pp = PublishProcessor.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestSubscriber ts = pp.mergeWith(cs).test(); + + assertTrue(pp.hasSubscribers()); + assertTrue(cs.hasObservers()); + + cs.onError(new TestException()); + + ts.assertFailure(TestException.class); + + assertFalse("main has observers!", pp.hasSubscribers()); + assertFalse("other has observers", cs.hasObservers()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybeTest.java index 0585a65543..b1de4985ec 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybeTest.java @@ -28,7 +28,7 @@ import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; -import io.reactivex.subjects.MaybeSubject; +import io.reactivex.subjects.*; import io.reactivex.subscribers.TestSubscriber; import io.reactivex.testsupport.TestHelper; @@ -402,4 +402,40 @@ public void onNext(Integer t) { ts.assertValueCount(Flowable.bufferSize()); ts.assertComplete(); } + + @Test + public void cancelOtherOnMainError() { + PublishProcessor pp = PublishProcessor.create(); + MaybeSubject ms = MaybeSubject.create(); + + TestSubscriber ts = pp.mergeWith(ms).test(); + + assertTrue(pp.hasSubscribers()); + assertTrue(ms.hasObservers()); + + pp.onError(new TestException()); + + ts.assertFailure(TestException.class); + + assertFalse("main has observers!", pp.hasSubscribers()); + assertFalse("other has observers", ms.hasObservers()); + } + + @Test + public void cancelMainOnOhterError() { + PublishProcessor pp = PublishProcessor.create(); + MaybeSubject ms = MaybeSubject.create(); + + TestSubscriber ts = pp.mergeWith(ms).test(); + + assertTrue(pp.hasSubscribers()); + assertTrue(ms.hasObservers()); + + ms.onError(new TestException()); + + ts.assertFailure(TestException.class); + + assertFalse("main has observers!", pp.hasSubscribers()); + assertFalse("other has observers", ms.hasObservers()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingleTest.java index 180384ff1a..bbba5798ee 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingleTest.java @@ -398,4 +398,40 @@ public void onNext(Integer t) { ts.assertValueCount(Flowable.bufferSize()); ts.assertComplete(); } + + @Test + public void cancelOtherOnMainError() { + PublishProcessor pp = PublishProcessor.create(); + SingleSubject ss = SingleSubject.create(); + + TestSubscriber ts = pp.mergeWith(ss).test(); + + assertTrue(pp.hasSubscribers()); + assertTrue(ss.hasObservers()); + + pp.onError(new TestException()); + + ts.assertFailure(TestException.class); + + assertFalse("main has observers!", pp.hasSubscribers()); + assertFalse("other has observers", ss.hasObservers()); + } + + @Test + public void cancelMainOnOhterError() { + PublishProcessor pp = PublishProcessor.create(); + SingleSubject ss = SingleSubject.create(); + + TestSubscriber ts = pp.mergeWith(ss).test(); + + assertTrue(pp.hasSubscribers()); + assertTrue(ss.hasObservers()); + + ss.onError(new TestException()); + + ts.assertFailure(TestException.class); + + assertFalse("main has observers!", pp.hasSubscribers()); + assertFalse("other has observers", ss.hasObservers()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletableTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletableTest.java index 76c939bc76..6193798283 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletableTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletableTest.java @@ -136,4 +136,40 @@ protected void subscribeActual(Observer observer) { .test() .assertResult(1); } + + @Test + public void cancelOtherOnMainError() { + PublishSubject ps = PublishSubject.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = ps.mergeWith(cs).test(); + + assertTrue(ps.hasObservers()); + assertTrue(cs.hasObservers()); + + ps.onError(new TestException()); + + to.assertFailure(TestException.class); + + assertFalse("main has observers!", ps.hasObservers()); + assertFalse("other has observers", cs.hasObservers()); + } + + @Test + public void cancelMainOnOhterError() { + PublishSubject ps = PublishSubject.create(); + CompletableSubject cs = CompletableSubject.create(); + + TestObserver to = ps.mergeWith(cs).test(); + + assertTrue(ps.hasObservers()); + assertTrue(cs.hasObservers()); + + cs.onError(new TestException()); + + to.assertFailure(TestException.class); + + assertFalse("main has observers!", ps.hasObservers()); + assertFalse("other has observers", cs.hasObservers()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybeTest.java index fec139f169..3519a5247b 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybeTest.java @@ -273,4 +273,39 @@ public void onNext(Integer t) { to.assertResult(0, 1, 2, 3, 4); } + @Test + public void cancelOtherOnMainError() { + PublishSubject ps = PublishSubject.create(); + MaybeSubject ms = MaybeSubject.create(); + + TestObserver to = ps.mergeWith(ms).test(); + + assertTrue(ps.hasObservers()); + assertTrue(ms.hasObservers()); + + ps.onError(new TestException()); + + to.assertFailure(TestException.class); + + assertFalse("main has observers!", ps.hasObservers()); + assertFalse("other has observers", ms.hasObservers()); + } + + @Test + public void cancelMainOnOhterError() { + PublishSubject ps = PublishSubject.create(); + MaybeSubject ms = MaybeSubject.create(); + + TestObserver to = ps.mergeWith(ms).test(); + + assertTrue(ps.hasObservers()); + assertTrue(ms.hasObservers()); + + ms.onError(new TestException()); + + to.assertFailure(TestException.class); + + assertFalse("main has observers!", ps.hasObservers()); + assertFalse("other has observers", ms.hasObservers()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingleTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingleTest.java index cf5710ad00..16abe4e30f 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingleTest.java @@ -264,4 +264,40 @@ public void onNext(Integer t) { to.assertResult(0, 1, 2, 3, 4); } + + @Test + public void cancelOtherOnMainError() { + PublishSubject ps = PublishSubject.create(); + SingleSubject ss = SingleSubject.create(); + + TestObserver to = ps.mergeWith(ss).test(); + + assertTrue(ps.hasObservers()); + assertTrue(ss.hasObservers()); + + ps.onError(new TestException()); + + to.assertFailure(TestException.class); + + assertFalse("main has observers!", ps.hasObservers()); + assertFalse("other has observers", ss.hasObservers()); + } + + @Test + public void cancelMainOnOhterError() { + PublishSubject ps = PublishSubject.create(); + SingleSubject ss = SingleSubject.create(); + + TestObserver to = ps.mergeWith(ss).test(); + + assertTrue(ps.hasObservers()); + assertTrue(ss.hasObservers()); + + ss.onError(new TestException()); + + to.assertFailure(TestException.class); + + assertFalse("main has observers!", ps.hasObservers()); + assertFalse("other has observers", ss.hasObservers()); + } } From 76583a1bec3cc9f58d687935b4da6423b117ca94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Mon, 29 Jul 2019 19:58:10 +0200 Subject: [PATCH 2/2] Fix typo in the test name --- .../operators/flowable/FlowableMergeWithCompletableTest.java | 2 +- .../internal/operators/flowable/FlowableMergeWithMaybeTest.java | 2 +- .../operators/flowable/FlowableMergeWithSingleTest.java | 2 +- .../observable/ObservableMergeWithCompletableTest.java | 2 +- .../operators/observable/ObservableMergeWithMaybeTest.java | 2 +- .../operators/observable/ObservableMergeWithSingleTest.java | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletableTest.java index 9861ecfaf1..b97fb77d65 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithCompletableTest.java @@ -157,7 +157,7 @@ public void cancelOtherOnMainError() { } @Test - public void cancelMainOnOhterError() { + public void cancelMainOnOtherError() { PublishProcessor pp = PublishProcessor.create(); CompletableSubject cs = CompletableSubject.create(); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybeTest.java index b1de4985ec..5b41573d21 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithMaybeTest.java @@ -422,7 +422,7 @@ public void cancelOtherOnMainError() { } @Test - public void cancelMainOnOhterError() { + public void cancelMainOnOtherError() { PublishProcessor pp = PublishProcessor.create(); MaybeSubject ms = MaybeSubject.create(); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingleTest.java index bbba5798ee..4534b6d918 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeWithSingleTest.java @@ -418,7 +418,7 @@ public void cancelOtherOnMainError() { } @Test - public void cancelMainOnOhterError() { + public void cancelMainOnOtherError() { PublishProcessor pp = PublishProcessor.create(); SingleSubject ss = SingleSubject.create(); diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletableTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletableTest.java index 6193798283..1d138993ea 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletableTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithCompletableTest.java @@ -156,7 +156,7 @@ public void cancelOtherOnMainError() { } @Test - public void cancelMainOnOhterError() { + public void cancelMainOnOtherError() { PublishSubject ps = PublishSubject.create(); CompletableSubject cs = CompletableSubject.create(); diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybeTest.java index 3519a5247b..e11c925cef 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithMaybeTest.java @@ -292,7 +292,7 @@ public void cancelOtherOnMainError() { } @Test - public void cancelMainOnOhterError() { + public void cancelMainOnOtherError() { PublishSubject ps = PublishSubject.create(); MaybeSubject ms = MaybeSubject.create(); diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingleTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingleTest.java index 16abe4e30f..cad2dad3bd 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableMergeWithSingleTest.java @@ -284,7 +284,7 @@ public void cancelOtherOnMainError() { } @Test - public void cancelMainOnOhterError() { + public void cancelMainOnOtherError() { PublishSubject ps = PublishSubject.create(); SingleSubject ss = SingleSubject.create();