- * If this Completable emits an error, it is sent to ERROR_HANDLER.handleError and gets swallowed.
+ * If this Completable emits an error, it is sent to RxJavaHooks.onError and gets swallowed.
* @param onComplete the runnable called when this Completable completes normally
* @return the Subscription that allows cancelling the subscription
*/
@@ -1902,7 +1896,7 @@ public void onCompleted() {
try {
onComplete.call();
} catch (Throwable e) {
- ERROR_HANDLER.handleError(e);
+ RxJavaHooks.onError(e);
deliverUncaughtException(e);
} finally {
mad.unsubscribe();
@@ -1912,7 +1906,7 @@ public void onCompleted() {
@Override
public void onError(Throwable e) {
- ERROR_HANDLER.handleError(e);
+ RxJavaHooks.onError(e);
mad.unsubscribe();
deliverUncaughtException(e);
}
@@ -1961,7 +1955,7 @@ public void onError(Throwable e) {
done = true;
callOnError(e);
} else {
- ERROR_HANDLER.handleError(e);
+ RxJavaHooks.onError(e);
deliverUncaughtException(e);
}
}
@@ -1971,7 +1965,7 @@ void callOnError(Throwable e) {
onError.call(e);
} catch (Throwable ex) {
e = new CompositeException(Arrays.asList(e, ex));
- ERROR_HANDLER.handleError(e);
+ RxJavaHooks.onError(e);
deliverUncaughtException(e);
} finally {
mad.unsubscribe();
@@ -2000,15 +1994,15 @@ private static void deliverUncaughtException(Throwable e) {
public final void unsafeSubscribe(CompletableSubscriber s) {
requireNonNull(s);
try {
- CompletableOnSubscribe onSubscribeDecorated = HOOK.onSubscribeStart(this, this.onSubscribe);
+ CompletableOnSubscribe onSubscribeDecorated = RxJavaHooks.onCompletableStart(this, this.onSubscribe);
onSubscribeDecorated.call(s);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
- ex = HOOK.onSubscribeError(ex);
- ERROR_HANDLER.handleError(ex);
+ ex = RxJavaHooks.onCompletableError(ex);
+ RxJavaHooks.onError(ex);
throw toNpe(ex);
}
}
@@ -2066,13 +2060,13 @@ public void onSubscribe(Subscription d) {
s.add(d);
}
});
- RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(s);
+ RxJavaHooks.onObservableReturn(s);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
- ex = HOOK.onSubscribeError(ex);
- ERROR_HANDLER.handleError(ex);
+ ex = RxJavaHooks.onObservableError(ex);
+ RxJavaHooks.onError(ex);
throw toNpe(ex);
}
}
diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java
index b48299cd65..85d28438cd 100644
--- a/src/main/java/rx/Observable.java
+++ b/src/main/java/rx/Observable.java
@@ -59,8 +59,6 @@ protected Observable(OnSubscribe f) {
this.onSubscribe = f;
}
- static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
-
/**
* Returns an Observable that will execute the specified function when a {@link Subscriber} subscribes to
* it.
@@ -91,7 +89,7 @@ protected Observable(OnSubscribe f) {
* @see ReactiveX operators documentation: Create
*/
public static Observable create(OnSubscribe f) {
- return new Observable(hook.onCreate(f));
+ return new Observable(RxJavaHooks.onCreate(f));
}
/**
@@ -128,7 +126,7 @@ public static Observable create(OnSubscribe f) {
*/
@Beta
public static Observable create(SyncOnSubscribe syncOnSubscribe) {
- return new Observable(hook.onCreate(syncOnSubscribe));
+ return create((OnSubscribe)syncOnSubscribe);
}
/**
@@ -164,7 +162,7 @@ public static Observable create(SyncOnSubscribe syncOnSubscribe)
*/
@Experimental
public static Observable create(AsyncOnSubscribe asyncOnSubscribe) {
- return new Observable(hook.onCreate(asyncOnSubscribe));
+ return create((OnSubscribe)asyncOnSubscribe);
}
/**
@@ -238,7 +236,7 @@ public void call(Subscriber super T> subscriber) {
* @see RxJava wiki: Implementing Your Own Operators
*/
public final Observable lift(final Operator extends R, ? super T> operator) {
- return new Observable(new OnSubscribeLift(onSubscribe, operator));
+ return create(new OnSubscribeLift(onSubscribe, operator));
}
/**
@@ -8663,21 +8661,21 @@ public final Subscription unsafeSubscribe(Subscriber super T> subscriber) {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
- hook.onSubscribeStart(this, onSubscribe).call(subscriber);
- return hook.onSubscribeReturn(subscriber);
+ RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
+ return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
- subscriber.onError(hook.onSubscribeError(e));
+ subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
- hook.onSubscribeError(r);
+ RxJavaHooks.onObservableError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
@@ -8756,25 +8754,25 @@ static Subscription subscribe(Subscriber super T> subscriber, Observable {
* {@code OnExecute} to be executed when {@code execute(SingleSubscriber)} or
* {@code subscribe(Subscriber)} is called
*/
- protected Single(final OnSubscribe f) {
+ protected Single(OnSubscribe f) {
+ final OnSubscribe g = RxJavaHooks.onCreate(f);
// bridge between OnSubscribe (which all Operators and Observables use) and OnExecute (for Single)
this.onSubscribe = new Observable.OnSubscribe() {
@@ -91,18 +87,16 @@ public void onError(Throwable error) {
};
child.add(ss);
- f.call(ss);
+ g.call(ss);
}
};
}
private Single(final Observable.OnSubscribe f) {
- this.onSubscribe = f;
+ this.onSubscribe = RxJavaHooks.onCreate(f);
}
- static RxJavaSingleExecutionHook hook = RxJavaPlugins.getInstance().getSingleExecutionHook();
-
/**
* Returns a Single that will execute the specified function when a {@link SingleSubscriber} executes it or
* a {@link Subscriber} subscribes to it.
@@ -130,7 +124,7 @@ private Single(final Observable.OnSubscribe f) {
* @see ReactiveX operators documentation: Create
*/
public static Single create(OnSubscribe f) {
- return new Single(hook.onCreate(f));
+ return new Single(f);
}
/**
@@ -170,7 +164,7 @@ public final Single lift(final Operator extends R, ? super T> lift) {
@Override
public void call(Subscriber super R> o) {
try {
- final Subscriber super T> st = hook.onLift(lift).call(o);
+ final Subscriber super T> st = RxJavaHooks.onSingleLift(lift).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
@@ -1709,21 +1703,21 @@ public final Subscription unsafeSubscribe(Subscriber super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
- hook.onSubscribeStart(this, onSubscribe).call(subscriber);
- return hook.onSubscribeReturn(subscriber);
+ RxJavaHooks.onSingleStart(this, onSubscribe).call(subscriber);
+ return RxJavaHooks.onSingleReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
- subscriber.onError(hook.onSubscribeError(e));
+ subscriber.onError(RxJavaHooks.onSingleError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
- hook.onSubscribeError(r);
+ RxJavaHooks.onSingleError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
@@ -1819,21 +1813,21 @@ public final Subscription subscribe(Subscriber super T> subscriber) {
// The code below is exactly the same an unsafeSubscribe but not used because it would add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
- hook.onSubscribeStart(this, onSubscribe).call(subscriber);
- return hook.onSubscribeReturn(subscriber);
+ RxJavaHooks.onSingleStart(this, onSubscribe).call(subscriber);
+ return RxJavaHooks.onSingleReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
- subscriber.onError(hook.onSubscribeError(e));
+ subscriber.onError(RxJavaHooks.onSingleError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
- hook.onSubscribeError(r);
+ RxJavaHooks.onSingleError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
diff --git a/src/main/java/rx/exceptions/AssemblyStackTraceException.java b/src/main/java/rx/exceptions/AssemblyStackTraceException.java
new file mode 100644
index 0000000000..ee6a8be6a9
--- /dev/null
+++ b/src/main/java/rx/exceptions/AssemblyStackTraceException.java
@@ -0,0 +1,52 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.exceptions;
+
+import rx.annotations.Experimental;
+
+/**
+ * A RuntimeException that is stackless but holds onto a textual
+ * stacktrace from tracking the assembly location of operators.
+ */
+@Experimental
+public final class AssemblyStackTraceException extends RuntimeException {
+
+ /** */
+ private static final long serialVersionUID = 2038859767182585852L;
+
+ /**
+ * Constructs an AssemblyStackTraceException with the given message and
+ * a cause.
+ * @param message the message
+ * @param cause the cause
+ */
+ public AssemblyStackTraceException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs an AssemblyStackTraceException with the given message.
+ * @param message the message
+ */
+ public AssemblyStackTraceException(String message) {
+ super(message);
+ }
+
+ @Override
+ public synchronized Throwable fillInStackTrace() {
+ return this;
+ }
+}
diff --git a/src/main/java/rx/exceptions/OnErrorThrowable.java b/src/main/java/rx/exceptions/OnErrorThrowable.java
index 52ce45ed2a..2ef465141b 100644
--- a/src/main/java/rx/exceptions/OnErrorThrowable.java
+++ b/src/main/java/rx/exceptions/OnErrorThrowable.java
@@ -191,6 +191,7 @@ static String renderValue(Object value){
return ((Enum>) value).name();
}
+ @SuppressWarnings("deprecation")
String pluggedRendering = RxJavaPlugins.getInstance().getErrorHandler().handleOnNextValueRendering(value);
if (pluggedRendering != null) {
return pluggedRendering;
diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java
index bccea15cc4..96cccc23f7 100644
--- a/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java
+++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java
@@ -22,7 +22,7 @@
import rx.Completable.*;
import rx.exceptions.MissingBackpressureException;
import rx.internal.util.unsafe.SpscArrayQueue;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
import rx.subscriptions.SerialSubscription;
public final class CompletableOnSubscribeConcat implements CompletableOnSubscribe {
@@ -87,7 +87,7 @@ public void onError(Throwable t) {
actual.onError(t);
return;
}
- RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
+ RxJavaHooks.onError(t);
}
@Override
@@ -125,7 +125,7 @@ void next() {
}
return;
}
- RxJavaPlugins.getInstance().getErrorHandler().handleError(new IllegalStateException("Queue is empty?!"));
+ RxJavaHooks.onError(new IllegalStateException("Queue is empty?!"));
return;
}
diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java
index 059e193da7..48dce53c9c 100644
--- a/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java
+++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java
@@ -22,9 +22,9 @@
import rx.*;
import rx.Completable.*;
-import rx.exceptions.CompositeException;
import rx.Observable;
-import rx.plugins.RxJavaPlugins;
+import rx.exceptions.CompositeException;
+import rx.plugins.RxJavaHooks;
import rx.subscriptions.CompositeSubscription;
public final class CompletableOnSubscribeMerge implements CompletableOnSubscribe {
@@ -110,7 +110,7 @@ public void onSubscribe(Subscription d) {
@Override
public void onError(Throwable e) {
if (innerDone) {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
return;
}
innerDone = true;
@@ -145,7 +145,7 @@ public void onCompleted() {
@Override
public void onError(Throwable t) {
if (done) {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
+ RxJavaHooks.onError(t);
return;
}
getOrCreateErrors().offer(t);
@@ -172,7 +172,7 @@ void terminate() {
if (once.compareAndSet(false, true)) {
actual.onError(e);
} else {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
}
}
} else
@@ -183,7 +183,7 @@ void terminate() {
if (once.compareAndSet(false, true)) {
actual.onError(e);
} else {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
}
}
}
diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeArray.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeArray.java
index f78c960430..d80f50abf6 100644
--- a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeArray.java
+++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeArray.java
@@ -20,7 +20,7 @@
import rx.*;
import rx.Completable.*;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
import rx.subscriptions.CompositeSubscription;
public final class CompletableOnSubscribeMergeArray implements CompletableOnSubscribe {
@@ -50,7 +50,7 @@ public void call(final CompletableSubscriber s) {
s.onError(npe);
return;
} else {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(npe);
+ RxJavaHooks.onError(npe);
}
}
@@ -66,7 +66,7 @@ public void onError(Throwable e) {
if (once.compareAndSet(false, true)) {
s.onError(e);
} else {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
}
}
diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java
index e13670cd20..fed111a6c7 100644
--- a/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java
+++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java
@@ -21,7 +21,7 @@
import rx.*;
import rx.Completable.*;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
import rx.subscriptions.CompositeSubscription;
public final class CompletableOnSubscribeMergeIterable implements CompletableOnSubscribe {
@@ -66,7 +66,7 @@ public void call(final CompletableSubscriber s) {
if (once.compareAndSet(false, true)) {
s.onError(e);
} else {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
}
return;
}
@@ -88,7 +88,7 @@ public void call(final CompletableSubscriber s) {
if (once.compareAndSet(false, true)) {
s.onError(e);
} else {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
}
return;
}
@@ -103,7 +103,7 @@ public void call(final CompletableSubscriber s) {
if (once.compareAndSet(false, true)) {
s.onError(npe);
} else {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(npe);
+ RxJavaHooks.onError(npe);
}
return;
}
@@ -122,7 +122,7 @@ public void onError(Throwable e) {
if (once.compareAndSet(false, true)) {
s.onError(e);
} else {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
}
}
diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeTimeout.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeTimeout.java
index b62c62274d..672cfad61b 100644
--- a/src/main/java/rx/internal/operators/CompletableOnSubscribeTimeout.java
+++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeTimeout.java
@@ -22,7 +22,7 @@
import rx.*;
import rx.Completable.*;
import rx.functions.Action0;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
import rx.subscriptions.CompositeSubscription;
public final class CompletableOnSubscribeTimeout implements CompletableOnSubscribe {
@@ -98,7 +98,7 @@ public void onError(Throwable e) {
set.unsubscribe();
s.onError(e);
} else {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
}
}
diff --git a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java
index 10b4b582cd..e9dac2d026 100644
--- a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java
+++ b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java
@@ -23,7 +23,7 @@
import rx.functions.FuncN;
import rx.internal.util.RxRingBuffer;
import rx.internal.util.atomic.SpscLinkedArrayQueue;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
public final class OnSubscribeCombineLatest implements OnSubscribe {
final Observable extends T>[] sources;
@@ -386,7 +386,7 @@ public void onNext(T t) {
@Override
public void onError(Throwable t) {
if (done) {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
+ RxJavaHooks.onError(t);
return;
}
parent.onError(t);
diff --git a/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java b/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java
index c2799df758..fba34e2f6b 100644
--- a/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java
+++ b/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java
@@ -28,7 +28,7 @@
import rx.internal.util.atomic.SpscAtomicArrayQueue;
import rx.internal.util.unsafe.*;
import rx.observers.SerializedSubscriber;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
import rx.subscriptions.SerialSubscription;
/**
@@ -210,7 +210,7 @@ void innerCompleted(long produced) {
}
void pluginError(Throwable e) {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
}
void drain() {
diff --git a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java
index dc3146d7e6..d25d940d61 100644
--- a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java
+++ b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionOther.java
@@ -56,7 +56,7 @@ public void onNext(U t) {
@Override
public void onError(Throwable e) {
if (done) {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
return;
}
done = true;
diff --git a/src/main/java/rx/internal/operators/OnSubscribeLift.java b/src/main/java/rx/internal/operators/OnSubscribeLift.java
index ba6210f38b..65f6f20d14 100644
--- a/src/main/java/rx/internal/operators/OnSubscribeLift.java
+++ b/src/main/java/rx/internal/operators/OnSubscribeLift.java
@@ -29,8 +29,6 @@
*/
public final class OnSubscribeLift implements OnSubscribe {
- static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
-
final OnSubscribe parent;
final Operator extends R, ? super T> operator;
@@ -43,7 +41,7 @@ public OnSubscribeLift(OnSubscribe parent, Operator extends R, ? super T> o
@Override
public void call(Subscriber super R> o) {
try {
- Subscriber super T> st = hook.onLift(operator).call(o);
+ Subscriber super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
diff --git a/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java b/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java
new file mode 100644
index 0000000000..fd338c1ece
--- /dev/null
+++ b/src/main/java/rx/internal/operators/OnSubscribeOnAssembly.java
@@ -0,0 +1,128 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.internal.operators;
+
+import rx.*;
+import rx.Observable.OnSubscribe;
+import rx.exceptions.AssemblyStackTraceException;
+
+/**
+ * Captures the current stack when it is instantiated, makes
+ * it available through a field and attaches it to all
+ * passing exception.
+ *
+ * @param the value type
+ */
+public final class OnSubscribeOnAssembly implements OnSubscribe {
+
+ final OnSubscribe source;
+
+ final String stacktrace;
+
+ /**
+ * If set to true, the creation of PublisherOnAssembly will capture the raw
+ * stacktrace instead of the sanitized version.
+ */
+ public static volatile boolean fullStackTrace;
+
+ public OnSubscribeOnAssembly(OnSubscribe source) {
+ this.source = source;
+ this.stacktrace = createStacktrace();
+ }
+
+ static String createStacktrace() {
+ StackTraceElement[] stes = Thread.currentThread().getStackTrace();
+
+ StringBuilder sb = new StringBuilder("Assembly trace:");
+
+ for (StackTraceElement e : stes) {
+ String row = e.toString();
+ if (!fullStackTrace) {
+ if (e.getLineNumber() <= 1) {
+ continue;
+ }
+ if (row.contains("RxJavaHooks.")) {
+ continue;
+ }
+ if (row.contains("OnSubscribeOnAssembly")) {
+ continue;
+ }
+ if (row.contains(".junit.runner")) {
+ continue;
+ }
+ if (row.contains(".junit4.runner")) {
+ continue;
+ }
+ if (row.contains(".junit.internal")) {
+ continue;
+ }
+ if (row.contains("sun.reflect")) {
+ continue;
+ }
+ if (row.contains("java.lang.Thread.")) {
+ continue;
+ }
+ if (row.contains("ThreadPoolExecutor")) {
+ continue;
+ }
+ if (row.contains("org.apache.catalina.")) {
+ continue;
+ }
+ if (row.contains("org.apache.tomcat.")) {
+ continue;
+ }
+ }
+ sb.append("\n at ").append(row);
+ }
+
+ return sb.append("\nOriginal exception:").toString();
+ }
+
+ @Override
+ public void call(Subscriber super T> t) {
+ source.call(new OnAssemblySubscriber(t, stacktrace));
+ }
+
+ static final class OnAssemblySubscriber extends Subscriber {
+
+ final Subscriber super T> actual;
+
+ final String stacktrace;
+
+ public OnAssemblySubscriber(Subscriber super T> actual, String stacktrace) {
+ super(actual);
+ this.actual = actual;
+ this.stacktrace = stacktrace;
+ }
+
+ @Override
+ public void onCompleted() {
+ actual.onCompleted();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ e = new AssemblyStackTraceException(stacktrace, e);
+ actual.onError(e);
+ }
+
+ @Override
+ public void onNext(T t) {
+ actual.onNext(t);
+ }
+
+ }
+}
diff --git a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java
new file mode 100644
index 0000000000..da5a144045
--- /dev/null
+++ b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblyCompletable.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.internal.operators;
+
+import rx.*;
+import rx.Completable.CompletableSubscriber;
+import rx.exceptions.AssemblyStackTraceException;
+
+/**
+ * Captures the current stack when it is instantiated, makes
+ * it available through a field and attaches it to all
+ * passing exception.
+ *
+ * @param the value type
+ */
+public final class OnSubscribeOnAssemblyCompletable implements Completable.CompletableOnSubscribe {
+
+ final Completable.CompletableOnSubscribe source;
+
+ final String stacktrace;
+
+ /**
+ * If set to true, the creation of PublisherOnAssembly will capture the raw
+ * stacktrace instead of the sanitized version.
+ */
+ public static volatile boolean fullStackTrace;
+
+ public OnSubscribeOnAssemblyCompletable(Completable.CompletableOnSubscribe source) {
+ this.source = source;
+ this.stacktrace = OnSubscribeOnAssembly.createStacktrace();
+ }
+
+ @Override
+ public void call(Completable.CompletableSubscriber t) {
+ source.call(new OnAssemblyCompletableSubscriber(t, stacktrace));
+ }
+
+ static final class OnAssemblyCompletableSubscriber implements CompletableSubscriber {
+
+ final CompletableSubscriber actual;
+
+ final String stacktrace;
+
+ public OnAssemblyCompletableSubscriber(CompletableSubscriber actual, String stacktrace) {
+ this.actual = actual;
+ this.stacktrace = stacktrace;
+ }
+
+ @Override
+ public void onSubscribe(Subscription d) {
+ actual.onSubscribe(d);
+ }
+
+ @Override
+ public void onCompleted() {
+ actual.onCompleted();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ e = new AssemblyStackTraceException(stacktrace, e);
+ actual.onError(e);
+ }
+ }
+}
diff --git a/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java
new file mode 100644
index 0000000000..aeed623a7b
--- /dev/null
+++ b/src/main/java/rx/internal/operators/OnSubscribeOnAssemblySingle.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.internal.operators;
+
+import rx.*;
+import rx.exceptions.AssemblyStackTraceException;
+
+/**
+ * Captures the current stack when it is instantiated, makes
+ * it available through a field and attaches it to all
+ * passing exception.
+ *
+ * @param the value type
+ */
+public final class OnSubscribeOnAssemblySingle implements Single.OnSubscribe {
+
+ final Single.OnSubscribe source;
+
+ final String stacktrace;
+
+ /**
+ * If set to true, the creation of PublisherOnAssembly will capture the raw
+ * stacktrace instead of the sanitized version.
+ */
+ public static volatile boolean fullStackTrace;
+
+ public OnSubscribeOnAssemblySingle(Single.OnSubscribe source) {
+ this.source = source;
+ this.stacktrace = OnSubscribeOnAssembly.createStacktrace();
+ }
+
+ @Override
+ public void call(SingleSubscriber super T> t) {
+ source.call(new OnAssemblySingleSubscriber(t, stacktrace));
+ }
+
+ static final class OnAssemblySingleSubscriber extends SingleSubscriber {
+
+ final SingleSubscriber super T> actual;
+
+ final String stacktrace;
+
+ public OnAssemblySingleSubscriber(SingleSubscriber super T> actual, String stacktrace) {
+ this.actual = actual;
+ this.stacktrace = stacktrace;
+ actual.add(this);
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ e = new AssemblyStackTraceException(stacktrace, e);
+ actual.onError(e);
+ }
+
+ @Override
+ public void onSuccess(T t) {
+ actual.onSuccess(t);
+ }
+
+ }
+}
diff --git a/src/main/java/rx/internal/operators/OperatorDoAfterTerminate.java b/src/main/java/rx/internal/operators/OperatorDoAfterTerminate.java
index 64afca478a..a3efa8f845 100644
--- a/src/main/java/rx/internal/operators/OperatorDoAfterTerminate.java
+++ b/src/main/java/rx/internal/operators/OperatorDoAfterTerminate.java
@@ -19,7 +19,7 @@
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.functions.Action0;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
/**
* Registers an action to be called after an Observable invokes {@code onComplete} or {@code onError}.
@@ -73,7 +73,7 @@ void callAction() {
action.call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
- RxJavaPlugins.getInstance().getErrorHandler().handleError(ex);
+ RxJavaHooks.onError(ex);
}
}
};
diff --git a/src/main/java/rx/internal/operators/OperatorGroupBy.java b/src/main/java/rx/internal/operators/OperatorGroupBy.java
index bf474619bb..4ed1a504f3 100644
--- a/src/main/java/rx/internal/operators/OperatorGroupBy.java
+++ b/src/main/java/rx/internal/operators/OperatorGroupBy.java
@@ -25,7 +25,7 @@
import rx.internal.producers.ProducerArbiter;
import rx.internal.util.*;
import rx.observables.GroupedObservable;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
import rx.subscriptions.Subscriptions;
/**
@@ -196,7 +196,7 @@ public void onNext(T t) {
@Override
public void onError(Throwable t) {
if (done) {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
+ RxJavaHooks.onError(t);
return;
}
error = t;
diff --git a/src/main/java/rx/internal/operators/OperatorMaterialize.java b/src/main/java/rx/internal/operators/OperatorMaterialize.java
index 75700b8d11..4d01dadf39 100644
--- a/src/main/java/rx/internal/operators/OperatorMaterialize.java
+++ b/src/main/java/rx/internal/operators/OperatorMaterialize.java
@@ -17,11 +17,9 @@
import java.util.concurrent.atomic.AtomicLong;
-import rx.Notification;
+import rx.*;
import rx.Observable.Operator;
-import rx.Producer;
-import rx.Subscriber;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
/**
* Turns all of the notifications from an Observable into {@code onNext} emissions, and marks
@@ -104,7 +102,7 @@ public void onCompleted() {
@Override
public void onError(Throwable e) {
terminalNotification = Notification.createOnError(e);
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
drain();
}
diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java
index 10ad74e4e2..42fc5518ae 100644
--- a/src/main/java/rx/internal/operators/OperatorObserveOn.java
+++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java
@@ -23,10 +23,10 @@
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;
import rx.internal.schedulers.*;
-import rx.internal.util.*;
+import rx.internal.util.RxRingBuffer;
import rx.internal.util.atomic.SpscAtomicArrayQueue;
import rx.internal.util.unsafe.*;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
import rx.schedulers.Schedulers;
/**
@@ -177,7 +177,7 @@ public void onCompleted() {
@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
return;
}
error = e;
diff --git a/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java b/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java
index 7f5b22c0a2..d78b75ce57 100644
--- a/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java
+++ b/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java
@@ -20,7 +20,7 @@
import rx.exceptions.Exceptions;
import rx.functions.Func1;
import rx.internal.producers.ProducerArbiter;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
import rx.subscriptions.SerialSubscription;
/**
@@ -105,7 +105,7 @@ public void onCompleted() {
public void onError(Throwable e) {
if (done) {
Exceptions.throwIfFatal(e);
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
return;
}
done = true;
diff --git a/src/main/java/rx/internal/operators/OperatorSwitch.java b/src/main/java/rx/internal/operators/OperatorSwitch.java
index bbbcd9879d..c76e3008c9 100644
--- a/src/main/java/rx/internal/operators/OperatorSwitch.java
+++ b/src/main/java/rx/internal/operators/OperatorSwitch.java
@@ -25,7 +25,7 @@
import rx.functions.Action0;
import rx.internal.util.RxRingBuffer;
import rx.internal.util.atomic.SpscLinkedArrayQueue;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
import rx.subscriptions.*;
/**
@@ -238,7 +238,7 @@ void complete(long id) {
}
void pluginError(Throwable e) {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
}
void innerProducer(Producer p, long id) {
diff --git a/src/main/java/rx/internal/operators/SingleOnSubscribeDelaySubscriptionOther.java b/src/main/java/rx/internal/operators/SingleOnSubscribeDelaySubscriptionOther.java
index 41d0383ac9..92452706cb 100644
--- a/src/main/java/rx/internal/operators/SingleOnSubscribeDelaySubscriptionOther.java
+++ b/src/main/java/rx/internal/operators/SingleOnSubscribeDelaySubscriptionOther.java
@@ -16,11 +16,8 @@
package rx.internal.operators;
-import rx.Observable;
-import rx.Single;
-import rx.SingleSubscriber;
-import rx.Subscriber;
-import rx.plugins.RxJavaPlugins;
+import rx.*;
+import rx.plugins.RxJavaHooks;
import rx.subscriptions.SerialSubscription;
/**
@@ -66,7 +63,7 @@ public void onNext(Object t) {
@Override
public void onError(Throwable e) {
if (done) {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
return;
}
done = true;
diff --git a/src/main/java/rx/internal/operators/SingleOnSubscribeUsing.java b/src/main/java/rx/internal/operators/SingleOnSubscribeUsing.java
index 54cdfe5c24..9d7319724c 100644
--- a/src/main/java/rx/internal/operators/SingleOnSubscribeUsing.java
+++ b/src/main/java/rx/internal/operators/SingleOnSubscribeUsing.java
@@ -21,7 +21,7 @@
import rx.*;
import rx.exceptions.*;
import rx.functions.*;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
/**
* Generates a resource, derives a Single from it and disposes that resource once the
@@ -91,7 +91,7 @@ public void onSuccess(T value) {
disposeAction.call(resource);
} catch (Throwable ex2) {
Exceptions.throwIfFatal(ex2);
- RxJavaPlugins.getInstance().getErrorHandler().handleError(ex2);
+ RxJavaHooks.onError(ex2);
}
}
}
@@ -125,7 +125,7 @@ void handleSubscriptionTimeError(SingleSubscriber super T> t, Resource resourc
disposeAction.call(resource);
} catch (Throwable ex2) {
Exceptions.throwIfFatal(ex2);
- RxJavaPlugins.getInstance().getErrorHandler().handleError(ex2);
+ RxJavaHooks.onError(ex2);
}
}
}
diff --git a/src/main/java/rx/internal/operators/SingleOperatorZip.java b/src/main/java/rx/internal/operators/SingleOperatorZip.java
index 7614a6a6c5..93afe523d3 100644
--- a/src/main/java/rx/internal/operators/SingleOperatorZip.java
+++ b/src/main/java/rx/internal/operators/SingleOperatorZip.java
@@ -16,17 +16,15 @@
package rx.internal.operators;
-import rx.Single;
-import rx.SingleSubscriber;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.*;
+
+import rx.*;
import rx.exceptions.Exceptions;
import rx.functions.FuncN;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
import rx.subscriptions.CompositeSubscription;
-import java.util.NoSuchElementException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
public class SingleOperatorZip {
public static Single zip(final Single extends T>[] singles, final FuncN extends R> zipper) {
@@ -75,7 +73,7 @@ public void onError(Throwable error) {
if (once.compareAndSet(false, true)) {
subscriber.onError(error);
} else {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(error);
+ RxJavaHooks.onError(error);
}
}
};
diff --git a/src/main/java/rx/internal/schedulers/ExecutorScheduler.java b/src/main/java/rx/internal/schedulers/ExecutorScheduler.java
index a57ee8c938..b4bcf19d7f 100644
--- a/src/main/java/rx/internal/schedulers/ExecutorScheduler.java
+++ b/src/main/java/rx/internal/schedulers/ExecutorScheduler.java
@@ -20,7 +20,7 @@
import rx.*;
import rx.functions.Action0;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
import rx.subscriptions.*;
/**
@@ -79,7 +79,7 @@ public Subscription schedule(Action0 action) {
tasks.remove(ea);
wip.decrementAndGet();
// report the error to the plugin
- RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
+ RxJavaHooks.onError(t);
// throw it to the caller
throw t;
}
@@ -158,7 +158,7 @@ public void call() {
ea.add(f);
} catch (RejectedExecutionException t) {
// report the rejection to plugins
- RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
+ RxJavaHooks.onError(t);
throw t;
}
diff --git a/src/main/java/rx/internal/schedulers/NewThreadWorker.java b/src/main/java/rx/internal/schedulers/NewThreadWorker.java
index 585d27b3b6..6521aadf46 100644
--- a/src/main/java/rx/internal/schedulers/NewThreadWorker.java
+++ b/src/main/java/rx/internal/schedulers/NewThreadWorker.java
@@ -34,7 +34,6 @@
*/
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
private final ScheduledExecutorService executor;
- private final RxJavaSchedulersHook schedulersHook;
volatile boolean isUnsubscribed;
/** The purge frequency in milliseconds. */
private static final String FREQUENCY_KEY = "rx.scheduler.jdk6.purge-frequency-millis";
@@ -112,7 +111,7 @@ static void purgeExecutors() {
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
- RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
+ RxJavaHooks.onError(t);
}
}
@@ -168,7 +167,7 @@ public static boolean tryEnableCancelPolicy(ScheduledExecutorService executor) {
methodToCall.invoke(executor, true);
return true;
} catch (Exception e) {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
}
}
}
@@ -207,7 +206,6 @@ public NewThreadWorker(ThreadFactory threadFactory) {
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
- schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
executor = exec;
}
@@ -233,7 +231,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
* @return the wrapper ScheduledAction
*/
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
- Action0 decoratedAction = schedulersHook.onSchedule(action);
+ Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future> f;
if (delayTime <= 0) {
@@ -246,7 +244,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
return run;
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
- Action0 decoratedAction = schedulersHook.onSchedule(action);
+ Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
parent.add(run);
@@ -262,7 +260,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
- Action0 decoratedAction = schedulersHook.onSchedule(action);
+ Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
parent.add(run);
diff --git a/src/main/java/rx/internal/schedulers/ScheduledAction.java b/src/main/java/rx/internal/schedulers/ScheduledAction.java
index 0f7d145a20..6ba16beaee 100644
--- a/src/main/java/rx/internal/schedulers/ScheduledAction.java
+++ b/src/main/java/rx/internal/schedulers/ScheduledAction.java
@@ -22,7 +22,7 @@
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.internal.util.SubscriptionList;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
import rx.subscriptions.CompositeSubscription;
/**
@@ -61,7 +61,7 @@ public void run() {
} else {
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
}
- RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
+ RxJavaHooks.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
} finally {
diff --git a/src/main/java/rx/internal/util/RxJavaPluginUtils.java b/src/main/java/rx/internal/util/RxJavaPluginUtils.java
index b6b462412c..4e459130e5 100644
--- a/src/main/java/rx/internal/util/RxJavaPluginUtils.java
+++ b/src/main/java/rx/internal/util/RxJavaPluginUtils.java
@@ -15,13 +15,13 @@
*/
package rx.internal.util;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
public final class RxJavaPluginUtils {
public static void handleException(Throwable e) {
try {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
} catch (Throwable pluginException) {
handlePluginException(pluginException);
}
diff --git a/src/main/java/rx/internal/util/ScalarSynchronousObservable.java b/src/main/java/rx/internal/util/ScalarSynchronousObservable.java
index 676527f954..8c56935596 100644
--- a/src/main/java/rx/internal/util/ScalarSynchronousObservable.java
+++ b/src/main/java/rx/internal/util/ScalarSynchronousObservable.java
@@ -34,14 +34,6 @@
* @param the value type
*/
public final class ScalarSynchronousObservable extends Observable {
- /**
- * The execution hook instance.
- *
- * Can't be final to allow tests overriding it in place; if the class
- * has been initialized, the plugin reset has no effect because
- * how RxJavaPlugins was designed.
- */
- static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
/**
* Indicates that the Producer used by this Observable should be fully
* threadsafe. It is possible, but unlikely that multiple concurrent
@@ -81,7 +73,7 @@ public static ScalarSynchronousObservable create(T t) {
final T t;
protected ScalarSynchronousObservable(final T t) {
- super(hook.onCreate(new JustOnSubscribe(t)));
+ super(RxJavaHooks.onCreate(new JustOnSubscribe(t)));
this.t = t;
}
diff --git a/src/main/java/rx/observables/AsyncOnSubscribe.java b/src/main/java/rx/observables/AsyncOnSubscribe.java
index 9b8926622f..a32175fd9c 100644
--- a/src/main/java/rx/observables/AsyncOnSubscribe.java
+++ b/src/main/java/rx/observables/AsyncOnSubscribe.java
@@ -27,7 +27,7 @@
import rx.functions.*;
import rx.internal.operators.BufferUntilSubscriber;
import rx.observers.SerializedObserver;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
import rx.subscriptions.CompositeSubscription;
/**
@@ -554,7 +554,7 @@ boolean tryEmit(long n) {
private void handleThrownError(Throwable ex) {
if (hasTerminated) {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(ex);
+ RxJavaHooks.onError(ex);
} else {
hasTerminated = true;
merger.onError(ex);
diff --git a/src/main/java/rx/observables/SyncOnSubscribe.java b/src/main/java/rx/observables/SyncOnSubscribe.java
index 461abe28d4..34745fb118 100644
--- a/src/main/java/rx/observables/SyncOnSubscribe.java
+++ b/src/main/java/rx/observables/SyncOnSubscribe.java
@@ -18,20 +18,13 @@
import java.util.concurrent.atomic.AtomicLong;
+import rx.*;
import rx.Observable.OnSubscribe;
-import rx.Observer;
-import rx.Producer;
-import rx.Subscriber;
-import rx.Subscription;
import rx.annotations.Beta;
import rx.exceptions.Exceptions;
-import rx.functions.Action0;
-import rx.functions.Action1;
-import rx.functions.Action2;
-import rx.functions.Func0;
-import rx.functions.Func2;
+import rx.functions.*;
import rx.internal.operators.BackpressureUtils;
-import rx.plugins.RxJavaPlugins;
+import rx.plugins.RxJavaHooks;
/**
* A utility class to create {@code OnSubscribe} functions that respond correctly to back
@@ -387,7 +380,7 @@ private void doUnsubscribe() {
parent.onUnsubscribe(state);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
}
}
@@ -422,7 +415,7 @@ private void fastpath() {
private void handleThrownError(Subscriber super T> a, Throwable ex) {
if (hasTerminated) {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(ex);
+ RxJavaHooks.onError(ex);
} else {
hasTerminated = true;
a.onError(ex);
diff --git a/src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java b/src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java
index 87160f8326..287289242d 100644
--- a/src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java
+++ b/src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java
@@ -50,6 +50,7 @@ public abstract class RxJavaCompletableExecutionHook {
* @return {@link rx.Completable.CompletableOnSubscribe} function that can be modified, decorated, replaced or just
* returned as a pass through
*/
+ @Deprecated
public Completable.CompletableOnSubscribe onCreate(Completable.CompletableOnSubscribe f) {
return f;
}
@@ -66,6 +67,7 @@ public Completable.CompletableOnSubscribe onCreate(Completable.CompletableOnSubs
* @return {@link rx.Completable.CompletableOnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
* returned as a pass through
*/
+ @Deprecated
public Completable.CompletableOnSubscribe onSubscribeStart(Completable completableInstance, final Completable.CompletableOnSubscribe onSubscribe) {
// pass through by default
return onSubscribe;
@@ -81,6 +83,7 @@ public Completable.CompletableOnSubscribe onSubscribeStart(Completable completab
* Throwable thrown by {@link Completable#subscribe(Subscriber)}
* @return Throwable that can be decorated, replaced or just returned as a pass through
*/
+ @Deprecated
public Throwable onSubscribeError(Throwable e) {
// pass through by default
return e;
@@ -98,6 +101,7 @@ public Throwable onSubscribeError(Throwable e) {
* @return {@link rx.Completable.CompletableOperator}{@code } function that can be modified, decorated, replaced or just
* returned as a pass through
*/
+ @Deprecated
public Completable.CompletableOperator onLift(final Completable.CompletableOperator lift) {
return lift;
}
diff --git a/src/main/java/rx/plugins/RxJavaErrorHandler.java b/src/main/java/rx/plugins/RxJavaErrorHandler.java
index a6e56475ed..8107fab457 100644
--- a/src/main/java/rx/plugins/RxJavaErrorHandler.java
+++ b/src/main/java/rx/plugins/RxJavaErrorHandler.java
@@ -43,6 +43,7 @@ public abstract class RxJavaErrorHandler {
* @param e
* the {@code Exception}
*/
+ @Deprecated
public void handleError(Throwable e) {
// do nothing by default
}
@@ -57,7 +58,7 @@ public void handleError(Throwable e) {
* Note that primitive types are always rendered as their {@code toString()} value.
*
* If a {@code Throwable} is caught when rendering, this will fallback to the item's classname suffixed by
- * {@value #ERROR_IN_RENDERING_SUFFIX}.
+ * {@code ERROR_IN_RENDERING_SUFFIX}.
*
* @param item the last emitted item, that caused the exception wrapped in
* {@code OnErrorThrowable.OnNextValue}
diff --git a/src/main/java/rx/plugins/RxJavaHooks.java b/src/main/java/rx/plugins/RxJavaHooks.java
new file mode 100644
index 0000000000..a5b621ea19
--- /dev/null
+++ b/src/main/java/rx/plugins/RxJavaHooks.java
@@ -0,0 +1,944 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.plugins;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+
+import rx.*;
+import rx.Completable.CompletableOnSubscribe;
+import rx.Observable.*;
+import rx.annotations.Experimental;
+import rx.functions.*;
+import rx.internal.operators.*;
+
+/**
+ * Utility class that holds hooks for various Observable, Single and Completable lifecycle-related
+ * points as well as Scheduler hooks.
+ *
+ * The class features a lockdown state, see {@link #lockdown()} and {@link #isLockdown()}, to
+ * prevent further changes to the hooks.
+ */
+@Experimental
+public final class RxJavaHooks {
+ /** Utility class. */
+ private RxJavaHooks() {
+ throw new IllegalStateException("No instances!");
+ }
+
+ /**
+ * Prevents changing the hook callbacks when set to true.
+ */
+ /* test */ static volatile boolean lockdown;
+
+ static volatile Action1 onError;
+
+ @SuppressWarnings("rawtypes")
+ static volatile Func1 onObservableCreate;
+
+ @SuppressWarnings("rawtypes")
+ static volatile Func1 onSingleCreate;
+
+ static volatile Func1 onCompletableCreate;
+
+ @SuppressWarnings("rawtypes")
+ static volatile Func2 onObservableStart;
+
+ @SuppressWarnings("rawtypes")
+ static volatile Func2 onSingleStart;
+
+ static volatile Func2 onCompletableStart;
+
+ static volatile Func1 onComputationScheduler;
+
+ static volatile Func1 onIOScheduler;
+
+ static volatile Func1 onNewThreadScheduler;
+
+ static volatile Func1 onScheduleAction;
+
+ static volatile Func1 onObservableReturn;
+
+ static volatile Func1 onSingleReturn;
+
+ /** Initialize with the default delegation to the original RxJavaPlugins. */
+ static {
+ init();
+ }
+
+ /**
+ * Initialize the hooks via delegating to RxJavaPlugins.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked", "deprecation"})
+ static void init() {
+ onError = new Action1() {
+ @Override
+ public void call(Throwable e) {
+ RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ }
+ };
+
+ RxJavaPlugins.getInstance().getObservableExecutionHook();
+
+ onObservableCreate = new Func1() {
+ @Override
+ public OnSubscribe call(OnSubscribe f) {
+ return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);
+ }
+ };
+
+ onObservableStart = new Func2() {
+ @Override
+ public OnSubscribe call(Observable t1, OnSubscribe t2) {
+ return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
+ }
+ };
+
+ onObservableReturn = new Func1() {
+ @Override
+ public Subscription call(Subscription f) {
+ return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(f);
+ }
+ };
+
+ RxJavaPlugins.getInstance().getSingleExecutionHook();
+
+ onSingleCreate = new Func1() {
+ @Override
+ public rx.Single.OnSubscribe call(rx.Single.OnSubscribe f) {
+ return RxJavaPlugins.getInstance().getSingleExecutionHook().onCreate(f);
+ }
+ };
+
+ onSingleStart = new Func2() {
+ @Override
+ public Observable.OnSubscribe call(Single t1, Observable.OnSubscribe t2) {
+ return RxJavaPlugins.getInstance().getSingleExecutionHook().onSubscribeStart(t1, t2);
+ }
+ };
+
+ onSingleReturn = new Func1() {
+ @Override
+ public Subscription call(Subscription f) {
+ return RxJavaPlugins.getInstance().getSingleExecutionHook().onSubscribeReturn(f);
+ }
+ };
+
+ RxJavaPlugins.getInstance().getCompletableExecutionHook();
+
+ onCompletableCreate = new Func1() {
+ @Override
+ public CompletableOnSubscribe call(CompletableOnSubscribe f) {
+ return RxJavaPlugins.getInstance().getCompletableExecutionHook().onCreate(f);
+ }
+ };
+
+ onCompletableStart = new Func2() {
+ @Override
+ public Completable.CompletableOnSubscribe call(Completable t1, Completable.CompletableOnSubscribe t2) {
+ return RxJavaPlugins.getInstance().getCompletableExecutionHook().onSubscribeStart(t1, t2);
+ }
+ };
+
+ onScheduleAction = new Func1() {
+ @Override
+ public Action0 call(Action0 a) {
+ return RxJavaPlugins.getInstance().getSchedulersHook().onSchedule(a);
+ }
+ };
+ }
+
+ /**
+ * Reset all hook callbacks to those of the current RxJavaPlugins handlers.
+ *
+ * @see #clear()
+ */
+ public static void reset() {
+ if (lockdown) {
+ return;
+ }
+ init();
+ }
+
+ /**
+ * Clears all hooks to be no-operations (and passthroughs)
+ * and onError hook to signal errors to the caller thread's
+ * UncaughtExceptionHandler.
+ *
+ * @see #reset()
+ */
+ public static void clear() {
+ if (lockdown) {
+ return;
+ }
+ onError = null;
+
+ onObservableCreate = null;
+ onObservableStart = null;
+ onObservableReturn = null;
+
+ onSingleCreate = null;
+ onSingleStart = null;
+ onSingleReturn = null;
+
+ onCompletableCreate = null;
+ onCompletableStart = null;
+
+ onComputationScheduler = null;
+ onIOScheduler = null;
+ onNewThreadScheduler = null;
+ }
+
+ /**
+ * Prevents changing a hooks.
+ */
+ public static void lockdown() {
+ lockdown = true;
+ }
+
+ /**
+ * Returns true if the hooks can no longer be changed.
+ * @return true if the hooks can no longer be changed
+ */
+ public static boolean isLockdown() {
+ return lockdown;
+ }
+ /**
+ * Consume undeliverable Throwables (acts as a global catch).
+ * @param ex the exception to handle
+ */
+ public static void onError(Throwable ex) {
+ Action1 f = onError;
+ if (f != null) {
+ f.call(ex);
+ return;
+ }
+ Thread current = Thread.currentThread();
+ current.getUncaughtExceptionHandler().uncaughtException(current, ex);
+ }
+
+ /**
+ * Hook to call when an Observable is created.
+ * @param the value type
+ * @param onSubscribe the original OnSubscribe logic
+ * @return the original or replacement OnSubscribe instance
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static Observable.OnSubscribe onCreate(Observable.OnSubscribe onSubscribe) {
+ Func1 f = onObservableCreate;
+ if (f != null) {
+ return f.call(onSubscribe);
+ }
+ return onSubscribe;
+ }
+
+ /**
+ * Hook to call when a Single is created.
+ * @param the value type
+ * @param onSubscribe the original OnSubscribe logic
+ * @return the original or replacement OnSubscribe instance
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static Single.OnSubscribe onCreate(Single.OnSubscribe onSubscribe) {
+ Func1 f = onSingleCreate;
+ if (f != null) {
+ return f.call(onSubscribe);
+ }
+ return onSubscribe;
+ }
+
+ /**
+ * Hook to call when a Completable is created.
+ * @param onSubscribe the original OnSubscribe logic
+ * @return the original or replacement OnSubscribe instance
+ */
+ public static Completable.CompletableOnSubscribe onCreate(Completable.CompletableOnSubscribe onSubscribe) {
+ Func1 f = onCompletableCreate;
+ if (f != null) {
+ return f.call(onSubscribe);
+ }
+ return onSubscribe;
+ }
+
+ /**
+ * Hook to call when the Schedulers.computation() is called.
+ * @param scheduler the default computation scheduler
+ * @return the default of alternative scheduler
+ */
+ public static Scheduler onComputationScheduler(Scheduler scheduler) {
+ Func1 f = onComputationScheduler;
+ if (f != null) {
+ return f.call(scheduler);
+ }
+ return scheduler;
+ }
+
+ /**
+ * Hook to call when the Schedulers.io() is called.
+ * @param scheduler the default io scheduler
+ * @return the default of alternative scheduler
+ */
+ public static Scheduler onIOScheduler(Scheduler scheduler) {
+ Func1 f = onIOScheduler;
+ if (f != null) {
+ return f.call(scheduler);
+ }
+ return scheduler;
+ }
+
+ /**
+ * Hook to call when the Schedulers.newThread() is called.
+ * @param scheduler the default new thread scheduler
+ * @return the default of alternative scheduler
+ */
+ public static Scheduler onNewThreadScheduler(Scheduler scheduler) {
+ Func1 f = onNewThreadScheduler;
+ if (f != null) {
+ return f.call(scheduler);
+ }
+ return scheduler;
+ }
+
+ /**
+ * Hook to call before the action is scheduled, allows
+ * decorating the original action.
+ * @param action the original action
+ * @return the original or alternative action
+ */
+ public static Action0 onScheduledAction(Action0 action) {
+ Func1 f = onScheduleAction;
+ if (f != null) {
+ return f.call(action);
+ }
+ return action;
+ }
+
+ /**
+ * Hook to call before the child subscriber is subscribed to the OnSubscribe action.
+ * @param the value type
+ * @param instance the parent Observable instance
+ * @param onSubscribe the original OnSubscribe action
+ * @return the original or alternative action that will be subscribed to
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static OnSubscribe onObservableStart(Observable instance, OnSubscribe onSubscribe) {
+ Func2 f = onObservableStart;
+ if (f != null) {
+ return f.call(instance, onSubscribe);
+ }
+ return onSubscribe;
+ }
+
+ /**
+ * Hook to call before the Observable.subscribe() method is about to return a Subscription.
+ * @param subscription the original subscription
+ * @return the original or alternative subscription that will be returned
+ */
+ public static Subscription onObservableReturn(Subscription subscription) {
+ Func1 f = onObservableReturn;
+ if (f != null) {
+ return f.call(subscription);
+ }
+ return subscription;
+ }
+
+ /**
+ * Hook to call if the Observable.subscribe() crashes for some reason.
+ * @param error the error
+ * @return the original error or alternative Throwable to be thrown
+ */
+ public static Throwable onObservableError(Throwable error) {
+ // TODO add hook
+ return error;
+ }
+
+ /**
+ * Hook to call before the child subscriber would subscribe to an Operator.
+ * @param the input value type
+ * @param the output value type
+ * @param operator the original operator
+ * @return the original or alternative operator that will be subscribed to
+ */
+ public static Operator onObservableLift(Operator operator) {
+ // TODO add hook
+ return operator;
+ }
+
+ /**
+ * Hook to call before the child subscriber is subscribed to the OnSubscribe action.
+ * @param the value type
+ * @param instance the parent Single instance
+ * @param onSubscribe the original OnSubscribe action
+ * @return the original or alternative action that will be subscribed to
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static Observable.OnSubscribe onSingleStart(Single instance, Observable.OnSubscribe onSubscribe) {
+ Func2 f = onSingleStart;
+ if (f != null) {
+ return f.call(instance, onSubscribe);
+ }
+ return onSubscribe;
+ }
+
+ /**
+ * Hook to call before the Single.subscribe() method is about to return a Subscription.
+ * @param subscription the original subscription
+ * @return the original or alternative subscription that will be returned
+ */
+ public static Subscription onSingleReturn(Subscription subscription) {
+ Func1 f = onSingleReturn;
+ if (f != null) {
+ return f.call(subscription);
+ }
+ return subscription;
+ }
+
+ /**
+ * Hook to call if the Single.subscribe() crashes for some reason.
+ * @param error the error
+ * @return the original error or alternative Throwable to be thrown
+ */
+ public static Throwable onSingleError(Throwable error) {
+ // TODO add hook
+ return error;
+ }
+
+ /**
+ * Hook to call before the child subscriber would subscribe to an Operator.
+ * @param the input value type
+ * @param the output value type
+ * @param operator the original operator
+ * @return the original or alternative operator that will be subscribed to
+ */
+ public static Operator onSingleLift(Operator operator) {
+ // TODO add hook
+ return operator;
+ }
+
+ /**
+ * Hook to call before the child subscriber is subscribed to the OnSubscribe action.
+ * @param the value type
+ * @param instance the parent Completable instance
+ * @param onSubscribe the original OnSubscribe action
+ * @return the original or alternative action that will be subscribed to
+ */
+ public static Completable.CompletableOnSubscribe onCompletableStart(Completable instance, Completable.CompletableOnSubscribe onSubscribe) {
+ Func2 f = onCompletableStart;
+ if (f != null) {
+ return f.call(instance, onSubscribe);
+ }
+ return onSubscribe;
+ }
+
+ /**
+ * Hook to call if the Completable.subscribe() crashes for some reason.
+ * @param error the error
+ * @return the original error or alternative Throwable to be thrown
+ */
+ public static Throwable onCompletableError(Throwable error) {
+ // TODO add hook
+ return error;
+ }
+
+ /**
+ * Hook to call before the child subscriber would subscribe to an Operator.
+ * @param the input value type
+ * @param the output value type
+ * @param operator the original operator
+ * @return the original or alternative operator that will be subscribed to
+ */
+ public static Completable.CompletableOperator onCompletableLift(Completable.CompletableOperator operator) {
+ // TODO add hook
+ return operator;
+ }
+
+
+ /**
+ * Sets the global error consumer action unless a lockdown is in effect.
+ *
+ * This operation is threadsafe.
+ *
+ * Calling with a {@code null} parameter restores the default behavior:
+ * errors are routed to the current thread's {@link UncaughtExceptionHandler}.
+ * @param onError the action that will receive undeliverable Throwables
+ */
+ public static void setOnError(Action1 onError) {
+ if (lockdown) {
+ return;
+ }
+ RxJavaHooks.onError = onError;
+ }
+
+ /**
+ * Sets the Completable's onCreate hook function unless a lockdown is in effect.
+ *
+ * This operation is threadsafe.
+ *
+ * Calling with a {@code null} parameter restores the default behavior:
+ * the hook returns the same object.
+ * @param onCompletableCreate the function that takes the original CompletableOnSubscribe
+ * and should return a CompletableOnSubscribe.
+ */
+ public static void setOnCompletableCreate(
+ Func1 onCompletableCreate) {
+ if (lockdown) {
+ return;
+ }
+ RxJavaHooks.onCompletableCreate = onCompletableCreate;
+ }
+
+ /**
+ * Sets the Observable onCreate hook function unless a lockdown is in effect.
+ *
+ * This operation is threadsafe.
+ *
+ * Calling with a {@code null} parameter restores the default behavior:
+ * the hook returns the same object.
+ * @param onObservableCreate the function that takes the original OnSubscribe
+ * and should return a OnSubscribe.
+ */
+ @SuppressWarnings("rawtypes")
+ public static void setOnObservableCreate(
+ Func1 onObservableCreate) {
+ if (lockdown) {
+ return;
+ }
+ RxJavaHooks.onObservableCreate = onObservableCreate;
+ }
+
+ /**
+ * Sets the Single onCreate hook function unless a lockdown is in effect.
+ *
+ * This operation is threadsafe.
+ *
+ * Calling with a {@code null} parameter restores the default behavior:
+ * the hook returns the same object.
+ * @param onSingleCreate the function that takes the original OnSubscribe
+ * and should return a OnSubscribe.
+ */
+ @SuppressWarnings("rawtypes")
+ public static void setOnSingleCreate(Func1 onSingleCreate) {
+ if (lockdown) {
+ return;
+ }
+ RxJavaHooks.onSingleCreate = onSingleCreate;
+ }
+
+ /**
+ * Sets the hook function for returning a scheduler when the Schedulers.computation() is called
+ * unless a lockdown is in effect.
+ *
+ * This operation is threadsafe.
+ *
+ * Calling with a {@code null} parameter restores the default behavior:
+ * the hook returns the same object.
+ * @param onComputationScheduler the function that receives the original computation scheduler
+ * and should return a scheduler.
+ */
+ public static void setOnComputationScheduler(Func1 onComputationScheduler) {
+ if (lockdown) {
+ return;
+ }
+ RxJavaHooks.onComputationScheduler = onComputationScheduler;
+ }
+
+ /**
+ * Sets the hook function for returning a scheduler when the Schedulers.io() is called
+ * unless a lockdown is in effect.
+ *
+ * This operation is threadsafe.
+ *
+ * Calling with a {@code null} parameter restores the default behavior:
+ * the hook returns the same object.
+ * @param onIOScheduler the function that receives the original io scheduler
+ * and should return a scheduler.
+ */
+ public static void setOnIOScheduler(Func1 onIOScheduler) {
+ if (lockdown) {
+ return;
+ }
+ RxJavaHooks.onIOScheduler = onIOScheduler;
+ }
+
+ /**
+ * Sets the hook function for returning a scheduler when the Schedulers.newThread() is called
+ * unless a lockdown is in effect.
+ *
+ * This operation is threadsafe.
+ *
+ * Calling with a {@code null} parameter restores the default behavior:
+ * the hook returns the same object.
+ * @param onNewThreadScheduler the function that receives the original new thread scheduler
+ * and should return a scheduler.
+ */
+ public static void setOnNewThreadScheduler(Func1 onNewThreadScheduler) {
+ if (lockdown) {
+ return;
+ }
+ RxJavaHooks.onNewThreadScheduler = onNewThreadScheduler;
+ }
+
+ /**
+ * Sets the hook function that is called before an action is scheduled, allowing
+ * decorating that function, unless a lockdown is in effect.
+ *
+ * This operation is threadsafe.
+ *
+ * Calling with a {@code null} parameter restores the default behavior:
+ * the hook returns the same object.
+ * @param onScheduleAction the function that receives the original action and should
+ * return an Action0.
+ */
+ public static void setOnScheduleAction(Func1 onScheduleAction) {
+ if (lockdown) {
+ return;
+ }
+ RxJavaHooks.onScheduleAction = onScheduleAction;
+ }
+
+ /**
+ * Sets the hook function that is called when a subscriber subscribes to a Completable
+ * unless a lockdown is in effect.
+ *
+ * This operation is threadsafe.
+ *
+ * Calling with a {@code null} parameter restores the default behavior:
+ * the hook returns the same CompletableOnSubscribe object.
+ * @param onCompletableStart the function that is called with the current Completable instance,
+ * its CompletableOnSubscribe function and should return a CompletableOnSubscribe function
+ * that gets actually subscribed to.
+ */
+ public static void setOnCompletableStart(
+ Func2 onCompletableStart) {
+ if (lockdown) {
+ return;
+ }
+ RxJavaHooks.onCompletableStart = onCompletableStart;
+ }
+
+ /**
+ * Sets the hook function that is called when a subscriber subscribes to a Observable
+ * unless a lockdown is in effect.
+ *
+ * This operation is threadsafe.
+ *
+ * Calling with a {@code null} parameter restores the default behavior:
+ * the hook returns the same OnSubscribe object.
+ * @param onObservableStart the function that is called with the current Observable instance,
+ * its OnSubscribe function and should return a OnSubscribe function
+ * that gets actually subscribed to.
+ */
+ @SuppressWarnings("rawtypes")
+ public static void setOnObservableStart(
+ Func2 onObservableStart) {
+ if (lockdown) {
+ return;
+ }
+ RxJavaHooks.onObservableStart = onObservableStart;
+ }
+
+ /**
+ * Sets the hook function that is called when a subscriber subscribes to a Single
+ * unless a lockdown is in effect.
+ *
+ * This operation is threadsafe.
+ *
+ * Calling with a {@code null} parameter restores the default behavior:
+ * the hook returns the same OnSubscribe object.
+ * @param onSingleStart the function that is called with the current Single instance,
+ * its OnSubscribe function and should return a OnSubscribe function
+ * that gets actually subscribed to.
+ */
+ @SuppressWarnings("rawtypes")
+ public static void setOnSingleStart(Func2 onSingleStart) {
+ if (lockdown) {
+ return;
+ }
+ RxJavaHooks.onSingleStart = onSingleStart;
+ }
+
+ /**
+ * Sets a hook function that is called when the Observable.subscribe() call
+ * is about to return a Subscription unless a lockdown is in effect.
+ *
+ * This operation is threadsafe.
+ *
+ * Calling with a {@code null} parameter restores the default behavior:
+ * the hook returns the same object.
+ * @param onObservableReturn the function that is called with the Subscriber that has been
+ * subscribed to the OnSubscribe function and returns a Subscription that will be returned by
+ * subscribe().
+ */
+ public static void setOnObservableReturn(Func1 onObservableReturn) {
+ if (lockdown) {
+ return;
+ }
+ RxJavaHooks.onObservableReturn = onObservableReturn;
+ }
+
+ /**
+ * Sets a hook function that is called when the Single.subscribe() call
+ * is about to return a Subscription unless a lockdown is in effect.
+ *
+ * This operation is threadsafe.
+ *
+ * Calling with a {@code null} parameter restores the default behavior:
+ * the hook returns the same object.
+ * @param onSingleReturn the function that is called with the SingleSubscriber that has been
+ * subscribed to the OnSubscribe function and returns a Subscription that will be returned by
+ * subscribe().
+ */
+ public static void setOnSingleReturn(Func1 onSingleReturn) {
+ if (lockdown) {
+ return;
+ }
+ RxJavaHooks.onSingleReturn = onSingleReturn;
+ }
+
+ /**
+ * Returns the current computation scheduler hook function or null if it is
+ * set to the default pass-through.
+ *
+ * This operation is threadsafe.
+ * @return the current hook function
+ */
+ public static Func1 getOnComputationScheduler() {
+ return onComputationScheduler;
+ }
+
+ /**
+ * Returns the current global error handler hook action or null if it is
+ * set to the default one that signals errors to the current threads
+ * UncaughtExceptionHandler.
+ *
+ * This operation is threadsafe.
+ * @return the current hook action
+ */
+ public static Action1 getOnError() {
+ return onError;
+ }
+
+ /**
+ * Returns the current io scheduler hook function or null if it is
+ * set to the default pass-through.
+ *
+ * This operation is threadsafe.
+ * @return the current hook function
+ */
+ public static Func1 getOnIOScheduler() {
+ return onIOScheduler;
+ }
+
+ /**
+ * Returns the current new thread scheduler hook function or null if it is
+ * set to the default pass-through.
+ *
+ * This operation is threadsafe.
+ * @return the current hook function
+ */
+ public static Func1 getOnNewThreadScheduler() {
+ return onNewThreadScheduler;
+ }
+
+ /**
+ * Returns the current Observable onCreate hook function or null if it is
+ * set to the default pass-through.
+ *
+ * This operation is threadsafe.
+ * @return the current hook function
+ */
+ @SuppressWarnings("rawtypes")
+ public static Func1 getOnObservableCreate() {
+ return onObservableCreate;
+ }
+
+ /**
+ * Returns the current schedule action hook function or null if it is
+ * set to the default pass-through.
+ *
+ * This operation is threadsafe.
+ * @return the current hook function
+ */
+ public static Func1 getOnScheduleAction() {
+ return onScheduleAction;
+ }
+
+ /**
+ * Returns the current Single onCreate hook function or null if it is
+ * set to the default pass-through.
+ *
+ * This operation is threadsafe.
+ * @return the current hook function
+ */
+ @SuppressWarnings("rawtypes")
+ public static Func1 getOnSingleCreate() {
+ return onSingleCreate;
+ }
+
+ /**
+ * Returns the current Completable onCreate hook function or null if it is
+ * set to the default pass-through.
+ *
+ * This operation is threadsafe.
+ * @return the current hook function
+ */
+ public static Func1 getOnCompletableCreate() {
+ return onCompletableCreate;
+ }
+
+ /**
+ * Returns the current Completable onStart hook function or null if it is
+ * set to the default pass-through.
+ *
+ * This operation is threadsafe.
+ * @return the current hook function
+ */
+ public static Func2 getOnCompletableStart() {
+ return onCompletableStart;
+ }
+
+ /**
+ * Returns the current Observable onStart hook function or null if it is
+ * set to the default pass-through.
+ *
+ * This operation is threadsafe.
+ * @return the current hook function
+ */
+ @SuppressWarnings("rawtypes")
+ public static Func2 getOnObservableStart() {
+ return onObservableStart;
+ }
+
+ /**
+ * Returns the current Single onStart hook function or null if it is
+ * set to the default pass-through.
+ *
+ * This operation is threadsafe.
+ * @return the current hook function
+ */
+ @SuppressWarnings("rawtypes")
+ public static Func2 getOnSingleStart() {
+ return onSingleStart;
+ }
+
+ /**
+ * Returns the current Observable onReturn hook function or null if it is
+ * set to the default pass-through.
+ *
+ * This operation is threadsafe.
+ * @return the current hook function
+ */
+ public static Func1 getOnObservableReturn() {
+ return onObservableReturn;
+ }
+
+ /**
+ * Returns the current Single onReturn hook function or null if it is
+ * set to the default pass-through.
+ *
+ * This operation is threadsafe.
+ * @return the current hook function
+ */
+ public static Func1 getOnSingleReturn() {
+ return onSingleReturn;
+ }
+
+ /**
+ * Resets the assembly tracking hooks to their default delegates to
+ * RxJavaPlugins.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+ public static void resetAssemblyTracking() {
+ if (lockdown) {
+ return;
+ }
+
+ RxJavaPlugins plugin = RxJavaPlugins.getInstance();
+
+ final RxJavaObservableExecutionHook observableExecutionHook = plugin.getObservableExecutionHook();
+
+ onObservableCreate = new Func1() {
+ @Override
+ public OnSubscribe call(OnSubscribe f) {
+ return observableExecutionHook.onCreate(f);
+ }
+ };
+
+ final RxJavaSingleExecutionHook singleExecutionHook = plugin.getSingleExecutionHook();
+
+ onSingleCreate = new Func1() {
+ @Override
+ public rx.Single.OnSubscribe call(rx.Single.OnSubscribe f) {
+ return singleExecutionHook.onCreate(f);
+ }
+ };
+
+ final RxJavaCompletableExecutionHook completableExecutionHook = plugin.getCompletableExecutionHook();
+
+ onCompletableCreate = new Func1() {
+ @Override
+ public CompletableOnSubscribe call(CompletableOnSubscribe f) {
+ return completableExecutionHook.onCreate(f);
+ }
+ };
+
+ }
+
+ /**
+ * Clears the assembly tracking hooks to their default pass-through behavior.
+ */
+ public static void crearAssemblyTracking() {
+ if (lockdown) {
+ return;
+ }
+ onObservableCreate = null;
+ onSingleCreate = null;
+ onCompletableCreate = null;
+ }
+
+ /**
+ * Sets up hooks that capture the current stacktrace when a source or an
+ * operator is instantiated, keeping it in a field for debugging purposes
+ * and alters exceptions passign along to hold onto this stacktrace.
+ */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public static void enableAssemblyTracking() {
+ if (lockdown) {
+ return;
+ }
+
+ onObservableCreate = new Func1() {
+ @Override
+ public OnSubscribe call(OnSubscribe f) {
+ return new OnSubscribeOnAssembly(f);
+ }
+ };
+
+ onSingleCreate = new Func1() {
+ @Override
+ public rx.Single.OnSubscribe call(rx.Single.OnSubscribe f) {
+ return new OnSubscribeOnAssemblySingle(f);
+ }
+ };
+
+ onCompletableCreate = new Func1() {
+ @Override
+ public CompletableOnSubscribe call(CompletableOnSubscribe f) {
+ return new OnSubscribeOnAssemblyCompletable(f);
+ }
+ };
+
+ }
+}
diff --git a/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java b/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java
index c7c085c7fe..8c08851ed5 100644
--- a/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java
+++ b/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java
@@ -52,6 +52,7 @@ public abstract class RxJavaObservableExecutionHook {
* @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
* returned as a pass through
*/
+ @Deprecated
public OnSubscribe onCreate(OnSubscribe f) {
return f;
}
@@ -69,6 +70,7 @@ public OnSubscribe onCreate(OnSubscribe f) {
* @return {@link OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
* returned as a pass through
*/
+ @Deprecated
public OnSubscribe onSubscribeStart(Observable extends T> observableInstance, final OnSubscribe onSubscribe) {
// pass through by default
return onSubscribe;
@@ -87,6 +89,7 @@ public OnSubscribe onSubscribeStart(Observable extends T> observableIns
* @return {@link Subscription} subscription that can be modified, decorated, replaced or just returned as a
* pass through
*/
+ @Deprecated
public Subscription onSubscribeReturn(Subscription subscription) {
// pass through by default
return subscription;
@@ -103,6 +106,7 @@ public Subscription onSubscribeReturn(Subscription subscription) {
* Throwable thrown by {@link Observable#subscribe(Subscriber)}
* @return Throwable that can be decorated, replaced or just returned as a pass through
*/
+ @Deprecated
public Throwable onSubscribeError(Throwable e) {
// pass through by default
return e;
@@ -122,6 +126,7 @@ public Throwable onSubscribeError(Throwable e) {
* @return {@link Operator}{@code } function that can be modified, decorated, replaced or just
* returned as a pass through
*/
+ @Deprecated
public Operator extends R, ? super T> onLift(final Operator extends R, ? super T> lift) {
return lift;
}
diff --git a/src/main/java/rx/plugins/RxJavaPlugins.java b/src/main/java/rx/plugins/RxJavaPlugins.java
index f9926ac588..9271a42ce2 100644
--- a/src/main/java/rx/plugins/RxJavaPlugins.java
+++ b/src/main/java/rx/plugins/RxJavaPlugins.java
@@ -45,6 +45,9 @@
*
*
* @see RxJava Wiki: Plugins
+ *
+ * Use the {@link RxJavaHooks} features instead which let's you change individual
+ * handlers at runtime.
*/
public class RxJavaPlugins {
private final static RxJavaPlugins INSTANCE = new RxJavaPlugins();
@@ -59,7 +62,10 @@ public class RxJavaPlugins {
* Retrieves the single {@code RxJavaPlugins} instance.
*
* @return the single {@code RxJavaPlugins} instance
+ *
+ * @deprecated use the static methods of {@link RxJavaHooks}.
*/
+ @Deprecated
public static RxJavaPlugins getInstance() {
return INSTANCE;
}
diff --git a/src/main/java/rx/plugins/RxJavaSchedulersHook.java b/src/main/java/rx/plugins/RxJavaSchedulersHook.java
index e50d6a611e..a8158e2f5e 100644
--- a/src/main/java/rx/plugins/RxJavaSchedulersHook.java
+++ b/src/main/java/rx/plugins/RxJavaSchedulersHook.java
@@ -148,6 +148,7 @@ public Scheduler getNewThreadScheduler() {
* @param action action to schedule
* @return wrapped action to schedule
*/
+ @Deprecated
public Action0 onSchedule(Action0 action) {
return action;
}
diff --git a/src/main/java/rx/plugins/RxJavaSingleExecutionHook.java b/src/main/java/rx/plugins/RxJavaSingleExecutionHook.java
index ed780381b8..4e023f615e 100644
--- a/src/main/java/rx/plugins/RxJavaSingleExecutionHook.java
+++ b/src/main/java/rx/plugins/RxJavaSingleExecutionHook.java
@@ -51,6 +51,7 @@ public abstract class RxJavaSingleExecutionHook {
* @return {@link rx.Single.OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
* returned as a pass through
*/
+ @Deprecated
public Single.OnSubscribe onCreate(Single.OnSubscribe f) {
return f;
}
@@ -68,6 +69,7 @@ public Single.OnSubscribe onCreate(Single.OnSubscribe f) {
* @return {@link rx.Observable.OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
* returned as a pass through
*/
+ @Deprecated
public Observable.OnSubscribe onSubscribeStart(Single extends T> singleInstance, final Observable.OnSubscribe onSubscribe) {
// pass through by default
return onSubscribe;
@@ -86,6 +88,7 @@ public Observable.OnSubscribe onSubscribeStart(Single extends T> single
* @return {@link Subscription} subscription that can be modified, decorated, replaced or just returned as a
* pass through
*/
+ @Deprecated
public Subscription onSubscribeReturn(Subscription subscription) {
// pass through by default
return subscription;
@@ -102,6 +105,7 @@ public Subscription onSubscribeReturn(Subscription subscription) {
* Throwable thrown by {@link Single#subscribe(Subscriber)}
* @return Throwable that can be decorated, replaced or just returned as a pass through
*/
+ @Deprecated
public Throwable onSubscribeError(Throwable e) {
// pass through by default
return e;
@@ -121,6 +125,7 @@ public Throwable onSubscribeError(Throwable e) {
* @return {@link rx.Observable.Operator}{@code } function that can be modified, decorated, replaced or just
* returned as a pass through
*/
+ @Deprecated
public Observable.Operator extends R, ? super T> onLift(final Observable.Operator extends R, ? super T> lift) {
return lift;
}
diff --git a/src/main/java/rx/schedulers/Schedulers.java b/src/main/java/rx/schedulers/Schedulers.java
index 031d0bb463..6dc89f0178 100644
--- a/src/main/java/rx/schedulers/Schedulers.java
+++ b/src/main/java/rx/schedulers/Schedulers.java
@@ -21,8 +21,7 @@
import rx.internal.schedulers.GenericScheduledExecutorService;
import rx.internal.schedulers.SchedulerLifecycle;
import rx.internal.util.RxRingBuffer;
-import rx.plugins.RxJavaPlugins;
-import rx.plugins.RxJavaSchedulersHook;
+import rx.plugins.*;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
@@ -54,6 +53,7 @@ private static Schedulers getInstance() {
}
private Schedulers() {
+ @SuppressWarnings("deprecation")
RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();
Scheduler c = hook.getComputationScheduler();
@@ -105,7 +105,7 @@ public static Scheduler trampoline() {
* @return a {@link Scheduler} that creates new threads
*/
public static Scheduler newThread() {
- return getInstance().newThreadScheduler;
+ return RxJavaHooks.onNewThreadScheduler(getInstance().newThreadScheduler);
}
/**
@@ -120,7 +120,7 @@ public static Scheduler newThread() {
* @return a {@link Scheduler} meant for computation-bound work
*/
public static Scheduler computation() {
- return getInstance().computationScheduler;
+ return RxJavaHooks.onComputationScheduler(getInstance().computationScheduler);
}
/**
@@ -137,7 +137,7 @@ public static Scheduler computation() {
* @return a {@link Scheduler} meant for IO-bound work
*/
public static Scheduler io() {
- return getInstance().ioScheduler;
+ return RxJavaHooks.onComputationScheduler(getInstance().ioScheduler);
}
/**
diff --git a/src/test/java/rx/CompletableTest.java b/src/test/java/rx/CompletableTest.java
index a2a49d6c2d..018bf64509 100644
--- a/src/test/java/rx/CompletableTest.java
+++ b/src/test/java/rx/CompletableTest.java
@@ -31,7 +31,7 @@
import rx.exceptions.*;
import rx.functions.*;
import rx.observers.TestSubscriber;
-import rx.plugins.*;
+import rx.plugins.RxJavaHooks;
import rx.schedulers.*;
import rx.subjects.PublishSubject;
import rx.subscriptions.*;
@@ -1216,7 +1216,7 @@ public void onCompleted() {
@Override
public void onError(Throwable e) {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
+ RxJavaHooks.onError(e);
}
});
@@ -4036,13 +4036,34 @@ public void onNext(Object t) {
}
}
- private static RxJavaCompletableExecutionHook hookSpy;
+ private Func1 onCreate;
+
+ private Func2 onStart;
@Before
public void setUp() throws Exception {
- hookSpy = spy(
- new RxJavaPluginsTest.RxJavaCompletableExecutionHookTestImpl());
- Completable.HOOK = hookSpy;
+ onCreate = spy(new Func1() {
+ @Override
+ public CompletableOnSubscribe call(CompletableOnSubscribe t) {
+ return t;
+ }
+ });
+
+ RxJavaHooks.setOnCompletableCreate(onCreate);
+
+ onStart = spy(new Func2() {
+ @Override
+ public CompletableOnSubscribe call(Completable t1, CompletableOnSubscribe t2) {
+ return t2;
+ }
+ });
+
+ RxJavaHooks.setOnCompletableStart(onStart);
+ }
+
+ @After
+ public void after() {
+ RxJavaHooks.reset();
}
@Test
@@ -4050,7 +4071,7 @@ public void testHookCreate() {
CompletableOnSubscribe subscriber = mock(CompletableOnSubscribe.class);
Completable.create(subscriber);
- verify(hookSpy, times(1)).onCreate(subscriber);
+ verify(onCreate, times(1)).call(subscriber);
}
@Test
@@ -4064,7 +4085,7 @@ public void testHookSubscribeStart() {
});
completable.subscribe(ts);
- verify(hookSpy, times(1)).onSubscribeStart(eq(completable), any(Completable.CompletableOnSubscribe.class));
+ verify(onStart, times(1)).call(eq(completable), any(Completable.CompletableOnSubscribe.class));
}
@Test
@@ -4077,7 +4098,7 @@ public void testHookUnsafeSubscribeStart() {
});
completable.unsafeSubscribe(ts);
- verify(hookSpy, times(1)).onSubscribeStart(eq(completable), any(Completable.CompletableOnSubscribe.class));
+ verify(onStart, times(1)).call(eq(completable), any(Completable.CompletableOnSubscribe.class));
}
@Test
diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java
index 12b059e648..7f45192044 100644
--- a/src/test/java/rx/SingleTest.java
+++ b/src/test/java/rx/SingleTest.java
@@ -21,33 +21,64 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import rx.Single.OnSubscribe;
import rx.exceptions.*;
import rx.functions.*;
-import rx.observers.SafeSubscriber;
-import rx.observers.TestSubscriber;
-import rx.plugins.RxJavaPluginsTest;
-import rx.plugins.RxJavaSingleExecutionHook;
-import rx.schedulers.Schedulers;
-import rx.schedulers.TestScheduler;
+import rx.observers.*;
+import rx.plugins.RxJavaHooks;
+import rx.schedulers.*;
import rx.singles.BlockingSingle;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
public class SingleTest {
- private static RxJavaSingleExecutionHook hookSpy;
+ @SuppressWarnings("rawtypes")
+ private Func1 onCreate;
+
+ @SuppressWarnings("rawtypes")
+ private Func2 onStart;
+ private Func1 onReturn;
+
+ @SuppressWarnings("rawtypes")
@Before
public void setUp() throws Exception {
- hookSpy = spy(
- new RxJavaPluginsTest.RxJavaSingleExecutionHookTestImpl());
- Single.hook = hookSpy;
+ onCreate = spy(new Func1() {
+ @Override
+ public Single.OnSubscribe call(Single.OnSubscribe t) {
+ return t;
+ }
+ });
+
+ RxJavaHooks.setOnSingleCreate(onCreate);
+
+ onStart = spy(new Func2() {
+ @Override
+ public Observable.OnSubscribe call(Single t1, Observable.OnSubscribe t2) {
+ return t2;
+ }
+ });
+
+ RxJavaHooks.setOnSingleStart(onStart);
+
+ onReturn = spy(new Func1() {
+ @Override
+ public Subscription call(Subscription t) {
+ return t;
+ }
+ });
+
+ RxJavaHooks.setOnSingleReturn(onReturn);
+ }
+
+ @After
+ public void after() {
+ RxJavaHooks.reset();
}
@Test
@@ -400,10 +431,9 @@ public void testHookCreate() {
OnSubscribe