Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
7bcb35f
Add new dynamic config
adarshsanjeev Mar 3, 2025
c9a9fd5
Inject dynamic config
adarshsanjeev Mar 3, 2025
acb289f
Add new API
adarshsanjeev Mar 3, 2025
386e389
Add new loading pool
adarshsanjeev Mar 3, 2025
48e81dc
Revert "Inject dynamic config"
adarshsanjeev Mar 3, 2025
270acdf
Cleanup
adarshsanjeev Mar 3, 2025
85a1ae8
Clean up
adarshsanjeev Mar 3, 2025
d0c62fd
Address review comments
adarshsanjeev Mar 4, 2025
cf389fe
Address review comments
adarshsanjeev Mar 4, 2025
a1af8c3
Revert new API
adarshsanjeev Mar 4, 2025
2ec0dd1
Address review comments
adarshsanjeev Mar 7, 2025
117ae34
Address review comments
adarshsanjeev Mar 7, 2025
426def6
Rename config
adarshsanjeev Mar 7, 2025
09f679a
Improve coverage
adarshsanjeev Mar 7, 2025
d2b4e93
Add dynamic configuration of batch size
adarshsanjeev Mar 8, 2025
6884fc3
Add test
adarshsanjeev Mar 8, 2025
96f89b6
Add test
adarshsanjeev Mar 9, 2025
f81e20f
Add test
adarshsanjeev Mar 9, 2025
4a14236
Fix tests
adarshsanjeev Mar 9, 2025
a502cef
Fix ITs
adarshsanjeev Mar 17, 2025
bdc0334
Merge remote-tracking branch 'origin/master' into turboload-historicals
adarshsanjeev Mar 20, 2025
ed7815a
Update docs
adarshsanjeev Mar 20, 2025
cf71484
Split executors
adarshsanjeev Mar 20, 2025
e24c5e0
Fix typo
adarshsanjeev Mar 20, 2025
844c2f9
Address review comments
adarshsanjeev Mar 21, 2025
dce2f12
Address review comments
adarshsanjeev Mar 21, 2025
42e20d4
Address review comments
adarshsanjeev Mar 21, 2025
fc92af4
Address review comments
adarshsanjeev Mar 21, 2025
d84e31c
Address review comments
adarshsanjeev Mar 24, 2025
752c1b7
Add correct threadpool
adarshsanjeev Mar 24, 2025
52ac772
Add comment
adarshsanjeev Mar 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions docs/api-reference/dynamic-configuration-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ Host: http://ROUTER_IP:ROUTER_PORT
"maxNonPrimaryReplicantsToLoad": 2147483647,
"useRoundRobinSegmentAssignment": true,
"smartSegmentLoading": true,
"debugDimensions": null
"debugDimensions": null,
"turboLoadingNodes": []
}
```

Expand Down Expand Up @@ -172,7 +173,8 @@ curl "http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/config" \
"pauseCoordination": false,
"replicateAfterLoadTimeout": false,
"maxNonPrimaryReplicantsToLoad": 2147483647,
"useRoundRobinSegmentAssignment": true
"useRoundRobinSegmentAssignment": true,
"turboLoadingNodes": []
}'
```

Expand Down Expand Up @@ -203,7 +205,8 @@ Content-Length: 683
"pauseCoordination": false,
"replicateAfterLoadTimeout": false,
"maxNonPrimaryReplicantsToLoad": 2147483647,
"useRoundRobinSegmentAssignment": true
"useRoundRobinSegmentAssignment": true,
"turboLoadingNodes": []
}
```

Expand Down Expand Up @@ -289,7 +292,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
"comment": "",
"ip": "127.0.0.1"
},
"payload": "{\"millisToWaitBeforeDeleting\":900000,\"maxSegmentsToMove\":5,\"replicantLifetime\":15,\"replicationThrottleLimit\":10,\"balancerComputeThreads\":1,\"killDataSourceWhitelist\":[],\"killPendingSegmentsSkipList\":[],\"maxSegmentsInNodeLoadingQueue\":100,\"decommissioningNodes\":[],\"decommissioningMaxPercentOfMaxSegmentsToMove\":70,\"pauseCoordination\":false,\"replicateAfterLoadTimeout\":false,\"maxNonPrimaryReplicantsToLoad\":2147483647,\"useRoundRobinSegmentAssignment\":true,\"smartSegmentLoading\":true,\"debugDimensions\":null}",
"payload": "{\"millisToWaitBeforeDeleting\":900000,\"maxSegmentsToMove\":5,\"replicantLifetime\":15,\"replicationThrottleLimit\":10,\"balancerComputeThreads\":1,\"killDataSourceWhitelist\":[],\"killPendingSegmentsSkipList\":[],\"maxSegmentsInNodeLoadingQueue\":100,\"decommissioningNodes\":[],\"decommissioningMaxPercentOfMaxSegmentsToMove\":70,\"pauseCoordination\":false,\"replicateAfterLoadTimeout\":false,\"maxNonPrimaryReplicantsToLoad\":2147483647,\"useRoundRobinSegmentAssignment\":true,\"smartSegmentLoading\":true,\"debugDimensions\":null,\"turboLoadingNodes\":[]}",
"auditTime": "2023-10-03T20:59:51.622Z"
}
]
Expand Down
3 changes: 2 additions & 1 deletion docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti
|`druid.coordinator.kill.maxInterval`|The largest interval, as an [ISO 8601 duration](https://en.wikipedia.org/wiki/ISO_8601#Durations), of segments to delete per kill task. Set to zero, e.g. `PT0S`, for unlimited. This only applies when `druid.coordinator.kill.on=true`.|`P30D`|
|`druid.coordinator.balancer.strategy`|The [balancing strategy](../design/coordinator.md#balancing-segments-in-a-tier) used by the Coordinator to distribute segments among the Historical servers in a tier. The `cost` strategy distributes segments by minimizing a cost function, `diskNormalized` weights these costs with the disk usage ratios of the servers and `random` distributes segments randomly.|`cost`|
|`druid.coordinator.loadqueuepeon.http.repeatDelay`|The start and repeat delay (in milliseconds) for the load queue peon, which manages the load/drop queue of segments for any server.|1 minute|
|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than `druid.segmentCache.numLoadingThreads` config on Historical service.|1|
|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than or equal to the `druid.segmentCache.numLoadingThreads` config on the Historical service. If this value is not configured, the Coordinator uses the value of `druid.segmentCache.numLoadingThreads` of the respective server.|`druid.segmentCache.numLoadingThreads`|
|`druid.coordinator.asOverlord.enabled`|Boolean value for whether this Coordinator service should act like an Overlord as well. This configuration allows users to simplify a Druid cluster by not having to deploy any standalone Overlord services. If set to true, then Overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also.|false|
|`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord services and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL|

Expand Down Expand Up @@ -953,6 +953,7 @@ The following table shows the dynamic configuration properties for the Coordinat
|`decommissioningNodes`|List of Historical servers to decommission. Coordinator will not assign new segments to decommissioning servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `maxSegmentsToMove`.|none|
|`pauseCoordination`|Boolean flag for whether or not the Coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` interface. Such duties include: segment balancing, segment compaction, submitting kill tasks for unused segments (if enabled), logging of used segments in the cluster, marking of newly unused or overshadowed segments, matching and execution of load/drop rules for used segments, unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS name nodes with downtime and don't want the Coordinator to be directing Historical nodes to hit the name node with API requests until maintenance is done and the deep store is declared healthy for use again.|false|
|`replicateAfterLoadTimeout`|Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow Historicals in the cluster. However, the slow Historical may still load the segment later and the Coordinator may issue drop requests if the segment is over-replicated.|false|
|`turboLoadingNodes`|List of Historical servers to place in turbo loading mode. These servers use a larger thread pool to load segments faster, at the cost of query performance. For servers specified in `turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the Coordinator uses the value of the respective server's `numLoadingThreads` instead.|none|

##### Smart segment loading

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@
/**
 * Handler for {@code DataSegmentChangeRequest}s on a server: applying the
 * addition or removal of a {@link DataSegment}.
 */
public interface DataSegmentChangeHandler
{
  /**
   * Handles the addition of the given segment.
   *
   * @param segment  segment to add
   * @param callback callback invoked when handling completes; may be null
   */
  void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback);

  /**
   * Handles the removal of the given segment.
   *
   * @param segment  segment to remove
   * @param callback callback invoked when handling completes; may be null
   */
  void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.http.SegmentLoadingMode;
import org.apache.druid.server.metrics.SegmentRowCountDistribution;
import org.apache.druid.timeline.DataSegment;

Expand All @@ -44,8 +45,11 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -63,7 +67,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
private final SegmentLoaderConfig config;
private final DataSegmentAnnouncer announcer;
private final SegmentManager segmentManager;
private final ScheduledExecutorService exec;
private final ScheduledExecutorService normalLoadExec;
private final ThreadPoolExecutor turboLoadExec;

private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;

Expand All @@ -88,9 +93,19 @@ public SegmentLoadDropHandler(
config,
announcer,
segmentManager,
Executors.newScheduledThreadPool(
new ScheduledThreadPoolExecutor(
Comment thread
adarshsanjeev marked this conversation as resolved.
config.getNumLoadingThreads(),
Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s")
Execs.makeThreadFactory("SegmentLoadDropHandler-normal-%s")
),
// Create a fixed-size thread pool whose threads time out after 1 minute. Since they are all core threads, new
// threads will be created without enqueuing the tasks until the core capacity is reached.
new ThreadPoolExecutor(
config.getNumBootstrapThreads(),
Comment thread
adarshsanjeev marked this conversation as resolved.
Comment thread
kfaraz marked this conversation as resolved.
config.getNumBootstrapThreads(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
Execs.makeThreadFactory("SegmentLoadDropHandler-turbo-%s")
)
);
}
Expand All @@ -100,13 +115,17 @@ public SegmentLoadDropHandler(
SegmentLoaderConfig config,
DataSegmentAnnouncer announcer,
SegmentManager segmentManager,
ScheduledExecutorService exec
ScheduledExecutorService normalLoadExec,
ThreadPoolExecutor turboLoadExec
)
{
this.config = config;
this.announcer = announcer;
this.segmentManager = segmentManager;
this.exec = exec;
this.normalLoadExec = normalLoadExec;
this.turboLoadExec = turboLoadExec;

this.turboLoadExec.allowCoreThreadTimeOut(true);
Comment thread
adarshsanjeev marked this conversation as resolved.

this.segmentsToDelete = new ConcurrentSkipListSet<>();
requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
Expand Down Expand Up @@ -214,7 +233,7 @@ void removeSegment(
"Completely removing segment[%s] in [%,d]ms.",
segment.getId(), config.getDropSegmentDelayMillis()
);
exec.schedule(
normalLoadExec.schedule(
runnable,
config.getDropSegmentDelayMillis(),
TimeUnit.MILLISECONDS
Expand Down Expand Up @@ -244,14 +263,22 @@ public Collection<DataSegment> getSegmentsToDelete()
return ImmutableList.copyOf(segmentsToDelete);
}

public ListenableFuture<List<DataSegmentChangeResponse>> processBatch(List<DataSegmentChangeRequest> changeRequests)
/**
* Process a list of {@link DataSegmentChangeRequest}, invoking
* {@link #processRequest(DataSegmentChangeRequest, SegmentLoadingMode)} for each one. Handles the computation
* asynchronously and returns a future to the result.
*/
public ListenableFuture<List<DataSegmentChangeResponse>> processBatch(
List<DataSegmentChangeRequest> changeRequests,
SegmentLoadingMode segmentLoadingMode
)
{
boolean isAnyRequestDone = false;

Map<DataSegmentChangeRequest, AtomicReference<SegmentChangeStatus>> statuses = Maps.newHashMapWithExpectedSize(changeRequests.size());

for (DataSegmentChangeRequest cr : changeRequests) {
AtomicReference<SegmentChangeStatus> status = processRequest(cr);
AtomicReference<SegmentChangeStatus> status = processRequest(cr, segmentLoadingMode);
if (status.get().getState() != SegmentChangeStatus.State.PENDING) {
isAnyRequestDone = true;
}
Expand All @@ -271,7 +298,15 @@ public ListenableFuture<List<DataSegmentChangeResponse>> processBatch(List<DataS
return future;
}

private AtomicReference<SegmentChangeStatus> processRequest(DataSegmentChangeRequest changeRequest)
/**
* Process a {@link DataSegmentChangeRequest}, invoking the request's
* {@link DataSegmentChangeRequest#go(DataSegmentChangeHandler, DataSegmentChangeCallback)}.
* The segmentLoadingMode parameter determines the thread pool to use.
*/
private AtomicReference<SegmentChangeStatus> processRequest(
DataSegmentChangeRequest changeRequest,
SegmentLoadingMode segmentLoadingMode
)
{
synchronized (requestStatusesLock) {
AtomicReference<SegmentChangeStatus> status = requestStatuses.getIfPresent(changeRequest);
Expand All @@ -282,10 +317,13 @@ private AtomicReference<SegmentChangeStatus> processRequest(DataSegmentChangeReq
new DataSegmentChangeHandler()
{
@Override
public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
public void addSegment(
DataSegment segment,
@Nullable DataSegmentChangeCallback callback
)
{
requestStatuses.put(changeRequest, new AtomicReference<>(SegmentChangeStatus.PENDING));
exec.submit(
getExecutorService(segmentLoadingMode).submit(
() -> SegmentLoadDropHandler.this.addSegment(
((SegmentChangeRequestLoad) changeRequest).getSegment(),
() -> resolveWaitingFutures()
Expand All @@ -294,7 +332,10 @@ public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback
}

@Override
public void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback)
public void removeSegment(
DataSegment segment,
@Nullable DataSegmentChangeCallback callback
)
{
requestStatuses.put(changeRequest, new AtomicReference<>(SegmentChangeStatus.PENDING));
SegmentLoadDropHandler.this.removeSegment(
Expand Down Expand Up @@ -386,5 +427,15 @@ public boolean cancel(boolean interruptIfRunning)
return true;
}
}

/**
 * Selects the executor to use for the given loading mode: the dedicated turbo
 * pool for {@code TURBO}, the normal scheduled executor otherwise.
 */
private ExecutorService getExecutorService(SegmentLoadingMode loadingMode)
{
  if (loadingMode == SegmentLoadingMode.TURBO) {
    return turboLoadExec;
  } else {
    return normalLoadExec;
  }
}

/**
 * Returns the {@link SegmentLoaderConfig} this handler was constructed with.
 */
public SegmentLoaderConfig getSegmentLoaderConfig()
{
  return config;
}
}

Loading
Loading