From c78f3bcd84c9e8d92f55bfe825069a19a60cad4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Thu, 26 May 2016 00:59:26 +0200 Subject: [PATCH] 1.x: add optional tracking of worker creation sites + report it on error --- .../schedulers/CachedThreadScheduler.java | 34 +++- .../schedulers/EventLoopsScheduler.java | 57 +++++- .../schedulers/ExecutorScheduler.java | 30 ++- .../schedulers/NewThreadScheduler.java | 6 +- .../internal/schedulers/NewThreadWorker.java | 65 +++--- .../internal/schedulers/ScheduledAction.java | 80 ++------ .../internal/schedulers/WorkerCallback.java | 41 ++++ .../schedulers/WorkerDebugSupport.java | 63 ++++++ src/main/java/rx/schedulers/Schedulers.java | 21 +- .../rx/schedulers/AbstractSchedulerTests.java | 190 ++++++++++++++++-- 10 files changed, 444 insertions(+), 143 deletions(-) create mode 100644 src/main/java/rx/internal/schedulers/WorkerCallback.java create mode 100644 src/main/java/rx/internal/schedulers/WorkerDebugSupport.java diff --git a/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java b/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java index 1ebfa56fe4..f74f665590 100644 --- a/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java +++ b/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java @@ -170,10 +170,16 @@ public void shutdown() { @Override public Worker createWorker() { - return new EventLoopWorker(pool.get()); + Throwable site = null; + if (WorkerDebugSupport.isEnabled()) { + site = new RuntimeException("createWorker() called"); + } + return new EventLoopWorker(pool.get(), site); } - private static final class EventLoopWorker extends Scheduler.Worker { + private static final class EventLoopWorker extends Scheduler.Worker + implements WorkerCallback { + final Throwable site; private final CompositeSubscription innerSubscription = new CompositeSubscription(); private final CachedWorkerPool pool; private final ThreadWorker threadWorker; @@ -182,8 +188,9 @@ private static final class EventLoopWorker extends Scheduler.Worker { static final AtomicIntegerFieldUpdater ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(EventLoopWorker.class, "once"); - EventLoopWorker(CachedWorkerPool pool) { + EventLoopWorker(CachedWorkerPool pool, Throwable site) { this.pool = pool; + this.site = site; this.threadWorker = pool.get(); } @@ -221,18 +228,31 @@ public void call() { } action.call(); } - }, delayTime, unit); - innerSubscription.add(s); - s.addParent(innerSubscription); + }, delayTime, unit, this); return s; } + + @Override + public void add(ScheduledAction action) { + innerSubscription.add(action); + } + + @Override + public void remove(ScheduledAction action) { + innerSubscription.remove(action); + } + + @Override + public Throwable workerCreationSite() { + return site; + } } private static final class ThreadWorker extends NewThreadWorker { private long expirationTime; ThreadWorker(ThreadFactory threadFactory) { - super(threadFactory); + super(threadFactory, null); this.expirationTime = 0L; } diff --git a/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java b/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java index fb813412db..c88d55c8cd 100644 --- a/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java +++ b/src/main/java/rx/internal/schedulers/EventLoopsScheduler.java @@ -45,7 +45,7 @@ public final class EventLoopsScheduler extends Scheduler implements SchedulerLif static final PoolWorker SHUTDOWN_WORKER; static { - SHUTDOWN_WORKER = new PoolWorker(RxThreadFactory.NONE); + SHUTDOWN_WORKER = new PoolWorker(RxThreadFactory.NONE, null); SHUTDOWN_WORKER.unsubscribe(); } @@ -60,7 +60,7 @@ static final class FixedSchedulerPool { this.cores = maxThreads; this.eventLoops = new PoolWorker[maxThreads]; for (int i = 0; i < maxThreads; i++) { - this.eventLoops[i] = new PoolWorker(threadFactory); + this.eventLoops[i] = new PoolWorker(threadFactory, null); } } @@ -98,7 +98,11 @@ public EventLoopsScheduler(ThreadFactory threadFactory) { @Override public Worker createWorker() { - return new EventLoopWorker(pool.get().getEventLoop()); + Throwable site = null; + if (WorkerDebugSupport.isEnabled()) { + site = new RuntimeException("createWorker() called"); + } + return new EventLoopWorker(pool.get().getEventLoop(), site); } @Override @@ -131,7 +135,7 @@ public void shutdown() { */ public Subscription scheduleDirect(Action0 action) { PoolWorker pw = pool.get().getEventLoop(); - return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS); + return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS, null); } private static class EventLoopWorker extends Scheduler.Worker { @@ -139,10 +143,43 @@ private static class EventLoopWorker extends Scheduler.Worker { private final CompositeSubscription timed = new CompositeSubscription(); private final SubscriptionList both = new SubscriptionList(serial, timed); private final PoolWorker poolWorker; + final WorkerCallback timedCallback; + final WorkerCallback serialCallback; - EventLoopWorker(PoolWorker poolWorker) { + EventLoopWorker(PoolWorker poolWorker, final Throwable site) { this.poolWorker = poolWorker; - + this.timedCallback = new WorkerCallback() { + @Override + public void add(ScheduledAction action) { + timed.add(action); + } + + @Override + public void remove(ScheduledAction action) { + timed.remove(action); + } + + @Override + public Throwable workerCreationSite() { + return site; + } + }; + this.serialCallback = new WorkerCallback() { + @Override + public void add(ScheduledAction action) { + serial.add(action); + } + + @Override + public void remove(ScheduledAction action) { + serial.remove(action); + } + + @Override + public Throwable workerCreationSite() { + return site; + } + }; } @Override @@ -169,7 +206,7 @@ public void call() { } action.call(); } - }, 0, null, serial); + }, 0, null, serialCallback); } @Override @@ -186,13 +223,13 @@ public void call() { } action.call(); } - }, delayTime, unit, timed); + }, delayTime, unit, timedCallback); } } static final class PoolWorker extends NewThreadWorker { - PoolWorker(ThreadFactory threadFactory) { - super(threadFactory); + PoolWorker(ThreadFactory threadFactory, Throwable site) { + super(threadFactory, site); } } } diff --git a/src/main/java/rx/internal/schedulers/ExecutorScheduler.java b/src/main/java/rx/internal/schedulers/ExecutorScheduler.java index a57ee8c938..23b72d3474 100644 --- a/src/main/java/rx/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/rx/internal/schedulers/ExecutorScheduler.java @@ -37,11 +37,16 @@ public ExecutorScheduler(Executor executor) { @Override public Worker createWorker() { - return new ExecutorSchedulerWorker(executor); + Throwable site = null; + if (WorkerDebugSupport.isEnabled()) { + site = new RuntimeException("createWorker() called"); + } + return new ExecutorSchedulerWorker(executor, site); } /** Worker that schedules tasks on the executor indirectly through a trampoline mechanism. */ - static final class ExecutorSchedulerWorker extends Scheduler.Worker implements Runnable { + static final class ExecutorSchedulerWorker extends Scheduler.Worker + implements Runnable, WorkerCallback { final Executor executor; // TODO: use a better performing structure for task tracking final CompositeSubscription tasks; @@ -51,12 +56,15 @@ static final class ExecutorSchedulerWorker extends Scheduler.Worker implements R final ScheduledExecutorService service; - public ExecutorSchedulerWorker(Executor executor) { + final Throwable site; + + public ExecutorSchedulerWorker(Executor executor, Throwable site) { this.executor = executor; this.queue = new ConcurrentLinkedQueue(); this.wip = new AtomicInteger(); this.tasks = new CompositeSubscription(); this.service = GenericScheduledExecutorService.getInstance(); + this.site = site; } @Override @@ -64,7 +72,7 @@ public Subscription schedule(Action0 action) { if (isUnsubscribed()) { return Subscriptions.unsubscribed(); } - ScheduledAction ea = new ScheduledAction(action, tasks); + ScheduledAction ea = new ScheduledAction(action, this); tasks.add(ea); queue.offer(ea); if (wip.getAndIncrement() == 0) { @@ -180,5 +188,19 @@ public void unsubscribe() { queue.clear(); } + @Override + public void add(ScheduledAction action) { + tasks.add(action); + } + + @Override + public void remove(ScheduledAction action) { + tasks.remove(action); + } + + @Override + public Throwable workerCreationSite() { + return site; + } } } diff --git a/src/main/java/rx/internal/schedulers/NewThreadScheduler.java b/src/main/java/rx/internal/schedulers/NewThreadScheduler.java index 96d04026f9..b87faa25b3 100644 --- a/src/main/java/rx/internal/schedulers/NewThreadScheduler.java +++ b/src/main/java/rx/internal/schedulers/NewThreadScheduler.java @@ -30,6 +30,10 @@ public NewThreadScheduler(ThreadFactory threadFactory) { @Override public Worker createWorker() { - return new NewThreadWorker(threadFactory); + Throwable site = null; + if (WorkerDebugSupport.isEnabled()) { + site = new RuntimeException("createWorker() called"); + } + return new NewThreadWorker(threadFactory, site); } } diff --git a/src/main/java/rx/internal/schedulers/NewThreadWorker.java b/src/main/java/rx/internal/schedulers/NewThreadWorker.java index 585d27b3b6..4addd2e7e9 100644 --- a/src/main/java/rx/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/rx/internal/schedulers/NewThreadWorker.java @@ -32,7 +32,9 @@ /** * @warn class description missing */ -public class NewThreadWorker extends Scheduler.Worker implements Subscription { +public class NewThreadWorker extends Scheduler.Worker implements Subscription, WorkerCallback { + final Throwable site; + private final ScheduledExecutorService executor; private final RxJavaSchedulersHook schedulersHook; volatile boolean isUnsubscribed; @@ -200,15 +202,16 @@ static Method findSetRemoveOnCancelPolicyMethod(ScheduledExecutorService executo } /* package */ - public NewThreadWorker(ThreadFactory threadFactory) { + public NewThreadWorker(ThreadFactory threadFactory, Throwable site) { ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak boolean cancelSupported = tryEnableCancelPolicy(exec); if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) { registerExecutor((ScheduledThreadPoolExecutor)exec); } - schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook(); - executor = exec; + this.site = site; + this.schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook(); + this.executor = exec; } @Override @@ -221,7 +224,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit if (isUnsubscribed) { return Subscriptions.unsubscribed(); } - return scheduleActual(action, delayTime, unit); + return scheduleActual(action, delayTime, unit, this); } /** @@ -230,25 +233,18 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit * @param action the action to wrap and schedule * @param delayTime the delay in execution * @param unit the time unit of the delay + * @param parent the optional parent callback in case the action needs to be tracked * @return the wrapper ScheduledAction */ - public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) { + public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, WorkerCallback parent) { Action0 decoratedAction = schedulersHook.onSchedule(action); - ScheduledAction run = new ScheduledAction(decoratedAction); - Future f; - if (delayTime <= 0) { - f = executor.submit(run); + ScheduledAction run; + if (parent != null) { + run = new ScheduledAction(decoratedAction, parent); + parent.add(run); } else { - f = executor.schedule(run, delayTime, unit); + run = new ScheduledAction(decoratedAction); } - run.add(f); - - return run; - } - public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) { - Action0 decoratedAction = schedulersHook.onSchedule(action); - ScheduledAction run = new ScheduledAction(decoratedAction, parent); - parent.add(run); Future f; if (delayTime <= 0) { @@ -261,22 +257,6 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time return run; } - public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) { - Action0 decoratedAction = schedulersHook.onSchedule(action); - ScheduledAction run = new ScheduledAction(decoratedAction, parent); - parent.add(run); - - Future f; - if (delayTime <= 0) { - f = executor.submit(run); - } else { - f = executor.schedule(run, delayTime, unit); - } - run.add(f); - - return run; - } - @Override public void unsubscribe() { isUnsubscribed = true; @@ -288,4 +268,19 @@ public void unsubscribe() { public boolean isUnsubscribed() { return isUnsubscribed; } + + @Override + public void add(ScheduledAction action) { + // NewThreadWorker doesn't track tasks on its own + } + + @Override + public void remove(ScheduledAction action) { + // NewThreadWorker doesn't track tasks on its own + } + + @Override + public Throwable workerCreationSite() { + return site; + } } diff --git a/src/main/java/rx/internal/schedulers/ScheduledAction.java b/src/main/java/rx/internal/schedulers/ScheduledAction.java index 0f7d145a20..ec4eefbf7a 100644 --- a/src/main/java/rx/internal/schedulers/ScheduledAction.java +++ b/src/main/java/rx/internal/schedulers/ScheduledAction.java @@ -19,11 +19,10 @@ import java.util.concurrent.atomic.*; import rx.Subscription; -import rx.exceptions.OnErrorNotImplementedException; +import rx.exceptions.*; import rx.functions.Action0; import rx.internal.util.SubscriptionList; import rx.plugins.RxJavaPlugins; -import rx.subscriptions.CompositeSubscription; /** * A {@code Runnable} that executes an {@code Action0} and can be cancelled. The analog is the @@ -32,21 +31,20 @@ public final class ScheduledAction extends AtomicReference implements Runnable, Subscription { /** */ private static final long serialVersionUID = -3962399486978279857L; + final WorkerCallback parent; final SubscriptionList cancel; final Action0 action; public ScheduledAction(Action0 action) { this.action = action; + this.parent = null; this.cancel = new SubscriptionList(); } - public ScheduledAction(Action0 action, CompositeSubscription parent) { + public ScheduledAction(Action0 action, WorkerCallback parent) { this.action = action; + this.parent = parent; this.cancel = new SubscriptionList(new Remover(this, parent)); } - public ScheduledAction(Action0 action, SubscriptionList parent) { - this.action = action; - this.cancel = new SubscriptionList(new Remover2(this, parent)); - } @Override public void run() { @@ -54,13 +52,22 @@ public void run() { lazySet(Thread.currentThread()); action.call(); } catch (Throwable e) { + Throwable ex = e; + WorkerCallback localParent = parent; + if (localParent != null) { + Throwable wt = localParent.workerCreationSite(); + if (wt != null) { + ex = new CompositeException(e, wt); + } + } // nothing to do but print a System error as this is fatal and there is nowhere else to throw this - IllegalStateException ie = null; + Throwable ie = null; if (e instanceof OnErrorNotImplementedException) { - ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e); + ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", ex); } else { - ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e); + ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", ex); } + RxJavaPlugins.getInstance().getErrorHandler().handleError(ie); Thread thread = Thread.currentThread(); thread.getUncaughtExceptionHandler().uncaughtException(thread, ie); @@ -68,7 +75,7 @@ public void run() { unsubscribe(); } } - + @Override public boolean isUnsubscribed() { return cancel.isUnsubscribed(); @@ -100,28 +107,6 @@ public void add(final Future f) { cancel.add(new FutureCompleter(f)); } - /** - * Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is - * cancelled or terminates, it can remove itself from this parent. - * - * @param parent - * the parent {@code CompositeSubscription} to add - */ - public void addParent(CompositeSubscription parent) { - cancel.add(new Remover(this, parent)); - } - - /** - * Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is - * cancelled or terminates, it can remove itself from this parent. - * - * @param parent - * the parent {@code CompositeSubscription} to add - */ - public void addParent(SubscriptionList parent) { - cancel.add(new Remover2(this, parent)); - } - /** * Cancels the captured future if the caller of the call method * is not the same as the runner of the outer ScheduledAction to @@ -154,34 +139,9 @@ private static final class Remover extends AtomicBoolean implements Subscription /** */ private static final long serialVersionUID = 247232374289553518L; final ScheduledAction s; - final CompositeSubscription parent; - - public Remover(ScheduledAction s, CompositeSubscription parent) { - this.s = s; - this.parent = parent; - } - - @Override - public boolean isUnsubscribed() { - return s.isUnsubscribed(); - } - - @Override - public void unsubscribe() { - if (compareAndSet(false, true)) { - parent.remove(s); - } - } - - } - /** Remove a child subscription from a composite when unsubscribing. */ - private static final class Remover2 extends AtomicBoolean implements Subscription { - /** */ - private static final long serialVersionUID = 247232374289553518L; - final ScheduledAction s; - final SubscriptionList parent; + final WorkerCallback parent; - public Remover2(ScheduledAction s, SubscriptionList parent) { + public Remover(ScheduledAction s, WorkerCallback parent) { this.s = s; this.parent = parent; } diff --git a/src/main/java/rx/internal/schedulers/WorkerCallback.java b/src/main/java/rx/internal/schedulers/WorkerCallback.java new file mode 100644 index 0000000000..dd6c6873f2 --- /dev/null +++ b/src/main/java/rx/internal/schedulers/WorkerCallback.java @@ -0,0 +1,41 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.schedulers; + +/** + * Called by the ScheduledAction to remove itself from the parent tracking structure + * and ask for the instantiation location of the parent Worker. + */ +public interface WorkerCallback { + /** + * Adds the specified action to the tracking structure. + * @param action the action to add, not null + */ + void add(ScheduledAction action); + /** + * Remove the specified action from the tracking structure. + * @param action the action to remove, not null + */ + void remove(ScheduledAction action); + /** + * Returns the Throwable exception representing the stacktrace + * where the parent worker has been created or null + * if worker tracking is disabled. + * @return the Throwable or null + */ + Throwable workerCreationSite(); +} diff --git a/src/main/java/rx/internal/schedulers/WorkerDebugSupport.java b/src/main/java/rx/internal/schedulers/WorkerDebugSupport.java new file mode 100644 index 0000000000..1e4690a410 --- /dev/null +++ b/src/main/java/rx/internal/schedulers/WorkerDebugSupport.java @@ -0,0 +1,63 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package rx.internal.schedulers; + +import java.security.*; + +/** + * Holds onto a flag that enables the capture of the current stacktrace when + * a Scheduler.Worker is created. + * + * Use the system property {@code rx.scheduler-worker.debug} ({@code true|false}) + * to initialize its value, or use the setEnabled() method during runtime. + */ +public enum WorkerDebugSupport { + ; + + private static volatile boolean enabled; + + static { + String s = System.getProperty("rx.scheduler-worker.debug", "false"); + enabled = "true".equals(s); + } + /** + * Returns the current state of the worker debug mode. + * @return the current state of the worker debug mode + */ + public static boolean isEnabled() { + return enabled; + } + + /** + * Enables or disables the worker debug mode. + * @param value the new state + */ + public static void setEnabled(final boolean value) { + SecurityManager smgr = System.getSecurityManager(); + if (smgr == null) { + enabled = value; + } else { + AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Void run() { + enabled = value; + return null; + } + }); + } + } +} diff --git a/src/main/java/rx/schedulers/Schedulers.java b/src/main/java/rx/schedulers/Schedulers.java index eae594ef08..b0260736f4 100644 --- a/src/main/java/rx/schedulers/Schedulers.java +++ b/src/main/java/rx/schedulers/Schedulers.java @@ -15,13 +15,13 @@ */ package rx.schedulers; +import java.util.concurrent.Executor; + import rx.Scheduler; +import rx.annotations.Experimental; import rx.internal.schedulers.*; import rx.internal.util.RxRingBuffer; -import rx.plugins.RxJavaPlugins; -import rx.plugins.RxJavaSchedulersHook; - -import java.util.concurrent.Executor; +import rx.plugins.*; /** * Static factory methods for creating Schedulers. @@ -189,4 +189,17 @@ public static void shutdown() { RxRingBuffer.SPMC_POOL.shutdown(); } } + + /** + * Enable or disable worker tracking; if a scheduled task crashes, + * the error reported to the RxJavaPlugins will contain an IllegalStateException + * with a CompositeException cause of the original crash exception + * along with a RuntimeException holding the + * stacktrace where the parent worker has been instantiated. + * @param value the state + */ + @Experimental + public static void setWorkerTracking(boolean value) { + WorkerDebugSupport.setEnabled(value); + } } \ No newline at end of file diff --git a/src/test/java/rx/schedulers/AbstractSchedulerTests.java b/src/test/java/rx/schedulers/AbstractSchedulerTests.java index f27c43807c..96d0aba720 100644 --- a/src/test/java/rx/schedulers/AbstractSchedulerTests.java +++ b/src/test/java/rx/schedulers/AbstractSchedulerTests.java @@ -15,34 +15,26 @@ */ package rx.schedulers; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import org.junit.Test; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import org.junit.*; import org.mockito.InOrder; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import rx.*; import rx.Observable; import rx.Observable.OnSubscribe; -import rx.Scheduler; -import rx.Subscriber; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func1; +import rx.Scheduler.Worker; +import rx.exceptions.*; +import rx.functions.*; +import rx.internal.schedulers.WorkerDebugSupport; +import rx.plugins.*; /** * Base tests for all schedulers including Immediate/Current. @@ -502,4 +494,158 @@ public void onNext(T args) { } + void workerCreationSiteCaptured(boolean timed) throws Exception { + Scheduler scheduler = getScheduler(); + + if ((scheduler instanceof rx.internal.schedulers.TrampolineScheduler) + || (scheduler instanceof rx.internal.schedulers.ImmediateScheduler)) { + // we don't care about these schedulers + return; + } + + final AtomicReference error = new AtomicReference(); + RxJavaPlugins plugin = RxJavaPlugins.getInstance(); + plugin.reset(); + plugin.registerErrorHandler(new RxJavaErrorHandler() { + @Override + public void handleError(Throwable e) { + error.set(e); + } + }); + + boolean state = WorkerDebugSupport.isEnabled(); + + Schedulers.setWorkerTracking(true); + + Worker w = scheduler.createWorker(); + try { + + if (timed) { + w.schedule(new Action0() { + @Override + public void call() { + throw new TestException("Forced failure"); + } + }, 50, TimeUnit.MILLISECONDS); + Thread.sleep(100); + } else { + w.schedule(new Action0() { + @Override + public void call() { + throw new TestException("Forced failure"); + } + }); + + Thread.sleep(10); + } + + while (error.get() == null) ; + + Throwable ex = error.get(); + Assert.assertTrue(ex.toString(), ex instanceof IllegalStateException); + + Throwable cause = ex.getCause(); + + Assert.assertTrue(cause.toString(), cause instanceof CompositeException); + + CompositeException ce = (CompositeException)ex.getCause(); + List list = ce.getExceptions(); + + Assert.assertEquals(2, list.size()); + + Throwable ex2 = list.get(1); + + for (StackTraceElement ste : ex2.getStackTrace()) { + if (ste.getMethodName().equals("workerCreationSiteCaptured")) { + return; + } + } + + Assert.fail("Couldn't find the current method in the stacktrace"); + + } finally { + Schedulers.setWorkerTracking(state); + w.unsubscribe(); + } + } + + void workerCreationSiteNotCaptured(boolean timed) throws Exception { + Scheduler scheduler = getScheduler(); + + if ((scheduler instanceof rx.internal.schedulers.TrampolineScheduler) + || (scheduler instanceof rx.internal.schedulers.ImmediateScheduler)) { + // we don't care about these schedulers + return; + } + + final AtomicReference error = new AtomicReference(); + RxJavaPlugins plugin = RxJavaPlugins.getInstance(); + plugin.reset(); + plugin.registerErrorHandler(new RxJavaErrorHandler() { + @Override + public void handleError(Throwable e) { + error.set(e); + } + }); + + boolean state = WorkerDebugSupport.isEnabled(); + + Schedulers.setWorkerTracking(false); + + Worker w = scheduler.createWorker(); + try { + + if (timed) { + w.schedule(new Action0() { + @Override + public void call() { + throw new TestException("Forced failure"); + } + }, 50, TimeUnit.MILLISECONDS); + Thread.sleep(100); + } else { + w.schedule(new Action0() { + @Override + public void call() { + throw new TestException("Forced failure"); + } + }); + + Thread.sleep(10); + } + + while (error.get() == null) ; + + Throwable ex = error.get(); + Assert.assertTrue(ex.toString(), ex instanceof IllegalStateException); + + Throwable cause = ex.getCause(); + + Assert.assertTrue(ex.toString(), cause instanceof TestException); + Assert.assertEquals("Forced failure", cause.getMessage()); + } finally { + Schedulers.setWorkerTracking(state); + w.unsubscribe(); + } + } + + @Test(timeout = 1500) + public void workerCreationSiteCaptured() throws Exception { + workerCreationSiteCaptured(false); + } + + @Test(timeout = 1500) + public void workerCreationSiteCapturedTimed() throws Exception { + workerCreationSiteCaptured(true); + } + + @Test(timeout = 1500) + public void workerCreationSiteNotCaptured() throws Exception { + workerCreationSiteNotCaptured(false); + } + + @Test(timeout = 1500) + public void workerCreationSiteNotCapturedTimed() throws Exception { + workerCreationSiteNotCaptured(true); + } }