diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 6c1bf8d67f41..a1d2d3070f61 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -992,6 +992,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.runner.type`|Indicates whether tasks should be run locally using `local` or in a distributed environment using `remote`. The recommended option is `httpRemote`, which is similar to `remote` but uses HTTP to interact with Middle Managers instead of ZooKeeper.|`httpRemote`|
+|`druid.indexer.server.maxConcurrentActions`|Maximum number of concurrent action requests (such as getting locks, creating segments, fetching segments, etc.) that the Overlord will process simultaneously. This prevents thread exhaustion while preserving access to health check endpoints. Set to `0` (or any non-positive value) to disable quality of service filtering entirely. If not specified, defaults to `max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`, where `serverHttpNumThreads` is the value of `druid.server.http.numThreads`.|`max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`|
|`druid.indexer.storage.type`|Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. One of `local` or `metadata`. `local` is mainly for internal testing while `metadata` is recommended in production because storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|`local`|
|`druid.indexer.storage.recentlyFinishedThreshold`|Duration of time to store task results. Default is 24 hours. If you have hundreds of tasks running in a day, consider increasing this threshold.|`PT24H`|
|`druid.indexer.tasklock.forceTimeChunkLock`|**Setting this to false is still experimental**
If set, all tasks are enforced to use time chunk lock. If not set, each task automatically chooses a lock type to use. This configuration can be overwritten by setting `forceTimeChunkLock` in the [task context](../ingestion/tasks.md#context-parameters). See [Task lock system](../ingestion/tasks.md#task-lock-system) for more details about locking in tasks.|true|
@@ -1011,7 +1012,7 @@ The following configs only apply if the Overlord is running in remote mode. For
|--------|-----------|-------|
|`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task has been assigned to a Middle Manager before throwing an error.|`PT5M`|
|`druid.indexer.runner.minWorkerVersion`|The minimum Middle Manager version to send tasks to. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.worker.version`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons. |"0"|
-| `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1|
+|`druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1|
|`druid.indexer.runner.compressZnodes`|Indicates whether or not the Overlord should expect Middle Managers to compress Znodes.|true|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in ZooKeeper, should be in the range of `[10KiB, 2GiB)`. [Human-readable format](human-readable-byte.md) is supported.| 512 KiB |
|`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a Middle Manager is disconnected from ZooKeeper.|`PT15M`|
diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java
index 76fd422b32ad..7a9efcb1edbd 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java
@@ -47,7 +47,7 @@
*/
public class CliIndexerServerModule implements Module
{
- private static final String SERVER_HTTP_NUM_THREADS_PROPERTY = "druid.server.http.numThreads";
+ public static final String SERVER_HTTP_NUM_THREADS_PROPERTY = "druid.server.http.numThreads";
private final Properties properties;
public CliIndexerServerModule(Properties properties)
diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
index 6593e67947e3..ce17e8557c33 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerModule.java
@@ -212,7 +212,7 @@ static Server makeAndInitializeServer(
threadPool.setDaemon(true);
jettyServerThreadPool = threadPool;
- final Server server = new Server(threadPool);
+ final Server server = new Server(jettyServerThreadPool);
// Without this bean set, the default ScheduledExecutorScheduler runs as non-daemon, causing lifecycle hooks to fail
// to fire on main exit. Related bug: https://github.com/apache/druid/pull/1627
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 63a1d28bfcfa..b23878453efd 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -129,6 +129,8 @@
import org.apache.druid.server.http.RedirectInfo;
import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.initialization.jetty.CliIndexerServerModule;
+import org.apache.druid.server.initialization.jetty.JettyBindings;
import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.ServiceStatusMonitor;
@@ -298,12 +300,7 @@ public void configure(Binder binder)
.in(LazySingleton.class);
}
- Jerseys.addResource(binder, OverlordResource.class);
- Jerseys.addResource(binder, SupervisorResource.class);
- Jerseys.addResource(binder, HttpRemoteTaskRunnerResource.class);
- Jerseys.addResource(binder, OverlordCompactionResource.class);
- Jerseys.addResource(binder, OverlordDataSourcesResource.class);
-
+ configureOverlordWebResources(binder);
binder.bind(AppenderatorsManager.class)
.to(DummyForInjectionAppenderatorsManager.class)
@@ -458,6 +455,50 @@ private void configureOverlordHelpers(Binder binder)
dutyBinder.addBinding().to(TaskLogAutoCleaner.class);
dutyBinder.addBinding().to(UnusedSegmentsKiller.class).in(LazySingleton.class);
}
+
+          /**
+           * Configures Overlord-specific web resources and QoS filtering.
+           * This method performs two main tasks:
+           * <ol>
+           *   <li>Registers the Overlord Jersey resources.</li>
+           *   <li>Adds a QoS filter for the action endpoint when the configured limit is positive.</li>
+           * </ol>
+           * <p>
+           * QoS filtering is applied to action APIs to prevent the Overlord from becoming
+           * unresponsive to health checks.
+           */
+          private void configureOverlordWebResources(Binder binder)
+          {
+            Jerseys.addResource(binder, OverlordResource.class);
+            Jerseys.addResource(binder, SupervisorResource.class);
+            Jerseys.addResource(binder, HttpRemoteTaskRunnerResource.class);
+            Jerseys.addResource(binder, OverlordCompactionResource.class);
+            Jerseys.addResource(binder, OverlordDataSourcesResource.class);
+
+
+            final int serverHttpNumThreads = properties.containsKey(CliIndexerServerModule.SERVER_HTTP_NUM_THREADS_PROPERTY)
+                                             ? Integer.parseInt(properties.getProperty(CliIndexerServerModule.SERVER_HTTP_NUM_THREADS_PROPERTY))
+                                             : ServerConfig.getDefaultNumThreads();
+
+            final int maxConcurrentActions;
+            if (properties.containsKey("druid.indexer.server.maxConcurrentActions")) {
+              maxConcurrentActions = Integer.parseInt(properties.getProperty("druid.indexer.server.maxConcurrentActions"));
+            } else {
+              maxConcurrentActions = getDefaultMaxConcurrentActions(serverHttpNumThreads);
+            }
+
+            if (maxConcurrentActions > 0) {
+              // Only the action endpoint is filtered; any non-positive limit skips the filter entirely
+              final String[] actionPaths = {
+                  "/druid/indexer/v1/action",
+              };
+
+              log.info("Overlord QoS filtering enabled for action endpoints. Max concurrent actions: [%d]", maxConcurrentActions);
+              JettyBindings.addQosFilter(binder, actionPaths, maxConcurrentActions);
+            } else {
+              log.info("Overlord QoS filtering disabled for action endpoints. Max concurrent actions: [%d]", serverHttpNumThreads);
+            }
+          }
         },
         new IndexingServiceInputSourceModule(),
         new IndexingServiceTaskLogsModule(properties),
@@ -473,6 +514,19 @@ private void configureOverlordHelpers(Binder binder)
     );
   }
 
+  /**
+   * Computes the default limit on concurrent action requests for the given HTTP thread count:
+   * the larger of {@code serverHttpNumThreads - 4} and 80% of {@code serverHttpNumThreads}
+   * (truncated to an int), but never less than 1.
+   *
+   * @param serverHttpNumThreads number of server HTTP threads
+   * @return default maximum number of concurrent actions, always at least 1
+   */
+  public static int getDefaultMaxConcurrentActions(int serverHttpNumThreads)
+  {
+    return Math.max(1, Math.max(serverHttpNumThreads - 4, (int) (serverHttpNumThreads * 0.8)));
+  }
+
   /**
    */
   private static class OverlordJettyServerInitializer implements JettyServerInitializer
diff --git a/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java b/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java
index 9bfbd500f13f..031c38ff8b20 100644
--- a/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java
+++ b/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java
@@ -50,4 +50,30 @@ public void testSegmentMetadataCacheIsBound()
         = overlordInjector.getInstance(SegmentsMetadataManager.class);
     Assert.assertTrue(segmentsMetadataManager instanceof SqlSegmentsMetadataManagerV2);
   }
+
+
+  @Test
+  public void testGetDefaultMaxConcurrentActions()
+  {
+    // Small thread count where the 80% term is the larger one
+    Assert.assertEquals(8, CliOverlord.getDefaultMaxConcurrentActions(10));
+
+    // Medium thread count where the (numThreads - 4) term is the larger one
+    Assert.assertEquals(21, CliOverlord.getDefaultMaxConcurrentActions(25));
+    Assert.assertEquals(26, CliOverlord.getDefaultMaxConcurrentActions(30));
+
+
+    // Large thread count
+    Assert.assertEquals(46, CliOverlord.getDefaultMaxConcurrentActions(50));
+    Assert.assertEquals(96, CliOverlord.getDefaultMaxConcurrentActions(100));
+
+    // Test edge cases - return at least 1 thread
+    Assert.assertEquals(1, CliOverlord.getDefaultMaxConcurrentActions(-1));
+    Assert.assertEquals(1, CliOverlord.getDefaultMaxConcurrentActions(0));
+
+    // Test small clusters
+    Assert.assertEquals(2, CliOverlord.getDefaultMaxConcurrentActions(3));
+    Assert.assertEquals(3, CliOverlord.getDefaultMaxConcurrentActions(4));
+    Assert.assertEquals(4, CliOverlord.getDefaultMaxConcurrentActions(5));
+  }
 }