Skip to content
3 changes: 2 additions & 1 deletion docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.runner.type`|Indicates whether tasks should be run locally using `local` or in a distributed environment using `remote`. The recommended option is `httpRemote`, which is similar to `remote` but uses HTTP to interact with Middle Managers instead of ZooKeeper.|`httpRemote`|
|`druid.indexer.server.maxConcurrentActions`|Maximum number of concurrent action requests (such as getting locks, creating segments, fetching segments etc) that the Overlord will process simultaneously. This prevents thread exhaustion while preserving access to health check endpoints. Set to `0` to disable quality of service filtering entirely. If not specified, defaults to `max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`.|`max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`|
|`druid.indexer.storage.type`|Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. One of `local` or `metadata`. `local` is mainly for internal testing while `metadata` is recommended in production because storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|`local`|
|`druid.indexer.storage.recentlyFinishedThreshold`|Duration of time to store task results. Default is 24 hours. If you have hundreds of tasks running in a day, consider increasing this threshold.|`PT24H`|
|`druid.indexer.tasklock.forceTimeChunkLock`|**Setting this to false is still experimental**<br/> If set, all tasks are enforced to use time chunk lock. If not set, each task automatically chooses a lock type to use. This configuration can be overwritten by setting `forceTimeChunkLock` in the [task context](../ingestion/tasks.md#context-parameters). See [Task lock system](../ingestion/tasks.md#task-lock-system) for more details about locking in tasks.|true|
Expand All @@ -1011,7 +1012,7 @@ The following configs only apply if the Overlord is running in remote mode. For
|--------|-----------|-------|
|`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task has been assigned to a Middle Manager before throwing an error.|`PT5M`|
|`druid.indexer.runner.minWorkerVersion`|The minimum Middle Manager version to send tasks to. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.worker.version`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons. |"0"|
| `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1|
|`druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1|
|`druid.indexer.runner.compressZnodes`|Indicates whether or not the Overlord should expect Middle Managers to compress Znodes.|true|
|`druid.indexer.runner.maxZnodeBytes`|The maximum size Znode in bytes that can be created in ZooKeeper, should be in the range of `[10KiB, 2GiB)`. [Human-readable format](human-readable-byte.md) is supported.| 512 KiB |
|`druid.indexer.runner.taskCleanupTimeout`|How long to wait before failing a task after a Middle Manager is disconnected from ZooKeeper.|`PT15M`|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
*/
public class CliIndexerServerModule implements Module
{
private static final String SERVER_HTTP_NUM_THREADS_PROPERTY = "druid.server.http.numThreads";
public static final String SERVER_HTTP_NUM_THREADS_PROPERTY = "druid.server.http.numThreads";
private final Properties properties;

public CliIndexerServerModule(Properties properties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ static Server makeAndInitializeServer(
threadPool.setDaemon(true);
jettyServerThreadPool = threadPool;

final Server server = new Server(threadPool);
final Server server = new Server(jettyServerThreadPool);

// Without this bean set, the default ScheduledExecutorScheduler runs as non-daemon, causing lifecycle hooks to fail
// to fire on main exit. Related bug: https://github.com/apache/druid/pull/1627
Expand Down
58 changes: 52 additions & 6 deletions services/src/main/java/org/apache/druid/cli/CliOverlord.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@
import org.apache.druid.server.http.RedirectInfo;
import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.jetty.CliIndexerServerModule;
import org.apache.druid.server.initialization.jetty.JettyBindings;
import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.ServiceStatusMonitor;
Expand Down Expand Up @@ -298,12 +300,7 @@
.in(LazySingleton.class);
}

Jerseys.addResource(binder, OverlordResource.class);
Jerseys.addResource(binder, SupervisorResource.class);
Jerseys.addResource(binder, HttpRemoteTaskRunnerResource.class);
Jerseys.addResource(binder, OverlordCompactionResource.class);
Jerseys.addResource(binder, OverlordDataSourcesResource.class);

configureOverlordWebResources(binder);

binder.bind(AppenderatorsManager.class)
.to(DummyForInjectionAppenderatorsManager.class)
Expand Down Expand Up @@ -458,6 +455,50 @@
dutyBinder.addBinding().to(TaskLogAutoCleaner.class);
dutyBinder.addBinding().to(UnusedSegmentsKiller.class).in(LazySingleton.class);
}

/**
* Configures Overlord-specific web resources and QoS filtering.
* This method performs two main tasks:
* <ol>
* <li>Registers Jersey resources for Overlord REST endpoints</li>
* <li>Configures QoS (Quality of Service) filtering for action APIs only</li>
* </ol>
* <p>
* QoS filtering is applied to action APIs to prevent the Overlord from becoming
* unresponsive to health checks
*/
private void configureOverlordWebResources(Binder binder)
{
Jerseys.addResource(binder, OverlordResource.class);
Jerseys.addResource(binder, SupervisorResource.class);
Jerseys.addResource(binder, HttpRemoteTaskRunnerResource.class);
Jerseys.addResource(binder, OverlordCompactionResource.class);
Jerseys.addResource(binder, OverlordDataSourcesResource.class);


final int serverHttpNumThreads = properties.containsKey(CliIndexerServerModule.SERVER_HTTP_NUM_THREADS_PROPERTY)
? Integer.parseInt(properties.getProperty(CliIndexerServerModule.SERVER_HTTP_NUM_THREADS_PROPERTY))

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
: ServerConfig.getDefaultNumThreads();

final int maxConcurrentActions;
if (properties.containsKey("druid.indexer.server.maxConcurrentActions")) {
maxConcurrentActions = Integer.parseInt(properties.getProperty("druid.indexer.server.maxConcurrentActions"));

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
} else {
maxConcurrentActions = getDefaultMaxConcurrentActions(serverHttpNumThreads);
}

if (maxConcurrentActions > 0) {
// Add QoS filtering for action endpoints only
final String[] actionPaths = {
"/druid/indexer/v1/action",
};

log.info("Overlord QoS filtering enabled for action endpoints. Max concurrent actions: [%d]", maxConcurrentActions);
JettyBindings.addQosFilter(binder, actionPaths, maxConcurrentActions);
} else {
log.info("Overlord QoS filtering disabled for action endpoints. Max concurrent actions: [%d]", serverHttpNumThreads);
}
}
},
new IndexingServiceInputSourceModule(),
new IndexingServiceTaskLogsModule(properties),
Expand All @@ -473,6 +514,11 @@
);
}

public static int getDefaultMaxConcurrentActions(int serverHttpNumThreads)
{
return Math.max(1, Math.max(serverHttpNumThreads - 4, (int) (serverHttpNumThreads * 0.8)));
}

/**
*/
private static class OverlordJettyServerInitializer implements JettyServerInitializer
Expand Down
26 changes: 26 additions & 0 deletions services/src/test/java/org/apache/druid/cli/CliOverlordTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,30 @@ public void testSegmentMetadataCacheIsBound()
= overlordInjector.getInstance(SegmentsMetadataManager.class);
Assert.assertTrue(segmentsMetadataManager instanceof SqlSegmentsMetadataManagerV2);
}


@Test
public void testGetDefaultMaxConcurrentActions()
{
// Small thread count where
Assert.assertEquals(8, CliOverlord.getDefaultMaxConcurrentActions(10));

// Medium thread count where
Assert.assertEquals(21, CliOverlord.getDefaultMaxConcurrentActions(25));
Assert.assertEquals(26, CliOverlord.getDefaultMaxConcurrentActions(30));


// Large thread count
Assert.assertEquals(46, CliOverlord.getDefaultMaxConcurrentActions(50));
Assert.assertEquals(96, CliOverlord.getDefaultMaxConcurrentActions(100));

// Test edge cases - return atleast 1 thread
Assert.assertEquals(1, CliOverlord.getDefaultMaxConcurrentActions(-1));
Assert.assertEquals(1, CliOverlord.getDefaultMaxConcurrentActions(0));

// Test small clustesr
Assert.assertEquals(2, CliOverlord.getDefaultMaxConcurrentActions(3));
Assert.assertEquals(3, CliOverlord.getDefaultMaxConcurrentActions(4));
Assert.assertEquals(4, CliOverlord.getDefaultMaxConcurrentActions(5));
}
}
Loading