diff --git a/src/main/java/com/metamx/common/lifecycle/Lifecycle.java b/src/main/java/com/metamx/common/lifecycle/Lifecycle.java index e53154ec..9f22ceea 100644 --- a/src/main/java/com/metamx/common/lifecycle/Lifecycle.java +++ b/src/main/java/com/metamx/common/lifecycle/Lifecycle.java @@ -51,6 +51,7 @@ public class Lifecycle private final Map> handlers; private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean shutdownHookRegistered = new AtomicBoolean(false); private volatile Stage currStage = null; public static enum Stage @@ -72,6 +73,7 @@ public Lifecycle() * Stage.NORMAL. If the lifecycle has already been started, it throws an {@link ISE} * * @param o The object to add to the lifecycle + * * @throws ISE {@link Lifecycle#addHandler(Handler, Stage)} */ public T addManagedInstance(T o) @@ -84,8 +86,9 @@ public T addManagedInstance(T o) * Adds a "managed" instance (annotated with {@link LifecycleStart} and {@link LifecycleStop}) to the Lifecycle. * If the lifecycle has already been started, it throws an {@link ISE} * - * @param o The object to add to the lifecycle + * @param o The object to add to the lifecycle * @param stage The stage to add the lifecycle at + * * @throws ISE {@link Lifecycle#addHandler(Handler, Stage)} */ public T addManagedInstance(T o, Stage stage) @@ -99,6 +102,7 @@ public T addManagedInstance(T o, Stage stage) * already been started, it throws an {@link ISE} * * @param o The object to add to the lifecycle + * * @throws ISE {@link Lifecycle#addHandler(Handler, Stage)} */ public T addStartCloseInstance(T o) @@ -111,8 +115,9 @@ public T addStartCloseInstance(T o) * Adds an instance with a start() and/or close() method to the Lifecycle. If the lifecycle has already been started, * it throws an {@link ISE} * - * @param o The object to add to the lifecycle + * @param o The object to add to the lifecycle * @param stage The stage to add the lifecycle at + * * @throws ISE {@link Lifecycle#addHandler(Handler, Stage)} */ public T addStartCloseInstance(T o, Stage stage) @@ -126,6 +131,7 @@ public T addStartCloseInstance(T o, Stage stage) * an {@link ISE} * * @param handler The hander to add to the lifecycle + * * @throws ISE {@link Lifecycle#addHandler(Handler, Stage)} */ public void addHandler(Handler handler) @@ -137,7 +143,8 @@ public void addHandler(Handler handler) * Adds a handler to the Lifecycle. If the lifecycle has already been started, it throws an {@link ISE} * * @param handler The hander to add to the lifecycle - * @param stage The stage to add the lifecycle at + * @param stage The stage to add the lifecycle at + * * @throws ISE indicates that the lifecycle has already been started and thus cannot be added to */ public void addHandler(Handler handler, Stage stage) @@ -155,6 +162,7 @@ public void addHandler(Handler handler, Stage stage) * Stage.NORMAL and starts it if the lifecycle has already been started. * * @param o The object to add to the lifecycle + * * @throws Exception {@link Lifecycle#addMaybeStartHandler(Handler, Stage)} */ public T addMaybeStartManagedInstance(T o) throws Exception @@ -167,8 +175,9 @@ public T addMaybeStartManagedInstance(T o) throws Exception * Adds a "managed" instance (annotated with {@link LifecycleStart} and {@link LifecycleStop}) to the Lifecycle * and starts it if the lifecycle has already been started. * - * @param o The object to add to the lifecycle + * @param o The object to add to the lifecycle * @param stage The stage to add the lifecycle at + * * @throws Exception {@link Lifecycle#addMaybeStartHandler(Handler, Stage)} */ public T addMaybeStartManagedInstance(T o, Stage stage) throws Exception @@ -182,6 +191,7 @@ public T addMaybeStartManagedInstance(T o, Stage stage) throws Exception * lifecycle has already been started. * * @param o The object to add to the lifecycle + * * @throws Exception {@link Lifecycle#addMaybeStartHandler(Handler, Stage)} */ public T addMaybeStartStartCloseInstance(T o) throws Exception @@ -194,8 +204,9 @@ public T addMaybeStartStartCloseInstance(T o) throws Exception * Adds an instance with a start() and/or close() method to the Lifecycle and starts it if the lifecycle has * already been started. * - * @param o The object to add to the lifecycle + * @param o The object to add to the lifecycle * @param stage The stage to add the lifecycle at + * * @throws Exception {@link Lifecycle#addMaybeStartHandler(Handler, Stage)} */ public T addMaybeStartStartCloseInstance(T o, Stage stage) throws Exception @@ -208,6 +219,7 @@ public T addMaybeStartStartCloseInstance(T o, Stage stage) throws Exception * Adds a handler to the Lifecycle at the Stage.NORMAL stage and starts it if the lifecycle has already been started. * * @param handler The hander to add to the lifecycle + * * @throws Exception {@link Lifecycle#addMaybeStartHandler(Handler, Stage)} */ public void addMaybeStartHandler(Handler handler) throws Exception @@ -219,7 +231,8 @@ public void addMaybeStartHandler(Handler handler) throws Exception * Adds a handler to the Lifecycle and starts it if the lifecycle has already been started. * * @param handler The hander to add to the lifecycle - * @param stage The stage to add the lifecycle at + * @param stage The stage to add the lifecycle at + * * @throws Exception an exception thrown from handler.start(). If an exception is thrown, the handler is *not* added */ public void addMaybeStartHandler(Handler handler, Stage stage) throws Exception @@ -237,7 +250,9 @@ public void addMaybeStartHandler(Handler handler, Stage stage) throws Exception public void start() throws Exception { synchronized (handlers) { - started.set(true); + if (!started.compareAndSet(false, true)) { + throw new ISE("Already started"); + } for (Stage stage : stagesOrdered()) { currStage = stage; for (Handler handler : handlers.get(stage)) { @@ -250,6 +265,10 @@ public void start() throws Exception public void stop() { synchronized (handlers) { + if (!started.compareAndSet(true, false)) { + log.info("Already stopped and stop was called. Silently skipping"); + return; + } List exceptions = Lists.newArrayList(); for (Stage stage : Lists.reverse(stagesOrdered())) { @@ -266,7 +285,6 @@ public void stop() } } } - started.set(false); if (!exceptions.isEmpty()) { throw Throwables.propagate(exceptions.get(0)); @@ -274,22 +292,28 @@ public void stop() } } - public void join() throws InterruptedException + public void ensureShutdownHook() { - Runtime.getRuntime().addShutdownHook( - new Thread( - new Runnable() - { - @Override - public void run() + if (shutdownHookRegistered.compareAndSet(false, true)) { + Runtime.getRuntime().addShutdownHook( + new Thread( + new Runnable() { - log.info("Running shutdown hook"); - stop(); + @Override + public void run() + { + log.info("Running shutdown hook"); + stop(); + } } - } - ) - ); + ) + ); + } + } + public void join() throws InterruptedException + { + ensureShutdownHook(); Thread.currentThread().join(); } @@ -302,6 +326,7 @@ private static List stagesOrdered() public static interface Handler { public void start() throws Exception; + public void stop(); } diff --git a/src/test/java/com/metamx/common/lifecycle/LifecycleTest.java b/src/test/java/com/metamx/common/lifecycle/LifecycleTest.java index 28c09c73..535f9700 100644 --- a/src/test/java/com/metamx/common/lifecycle/LifecycleTest.java +++ b/src/test/java/com/metamx/common/lifecycle/LifecycleTest.java @@ -16,17 +16,156 @@ package com.metamx.common.lifecycle; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.common.ISE; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; /** */ public class LifecycleTest { + @Test + public void testConcurrentStartStopOnce() throws Exception + { + final int numThreads = 10; + ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads)); + + final Lifecycle lifecycle = new Lifecycle(); + final AtomicLong startedCount = new AtomicLong(0L); + final AtomicLong failedCount = new AtomicLong(0L); + final Lifecycle.Handler exceptionalHandler = new Lifecycle.Handler() + { + final AtomicBoolean started = new AtomicBoolean(false); + + @Override + public void start() throws Exception + { + if (!started.compareAndSet(false, true)) { + failedCount.incrementAndGet(); + throw new ISE("Already started"); + } + startedCount.incrementAndGet(); + } + + @Override + public void stop() + { + if (!started.compareAndSet(true, false)) { + failedCount.incrementAndGet(); + throw new ISE("Not yet started started"); + } + } + }; + lifecycle.addHandler(exceptionalHandler); + Collection> futures = new ArrayList<>(numThreads); + final CyclicBarrier barrier = new CyclicBarrier(numThreads); + final AtomicBoolean started = new AtomicBoolean(false); + for (int i = 0; i < numThreads; ++i) { + futures.add( + executorService.submit( + new Runnable() + { + @Override + public void run() + { + try { + for (int i = 0; i < 1024; ++i) { + if (started.compareAndSet(false, true)) { + lifecycle.start(); + } + barrier.await(); + lifecycle.stop(); + barrier.await(); + started.set(false); + barrier.await(); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ) + ); + } + try { + Futures.allAsList(futures).get(); + } + finally { + lifecycle.stop(); + } + Assert.assertEquals(0, failedCount.get()); + Assert.assertTrue(startedCount.get() > 0); + executorService.shutdownNow(); + } + + @Test + public void testStartStopOnce() throws Exception + { + final Lifecycle lifecycle = new Lifecycle(); + final AtomicLong startedCount = new AtomicLong(0L); + final AtomicLong failedCount = new AtomicLong(0L); + Lifecycle.Handler exceptionalHandler = new Lifecycle.Handler() + { + final AtomicBoolean started = new AtomicBoolean(false); + + @Override + public void start() throws Exception + { + if (!started.compareAndSet(false, true)) { + failedCount.incrementAndGet(); + throw new ISE("Already started"); + } + startedCount.incrementAndGet(); + } + + @Override + public void stop() + { + if (!started.compareAndSet(true, false)) { + failedCount.incrementAndGet(); + throw new ISE("Not yet started started"); + } + } + }; + lifecycle.addHandler(exceptionalHandler); + lifecycle.start(); + lifecycle.stop(); + lifecycle.stop(); + lifecycle.stop(); + lifecycle.start(); + lifecycle.stop(); + Assert.assertEquals(2, startedCount.get()); + Assert.assertEquals(0, failedCount.get()); + Exception ex = null; + try { + exceptionalHandler.stop(); + } + catch (Exception e) { + ex = e; + } + Assert.assertNotNull("Should have exception", ex); + } + @Test public void testSanity() throws Exception {