From d7aa580f4e6fe80e2c1ea8673abab9e61aaf1be2 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Wed, 21 May 2025 12:05:56 +0530 Subject: [PATCH 01/12] Adding QOSFiltering for health checks on the overlord. --- .../server/initialization/ServerConfig.java | 53 ++++++++++++--- .../jetty/JettyServerModule.java | 15 ++--- .../server/ClientQuerySegmentWalkerTest.java | 2 +- .../druid/server/QueryResourceTest.java | 2 +- .../druid/server/QuerySchedulerTest.java | 2 +- .../org/apache/druid/cli/CliOverlord.java | 65 +++++++++++++++++-- .../org/apache/druid/cli/CliOverlordTest.java | 29 +++++++++ .../druid/sql/http/SqlResourceTest.java | 2 +- 8 files changed, 142 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java index 8fdef26a4105..4ecd3156b9cc 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java @@ -20,10 +20,10 @@ package org.apache.druid.server.initialization; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import org.apache.druid.common.exception.ErrorResponseTransformStrategy; import org.apache.druid.common.exception.NoErrorResponseTransformStrategy; +import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.HumanReadableBytesRange; import org.apache.druid.query.QueryContexts; @@ -46,9 +46,10 @@ public class ServerConfig { public static final int DEFAULT_GZIP_INFLATE_BUFFER_SIZE = 4096; + public static final int DEFAULT_NUM_PACKING_THREADS = 30; /** - * The ServerConfig is normally created using {@link org.apache.druid.guice.JsonConfigProvider} binding. 
+ * The ServerConfig is normally created using {@link JsonConfigProvider} binding. * * This constructor is provided for callers that need to create a ServerConfig object with specific field values. */ @@ -104,12 +105,6 @@ public ServerConfig() } - @VisibleForTesting - public ServerConfig(boolean enableQueryRequestsQueuing) - { - this.enableQueryRequestsQueuing = enableQueryRequestsQueuing; - } - @JsonProperty @Min(1) private int numThreads = getDefaultNumThreads(); @@ -195,6 +190,11 @@ public ServerConfig(boolean enableQueryRequestsQueuing) @JsonProperty private boolean showDetailedJettyErrors = true; + private ServerConfig(Builder builder) + { + enableQueryRequestsQueuing = builder.enableQueryRequestsQueuing; + } + public int getNumThreads() { return numThreads; @@ -400,6 +400,41 @@ public String toString() public static int getDefaultNumThreads() { - return Math.max(10, (JvmUtils.getRuntimeInfo().getAvailableProcessors() * 17) / 16 + 2) + 30; + return Math.max(10, (JvmUtils.getRuntimeInfo().getAvailableProcessors() * 17) / 16 + 2) + DEFAULT_NUM_PACKING_THREADS; + } + + + public static final class Builder + { + private @Min(1) int numThreads; + private boolean enableQueryRequestsQueuing; + + private Builder() + { + + } + + public static Builder newBuilder() + { + return new Builder(); + } + + public Builder numThreads(@Min(1) int numThreads) + { + this.numThreads = numThreads; + return this; + } + + + public Builder enableQueryRequestsQueuing(boolean enableQueryRequestsQueuing) + { + this.enableQueryRequestsQueuing = enableQueryRequestsQueuing; + return this; + } + + public ServerConfig build() + { + return new ServerConfig(this); + } } } diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java index 8ce48871c155..95c095b143c4 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java +++ 
b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java @@ -196,13 +196,12 @@ static Server makeAndInitializeServer( // that concurrently handle the requests". int numServerThreads = config.getNumThreads() + getMaxJettyAcceptorsSelectorsNum(node); - final QueuedThreadPool threadPool; if (config.getQueueSize() == Integer.MAX_VALUE) { - threadPool = new QueuedThreadPool(); - threadPool.setMinThreads(numServerThreads); - threadPool.setMaxThreads(numServerThreads); + jettyServerThreadPool = new QueuedThreadPool(); + jettyServerThreadPool.setMinThreads(numServerThreads); + jettyServerThreadPool.setMaxThreads(numServerThreads); } else { - threadPool = new QueuedThreadPool( + jettyServerThreadPool = new QueuedThreadPool( numServerThreads, numServerThreads, 60000, // same default is used in other case when threadPool = new QueuedThreadPool() @@ -210,10 +209,10 @@ static Server makeAndInitializeServer( ); } - threadPool.setDaemon(true); - jettyServerThreadPool = threadPool; + jettyServerThreadPool.setDaemon(true); + - final Server server = new Server(threadPool); + final Server server = new Server(jettyServerThreadPool); // Without this bean set, the default ScheduledExecutorScheduler runs as non-daemon, causing lifecycle hooks to fail // to fire on main exit. 
Related bug: https://github.com/apache/druid/pull/1627 diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index a812acc45974..718ee5c8df87 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -240,7 +240,7 @@ public void setUp() 8, ManualQueryPrioritizationStrategy.INSTANCE, NoQueryLaningStrategy.INSTANCE, - new ServerConfig(false) + ServerConfig.Builder.newBuilder().enableQueryRequestsQueuing(false).build() ); initWalker(ImmutableMap.of(), scheduler); } diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index 684092904848..e1ae1c8ec699 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -1216,7 +1216,7 @@ public void testTooManyQuery() throws InterruptedException, ExecutionException ManualQueryPrioritizationStrategy.INSTANCE, NoQueryLaningStrategy.INSTANCE, // enable total laning - new ServerConfig(false) + ServerConfig.Builder.newBuilder().enableQueryRequestsQueuing(false).build() ); ArrayList> back2 = new ArrayList<>(); diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java index ed602678a82c..d2ae1c6da7dd 100644 --- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java +++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java @@ -92,7 +92,7 @@ public class QuerySchedulerTest private static final int TEST_HI_CAPACITY = 5; private static final int TEST_LO_CAPACITY = 2; private static final ServerConfig SERVER_CONFIG_WITHOUT_TOTAL = new ServerConfig(); - private static final ServerConfig 
SERVER_CONFIG_WITH_TOTAL = new ServerConfig(false); + private static final ServerConfig SERVER_CONFIG_WITH_TOTAL = ServerConfig.Builder.newBuilder().enableQueryRequestsQueuing(false).build(); private ListeningExecutorService executorService; private ObservableQueryScheduler scheduler; diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 95fa654674d9..5ef5790b7a3a 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -123,6 +123,7 @@ import org.apache.druid.server.http.RedirectInfo; import org.apache.druid.server.http.SelfDiscoveryResource; import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.initialization.jetty.JettyBindings; import org.apache.druid.server.initialization.jetty.JettyServerInitUtils; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.metrics.ServiceStatusMonitor; @@ -143,7 +144,9 @@ import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -151,6 +154,7 @@ import java.util.Set; /** + * */ @Command( name = "overlord", @@ -160,6 +164,7 @@ public class CliOverlord extends ServerRunnable { private static final Logger log = new Logger(CliOverlord.class); private static final String DEFAULT_SERVICE_NAME = "druid/overlord"; + private static final int THREADS_RESERVED_FOR_HEALTH_CHECK = 1; protected static final List UNSECURED_PATHS = ImmutableList.of( "/druid/indexer/v1/isLeader", @@ -289,12 +294,7 @@ public void configure(Binder binder) .in(LazySingleton.class); } - Jerseys.addResource(binder, OverlordResource.class); - 
Jerseys.addResource(binder, SupervisorResource.class); - Jerseys.addResource(binder, HttpRemoteTaskRunnerResource.class); - Jerseys.addResource(binder, OverlordCompactionResource.class); - Jerseys.addResource(binder, OverlordDataSourcesResource.class); - + addOverlordJerseyResources(binder); binder.bind(AppenderatorsManager.class) .to(DummyForInjectionAppenderatorsManager.class) @@ -460,9 +460,30 @@ private void configureOverlordHelpers(Binder binder) ); } + /** + * Currenlty, the resource paths of the jersery resources on the overlord start with + *
<ol>
+   * <li>/druid/indexer/v1</li>
+   * <li>/druid-internal/v1</li>
+   * </ol>
+   * <p>
+ * As QOS filtering is enabled on overlord requests, we need to update the QOS filter paths in + * {@link org.apache.druid.cli.CliOverlord#addQOSFiltering(ServletContextHandler, int)} when a new jersey resource is added. */ - private static class OverlordJettyServerInitializer implements JettyServerInitializer + private static void addOverlordJerseyResources(Binder binder) + { + Jerseys.addResource(binder, OverlordResource.class); + Jerseys.addResource(binder, SupervisorResource.class); + Jerseys.addResource(binder, HttpRemoteTaskRunnerResource.class); + Jerseys.addResource(binder, OverlordCompactionResource.class); + Jerseys.addResource(binder, OverlordDataSourcesResource.class); + } + + /** + * + */ + protected static class OverlordJettyServerInitializer implements JettyServerInitializer { private final AuthConfig authConfig; private final ServerConfig serverConfig; @@ -487,6 +508,13 @@ public void initialize(Server server, Injector injector) final ObjectMapper jsonMapper = injector.getInstance(Key.get(ObjectMapper.class, Json.class)); final AuthenticatorMapper authenticatorMapper = injector.getInstance(AuthenticatorMapper.class); + final QueuedThreadPool queuedThreadPool = (QueuedThreadPool) server.getThreadPool(); + final int maxThreads = queuedThreadPool.getMaxThreads(); + final int threadsForOvelordWork = maxThreads - THREADS_RESERVED_FOR_HEALTH_CHECK; + + addQOSFiltering(root, threadsForOvelordWork); + + JettyServerInitUtils.addQosFilters(root, injector); AuthenticationUtils.addSecuritySanityCheckFilter(root, jsonMapper); @@ -542,4 +570,27 @@ public void initialize(Server server, Injector injector) server.setHandler(handlerList); } } + + protected static boolean addQOSFiltering(ServletContextHandler root, int threadsForOvelordWork) + { + if (threadsForOvelordWork >= ServerConfig.DEFAULT_NUM_PACKING_THREADS) { + log.info("Enabling QoS Filter on overlord requests with limit [%d].", threadsForOvelordWork); + JettyBindings.QosFilterHolder filterHolder = new 
JettyBindings.QosFilterHolder( + new String[]{ + "/druid-internal/v1/*", + "/druid/indexer/v1/*" + }, + threadsForOvelordWork + ); + JettyServerInitUtils.addFilters(root, Collections.singleton(filterHolder)); + return true; + } else { + log.info( + "QoSFilter is disabled for the overlord requests." + + "Set `druid.server.http.numThread` to a value greater than %d to enable QoSFilter.", + ServerConfig.DEFAULT_NUM_PACKING_THREADS + ); + return false; + } + } } diff --git a/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java b/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java index 5e5d0f5ef78b..8e4822b2885b 100644 --- a/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java +++ b/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java @@ -25,6 +25,7 @@ import org.apache.druid.metadata.segment.SqlSegmentsMetadataManagerV2; import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache; import org.apache.druid.metadata.segment.cache.SegmentMetadataCache; +import org.eclipse.jetty.servlet.ServletContextHandler; import org.junit.Assert; import org.junit.Test; @@ -47,4 +48,32 @@ public void testSegmentMetadataCacheIsBound() = overlordInjector.getInstance(SegmentsMetadataManager.class); Assert.assertTrue(segmentsMetadataManager instanceof SqlSegmentsMetadataManagerV2); } + + @Test + public void testQosFilteringEnabled() + { + ServletContextHandler handler = new ServletContextHandler(); + final int threadsForOverlordWork = 30; + + Assert.assertTrue(CliOverlord.addQOSFiltering(handler, threadsForOverlordWork)); + + Assert.assertEquals( + threadsForOverlordWork, + Integer.parseInt(handler.getServletHandler().getFilters()[0].getInitParameters().get("maxRequests")) + ); + } + + @Test + public void testQosFilteringDisabled() + { + ServletContextHandler handler = new ServletContextHandler(); + final int threadsForOverlordWork = 29; + + Assert.assertFalse(CliOverlord.addQOSFiltering(handler, threadsForOverlordWork)); + 
+ Assert.assertEquals( + 0, + handler.getServletHandler().getFilters().length + ); + } } diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java index f18873668ea2..f2325d7b50c2 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java @@ -201,7 +201,7 @@ public static void setupClass(@TempDir File tempDir) ManualQueryPrioritizationStrategy.INSTANCE, new HiLoQueryLaningStrategy(40), // Enable total laning - new ServerConfig(false) + ServerConfig.Builder.newBuilder().enableQueryRequestsQueuing(false).build() ) { @Override From 45deabe2273dcc673bc1d666289ed3bd36b3d38c Mon Sep 17 00:00:00 2001 From: cryptoe Date: Wed, 21 May 2025 12:05:56 +0530 Subject: [PATCH 02/12] Adding QOSFiltering for health checks on the overlord. --- .../server/initialization/ServerConfig.java | 50 +++---------------- .../server/ClientQuerySegmentWalkerTest.java | 2 +- .../druid/server/QueryResourceTest.java | 2 +- .../druid/server/QuerySchedulerTest.java | 2 +- .../org/apache/druid/cli/CliOverlord.java | 4 +- .../druid/sql/http/SqlResourceTest.java | 2 +- 6 files changed, 14 insertions(+), 48 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java index 4ecd3156b9cc..439f837660da 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java @@ -20,10 +20,10 @@ package org.apache.druid.server.initialization; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import org.apache.druid.common.exception.ErrorResponseTransformStrategy; import 
org.apache.druid.common.exception.NoErrorResponseTransformStrategy; -import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.HumanReadableBytesRange; import org.apache.druid.query.QueryContexts; @@ -49,7 +49,7 @@ public class ServerConfig public static final int DEFAULT_NUM_PACKING_THREADS = 30; /** - * The ServerConfig is normally created using {@link JsonConfigProvider} binding. + * The ServerConfig is normally created using {@link org.apache.druid.guice.JsonConfigProvider} binding. * * This constructor is provided for callers that need to create a ServerConfig object with specific field values. */ @@ -105,6 +105,12 @@ public ServerConfig() } + @VisibleForTesting + public ServerConfig(boolean enableQueryRequestsQueuing) + { + this.enableQueryRequestsQueuing = enableQueryRequestsQueuing; + } + @JsonProperty @Min(1) private int numThreads = getDefaultNumThreads(); @@ -190,11 +196,6 @@ public ServerConfig() @JsonProperty private boolean showDetailedJettyErrors = true; - private ServerConfig(Builder builder) - { - enableQueryRequestsQueuing = builder.enableQueryRequestsQueuing; - } - public int getNumThreads() { return numThreads; @@ -402,39 +403,4 @@ public static int getDefaultNumThreads() { return Math.max(10, (JvmUtils.getRuntimeInfo().getAvailableProcessors() * 17) / 16 + 2) + DEFAULT_NUM_PACKING_THREADS; } - - - public static final class Builder - { - private @Min(1) int numThreads; - private boolean enableQueryRequestsQueuing; - - private Builder() - { - - } - - public static Builder newBuilder() - { - return new Builder(); - } - - public Builder numThreads(@Min(1) int numThreads) - { - this.numThreads = numThreads; - return this; - } - - - public Builder enableQueryRequestsQueuing(boolean enableQueryRequestsQueuing) - { - this.enableQueryRequestsQueuing = enableQueryRequestsQueuing; - return this; - } - - public ServerConfig build() - { - return new 
ServerConfig(this); - } - } } diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index 718ee5c8df87..a812acc45974 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -240,7 +240,7 @@ public void setUp() 8, ManualQueryPrioritizationStrategy.INSTANCE, NoQueryLaningStrategy.INSTANCE, - ServerConfig.Builder.newBuilder().enableQueryRequestsQueuing(false).build() + new ServerConfig(false) ); initWalker(ImmutableMap.of(), scheduler); } diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java index e1ae1c8ec699..684092904848 100644 --- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java @@ -1216,7 +1216,7 @@ public void testTooManyQuery() throws InterruptedException, ExecutionException ManualQueryPrioritizationStrategy.INSTANCE, NoQueryLaningStrategy.INSTANCE, // enable total laning - ServerConfig.Builder.newBuilder().enableQueryRequestsQueuing(false).build() + new ServerConfig(false) ); ArrayList> back2 = new ArrayList<>(); diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java index d2ae1c6da7dd..ed602678a82c 100644 --- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java +++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java @@ -92,7 +92,7 @@ public class QuerySchedulerTest private static final int TEST_HI_CAPACITY = 5; private static final int TEST_LO_CAPACITY = 2; private static final ServerConfig SERVER_CONFIG_WITHOUT_TOTAL = new ServerConfig(); - private static final ServerConfig SERVER_CONFIG_WITH_TOTAL 
= ServerConfig.Builder.newBuilder().enableQueryRequestsQueuing(false).build(); + private static final ServerConfig SERVER_CONFIG_WITH_TOTAL = new ServerConfig(false); private ListeningExecutorService executorService; private ObservableQueryScheduler scheduler; diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 5ef5790b7a3a..32a022b25335 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -574,7 +574,7 @@ public void initialize(Server server, Injector injector) protected static boolean addQOSFiltering(ServletContextHandler root, int threadsForOvelordWork) { if (threadsForOvelordWork >= ServerConfig.DEFAULT_NUM_PACKING_THREADS) { - log.info("Enabling QoS Filter on overlord requests with limit [%d].", threadsForOvelordWork); + log.info("Enabling QOS filter on overlord requests with limit [%d].", threadsForOvelordWork); JettyBindings.QosFilterHolder filterHolder = new JettyBindings.QosFilterHolder( new String[]{ "/druid-internal/v1/*", @@ -586,7 +586,7 @@ protected static boolean addQOSFiltering(ServletContextHandler root, int threads return true; } else { log.info( - "QoSFilter is disabled for the overlord requests." + + "QOS filter is disabled for the overlord requests." 
+ "Set `druid.server.http.numThread` to a value greater than %d to enable QoSFilter.", ServerConfig.DEFAULT_NUM_PACKING_THREADS ); diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java index f2325d7b50c2..f18873668ea2 100644 --- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java @@ -201,7 +201,7 @@ public static void setupClass(@TempDir File tempDir) ManualQueryPrioritizationStrategy.INSTANCE, new HiLoQueryLaningStrategy(40), // Enable total laning - ServerConfig.Builder.newBuilder().enableQueryRequestsQueuing(false).build() + new ServerConfig(false) ) { @Override From 8a39249db6a736f5526fd0401b9cb9987d360f99 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Mon, 17 Nov 2025 16:19:24 +0530 Subject: [PATCH 03/12] Fixing compile --- .../org/apache/druid/cli/CliOverlord.java | 27 ++++++++++++++++--- .../org/apache/druid/cli/CliOverlordTest.java | 2 +- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 0cb4eb28d969..a37a456ae297 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -471,6 +471,25 @@ private void configureOverlordHelpers(Binder binder) ); } + /** + * Currently, the resource paths of the jersery resources on the overlord start with + *

<ol>
+   * <li>/druid/indexer/v1</li>
+   * <li>/druid-internal/v1</li>
+   * </ol>
+   * <p>
+ * As QOS filtering is enabled on overlord requests, we need to update the QOS filter paths in + * {@link org.apache.druid.cli.CliOverlord#addQOSFiltering(ServletContextHandler, int)} when a new jersey resource is added. + **/ + private void addOverlordJerseyResources(Binder binder) + { + Jerseys.addResource(binder, OverlordResource.class); + Jerseys.addResource(binder, SupervisorResource.class); + Jerseys.addResource(binder, HttpRemoteTaskRunnerResource.class); + Jerseys.addResource(binder, OverlordCompactionResource.class); + Jerseys.addResource(binder, OverlordDataSourcesResource.class); + } + /** */ protected static class OverlordJettyServerInitializer implements JettyServerInitializer @@ -551,16 +570,16 @@ public void initialize(Server server, Injector injector) } } - protected static boolean addQOSFiltering(ServletContextHandler root, int threadsForOvelordWork) + protected static boolean addQOSFiltering(ServletContextHandler root, int threadsForOverlordWork) { - if (threadsForOvelordWork >= ServerConfig.DEFAULT_NUM_PACKING_THREADS) { - log.info("Enabling QOS filter on overlord requests with limit [%d].", threadsForOvelordWork); + if (threadsForOverlordWork >= ServerConfig.DEFAULT_NUM_PACKING_THREADS) { + log.info("Enabling QOS filter on overlord requests with limit [%d].", threadsForOverlordWork); JettyBindings.QosFilterHolder filterHolder = new JettyBindings.QosFilterHolder( new String[]{ "/druid-internal/v1/*", "/druid/indexer/v1/*" }, - threadsForOvelordWork + threadsForOverlordWork ); JettyServerInitUtils.addFilters(root, Collections.singleton(filterHolder)); return true; diff --git a/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java b/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java index 8da228ed8d23..6cded71db1d7 100644 --- a/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java +++ b/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java @@ -26,7 +26,7 @@ import 
org.apache.druid.metadata.segment.SqlSegmentsMetadataManagerV2; import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache; import org.apache.druid.metadata.segment.cache.SegmentMetadataCache; -import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.ee8.servlet.ServletContextHandler; import org.junit.Assert; import org.junit.Test; From c32b9da8a75b6b23ea4be6cab5abd3035bfb206b Mon Sep 17 00:00:00 2001 From: cryptoe Date: Mon, 17 Nov 2025 16:38:15 +0530 Subject: [PATCH 04/12] Guice way of doing things --- .../org/apache/druid/cli/CliOverlord.java | 47 +++++++++---------- .../org/apache/druid/cli/CliOverlordTest.java | 28 ----------- 2 files changed, 22 insertions(+), 53 deletions(-) diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index a37a456ae297..0d917db1cb35 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -150,7 +150,6 @@ import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -302,6 +301,7 @@ public void configure(Binder binder) } addOverlordJerseyResources(binder); + configureQosFiltering(binder); binder.bind(AppenderatorsManager.class) .to(DummyForInjectionAppenderatorsManager.class) @@ -456,6 +456,25 @@ private void configureOverlordHelpers(Binder binder) dutyBinder.addBinding().to(TaskLogAutoCleaner.class); dutyBinder.addBinding().to(UnusedSegmentsKiller.class).in(LazySingleton.class); } + + private void configureQosFiltering(Binder binder) + { + // Add QoS filtering for overlord-specific endpoints if we have enough threads + final int serverHttpNumThreads = properties.containsKey("druid.server.http.numThreads") + ? 
Integer.parseInt(properties.getProperty("druid.server.http.numThreads")) + : ServerConfig.getDefaultNumThreads(); + + final int threadsForOverlordWork = serverHttpNumThreads - THREADS_RESERVED_FOR_HEALTH_CHECK; + + if (threadsForOverlordWork >= ServerConfig.DEFAULT_NUM_PACKING_THREADS) { + final String[] overlordPaths = { + "/druid-internal/v1/*", + "/druid/indexer/v1/*" + }; + + JettyBindings.addQosFilter(binder, overlordPaths, threadsForOverlordWork); + } + } }, new IndexingServiceInputSourceModule(), new IndexingServiceTaskLogsModule(properties), @@ -478,8 +497,8 @@ private void configureOverlordHelpers(Binder binder) *

<li>/druid-internal/v1</li>
    * </ol>
    * <p>
    - * As QOS filtering is enabled on overlord requests, we need to update the QOS filter paths in - * {@link org.apache.druid.cli.CliOverlord#addQOSFiltering(ServletContextHandler, int)} when a new jersey resource is added. + * As QoS filtering is enabled on overlord requests, we need to update the QoS filter paths in + * {@link org.apache.druid.cli.CliOverlord#configureQosFiltering(Binder)} when a new jersey resource is added. **/ private void addOverlordJerseyResources(Binder binder) { @@ -570,26 +589,4 @@ public void initialize(Server server, Injector injector) } } - protected static boolean addQOSFiltering(ServletContextHandler root, int threadsForOverlordWork) - { - if (threadsForOverlordWork >= ServerConfig.DEFAULT_NUM_PACKING_THREADS) { - log.info("Enabling QOS filter on overlord requests with limit [%d].", threadsForOverlordWork); - JettyBindings.QosFilterHolder filterHolder = new JettyBindings.QosFilterHolder( - new String[]{ - "/druid-internal/v1/*", - "/druid/indexer/v1/*" - }, - threadsForOverlordWork - ); - JettyServerInitUtils.addFilters(root, Collections.singleton(filterHolder)); - return true; - } else { - log.info( - "QOS filter is disabled for the overlord requests." 
+ - "Set `druid.server.http.numThread` to a value greater than %d to enable QoSFilter.", - ServerConfig.DEFAULT_NUM_PACKING_THREADS - ); - return false; - } - } } diff --git a/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java b/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java index 6cded71db1d7..8a0769500854 100644 --- a/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java +++ b/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java @@ -26,7 +26,6 @@ import org.apache.druid.metadata.segment.SqlSegmentsMetadataManagerV2; import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache; import org.apache.druid.metadata.segment.cache.SegmentMetadataCache; -import org.eclipse.jetty.ee8.servlet.ServletContextHandler; import org.junit.Assert; import org.junit.Test; @@ -52,31 +51,4 @@ public void testSegmentMetadataCacheIsBound() Assert.assertTrue(segmentsMetadataManager instanceof SqlSegmentsMetadataManagerV2); } - @Test - public void testQosFilteringEnabled() - { - ServletContextHandler handler = new ServletContextHandler(); - final int threadsForOverlordWork = 30; - - Assert.assertTrue(CliOverlord.addQOSFiltering(handler, threadsForOverlordWork)); - - Assert.assertEquals( - threadsForOverlordWork, - Integer.parseInt(handler.getServletHandler().getFilters()[0].getInitParameters().get("maxRequests")) - ); - } - - @Test - public void testQosFilteringDisabled() - { - ServletContextHandler handler = new ServletContextHandler(); - final int threadsForOverlordWork = 29; - - Assert.assertFalse(CliOverlord.addQOSFiltering(handler, threadsForOverlordWork)); - - Assert.assertEquals( - 0, - handler.getServletHandler().getFilters().length - ); - } } From 986c93bebffba814522b4f6f1d0431fd0a9ac658 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Mon, 17 Nov 2025 16:46:06 +0530 Subject: [PATCH 05/12] Review comments --- .../org/apache/druid/server/initialization/ServerConfig.java | 4 ++-- 1 file changed, 2 insertions(+), 2 
deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java index fa782cd24656..4c8484c6a676 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java @@ -56,7 +56,7 @@ public class ServerConfig { public static final int DEFAULT_GZIP_INFLATE_BUFFER_SIZE = 4096; - public static final int DEFAULT_NUM_PACKING_THREADS = 30; + public static final int DEFAULT_MIN_QOS_THRESHOLD = 30; /** * The ServerConfig is normally created using {@link org.apache.druid.guice.JsonConfigProvider} binding. @@ -439,7 +439,7 @@ public String toString() public static int getDefaultNumThreads() { - return Math.max(10, (JvmUtils.getRuntimeInfo().getAvailableProcessors() * 17) / 16 + 2) + DEFAULT_NUM_PACKING_THREADS; + return Math.max(10, (JvmUtils.getRuntimeInfo().getAvailableProcessors() * 17) / 16 + 2) + DEFAULT_MIN_QOS_THRESHOLD; } public static class UriComplianceDeserializer extends JsonDeserializer From 83cce040fe8a007464d8f822c0bbf3c9aa3576e7 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Mon, 17 Nov 2025 16:47:05 +0530 Subject: [PATCH 06/12] Remove extra line --- services/src/main/java/org/apache/druid/cli/CliOverlord.java | 2 +- .../src/test/java/org/apache/druid/cli/CliOverlordTest.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 0d917db1cb35..d8e3b7ff478d 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -466,7 +466,7 @@ private void configureQosFiltering(Binder binder) final int threadsForOverlordWork = serverHttpNumThreads - THREADS_RESERVED_FOR_HEALTH_CHECK; - if (threadsForOverlordWork >= 
ServerConfig.DEFAULT_NUM_PACKING_THREADS) { + if (threadsForOverlordWork >= ServerConfig.DEFAULT_MIN_QOS_THRESHOLD) { final String[] overlordPaths = { "/druid-internal/v1/*", "/druid/indexer/v1/*" diff --git a/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java b/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java index 8a0769500854..9bfbd500f13f 100644 --- a/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java +++ b/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java @@ -50,5 +50,4 @@ public void testSegmentMetadataCacheIsBound() = overlordInjector.getInstance(SegmentsMetadataManager.class); Assert.assertTrue(segmentsMetadataManager instanceof SqlSegmentsMetadataManagerV2); } - } From ad2dbd06a7303e11820ff9cc76ab89764d19bad1 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Mon, 17 Nov 2025 16:55:20 +0530 Subject: [PATCH 07/12] Lesser code --- .../org/apache/druid/cli/CliOverlord.java | 59 ++++++++++--------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index d8e3b7ff478d..f8d58865e5f9 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -300,8 +300,7 @@ public void configure(Binder binder) .in(LazySingleton.class); } - addOverlordJerseyResources(binder); - configureQosFiltering(binder); + configureOverlordWebResources(binder); binder.bind(AppenderatorsManager.class) .to(DummyForInjectionAppenderatorsManager.class) @@ -457,21 +456,46 @@ private void configureOverlordHelpers(Binder binder) dutyBinder.addBinding().to(UnusedSegmentsKiller.class).in(LazySingleton.class); } - private void configureQosFiltering(Binder binder) + /** + * Configures Overlord-specific web resources and QoS filtering. + * + *

    This method performs two main tasks: + *

      + *
    1. Registers Jersey resources for Overlord REST endpoints
    2. + *
    3. Configures QoS (Quality of Service) filtering for request limiting
    4. + *
    + * + *

    The Jersey resources handle the following endpoint paths: + *

      + *
    • /druid/indexer/v1/* - Main indexing and task management endpoints
    • + *
    • /druid-internal/v1/* - Internal Overlord management endpoints
    • + *
    + * Note to developers: + * Whenever adding new resources, please check if the root paths are added in the QOS filtering. + * + * @param binder the Guice binder for registering dependencies + */ + private void configureOverlordWebResources(Binder binder) { + Jerseys.addResource(binder, OverlordResource.class); + Jerseys.addResource(binder, SupervisorResource.class); + Jerseys.addResource(binder, HttpRemoteTaskRunnerResource.class); + Jerseys.addResource(binder, OverlordCompactionResource.class); + Jerseys.addResource(binder, OverlordDataSourcesResource.class); + // Add QoS filtering for overlord-specific endpoints if we have enough threads final int serverHttpNumThreads = properties.containsKey("druid.server.http.numThreads") - ? Integer.parseInt(properties.getProperty("druid.server.http.numThreads")) - : ServerConfig.getDefaultNumThreads(); - + ? Integer.parseInt(properties.getProperty("druid.server.http.numThreads")) + : ServerConfig.getDefaultNumThreads(); + final int threadsForOverlordWork = serverHttpNumThreads - THREADS_RESERVED_FOR_HEALTH_CHECK; - + if (threadsForOverlordWork >= ServerConfig.DEFAULT_MIN_QOS_THRESHOLD) { final String[] overlordPaths = { "/druid-internal/v1/*", "/druid/indexer/v1/*" }; - + JettyBindings.addQosFilter(binder, overlordPaths, threadsForOverlordWork); } } @@ -490,25 +514,6 @@ private void configureQosFiltering(Binder binder) ); } - /** - * Currently, the resource paths of the jersery resources on the overlord start with - *
      - *
    1. /druid/indexer/v1
    2. - *
    3. /druid-internal/v1
    4. - *
    - *

    - * As QoS filtering is enabled on overlord requests, we need to update the QoS filter paths in - * {@link org.apache.druid.cli.CliOverlord#configureQosFiltering(Binder)} when a new jersey resource is added. - **/ - private void addOverlordJerseyResources(Binder binder) - { - Jerseys.addResource(binder, OverlordResource.class); - Jerseys.addResource(binder, SupervisorResource.class); - Jerseys.addResource(binder, HttpRemoteTaskRunnerResource.class); - Jerseys.addResource(binder, OverlordCompactionResource.class); - Jerseys.addResource(binder, OverlordDataSourcesResource.class); - } - /** */ protected static class OverlordJettyServerInitializer implements JettyServerInitializer From 60a26a12bb09be60376c843d61e46e3c17ee7147 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Mon, 17 Nov 2025 16:59:54 +0530 Subject: [PATCH 08/12] even smaller diff --- .../initialization/jetty/JettyServerModule.java | 15 +++++++++------ .../java/org/apache/druid/cli/CliOverlord.java | 3 +-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java index 2cbb7d6e211e..17a97901a73e 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java @@ -195,12 +195,15 @@ static Server makeAndInitializeServer( // that concurrently handle the requests". 
int numServerThreads = config.getNumThreads() + getMaxJettyAcceptorsSelectorsNum(node); + + final QueuedThreadPool threadPool; + if (config.getQueueSize() == Integer.MAX_VALUE) { - jettyServerThreadPool = new QueuedThreadPool(); - jettyServerThreadPool.setMinThreads(numServerThreads); - jettyServerThreadPool.setMaxThreads(numServerThreads); + threadPool = new QueuedThreadPool(); + threadPool.setMinThreads(numServerThreads); + threadPool.setMaxThreads(numServerThreads); } else { - jettyServerThreadPool = new QueuedThreadPool( + threadPool = new QueuedThreadPool( numServerThreads, numServerThreads, 60000, // same default is used in other case when threadPool = new QueuedThreadPool() @@ -208,8 +211,8 @@ static Server makeAndInitializeServer( ); } - jettyServerThreadPool.setDaemon(true); - + threadPool.setDaemon(true); + jettyServerThreadPool = threadPool; final Server server = new Server(jettyServerThreadPool); diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index f8d58865e5f9..9b99ab9403b0 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -516,7 +516,7 @@ private void configureOverlordWebResources(Binder binder) /** */ - protected static class OverlordJettyServerInitializer implements JettyServerInitializer + private static class OverlordJettyServerInitializer implements JettyServerInitializer { private final AuthConfig authConfig; private final ServerConfig serverConfig; @@ -593,5 +593,4 @@ public void initialize(Server server, Injector injector) server.setHandler(handlerList); } } - } From 39077003b64e01084135e78a24bb73c5810343e1 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Mon, 17 Nov 2025 17:01:20 +0530 Subject: [PATCH 09/12] Adjusting more --- .../druid/server/initialization/jetty/JettyServerModule.java | 2 -- 1 file changed, 2 deletions(-) diff --git 
a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java index 17a97901a73e..ce17e8557c33 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java @@ -195,9 +195,7 @@ static Server makeAndInitializeServer( // that concurrently handle the requests". int numServerThreads = config.getNumThreads() + getMaxJettyAcceptorsSelectorsNum(node); - final QueuedThreadPool threadPool; - if (config.getQueueSize() == Integer.MAX_VALUE) { threadPool = new QueuedThreadPool(); threadPool.setMinThreads(numServerThreads); From f1fe5aea8311eccc498a55fb5014718deaf4c43d Mon Sep 17 00:00:00 2001 From: cryptoe Date: Mon, 17 Nov 2025 17:03:52 +0530 Subject: [PATCH 10/12] checkstyle fixes --- .../main/java/org/apache/druid/cli/CliOverlord.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 9b99ab9403b0..e9601bc1c976 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -458,22 +458,19 @@ private void configureOverlordHelpers(Binder binder) /** * Configures Overlord-specific web resources and QoS filtering. - * - *

    This method performs two main tasks: + * This method performs two main tasks: *

      *
    1. Registers Jersey resources for Overlord REST endpoints
    2. *
    3. Configures QoS (Quality of Service) filtering for request limiting
    4. *
    - * - *

    The Jersey resources handle the following endpoint paths: + *

    + * The Jersey resources handle the following endpoint paths: *

      - *
    • /druid/indexer/v1/* - Main indexing and task management endpoints
    • - *
    • /druid-internal/v1/* - Internal Overlord management endpoints
    • + *
    • /druid/indexer/v1 - Main indexing and task management endpoints
    • + *
    • /druid-internal/v1 - Internal Overlord management endpoints
    • *
    * Note to developers: * Whenever adding new resources, please check if the root paths are added in the QOS filtering. - * - * @param binder the Guice binder for registering dependencies */ private void configureOverlordWebResources(Binder binder) { From 0b3cff828366d395d69b820d02e24a119922e378 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Wed, 19 Nov 2025 19:03:32 +0530 Subject: [PATCH 11/12] Addressing review comments. --- docs/configuration/index.md | 3 +- .../server/initialization/ServerConfig.java | 3 +- .../jetty/CliIndexerServerModule.java | 2 +- .../org/apache/druid/cli/CliOverlord.java | 44 +++++++++++-------- .../org/apache/druid/cli/CliOverlordTest.java | 26 +++++++++++ 5 files changed, 56 insertions(+), 22 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 6c1bf8d67f41..a36d583239ba 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -992,6 +992,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro |Property|Description|Default| |--------|-----------|-------| |`druid.indexer.runner.type`|Indicates whether tasks should be run locally using `local` or in a distributed environment using `remote`. The recommended option is `httpRemote`, which is similar to `remote` but uses HTTP to interact with Middle Managers instead of ZooKeeper.|`httpRemote`| +|`druid.indexer.server.maxConcurrentActions`|Maximum number of concurrent action requests (such as getting locks, creating segments, fetching segments etc) that the Overlord will process simultaneously. This prevents thread exhaustion while preserving access to health check endpoints. Set to `0` to disable QoS filtering entirely. If not specified, defaults to `max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`.|`max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`| |`druid.indexer.storage.type`|Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. 
One of `local` or `metadata`. `local` is mainly for internal testing while `metadata` is recommended in production because storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|`local`| |`druid.indexer.storage.recentlyFinishedThreshold`|Duration of time to store task results. Default is 24 hours. If you have hundreds of tasks running in a day, consider increasing this threshold.|`PT24H`| |`druid.indexer.tasklock.forceTimeChunkLock`|**Setting this to false is still experimental**
    If set, all tasks are enforced to use time chunk lock. If not set, each task automatically chooses a lock type to use. This configuration can be overwritten by setting `forceTimeChunkLock` in the [task context](../ingestion/tasks.md#context-parameters). See [Task lock system](../ingestion/tasks.md#task-lock-system) for more details about locking in tasks.|true| @@ -1011,7 +1012,7 @@ The following configs only apply if the Overlord is running in remote mode. For |--------|-----------|-------| |`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task has been assigned to a Middle Manager before throwing an error.|`PT5M`| |`druid.indexer.runner.minWorkerVersion`|The minimum Middle Manager version to send tasks to. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.worker.version`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons. |"0"| -| `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1| +|`druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1| |`druid.indexer.runner.compressZnodes`|Indicates whether or not the Overlord should expect Middle Managers to compress Znodes.|true| |`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in ZooKeeper, should be in the range of `[10KiB, 2GiB)`. 
[Human-readable format](human-readable-byte.md) is supported.| 512 KiB | |`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a Middle Manager is disconnected from ZooKeeper.|`PT15M`| diff --git a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java index 4c8484c6a676..a7206ca9cf7e 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java @@ -56,7 +56,6 @@ public class ServerConfig { public static final int DEFAULT_GZIP_INFLATE_BUFFER_SIZE = 4096; - public static final int DEFAULT_MIN_QOS_THRESHOLD = 30; /** * The ServerConfig is normally created using {@link org.apache.druid.guice.JsonConfigProvider} binding. @@ -439,7 +438,7 @@ public String toString() public static int getDefaultNumThreads() { - return Math.max(10, (JvmUtils.getRuntimeInfo().getAvailableProcessors() * 17) / 16 + 2) + DEFAULT_MIN_QOS_THRESHOLD; + return Math.max(10, (JvmUtils.getRuntimeInfo().getAvailableProcessors() * 17) / 16 + 2) + 30; } public static class UriComplianceDeserializer extends JsonDeserializer diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java index 76fd422b32ad..7a9efcb1edbd 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java @@ -47,7 +47,7 @@ */ public class CliIndexerServerModule implements Module { - private static final String SERVER_HTTP_NUM_THREADS_PROPERTY = "druid.server.http.numThreads"; + public static final String SERVER_HTTP_NUM_THREADS_PROPERTY = "druid.server.http.numThreads"; private final Properties 
properties; public CliIndexerServerModule(Properties properties) diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index e9601bc1c976..b23878453efd 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -129,6 +129,7 @@ import org.apache.druid.server.http.RedirectInfo; import org.apache.druid.server.http.SelfDiscoveryResource; import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.initialization.jetty.CliIndexerServerModule; import org.apache.druid.server.initialization.jetty.JettyBindings; import org.apache.druid.server.initialization.jetty.JettyServerInitUtils; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; @@ -166,7 +167,6 @@ public class CliOverlord extends ServerRunnable { private static final Logger log = new Logger(CliOverlord.class); private static final String DEFAULT_SERVICE_NAME = "druid/overlord"; - private static final int THREADS_RESERVED_FOR_HEALTH_CHECK = 1; protected static final List UNSECURED_PATHS = ImmutableList.of( "/druid/indexer/v1/isLeader", @@ -461,16 +461,11 @@ private void configureOverlordHelpers(Binder binder) * This method performs two main tasks: *
      *
    1. Registers Jersey resources for Overlord REST endpoints
    2. - *
    3. Configures QoS (Quality of Service) filtering for request limiting
    4. + *
    5. Configures QoS (Quality of Service) filtering for action APIs only
    6. *
    *

    - * The Jersey resources handle the following endpoint paths: - *

      - *
    • /druid/indexer/v1 - Main indexing and task management endpoints
    • - *
    • /druid-internal/v1 - Internal Overlord management endpoints
    • - *
    - * Note to developers: - * Whenever adding new resources, please check if the root paths are added in the QOS filtering. + * QoS filtering is applied to action APIs to prevent the Overlord from becoming + * unresponsive to health checks */ private void configureOverlordWebResources(Binder binder) { @@ -480,20 +475,28 @@ private void configureOverlordWebResources(Binder binder) Jerseys.addResource(binder, OverlordCompactionResource.class); Jerseys.addResource(binder, OverlordDataSourcesResource.class); - // Add QoS filtering for overlord-specific endpoints if we have enough threads - final int serverHttpNumThreads = properties.containsKey("druid.server.http.numThreads") - ? Integer.parseInt(properties.getProperty("druid.server.http.numThreads")) + + final int serverHttpNumThreads = properties.containsKey(CliIndexerServerModule.SERVER_HTTP_NUM_THREADS_PROPERTY) + ? Integer.parseInt(properties.getProperty(CliIndexerServerModule.SERVER_HTTP_NUM_THREADS_PROPERTY)) : ServerConfig.getDefaultNumThreads(); - final int threadsForOverlordWork = serverHttpNumThreads - THREADS_RESERVED_FOR_HEALTH_CHECK; + final int maxConcurrentActions; + if (properties.containsKey("druid.indexer.server.maxConcurrentActions")) { + maxConcurrentActions = Integer.parseInt(properties.getProperty("druid.indexer.server.maxConcurrentActions")); + } else { + maxConcurrentActions = getDefaultMaxConcurrentActions(serverHttpNumThreads); + } - if (threadsForOverlordWork >= ServerConfig.DEFAULT_MIN_QOS_THRESHOLD) { - final String[] overlordPaths = { - "/druid-internal/v1/*", - "/druid/indexer/v1/*" + if (maxConcurrentActions > 0) { + // Add QoS filtering for action endpoints only + final String[] actionPaths = { + "/druid/indexer/v1/action", }; - JettyBindings.addQosFilter(binder, overlordPaths, threadsForOverlordWork); + log.info("Overlord QoS filtering enabled for action endpoints. 
Max concurrent actions: [%d]", maxConcurrentActions); + JettyBindings.addQosFilter(binder, actionPaths, maxConcurrentActions); + } else { + log.info("Overlord QoS filtering disabled for action endpoints. Max concurrent actions: [%d]", serverHttpNumThreads); } } }, @@ -511,6 +514,11 @@ private void configureOverlordWebResources(Binder binder) ); } + public static int getDefaultMaxConcurrentActions(int serverHttpNumThreads) + { + return Math.max(1, Math.max(serverHttpNumThreads - 4, (int) (serverHttpNumThreads * 0.8))); + } + /** */ private static class OverlordJettyServerInitializer implements JettyServerInitializer diff --git a/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java b/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java index 9bfbd500f13f..031c38ff8b20 100644 --- a/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java +++ b/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java @@ -50,4 +50,30 @@ public void testSegmentMetadataCacheIsBound() = overlordInjector.getInstance(SegmentsMetadataManager.class); Assert.assertTrue(segmentsMetadataManager instanceof SqlSegmentsMetadataManagerV2); } + + + @Test + public void testGetDefaultMaxConcurrentActions() + { + // Small thread count where the 80% term (numThreads * 0.8) dominates + Assert.assertEquals(8, CliOverlord.getDefaultMaxConcurrentActions(10)); + + // Medium thread count where the (numThreads - 4) term dominates + Assert.assertEquals(21, CliOverlord.getDefaultMaxConcurrentActions(25)); + Assert.assertEquals(26, CliOverlord.getDefaultMaxConcurrentActions(30)); + + + // Large thread count + Assert.assertEquals(46, CliOverlord.getDefaultMaxConcurrentActions(50)); + Assert.assertEquals(96, CliOverlord.getDefaultMaxConcurrentActions(100)); + + // Test edge cases - return at least 1 thread + Assert.assertEquals(1, CliOverlord.getDefaultMaxConcurrentActions(-1)); + Assert.assertEquals(1, CliOverlord.getDefaultMaxConcurrentActions(0)); + + // Test small clusters + Assert.assertEquals(2, CliOverlord.getDefaultMaxConcurrentActions(3)); +
Assert.assertEquals(3, CliOverlord.getDefaultMaxConcurrentActions(4)); + Assert.assertEquals(4, CliOverlord.getDefaultMaxConcurrentActions(5)); + } } From c5b84dce8ddff5980da8a861592e86ec7e2143c7 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Wed, 19 Nov 2025 20:25:31 +0530 Subject: [PATCH 12/12] Fix spell check --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index a36d583239ba..a1d2d3070f61 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -992,7 +992,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro |Property|Description|Default| |--------|-----------|-------| |`druid.indexer.runner.type`|Indicates whether tasks should be run locally using `local` or in a distributed environment using `remote`. The recommended option is `httpRemote`, which is similar to `remote` but uses HTTP to interact with Middle Managers instead of ZooKeeper.|`httpRemote`| -|`druid.indexer.server.maxConcurrentActions`|Maximum number of concurrent action requests (such as getting locks, creating segments, fetching segments etc) that the Overlord will process simultaneously. This prevents thread exhaustion while preserving access to health check endpoints. Set to `0` to disable QoS filtering entirely. If not specified, defaults to `max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`.|`max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`| +|`druid.indexer.server.maxConcurrentActions`|Maximum number of concurrent action requests (such as getting locks, creating segments, fetching segments, etc.) that the Overlord will process simultaneously. This prevents thread exhaustion while preserving access to health check endpoints. Set to `0` to disable quality of service filtering entirely.
If not specified, defaults to `max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`.|`max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`| |`druid.indexer.storage.type`|Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. One of `local` or `metadata`. `local` is mainly for internal testing while `metadata` is recommended in production because storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|`local`| |`druid.indexer.storage.recentlyFinishedThreshold`|Duration of time to store task results. Default is 24 hours. If you have hundreds of tasks running in a day, consider increasing this threshold.|`PT24H`| |`druid.indexer.tasklock.forceTimeChunkLock`|**Setting this to false is still experimental**
    If set, all tasks are enforced to use time chunk lock. If not set, each task automatically chooses a lock type to use. This configuration can be overwritten by setting `forceTimeChunkLock` in the [task context](../ingestion/tasks.md#context-parameters). See [Task lock system](../ingestion/tasks.md#task-lock-system) for more details about locking in tasks.|true|