From 014456fdd80a86b32dc62baf52d4066d5ca1489e Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 14 Jun 2016 15:44:14 +0200 Subject: [PATCH 1/2] 1.x: new hook management proposal --- src/main/java/rx/Completable.java | 54 +- src/main/java/rx/Observable.java | 28 +- src/main/java/rx/Single.java | 44 +- .../AssemblyStackTraceException.java | 52 ++ .../java/rx/exceptions/OnErrorThrowable.java | 1 + .../CompletableOnSubscribeConcat.java | 6 +- .../CompletableOnSubscribeMerge.java | 12 +- .../CompletableOnSubscribeMergeArray.java | 6 +- .../CompletableOnSubscribeMergeIterable.java | 10 +- .../CompletableOnSubscribeTimeout.java | 4 +- .../operators/OnSubscribeCombineLatest.java | 4 +- .../operators/OnSubscribeConcatMap.java | 4 +- .../OnSubscribeDelaySubscriptionOther.java | 2 +- .../internal/operators/OnSubscribeLift.java | 4 +- .../operators/OnSubscribeOnAssembly.java | 128 ++++ .../OnSubscribeOnAssemblyCompletable.java | 78 +++ .../OnSubscribeOnAssemblySingle.java | 74 +++ .../operators/OperatorDoAfterTerminate.java | 4 +- .../internal/operators/OperatorGroupBy.java | 4 +- .../operators/OperatorMaterialize.java | 8 +- .../internal/operators/OperatorObserveOn.java | 6 +- .../OperatorOnErrorResumeNextViaFunction.java | 4 +- .../rx/internal/operators/OperatorSwitch.java | 4 +- ...ngleOnSubscribeDelaySubscriptionOther.java | 9 +- .../operators/SingleOnSubscribeUsing.java | 6 +- .../internal/operators/SingleOperatorZip.java | 14 +- .../schedulers/ExecutorScheduler.java | 6 +- .../internal/schedulers/NewThreadWorker.java | 12 +- .../internal/schedulers/ScheduledAction.java | 4 +- .../rx/internal/util/RxJavaPluginUtils.java | 4 +- .../util/ScalarSynchronousObservable.java | 10 +- .../java/rx/observables/AsyncOnSubscribe.java | 4 +- .../java/rx/observables/SyncOnSubscribe.java | 17 +- .../RxJavaCompletableExecutionHook.java | 4 + .../java/rx/plugins/RxJavaErrorHandler.java | 3 +- src/main/java/rx/plugins/RxJavaHooks.java | 599 ++++++++++++++++++ .../RxJavaObservableExecutionHook.java | 5 + src/main/java/rx/plugins/RxJavaPlugins.java | 6 + .../java/rx/plugins/RxJavaSchedulersHook.java | 1 + .../rx/plugins/RxJavaSingleExecutionHook.java | 5 + src/main/java/rx/schedulers/Schedulers.java | 10 +- src/test/java/rx/CompletableTest.java | 39 +- src/test/java/rx/SingleTest.java | 67 +- .../util/ScalarSynchronousObservableTest.java | 31 +- .../java/rx/observers/SafeSubscriberTest.java | 12 +- src/test/java/rx/plugins/RxJavaHooksTest.java | 165 +++++ .../java/rx/plugins/RxJavaPluginsTest.java | 1 + .../rx/schedulers/ResetSchedulersTest.java | 1 + 48 files changed, 1351 insertions(+), 225 deletions(-) create mode 100644 src/main/java/rx/exceptions/AssemblyStackTraceException.java create mode 100644 src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java create mode 100644 src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java create mode 100644 src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java create mode 100644 src/main/java/rx/plugins/RxJavaHooks.java create mode 100644 src/test/java/rx/plugins/RxJavaHooksTest.java diff --git a/src/main/java/rx/Completable.java b/src/main/java/rx/Completable.java index b5f9baebb7..e9e7a38c2a 100644 --- a/src/main/java/rx/Completable.java +++ b/src/main/java/rx/Completable.java @@ -38,12 +38,6 @@ */ @Experimental public class Completable { - /** The error handler instance. */ - static final RxJavaErrorHandler ERROR_HANDLER = RxJavaPlugins.getInstance().getErrorHandler(); - - /** The completable hook. */ - static RxJavaCompletableExecutionHook HOOK = RxJavaPlugins.getInstance().getCompletableExecutionHook(); - /** * Callback used for building deferred computations that takes a CompletableSubscriber. */ @@ -146,7 +140,7 @@ public void onError(Throwable e) { set.unsubscribe(); s.onError(e); } else { - ERROR_HANDLER.handleError(e); + RxJavaHooks.onError(e); } } @@ -167,7 +161,7 @@ public void onSubscribe(Subscription d) { set.unsubscribe(); s.onError(npe); } else { - ERROR_HANDLER.handleError(npe); + RxJavaHooks.onError(npe); } return; } @@ -215,7 +209,7 @@ public void onError(Throwable e) { set.unsubscribe(); s.onError(e); } else { - ERROR_HANDLER.handleError(e); + RxJavaHooks.onError(e); } } @@ -256,7 +250,7 @@ public void onSubscribe(Subscription d) { set.unsubscribe(); s.onError(e); } else { - ERROR_HANDLER.handleError(e); + RxJavaHooks.onError(e); } return; } @@ -283,7 +277,7 @@ public void onSubscribe(Subscription d) { set.unsubscribe(); s.onError(e); } else { - ERROR_HANDLER.handleError(e); + RxJavaHooks.onError(e); } return; } @@ -294,7 +288,7 @@ public void onSubscribe(Subscription d) { set.unsubscribe(); s.onError(npe); } else { - ERROR_HANDLER.handleError(npe); + RxJavaHooks.onError(npe); } return; } @@ -386,7 +380,7 @@ public static Completable create(CompletableOnSubscribe onSubscribe) { } catch (NullPointerException ex) { throw ex; } catch (Throwable ex) { - ERROR_HANDLER.handleError(ex); + RxJavaHooks.onError(ex); throw toNpe(ex); } } @@ -908,7 +902,7 @@ void dispose() { try { disposer.call(resource); } catch (Throwable ex) { - ERROR_HANDLER.handleError(ex); + RxJavaHooks.onError(ex); } } } @@ -976,7 +970,7 @@ public void call() { * not null (not verified) */ protected Completable(CompletableOnSubscribe onSubscribe) { - this.onSubscribe = HOOK.onCreate(onSubscribe); + this.onSubscribe = RxJavaHooks.onCreate(onSubscribe); } /** @@ -1332,7 +1326,7 @@ public void onCompleted() { try { onAfterComplete.call(); } catch (Throwable e) { - ERROR_HANDLER.handleError(e); + RxJavaHooks.onError(e); } } @@ -1365,7 +1359,7 @@ public void call() { try { onUnsubscribe.call(); } catch (Throwable e) { - ERROR_HANDLER.handleError(e); + RxJavaHooks.onError(e); } d.unsubscribe(); } @@ -1543,7 +1537,7 @@ public final Completable lift(final CompletableOperator onLift) { @Override public void call(CompletableSubscriber s) { try { - CompletableOperator onLiftDecorated = HOOK.onLift(onLift); + CompletableOperator onLiftDecorated = RxJavaHooks.onCompletableLift(onLift); CompletableSubscriber sw = onLiftDecorated.call(s); unsafeSubscribe(sw); @@ -1868,7 +1862,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { - ERROR_HANDLER.handleError(e); + RxJavaHooks.onError(e); mad.unsubscribe(); deliverUncaughtException(e); } @@ -1885,7 +1879,7 @@ public void onSubscribe(Subscription d) { * Subscribes to this Completable and calls the given Action0 when this Completable * completes normally. *

- * If this Completable emits an error, it is sent to ERROR_HANDLER.handleError and gets swallowed. + * If this Completable emits an error, it is sent to RxJavaHooks.onError and gets swallowed. * @param onComplete the runnable called when this Completable completes normally * @return the Subscription that allows cancelling the subscription */ @@ -1902,7 +1896,7 @@ public void onCompleted() { try { onComplete.call(); } catch (Throwable e) { - ERROR_HANDLER.handleError(e); + RxJavaHooks.onError(e); deliverUncaughtException(e); } finally { mad.unsubscribe(); @@ -1912,7 +1906,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { - ERROR_HANDLER.handleError(e); + RxJavaHooks.onError(e); mad.unsubscribe(); deliverUncaughtException(e); } @@ -1961,7 +1955,7 @@ public void onError(Throwable e) { done = true; callOnError(e); } else { - ERROR_HANDLER.handleError(e); + RxJavaHooks.onError(e); deliverUncaughtException(e); } } @@ -1971,7 +1965,7 @@ void callOnError(Throwable e) { onError.call(e); } catch (Throwable ex) { e = new CompositeException(Arrays.asList(e, ex)); - ERROR_HANDLER.handleError(e); + RxJavaHooks.onError(e); deliverUncaughtException(e); } finally { mad.unsubscribe(); @@ -2000,15 +1994,15 @@ private static void deliverUncaughtException(Throwable e) { public final void unsafeSubscribe(CompletableSubscriber s) { requireNonNull(s); try { - CompletableOnSubscribe onSubscribeDecorated = HOOK.onSubscribeStart(this, this.onSubscribe); + CompletableOnSubscribe onSubscribeDecorated = RxJavaHooks.onCompletableStart(this, this.onSubscribe); onSubscribeDecorated.call(s); } catch (NullPointerException ex) { throw ex; } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - ex = HOOK.onSubscribeError(ex); - ERROR_HANDLER.handleError(ex); + ex = RxJavaHooks.onCompletableError(ex); + RxJavaHooks.onError(ex); throw toNpe(ex); } } @@ -2066,13 +2060,13 @@ public void onSubscribe(Subscription d) { s.add(d); } }); - RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(s); + RxJavaHooks.onObservableReturn(s); } catch (NullPointerException ex) { throw ex; } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - ex = HOOK.onSubscribeError(ex); - ERROR_HANDLER.handleError(ex); + ex = RxJavaHooks.onObservableError(ex); + RxJavaHooks.onError(ex); throw toNpe(ex); } } diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index b48299cd65..85d28438cd 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -59,8 +59,6 @@ protected Observable(OnSubscribe f) { this.onSubscribe = f; } - static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); - /** * Returns an Observable that will execute the specified function when a {@link Subscriber} subscribes to * it. @@ -91,7 +89,7 @@ protected Observable(OnSubscribe f) { * @see ReactiveX operators documentation: Create */ public static Observable create(OnSubscribe f) { - return new Observable(hook.onCreate(f)); + return new Observable(RxJavaHooks.onCreate(f)); } /** @@ -128,7 +126,7 @@ public static Observable create(OnSubscribe f) { */ @Beta public static Observable create(SyncOnSubscribe syncOnSubscribe) { - return new Observable(hook.onCreate(syncOnSubscribe)); + return create((OnSubscribe)syncOnSubscribe); } /** @@ -164,7 +162,7 @@ public static Observable create(SyncOnSubscribe syncOnSubscribe) */ @Experimental public static Observable create(AsyncOnSubscribe asyncOnSubscribe) { - return new Observable(hook.onCreate(asyncOnSubscribe)); + return create((OnSubscribe)asyncOnSubscribe); } /** @@ -238,7 +236,7 @@ public void call(Subscriber subscriber) { * @see RxJava wiki: Implementing Your Own Operators */ public final Observable lift(final Operator operator) { - return new Observable(new OnSubscribeLift(onSubscribe, operator)); + return create(new OnSubscribeLift(onSubscribe, operator)); } /** @@ -8663,21 +8661,21 @@ public final Subscription unsafeSubscribe(Subscriber subscriber) { // new Subscriber so onStart it subscriber.onStart(); // allow the hook to intercept and/or decorate - hook.onSubscribeStart(this, onSubscribe).call(subscriber); - return hook.onSubscribeReturn(subscriber); + RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber); + return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { - subscriber.onError(hook.onSubscribeError(e)); + subscriber.onError(RxJavaHooks.onObservableError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. - hook.onSubscribeError(r); + RxJavaHooks.onObservableError(r); // TODO why aren't we throwing the hook's return value. throw r; } @@ -8756,25 +8754,25 @@ static Subscription subscribe(Subscriber subscriber, Observable { * {@code OnExecute} to be executed when {@code execute(SingleSubscriber)} or * {@code subscribe(Subscriber)} is called */ - protected Single(final OnSubscribe f) { + protected Single(OnSubscribe f) { + final OnSubscribe g = RxJavaHooks.onCreate(f); // bridge between OnSubscribe (which all Operators and Observables use) and OnExecute (for Single) this.onSubscribe = new Observable.OnSubscribe() { @@ -91,18 +87,16 @@ public void onError(Throwable error) { }; child.add(ss); - f.call(ss); + g.call(ss); } }; } private Single(final Observable.OnSubscribe f) { - this.onSubscribe = f; + this.onSubscribe = RxJavaHooks.onCreate(f); } - static RxJavaSingleExecutionHook hook = RxJavaPlugins.getInstance().getSingleExecutionHook(); - /** * Returns a Single that will execute the specified function when a {@link SingleSubscriber} executes it or * a {@link Subscriber} subscribes to it. @@ -130,7 +124,7 @@ private Single(final Observable.OnSubscribe f) { * @see ReactiveX operators documentation: Create */ public static Single create(OnSubscribe f) { - return new Single(hook.onCreate(f)); + return new Single(f); } /** @@ -170,7 +164,7 @@ public final Single lift(final Operator lift) { @Override public void call(Subscriber o) { try { - final Subscriber st = hook.onLift(lift).call(o); + final Subscriber st = RxJavaHooks.onSingleLift(lift).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); @@ -1709,21 +1703,21 @@ public final Subscription unsafeSubscribe(Subscriber subscriber) { try { // new Subscriber so onStart it subscriber.onStart(); - hook.onSubscribeStart(this, onSubscribe).call(subscriber); - return hook.onSubscribeReturn(subscriber); + RxJavaHooks.onSingleStart(this, onSubscribe).call(subscriber); + return RxJavaHooks.onSingleReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { - subscriber.onError(hook.onSubscribeError(e)); + subscriber.onError(RxJavaHooks.onSingleError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. - hook.onSubscribeError(r); + RxJavaHooks.onSingleError(r); // TODO why aren't we throwing the hook's return value. throw r; } @@ -1819,21 +1813,21 @@ public final Subscription subscribe(Subscriber subscriber) { // The code below is exactly the same an unsafeSubscribe but not used because it would add a significant depth to already huge call stacks. try { // allow the hook to intercept and/or decorate - hook.onSubscribeStart(this, onSubscribe).call(subscriber); - return hook.onSubscribeReturn(subscriber); + RxJavaHooks.onSingleStart(this, onSubscribe).call(subscriber); + return RxJavaHooks.onSingleReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // if an unhandled error occurs executing the onSubscribe we will propagate it try { - subscriber.onError(hook.onSubscribeError(e)); + subscriber.onError(RxJavaHooks.onSingleError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. - hook.onSubscribeError(r); + RxJavaHooks.onSingleError(r); // TODO why aren't we throwing the hook's return value. throw r; } diff --git a/src/main/java/rx/exceptions/AssemblyStackTraceException.java b/src/main/java/rx/exceptions/AssemblyStackTraceException.java new file mode 100644 index 0000000000..ee6a8be6a9 --- /dev/null +++ b/src/main/java/rx/exceptions/AssemblyStackTraceException.java @@ -0,0 +1,52 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.exceptions; + +import rx.annotations.Experimental; + +/** + * A RuntimeException that is stackless but holds onto a textual + * stacktrace from tracking the assembly location of operators. + */ +@Experimental +public final class AssemblyStackTraceException extends RuntimeException { + + /** */ + private static final long serialVersionUID = 2038859767182585852L; + + /** + * Constructs an AssemblyStackTraceException with the given message and + * a cause. + * @param message the message + * @param cause the cause + */ + public AssemblyStackTraceException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs an AssemblyStackTraceException with the given message. + * @param message the message + */ + public AssemblyStackTraceException(String message) { + super(message); + } + + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } +} diff --git a/src/main/java/rx/exceptions/OnErrorThrowable.java b/src/main/java/rx/exceptions/OnErrorThrowable.java index 52ce45ed2a..2ef465141b 100644 --- a/src/main/java/rx/exceptions/OnErrorThrowable.java +++ b/src/main/java/rx/exceptions/OnErrorThrowable.java @@ -191,6 +191,7 @@ static String renderValue(Object value){ return ((Enum) value).name(); } + @SuppressWarnings("deprecation") String pluggedRendering = RxJavaPlugins.getInstance().getErrorHandler().handleOnNextValueRendering(value); if (pluggedRendering != null) { return pluggedRendering; diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java index bccea15cc4..96cccc23f7 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java @@ -22,7 +22,7 @@ import rx.Completable.*; import rx.exceptions.MissingBackpressureException; import rx.internal.util.unsafe.SpscArrayQueue; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; import rx.subscriptions.SerialSubscription; public final class CompletableOnSubscribeConcat implements CompletableOnSubscribe { @@ -87,7 +87,7 @@ public void onError(Throwable t) { actual.onError(t); return; } - RxJavaPlugins.getInstance().getErrorHandler().handleError(t); + RxJavaHooks.onError(t); } @Override @@ -125,7 +125,7 @@ void next() { } return; } - RxJavaPlugins.getInstance().getErrorHandler().handleError(new IllegalStateException("Queue is empty?!")); + RxJavaHooks.onError(new IllegalStateException("Queue is empty?!")); return; } diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java index 059e193da7..48dce53c9c 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java @@ -22,9 +22,9 @@ import rx.*; import rx.Completable.*; -import rx.exceptions.CompositeException; import rx.Observable; -import rx.plugins.RxJavaPlugins; +import rx.exceptions.CompositeException; +import rx.plugins.RxJavaHooks; import rx.subscriptions.CompositeSubscription; public final class CompletableOnSubscribeMerge implements CompletableOnSubscribe { @@ -110,7 +110,7 @@ public void onSubscribe(Subscription d) { @Override public void onError(Throwable e) { if (innerDone) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); return; } innerDone = true; @@ -145,7 +145,7 @@ public void onCompleted() { @Override public void onError(Throwable t) { if (done) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(t); + RxJavaHooks.onError(t); return; } getOrCreateErrors().offer(t); @@ -172,7 +172,7 @@ void terminate() { if (once.compareAndSet(false, true)) { actual.onError(e); } else { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); } } } else @@ -183,7 +183,7 @@ void terminate() { if (once.compareAndSet(false, true)) { actual.onError(e); } else { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); } } } diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeArray.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeArray.java index f78c960430..d80f50abf6 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeArray.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeArray.java @@ -20,7 +20,7 @@ import rx.*; import rx.Completable.*; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; import rx.subscriptions.CompositeSubscription; public final class CompletableOnSubscribeMergeArray implements CompletableOnSubscribe { @@ -50,7 +50,7 @@ public void call(final CompletableSubscriber s) { s.onError(npe); return; } else { - RxJavaPlugins.getInstance().getErrorHandler().handleError(npe); + RxJavaHooks.onError(npe); } } @@ -66,7 +66,7 @@ public void onError(Throwable e) { if (once.compareAndSet(false, true)) { s.onError(e); } else { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); } } diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java index e13670cd20..fed111a6c7 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java @@ -21,7 +21,7 @@ import rx.*; import rx.Completable.*; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; import rx.subscriptions.CompositeSubscription; public final class CompletableOnSubscribeMergeIterable implements CompletableOnSubscribe { @@ -66,7 +66,7 @@ public void call(final CompletableSubscriber s) { if (once.compareAndSet(false, true)) { s.onError(e); } else { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); } return; } @@ -88,7 +88,7 @@ public void call(final CompletableSubscriber s) { if (once.compareAndSet(false, true)) { s.onError(e); } else { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); } return; } @@ -103,7 +103,7 @@ public void call(final CompletableSubscriber s) { if (once.compareAndSet(false, true)) { s.onError(npe); } else { - RxJavaPlugins.getInstance().getErrorHandler().handleError(npe); + RxJavaHooks.onError(npe); } return; } @@ -122,7 +122,7 @@ public void onError(Throwable e) { if (once.compareAndSet(false, true)) { s.onError(e); } else { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); } } diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeTimeout.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeTimeout.java index b62c62274d..672cfad61b 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeTimeout.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeTimeout.java @@ -22,7 +22,7 @@ import rx.*; import rx.Completable.*; import rx.functions.Action0; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; import rx.subscriptions.CompositeSubscription; public final class CompletableOnSubscribeTimeout implements CompletableOnSubscribe { @@ -98,7 +98,7 @@ public void onError(Throwable e) { set.unsubscribe(); s.onError(e); } else { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); } } diff --git a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java index 10b4b582cd..e9dac2d026 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java +++ b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java @@ -23,7 +23,7 @@ import rx.functions.FuncN; import rx.internal.util.RxRingBuffer; import rx.internal.util.atomic.SpscLinkedArrayQueue; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; public final class OnSubscribeCombineLatest implements OnSubscribe { final Observable[] sources; @@ -386,7 +386,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(t); + RxJavaHooks.onError(t); return; } parent.onError(t); diff --git a/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java b/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java index c2799df758..fba34e2f6b 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java +++ b/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java @@ -28,7 +28,7 @@ import rx.internal.util.atomic.SpscAtomicArrayQueue; import rx.internal.util.unsafe.*; import rx.observers.SerializedSubscriber; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; import rx.subscriptions.SerialSubscription; /** @@ -210,7 +210,7 @@ void innerCompleted(long produced) { } void pluginError(Throwable e) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); } void drain() { diff --git a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java index dc3146d7e6..d25d940d61 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java +++ b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java @@ -56,7 +56,7 @@ public void onNext(U t) { @Override public void onError(Throwable e) { if (done) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); return; } done = true; diff --git a/src/main/java/rx/internal/operators/OnSubscribeLift.java b/src/main/java/rx/internal/operators/OnSubscribeLift.java index ba6210f38b..65f6f20d14 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeLift.java +++ b/src/main/java/rx/internal/operators/OnSubscribeLift.java @@ -29,8 +29,6 @@ */ public final class OnSubscribeLift implements OnSubscribe { - static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); - final OnSubscribe parent; final Operator operator; @@ -43,7 +41,7 @@ public OnSubscribeLift(OnSubscribe parent, Operator o @Override public void call(Subscriber o) { try { - Subscriber st = hook.onLift(operator).call(o); + Subscriber st = RxJavaHooks.onObservableLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); diff --git a/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java b/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java new file mode 100644 index 0000000000..fd338c1ece --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java @@ -0,0 +1,128 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import rx.*; +import rx.Observable.OnSubscribe; +import rx.exceptions.AssemblyStackTraceException; + +/** + * Captures the current stack when it is instantiated, makes + * it available through a field and attaches it to all + * passing exception. + * + * @param the value type + */ +public final class OnSubscribeOnAssembly implements OnSubscribe { + + final OnSubscribe source; + + final String stacktrace; + + /** + * If set to true, the creation of PublisherOnAssembly will capture the raw + * stacktrace instead of the sanitized version. + */ + public static volatile boolean fullStackTrace; + + public OnSubscribeOnAssembly(OnSubscribe source) { + this.source = source; + this.stacktrace = createStacktrace(); + } + + static String createStacktrace() { + StackTraceElement[] stes = Thread.currentThread().getStackTrace(); + + StringBuilder sb = new StringBuilder("Assembly trace:"); + + for (StackTraceElement e : stes) { + String row = e.toString(); + if (!fullStackTrace) { + if (e.getLineNumber() <= 1) { + continue; + } + if (row.contains("RxJavaHooks.")) { + continue; + } + if (row.contains("OnSubscribeOnAssembly")) { + continue; + } + if (row.contains(".junit.runner")) { + continue; + } + if (row.contains(".junit4.runner")) { + continue; + } + if (row.contains(".junit.internal")) { + continue; + } + if (row.contains("sun.reflect")) { + continue; + } + if (row.contains("java.lang.Thread.")) { + continue; + } + if (row.contains("ThreadPoolExecutor")) { + continue; + } + if (row.contains("org.apache.catalina.")) { + continue; + } + if (row.contains("org.apache.tomcat.")) { + continue; + } + } + sb.append("\n at ").append(row); + } + + return sb.append("\nOriginal exception:").toString(); + } + + @Override + public void call(Subscriber t) { + source.call(new OnAssemblySubscriber(t, stacktrace)); + } + + static final class OnAssemblySubscriber extends Subscriber { + + final Subscriber actual; + + final String stacktrace; + + public OnAssemblySubscriber(Subscriber actual, String stacktrace) { + super(actual); + this.actual = actual; + this.stacktrace = stacktrace; + } + + @Override + public void onCompleted() { + actual.onCompleted(); + } + + @Override + public void onError(Throwable e) { + e = new AssemblyStackTraceException(stacktrace, e); + actual.onError(e); + } + + @Override + public void onNext(T t) { + actual.onNext(t); + } + + } +} diff --git a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java new file mode 100644 index 0000000000..da5a144045 --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java @@ -0,0 +1,78 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import rx.*; +import rx.Completable.CompletableSubscriber; +import rx.exceptions.AssemblyStackTraceException; + +/** + * Captures the current stack when it is instantiated, makes + * it available through a field and attaches it to all + * passing exception. + * + * @param the value type + */ +public final class OnSubscribeOnAssemblyCompletable implements Completable.CompletableOnSubscribe { + + final Completable.CompletableOnSubscribe source; + + final String stacktrace; + + /** + * If set to true, the creation of PublisherOnAssembly will capture the raw + * stacktrace instead of the sanitized version. + */ + public static volatile boolean fullStackTrace; + + public OnSubscribeOnAssemblyCompletable(Completable.CompletableOnSubscribe source) { + this.source = source; + this.stacktrace = OnSubscribeOnAssembly.createStacktrace(); + } + + @Override + public void call(Completable.CompletableSubscriber t) { + source.call(new OnAssemblyCompletableSubscriber(t, stacktrace)); + } + + static final class OnAssemblyCompletableSubscriber implements CompletableSubscriber { + + final CompletableSubscriber actual; + + final String stacktrace; + + public OnAssemblyCompletableSubscriber(CompletableSubscriber actual, String stacktrace) { + this.actual = actual; + this.stacktrace = stacktrace; + } + + @Override + public void onSubscribe(Subscription d) { + actual.onSubscribe(d); + } + + @Override + public void onCompleted() { + actual.onCompleted(); + } + + @Override + public void onError(Throwable e) { + e = new AssemblyStackTraceException(stacktrace, e); + actual.onError(e); + } + } +} diff --git a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java new file mode 100644 index 0000000000..aeed623a7b --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java @@ -0,0 +1,74 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import rx.*; +import rx.exceptions.AssemblyStackTraceException; + +/** + * Captures the current stack when it is instantiated, makes + * it available through a field and attaches it to all + * passing exception. + * + * @param the value type + */ +public final class OnSubscribeOnAssemblySingle implements Single.OnSubscribe { + + final Single.OnSubscribe source; + + final String stacktrace; + + /** + * If set to true, the creation of PublisherOnAssembly will capture the raw + * stacktrace instead of the sanitized version. + */ + public static volatile boolean fullStackTrace; + + public OnSubscribeOnAssemblySingle(Single.OnSubscribe source) { + this.source = source; + this.stacktrace = OnSubscribeOnAssembly.createStacktrace(); + } + + @Override + public void call(SingleSubscriber t) { + source.call(new OnAssemblySingleSubscriber(t, stacktrace)); + } + + static final class OnAssemblySingleSubscriber extends SingleSubscriber { + + final SingleSubscriber actual; + + final String stacktrace; + + public OnAssemblySingleSubscriber(SingleSubscriber actual, String stacktrace) { + this.actual = actual; + this.stacktrace = stacktrace; + actual.add(this); + } + + @Override + public void onError(Throwable e) { + e = new AssemblyStackTraceException(stacktrace, e); + actual.onError(e); + } + + @Override + public void onSuccess(T t) { + actual.onSuccess(t); + } + + } +} diff --git a/src/main/java/rx/internal/operators/OperatorDoAfterTerminate.java b/src/main/java/rx/internal/operators/OperatorDoAfterTerminate.java index 64afca478a..a3efa8f845 100644 --- a/src/main/java/rx/internal/operators/OperatorDoAfterTerminate.java +++ b/src/main/java/rx/internal/operators/OperatorDoAfterTerminate.java @@ -19,7 +19,7 @@ import rx.Subscriber; import rx.exceptions.Exceptions; import rx.functions.Action0; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; /** * Registers an action to be called after an Observable invokes {@code onComplete} or {@code onError}. @@ -73,7 +73,7 @@ void callAction() { action.call(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - RxJavaPlugins.getInstance().getErrorHandler().handleError(ex); + RxJavaHooks.onError(ex); } } }; diff --git a/src/main/java/rx/internal/operators/OperatorGroupBy.java b/src/main/java/rx/internal/operators/OperatorGroupBy.java index bf474619bb..4ed1a504f3 100644 --- a/src/main/java/rx/internal/operators/OperatorGroupBy.java +++ b/src/main/java/rx/internal/operators/OperatorGroupBy.java @@ -25,7 +25,7 @@ import rx.internal.producers.ProducerArbiter; import rx.internal.util.*; import rx.observables.GroupedObservable; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; import rx.subscriptions.Subscriptions; /** @@ -196,7 +196,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (done) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(t); + RxJavaHooks.onError(t); return; } error = t; diff --git a/src/main/java/rx/internal/operators/OperatorMaterialize.java b/src/main/java/rx/internal/operators/OperatorMaterialize.java index 75700b8d11..4d01dadf39 100644 --- a/src/main/java/rx/internal/operators/OperatorMaterialize.java +++ b/src/main/java/rx/internal/operators/OperatorMaterialize.java @@ -17,11 +17,9 @@ import java.util.concurrent.atomic.AtomicLong; -import rx.Notification; +import rx.*; import rx.Observable.Operator; -import rx.Producer; -import rx.Subscriber; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; /** * Turns all of the notifications from an Observable into {@code onNext} emissions, and marks @@ -104,7 +102,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { terminalNotification = Notification.createOnError(e); - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); drain(); } diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index 10ad74e4e2..42fc5518ae 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -23,10 +23,10 @@ import rx.exceptions.MissingBackpressureException; import rx.functions.Action0; import rx.internal.schedulers.*; -import rx.internal.util.*; +import rx.internal.util.RxRingBuffer; import rx.internal.util.atomic.SpscAtomicArrayQueue; import rx.internal.util.unsafe.*; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; import rx.schedulers.Schedulers; /** @@ -177,7 +177,7 @@ public void onCompleted() { @Override public void onError(final Throwable e) { if (isUnsubscribed() || finished) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); return; } error = e; diff --git a/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java b/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java index 7f5b22c0a2..d78b75ce57 100644 --- a/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java +++ b/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java @@ -20,7 +20,7 @@ import rx.exceptions.Exceptions; import rx.functions.Func1; import rx.internal.producers.ProducerArbiter; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; import rx.subscriptions.SerialSubscription; /** @@ -105,7 +105,7 @@ public void onCompleted() { public void onError(Throwable e) { if (done) { Exceptions.throwIfFatal(e); - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); return; } done = true; diff --git a/src/main/java/rx/internal/operators/OperatorSwitch.java b/src/main/java/rx/internal/operators/OperatorSwitch.java index bbbcd9879d..c76e3008c9 100644 --- a/src/main/java/rx/internal/operators/OperatorSwitch.java +++ b/src/main/java/rx/internal/operators/OperatorSwitch.java @@ -25,7 +25,7 @@ import rx.functions.Action0; import rx.internal.util.RxRingBuffer; import rx.internal.util.atomic.SpscLinkedArrayQueue; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; import rx.subscriptions.*; /** @@ -238,7 +238,7 @@ void complete(long id) { } void pluginError(Throwable e) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); } void innerProducer(Producer p, long id) { diff --git a/src/main/java/rx/internal/operators/SingleOnSubscribeDelaySubscriptionOther.java b/src/main/java/rx/internal/operators/SingleOnSubscribeDelaySubscriptionOther.java index 41d0383ac9..92452706cb 100644 --- a/src/main/java/rx/internal/operators/SingleOnSubscribeDelaySubscriptionOther.java +++ b/src/main/java/rx/internal/operators/SingleOnSubscribeDelaySubscriptionOther.java @@ -16,11 +16,8 @@ package rx.internal.operators; -import rx.Observable; -import rx.Single; -import rx.SingleSubscriber; -import rx.Subscriber; -import rx.plugins.RxJavaPlugins; +import rx.*; +import rx.plugins.RxJavaHooks; import rx.subscriptions.SerialSubscription; /** @@ -66,7 +63,7 @@ public void onNext(Object t) { @Override public void onError(Throwable e) { if (done) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); return; } done = true; diff --git a/src/main/java/rx/internal/operators/SingleOnSubscribeUsing.java b/src/main/java/rx/internal/operators/SingleOnSubscribeUsing.java index 54cdfe5c24..9d7319724c 100644 --- a/src/main/java/rx/internal/operators/SingleOnSubscribeUsing.java +++ b/src/main/java/rx/internal/operators/SingleOnSubscribeUsing.java @@ -21,7 +21,7 @@ import rx.*; import rx.exceptions.*; import rx.functions.*; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; /** * Generates a resource, derives a Single from it and disposes that resource once the @@ -91,7 +91,7 @@ public void onSuccess(T value) { disposeAction.call(resource); } catch (Throwable ex2) { Exceptions.throwIfFatal(ex2); - RxJavaPlugins.getInstance().getErrorHandler().handleError(ex2); + RxJavaHooks.onError(ex2); } } } @@ -125,7 +125,7 @@ void handleSubscriptionTimeError(SingleSubscriber t, Resource resourc disposeAction.call(resource); } catch (Throwable ex2) { Exceptions.throwIfFatal(ex2); - RxJavaPlugins.getInstance().getErrorHandler().handleError(ex2); + RxJavaHooks.onError(ex2); } } } diff --git a/src/main/java/rx/internal/operators/SingleOperatorZip.java b/src/main/java/rx/internal/operators/SingleOperatorZip.java index 7614a6a6c5..93afe523d3 100644 --- a/src/main/java/rx/internal/operators/SingleOperatorZip.java +++ b/src/main/java/rx/internal/operators/SingleOperatorZip.java @@ -16,17 +16,15 @@ package rx.internal.operators; -import rx.Single; -import rx.SingleSubscriber; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.*; + +import rx.*; import rx.exceptions.Exceptions; import rx.functions.FuncN; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; import rx.subscriptions.CompositeSubscription; -import java.util.NoSuchElementException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - public class SingleOperatorZip { public static Single zip(final Single[] singles, final FuncN zipper) { @@ -75,7 +73,7 @@ public void onError(Throwable error) { if (once.compareAndSet(false, true)) { subscriber.onError(error); } else { - RxJavaPlugins.getInstance().getErrorHandler().handleError(error); + RxJavaHooks.onError(error); } } }; diff --git a/src/main/java/rx/internal/schedulers/ExecutorScheduler.java b/src/main/java/rx/internal/schedulers/ExecutorScheduler.java index a57ee8c938..b4bcf19d7f 100644 --- a/src/main/java/rx/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/rx/internal/schedulers/ExecutorScheduler.java @@ -20,7 +20,7 @@ import rx.*; import rx.functions.Action0; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; import rx.subscriptions.*; /** @@ -79,7 +79,7 @@ public Subscription schedule(Action0 action) { tasks.remove(ea); wip.decrementAndGet(); // report the error to the plugin - RxJavaPlugins.getInstance().getErrorHandler().handleError(t); + RxJavaHooks.onError(t); // throw it to the caller throw t; } @@ -158,7 +158,7 @@ public void call() { ea.add(f); } catch (RejectedExecutionException t) { // report the rejection to plugins - RxJavaPlugins.getInstance().getErrorHandler().handleError(t); + RxJavaHooks.onError(t); throw t; } diff --git a/src/main/java/rx/internal/schedulers/NewThreadWorker.java b/src/main/java/rx/internal/schedulers/NewThreadWorker.java index 585d27b3b6..6521aadf46 100644 --- a/src/main/java/rx/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/rx/internal/schedulers/NewThreadWorker.java @@ -34,7 +34,6 @@ */ public class NewThreadWorker extends Scheduler.Worker implements Subscription { private final ScheduledExecutorService executor; - private final RxJavaSchedulersHook schedulersHook; volatile boolean isUnsubscribed; /** The purge frequency in milliseconds. */ private static final String FREQUENCY_KEY = "rx.scheduler.jdk6.purge-frequency-millis"; @@ -112,7 +111,7 @@ static void purgeExecutors() { } } catch (Throwable t) { Exceptions.throwIfFatal(t); - RxJavaPlugins.getInstance().getErrorHandler().handleError(t); + RxJavaHooks.onError(t); } } @@ -168,7 +167,7 @@ public static boolean tryEnableCancelPolicy(ScheduledExecutorService executor) { methodToCall.invoke(executor, true); return true; } catch (Exception e) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); } } } @@ -207,7 +206,6 @@ public NewThreadWorker(ThreadFactory threadFactory) { if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) { registerExecutor((ScheduledThreadPoolExecutor)exec); } - schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook(); executor = exec; } @@ -233,7 +231,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit * @return the wrapper ScheduledAction */ public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) { - Action0 decoratedAction = schedulersHook.onSchedule(action); + Action0 decoratedAction = RxJavaHooks.onScheduledAction(action); ScheduledAction run = new ScheduledAction(decoratedAction); Future f; if (delayTime <= 0) { @@ -246,7 +244,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time return run; } public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) { - Action0 decoratedAction = schedulersHook.onSchedule(action); + Action0 decoratedAction = RxJavaHooks.onScheduledAction(action); ScheduledAction run = new ScheduledAction(decoratedAction, parent); parent.add(run); @@ -262,7 +260,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time } public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) { - Action0 decoratedAction = schedulersHook.onSchedule(action); + Action0 decoratedAction = RxJavaHooks.onScheduledAction(action); ScheduledAction run = new ScheduledAction(decoratedAction, parent); parent.add(run); diff --git a/src/main/java/rx/internal/schedulers/ScheduledAction.java b/src/main/java/rx/internal/schedulers/ScheduledAction.java index 0f7d145a20..6ba16beaee 100644 --- a/src/main/java/rx/internal/schedulers/ScheduledAction.java +++ b/src/main/java/rx/internal/schedulers/ScheduledAction.java @@ -22,7 +22,7 @@ import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action0; import rx.internal.util.SubscriptionList; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; import rx.subscriptions.CompositeSubscription; /** @@ -61,7 +61,7 @@ public void run() { } else { ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e); } - RxJavaPlugins.getInstance().getErrorHandler().handleError(ie); + RxJavaHooks.onError(ie); Thread thread = Thread.currentThread(); thread.getUncaughtExceptionHandler().uncaughtException(thread, ie); } finally { diff --git a/src/main/java/rx/internal/util/RxJavaPluginUtils.java b/src/main/java/rx/internal/util/RxJavaPluginUtils.java index b6b462412c..4e459130e5 100644 --- a/src/main/java/rx/internal/util/RxJavaPluginUtils.java +++ b/src/main/java/rx/internal/util/RxJavaPluginUtils.java @@ -15,13 +15,13 @@ */ package rx.internal.util; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; public final class RxJavaPluginUtils { public static void handleException(Throwable e) { try { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); } catch (Throwable pluginException) { handlePluginException(pluginException); } diff --git a/src/main/java/rx/internal/util/ScalarSynchronousObservable.java b/src/main/java/rx/internal/util/ScalarSynchronousObservable.java index 676527f954..8c56935596 100644 --- a/src/main/java/rx/internal/util/ScalarSynchronousObservable.java +++ b/src/main/java/rx/internal/util/ScalarSynchronousObservable.java @@ -34,14 +34,6 @@ * @param the value type */ public final class ScalarSynchronousObservable extends Observable { - /** - * The execution hook instance. - *

- * Can't be final to allow tests overriding it in place; if the class - * has been initialized, the plugin reset has no effect because - * how RxJavaPlugins was designed. - */ - static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook(); /** * Indicates that the Producer used by this Observable should be fully * threadsafe. It is possible, but unlikely that multiple concurrent @@ -81,7 +73,7 @@ public static ScalarSynchronousObservable create(T t) { final T t; protected ScalarSynchronousObservable(final T t) { - super(hook.onCreate(new JustOnSubscribe(t))); + super(RxJavaHooks.onCreate(new JustOnSubscribe(t))); this.t = t; } diff --git a/src/main/java/rx/observables/AsyncOnSubscribe.java b/src/main/java/rx/observables/AsyncOnSubscribe.java index 9b8926622f..a32175fd9c 100644 --- a/src/main/java/rx/observables/AsyncOnSubscribe.java +++ b/src/main/java/rx/observables/AsyncOnSubscribe.java @@ -27,7 +27,7 @@ import rx.functions.*; import rx.internal.operators.BufferUntilSubscriber; import rx.observers.SerializedObserver; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; import rx.subscriptions.CompositeSubscription; /** @@ -554,7 +554,7 @@ boolean tryEmit(long n) { private void handleThrownError(Throwable ex) { if (hasTerminated) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(ex); + RxJavaHooks.onError(ex); } else { hasTerminated = true; merger.onError(ex); diff --git a/src/main/java/rx/observables/SyncOnSubscribe.java b/src/main/java/rx/observables/SyncOnSubscribe.java index 461abe28d4..34745fb118 100644 --- a/src/main/java/rx/observables/SyncOnSubscribe.java +++ b/src/main/java/rx/observables/SyncOnSubscribe.java @@ -18,20 +18,13 @@ import java.util.concurrent.atomic.AtomicLong; +import rx.*; import rx.Observable.OnSubscribe; -import rx.Observer; -import rx.Producer; -import rx.Subscriber; -import rx.Subscription; import rx.annotations.Beta; import rx.exceptions.Exceptions; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Action2; -import rx.functions.Func0; -import rx.functions.Func2; +import rx.functions.*; import rx.internal.operators.BackpressureUtils; -import rx.plugins.RxJavaPlugins; +import rx.plugins.RxJavaHooks; /** * A utility class to create {@code OnSubscribe} functions that respond correctly to back @@ -387,7 +380,7 @@ private void doUnsubscribe() { parent.onUnsubscribe(state); } catch (Throwable e) { Exceptions.throwIfFatal(e); - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); } } @@ -422,7 +415,7 @@ private void fastpath() { private void handleThrownError(Subscriber a, Throwable ex) { if (hasTerminated) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(ex); + RxJavaHooks.onError(ex); } else { hasTerminated = true; a.onError(ex); diff --git a/src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java b/src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java index 87160f8326..287289242d 100644 --- a/src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java +++ b/src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java @@ -50,6 +50,7 @@ public abstract class RxJavaCompletableExecutionHook { * @return {@link rx.Completable.CompletableOnSubscribe} function that can be modified, decorated, replaced or just * returned as a pass through */ + @Deprecated public Completable.CompletableOnSubscribe onCreate(Completable.CompletableOnSubscribe f) { return f; } @@ -66,6 +67,7 @@ public Completable.CompletableOnSubscribe onCreate(Completable.CompletableOnSubs * @return {@link rx.Completable.CompletableOnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just * returned as a pass through */ + @Deprecated public Completable.CompletableOnSubscribe onSubscribeStart(Completable completableInstance, final Completable.CompletableOnSubscribe onSubscribe) { // pass through by default return onSubscribe; @@ -81,6 +83,7 @@ public Completable.CompletableOnSubscribe onSubscribeStart(Completable completab * Throwable thrown by {@link Completable#subscribe(Subscriber)} * @return Throwable that can be decorated, replaced or just returned as a pass through */ + @Deprecated public Throwable onSubscribeError(Throwable e) { // pass through by default return e; @@ -98,6 +101,7 @@ public Throwable onSubscribeError(Throwable e) { * @return {@link rx.Completable.CompletableOperator}{@code } function that can be modified, decorated, replaced or just * returned as a pass through */ + @Deprecated public Completable.CompletableOperator onLift(final Completable.CompletableOperator lift) { return lift; } diff --git a/src/main/java/rx/plugins/RxJavaErrorHandler.java b/src/main/java/rx/plugins/RxJavaErrorHandler.java index a6e56475ed..8107fab457 100644 --- a/src/main/java/rx/plugins/RxJavaErrorHandler.java +++ b/src/main/java/rx/plugins/RxJavaErrorHandler.java @@ -43,6 +43,7 @@ public abstract class RxJavaErrorHandler { * @param e * the {@code Exception} */ + @Deprecated public void handleError(Throwable e) { // do nothing by default } @@ -57,7 +58,7 @@ public void handleError(Throwable e) { * Note that primitive types are always rendered as their {@code toString()} value. *

* If a {@code Throwable} is caught when rendering, this will fallback to the item's classname suffixed by - * {@value #ERROR_IN_RENDERING_SUFFIX}. + * {@code ERROR_IN_RENDERING_SUFFIX}. * * @param item the last emitted item, that caused the exception wrapped in * {@code OnErrorThrowable.OnNextValue} diff --git a/src/main/java/rx/plugins/RxJavaHooks.java b/src/main/java/rx/plugins/RxJavaHooks.java new file mode 100644 index 0000000000..bbdb4aaf99 --- /dev/null +++ b/src/main/java/rx/plugins/RxJavaHooks.java @@ -0,0 +1,599 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.plugins; + +import rx.*; +import rx.Completable.CompletableOnSubscribe; +import rx.Observable.*; +import rx.annotations.Experimental; +import rx.functions.*; +import rx.internal.operators.*; + +/** + * Utility class that holds hooks for various Observable, Single and Completable lifecycle-related + * points as well as Scheduler hooks. + */ +@Experimental +public final class RxJavaHooks { + /** Utility class. */ + private RxJavaHooks() { + throw new IllegalStateException("No instances!"); + } + + /** + * Prevents changing the hook callbacks when set to true. + */ + /* test */ static volatile boolean lockdown; + + static volatile Action1 onError; + + @SuppressWarnings("rawtypes") + static volatile Func1 onObservableCreate; + + @SuppressWarnings("rawtypes") + static volatile Func1 onSingleCreate; + + static volatile Func1 onCompletableCreate; + + @SuppressWarnings("rawtypes") + static volatile Func2 onObservableStart; + + @SuppressWarnings("rawtypes") + static volatile Func2 onSingleStart; + + static volatile Func2 onCompletableStart; + + + static volatile Func1 onComputationScheduler; + + static volatile Func1 onIOScheduler; + + static volatile Func1 onNewThreadScheduler; + + static volatile Func1 onScheduleAction; + + static volatile Func1 onObservableReturn; + + static volatile Func1 onSingleReturn; + + /** Initialize with the default delegation to the original RxJavaPlugins. */ + static { + init(); + } + + /** + * Initialize the hooks via delegating to RxJavaPlugins. + */ + @SuppressWarnings({ "rawtypes", "unchecked", "deprecation"}) + static void init() { + onError = new Action1() { + @Override + public void call(Throwable e) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + } + }; + + RxJavaPlugins.getInstance().getObservableExecutionHook(); + + onObservableCreate = new Func1() { + @Override + public OnSubscribe call(OnSubscribe f) { + return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f); + } + }; + + onObservableStart = new Func2() { + @Override + public OnSubscribe call(Observable t1, OnSubscribe t2) { + return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2); + } + }; + + onObservableReturn = new Func1() { + @Override + public Subscription call(Subscription f) { + return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(f); + } + }; + + RxJavaPlugins.getInstance().getSingleExecutionHook(); + + onSingleCreate = new Func1() { + @Override + public rx.Single.OnSubscribe call(rx.Single.OnSubscribe f) { + return RxJavaPlugins.getInstance().getSingleExecutionHook().onCreate(f); + } + }; + + onSingleStart = new Func2() { + @Override + public Observable.OnSubscribe call(Single t1, Observable.OnSubscribe t2) { + return RxJavaPlugins.getInstance().getSingleExecutionHook().onSubscribeStart(t1, t2); + } + }; + + onSingleReturn = new Func1() { + @Override + public Subscription call(Subscription f) { + return RxJavaPlugins.getInstance().getSingleExecutionHook().onSubscribeReturn(f); + } + }; + + RxJavaPlugins.getInstance().getCompletableExecutionHook(); + + onCompletableCreate = new Func1() { + @Override + public CompletableOnSubscribe call(CompletableOnSubscribe f) { + return RxJavaPlugins.getInstance().getCompletableExecutionHook().onCreate(f); + } + }; + + onCompletableStart = new Func2() { + @Override + public Completable.CompletableOnSubscribe call(Completable t1, Completable.CompletableOnSubscribe t2) { + return RxJavaPlugins.getInstance().getCompletableExecutionHook().onSubscribeStart(t1, t2); + } + }; + + onScheduleAction = new Func1() { + @Override + public Action0 call(Action0 a) { + return RxJavaPlugins.getInstance().getSchedulersHook().onSchedule(a); + } + }; + } + + /** + * Reset all hook callbacks to those of the current RxJavaPlugins handlers. + * + * @see #clear() + */ + public static void reset() { + if (lockdown) { + return; + } + init(); + } + + /** + * Clears all hooks to be no-operations (and passthroughs) + * and onError hook to signal errors to the caller thread's + * UncaughtExceptionHandler. + * + * @see #reset() + */ + public static void clear() { + if (lockdown) { + return; + } + onError = null; + + onObservableCreate = null; + onObservableStart = null; + onObservableReturn = null; + + onSingleCreate = null; + onSingleStart = null; + onSingleReturn = null; + + onCompletableCreate = null; + onCompletableStart = null; + + onComputationScheduler = null; + onIOScheduler = null; + onNewThreadScheduler = null; + } + + /** + * Prevents changing a hooks. + */ + public static void lockdown() { + lockdown = true; + } + + /** + * Returns true if the hooks can no longer be changed. + * @return true if the hooks can no longer be changed + */ + public static boolean isLockdown() { + return lockdown; + } + /** + * Consume undeliverable Throwables (acts as a global catch). + * @param ex the exception to handle + */ + public static void onError(Throwable ex) { + Action1 f = onError; + if (f != null) { + f.call(ex); + return; + } + Thread current = Thread.currentThread(); + current.getUncaughtExceptionHandler().uncaughtException(current, ex); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Observable.OnSubscribe onCreate(Observable.OnSubscribe onSubscribe) { + Func1 f = onObservableCreate; + if (f != null) { + return f.call(onSubscribe); + } + return onSubscribe; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Single.OnSubscribe onCreate(Single.OnSubscribe onSubscribe) { + Func1 f = onSingleCreate; + if (f != null) { + return f.call(onSubscribe); + } + return onSubscribe; + } + + public static Completable.CompletableOnSubscribe onCreate(Completable.CompletableOnSubscribe onSubscribe) { + Func1 f = onCompletableCreate; + if (f != null) { + return f.call(onSubscribe); + } + return onSubscribe; + } + + public static Scheduler onComputationScheduler(Scheduler scheduler) { + Func1 f = onComputationScheduler; + if (f != null) { + return f.call(scheduler); + } + return scheduler; + } + + public static Scheduler onIOScheduler(Scheduler scheduler) { + Func1 f = onIOScheduler; + if (f != null) { + return f.call(scheduler); + } + return scheduler; + } + + public static Scheduler onNewThreadScheduler(Scheduler scheduler) { + Func1 f = onNewThreadScheduler; + if (f != null) { + return f.call(scheduler); + } + return scheduler; + } + + public static Action0 onScheduledAction(Action0 action) { + Func1 f = onScheduleAction; + if (f != null) { + return f.call(action); + } + return action; + } + + public static void setOnCompletableCreate( + Func1 onCompletableCreate) { + if (lockdown) { + return; + } + RxJavaHooks.onCompletableCreate = onCompletableCreate; + } + + public static void setOnComputationScheduler(Func1 onComputationScheduler) { + if (lockdown) { + return; + } + RxJavaHooks.onComputationScheduler = onComputationScheduler; + } + + public static void setOnError(Action1 onError) { + if (lockdown) { + return; + } + RxJavaHooks.onError = onError; + } + + public static void setOnIOScheduler(Func1 onIOScheduler) { + if (lockdown) { + return; + } + RxJavaHooks.onIOScheduler = onIOScheduler; + } + + public static void setOnNewThreadScheduler(Func1 onNewThreadScheduler) { + if (lockdown) { + return; + } + RxJavaHooks.onNewThreadScheduler = onNewThreadScheduler; + } + + @SuppressWarnings("rawtypes") + public static void setOnObservableCreate( + Func1 onObservableCreate) { + if (lockdown) { + return; + } + RxJavaHooks.onObservableCreate = onObservableCreate; + } + + public static void setOnScheduleAction(Func1 onScheduleAction) { + if (lockdown) { + return; + } + RxJavaHooks.onScheduleAction = onScheduleAction; + } + + @SuppressWarnings("rawtypes") + public static void setOnSingleCreate(Func1 onSingleCreate) { + if (lockdown) { + return; + } + RxJavaHooks.onSingleCreate = onSingleCreate; + } + + public static void setOnCompletableStart( + Func2 onCompletableStart) { + if (lockdown) { + return; + } + RxJavaHooks.onCompletableStart = onCompletableStart; + } + + @SuppressWarnings("rawtypes") + public static void setOnObservableStart( + Func2 onObservableStart) { + if (lockdown) { + return; + } + RxJavaHooks.onObservableStart = onObservableStart; + } + + @SuppressWarnings("rawtypes") + public static void setOnSingleStart(Func2 onSingleStart) { + if (lockdown) { + return; + } + RxJavaHooks.onSingleStart = onSingleStart; + } + + public static void setOnObservableReturn(Func1 onObservableReturn) { + if (lockdown) { + return; + } + RxJavaHooks.onObservableReturn = onObservableReturn; + } + + public static void setOnSingleReturn(Func1 onSingleReturn) { + if (lockdown) { + return; + } + RxJavaHooks.onSingleReturn = onSingleReturn; + } + + public static Func1 getOnComputationScheduler() { + return onComputationScheduler; + } + + public static Action1 getOnError() { + return onError; + } + + public static Func1 getOnIOScheduler() { + return onIOScheduler; + } + + public static Func1 getOnNewThreadScheduler() { + return onNewThreadScheduler; + } + + @SuppressWarnings("rawtypes") + public static Func1 getOnObservableCreate() { + return onObservableCreate; + } + + public static Func1 getOnScheduleAction() { + return onScheduleAction; + } + + @SuppressWarnings("rawtypes") + public static Func1 getOnSingleCreate() { + return onSingleCreate; + } + + public static Func1 getOnCompletableCreate() { + return onCompletableCreate; + } + + public static Func2 getOnCompletableStart() { + return onCompletableStart; + } + + @SuppressWarnings("rawtypes") + public static Func2 getOnObservableStart() { + return onObservableStart; + } + + @SuppressWarnings("rawtypes") + public static Func2 getOnSingleStart() { + return onSingleStart; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static OnSubscribe onObservableStart(Observable instance, OnSubscribe onSubscribe) { + Func2 f = onObservableStart; + if (f != null) { + return f.call(instance, onSubscribe); + } + return onSubscribe; + } + + public static Func1 getOnObservableReturn() { + return onObservableReturn; + } + + public static Func1 getOnSingleReturn() { + return onSingleReturn; + } + + public static Subscription onObservableReturn(Subscription subscription) { + Func1 f = onObservableReturn; + if (f != null) { + return f.call(subscription); + } + return subscription; + } + + public static Throwable onObservableError(Throwable error) { + // TODO add hook + return error; + } + + public static Operator onObservableLift(Operator operator) { + // TODO add hook + return operator; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Observable.OnSubscribe onSingleStart(Single instance, Observable.OnSubscribe onSubscribe) { + Func2 f = onSingleStart; + if (f != null) { + return f.call(instance, onSubscribe); + } + return onSubscribe; + } + + public static Subscription onSingleReturn(Subscription subscription) { + Func1 f = onSingleReturn; + if (f != null) { + return f.call(subscription); + } + return subscription; + } + + public static Throwable onSingleError(Throwable error) { + // TODO add hook + return error; + } + + public static Operator onSingleLift(Operator operator) { + // TODO add hook + return operator; + } + + public static Completable.CompletableOnSubscribe onCompletableStart(Completable instance, Completable.CompletableOnSubscribe onSubscribe) { + Func2 f = onCompletableStart; + if (f != null) { + return f.call(instance, onSubscribe); + } + return onSubscribe; + } + + public static Throwable onCompletableError(Throwable error) { + // TODO add hook + return error; + } + + public static Completable.CompletableOperator onCompletableLift(Completable.CompletableOperator operator) { + // TODO add hook + return operator; + } + + /** + * Resets the assembly tracking hooks to their default delegates to + * RxJavaPlugins. + */ + @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) + public static void resetAssemblyTracking() { + if (lockdown) { + return; + } + + RxJavaPlugins plugin = RxJavaPlugins.getInstance(); + + final RxJavaObservableExecutionHook observableExecutionHook = plugin.getObservableExecutionHook(); + + onObservableCreate = new Func1() { + @Override + public OnSubscribe call(OnSubscribe f) { + return observableExecutionHook.onCreate(f); + } + }; + + final RxJavaSingleExecutionHook singleExecutionHook = plugin.getSingleExecutionHook(); + + onSingleCreate = new Func1() { + @Override + public rx.Single.OnSubscribe call(rx.Single.OnSubscribe f) { + return singleExecutionHook.onCreate(f); + } + }; + + final RxJavaCompletableExecutionHook completableExecutionHook = plugin.getCompletableExecutionHook(); + + onCompletableCreate = new Func1() { + @Override + public CompletableOnSubscribe call(CompletableOnSubscribe f) { + return completableExecutionHook.onCreate(f); + } + }; + + } + + /** + * Clears the assembly tracking hooks to their default pass-through behavior. + */ + public static void crearAssemblyTracking() { + if (lockdown) { + return; + } + onObservableCreate = null; + onSingleCreate = null; + onCompletableCreate = null; + } + + /** + * Sets up hooks that capture the current stacktrace when a source or an + * operator is instantiated, keeping it in a field for debugging purposes + * and alters exceptions passign along to hold onto this stacktrace. + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static void enableAssemblyTracking() { + if (lockdown) { + return; + } + + onObservableCreate = new Func1() { + @Override + public OnSubscribe call(OnSubscribe f) { + return new OnSubscribeOnAssembly(f); + } + }; + + onSingleCreate = new Func1() { + @Override + public rx.Single.OnSubscribe call(rx.Single.OnSubscribe f) { + return new OnSubscribeOnAssemblySingle(f); + } + }; + + onCompletableCreate = new Func1() { + @Override + public CompletableOnSubscribe call(CompletableOnSubscribe f) { + return new OnSubscribeOnAssemblyCompletable(f); + } + }; + + } +} diff --git a/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java b/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java index c7c085c7fe..8c08851ed5 100644 --- a/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java +++ b/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java @@ -52,6 +52,7 @@ public abstract class RxJavaObservableExecutionHook { * @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just * returned as a pass through */ + @Deprecated public OnSubscribe onCreate(OnSubscribe f) { return f; } @@ -69,6 +70,7 @@ public OnSubscribe onCreate(OnSubscribe f) { * @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just * returned as a pass through */ + @Deprecated public OnSubscribe onSubscribeStart(Observable observableInstance, final OnSubscribe onSubscribe) { // pass through by default return onSubscribe; @@ -87,6 +89,7 @@ public OnSubscribe onSubscribeStart(Observable observableIns * @return {@link Subscription} subscription that can be modified, decorated, replaced or just returned as a * pass through */ + @Deprecated public Subscription onSubscribeReturn(Subscription subscription) { // pass through by default return subscription; @@ -103,6 +106,7 @@ public Subscription onSubscribeReturn(Subscription subscription) { * Throwable thrown by {@link Observable#subscribe(Subscriber)} * @return Throwable that can be decorated, replaced or just returned as a pass through */ + @Deprecated public Throwable onSubscribeError(Throwable e) { // pass through by default return e; @@ -122,6 +126,7 @@ public Throwable onSubscribeError(Throwable e) { * @return {@link Operator}{@code } function that can be modified, decorated, replaced or just * returned as a pass through */ + @Deprecated public Operator onLift(final Operator lift) { return lift; } diff --git a/src/main/java/rx/plugins/RxJavaPlugins.java b/src/main/java/rx/plugins/RxJavaPlugins.java index f9926ac588..9271a42ce2 100644 --- a/src/main/java/rx/plugins/RxJavaPlugins.java +++ b/src/main/java/rx/plugins/RxJavaPlugins.java @@ -45,6 +45,9 @@ * * * @see RxJava Wiki: Plugins + * + * Use the {@link RxJavaHooks} features instead which let's you change individual + * handlers at runtime. */ public class RxJavaPlugins { private final static RxJavaPlugins INSTANCE = new RxJavaPlugins(); @@ -59,7 +62,10 @@ public class RxJavaPlugins { * Retrieves the single {@code RxJavaPlugins} instance. * * @return the single {@code RxJavaPlugins} instance + * + * @deprecated use the static methods of {@link RxJavaHooks}. */ + @Deprecated public static RxJavaPlugins getInstance() { return INSTANCE; } diff --git a/src/main/java/rx/plugins/RxJavaSchedulersHook.java b/src/main/java/rx/plugins/RxJavaSchedulersHook.java index e50d6a611e..a8158e2f5e 100644 --- a/src/main/java/rx/plugins/RxJavaSchedulersHook.java +++ b/src/main/java/rx/plugins/RxJavaSchedulersHook.java @@ -148,6 +148,7 @@ public Scheduler getNewThreadScheduler() { * @param action action to schedule * @return wrapped action to schedule */ + @Deprecated public Action0 onSchedule(Action0 action) { return action; } diff --git a/src/main/java/rx/plugins/RxJavaSingleExecutionHook.java b/src/main/java/rx/plugins/RxJavaSingleExecutionHook.java index ed780381b8..4e023f615e 100644 --- a/src/main/java/rx/plugins/RxJavaSingleExecutionHook.java +++ b/src/main/java/rx/plugins/RxJavaSingleExecutionHook.java @@ -51,6 +51,7 @@ public abstract class RxJavaSingleExecutionHook { * @return {@link rx.Single.OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just * returned as a pass through */ + @Deprecated public Single.OnSubscribe onCreate(Single.OnSubscribe f) { return f; } @@ -68,6 +69,7 @@ public Single.OnSubscribe onCreate(Single.OnSubscribe f) { * @return {@link rx.Observable.OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just * returned as a pass through */ + @Deprecated public Observable.OnSubscribe onSubscribeStart(Single singleInstance, final Observable.OnSubscribe onSubscribe) { // pass through by default return onSubscribe; @@ -86,6 +88,7 @@ public Observable.OnSubscribe onSubscribeStart(Single single * @return {@link Subscription} subscription that can be modified, decorated, replaced or just returned as a * pass through */ + @Deprecated public Subscription onSubscribeReturn(Subscription subscription) { // pass through by default return subscription; @@ -102,6 +105,7 @@ public Subscription onSubscribeReturn(Subscription subscription) { * Throwable thrown by {@link Single#subscribe(Subscriber)} * @return Throwable that can be decorated, replaced or just returned as a pass through */ + @Deprecated public Throwable onSubscribeError(Throwable e) { // pass through by default return e; @@ -121,6 +125,7 @@ public Throwable onSubscribeError(Throwable e) { * @return {@link rx.Observable.Operator}{@code } function that can be modified, decorated, replaced or just * returned as a pass through */ + @Deprecated public Observable.Operator onLift(final Observable.Operator lift) { return lift; } diff --git a/src/main/java/rx/schedulers/Schedulers.java b/src/main/java/rx/schedulers/Schedulers.java index 031d0bb463..6dc89f0178 100644 --- a/src/main/java/rx/schedulers/Schedulers.java +++ b/src/main/java/rx/schedulers/Schedulers.java @@ -21,8 +21,7 @@ import rx.internal.schedulers.GenericScheduledExecutorService; import rx.internal.schedulers.SchedulerLifecycle; import rx.internal.util.RxRingBuffer; -import rx.plugins.RxJavaPlugins; -import rx.plugins.RxJavaSchedulersHook; +import rx.plugins.*; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; @@ -54,6 +53,7 @@ private static Schedulers getInstance() { } private Schedulers() { + @SuppressWarnings("deprecation") RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook(); Scheduler c = hook.getComputationScheduler(); @@ -105,7 +105,7 @@ public static Scheduler trampoline() { * @return a {@link Scheduler} that creates new threads */ public static Scheduler newThread() { - return getInstance().newThreadScheduler; + return RxJavaHooks.onNewThreadScheduler(getInstance().newThreadScheduler); } /** @@ -120,7 +120,7 @@ public static Scheduler newThread() { * @return a {@link Scheduler} meant for computation-bound work */ public static Scheduler computation() { - return getInstance().computationScheduler; + return RxJavaHooks.onComputationScheduler(getInstance().computationScheduler); } /** @@ -137,7 +137,7 @@ public static Scheduler computation() { * @return a {@link Scheduler} meant for IO-bound work */ public static Scheduler io() { - return getInstance().ioScheduler; + return RxJavaHooks.onComputationScheduler(getInstance().ioScheduler); } /** diff --git a/src/test/java/rx/CompletableTest.java b/src/test/java/rx/CompletableTest.java index a2a49d6c2d..018bf64509 100644 --- a/src/test/java/rx/CompletableTest.java +++ b/src/test/java/rx/CompletableTest.java @@ -31,7 +31,7 @@ import rx.exceptions.*; import rx.functions.*; import rx.observers.TestSubscriber; -import rx.plugins.*; +import rx.plugins.RxJavaHooks; import rx.schedulers.*; import rx.subjects.PublishSubject; import rx.subscriptions.*; @@ -1216,7 +1216,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaHooks.onError(e); } }); @@ -4036,13 +4036,34 @@ public void onNext(Object t) { } } - private static RxJavaCompletableExecutionHook hookSpy; + private Func1 onCreate; + + private Func2 onStart; @Before public void setUp() throws Exception { - hookSpy = spy( - new RxJavaPluginsTest.RxJavaCompletableExecutionHookTestImpl()); - Completable.HOOK = hookSpy; + onCreate = spy(new Func1() { + @Override + public CompletableOnSubscribe call(CompletableOnSubscribe t) { + return t; + } + }); + + RxJavaHooks.setOnCompletableCreate(onCreate); + + onStart = spy(new Func2() { + @Override + public CompletableOnSubscribe call(Completable t1, CompletableOnSubscribe t2) { + return t2; + } + }); + + RxJavaHooks.setOnCompletableStart(onStart); + } + + @After + public void after() { + RxJavaHooks.reset(); } @Test @@ -4050,7 +4071,7 @@ public void testHookCreate() { CompletableOnSubscribe subscriber = mock(CompletableOnSubscribe.class); Completable.create(subscriber); - verify(hookSpy, times(1)).onCreate(subscriber); + verify(onCreate, times(1)).call(subscriber); } @Test @@ -4064,7 +4085,7 @@ public void testHookSubscribeStart() { }); completable.subscribe(ts); - verify(hookSpy, times(1)).onSubscribeStart(eq(completable), any(Completable.CompletableOnSubscribe.class)); + verify(onStart, times(1)).call(eq(completable), any(Completable.CompletableOnSubscribe.class)); } @Test @@ -4077,7 +4098,7 @@ public void testHookUnsafeSubscribeStart() { }); completable.unsafeSubscribe(ts); - verify(hookSpy, times(1)).onSubscribeStart(eq(completable), any(Completable.CompletableOnSubscribe.class)); + verify(onStart, times(1)).call(eq(completable), any(Completable.CompletableOnSubscribe.class)); } @Test diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index 12b059e648..7f45192044 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -21,33 +21,64 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import rx.Single.OnSubscribe; import rx.exceptions.*; import rx.functions.*; -import rx.observers.SafeSubscriber; -import rx.observers.TestSubscriber; -import rx.plugins.RxJavaPluginsTest; -import rx.plugins.RxJavaSingleExecutionHook; -import rx.schedulers.Schedulers; -import rx.schedulers.TestScheduler; +import rx.observers.*; +import rx.plugins.RxJavaHooks; +import rx.schedulers.*; import rx.singles.BlockingSingle; import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; public class SingleTest { - private static RxJavaSingleExecutionHook hookSpy; + @SuppressWarnings("rawtypes") + private Func1 onCreate; + + @SuppressWarnings("rawtypes") + private Func2 onStart; + private Func1 onReturn; + + @SuppressWarnings("rawtypes") @Before public void setUp() throws Exception { - hookSpy = spy( - new RxJavaPluginsTest.RxJavaSingleExecutionHookTestImpl()); - Single.hook = hookSpy; + onCreate = spy(new Func1() { + @Override + public Single.OnSubscribe call(Single.OnSubscribe t) { + return t; + } + }); + + RxJavaHooks.setOnSingleCreate(onCreate); + + onStart = spy(new Func2() { + @Override + public Observable.OnSubscribe call(Single t1, Observable.OnSubscribe t2) { + return t2; + } + }); + + RxJavaHooks.setOnSingleStart(onStart); + + onReturn = spy(new Func1() { + @Override + public Subscription call(Subscription t) { + return t; + } + }); + + RxJavaHooks.setOnSingleReturn(onReturn); + } + + @After + public void after() { + RxJavaHooks.reset(); } @Test @@ -400,10 +431,9 @@ public void testHookCreate() { OnSubscribe subscriber = mock(OnSubscribe.class); Single.create(subscriber); - verify(hookSpy, times(1)).onCreate(subscriber); + verify(onCreate, times(1)).call(subscriber); } - @SuppressWarnings("unchecked") @Test public void testHookSubscribeStart() { TestSubscriber ts = new TestSubscriber(); @@ -415,10 +445,9 @@ public void testHookSubscribeStart() { }); single.subscribe(ts); - verify(hookSpy, times(1)).onSubscribeStart(eq(single), any(Observable.OnSubscribe.class)); + verify(onStart, times(1)).call(eq(single), any(Observable.OnSubscribe.class)); } - @SuppressWarnings("unchecked") @Test public void testHookUnsafeSubscribeStart() { TestSubscriber ts = new TestSubscriber(); @@ -429,7 +458,7 @@ public void testHookUnsafeSubscribeStart() { }); single.unsafeSubscribe(ts); - verify(hookSpy, times(1)).onSubscribeStart(eq(single), any(Observable.OnSubscribe.class)); + verify(onStart, times(1)).call(eq(single), any(Observable.OnSubscribe.class)); } @Test @@ -443,7 +472,7 @@ public void testHookSubscribeReturn() { }); single.subscribe(ts); - verify(hookSpy, times(1)).onSubscribeReturn(any(SafeSubscriber.class)); + verify(onReturn, times(1)).call(any(SafeSubscriber.class)); } @Test @@ -457,7 +486,7 @@ public void testHookUnsafeSubscribeReturn() { }); single.unsafeSubscribe(ts); - verify(hookSpy, times(1)).onSubscribeReturn(ts); + verify(onReturn, times(1)).call(ts); } @Test diff --git a/src/test/java/rx/internal/util/ScalarSynchronousObservableTest.java b/src/test/java/rx/internal/util/ScalarSynchronousObservableTest.java index 6898fbd472..6c2386812b 100644 --- a/src/test/java/rx/internal/util/ScalarSynchronousObservableTest.java +++ b/src/test/java/rx/internal/util/ScalarSynchronousObservableTest.java @@ -235,19 +235,21 @@ public void onNext(Integer t) { ts.assertNotCompleted(); } + @SuppressWarnings("rawtypes") @Test public void hookCalled() { - RxJavaObservableExecutionHook save = ScalarSynchronousObservable.hook; + Func1 save = RxJavaHooks.getOnObservableCreate(); try { final AtomicInteger c = new AtomicInteger(); - ScalarSynchronousObservable.hook = new RxJavaObservableExecutionHook() { + + RxJavaHooks.setOnObservableCreate(new Func1() { @Override - public OnSubscribe onCreate(OnSubscribe f) { + public OnSubscribe call(OnSubscribe t) { c.getAndIncrement(); - return f; + return t; } - }; + }); int n = 10; @@ -257,22 +259,23 @@ public OnSubscribe onCreate(OnSubscribe f) { Assert.assertEquals(n, c.get()); } finally { - ScalarSynchronousObservable.hook = save; + RxJavaHooks.setOnObservableCreate(save); } } + @SuppressWarnings("rawtypes") @Test public void hookChangesBehavior() { - RxJavaObservableExecutionHook save = ScalarSynchronousObservable.hook; + Func1 save = RxJavaHooks.getOnObservableCreate(); try { - ScalarSynchronousObservable.hook = new RxJavaObservableExecutionHook() { + RxJavaHooks.setOnObservableCreate(new Func1() { @Override - public OnSubscribe onCreate(OnSubscribe f) { + public OnSubscribe call(OnSubscribe f) { if (f instanceof ScalarSynchronousObservable.JustOnSubscribe) { - final T v = ((ScalarSynchronousObservable.JustOnSubscribe) f).value; - return new OnSubscribe() { + final Object v = ((ScalarSynchronousObservable.JustOnSubscribe) f).value; + return new OnSubscribe() { @Override - public void call(Subscriber t) { + public void call(Subscriber t) { t.onNext(v); t.onNext(v); t.onCompleted(); @@ -281,7 +284,7 @@ public void call(Subscriber t) { } return f; } - }; + }); TestSubscriber ts = new TestSubscriber(); @@ -292,7 +295,7 @@ public void call(Subscriber t) { ts.assertCompleted(); } finally { - ScalarSynchronousObservable.hook = save; + RxJavaHooks.setOnObservableCreate(save); } } diff --git a/src/test/java/rx/observers/SafeSubscriberTest.java b/src/test/java/rx/observers/SafeSubscriberTest.java index c11c2b37a0..30d4ee905b 100644 --- a/src/test/java/rx/observers/SafeSubscriberTest.java +++ b/src/test/java/rx/observers/SafeSubscriberTest.java @@ -18,7 +18,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.lang.reflect.Method; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; @@ -36,20 +35,13 @@ import rx.plugins.RxJavaPlugins; import rx.subscriptions.Subscriptions; +@SuppressWarnings("deprecation") public class SafeSubscriberTest { @Before @After public void resetBefore() { - RxJavaPlugins ps = RxJavaPlugins.getInstance(); - - try { - Method m = ps.getClass().getDeclaredMethod("reset"); - m.setAccessible(true); - m.invoke(ps); - } catch (Throwable ex) { - ex.printStackTrace(); - } + RxJavaPlugins.getInstance().reset(); } @Test diff --git a/src/test/java/rx/plugins/RxJavaHooksTest.java b/src/test/java/rx/plugins/RxJavaHooksTest.java new file mode 100644 index 0000000000..2072e72d49 --- /dev/null +++ b/src/test/java/rx/plugins/RxJavaHooksTest.java @@ -0,0 +1,165 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.plugins; + +import java.lang.reflect.Method; + +import org.junit.*; + +import rx.*; +import rx.exceptions.*; +import rx.functions.*; +import rx.internal.util.UtilityFunctions; +import rx.observers.TestSubscriber; + +public class RxJavaHooksTest { + + static Observable createObservable() { + return Observable.range(1, 5).map(new Func1() { + @Override + public Integer call(Integer t) { + throw new TestException(); + } + }); + } + + @Test + public void assemblyTrackingObservable() { + RxJavaHooks.enableAssemblyTracking(); + try { + TestSubscriber ts = TestSubscriber.create(); + + createObservable().subscribe(ts); + + ts.assertError(AssemblyStackTraceException.class); + + Throwable ex = ts.getOnErrorEvents().get(0); + + Assert.assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException); + + Assert.assertTrue("" + ex, ex instanceof AssemblyStackTraceException); + + Assert.assertTrue(ex.getMessage(), ex.getMessage().contains("createObservable")); + } finally { + RxJavaHooks.resetAssemblyTracking(); + } + } + + static Single createSingle() { + return Single.just(1).map(new Func1() { + @Override + public Integer call(Integer t) { + throw new TestException(); + } + }); + } + + @Test + public void assemblyTrackingSingle() { + RxJavaHooks.enableAssemblyTracking(); + try { + TestSubscriber ts = TestSubscriber.create(); + + createSingle().subscribe(ts); + + ts.assertError(AssemblyStackTraceException.class); + + Throwable ex = ts.getOnErrorEvents().get(0); + + Assert.assertTrue("" + ex, ex instanceof AssemblyStackTraceException); + + Assert.assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException); + + Assert.assertTrue(ex.getMessage(), ex.getMessage().contains("createSingle")); + } finally { + RxJavaHooks.resetAssemblyTracking(); + } + } + + static Completable createCompletable() { + return Completable.error(new Func0() { + @Override + public Throwable call() { + return new TestException(); + } + }); + } + + @Test + public void assemblyTrackingCompletable() { + RxJavaHooks.enableAssemblyTracking(); + try { + TestSubscriber ts = TestSubscriber.create(); + + createCompletable().subscribe(ts); + + ts.assertError(AssemblyStackTraceException.class); + + Throwable ex = ts.getOnErrorEvents().get(0); + + Assert.assertTrue("" + ex, ex instanceof AssemblyStackTraceException); + + Assert.assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException); + + Assert.assertTrue(ex.getMessage(), ex.getMessage().contains("createCompletable")); + } finally { + RxJavaHooks.resetAssemblyTracking(); + } + } + + @SuppressWarnings("rawtypes") + @Test + public void lockdown() throws Exception { + RxJavaHooks.reset(); + RxJavaHooks.lockdown(); + try { + Action1 a1 = Actions.empty(); + Func1 f1 = UtilityFunctions.identity(); + Func2 f2 = new Func2() { + @Override + public Object call(Object t1, Object t2) { + return t2; + } + }; + + for (Method m : RxJavaHooks.class.getMethods()) { + if (m.getName().startsWith("setOn")) { + + Method getter = RxJavaHooks.class.getMethod("get" + m.getName().substring(3)); + + Object before = getter.invoke(null); + + if (m.getParameterTypes()[0].isAssignableFrom(Func1.class)) { + m.invoke(null, f1); + } else + if (m.getParameterTypes()[0].isAssignableFrom(Action1.class)) { + m.invoke(null, a1); + } else { + m.invoke(null, f2); + } + + Object after = getter.invoke(null); + + Assert.assertSame(m.toString(), before, after); + } + } + + } finally { + RxJavaHooks.lockdown = false; + RxJavaHooks.reset(); + } + } +} diff --git a/src/test/java/rx/plugins/RxJavaPluginsTest.java b/src/test/java/rx/plugins/RxJavaPluginsTest.java index 9c471cfa9c..b4171ea11a 100644 --- a/src/test/java/rx/plugins/RxJavaPluginsTest.java +++ b/src/test/java/rx/plugins/RxJavaPluginsTest.java @@ -28,6 +28,7 @@ import rx.exceptions.OnErrorThrowable; import rx.functions.Func1; +@SuppressWarnings("deprecation") public class RxJavaPluginsTest { @Before diff --git a/src/test/java/rx/schedulers/ResetSchedulersTest.java b/src/test/java/rx/schedulers/ResetSchedulersTest.java index 79c9f9435b..2fc2c4eaa4 100644 --- a/src/test/java/rx/schedulers/ResetSchedulersTest.java +++ b/src/test/java/rx/schedulers/ResetSchedulersTest.java @@ -11,6 +11,7 @@ public class ResetSchedulersTest { + @SuppressWarnings("deprecation") @Test public void reset() { RxJavaPlugins.getInstance().reset(); From 99b256c05e3a2dd5a5bd22280ac4b294b5189baf Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 22 Jun 2016 14:46:27 +0200 Subject: [PATCH 2/2] Add missing javadoc --- src/main/java/rx/plugins/RxJavaHooks.java | 533 ++++++++++++++++++---- 1 file changed, 439 insertions(+), 94 deletions(-) diff --git a/src/main/java/rx/plugins/RxJavaHooks.java b/src/main/java/rx/plugins/RxJavaHooks.java index bbdb4aaf99..a5b621ea19 100644 --- a/src/main/java/rx/plugins/RxJavaHooks.java +++ b/src/main/java/rx/plugins/RxJavaHooks.java @@ -15,6 +15,8 @@ */ package rx.plugins; +import java.lang.Thread.UncaughtExceptionHandler; + import rx.*; import rx.Completable.CompletableOnSubscribe; import rx.Observable.*; @@ -25,6 +27,9 @@ /** * Utility class that holds hooks for various Observable, Single and Completable lifecycle-related * points as well as Scheduler hooks. + *

+ * The class features a lockdown state, see {@link #lockdown()} and {@link #isLockdown()}, to + * prevent further changes to the hooks. */ @Experimental public final class RxJavaHooks { @@ -56,7 +61,6 @@ private RxJavaHooks() { static volatile Func2 onCompletableStart; - static volatile Func1 onComputationScheduler; static volatile Func1 onIOScheduler; @@ -225,6 +229,12 @@ public static void onError(Throwable ex) { current.getUncaughtExceptionHandler().uncaughtException(current, ex); } + /** + * Hook to call when an Observable is created. + * @param the value type + * @param onSubscribe the original OnSubscribe logic + * @return the original or replacement OnSubscribe instance + */ @SuppressWarnings({ "rawtypes", "unchecked" }) public static Observable.OnSubscribe onCreate(Observable.OnSubscribe onSubscribe) { Func1 f = onObservableCreate; @@ -234,6 +244,12 @@ public static Observable.OnSubscribe onCreate(Observable.OnSubscribe o return onSubscribe; } + /** + * Hook to call when a Single is created. + * @param the value type + * @param onSubscribe the original OnSubscribe logic + * @return the original or replacement OnSubscribe instance + */ @SuppressWarnings({ "rawtypes", "unchecked" }) public static Single.OnSubscribe onCreate(Single.OnSubscribe onSubscribe) { Func1 f = onSingleCreate; @@ -243,7 +259,12 @@ public static Single.OnSubscribe onCreate(Single.OnSubscribe onSubscri return onSubscribe; } - public static Completable.CompletableOnSubscribe onCreate(Completable.CompletableOnSubscribe onSubscribe) { + /** + * Hook to call when a Completable is created. + * @param onSubscribe the original OnSubscribe logic + * @return the original or replacement OnSubscribe instance + */ + public static Completable.CompletableOnSubscribe onCreate(Completable.CompletableOnSubscribe onSubscribe) { Func1 f = onCompletableCreate; if (f != null) { return f.call(onSubscribe); @@ -251,6 +272,11 @@ public static Completable.CompletableOnSubscribe onCreate(Completable.Comple return onSubscribe; } + /** + * Hook to call when the Schedulers.computation() is called. + * @param scheduler the default computation scheduler + * @return the default of alternative scheduler + */ public static Scheduler onComputationScheduler(Scheduler scheduler) { Func1 f = onComputationScheduler; if (f != null) { @@ -259,6 +285,11 @@ public static Scheduler onComputationScheduler(Scheduler scheduler) { return scheduler; } + /** + * Hook to call when the Schedulers.io() is called. + * @param scheduler the default io scheduler + * @return the default of alternative scheduler + */ public static Scheduler onIOScheduler(Scheduler scheduler) { Func1 f = onIOScheduler; if (f != null) { @@ -267,6 +298,11 @@ public static Scheduler onIOScheduler(Scheduler scheduler) { return scheduler; } + /** + * Hook to call when the Schedulers.newThread() is called. + * @param scheduler the default new thread scheduler + * @return the default of alternative scheduler + */ public static Scheduler onNewThreadScheduler(Scheduler scheduler) { Func1 f = onNewThreadScheduler; if (f != null) { @@ -275,6 +311,12 @@ public static Scheduler onNewThreadScheduler(Scheduler scheduler) { return scheduler; } + /** + * Hook to call before the action is scheduled, allows + * decorating the original action. + * @param action the original action + * @return the original or alternative action + */ public static Action0 onScheduledAction(Action0 action) { Func1 f = onScheduleAction; if (f != null) { @@ -282,67 +324,302 @@ public static Action0 onScheduledAction(Action0 action) { } return action; } + + /** + * Hook to call before the child subscriber is subscribed to the OnSubscribe action. + * @param the value type + * @param instance the parent Observable instance + * @param onSubscribe the original OnSubscribe action + * @return the original or alternative action that will be subscribed to + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static OnSubscribe onObservableStart(Observable instance, OnSubscribe onSubscribe) { + Func2 f = onObservableStart; + if (f != null) { + return f.call(instance, onSubscribe); + } + return onSubscribe; + } - public static void setOnCompletableCreate( - Func1 onCompletableCreate) { + /** + * Hook to call before the Observable.subscribe() method is about to return a Subscription. + * @param subscription the original subscription + * @return the original or alternative subscription that will be returned + */ + public static Subscription onObservableReturn(Subscription subscription) { + Func1 f = onObservableReturn; + if (f != null) { + return f.call(subscription); + } + return subscription; + } + + /** + * Hook to call if the Observable.subscribe() crashes for some reason. + * @param error the error + * @return the original error or alternative Throwable to be thrown + */ + public static Throwable onObservableError(Throwable error) { + // TODO add hook + return error; + } + + /** + * Hook to call before the child subscriber would subscribe to an Operator. + * @param the input value type + * @param the output value type + * @param operator the original operator + * @return the original or alternative operator that will be subscribed to + */ + public static Operator onObservableLift(Operator operator) { + // TODO add hook + return operator; + } + + /** + * Hook to call before the child subscriber is subscribed to the OnSubscribe action. + * @param the value type + * @param instance the parent Single instance + * @param onSubscribe the original OnSubscribe action + * @return the original or alternative action that will be subscribed to + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Observable.OnSubscribe onSingleStart(Single instance, Observable.OnSubscribe onSubscribe) { + Func2 f = onSingleStart; + if (f != null) { + return f.call(instance, onSubscribe); + } + return onSubscribe; + } + + /** + * Hook to call before the Single.subscribe() method is about to return a Subscription. + * @param subscription the original subscription + * @return the original or alternative subscription that will be returned + */ + public static Subscription onSingleReturn(Subscription subscription) { + Func1 f = onSingleReturn; + if (f != null) { + return f.call(subscription); + } + return subscription; + } + + /** + * Hook to call if the Single.subscribe() crashes for some reason. + * @param error the error + * @return the original error or alternative Throwable to be thrown + */ + public static Throwable onSingleError(Throwable error) { + // TODO add hook + return error; + } + + /** + * Hook to call before the child subscriber would subscribe to an Operator. + * @param the input value type + * @param the output value type + * @param operator the original operator + * @return the original or alternative operator that will be subscribed to + */ + public static Operator onSingleLift(Operator operator) { + // TODO add hook + return operator; + } + + /** + * Hook to call before the child subscriber is subscribed to the OnSubscribe action. + * @param the value type + * @param instance the parent Completable instance + * @param onSubscribe the original OnSubscribe action + * @return the original or alternative action that will be subscribed to + */ + public static Completable.CompletableOnSubscribe onCompletableStart(Completable instance, Completable.CompletableOnSubscribe onSubscribe) { + Func2 f = onCompletableStart; + if (f != null) { + return f.call(instance, onSubscribe); + } + return onSubscribe; + } + + /** + * Hook to call if the Completable.subscribe() crashes for some reason. + * @param error the error + * @return the original error or alternative Throwable to be thrown + */ + public static Throwable onCompletableError(Throwable error) { + // TODO add hook + return error; + } + + /** + * Hook to call before the child subscriber would subscribe to an Operator. + * @param the input value type + * @param the output value type + * @param operator the original operator + * @return the original or alternative operator that will be subscribed to + */ + public static Completable.CompletableOperator onCompletableLift(Completable.CompletableOperator operator) { + // TODO add hook + return operator; + } + + + /** + * Sets the global error consumer action unless a lockdown is in effect. + *

+ * This operation is threadsafe. + *

+ * Calling with a {@code null} parameter restores the default behavior: + * errors are routed to the current thread's {@link UncaughtExceptionHandler}. + * @param onError the action that will receive undeliverable Throwables + */ + public static void setOnError(Action1 onError) { if (lockdown) { return; } - RxJavaHooks.onCompletableCreate = onCompletableCreate; + RxJavaHooks.onError = onError; } - public static void setOnComputationScheduler(Func1 onComputationScheduler) { + /** + * Sets the Completable's onCreate hook function unless a lockdown is in effect. + *

+ * This operation is threadsafe. + *

+ * Calling with a {@code null} parameter restores the default behavior: + * the hook returns the same object. + * @param onCompletableCreate the function that takes the original CompletableOnSubscribe + * and should return a CompletableOnSubscribe. + */ + public static void setOnCompletableCreate( + Func1 onCompletableCreate) { if (lockdown) { return; } - RxJavaHooks.onComputationScheduler = onComputationScheduler; + RxJavaHooks.onCompletableCreate = onCompletableCreate; } - public static void setOnError(Action1 onError) { + /** + * Sets the Observable onCreate hook function unless a lockdown is in effect. + *

+ * This operation is threadsafe. + *

+ * Calling with a {@code null} parameter restores the default behavior: + * the hook returns the same object. + * @param onObservableCreate the function that takes the original OnSubscribe + * and should return a OnSubscribe. + */ + @SuppressWarnings("rawtypes") + public static void setOnObservableCreate( + Func1 onObservableCreate) { if (lockdown) { return; } - RxJavaHooks.onError = onError; + RxJavaHooks.onObservableCreate = onObservableCreate; } - public static void setOnIOScheduler(Func1 onIOScheduler) { + /** + * Sets the Single onCreate hook function unless a lockdown is in effect. + *

+ * This operation is threadsafe. + *

+ * Calling with a {@code null} parameter restores the default behavior: + * the hook returns the same object. + * @param onSingleCreate the function that takes the original OnSubscribe + * and should return a OnSubscribe. + */ + @SuppressWarnings("rawtypes") + public static void setOnSingleCreate(Func1 onSingleCreate) { if (lockdown) { return; } - RxJavaHooks.onIOScheduler = onIOScheduler; + RxJavaHooks.onSingleCreate = onSingleCreate; } - public static void setOnNewThreadScheduler(Func1 onNewThreadScheduler) { + /** + * Sets the hook function for returning a scheduler when the Schedulers.computation() is called + * unless a lockdown is in effect. + *

+ * This operation is threadsafe. + *

+ * Calling with a {@code null} parameter restores the default behavior: + * the hook returns the same object. + * @param onComputationScheduler the function that receives the original computation scheduler + * and should return a scheduler. + */ + public static void setOnComputationScheduler(Func1 onComputationScheduler) { if (lockdown) { return; } - RxJavaHooks.onNewThreadScheduler = onNewThreadScheduler; + RxJavaHooks.onComputationScheduler = onComputationScheduler; } - @SuppressWarnings("rawtypes") - public static void setOnObservableCreate( - Func1 onObservableCreate) { + /** + * Sets the hook function for returning a scheduler when the Schedulers.io() is called + * unless a lockdown is in effect. + *

+ * This operation is threadsafe. + *

+ * Calling with a {@code null} parameter restores the default behavior: + * the hook returns the same object. + * @param onIOScheduler the function that receives the original io scheduler + * and should return a scheduler. + */ + public static void setOnIOScheduler(Func1 onIOScheduler) { if (lockdown) { return; } - RxJavaHooks.onObservableCreate = onObservableCreate; + RxJavaHooks.onIOScheduler = onIOScheduler; } - public static void setOnScheduleAction(Func1 onScheduleAction) { + /** + * Sets the hook function for returning a scheduler when the Schedulers.newThread() is called + * unless a lockdown is in effect. + *

+ * This operation is threadsafe. + *

+ * Calling with a {@code null} parameter restores the default behavior: + * the hook returns the same object. + * @param onNewThreadScheduler the function that receives the original new thread scheduler + * and should return a scheduler. + */ + public static void setOnNewThreadScheduler(Func1 onNewThreadScheduler) { if (lockdown) { return; } - RxJavaHooks.onScheduleAction = onScheduleAction; + RxJavaHooks.onNewThreadScheduler = onNewThreadScheduler; } - @SuppressWarnings("rawtypes") - public static void setOnSingleCreate(Func1 onSingleCreate) { + /** + * Sets the hook function that is called before an action is scheduled, allowing + * decorating that function, unless a lockdown is in effect. + *

+ * This operation is threadsafe. + *

+ * Calling with a {@code null} parameter restores the default behavior: + * the hook returns the same object. + * @param onScheduleAction the function that receives the original action and should + * return an Action0. + */ + public static void setOnScheduleAction(Func1 onScheduleAction) { if (lockdown) { return; } - RxJavaHooks.onSingleCreate = onSingleCreate; + RxJavaHooks.onScheduleAction = onScheduleAction; } + /** + * Sets the hook function that is called when a subscriber subscribes to a Completable + * unless a lockdown is in effect. + *

+ * This operation is threadsafe. + *

+ * Calling with a {@code null} parameter restores the default behavior: + * the hook returns the same CompletableOnSubscribe object. + * @param onCompletableStart the function that is called with the current Completable instance, + * its CompletableOnSubscribe function and should return a CompletableOnSubscribe function + * that gets actually subscribed to. + */ public static void setOnCompletableStart( Func2 onCompletableStart) { if (lockdown) { @@ -351,6 +628,18 @@ public static void setOnCompletableStart( RxJavaHooks.onCompletableStart = onCompletableStart; } + /** + * Sets the hook function that is called when a subscriber subscribes to a Observable + * unless a lockdown is in effect. + *

+ * This operation is threadsafe. + *

+ * Calling with a {@code null} parameter restores the default behavior: + * the hook returns the same OnSubscribe object. + * @param onObservableStart the function that is called with the current Observable instance, + * its OnSubscribe function and should return a OnSubscribe function + * that gets actually subscribed to. + */ @SuppressWarnings("rawtypes") public static void setOnObservableStart( Func2 onObservableStart) { @@ -360,6 +649,18 @@ public static void setOnObservableStart( RxJavaHooks.onObservableStart = onObservableStart; } + /** + * Sets the hook function that is called when a subscriber subscribes to a Single + * unless a lockdown is in effect. + *

+ * This operation is threadsafe. + *

+ * Calling with a {@code null} parameter restores the default behavior: + * the hook returns the same OnSubscribe object. + * @param onSingleStart the function that is called with the current Single instance, + * its OnSubscribe function and should return a OnSubscribe function + * that gets actually subscribed to. + */ @SuppressWarnings("rawtypes") public static void setOnSingleStart(Func2 onSingleStart) { if (lockdown) { @@ -368,6 +669,18 @@ public static void setOnSingleStart(Func2 + * This operation is threadsafe. + *

+ * Calling with a {@code null} parameter restores the default behavior: + * the hook returns the same object. + * @param onObservableReturn the function that is called with the Subscriber that has been + * subscribed to the OnSubscribe function and returns a Subscription that will be returned by + * subscribe(). + */ public static void setOnObservableReturn(Func1 onObservableReturn) { if (lockdown) { return; @@ -375,6 +688,18 @@ public static void setOnObservableReturn(Func1 onObs RxJavaHooks.onObservableReturn = onObservableReturn; } + /** + * Sets a hook function that is called when the Single.subscribe() call + * is about to return a Subscription unless a lockdown is in effect. + *

+ * This operation is threadsafe. + *

+ * Calling with a {@code null} parameter restores the default behavior: + * the hook returns the same object. + * @param onSingleReturn the function that is called with the SingleSubscriber that has been + * subscribed to the OnSubscribe function and returns a Subscription that will be returned by + * subscribe(). + */ public static void setOnSingleReturn(Func1 onSingleReturn) { if (lockdown) { return; @@ -382,134 +707,154 @@ public static void setOnSingleReturn(Func1 onSingleR RxJavaHooks.onSingleReturn = onSingleReturn; } + /** + * Returns the current computation scheduler hook function or null if it is + * set to the default pass-through. + *

+ * This operation is threadsafe. + * @return the current hook function + */ public static Func1 getOnComputationScheduler() { return onComputationScheduler; } + /** + * Returns the current global error handler hook action or null if it is + * set to the default one that signals errors to the current threads + * UncaughtExceptionHandler. + *

+ * This operation is threadsafe. + * @return the current hook action + */ public static Action1 getOnError() { return onError; } + /** + * Returns the current io scheduler hook function or null if it is + * set to the default pass-through. + *

+ * This operation is threadsafe. + * @return the current hook function + */ public static Func1 getOnIOScheduler() { return onIOScheduler; } + /** + * Returns the current new thread scheduler hook function or null if it is + * set to the default pass-through. + *

+ * This operation is threadsafe. + * @return the current hook function + */ public static Func1 getOnNewThreadScheduler() { return onNewThreadScheduler; } + /** + * Returns the current Observable onCreate hook function or null if it is + * set to the default pass-through. + *

+ * This operation is threadsafe. + * @return the current hook function + */ @SuppressWarnings("rawtypes") public static Func1 getOnObservableCreate() { return onObservableCreate; } + /** + * Returns the current schedule action hook function or null if it is + * set to the default pass-through. + *

+ * This operation is threadsafe. + * @return the current hook function + */ public static Func1 getOnScheduleAction() { return onScheduleAction; } + /** + * Returns the current Single onCreate hook function or null if it is + * set to the default pass-through. + *

+ * This operation is threadsafe. + * @return the current hook function + */ @SuppressWarnings("rawtypes") public static Func1 getOnSingleCreate() { return onSingleCreate; } + /** + * Returns the current Completable onCreate hook function or null if it is + * set to the default pass-through. + *

+ * This operation is threadsafe. + * @return the current hook function + */ public static Func1 getOnCompletableCreate() { return onCompletableCreate; } + /** + * Returns the current Completable onStart hook function or null if it is + * set to the default pass-through. + *

+ * This operation is threadsafe. + * @return the current hook function + */ public static Func2 getOnCompletableStart() { return onCompletableStart; } + /** + * Returns the current Observable onStart hook function or null if it is + * set to the default pass-through. + *

+ * This operation is threadsafe. + * @return the current hook function + */ @SuppressWarnings("rawtypes") public static Func2 getOnObservableStart() { return onObservableStart; } + /** + * Returns the current Single onStart hook function or null if it is + * set to the default pass-through. + *

+ * This operation is threadsafe. + * @return the current hook function + */ @SuppressWarnings("rawtypes") public static Func2 getOnSingleStart() { return onSingleStart; } - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static OnSubscribe onObservableStart(Observable instance, OnSubscribe onSubscribe) { - Func2 f = onObservableStart; - if (f != null) { - return f.call(instance, onSubscribe); - } - return onSubscribe; - } - + /** + * Returns the current Observable onReturn hook function or null if it is + * set to the default pass-through. + *

+ * This operation is threadsafe. + * @return the current hook function + */ public static Func1 getOnObservableReturn() { return onObservableReturn; } + /** + * Returns the current Single onReturn hook function or null if it is + * set to the default pass-through. + *

+ * This operation is threadsafe. + * @return the current hook function + */ public static Func1 getOnSingleReturn() { return onSingleReturn; } - - public static Subscription onObservableReturn(Subscription subscription) { - Func1 f = onObservableReturn; - if (f != null) { - return f.call(subscription); - } - return subscription; - } - public static Throwable onObservableError(Throwable error) { - // TODO add hook - return error; - } - - public static Operator onObservableLift(Operator operator) { - // TODO add hook - return operator; - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static Observable.OnSubscribe onSingleStart(Single instance, Observable.OnSubscribe onSubscribe) { - Func2 f = onSingleStart; - if (f != null) { - return f.call(instance, onSubscribe); - } - return onSubscribe; - } - - public static Subscription onSingleReturn(Subscription subscription) { - Func1 f = onSingleReturn; - if (f != null) { - return f.call(subscription); - } - return subscription; - } - - public static Throwable onSingleError(Throwable error) { - // TODO add hook - return error; - } - - public static Operator onSingleLift(Operator operator) { - // TODO add hook - return operator; - } - - public static Completable.CompletableOnSubscribe onCompletableStart(Completable instance, Completable.CompletableOnSubscribe onSubscribe) { - Func2 f = onCompletableStart; - if (f != null) { - return f.call(instance, onSubscribe); - } - return onSubscribe; - } - - public static Throwable onCompletableError(Throwable error) { - // TODO add hook - return error; - } - - public static Completable.CompletableOperator onCompletableLift(Completable.CompletableOperator operator) { - // TODO add hook - return operator; - } - /** * Resets the assembly tracking hooks to their default delegates to * RxJavaPlugins.