Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 45 additions & 20 deletions src/main/java/com/metamx/common/lifecycle/Lifecycle.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class Lifecycle

private final Map<Stage, CopyOnWriteArrayList<Handler>> handlers;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean shutdownHookRegistered = new AtomicBoolean(false);
private volatile Stage currStage = null;

public static enum Stage
Expand All @@ -72,6 +73,7 @@ public Lifecycle()
* Stage.NORMAL. If the lifecycle has already been started, it throws an {@link ISE}
*
* @param o The object to add to the lifecycle
*
* @throws ISE {@link Lifecycle#addHandler(Handler, Stage)}
*/
public <T> T addManagedInstance(T o)
Expand All @@ -84,8 +86,9 @@ public <T> T addManagedInstance(T o)
* Adds a "managed" instance (annotated with {@link LifecycleStart} and {@link LifecycleStop}) to the Lifecycle.
* If the lifecycle has already been started, it throws an {@link ISE}
*
* @param o The object to add to the lifecycle
* @param o The object to add to the lifecycle
* @param stage The stage to add the lifecycle at
*
* @throws ISE {@link Lifecycle#addHandler(Handler, Stage)}
*/
public <T> T addManagedInstance(T o, Stage stage)
Expand All @@ -99,6 +102,7 @@ public <T> T addManagedInstance(T o, Stage stage)
* already been started, it throws an {@link ISE}
*
* @param o The object to add to the lifecycle
*
* @throws ISE {@link Lifecycle#addHandler(Handler, Stage)}
*/
public <T> T addStartCloseInstance(T o)
Expand All @@ -111,8 +115,9 @@ public <T> T addStartCloseInstance(T o)
* Adds an instance with a start() and/or close() method to the Lifecycle. If the lifecycle has already been started,
* it throws an {@link ISE}
*
* @param o The object to add to the lifecycle
* @param o The object to add to the lifecycle
* @param stage The stage to add the lifecycle at
*
* @throws ISE {@link Lifecycle#addHandler(Handler, Stage)}
*/
public <T> T addStartCloseInstance(T o, Stage stage)
Expand All @@ -126,6 +131,7 @@ public <T> T addStartCloseInstance(T o, Stage stage)
* an {@link ISE}
*
* @param handler The hander to add to the lifecycle
*
* @throws ISE {@link Lifecycle#addHandler(Handler, Stage)}
*/
public void addHandler(Handler handler)
Expand All @@ -137,7 +143,8 @@ public void addHandler(Handler handler)
* Adds a handler to the Lifecycle. If the lifecycle has already been started, it throws an {@link ISE}
*
* @param handler The hander to add to the lifecycle
* @param stage The stage to add the lifecycle at
* @param stage The stage to add the lifecycle at
*
* @throws ISE indicates that the lifecycle has already been started and thus cannot be added to
*/
public void addHandler(Handler handler, Stage stage)
Expand All @@ -155,6 +162,7 @@ public void addHandler(Handler handler, Stage stage)
* Stage.NORMAL and starts it if the lifecycle has already been started.
*
* @param o The object to add to the lifecycle
*
* @throws Exception {@link Lifecycle#addMaybeStartHandler(Handler, Stage)}
*/
public <T> T addMaybeStartManagedInstance(T o) throws Exception
Expand All @@ -167,8 +175,9 @@ public <T> T addMaybeStartManagedInstance(T o) throws Exception
* Adds a "managed" instance (annotated with {@link LifecycleStart} and {@link LifecycleStop}) to the Lifecycle
* and starts it if the lifecycle has already been started.
*
* @param o The object to add to the lifecycle
* @param o The object to add to the lifecycle
* @param stage The stage to add the lifecycle at
*
* @throws Exception {@link Lifecycle#addMaybeStartHandler(Handler, Stage)}
*/
public <T> T addMaybeStartManagedInstance(T o, Stage stage) throws Exception
Expand All @@ -182,6 +191,7 @@ public <T> T addMaybeStartManagedInstance(T o, Stage stage) throws Exception
* lifecycle has already been started.
*
* @param o The object to add to the lifecycle
*
* @throws Exception {@link Lifecycle#addMaybeStartHandler(Handler, Stage)}
*/
public <T> T addMaybeStartStartCloseInstance(T o) throws Exception
Expand All @@ -194,8 +204,9 @@ public <T> T addMaybeStartStartCloseInstance(T o) throws Exception
* Adds an instance with a start() and/or close() method to the Lifecycle and starts it if the lifecycle has
* already been started.
*
* @param o The object to add to the lifecycle
* @param o The object to add to the lifecycle
* @param stage The stage to add the lifecycle at
*
* @throws Exception {@link Lifecycle#addMaybeStartHandler(Handler, Stage)}
*/
public <T> T addMaybeStartStartCloseInstance(T o, Stage stage) throws Exception
Expand All @@ -208,6 +219,7 @@ public <T> T addMaybeStartStartCloseInstance(T o, Stage stage) throws Exception
* Adds a handler to the Lifecycle at the Stage.NORMAL stage and starts it if the lifecycle has already been started.
*
* @param handler The hander to add to the lifecycle
*
* @throws Exception {@link Lifecycle#addMaybeStartHandler(Handler, Stage)}
*/
public void addMaybeStartHandler(Handler handler) throws Exception
Expand All @@ -219,7 +231,8 @@ public void addMaybeStartHandler(Handler handler) throws Exception
* Adds a handler to the Lifecycle and starts it if the lifecycle has already been started.
*
* @param handler The hander to add to the lifecycle
* @param stage The stage to add the lifecycle at
* @param stage The stage to add the lifecycle at
*
* @throws Exception an exception thrown from handler.start(). If an exception is thrown, the handler is *not* added
*/
public void addMaybeStartHandler(Handler handler, Stage stage) throws Exception
Expand All @@ -237,7 +250,9 @@ public void addMaybeStartHandler(Handler handler, Stage stage) throws Exception
public void start() throws Exception
{
synchronized (handlers) {
started.set(true);
if (!started.compareAndSet(false, true)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be an error? I think unlike stop, there's no good reason for start to be called multiple times

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe? Prior behavior did not throw an error, which is why I did not do so here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like it makes sense to either not change the behavior, or to have it throw an error; silently skipping over double starts seems sketchy. At least the old behavior would probably blow up in some way and let us know something was being done wrong.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

throw new ISE("Already started");
}
for (Stage stage : stagesOrdered()) {
currStage = stage;
for (Handler handler : handlers.get(stage)) {
Expand All @@ -250,6 +265,10 @@ public void start() throws Exception
public void stop()
{
synchronized (handlers) {
if (!started.compareAndSet(true, false)) {
log.info("Already stopped and stop was called. Silently skipping");
return;
}
List<Exception> exceptions = Lists.newArrayList();

for (Stage stage : Lists.reverse(stagesOrdered())) {
Expand All @@ -266,30 +285,35 @@ public void stop()
}
}
}
started.set(false);

if (!exceptions.isEmpty()) {
throw Throwables.propagate(exceptions.get(0));
}
}
}

public void join() throws InterruptedException
public void ensureShutdownHook()
{
Runtime.getRuntime().addShutdownHook(
new Thread(
new Runnable()
{
@Override
public void run()
if (shutdownHookRegistered.compareAndSet(false, true)) {
Runtime.getRuntime().addShutdownHook(
new Thread(
new Runnable()
{
log.info("Running shutdown hook");
stop();
@Override
public void run()
{
log.info("Running shutdown hook");
stop();
}
}
}
)
);
)
);
}
}

public void join() throws InterruptedException
{
ensureShutdownHook();
Thread.currentThread().join();
}

Expand All @@ -302,6 +326,7 @@ private static List<Stage> stagesOrdered()
public static interface Handler
{
public void start() throws Exception;

public void stop();
}

Expand Down
139 changes: 139 additions & 0 deletions src/test/java/com/metamx/common/lifecycle/LifecycleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,156 @@

package com.metamx.common.lifecycle;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
*/
public class LifecycleTest
{
@Test
public void testConcurrentStartStopOnce() throws Exception
{
final int numThreads = 10;
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads));

final Lifecycle lifecycle = new Lifecycle();
final AtomicLong startedCount = new AtomicLong(0L);
final AtomicLong failedCount = new AtomicLong(0L);
final Lifecycle.Handler exceptionalHandler = new Lifecycle.Handler()
{
final AtomicBoolean started = new AtomicBoolean(false);

@Override
public void start() throws Exception
{
if (!started.compareAndSet(false, true)) {
failedCount.incrementAndGet();
throw new ISE("Already started");
}
startedCount.incrementAndGet();
}

@Override
public void stop()
{
if (!started.compareAndSet(true, false)) {
failedCount.incrementAndGet();
throw new ISE("Not yet started started");
}
}
};
lifecycle.addHandler(exceptionalHandler);
Collection<ListenableFuture<?>> futures = new ArrayList<>(numThreads);
final CyclicBarrier barrier = new CyclicBarrier(numThreads);
final AtomicBoolean started = new AtomicBoolean(false);
for (int i = 0; i < numThreads; ++i) {
futures.add(
executorService.submit(
new Runnable()
{
@Override
public void run()
{
try {
for (int i = 0; i < 1024; ++i) {
if (started.compareAndSet(false, true)) {
lifecycle.start();
}
barrier.await();
lifecycle.stop();
barrier.await();
started.set(false);
barrier.await();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
)
);
}
try {
Futures.allAsList(futures).get();
}
finally {
lifecycle.stop();
}
Assert.assertEquals(0, failedCount.get());
Assert.assertTrue(startedCount.get() > 0);
executorService.shutdownNow();
}

@Test
public void testStartStopOnce() throws Exception
{
final Lifecycle lifecycle = new Lifecycle();
final AtomicLong startedCount = new AtomicLong(0L);
final AtomicLong failedCount = new AtomicLong(0L);
Lifecycle.Handler exceptionalHandler = new Lifecycle.Handler()
{
final AtomicBoolean started = new AtomicBoolean(false);

@Override
public void start() throws Exception
{
if (!started.compareAndSet(false, true)) {
failedCount.incrementAndGet();
throw new ISE("Already started");
}
startedCount.incrementAndGet();
}

@Override
public void stop()
{
if (!started.compareAndSet(true, false)) {
failedCount.incrementAndGet();
throw new ISE("Not yet started started");
}
}
};
lifecycle.addHandler(exceptionalHandler);
lifecycle.start();
lifecycle.stop();
lifecycle.stop();
lifecycle.stop();
lifecycle.start();
lifecycle.stop();
Assert.assertEquals(2, startedCount.get());
Assert.assertEquals(0, failedCount.get());
Exception ex = null;
try {
exceptionalHandler.stop();
}
catch (Exception e) {
ex = e;
}
Assert.assertNotNull("Should have exception", ex);
}

@Test
public void testSanity() throws Exception
{
Expand Down