Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 72 additions & 12 deletions common/src/main/java/io/druid/common/config/Log4jShutdown.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@

package io.druid.common.config;

import com.google.common.base.Throwables;
import org.apache.logging.log4j.core.util.Cancellable;
import org.apache.logging.log4j.core.util.ShutdownCallbackRegistry;

import javax.annotation.concurrent.GuardedBy;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class Log4jShutdown implements ShutdownCallbackRegistry, org.apache.logging.log4j.core.LifeCycle
{
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
private static final long SHUTDOWN_WAIT_TIMEOUT = 60000;

private final SynchronizedStateHolder state = new SynchronizedStateHolder(State.INITIALIZED);
private final Queue<Runnable> shutdownCallbacks = new ConcurrentLinkedQueue<>();
private final AtomicBoolean callbacksRun = new AtomicBoolean(false);

@Override
public Cancellable addShutdownCallback(final Runnable callback)
Expand Down Expand Up @@ -95,21 +97,23 @@ public void start()
@Override
public void stop()
{
if (callbacksRun.get()) {
if (!state.compareAndSet(State.STARTED, State.STOPPING)) {
State current = state.waitForTransition(State.STOPPING, State.STOPPED, SHUTDOWN_WAIT_TIMEOUT);
if (current != State.STOPPED) {
throw new IllegalStateException(String.format("Expected state [%s] found [%s]", State.STARTED, current));
}
return;
}
if (!state.compareAndSet(State.STARTED, State.STOPPED)) {
throw new IllegalStateException(String.format("Expected state [%s] found [%s]", State.STARTED, state.get()));
try {
runCallbacks();
}
finally {
state.compareAndSet(State.STOPPING, State.STOPPED);
}
}

public void runCallbacks()
private void runCallbacks()
{
if (!callbacksRun.compareAndSet(false, true)) {
// Already run, skip
return;
}
stop();
RuntimeException e = null;
for (Runnable callback = shutdownCallbacks.poll(); callback != null; callback = shutdownCallbacks.poll()) {
try {
Expand Down Expand Up @@ -138,4 +142,60 @@ public boolean isStopped()
{
return State.STOPPED.equals(getState());
}

private static class SynchronizedStateHolder
{
@GuardedBy("this")
private State current;

private SynchronizedStateHolder(State initial) {
current = initial;
}

private synchronized boolean compareAndSet(State expected, State transition)
{
if (current == expected) {
return transition(transition);
}
return false;
}

/**
* if current state is `expected`, wait it to be changed into `transition` state for `timeout` msec.
* if it's not, return current state immediately.
*
* @return current state
*/
private synchronized State waitForTransition(State expected, State transition, long timeout)
{
if (current == expected) {
long remaining = timeout;
try {
long prev = System.currentTimeMillis();
while (current != transition && remaining > 0) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be while(current == expected), wondering if we should return false if some other transition other than the expected happens ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw ISE with a note in the method signature would be appropriate

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or return the state that was found

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

wait(remaining);
long now = System.currentTimeMillis();
remaining -= now - prev;
prev = now;
}
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}
return current;
}

private synchronized boolean transition(State transition)
{
current = transition;
notifyAll();
return true;
}

private synchronized State get()
{
return current;
}
}
}
24 changes: 12 additions & 12 deletions services/src/main/java/io/druid/cli/CliPeon.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,22 +232,22 @@ public void run()
Injector injector = makeInjector();
try {
final Lifecycle lifecycle = initLifecycle(injector);
Runtime.getRuntime().addShutdownHook(
new Thread(
new Runnable()
{
@Override
public void run()
{
log.info("Running shutdown hook");
lifecycle.stop();
}
}
)
final Thread hook = new Thread(
new Runnable()
{
@Override
public void run()
{
log.info("Running shutdown hook");
lifecycle.stop();
}
}
);
Runtime.getRuntime().addShutdownHook(hook);
injector.getInstance(ExecutorLifecycle.class).join();
// Explicitly call lifecycle stop, dont rely on shutdown hook.
lifecycle.stop();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is using Thread now, can this simply call hook.run()?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be. But if lifecycle.start is called once, should we be expected to call lifecycle.stop?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, hook.run() should call its Runnable.run() which calls lifecycle.stop().

But in the interest of minimizing changes for this PR such a change can be ignored for now.

Runtime.getRuntime().removeShutdownHook(hook);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO this is more appropriately solved by metamx/java-util#28

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not opposed to the changes in this file though, as it adds better future proofing of the shutdown process for peons

}
catch (Throwable t) {
log.error(t, "Error when starting up. Failing.");
Expand Down