diff --git a/docs/api-reference/dynamic-configuration-api.md b/docs/api-reference/dynamic-configuration-api.md index 90b7028f2478..971aa81d206a 100644 --- a/docs/api-reference/dynamic-configuration-api.md +++ b/docs/api-reference/dynamic-configuration-api.md @@ -105,7 +105,8 @@ Host: http://ROUTER_IP:ROUTER_PORT "maxNonPrimaryReplicantsToLoad": 2147483647, "useRoundRobinSegmentAssignment": true, "smartSegmentLoading": true, - "debugDimensions": null + "debugDimensions": null, + "turboLoadingNodes": [] } ``` @@ -172,7 +173,8 @@ curl "http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/config" \ "pauseCoordination": false, "replicateAfterLoadTimeout": false, "maxNonPrimaryReplicantsToLoad": 2147483647, - "useRoundRobinSegmentAssignment": true + "useRoundRobinSegmentAssignment": true, + "turboLoadingNodes": [] }' ``` @@ -203,7 +205,8 @@ Content-Length: 683 "pauseCoordination": false, "replicateAfterLoadTimeout": false, "maxNonPrimaryReplicantsToLoad": 2147483647, - "useRoundRobinSegmentAssignment": true + "useRoundRobinSegmentAssignment": true, + "turboLoadingNodes": [] } ``` @@ -289,7 +292,7 @@ Host: http://ROUTER_IP:ROUTER_PORT "comment": "", "ip": "127.0.0.1" }, - "payload": "{\"millisToWaitBeforeDeleting\":900000,\"maxSegmentsToMove\":5,\"replicantLifetime\":15,\"replicationThrottleLimit\":10,\"balancerComputeThreads\":1,\"killDataSourceWhitelist\":[],\"killPendingSegmentsSkipList\":[],\"maxSegmentsInNodeLoadingQueue\":100,\"decommissioningNodes\":[],\"decommissioningMaxPercentOfMaxSegmentsToMove\":70,\"pauseCoordination\":false,\"replicateAfterLoadTimeout\":false,\"maxNonPrimaryReplicantsToLoad\":2147483647,\"useRoundRobinSegmentAssignment\":true,\"smartSegmentLoading\":true,\"debugDimensions\":null}", + "payload": "{\"millisToWaitBeforeDeleting\":900000,\"maxSegmentsToMove\":5,\"replicantLifetime\":15,\"replicationThrottleLimit\":10,\"balancerComputeThreads\":1,\"killDataSourceWhitelist\":[],\"killPendingSegmentsSkipList\":[],\"maxSegmentsInNodeLoadingQueue\":100,\"decommissioningNodes\":[],\"decommissioningMaxPercentOfMaxSegmentsToMove\":70,\"pauseCoordination\":false,\"replicateAfterLoadTimeout\":false,\"maxNonPrimaryReplicantsToLoad\":2147483647,\"useRoundRobinSegmentAssignment\":true,\"smartSegmentLoading\":true,\"debugDimensions\":null,\"turboLoadingNodes\":[]}", "auditTime": "2023-10-03T20:59:51.622Z" } ] diff --git a/docs/configuration/index.md b/docs/configuration/index.md index d75309b7ac31..687e1d688a67 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -885,7 +885,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti |`druid.coordinator.kill.maxInterval`|The largest interval, as an [ISO 8601 duration](https://en.wikipedia.org/wiki/ISO_8601#Durations), of segments to delete per kill task. Set to zero, e.g. `PT0S`, for unlimited. This only applies when `druid.coordinator.kill.on=true`.|`P30D`| |`druid.coordinator.balancer.strategy`|The [balancing strategy](../design/coordinator.md#balancing-segments-in-a-tier) used by the Coordinator to distribute segments among the Historical servers in a tier.
The `cost` strategy distributes segments by minimizing a cost function, `diskNormalized` weights these costs with the disk usage ratios of the servers and `random` distributes segments randomly.|`cost`| |`druid.coordinator.loadqueuepeon.http.repeatDelay`|The start and repeat delay (in milliseconds) for the load queue peon, which manages the load/drop queue of segments for any server.|1 minute| -|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than `druid.segmentCache.numLoadingThreads` config on Historical service.|1| +|`druid.coordinator.loadqueuepeon.http.batchSize`|Number of segment load/drop requests to batch in one HTTP request. Note that it must be smaller than or equal to the `druid.segmentCache.numLoadingThreads` config on the Historical service. If this value is not configured, the Coordinator uses the value of `numLoadingThreads` for the respective server. | `druid.segmentCache.numLoadingThreads` | |`druid.coordinator.asOverlord.enabled`|Boolean value for whether this Coordinator service should act like an Overlord as well. This configuration allows users to simplify a Druid cluster by not having to deploy any standalone Overlord services. If set to true, then Overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also.|false| |`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord services and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL| @@ -953,6 +953,7 @@ The following table shows the dynamic configuration properties for the Coordinat |`decommissioningNodes`|List of Historical servers to decommission. Coordinator will not assign new segments to decommissioning servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `maxSegmentsToMove`.|none| |`pauseCoordination`|Boolean flag for whether or not the Coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` interface. Such duties include: segment balancing, segment compaction, submitting kill tasks for unused segments (if enabled), logging of used segments in the cluster, marking of newly unused or overshadowed segments, matching and execution of load/drop rules for used segments, unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS name nodes with downtime and don't want the Coordinator to be directing Historical nodes to hit the name node with API requests until maintenance is done and the deep store is declared healthy for use again.|false| |`replicateAfterLoadTimeout`|Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow Historicals in the cluster.
However, the slow Historical may still load the segment later and the Coordinator may issue drop requests if the segment is over-replicated.|false| +|`turboLoadingNodes`| List of Historical servers to place in turbo loading mode. These servers use a larger thread pool to load segments faster, but at the cost of query performance. For servers specified in `turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the Coordinator instead uses the server's turbo loading thread count (`druid.segmentCache.numBootstrapThreads`). |none| ##### Smart segment loading diff --git a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java index cd2a8c3740f2..70758cd63abd 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/DataSegmentChangeHandler.java @@ -28,5 +28,6 @@ public interface DataSegmentChangeHandler { void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback); + void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback); } diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 12462adab2f6..4d3e22c1a131 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -33,6 +33,7 @@ import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.http.SegmentLoadingMode; import org.apache.druid.server.metrics.SegmentRowCountDistribution; import org.apache.druid.timeline.DataSegment; @@ -44,8 +45,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -63,7 +67,8 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler private final SegmentLoaderConfig config; private final DataSegmentAnnouncer announcer; private final SegmentManager segmentManager; - private final ScheduledExecutorService exec; + private final ScheduledExecutorService normalLoadExec; + private final ThreadPoolExecutor turboLoadExec; private final ConcurrentSkipListSet segmentsToDelete; @@ -88,9 +93,19 @@ public SegmentLoadDropHandler( config, announcer, segmentManager, - Executors.newScheduledThreadPool( + new ScheduledThreadPoolExecutor( config.getNumLoadingThreads(), - Execs.makeThreadFactory("SimpleDataSegmentChangeHandler-%s") + Execs.makeThreadFactory("SegmentLoadDropHandler-normal-%s") + ), + // Create a fixed-size thread pool whose idle threads time out after 1 minute. Since all threads are core threads, + // new threads are created for incoming tasks, without queueing them, until the core pool size is reached.
+ new ThreadPoolExecutor( + config.getNumBootstrapThreads(), + config.getNumBootstrapThreads(), + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + Execs.makeThreadFactory("SegmentLoadDropHandler-turbo-%s") ) ); } @@ -100,13 +115,17 @@ public SegmentLoadDropHandler( SegmentLoaderConfig config, DataSegmentAnnouncer announcer, SegmentManager segmentManager, - ScheduledExecutorService exec + ScheduledExecutorService normalLoadExec, + ThreadPoolExecutor turboLoadExec ) { this.config = config; this.announcer = announcer; this.segmentManager = segmentManager; - this.exec = exec; + this.normalLoadExec = normalLoadExec; + this.turboLoadExec = turboLoadExec; + + this.turboLoadExec.allowCoreThreadTimeOut(true); this.segmentsToDelete = new ConcurrentSkipListSet<>(); requestStatuses = CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build(); @@ -214,7 +233,7 @@ void removeSegment( "Completely removing segment[%s] in [%,d]ms.", segment.getId(), config.getDropSegmentDelayMillis() ); - exec.schedule( + normalLoadExec.schedule( runnable, config.getDropSegmentDelayMillis(), TimeUnit.MILLISECONDS @@ -244,14 +263,22 @@ public Collection getSegmentsToDelete() return ImmutableList.copyOf(segmentsToDelete); } - public ListenableFuture> processBatch(List changeRequests) + /** + * Process a list of {@link DataSegmentChangeRequest}, invoking + * {@link #processRequest(DataSegmentChangeRequest, SegmentLoadingMode)} for each one. Handles the computation + * asynchronously and returns a future to the result. + */ + public ListenableFuture> processBatch( + List changeRequests, + SegmentLoadingMode segmentLoadingMode + ) { boolean isAnyRequestDone = false; Map> statuses = Maps.newHashMapWithExpectedSize(changeRequests.size()); for (DataSegmentChangeRequest cr : changeRequests) { - AtomicReference status = processRequest(cr); + AtomicReference status = processRequest(cr, segmentLoadingMode); if (status.get().getState() != SegmentChangeStatus.State.PENDING) { isAnyRequestDone = true; } @@ -271,7 +298,15 @@ public ListenableFuture> processBatch(List processRequest(DataSegmentChangeRequest changeRequest) + /** + * Process a {@link DataSegmentChangeRequest}, invoking the request's + * {@link DataSegmentChangeRequest#go(DataSegmentChangeHandler, DataSegmentChangeCallback)}. + * The segmentLoadingMode parameter determines the thread pool to use. 
+ */ + private AtomicReference processRequest( + DataSegmentChangeRequest changeRequest, + SegmentLoadingMode segmentLoadingMode + ) { synchronized (requestStatusesLock) { AtomicReference status = requestStatuses.getIfPresent(changeRequest); @@ -282,10 +317,13 @@ private AtomicReference processRequest(DataSegmentChangeReq new DataSegmentChangeHandler() { @Override - public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) + public void addSegment( + DataSegment segment, + @Nullable DataSegmentChangeCallback callback + ) { requestStatuses.put(changeRequest, new AtomicReference<>(SegmentChangeStatus.PENDING)); - exec.submit( + getExecutorService(segmentLoadingMode).submit( () -> SegmentLoadDropHandler.this.addSegment( ((SegmentChangeRequestLoad) changeRequest).getSegment(), () -> resolveWaitingFutures() @@ -294,7 +332,10 @@ public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback } @Override - public void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) + public void removeSegment( + DataSegment segment, + @Nullable DataSegmentChangeCallback callback + ) { requestStatuses.put(changeRequest, new AtomicReference<>(SegmentChangeStatus.PENDING)); SegmentLoadDropHandler.this.removeSegment( @@ -386,5 +427,15 @@ public boolean cancel(boolean interruptIfRunning) return true; } } + + private ExecutorService getExecutorService(SegmentLoadingMode loadingMode) + { + return loadingMode == SegmentLoadingMode.TURBO ? turboLoadExec : normalLoadExec; + } + + public SegmentLoaderConfig getSegmentLoaderConfig() + { + return config; + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java index d805bad5e01c..806b6ebbee1e 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -23,10 +23,12 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableSet; +import org.apache.druid.common.config.Configs; import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.error.InvalidInput; import org.apache.druid.server.coordinator.duty.KillUnusedSegments; import org.apache.druid.server.coordinator.stats.Dimension; +import org.apache.druid.server.http.SegmentLoadingMode; import org.apache.druid.utils.JvmUtils; import javax.annotation.Nullable; @@ -70,6 +72,8 @@ public class CoordinatorDynamicConfig private final Map debugDimensions; private final Map validDebugDimensions; + private final Set turboLoadingNodes; + /** * Stale pending segments belonging to the data sources in this list are not killed by {@code * KillStalePendingSegments}. In other words, segments in these data sources are "protected". 
@@ -118,7 +122,8 @@ public CoordinatorDynamicConfig( @JsonProperty("replicateAfterLoadTimeout") boolean replicateAfterLoadTimeout, @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment, @JsonProperty("smartSegmentLoading") @Nullable Boolean smartSegmentLoading, - @JsonProperty("debugDimensions") @Nullable Map debugDimensions + @JsonProperty("debugDimensions") @Nullable Map debugDimensions, + @JsonProperty("turboLoadingNodes") @Nullable Set turboLoadingNodes ) { this.markSegmentAsUnusedDelayMillis = @@ -162,6 +167,7 @@ public CoordinatorDynamicConfig( ); this.debugDimensions = debugDimensions; this.validDebugDimensions = validateDebugDimensions(debugDimensions); + this.turboLoadingNodes = Configs.valueOrDefault(turboLoadingNodes, Set.of()); } private Map validateDebugDimensions(Map debugDimensions) @@ -200,6 +206,13 @@ private static Set parseJsonStringOrArray(Object jsonStringOrArray) } } + public SegmentLoadingMode getLoadingModeForServer(String serverName) + { + return turboLoadingNodes.contains(serverName) ? + SegmentLoadingMode.TURBO : + SegmentLoadingMode.NORMAL; + } + @JsonProperty("millisToWaitBeforeDeleting") public long getMarkSegmentAsUnusedDelayMillis() { @@ -308,6 +321,19 @@ public boolean getReplicateAfterLoadTimeout() return replicateAfterLoadTimeout; } + /** + * List of servers to put in turbo-loading mode. These servers will use a larger thread pool to load + * segments. This decreases the average time taken to load segments. However, it also leaves fewer resources + * available to query threads, which may cause a drop in query performance. + * + * @return Set of host:port entries + */ + @JsonProperty + public Set getTurboLoadingNodes() + { + return turboLoadingNodes; + } + @Override public String toString() { @@ -326,6 +352,7 @@ public String toString() ", decommissioningNodes=" + decommissioningNodes + ", pauseCoordination=" + pauseCoordination + ", replicateAfterLoadTimeout=" + replicateAfterLoadTimeout + + ", turboLoadingNodes=" + turboLoadingNodes + '}'; } @@ -359,6 +386,7 @@ public boolean equals(Object o) dataSourcesToNotKillStalePendingSegmentsIn, that.dataSourcesToNotKillStalePendingSegmentsIn) && Objects.equals(decommissioningNodes, that.decommissioningNodes) + && Objects.equals(turboLoadingNodes, that.turboLoadingNodes) && Objects.equals(debugDimensions, that.debugDimensions); } @@ -378,7 +406,8 @@ public int hashCode() dataSourcesToNotKillStalePendingSegmentsIn, decommissioningNodes, pauseCoordination, - debugDimensions + debugDimensions, + turboLoadingNodes ); } @@ -430,6 +459,7 @@ public static class Builder private Boolean replicateAfterLoadTimeout; private Boolean useRoundRobinSegmentAssignment; private Boolean smartSegmentLoading; + private Set turboLoadingNodes; public Builder() { @@ -452,7 +482,8 @@ public Builder( @JsonProperty("replicateAfterLoadTimeout") @Nullable Boolean replicateAfterLoadTimeout, @JsonProperty("useRoundRobinSegmentAssignment") @Nullable Boolean useRoundRobinSegmentAssignment, @JsonProperty("smartSegmentLoading") @Nullable Boolean smartSegmentLoading, - @JsonProperty("debugDimensions") @Nullable Map debugDimensions + @JsonProperty("debugDimensions") @Nullable Map debugDimensions, + @JsonProperty("turboLoadingNodes") @Nullable Set turboLoadingNodes ) { this.markSegmentAsUnusedDelayMillis = markSegmentAsUnusedDelayMillis; @@ -471,6 +502,7 @@ public Builder( this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment; this.smartSegmentLoading = smartSegmentLoading;
this.debugDimensions = debugDimensions; + this.turboLoadingNodes = turboLoadingNodes; } public Builder withMarkSegmentAsUnusedDelayMillis(long leadingTimeMillis) @@ -491,6 +523,12 @@ public Builder withSmartSegmentLoading(boolean smartSegmentLoading) return this; } + public Builder withTurboLoadingNodes(Set turboLoadingNodes) + { + this.turboLoadingNodes = turboLoadingNodes; + return this; + } + public Builder withReplicantLifetime(int replicantLifetime) { this.replicantLifetime = replicantLifetime; @@ -582,7 +620,8 @@ public CoordinatorDynamicConfig build() valueOrDefault(replicateAfterLoadTimeout, Defaults.REPLICATE_AFTER_LOAD_TIMEOUT), valueOrDefault(useRoundRobinSegmentAssignment, Defaults.USE_ROUND_ROBIN_ASSIGNMENT), valueOrDefault(smartSegmentLoading, Defaults.SMART_SEGMENT_LOADING), - debugDimensions + debugDimensions, + turboLoadingNodes ); } @@ -612,7 +651,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults) valueOrDefault(replicateAfterLoadTimeout, defaults.getReplicateAfterLoadTimeout()), valueOrDefault(useRoundRobinSegmentAssignment, defaults.isUseRoundRobinSegmentAssignment()), valueOrDefault(smartSegmentLoading, defaults.isSmartSegmentLoading()), - valueOrDefault(debugDimensions, defaults.getDebugDimensions()) + valueOrDefault(debugDimensions, defaults.getDebugDimensions()), + valueOrDefault(turboLoadingNodes, defaults.getTurboLoadingNodes()) ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java index f6f402037d1f..42af2e948cf8 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java @@ -22,8 +22,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.common.config.Configs; +import org.apache.druid.java.util.common.RE; import org.joda.time.Duration; +import javax.annotation.Nullable; + public class HttpLoadQueuePeonConfig { private static final Duration DEFAULT_LOAD_TIMEOUT = Duration.standardMinutes(15); @@ -35,21 +38,28 @@ public class HttpLoadQueuePeonConfig private final Duration repeatDelay; @JsonProperty - private final int batchSize; + @Nullable + private final Integer batchSize; @JsonCreator public HttpLoadQueuePeonConfig( @JsonProperty("hostTimeout") Duration hostTimeout, @JsonProperty("repeatDelay") Duration repeatDelay, - @JsonProperty("batchSize") Integer batchSize + @JsonProperty("batchSize") @Nullable Integer batchSize ) { this.hostTimeout = Configs.valueOrDefault(hostTimeout, Duration.standardMinutes(5)); this.repeatDelay = Configs.valueOrDefault(repeatDelay, Duration.standardMinutes(1)); - this.batchSize = Configs.valueOrDefault(batchSize, 1); + + if (batchSize != null && batchSize < 1) { + throw new RE("Batch size must be greater than 0."); + } + + this.batchSize = batchSize; } - public int getBatchSize() + @Nullable + public Integer getBatchSize() { return batchSize; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java index 015b5686769d..4f5ff04911ed 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java +++ 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java @@ -22,9 +22,11 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.common.config.Configs; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; @@ -45,6 +47,8 @@ import org.apache.druid.server.coordinator.stats.Dimension; import org.apache.druid.server.coordinator.stats.RowKey; import org.apache.druid.server.coordinator.stats.Stats; +import org.apache.druid.server.http.SegmentLoadingCapabilities; +import org.apache.druid.server.http.SegmentLoadingMode; import org.apache.druid.timeline.DataSegment; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -53,7 +57,6 @@ import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.MediaType; import java.io.InputStream; -import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.Collections; @@ -69,6 +72,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; /** * @@ -82,6 +86,7 @@ public class HttpLoadQueuePeon implements LoadQueuePeon new TypeReference<>() {}; private static final EmittingLogger log = new EmittingLogger(HttpLoadQueuePeon.class); + private static final long DEFAULT_TIMEOUT = 10000L; private final AtomicLong queuedSize = new AtomicLong(0); private final AtomicReference stats = new AtomicReference<>(new CoordinatorRunStats()); @@ -114,19 +119,21 @@ public class HttpLoadQueuePeon implements LoadQueuePeon private final ObjectMapper jsonMapper; private final HttpClient httpClient; - private final URL changeRequestURL; private final String serverId; private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false); private final ExecutorService callBackExecutor; + private final Supplier loadingModeSupplier; private final ObjectWriter requestBodyWriter; + private final SegmentLoadingCapabilities serverCapabilities; public HttpLoadQueuePeon( String baseUrl, ObjectMapper jsonMapper, HttpClient httpClient, HttpLoadQueuePeonConfig config, + Supplier loadingModeSupplier, ScheduledExecutorService processingExecutor, ExecutorService callBackExecutor ) @@ -139,17 +146,48 @@ public HttpLoadQueuePeon( this.callBackExecutor = callBackExecutor; this.serverId = baseUrl; + this.loadingModeSupplier = loadingModeSupplier; + this.serverCapabilities = fetchSegmentLoadingCapabilities(); + } + + @VisibleForTesting + SegmentLoadingCapabilities fetchSegmentLoadingCapabilities() + { try { - this.changeRequestURL = new URL( - new URL(baseUrl), - StringUtils.nonStrictFormat( - "druid-internal/v1/segments/changeRequests?timeout=%d", - config.getHostTimeout().getMillis() - ) + final URL segmentLoadingCapabilitiesURL = new URL( + new URL(serverId), + "druid-internal/v1/segments/loadCapabilities" + ); + + BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler(); + InputStream stream = httpClient.go( + new Request(HttpMethod.GET, 
segmentLoadingCapabilitiesURL) + .addHeader(HttpHeaders.Names.ACCEPT, MediaType.APPLICATION_JSON), + responseHandler, + new Duration(DEFAULT_TIMEOUT) + ).get(); + + if (HttpServletResponse.SC_NOT_FOUND == responseHandler.getStatus()) { + int batchSize = config.getBatchSize() == null ? 1 : config.getBatchSize(); + SegmentLoadingCapabilities defaultCapabilities = new SegmentLoadingCapabilities(batchSize, batchSize); + log.warn( + "Historical capabilities endpoint not found at URL[%s]. Using default values[%s].", + segmentLoadingCapabilitiesURL, + defaultCapabilities + ); + return defaultCapabilities; + } else if (HttpServletResponse.SC_OK != responseHandler.getStatus()) { + log.makeAlert("Received status[%s] when fetching loading capabilities from server[%s]", responseHandler.getStatus(), serverId).emit(); + throw new RE("Received status[%s] when fetching loading capabilities from server[%s]", responseHandler.getStatus(), serverId); + } + + return jsonMapper.readValue( + stream, + SegmentLoadingCapabilities.class ); } - catch (MalformedURLException ex) { - throw new RuntimeException(ex); + catch (Throwable th) { + throw new RE(th, "Received error while fetching historical capabilities from Server[%s].", serverId); } } @@ -160,7 +198,8 @@ private void doSegmentManagement() return; } - final int batchSize = config.getBatchSize(); + final SegmentLoadingMode loadingMode = loadingModeSupplier.get(); + final int batchSize = calculateBatchSize(loadingMode); final List newRequests = new ArrayList<>(batchSize); @@ -194,19 +233,28 @@ private void doSegmentManagement() if (newRequests.isEmpty()) { log.trace( "[%s]Found no load/drop requests. SegmentsToLoad[%d], SegmentsToDrop[%d], batchSize[%d].", - serverId, segmentsToLoad.size(), segmentsToDrop.size(), config.getBatchSize() + serverId, segmentsToLoad.size(), segmentsToDrop.size(), batchSize ); mainLoopInProgress.set(false); return; } try { - log.trace("Sending [%d] load/drop requests to Server[%s].", newRequests.size(), serverId); + log.trace("Sending [%d] load/drop requests to Server[%s] in loadingMode[%s].", newRequests.size(), serverId, loadingMode); final boolean hasLoadRequests = newRequests.stream().anyMatch(r -> r instanceof SegmentChangeRequestLoad); if (hasLoadRequests && !loadingRateTracker.isLoadingBatch()) { loadingRateTracker.markBatchLoadingStarted(); } + final URL changeRequestURL = new URL( + new URL(serverId), + StringUtils.nonStrictFormat( + "druid-internal/v1/segments/changeRequests?timeout=%d&loadingMode=%s", + config.getHostTimeout().getMillis(), + loadingMode + ) + ); + BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler(); ListenableFuture future = httpClient.go( new Request(HttpMethod.POST, changeRequestURL) @@ -314,6 +362,25 @@ private void logRequestFailure(Throwable t) } } + /** + * Calculates the number of segments the server is capable of handling at a time. If loading segments in turbo loading + * mode, returns the number of turbo loading threads on the server. Otherwise, returns the value set by the batch size + * runtime parameter, or the number of normal loading threads on the server if the parameter is not set. + * Always returns a positive integer.
+ */ + @VisibleForTesting + int calculateBatchSize(SegmentLoadingMode loadingMode) + { + int batchSize; + if (SegmentLoadingMode.TURBO.equals(loadingMode)) { + batchSize = serverCapabilities.getNumTurboLoadingThreads(); + } else { + batchSize = Configs.valueOrDefault(config.getBatchSize(), serverCapabilities.getNumLoadingThreads()); + } + + return Math.max(batchSize, 1); + } + private void handleResponseStatus(DataSegmentChangeRequest changeRequest, SegmentChangeStatus status) { changeRequest.go( diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java index ed6d2d5a3ea3..d9fdca36c67d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java @@ -25,6 +25,7 @@ import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig; import java.util.List; @@ -34,6 +35,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; /** * Provides LoadQueuePeons @@ -47,6 +49,7 @@ public class LoadQueueTaskMaster private final ExecutorService callbackExec; private final HttpLoadQueuePeonConfig config; private final HttpClient httpClient; + private final Supplier coordinatorDynamicConfigSupplier; @GuardedBy("this") private final AtomicBoolean isLeader = new AtomicBoolean(false); @@ -58,7 +61,8 @@ public LoadQueueTaskMaster( ScheduledExecutorService peonExec, ExecutorService callbackExec, HttpLoadQueuePeonConfig config, - HttpClient httpClient + HttpClient httpClient, + Supplier coordinatorDynamicConfigSupplier ) { this.jsonMapper = jsonMapper; @@ -66,11 +70,20 @@ public LoadQueueTaskMaster( this.callbackExec = callbackExec; this.config = config; this.httpClient = httpClient; + this.coordinatorDynamicConfigSupplier = coordinatorDynamicConfigSupplier; } private LoadQueuePeon createPeon(ImmutableDruidServer server) { - return new HttpLoadQueuePeon(server.getURL(), jsonMapper, httpClient, config, peonExec, callbackExec); + return new HttpLoadQueuePeon( + server.getURL(), + jsonMapper, + httpClient, + config, + () -> coordinatorDynamicConfigSupplier.get().getLoadingModeForServer(server.getName()), + peonExec, + callbackExec + ); } public Map getAllPeons() diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java index 7c4392daf4e2..48e037271995 100644 --- a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java +++ b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java @@ -31,6 +31,7 @@ import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer; import org.apache.druid.server.coordination.ChangeRequestHistory; import org.apache.druid.server.coordination.ChangeRequestsSnapshot; @@ -54,6 
+55,7 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import java.io.IOException; import java.util.List; @@ -219,10 +221,10 @@ public void onFailure(Throwable th) * This endpoint is used by HttpLoadQueuePeon to assign segment load/drop requests batch. This endpoint makes the * client wait till one of the following events occur. Note that this is implemented using async IO so no jetty * threads are held while in wait. - * - * (1) Given timeout elapses. - * (2) Some load/drop request completed. - * + *
+   * <ol>
+   * <li>Given timeout elapses.</li>
+   * <li>Some load/drop request completed.</li>
+   * </ol>
* It returns a map of "load/drop request -> SUCCESS/FAILED/PENDING status" for each request in the batch. */ @POST @@ -231,6 +233,7 @@ public void onFailure(Throwable th) @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) public void applyDataSegmentChangeRequests( @QueryParam("timeout") long timeout, + @QueryParam("loadingMode") @Nullable SegmentLoadingMode loadingMode, List changeRequestList, @Context final HttpServletRequest req ) throws IOException @@ -252,7 +255,7 @@ public void applyDataSegmentChangeRequests( final ResponseContext context = createContext(req.getHeader("Accept")); final ListenableFuture> future = - loadDropRequestHandler.processBatch(changeRequestList); + loadDropRequestHandler.processBatch(changeRequestList, loadingMode == null ? SegmentLoadingMode.NORMAL : loadingMode); final AsyncContext asyncContext = req.startAsync(); @@ -327,6 +330,24 @@ public void onFailure(Throwable th) asyncContext.setTimeout(timeout); } + @GET + @Path("/loadCapabilities") + @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) + public Response getSegmentLoadingCapabilities( + @Context final HttpServletRequest req + ) + { + if (loadDropRequestHandler == null) { + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); + } + + SegmentLoaderConfig config = loadDropRequestHandler.getSegmentLoaderConfig(); + SegmentLoadingCapabilities capabilitiesResponse = + new SegmentLoadingCapabilities(config.getNumLoadingThreads(), config.getNumBootstrapThreads()); + + return Response.status(Response.Status.OK).entity(capabilitiesResponse).build(); + } + private void sendErrorResponse(HttpServletRequest req, int code, String error) throws IOException { AsyncContext asyncContext = req.startAsync(); diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java new file mode 100644 index 000000000000..9dba8af5e6bb --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Contains information related to the capability of a server to load segments, for example the number of threads + * available. 
+ */ +public class SegmentLoadingCapabilities +{ + private final int numLoadingThreads; + private final int numTurboLoadingThreads; + + @JsonCreator + public SegmentLoadingCapabilities( + @JsonProperty("numLoadingThreads") int numLoadingThreads, + @JsonProperty("numTurboLoadingThreads") int numTurboLoadingThreads + ) + { + this.numLoadingThreads = numLoadingThreads; + this.numTurboLoadingThreads = numTurboLoadingThreads; + } + + @JsonProperty("numLoadingThreads") + public int getNumLoadingThreads() + { + return numLoadingThreads; + } + + @JsonProperty("numTurboLoadingThreads") + public int getNumTurboLoadingThreads() + { + return numTurboLoadingThreads; + } + + @Override + public String toString() + { + return "SegmentLoadingCapabilities{" + + "numLoadingThreads=" + numLoadingThreads + + ", numTurboLoadingThreads=" + numTurboLoadingThreads + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentLoadingMode.java b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingMode.java new file mode 100644 index 000000000000..b3896a540bda --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingMode.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.http; + +/** + * Determines the threadpool used by the historical to load segments. 
+ */ +public enum SegmentLoadingMode +{ + NORMAL, + TURBO +} diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index cd2fe2dbd63e..7a8822a60d87 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -30,6 +30,7 @@ import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.SegmentChangeStatus.State; +import org.apache.druid.server.http.SegmentLoadingMode; import org.apache.druid.timeline.DataSegment; import org.junit.Assert; import org.junit.Before; @@ -48,6 +49,7 @@ import java.util.Map; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static org.apache.druid.server.TestSegmentUtils.makeSegment; @@ -106,7 +108,7 @@ public int getDropSegmentDelayMillis() scheduledExecutorFactory = (corePoolSize, nameFormat) -> { // Override normal behavior by adding the runnable to a list so that you can make sure - // all the shceduled runnables are executed by explicitly calling run() on each item in the list + // all the scheduled runnables are executed by explicitly calling run() on each item in the list return new ScheduledThreadPoolExecutor(corePoolSize, Execs.makeThreadFactory(nameFormat)) { @Override @@ -230,7 +232,7 @@ public void testProcessBatch() throws Exception new SegmentChangeRequestDrop(segment2) ); - ListenableFuture> future = handler.processBatch(batch); + ListenableFuture> future = handler.processBatch(batch, SegmentLoadingMode.NORMAL); Map expectedStatusMap = new HashMap<>(); expectedStatusMap.put(batch.get(0), SegmentChangeStatus.PENDING); @@ -244,7 +246,7 @@ public void testProcessBatch() throws Exception runnable.run(); } - result = handler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1))).get(); + result = handler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1)), SegmentLoadingMode.TURBO).get(); Assert.assertEquals(SegmentChangeStatus.SUCCESS, result.get(0).getStatus()); Assert.assertEquals(ImmutableList.of(segment1), segmentAnnouncer.getObservedSegments()); @@ -271,7 +273,7 @@ public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequ DataSegment segment1 = makeSegment("batchtest1", "1", Intervals.of("P1d/2011-04-01")); List batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); - ListenableFuture> future = handler.processBatch(batch); + ListenableFuture> future = handler.processBatch(batch, SegmentLoadingMode.NORMAL); for (Runnable runnable : scheduledRunnable) { runnable.run(); @@ -280,7 +282,7 @@ public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequ Assert.assertEquals(State.FAILED, result.get(0).getStatus().getState()); Assert.assertEquals(ImmutableList.of(), segmentAnnouncer.getObservedSegments()); - future = handler.processBatch(batch); + future = handler.processBatch(batch, SegmentLoadingMode.NORMAL); for (Runnable runnable : scheduledRunnable) { runnable.run(); } @@ -343,7 +345,7 @@ public int getDropSegmentDelayMillis() List batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); // Request 1: Load the segment - ListenableFuture> future = 
handler.processBatch(batch); + ListenableFuture> future = handler.processBatch(batch, SegmentLoadingMode.NORMAL); for (Runnable runnable : scheduledRunnable) { runnable.run(); } @@ -354,7 +356,7 @@ public int getDropSegmentDelayMillis() // Request 2: Drop the segment batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1)); - future = handler.processBatch(batch); + future = handler.processBatch(batch, SegmentLoadingMode.NORMAL); for (Runnable runnable : scheduledRunnable) { runnable.run(); } @@ -372,7 +374,7 @@ public int getDropSegmentDelayMillis() // Request 3: Reload the segment batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); - future = handler.processBatch(batch); + future = handler.processBatch(batch, SegmentLoadingMode.NORMAL); for (Runnable runnable : scheduledRunnable) { runnable.run(); } @@ -389,7 +391,7 @@ public int getDropSegmentDelayMillis() // Request 4: Try to reload the segment - segment is loaded and announced again batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1)); - future = handler.processBatch(batch); + future = handler.processBatch(batch, SegmentLoadingMode.NORMAL); for (Runnable runnable : scheduledRunnable) { runnable.run(); } @@ -420,7 +422,8 @@ private SegmentLoadDropHandler initSegmentLoadDropHandler( config, segmentAnnouncer, segmentManager, - scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]") + scheduledExecutorFactory.create(5, "LoadDropHandlerTest-[%d]"), + (ThreadPoolExecutor) scheduledExecutorFactory.create(5, "TurboSegmentLoadDropHandlerTest-[%d]") ); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java index 36cb9c8439f4..d91cb62050a1 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java @@ -101,7 +101,7 @@ public void testLoadQueuePeonConfigDefaultValues() Assert.assertEquals(Duration.standardMinutes(1), config.getRepeatDelay()); Assert.assertEquals(Duration.standardMinutes(5), config.getHostTimeout()); Assert.assertEquals(Duration.standardMinutes(15), config.getLoadTimeout()); - Assert.assertEquals(1, config.getBatchSize()); + Assert.assertNull(config.getBatchSize()); } @Test @@ -118,7 +118,7 @@ public void testLoadQueuePeonConfigOverrideValues() Assert.assertEquals(Duration.standardMinutes(20), config.getRepeatDelay()); Assert.assertEquals(Duration.standardMinutes(10), config.getHostTimeout()); Assert.assertEquals(Duration.standardMinutes(15), config.getLoadTimeout()); - Assert.assertEquals(100, config.getBatchSize()); + Assert.assertEquals(Integer.valueOf(100), config.getBatchSize()); } @Test diff --git a/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java new file mode 100644 index 000000000000..4d9a2400215c --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.config; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.exc.ValueInstantiationException; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; +import org.junit.Assert; +import org.junit.Test; +import org.junit.internal.matchers.ThrowableMessageMatcher; + +public class HttpLoadQueuePeonConfigTest +{ + @Test + public void testValidateBatchSize() throws JsonProcessingException + { + ObjectMapper jsonMapper = new ObjectMapper(); + + MatcherAssert.assertThat( + Assert.assertThrows(ValueInstantiationException.class, () -> + jsonMapper.readValue("{\"batchSize\":0}", HttpLoadQueuePeonConfig.class) + ), + CoreMatchers.allOf( + CoreMatchers.instanceOf(ValueInstantiationException.class), + ThrowableMessageMatcher.hasMessage( + CoreMatchers.containsString("Batch size must be greater than 0.") + ) + ) + ); + + HttpLoadQueuePeonConfig emptyConfig = jsonMapper.readValue( + "{}", + HttpLoadQueuePeonConfig.class + ); + Assert.assertNull(emptyConfig.getBatchSize()); + + HttpLoadQueuePeonConfig config = jsonMapper.readValue( + "{\"batchSize\":2}", + HttpLoadQueuePeonConfig.class + ); + Assert.assertEquals(2, config.getBatchSize().intValue()); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java index 1dcf384d2e0d..e511ce77ee2c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java @@ -37,6 +37,8 @@ import org.apache.druid.server.coordinator.config.HttpLoadQueuePeonConfig; import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService; +import org.apache.druid.server.http.SegmentLoadingCapabilities; +import org.apache.druid.server.http.SegmentLoadingMode; import org.apache.druid.timeline.DataSegment; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; @@ -81,13 +83,21 @@ public void setUp() MAPPER, httpClient, new HttpLoadQueuePeonConfig(null, null, 10), + () -> SegmentLoadingMode.NORMAL, new WrappingScheduledExecutorService( "HttpLoadQueuePeonTest-%s", httpClient.processingExecutor, true ), httpClient.callbackExecutor - ); + ) + { + @Override + SegmentLoadingCapabilities fetchSegmentLoadingCapabilities() + { + return new SegmentLoadingCapabilities(1, 3); + } + }; httpLoadQueuePeon.start(); } @@ -316,6 +326,37 @@ public void testLoadRateIsChangedWhenLoadSucceeds() throws InterruptedException ); } + @Test + public void testBatchSize() + { + Assert.assertEquals(10, 
httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.NORMAL)); + + // Without a batch size runtime parameter + httpLoadQueuePeon = new HttpLoadQueuePeon( + "http://dummy:4000", + MAPPER, + httpClient, + new HttpLoadQueuePeonConfig(null, null, null), + () -> SegmentLoadingMode.NORMAL, + new WrappingScheduledExecutorService( + "HttpLoadQueuePeonTest-%s", + httpClient.processingExecutor, + true + ), + httpClient.callbackExecutor + ) + { + @Override + SegmentLoadingCapabilities fetchSegmentLoadingCapabilities() + { + return new SegmentLoadingCapabilities(1, 3); + } + }; + + Assert.assertEquals(1, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.NORMAL)); + Assert.assertEquals(3, httpLoadQueuePeon.calculateBatchSize(SegmentLoadingMode.TURBO)); + } + private LoadPeonCallback markSegmentProcessed(DataSegment segment) { return success -> httpClient.processedSegments.add(segment); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java index ddc360aa0124..938b6b09b1a0 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java @@ -454,19 +454,22 @@ private Environment( createBalancerStrategy(balancerStrategy), new HttpLoadQueuePeonConfig(null, null, null) ); + + JacksonConfigManager jacksonConfigManager = mockConfigManager(); + setDynamicConfig(dynamicConfig); + this.loadQueueTaskMaster = new LoadQueueTaskMaster( OBJECT_MAPPER, executorFactory.create(1, ExecutorFactory.LOAD_QUEUE_EXECUTOR), executorFactory.create(1, ExecutorFactory.LOAD_CALLBACK_EXECUTOR), coordinatorConfig.getHttpLoadQueuePeonConfig(), - httpClient + httpClient, + () -> dynamicConfig ); + this.loadQueueManager = new SegmentLoadQueueManager(coordinatorInventoryView, loadQueueTaskMaster); - JacksonConfigManager jacksonConfigManager = mockConfigManager(); - setDynamicConfig(dynamicConfig); - this.lookupCoordinatorManager = EasyMock.createNiceMock(LookupCoordinatorManager.class); mocks.add(jacksonConfigManager); mocks.add(lookupCoordinatorManager); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java index 5caa90d8dfdb..877265d85b4e 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.HttpResponseHandler; @@ -32,6 +33,7 @@ import org.apache.druid.server.coordination.DataSegmentChangeRequest; import org.apache.druid.server.coordination.DataSegmentChangeResponse; import org.apache.druid.server.coordination.SegmentChangeStatus; +import org.apache.druid.server.http.SegmentLoadingCapabilities; import org.jboss.netty.buffer.ChannelBuffers; import 
org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -79,12 +81,16 @@ public ListenableFuture go( } @Override + @SuppressWarnings("unchecked") public ListenableFuture go( Request request, HttpResponseHandler handler, Duration readTimeout ) { + if (request.getUrl().toString().contains("/loadCapabilities")) { + return getCapabilities(handler); + } return executorService.submit(() -> processRequest(request, handler)); } @@ -143,6 +149,27 @@ private List processRequest( .collect(Collectors.toList()); } + private ListenableFuture getCapabilities(HttpResponseHandler handler) + { + try { + // Set response content and status + final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.setContent(ChannelBuffers.EMPTY_BUFFER); + handler.handleResponse(response, NOOP_TRAFFIC_COP); + + // Serialize + SettableFuture future = SettableFuture.create(); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + objectMapper.writeValue(baos, new SegmentLoadingCapabilities(1, 1)); + future.set(new ByteArrayInputStream(baos.toByteArray())); + } + return future; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + /** * Processes each DataSegmentChangeRequest using the handler. */ diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java index eb7fb1992873..1be987ddc278 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java @@ -51,7 +51,8 @@ public void testSerde() throws Exception + " \"maxSegmentsInNodeLoadingQueue\": 1,\n" + " \"decommissioningNodes\": [\"host1\", \"host2\"],\n" + " \"pauseCoordination\": false,\n" - + " \"replicateAfterLoadTimeout\": false\n" + + " \"replicateAfterLoadTimeout\": false,\n" + + " \"turboLoadingNodes\":[\"host1\", \"host3\"]\n" + "}\n"; CoordinatorDynamicConfig actual = mapper.readValue( @@ -65,6 +66,7 @@ public void testSerde() throws Exception ); ImmutableSet decommissioning = ImmutableSet.of("host1", "host2"); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); + ImmutableSet turboLoadingNodes = ImmutableSet.of("host1", "host3"); assertConfig( actual, 1, @@ -78,7 +80,8 @@ public void testSerde() throws Exception 1, decommissioning, false, - false + false, + turboLoadingNodes ); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); @@ -95,7 +98,8 @@ public void testSerde() throws Exception 1, ImmutableSet.of("host1"), false, - false + false, + turboLoadingNodes ); actual = CoordinatorDynamicConfig.builder().build(actual); @@ -112,7 +116,8 @@ public void testSerde() throws Exception 1, ImmutableSet.of("host1"), false, - false + false, + turboLoadingNodes ); actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual); @@ -129,7 +134,8 @@ public void testSerde() throws Exception 1, ImmutableSet.of("host1"), true, - false + false, + turboLoadingNodes ); actual = CoordinatorDynamicConfig.builder().withReplicateAfterLoadTimeout(true).build(actual); @@ -146,7 +152,8 @@ public void testSerde() throws Exception 1, ImmutableSet.of("host1"), true, - true + true, + turboLoadingNodes ); actual = CoordinatorDynamicConfig.builder().build(actual); @@ -163,7 +170,8 @@ public void testSerde() throws Exception 1, 
ImmutableSet.of("host1"), true, - true + true, + turboLoadingNodes ); actual = CoordinatorDynamicConfig.builder().withKillTaskSlotRatio(0.1).build(actual); @@ -180,7 +188,8 @@ public void testSerde() throws Exception 1, ImmutableSet.of("host1"), true, - true + true, + turboLoadingNodes ); actual = CoordinatorDynamicConfig.builder().withMaxKillTaskSlots(5).build(actual); @@ -197,7 +206,8 @@ public void testSerde() throws Exception 1, ImmutableSet.of("host1"), true, - true + true, + turboLoadingNodes ); } @@ -233,7 +243,8 @@ public void testConstructorWithNullsShouldKillUnusedSegmentsInAllDataSources() true, false, false, - null + null, + ImmutableSet.of("host1") ); Assert.assertTrue(config.getSpecificDataSourcesToKillUnusedSegmentsIn().isEmpty()); } @@ -257,7 +268,8 @@ public void testConstructorWithSpecificDataSourcesToKillShouldNotKillUnusedSegme true, false, false, - null + null, + ImmutableSet.of("host1") ); Assert.assertEquals(ImmutableSet.of("test1"), config.getSpecificDataSourcesToKillUnusedSegmentsIn()); } @@ -272,7 +284,8 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio + " \"replicationThrottleLimit\": 1,\n" + " \"balancerComputeThreads\": 2, \n" + " \"killDataSourceWhitelist\": [\"test1\",\"test2\"],\n" - + " \"maxSegmentsInNodeLoadingQueue\": 1\n" + + " \"maxSegmentsInNodeLoadingQueue\": 1,\n" + + " \"turboLoadingNodes\": [\"host3\",\"host4\"]\n" + "}\n"; CoordinatorDynamicConfig actual = mapper.readValue( @@ -286,6 +299,7 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio ); ImmutableSet decommissioning = ImmutableSet.of(); ImmutableSet whitelist = ImmutableSet.of("test1", "test2"); + ImmutableSet turboLoading = ImmutableSet.of("host3", "host4"); assertConfig( actual, 1, @@ -299,7 +313,8 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio 1, decommissioning, false, - false + false, + turboLoading ); actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual); @@ -316,7 +331,8 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio 1, ImmutableSet.of("host1"), false, - false + false, + turboLoading ); actual = CoordinatorDynamicConfig.builder().build(actual); @@ -333,7 +349,8 @@ public void testDecommissioningParametersBackwardCompatibility() throws Exceptio 1, ImmutableSet.of("host1"), false, - false + false, + turboLoading ); } @@ -372,7 +389,8 @@ public void testSerdeWithStringInKillDataSourceWhitelist() throws Exception 1, ImmutableSet.of(), false, - false + false, + ImmutableSet.of() ); } @@ -411,7 +429,8 @@ public void testHandleMissingPercentOfSegmentsToConsiderPerMove() throws Excepti 1, decommissioning, false, - false + false, + ImmutableSet.of() ); } @@ -446,7 +465,8 @@ public void testDeserializeWithoutMaxSegmentsInNodeLoadingQueue() throws Excepti EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, ImmutableSet.of(), false, - false + false, + ImmutableSet.of() ); } @@ -468,7 +488,8 @@ public void testBuilderDefaults() EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, emptyList, false, - false + false, + ImmutableSet.of() ); } @@ -493,7 +514,8 @@ public void testBuilderWithDefaultSpecificDataSourcesToKillUnusedSegmentsInSpeci EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE, ImmutableSet.of(), false, - false + false, + ImmutableSet.of() ); } @@ -511,6 +533,18 @@ public void testUpdate() ); } + @Test + public void testTurboLoadingNodes() + { + CoordinatorDynamicConfig config = 
CoordinatorDynamicConfig + .builder() + .withTurboLoadingNodes(ImmutableSet.of("localhost:8083")) + .build(); + + Assert.assertEquals(SegmentLoadingMode.NORMAL, config.getLoadingModeForServer("localhost:8082")); + Assert.assertEquals(SegmentLoadingMode.TURBO, config.getLoadingModeForServer("localhost:8083")); + } + @Test public void testEqualsAndHashCode() { @@ -533,7 +567,8 @@ private void assertConfig( int expectedMaxSegmentsInNodeLoadingQueue, Set decommissioningNodes, boolean pauseCoordination, - boolean replicateAfterLoadTimeout + boolean replicateAfterLoadTimeout, + Set turboLoadingNodes ) { Assert.assertEquals( @@ -554,6 +589,7 @@ private void assertConfig( Assert.assertEquals(decommissioningNodes, config.getDecommissioningNodes()); Assert.assertEquals(pauseCoordination, config.getPauseCoordination()); Assert.assertEquals(replicateAfterLoadTimeout, config.getReplicateAfterLoadTimeout()); + Assert.assertEquals(turboLoadingNodes, config.getTurboLoadingNodes()); } private static int getDefaultNumBalancerThreads() diff --git a/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java b/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java new file mode 100644 index 000000000000..0819439a0aa9 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/http/SegmentLoadingCapabilitiesTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SegmentLoadingCapabilitiesTest
+{
+  private final ObjectMapper jsonMapper = new DefaultObjectMapper();
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    SegmentLoadingCapabilities capabilities = new SegmentLoadingCapabilities(1, 4);
+
+    SegmentLoadingCapabilities reread = jsonMapper.readValue(jsonMapper.writeValueAsString(capabilities), SegmentLoadingCapabilities.class);
+
+    Assert.assertEquals(capabilities.getNumLoadingThreads(), reread.getNumLoadingThreads());
+    Assert.assertEquals(capabilities.getNumTurboLoadingThreads(), reread.getNumTurboLoadingThreads());
+  }
+
+  @Test
+  public void testSerdeFromJson() throws JsonProcessingException
+  {
+    String json = "{\"numLoadingThreads\":3,\"numTurboLoadingThreads\":5}";
+    SegmentLoadingCapabilities reread = jsonMapper.readValue(json, SegmentLoadingCapabilities.class);
+
+    Assert.assertEquals(3, reread.getNumLoadingThreads());
+    Assert.assertEquals(5, reread.getNumTurboLoadingThreads());
+  }
+}
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index b1fc5f634047..1f61d9716eba 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -306,7 +306,8 @@ public LoadQueueTaskMaster getLoadQueueTaskMaster(
       ScheduledExecutorFactory factory,
       DruidCoordinatorConfig config,
       @EscalatedGlobal HttpClient httpClient,
-      Lifecycle lifecycle
+      Lifecycle lifecycle,
+      CoordinatorConfigManager coordinatorConfigManager
   )
   {
     final ExecutorService callBackExec = Execs.singleThreaded("LoadQueuePeon-callbackexec--%d");
@@ -316,7 +317,8 @@ public LoadQueueTaskMaster getLoadQueueTaskMaster(
         factory.create(1, "Master-PeonExec--%d"),
         callBackExec,
         config.getHttpLoadQueuePeonConfig(),
-        httpClient
+        httpClient,
+        coordinatorConfigManager::getCurrentDynamicConfig
     );
   }
 }
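Note on the classes exercised above: this excerpt does not include the implementations of `SegmentLoadingCapabilities` or of `CoordinatorDynamicConfig.getLoadingModeForServer`. The sketch below is an illustration only, inferred from the tests in this patch: the constructor arity, getter names, and JSON field names come from `SegmentLoadingCapabilitiesTest`, while the package placement, Jackson annotations, and overall layout are assumptions rather than the actual Druid source.

```java
// Hypothetical sketch, not part of this patch: a Jackson-serializable payload whose
// shape matches the assertions in SegmentLoadingCapabilitiesTest above.
package org.apache.druid.server.http;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class SegmentLoadingCapabilities
{
  private final int numLoadingThreads;
  private final int numTurboLoadingThreads;

  @JsonCreator
  public SegmentLoadingCapabilities(
      @JsonProperty("numLoadingThreads") int numLoadingThreads,
      @JsonProperty("numTurboLoadingThreads") int numTurboLoadingThreads
  )
  {
    this.numLoadingThreads = numLoadingThreads;
    this.numTurboLoadingThreads = numTurboLoadingThreads;
  }

  @JsonProperty
  public int getNumLoadingThreads()
  {
    return numLoadingThreads;
  }

  @JsonProperty
  public int getNumTurboLoadingThreads()
  {
    return numTurboLoadingThreads;
  }
}
```

Similarly, the `testTurboLoadingNodes` case above would pass if `getLoadingModeForServer(server)` reduced to a set-membership check such as `turboLoadingNodes.contains(server) ? SegmentLoadingMode.TURBO : SegmentLoadingMode.NORMAL`; whether the real implementation does exactly this is not shown in this excerpt.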