Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
Expand All @@ -39,10 +41,12 @@
import org.apache.druid.segment.incremental.ParseExceptionReport;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

/**
* Manages the creation and lifetime of {@link Supervisor}.
Expand Down Expand Up @@ -212,11 +216,12 @@ public void start()
public void stop()
{
Preconditions.checkState(started, "SupervisorManager not started");

List<ListenableFuture<Void>> stopFutures = new ArrayList<>();
synchronized (lock) {
log.info("Stopping [%d] supervisors", supervisors.keySet().size());
for (String id : supervisors.keySet()) {
try {
supervisors.get(id).lhs.stop(false);
stopFutures.add(supervisors.get(id).lhs.stopAsync());
SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
if (autoscaler != null) {
autoscaler.stop();
Expand All @@ -226,6 +231,18 @@ public void stop()
log.warn(e, "Caught exception while stopping supervisor [%s]", id);
}
}
log.info("Waiting for [%d] supervisors to shutdown", stopFutures.size());
try {
FutureUtils.coalesce(stopFutures).get();
}
catch (Exception e) {
log.warn(
Comment thread
kfaraz marked this conversation as resolved.
e,
"Stopped [%d] out of [%d] supervisors. Remaining supervisors will be killed.",
stopFutures.stream().filter(Future::isDone).count(),
stopFutures.size()
);
}
supervisors.clear();
autoscalers.clear();
started = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
Expand Down Expand Up @@ -1103,6 +1104,19 @@ public void stop(boolean stopGracefully)
}
}

@Override
public ListenableFuture<Void> stopAsync()
{
ListeningExecutorService shutdownExec = MoreExecutors.listeningDecorator(
Execs.singleThreaded("supervisor-shutdown-" + StringUtils.encodeForFormat(supervisorId) + "--%d")
);
return shutdownExec.submit(() -> {
stop(false);
shutdownExec.shutdown();
return null;
});
}
Comment thread
kfaraz marked this conversation as resolved.

@Override
public void reset(@Nullable final DataSourceMetadata dataSourceMetadata)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexing.common.TaskLockType;
Expand Down Expand Up @@ -130,7 +131,9 @@ public void testCreateUpdateAndRemoveSupervisor()
verifyAll();

resetAll();
supervisor3.stop(false);
SettableFuture<Void> stopFuture = SettableFuture.create();
stopFuture.set(null);
EasyMock.expect(supervisor3.stopAsync()).andReturn(stopFuture);
replayAll();

manager.stop();
Expand Down Expand Up @@ -361,7 +364,7 @@ public void testStopThrowsException()

EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
supervisor1.stop(false);
supervisor1.stopAsync();
EasyMock.expectLastCall().andThrow(new RuntimeException("RTE"));
replayAll();

Expand Down Expand Up @@ -511,7 +514,9 @@ public void testCreateSuspendResumeAndStopSupervisor()

// mock manager shutdown to ensure supervisor 3 stops
resetAll();
supervisor3.stop(false);
SettableFuture<Void> stopFuture = SettableFuture.create();
stopFuture.set(null);
EasyMock.expect(supervisor3.stopAsync()).andReturn(stopFuture);
replayAll();

manager.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,28 @@ public void testStopping()
verifyAll();
}

@Test
public void testStopGracefully() throws Exception
{
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

taskRunner.unregisterListener("testSupervisorId");
indexTaskClient.close();
recordSupplier.close();

replayAll();
SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

supervisor.start();
supervisor.runInternal();
ListenableFuture<Void> stopFuture = supervisor.stopAsync();
stopFuture.get();
verifyAll();
}

@Test
public void testStoppingGracefully()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.segment.incremental.ParseExceptionReport;

Expand All @@ -44,6 +46,22 @@ public interface Supervisor
*/
void stop(boolean stopGracefully);

/**
* Starts non-graceful shutdown of the supervisor and returns a future that completes when shutdown is complete.
*/
default ListenableFuture<Void> stopAsync()
{
SettableFuture<Void> stopFuture = SettableFuture.create();
try {
stop(false);
stopFuture.set(null);
}
catch (Exception e) {
stopFuture.setException(e);
}
return stopFuture;
}

SupervisorReport getStatus();

SupervisorStateManager.State getState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.junit.Test;

import javax.annotation.Nullable;
import java.util.concurrent.Future;

public class StreamSupervisorTest
{
Expand Down Expand Up @@ -100,4 +101,74 @@ public int getActiveTaskGroupsCount()
ex.getMessage()
);
}

@Test
public void testDefaultStopAsync()
{
// Create an instance of stream supervisor without overriding stopAsync().
final StreamSupervisor streamSupervisor = new StreamSupervisor()
{
private SupervisorStateManager.State state = SupervisorStateManager.BasicState.RUNNING;

@Override
public void start()
{

}

@Override
public void stop(boolean stopGracefully)
{
state = SupervisorStateManager.BasicState.STOPPING;
}

@Override
public SupervisorReport getStatus()
{
return null;
}

@Override
public SupervisorStateManager.State getState()
{
return state;
}

@Override
public void reset(@Nullable DataSourceMetadata dataSourceMetadata)
{

}

@Override
public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
{

}

@Override
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{

}

@Override
public LagStats computeLagStats()
{
return null;
}

@Override
public int getActiveTaskGroupsCount()
{
return 0;
}
};

Future<Void> stopAsyncFuture = streamSupervisor.stopAsync();
Assert.assertTrue(stopAsyncFuture.isDone());

// stop should be called by stopAsync
Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, streamSupervisor.getState());
}
}