From 1b5b4591574a883dfbe4f97cd4e746bb965ee25b Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 24 Mar 2025 22:52:35 +0530 Subject: [PATCH 1/2] Add metric and simulation test for turbo loading mode --- .../BatchDataSegmentAnnouncer.java | 19 +----- .../coordination/SegmentChangeStatus.java | 67 +++++++++++++++++-- .../coordination/SegmentLoadDropHandler.java | 33 +++++---- .../config/HttpLoadQueuePeonConfig.java | 13 ++-- .../loading/HttpLoadQueuePeon.java | 42 +++++++----- .../http/SegmentLoadingCapabilities.java | 4 +- .../SegmentBootstrapperCacheTest.java | 4 +- .../SegmentLoadDropHandlerTest.java | 16 ++--- .../config/HttpLoadQueuePeonConfigTest.java | 21 +++--- .../loading/HttpLoadQueuePeonTest.java | 2 +- .../CoordinatorSimulationBaseTest.java | 1 + .../simulate/SegmentLoadingTest.java | 41 ++++++++++++ .../TestSegmentLoadingHttpClient.java | 21 +++++- 13 files changed, 201 insertions(+), 83 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java index d95dee729af0..7fff124ac816 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java +++ b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java @@ -348,27 +348,12 @@ public ListenableFuture> getSeg synchronized (lock) { Iterable segments = Iterables.transform( segmentLookup.keySet(), - new Function<>() - { - @Nullable - @Override - public SegmentChangeRequestLoad apply(DataSegment input) - { - return new SegmentChangeRequestLoad(input); - } - } + SegmentChangeRequestLoad::new ); Iterable sinkSchema = Iterables.transform( taskSinkSchema.values(), - new Function<>() - { - @Override - public SegmentSchemasChangeRequest apply(SegmentSchemas input) - { - return new SegmentSchemasChangeRequest(input); - } - } + SegmentSchemasChangeRequest::new ); Iterable changeRequestIterables = Iterables.concat(segments, sinkSchema); SettableFuture> future = SettableFuture.create(); diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeStatus.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeStatus.java index c19d1e7914a9..2e63c687060b 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeStatus.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentChangeStatus.java @@ -22,8 +22,10 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.server.http.SegmentLoadingMode; import javax.annotation.Nullable; +import java.util.Objects; /** * Contains {@link State} of a {@link DataSegmentChangeRequest} and failure @@ -39,23 +41,51 @@ public enum State private final State state; @Nullable private final String failureCause; + private final SegmentLoadingMode loadingMode; - public static final SegmentChangeStatus SUCCESS = new SegmentChangeStatus(State.SUCCESS, null); - public static final SegmentChangeStatus PENDING = new SegmentChangeStatus(State.PENDING, null); + private static final SegmentChangeStatus SUCCESS = new SegmentChangeStatus(State.SUCCESS, null, null); + private static final SegmentChangeStatus PENDING = new SegmentChangeStatus(State.PENDING, null, null); + + public static SegmentChangeStatus success() + { + return SUCCESS; + } + + public static SegmentChangeStatus success(SegmentLoadingMode loadingMode) + { + return new SegmentChangeStatus(State.SUCCESS, null, loadingMode); + } + + public static SegmentChangeStatus pending() + { + return PENDING; + } + + public static SegmentChangeStatus pending(SegmentLoadingMode loadingMode) + { + return new SegmentChangeStatus(State.PENDING, null, loadingMode); + } + + public static SegmentChangeStatus failed(String cause, SegmentLoadingMode loadingMode) + { + return new SegmentChangeStatus(State.FAILED, cause, loadingMode); + } public static SegmentChangeStatus failed(String cause) { - return new SegmentChangeStatus(State.FAILED, cause); + return new SegmentChangeStatus(State.FAILED, cause, null); } @JsonCreator private SegmentChangeStatus( @JsonProperty("state") State state, - @JsonProperty("failureCause") @Nullable String failureCause + @JsonProperty("failureCause") @Nullable String failureCause, + @JsonProperty("loadingMode") @Nullable SegmentLoadingMode loadingMode ) { this.state = Preconditions.checkNotNull(state, "state must be non-null"); this.failureCause = failureCause; + this.loadingMode = loadingMode; } @JsonProperty @@ -71,12 +101,41 @@ public String getFailureCause() return failureCause; } + @Nullable + @JsonProperty + public SegmentLoadingMode getLoadingMode() + { + return loadingMode; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SegmentChangeStatus that = (SegmentChangeStatus) o; + return state == that.state + && Objects.equals(failureCause, that.failureCause) + && loadingMode == that.loadingMode; + } + + @Override + public int hashCode() + { + return Objects.hash(state, failureCause, loadingMode); + } + @Override public String toString() { return "SegmentChangeStatus{" + "state=" + state + ", failureCause='" + failureCause + '\'' + + ", loadingMode=" + loadingMode + '}'; } } diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 4d3e22c1a131..875fad0a8237 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -20,6 +20,7 @@ package org.apache.druid.server.coordination; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableList; @@ -57,7 +58,7 @@ * Responsible for loading and dropping of segments by a process that can serve segments. */ @ManageLifecycle -public class SegmentLoadDropHandler implements DataSegmentChangeHandler +public class SegmentLoadDropHandler { private static final EmittingLogger log = new EmittingLogger(SegmentLoadDropHandler.class); @@ -125,6 +126,7 @@ public SegmentLoadDropHandler( this.normalLoadExec = normalLoadExec; this.turboLoadExec = turboLoadExec; + // Allow core threads to time out to save resources when not in turbo mode this.turboLoadExec.allowCoreThreadTimeOut(true); this.segmentsToDelete = new ConcurrentSkipListSet<>(); @@ -141,12 +143,15 @@ public Map getRowCountDistributionPerDataso return segmentManager.getRowCountDistribution(); } - @Override - public void addSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) + public void addSegment( + DataSegment segment, + @Nullable DataSegmentChangeCallback callback, + SegmentLoadingMode loadingMode + ) { SegmentChangeStatus result = null; try { - log.info("Loading segment[%s]", segment.getId()); + log.info("Loading segment[%s] in mode[%s]", segment.getId(), loadingMode); /* The lock below is used to prevent a race condition when the scheduled runnable in removeSegment() starts, and if (segmentsToDelete.remove(segment)) returns true, in which case historical will start deleting segment @@ -179,13 +184,14 @@ each time when addSegment() is called, it has to wait for the lock in order to m throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getId()); } - result = SegmentChangeStatus.SUCCESS; + result = SegmentChangeStatus.success(loadingMode); } catch (Throwable e) { log.makeAlert(e, "Failed to load segment") .addData("segment", segment) .emit(); - result = SegmentChangeStatus.failed(e.toString()); + Throwable rootCause = Throwables.getRootCause(e); + result = SegmentChangeStatus.failed(rootCause.toString(), loadingMode); } finally { updateRequestStatus(new SegmentChangeRequestLoad(segment), result); @@ -195,7 +201,6 @@ each time when addSegment() is called, it has to wait for the lock in order to m } } - @Override public void removeSegment(DataSegment segment, @Nullable DataSegmentChangeCallback callback) { removeSegment(segment, callback, true); @@ -242,7 +247,7 @@ void removeSegment( runnable.run(); } - result = SegmentChangeStatus.SUCCESS; + result = SegmentChangeStatus.success(); } catch (Exception e) { log.makeAlert(e, "Failed to remove segment") @@ -322,11 +327,13 @@ public void addSegment( @Nullable DataSegmentChangeCallback callback ) { - requestStatuses.put(changeRequest, new AtomicReference<>(SegmentChangeStatus.PENDING)); - getExecutorService(segmentLoadingMode).submit( + final SegmentChangeStatus pendingStatus = SegmentChangeStatus.pending(segmentLoadingMode); + requestStatuses.put(changeRequest, new AtomicReference<>(pendingStatus)); + getLoadingExecutor(segmentLoadingMode).submit( () -> SegmentLoadDropHandler.this.addSegment( ((SegmentChangeRequestLoad) changeRequest).getSegment(), - () -> resolveWaitingFutures() + () -> resolveWaitingFutures(), + segmentLoadingMode ) ); } @@ -337,7 +344,7 @@ public void removeSegment( @Nullable DataSegmentChangeCallback callback ) { - requestStatuses.put(changeRequest, new AtomicReference<>(SegmentChangeStatus.PENDING)); + requestStatuses.put(changeRequest, new AtomicReference<>(SegmentChangeStatus.pending())); SegmentLoadDropHandler.this.removeSegment( ((SegmentChangeRequestDrop) changeRequest).getSegment(), () -> resolveWaitingFutures(), @@ -428,7 +435,7 @@ public boolean cancel(boolean interruptIfRunning) } } - private ExecutorService getExecutorService(SegmentLoadingMode loadingMode) + private ExecutorService getLoadingExecutor(SegmentLoadingMode loadingMode) { return loadingMode == SegmentLoadingMode.TURBO ? turboLoadExec : normalLoadExec; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java index 42af2e948cf8..8c7cd7defb62 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfig.java @@ -22,7 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.common.config.Configs; -import org.apache.druid.java.util.common.RE; +import org.apache.druid.error.InvalidInput; import org.joda.time.Duration; import javax.annotation.Nullable; @@ -50,12 +50,13 @@ public HttpLoadQueuePeonConfig( { this.hostTimeout = Configs.valueOrDefault(hostTimeout, Duration.standardMinutes(5)); this.repeatDelay = Configs.valueOrDefault(repeatDelay, Duration.standardMinutes(1)); - - if (batchSize != null && batchSize < 1) { - throw new RE("Batch size must be greater than 0."); - } - this.batchSize = batchSize; + + InvalidInput.conditionalException( + batchSize == null || batchSize >= 1, + "'druid.coordinator.loadqueuepeon.http.batchSize'[%s] must be greater than 0", + batchSize + ); } @Nullable diff --git a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java index 4f5ff04911ed..38eeb42413e7 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java @@ -210,7 +210,7 @@ private void doSegmentManagement() final SegmentHolder holder = queuedSegmentIterator.next(); final DataSegment segment = holder.getSegment(); if (holder.hasRequestTimedOut()) { - onRequestFailed(holder, "timed out"); + onRequestFailed(holder, SegmentChangeStatus.failed("timed out")); queuedSegmentIterator.remove(); if (holder.isLoad()) { segmentsToLoad.remove(segment); @@ -407,9 +407,9 @@ private void updateSuccessOrFailureInHolder(SegmentHolder holder, SegmentChangeS queuedSegments.remove(holder); activeRequestSegments.remove(holder.getSegment()); if (status.getState() == SegmentChangeStatus.State.FAILED) { - onRequestFailed(holder, status.getFailureCause()); + onRequestFailed(holder, status); } else { - onRequestCompleted(holder, RequestStatus.SUCCESS); + onRequestCompleted(holder, RequestStatus.SUCCESS, status); } } }, null @@ -452,7 +452,13 @@ public void stop() stopped = true; if (!queuedSegments.isEmpty()) { - queuedSegments.forEach(holder -> onRequestCompleted(holder, RequestStatus.CANCELLED)); + queuedSegments.forEach( + holder -> onRequestCompleted( + holder, + RequestStatus.CANCELLED, + SegmentChangeStatus.failed("cancelled") + ) + ); } segmentsToDrop.clear(); @@ -487,13 +493,12 @@ public void loadSegment(DataSegment segment, SegmentAction action, LoadPeonCallb SegmentHolder holder = segmentsToLoad.get(segment); if (holder == null) { - log.trace("Server[%s] to load segment[%s] queued.", serverId, segment.getId()); queuedSize.addAndGet(segment.getSize()); holder = new SegmentHolder(segment, action, config.getLoadTimeout(), callback); segmentsToLoad.put(segment, holder); queuedSegments.add(holder); processingExecutor.execute(this::doSegmentManagement); - incrementStat(holder, RequestStatus.ASSIGNED); + incrementStat(holder, RequestStatus.ASSIGNED, null); } else { holder.addCallback(callback); } @@ -522,7 +527,7 @@ public void dropSegment(DataSegment segment, LoadPeonCallback callback) segmentsToDrop.put(segment, holder); queuedSegments.add(holder); processingExecutor.execute(this::doSegmentManagement); - incrementStat(holder, RequestStatus.ASSIGNED); + incrementStat(holder, RequestStatus.ASSIGNED, null); } else { holder.addCallback(callback); } @@ -593,16 +598,16 @@ public Set getSegmentsMarkedToDrop() return Collections.unmodifiableSet(segmentsMarkedToDrop); } - private void onRequestFailed(SegmentHolder holder, String failureCause) + private void onRequestFailed(SegmentHolder holder, SegmentChangeStatus status) { log.error( "Server[%s] failed segment[%s] request[%s] with cause [%s].", - serverId, holder.getSegment().getId(), holder.getAction(), failureCause + serverId, holder.getSegment().getId(), holder.getAction(), status.getFailureCause() ); - onRequestCompleted(holder, RequestStatus.FAILED); + onRequestCompleted(holder, RequestStatus.FAILED, status); } - private void onRequestCompleted(SegmentHolder holder, RequestStatus status) + private void onRequestCompleted(SegmentHolder holder, RequestStatus status, SegmentChangeStatus changeStatus) { final SegmentAction action = holder.getAction(); log.trace( @@ -613,14 +618,19 @@ private void onRequestCompleted(SegmentHolder holder, RequestStatus status) if (holder.isLoad()) { queuedSize.addAndGet(-holder.getSegment().getSize()); } - incrementStat(holder, status); + incrementStat(holder, status, changeStatus); executeCallbacks(holder, status == RequestStatus.SUCCESS); } - private void incrementStat(SegmentHolder holder, RequestStatus status) + private void incrementStat(SegmentHolder holder, RequestStatus status, SegmentChangeStatus changeStatus) { + String description = holder.getAction().name(); + if (changeStatus != null && changeStatus.getLoadingMode() != null) { + description += ": " + changeStatus.getLoadingMode().name(); + } + RowKey rowKey = RowKey.with(Dimension.DATASOURCE, holder.getSegment().getDataSource()) - .and(Dimension.DESCRIPTION, holder.getAction().name()); + .and(Dimension.DESCRIPTION, description); stats.get().add(status.datasourceStat, rowKey, 1); } @@ -634,7 +644,7 @@ private void executeCallbacks(SegmentHolder holder, boolean success) } /** - * Tries to cancel a load/drop operation. An load/drop request can be cancelled + * Tries to cancel a load/drop operation. A load/drop request can be cancelled * only if it has not already been sent to the corresponding server. */ @Override @@ -654,7 +664,7 @@ public boolean cancelOperation(DataSegment segment) } queuedSegments.remove(holder); - onRequestCompleted(holder, RequestStatus.CANCELLED); + onRequestCompleted(holder, RequestStatus.CANCELLED, SegmentChangeStatus.failed("cancelled")); return true; } } diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java index 9dba8af5e6bb..d74101482fab 100644 --- a/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java +++ b/server/src/main/java/org/apache/druid/server/http/SegmentLoadingCapabilities.java @@ -41,13 +41,13 @@ public SegmentLoadingCapabilities( this.numTurboLoadingThreads = numTurboLoadingThreads; } - @JsonProperty("numLoadingThreads") + @JsonProperty public int getNumLoadingThreads() { return numLoadingThreads; } - @JsonProperty("numTurboLoadingThreads") + @JsonProperty public int getNumTurboLoadingThreads() { return numTurboLoadingThreads; diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java index 187725317a21..17e862a50a9c 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentBootstrapperCacheTest.java @@ -219,12 +219,12 @@ public void testLoadLocalCache() throws IOException, SegmentLoadingException // Make sure adding segments beyond allowed size fails DataSegment newSegment = TestSegmentUtils.makeSegment("test", "new-segment", SEGMENT_SIZE); - loadDropHandler.addSegment(newSegment, null); + loadDropHandler.addSegment(newSegment, null, null); Assert.assertFalse(segmentAnnouncer.getObservedSegments().contains(newSegment)); // Clearing some segment should allow for new segments loadDropHandler.removeSegment(expectedSegments.get(0), null, false); - loadDropHandler.addSegment(newSegment, null); + loadDropHandler.addSegment(newSegment, null, null); Assert.assertTrue(segmentAnnouncer.getObservedSegments().contains(newSegment)); bootstrapper.stop(); diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index 7a8822a60d87..99a2a1f7ad46 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -144,7 +144,7 @@ public void testSegmentLoading1() Assert.assertFalse(segmentAnnouncer.getObservedSegments().contains(segment)); - handler.addSegment(segment, DataSegmentChangeCallback.NOOP); + handler.addSegment(segment, DataSegmentChangeCallback.NOOP, null); // Make sure the scheduled runnable that "deletes" segment files has been executed. // Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in @@ -186,7 +186,7 @@ public void testSegmentLoading2() final DataSegment segment = makeSegment("test", "1", Intervals.of("P1d/2011-04-01")); - handler.addSegment(segment, DataSegmentChangeCallback.NOOP); + handler.addSegment(segment, DataSegmentChangeCallback.NOOP, null); Assert.assertTrue(segmentAnnouncer.getObservedSegments().contains(segment)); @@ -194,7 +194,7 @@ public void testSegmentLoading2() Assert.assertFalse(segmentAnnouncer.getObservedSegments().contains(segment)); - handler.addSegment(segment, DataSegmentChangeCallback.NOOP); + handler.addSegment(segment, DataSegmentChangeCallback.NOOP, null); // Make sure the scheduled runnable that "deletes" segment files has been executed. // Because another addSegment() call is executed, which removes the segment from segmentsToDelete field in @@ -232,11 +232,11 @@ public void testProcessBatch() throws Exception new SegmentChangeRequestDrop(segment2) ); - ListenableFuture> future = handler.processBatch(batch, SegmentLoadingMode.NORMAL); + ListenableFuture> future = handler.processBatch(batch, SegmentLoadingMode.TURBO); Map expectedStatusMap = new HashMap<>(); - expectedStatusMap.put(batch.get(0), SegmentChangeStatus.PENDING); - expectedStatusMap.put(batch.get(1), SegmentChangeStatus.SUCCESS); + expectedStatusMap.put(batch.get(0), SegmentChangeStatus.pending(SegmentLoadingMode.TURBO)); + expectedStatusMap.put(batch.get(1), SegmentChangeStatus.success()); List result = future.get(); for (DataSegmentChangeResponse requestAndStatus : result) { Assert.assertEquals(expectedStatusMap.get(requestAndStatus.getRequest()), requestAndStatus.getStatus()); @@ -247,7 +247,7 @@ public void testProcessBatch() throws Exception } result = handler.processBatch(ImmutableList.of(new SegmentChangeRequestLoad(segment1)), SegmentLoadingMode.TURBO).get(); - Assert.assertEquals(SegmentChangeStatus.SUCCESS, result.get(0).getStatus()); + Assert.assertEquals(SegmentChangeStatus.success(SegmentLoadingMode.TURBO), result.get(0).getStatus()); Assert.assertEquals(ImmutableList.of(segment1), segmentAnnouncer.getObservedSegments()); @@ -287,7 +287,7 @@ public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequ runnable.run(); } result = future.get(); - Assert.assertEquals(State.SUCCESS, result.get(0).getStatus().getState()); + Assert.assertEquals(SegmentChangeStatus.success(SegmentLoadingMode.NORMAL), result.get(0).getStatus()); Assert.assertEquals(ImmutableList.of(segment1, segment1), segmentAnnouncer.getObservedSegments()); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java index 4d9a2400215c..6c9338beb486 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java @@ -19,31 +19,30 @@ package org.apache.druid.server.coordinator.config; +import com.amazonaws.util.Throwables; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.exc.ValueInstantiationException; -import org.hamcrest.CoreMatchers; +import org.apache.druid.error.DruidExceptionMatcher; import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Test; -import org.junit.internal.matchers.ThrowableMessageMatcher; public class HttpLoadQueuePeonConfigTest { @Test public void testValidateBatchSize() throws JsonProcessingException { - ObjectMapper jsonMapper = new ObjectMapper(); + final ObjectMapper jsonMapper = new ObjectMapper(); MatcherAssert.assertThat( - Assert.assertThrows(ValueInstantiationException.class, () -> - jsonMapper.readValue("{\"batchSize\":0}", HttpLoadQueuePeonConfig.class) - ), - CoreMatchers.allOf( - CoreMatchers.instanceOf(ValueInstantiationException.class), - ThrowableMessageMatcher.hasMessage( - CoreMatchers.containsString("Batch size must be greater than 0.") + Throwables.getRootCause( + Assert.assertThrows(ValueInstantiationException.class, () -> + jsonMapper.readValue("{\"batchSize\":0}", HttpLoadQueuePeonConfig.class) ) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "'druid.coordinator.loadqueuepeon.http.batchSize'[0] must be greater than 0" ) ); @@ -57,6 +56,6 @@ public void testValidateBatchSize() throws JsonProcessingException "{\"batchSize\":2}", HttpLoadQueuePeonConfig.class ); - Assert.assertEquals(2, config.getBatchSize().intValue()); + Assert.assertEquals(Integer.valueOf(2), config.getBatchSize()); } } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java index e511ce77ee2c..fba98bff3aec 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java @@ -398,7 +398,7 @@ public ListenableFuture go( for (DataSegmentChangeRequest cr : changeRequests) { cr.go(this, null); statuses.add( - new DataSegmentChangeResponse(cr, SegmentChangeStatus.SUCCESS) + new DataSegmentChangeResponse(cr, SegmentChangeStatus.success()) ); } return (ListenableFuture) Futures.immediateFuture( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java index 1fd3d54dc456..b9db5921ee36 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java @@ -217,6 +217,7 @@ static class Metric static final String DELETED_COUNT = "segment/deleted/count"; static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count"; static final String DROP_QUEUE_COUNT = "segment/dropQueue/count"; + static final String SUCCESS_ACTIONS = "segment/loadQueue/success"; static final String CANCELLED_ACTIONS = "segment/loadQueue/cancelled"; static final String OVERSHADOWED_COUNT = "segment/overshadowed/count"; diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java index e2127f090d87..bfc28abc1cc8 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java @@ -28,6 +28,8 @@ import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Set; /** * Coordinator simulation test to verify behaviour of segment loading. @@ -542,6 +544,45 @@ public void testSegmentsAreDroppedFromFullServersFirst() Assert.assertEquals(historicalT11.getCurrSize(), historicalT12.getCurrSize()); } + @Test + public void testSegmentLoadingModes() + { + CoordinatorDynamicConfig config = + CoordinatorDynamicConfig.builder() + .withTurboLoadingNodes(Set.of(historicalT11.getName())) + .build(); + + final CoordinatorSimulation sim = + CoordinatorSimulation.builder() + .withServers(historicalT11, historicalT12) + .withDynamicConfig(config) + .withRules(datasource, Load.on(Tier.T1, 1).forever()) + .withSegments(Segments.WIKI_10X1D) + .build(); + + startSimulation(sim); + + // Run 1: Assign and load all segments + runCoordinatorCycle(); + loadQueuedSegments(); + verifyValue(Metric.ASSIGNED_COUNT, 10L); + Assert.assertEquals(5, historicalT11.getTotalSegments()); + Assert.assertEquals(5, historicalT12.getTotalSegments()); + + // Run 2: Emit success metrics + runCoordinatorCycle(); + verifyValue( + Metric.SUCCESS_ACTIONS, + Map.of("server", historicalT11.getName(), "description", "LOAD: TURBO"), + 5L + ); + verifyValue( + Metric.SUCCESS_ACTIONS, + Map.of("server", historicalT12.getName(), "description", "LOAD: NORMAL"), + 5L + ); + } + private int getNumLoadedSegments(DruidServer... servers) { int numLoaded = 0; diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java index 877265d85b4e..ae911c4d8ae1 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java @@ -34,6 +34,7 @@ import org.apache.druid.server.coordination.DataSegmentChangeResponse; import org.apache.druid.server.coordination.SegmentChangeStatus; import org.apache.druid.server.http.SegmentLoadingCapabilities; +import org.apache.druid.server.http.SegmentLoadingMode; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -81,7 +82,6 @@ public ListenableFuture go( } @Override - @SuppressWarnings("unchecked") public ListenableFuture go( Request request, HttpResponseHandler handler, @@ -143,9 +143,11 @@ private List processRequest( new TypeReference<>() {} ); + final SegmentLoadingMode loadingMode = getLoadingMode(request); + return changeRequests .stream() - .map(changeRequest -> processRequest(changeRequest, changeHandler)) + .map(changeRequest -> processRequest(changeRequest, loadingMode, changeHandler)) .collect(Collectors.toList()); } @@ -175,13 +177,14 @@ private ListenableFuture getCapabilities(HttpRespon */ private DataSegmentChangeResponse processRequest( DataSegmentChangeRequest request, + SegmentLoadingMode loadingMode, DataSegmentChangeHandler handler ) { SegmentChangeStatus status; try { request.go(handler, NOOP_CALLBACK); - status = SegmentChangeStatus.SUCCESS; + status = SegmentChangeStatus.success(loadingMode); } catch (Exception e) { status = SegmentChangeStatus.failed(e.getMessage()); @@ -189,4 +192,16 @@ private DataSegmentChangeResponse processRequest( return new DataSegmentChangeResponse(request, status); } + + private static SegmentLoadingMode getLoadingMode(Request request) + { + String url = request.getUrl().toString(); + String[] splits = url.split("loadingMode="); + + if (splits.length > 1) { + return SegmentLoadingMode.valueOf(splits[1]); + } else { + return SegmentLoadingMode.NORMAL; + } + } } From 4387b48a6f3825b553740c27fdf302185a6b1ecc Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 24 Mar 2025 23:45:05 +0530 Subject: [PATCH 2/2] Fix up URL and imports --- .../org/apache/druid/server/http/SegmentListerResource.java | 2 +- .../server/coordinator/config/HttpLoadQueuePeonConfigTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java index 48e037271995..25e823828222 100644 --- a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java +++ b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java @@ -62,7 +62,7 @@ /** * Endpoints exposed here are to be used only for druid internal management of segments by Coordinators, Brokers etc. */ -@Path("/druid-internal/v1/segments/") +@Path("/druid-internal/v1/segments") @ResourceFilters(StateResourceFilter.class) public class SegmentListerResource { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java index 6c9338beb486..4f4528414b20 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/config/HttpLoadQueuePeonConfigTest.java @@ -19,10 +19,10 @@ package org.apache.druid.server.coordinator.config; -import com.amazonaws.util.Throwables; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.exc.ValueInstantiationException; +import com.google.common.base.Throwables; import org.apache.druid.error.DruidExceptionMatcher; import org.hamcrest.MatcherAssert; import org.junit.Assert;