From 882b3f67959910a7040fda0661be2655953d7d2b Mon Sep 17 00:00:00 2001 From: George Wu Date: Wed, 4 Dec 2024 15:21:06 -0500 Subject: [PATCH 01/10] Fix supervisor stop --- .../supervisor/SupervisorManager.java | 56 +++++++++++++++---- .../supervisor/SeekableStreamSupervisor.java | 2 +- 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 243c41e4b5d8..1339d664476a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -21,7 +21,12 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; + +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.Tasks; @@ -29,7 +34,9 @@ import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -39,10 +46,18 @@ import org.apache.druid.segment.incremental.ParseExceptionReport; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig.DEFAULT_SHUTDOWN_TIMEOUT; /** * Manages the creation and lifetime of {@link Supervisor}. @@ -56,6 +71,7 @@ public class SupervisorManager // SupervisorTaskAutoScaler could be null private final ConcurrentHashMap autoscalers = new ConcurrentHashMap<>(); private final Object lock = new Object(); + private final ListeningExecutorService shutdownExec; private volatile boolean started = false; @@ -63,6 +79,9 @@ public class SupervisorManager public SupervisorManager(MetadataSupervisorManager metadataSupervisorManager) { this.metadataSupervisorManager = metadataSupervisorManager; + this.shutdownExec = MoreExecutors.listeningDecorator( + Execs.multiThreaded(25, "supervisor-manager-shutdown-%d") + ); } public MetadataSupervisorManager getMetadataSupervisorManager() @@ -212,19 +231,36 @@ public void start() public void stop() { Preconditions.checkState(started, "SupervisorManager not started"); - + List> stopFutures = new ArrayList<>(); synchronized (lock) { for (String id : supervisors.keySet()) { - try { - supervisors.get(id).lhs.stop(false); - SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); - if (autoscaler != null) { - autoscaler.stop(); + stopFutures.add(shutdownExec.submit(() -> { + try { + supervisors.get(id).lhs.stop(false); + SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); + if (autoscaler != null) { + autoscaler.stop(); + } } - } - catch (Exception e) { - log.warn(e, "Caught exception while stopping supervisor [%s]", id); - } + catch (Exception e) { + log.warn(e, "Caught exception while stopping supervisor [%s]", id); + } + return null; + })); + } + log.info("Waiting for [%d] supervisors to shutdown", stopFutures.size()); + try { + long waitMillis = SeekableStreamSupervisorTuningConfig.defaultDuration( + null, + DEFAULT_SHUTDOWN_TIMEOUT + ).getMillis(); + FutureUtils.coalesce(stopFutures).get(waitMillis, TimeUnit.MILLISECONDS); + } catch (Exception e) { + log.warn( + "Stopped [%d] out of [%d] supervisors. Remaining supervisors will be killed.", + stopFutures.stream().filter(Future::isDone).count(), + stopFutures.size() + ); } supervisors.clear(); autoscalers.clear(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 8b4845b08d07..0c24950c067b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1075,7 +1075,7 @@ public void stop(boolean stopGracefully) long shutdownTimeoutMillis = tuningConfig.getShutdownTimeout().getMillis(); long endTime = System.currentTimeMillis() + shutdownTimeoutMillis; - while (!stopped) { + while (!stopped && stopGracefully) { long sleepTime = endTime - System.currentTimeMillis(); if (sleepTime <= 0) { log.info("Timed out while waiting for shutdown (timeout [%,dms])", shutdownTimeoutMillis); From 870d5b14eaa20602d6d172cb91f46e8b686f5a30 Mon Sep 17 00:00:00 2001 From: George Wu Date: Thu, 5 Dec 2024 12:57:56 -0500 Subject: [PATCH 02/10] fix checkstyle --- .../overlord/supervisor/SupervisorManager.java | 16 +++------------- .../supervisor/SeekableStreamSupervisor.java | 6 +++++- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 1339d664476a..4cbdfaad6961 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -25,7 +25,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; - import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.TaskLockType; @@ -34,7 +33,6 @@ import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; -import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -50,15 +48,10 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import static org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig.DEFAULT_SHUTDOWN_TIMEOUT; - /** * Manages the creation and lifetime of {@link Supervisor}. */ @@ -250,12 +243,9 @@ public void stop() } log.info("Waiting for [%d] supervisors to shutdown", stopFutures.size()); try { - long waitMillis = SeekableStreamSupervisorTuningConfig.defaultDuration( - null, - DEFAULT_SHUTDOWN_TIMEOUT - ).getMillis(); - FutureUtils.coalesce(stopFutures).get(waitMillis, TimeUnit.MILLISECONDS); - } catch (Exception e) { + FutureUtils.coalesce(stopFutures).get(80, TimeUnit.SECONDS); + } + catch (Exception e) { log.warn( "Stopped [%d] out of [%d] supervisors. Remaining supervisors will be killed.", stopFutures.stream().filter(Future::isDone).count(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 0c24950c067b..3653d099bcd5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1075,7 +1075,11 @@ public void stop(boolean stopGracefully) long shutdownTimeoutMillis = tuningConfig.getShutdownTimeout().getMillis(); long endTime = System.currentTimeMillis() + shutdownTimeoutMillis; - while (!stopped && stopGracefully) { + while (!stopped) { + if (!stopGracefully) { + stopped = true; + break; + } long sleepTime = endTime - System.currentTimeMillis(); if (sleepTime <= 0) { log.info("Timed out while waiting for shutdown (timeout [%,dms])", shutdownTimeoutMillis); From 7b11512918f0428cec92d2b1c249c2896877fe18 Mon Sep 17 00:00:00 2001 From: George Wu Date: Fri, 13 Dec 2024 11:46:38 -0500 Subject: [PATCH 03/10] use a new executor --- .../overlord/supervisor/SupervisorManager.java | 12 +++--------- .../supervisor/SeekableStreamSupervisor.java | 11 +++++++++++ .../indexing/overlord/supervisor/Supervisor.java | 12 ++++++++++++ 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 4cbdfaad6961..ca07f010ecec 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -64,7 +64,6 @@ public class SupervisorManager // SupervisorTaskAutoScaler could be null private final ConcurrentHashMap autoscalers = new ConcurrentHashMap<>(); private final Object lock = new Object(); - private final ListeningExecutorService shutdownExec; private volatile boolean started = false; @@ -72,9 +71,6 @@ public class SupervisorManager public SupervisorManager(MetadataSupervisorManager metadataSupervisorManager) { this.metadataSupervisorManager = metadataSupervisorManager; - this.shutdownExec = MoreExecutors.listeningDecorator( - Execs.multiThreaded(25, "supervisor-manager-shutdown-%d") - ); } public MetadataSupervisorManager getMetadataSupervisorManager() @@ -227,9 +223,8 @@ public void stop() List> stopFutures = new ArrayList<>(); synchronized (lock) { for (String id : supervisors.keySet()) { - stopFutures.add(shutdownExec.submit(() -> { try { - supervisors.get(id).lhs.stop(false); + stopFutures.add(supervisors.get(id).lhs.stopAsync(false)); SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); if (autoscaler != null) { autoscaler.stop(); @@ -238,15 +233,14 @@ public void stop() catch (Exception e) { log.warn(e, "Caught exception while stopping supervisor [%s]", id); } - return null; - })); } log.info("Waiting for [%d] supervisors to shutdown", stopFutures.size()); try { - FutureUtils.coalesce(stopFutures).get(80, TimeUnit.SECONDS); + FutureUtils.coalesce(stopFutures).get(); } catch (Exception e) { log.warn( + e, "Stopped [%d] out of [%d] supervisors. Remaining supervisors will be killed.", stopFutures.stream().filter(Future::isDone).count(), stopFutures.size() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 3653d099bcd5..414364430d2e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -34,6 +34,7 @@ import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap; @@ -123,6 +124,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -1107,6 +1109,15 @@ public void stop(boolean stopGracefully) } } + @Override + public ListenableFuture stopAsync(boolean stopGracefully) { + ListeningExecutorService shutdownExec = MoreExecutors.listeningDecorator(Execs.singleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Shutdown-%d")); + return shutdownExec.submit(() -> { + stop(stopGracefully); + shutdownExec.shutdown(); + return null; + }); + } @Override public void reset(@Nullable final DataSourceMetadata dataSourceMetadata) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 63c05be93ca1..c029eab94c88 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -21,12 +21,17 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import net.spy.memcached.internal.ImmediateFuture; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.segment.incremental.ParseExceptionReport; import javax.annotation.Nullable; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; /** * An interface representing a general supervisor for managing ingestion tasks. For streaming ingestion use cases, @@ -44,6 +49,13 @@ public interface Supervisor */ void stop(boolean stopGracefully); + default ListenableFuture stopAsync(boolean stopGracefully) { + stop(stopGracefully); + SettableFuture stopFuture = SettableFuture.create(); + stopFuture.set(null); + return stopFuture; + } + SupervisorReport getStatus(); SupervisorStateManager.State getState(); From c875367541a8100d3f02bd3dfdb6e059eb3d7839 Mon Sep 17 00:00:00 2001 From: George Wu Date: Fri, 13 Dec 2024 12:17:54 -0500 Subject: [PATCH 04/10] PR changes --- .../supervisor/SupervisorManager.java | 22 ++++++++----------- .../supervisor/SeekableStreamSupervisor.java | 8 ++++--- .../overlord/supervisor/Supervisor.java | 8 +++---- 3 files changed, 17 insertions(+), 21 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index ca07f010ecec..4b5a71bd9cb9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -22,8 +22,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.error.DruidException; @@ -34,7 +32,6 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.java.util.common.Pair; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -50,7 +47,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; /** * Manages the creation and lifetime of {@link Supervisor}. @@ -223,16 +219,16 @@ public void stop() List> stopFutures = new ArrayList<>(); synchronized (lock) { for (String id : supervisors.keySet()) { - try { - stopFutures.add(supervisors.get(id).lhs.stopAsync(false)); - SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); - if (autoscaler != null) { - autoscaler.stop(); - } - } - catch (Exception e) { - log.warn(e, "Caught exception while stopping supervisor [%s]", id); + try { + stopFutures.add(supervisors.get(id).lhs.stopAsync(false)); + SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); + if (autoscaler != null) { + autoscaler.stop(); } + } + catch (Exception e) { + log.warn(e, "Caught exception while stopping supervisor [%s]", id); + } } log.info("Waiting for [%d] supervisors to shutdown", stopFutures.size()); try { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 414364430d2e..ec0255c8cd68 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -124,7 +124,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -1110,8 +1109,11 @@ public void stop(boolean stopGracefully) } @Override - public ListenableFuture stopAsync(boolean stopGracefully) { - ListeningExecutorService shutdownExec = MoreExecutors.listeningDecorator(Execs.singleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Shutdown-%d")); + public ListenableFuture stopAsync(boolean stopGracefully) + { + ListeningExecutorService shutdownExec = MoreExecutors.listeningDecorator( + Execs.singleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Shutdown-%d") + ); return shutdownExec.submit(() -> { stop(stopGracefully); shutdownExec.shutdown(); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index c029eab94c88..e91ec613c3b7 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -23,15 +23,12 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import net.spy.memcached.internal.ImmediateFuture; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.segment.incremental.ParseExceptionReport; import javax.annotation.Nullable; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; /** * An interface representing a general supervisor for managing ingestion tasks. For streaming ingestion use cases, @@ -49,9 +46,10 @@ public interface Supervisor */ void stop(boolean stopGracefully); - default ListenableFuture stopAsync(boolean stopGracefully) { - stop(stopGracefully); + default ListenableFuture stopAsync(boolean stopGracefully) + { SettableFuture stopFuture = SettableFuture.create(); + stop(stopGracefully); stopFuture.set(null); return stopFuture; } From 5ebd372a1b66ae5620018b7668944f887e712420 Mon Sep 17 00:00:00 2001 From: George Wu Date: Fri, 13 Dec 2024 14:38:01 -0500 Subject: [PATCH 05/10] add some logging --- .../druid/indexing/overlord/supervisor/SupervisorManager.java | 1 + .../seekablestream/supervisor/SeekableStreamSupervisor.java | 4 ---- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 4b5a71bd9cb9..6f894d98b6de 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -218,6 +218,7 @@ public void stop() Preconditions.checkState(started, "SupervisorManager not started"); List> stopFutures = new ArrayList<>(); synchronized (lock) { + log.info("Stopping [%d] supervisors", supervisors.keySet().size()); for (String id : supervisors.keySet()) { try { stopFutures.add(supervisors.get(id).lhs.stopAsync(false)); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index ec0255c8cd68..25ae91859e1f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1077,10 +1077,6 @@ public void stop(boolean stopGracefully) long shutdownTimeoutMillis = tuningConfig.getShutdownTimeout().getMillis(); long endTime = System.currentTimeMillis() + shutdownTimeoutMillis; while (!stopped) { - if (!stopGracefully) { - stopped = true; - break; - } long sleepTime = endTime - System.currentTimeMillis(); if (sleepTime <= 0) { log.info("Timed out while waiting for shutdown (timeout [%,dms])", shutdownTimeoutMillis); From ed06f88ee990c1eb2c614af918ae253666786fbc Mon Sep 17 00:00:00 2001 From: George Wu Date: Fri, 13 Dec 2024 15:39:57 -0500 Subject: [PATCH 06/10] add test --- .../SeekableStreamSupervisorStateTest.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index af66ce3b8b97..175089c2ca2a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1000,6 +1000,28 @@ public void testStopping() verifyAll(); } + @Test + public void testStopGracefully() throws Exception + { + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); + + taskRunner.unregisterListener("testSupervisorId"); + indexTaskClient.close(); + recordSupplier.close(); + + replayAll(); + SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + supervisor.start(); + supervisor.runInternal(); + ListenableFuture stopFuture = supervisor.stopAsync(false); + stopFuture.get(); + verifyAll(); + } + @Test public void testStoppingGracefully() { From 93024c7364cce1f49ff3622a8774f4d7ea5dcb05 Mon Sep 17 00:00:00 2001 From: George Wu Date: Mon, 16 Dec 2024 10:34:31 -0500 Subject: [PATCH 07/10] add another test --- .../supervisor/StreamSupervisorTest.java | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java index 287a13c34366..01d543d3a312 100644 --- a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java @@ -26,6 +26,7 @@ import org.junit.Test; import javax.annotation.Nullable; +import java.util.concurrent.Future; public class StreamSupervisorTest { @@ -100,4 +101,74 @@ public int getActiveTaskGroupsCount() ex.getMessage() ); } + + @Test + public void testDefaultStopAsync() + { + // Create an instance of stream supervisor without overriding stopAsync(). + final StreamSupervisor streamSupervisor = new StreamSupervisor() + { + private SupervisorStateManager.State state = SupervisorStateManager.BasicState.RUNNING; + + @Override + public void start() + { + + } + + @Override + public void stop(boolean stopGracefully) + { + state = SupervisorStateManager.BasicState.STOPPING; + } + + @Override + public SupervisorReport getStatus() + { + return null; + } + + @Override + public SupervisorStateManager.State getState() + { + return state; + } + + @Override + public void reset(@Nullable DataSourceMetadata dataSourceMetadata) + { + + } + + @Override + public void resetOffsets(DataSourceMetadata resetDataSourceMetadata) + { + + } + + @Override + public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata) + { + + } + + @Override + public LagStats computeLagStats() + { + return null; + } + + @Override + public int getActiveTaskGroupsCount() + { + return 0; + } + }; + + Future stopAsyncFuture = streamSupervisor.stopAsync(true); + Assert.assertTrue(stopAsyncFuture.isDone()); + + // stop should be called by stopAsync + Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, streamSupervisor.getState()); + } } From f8ce8219d58f9e763968a26d55708529e579c046 Mon Sep 17 00:00:00 2001 From: George Wu Date: Mon, 16 Dec 2024 12:18:54 -0500 Subject: [PATCH 08/10] fix supervisormanagertest --- .../overlord/supervisor/SupervisorManagerTest.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 81153f238a4e..5e639a9592c0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.indexing.common.TaskLockType; @@ -130,7 +131,9 @@ public void testCreateUpdateAndRemoveSupervisor() verifyAll(); resetAll(); - supervisor3.stop(false); + SettableFuture stopFuture = SettableFuture.create(); + stopFuture.set(null); + EasyMock.expect(supervisor3.stopAsync(false)).andReturn(stopFuture); replayAll(); manager.stop(); @@ -361,7 +364,7 @@ public void testStopThrowsException() EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); supervisor1.start(); - supervisor1.stop(false); + supervisor1.stopAsync(false); EasyMock.expectLastCall().andThrow(new RuntimeException("RTE")); replayAll(); @@ -511,7 +514,9 @@ public void testCreateSuspendResumeAndStopSupervisor() // mock manager shutdown to ensure supervisor 3 stops resetAll(); - supervisor3.stop(false); + SettableFuture stopFuture = SettableFuture.create(); + stopFuture.set(null); + EasyMock.expect(supervisor3.stopAsync(false)).andReturn(stopFuture); replayAll(); manager.stop(); From ec4601fd344528c6ca2178f43451a237af822f4e Mon Sep 17 00:00:00 2001 From: George Shiqi Wu Date: Tue, 17 Dec 2024 11:18:58 -0500 Subject: [PATCH 09/10] Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java Co-authored-by: Kashif Faraz --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 25ae91859e1f..1b3f02151c9c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1108,7 +1108,7 @@ public void stop(boolean stopGracefully) public ListenableFuture stopAsync(boolean stopGracefully) { ListeningExecutorService shutdownExec = MoreExecutors.listeningDecorator( - Execs.singleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Shutdown-%d") + Execs.singleThreaded("supervisor-shutdown-" + StringUtils.encodeForFormat(supervisorId) + "--%d") ); return shutdownExec.submit(() -> { stop(stopGracefully); From 5b124c949e20f6b609bfb0848aa2de66005da18e Mon Sep 17 00:00:00 2001 From: George Wu Date: Tue, 17 Dec 2024 11:50:35 -0500 Subject: [PATCH 10/10] PR comments --- .../overlord/supervisor/SupervisorManager.java | 2 +- .../supervisor/SeekableStreamSupervisor.java | 5 +++-- .../overlord/supervisor/SupervisorManagerTest.java | 6 +++--- .../SeekableStreamSupervisorStateTest.java | 2 +- .../indexing/overlord/supervisor/Supervisor.java | 14 +++++++++++--- .../overlord/supervisor/StreamSupervisorTest.java | 2 +- 6 files changed, 20 insertions(+), 11 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 6f894d98b6de..731ddcaa1362 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -221,7 +221,7 @@ public void stop() log.info("Stopping [%d] supervisors", supervisors.keySet().size()); for (String id : supervisors.keySet()) { try { - stopFutures.add(supervisors.get(id).lhs.stopAsync(false)); + stopFutures.add(supervisors.get(id).lhs.stopAsync()); SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); if (autoscaler != null) { autoscaler.stop(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 1b3f02151c9c..eb4d11c99221 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1105,17 +1105,18 @@ public void stop(boolean stopGracefully) } @Override - public ListenableFuture stopAsync(boolean stopGracefully) + public ListenableFuture stopAsync() { ListeningExecutorService shutdownExec = MoreExecutors.listeningDecorator( Execs.singleThreaded("supervisor-shutdown-" + StringUtils.encodeForFormat(supervisorId) + "--%d") ); return shutdownExec.submit(() -> { - stop(stopGracefully); + stop(false); shutdownExec.shutdown(); return null; }); } + @Override public void reset(@Nullable final DataSourceMetadata dataSourceMetadata) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 5e639a9592c0..add6d4473dec 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -133,7 +133,7 @@ public void testCreateUpdateAndRemoveSupervisor() resetAll(); SettableFuture stopFuture = SettableFuture.create(); stopFuture.set(null); - EasyMock.expect(supervisor3.stopAsync(false)).andReturn(stopFuture); + EasyMock.expect(supervisor3.stopAsync()).andReturn(stopFuture); replayAll(); manager.stop(); @@ -364,7 +364,7 @@ public void testStopThrowsException() EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); supervisor1.start(); - supervisor1.stopAsync(false); + supervisor1.stopAsync(); EasyMock.expectLastCall().andThrow(new RuntimeException("RTE")); replayAll(); @@ -516,7 +516,7 @@ public void testCreateSuspendResumeAndStopSupervisor() resetAll(); SettableFuture stopFuture = SettableFuture.create(); stopFuture.set(null); - EasyMock.expect(supervisor3.stopAsync(false)).andReturn(stopFuture); + EasyMock.expect(supervisor3.stopAsync()).andReturn(stopFuture); replayAll(); manager.stop(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 175089c2ca2a..ac4e472e3780 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1017,7 +1017,7 @@ public void testStopGracefully() throws Exception supervisor.start(); supervisor.runInternal(); - ListenableFuture stopFuture = supervisor.stopAsync(false); + ListenableFuture stopFuture = supervisor.stopAsync(); stopFuture.get(); verifyAll(); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index e91ec613c3b7..c0106f648d03 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -46,11 +46,19 @@ public interface Supervisor */ void stop(boolean stopGracefully); - default ListenableFuture stopAsync(boolean stopGracefully) + /** + * Starts non-graceful shutdown of the supervisor and returns a future that completes when shutdown is complete. + */ + default ListenableFuture stopAsync() { SettableFuture stopFuture = SettableFuture.create(); - stop(stopGracefully); - stopFuture.set(null); + try { + stop(false); + stopFuture.set(null); + } + catch (Exception e) { + stopFuture.setException(e); + } return stopFuture; } diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java index 01d543d3a312..0912bca8f8b7 100644 --- a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java @@ -165,7 +165,7 @@ public int getActiveTaskGroupsCount() } }; - Future stopAsyncFuture = streamSupervisor.stopAsync(true); + Future stopAsyncFuture = streamSupervisor.stopAsync(); Assert.assertTrue(stopAsyncFuture.isDone()); // stop should be called by stopAsync