diff --git a/processing/src/main/java/org/apache/druid/common/guava/FutureUtils.java b/processing/src/main/java/org/apache/druid/common/guava/FutureUtils.java index cf9a7f0ef71f..792071d4cf83 100644 --- a/processing/src/main/java/org/apache/druid/common/guava/FutureUtils.java +++ b/processing/src/main/java/org/apache/druid/common/guava/FutureUtils.java @@ -32,6 +32,8 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.function.Consumer; import java.util.function.Function; public class FutureUtils @@ -205,4 +207,34 @@ public void onFailure(Throwable e) return retVal; } + + /** + * Adds success and failure callbacks to the given future. + */ + public static void addCallback( + ListenableFuture future, + Executor executor, + Consumer onSuccess, + Consumer onFailure + ) + { + Futures.addCallback( + future, + new FutureCallback() + { + @Override + public void onSuccess(@Nullable V result) + { + onSuccess.accept(result); + } + + @Override + public void onFailure(Throwable t) + { + onFailure.accept(t); + } + }, + executor + ); + } } diff --git a/processing/src/main/java/org/apache/druid/java/util/common/Stopwatch.java b/processing/src/main/java/org/apache/druid/java/util/common/Stopwatch.java index 2d941828a0ee..2a4f68b4aeb5 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/Stopwatch.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/Stopwatch.java @@ -55,19 +55,22 @@ private Stopwatch(com.google.common.base.Stopwatch delegate) this.delegate = delegate; } - public synchronized void start() + public synchronized Stopwatch start() { delegate.start(); + return this; } - public synchronized void stop() + public synchronized Stopwatch stop() { delegate.stop(); + return this; } - public synchronized void reset() + public synchronized Stopwatch reset() { delegate.reset(); + return this; } /** diff --git a/processing/src/test/java/org/apache/druid/common/guava/FutureUtilsTest.java b/processing/src/test/java/org/apache/druid/common/guava/FutureUtilsTest.java index 416c97e1840f..51ee5b578108 100644 --- a/processing/src/test/java/org/apache/druid/common/guava/FutureUtilsTest.java +++ b/processing/src/test/java/org/apache/druid/common/guava/FutureUtilsTest.java @@ -334,4 +334,61 @@ public void test_futureWithBaggage_failure() MatcherAssert.assertThat(e.getCause(), ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("error!"))); Assert.assertEquals(1, baggageHandled.get()); } + + @Test + public void test_addCallback_success() + { + final SettableFuture future = SettableFuture.create(); + + final AtomicLong observedValue = new AtomicLong(0); + final AtomicReference observedError = new AtomicReference<>(); + FutureUtils.addCallback( + future, + Execs.directExecutor(), + observedValue::set, + observedError::set + ); + + future.set(101L); + Assert.assertEquals(101L, observedValue.get()); + Assert.assertNull(observedError.get()); + } + + @Test + public void test_addCallback_failure() + { + final SettableFuture future = SettableFuture.create(); + + final AtomicReference observedValue = new AtomicReference<>(); + final AtomicReference observedError = new AtomicReference<>(); + FutureUtils.addCallback( + future, + Execs.directExecutor(), + observedValue::set, + observedError::set + ); + + future.setException(new ISE("an error occurred")); + Assert.assertNull(observedValue.get()); + Assert.assertTrue(observedError.get() instanceof ISE); + } + + @Test + public void test_addCallback_cancelled() + { + final SettableFuture future = SettableFuture.create(); + + final AtomicReference observedValue = new AtomicReference<>(); + final AtomicReference observedError = new AtomicReference<>(); + FutureUtils.addCallback( + future, + Execs.directExecutor(), + observedValue::set, + observedError::set + ); + + future.cancel(true); + Assert.assertNull(observedValue.get()); + Assert.assertTrue(observedError.get() instanceof CancellationException); + } } diff --git a/processing/src/test/java/org/apache/druid/java/util/common/StopwatchTest.java b/processing/src/test/java/org/apache/druid/java/util/common/StopwatchTest.java index 06bed222e009..fba12633a00f 100644 --- a/processing/src/test/java/org/apache/druid/java/util/common/StopwatchTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/common/StopwatchTest.java @@ -67,4 +67,22 @@ public void testHasElapsed() Assert.assertTrue(stopwatch.hasNotElapsed(Duration.millis(101))); Assert.assertTrue(stopwatch.hasNotElapsed(Duration.millis(500))); } + + @Test + public void testChainedMethods() + { + FakeTicker fakeTicker = new FakeTicker(); + Stopwatch stopwatch = Stopwatch.createStarted(fakeTicker); + + fakeTicker.advance(100, TimeUnit.MILLISECONDS); + Assert.assertEquals(100, stopwatch.millisElapsed()); + + stopwatch.stop().start(); + Assert.assertTrue(stopwatch.isRunning()); + Assert.assertEquals(100, stopwatch.millisElapsed()); + + stopwatch.stop().reset(); + Assert.assertFalse(stopwatch.isRunning()); + Assert.assertEquals(0, stopwatch.millisElapsed()); + } } diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java index dac5dfc2263c..65394411a9f7 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClient.java @@ -23,8 +23,11 @@ import org.apache.druid.client.indexing.TaskPayloadResponse; import org.apache.druid.client.indexing.TaskStatusResponse; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.rpc.ServiceRetryPolicy; +import org.joda.time.Interval; +import java.util.List; import java.util.Map; import java.util.Set; @@ -55,5 +58,13 @@ public interface OverlordClient ListenableFuture taskPayload(String taskId); + ListenableFuture> allActiveTasks(); + + ListenableFuture totalWorkerCapacity(); + + ListenableFuture totalWorkerCapacityWithAutoScale(); + + ListenableFuture>> lockedIntervals(Map datasourceToMinTaskPriority); + OverlordClient withRetryPolicy(ServiceRetryPolicy retryPolicy); } diff --git a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java index 5c28d87a8d11..1cbc815fedb2 100644 --- a/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java +++ b/server/src/main/java/org/apache/druid/rpc/indexing/OverlordClientImpl.java @@ -22,11 +22,15 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo; +import org.apache.druid.client.indexing.IndexingWorkerInfo; import org.apache.druid.client.indexing.TaskPayloadResponse; import org.apache.druid.client.indexing.TaskStatusResponse; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; @@ -36,10 +40,14 @@ import org.apache.druid.rpc.ServiceClient; import org.apache.druid.rpc.ServiceRetryPolicy; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.Interval; import java.io.IOException; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * Production implementation of {@link OverlordClient}. @@ -148,12 +156,82 @@ public ListenableFuture taskPayload(String taskId) ); } + @Override + public ListenableFuture> allActiveTasks() + { + return FutureUtils.transform( + Futures.allAsList( + getTasksOfType("waitingTasks"), + getTasksOfType("pendingTasks"), + getTasksOfType("runningTasks") + ), + listOfList -> listOfList.stream().flatMap(Collection::stream).collect(Collectors.toList()) + ); + } + + @Override + public ListenableFuture totalWorkerCapacity() + { + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/workers"), + new BytesFullResponseHandler() + ), + holder -> + deserialize(holder, new TypeReference>() {}) + .stream() + .mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()) + .sum() + ); + } + + @Override + public ListenableFuture totalWorkerCapacityWithAutoScale() + { + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.GET, "/druid/indexer/v1/totalWorkerCapacity"), + new BytesFullResponseHandler() + ), + holder -> + deserialize(holder, new TypeReference() {}) + .getMaximumCapacityWithAutoScale() + ); + } + + @Override + public ListenableFuture>> lockedIntervals( + Map datasourceToMinTaskPriority + ) + { + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.POST, "/druid/indexer/v1/lockedIntervals") + .jsonContent(jsonMapper, datasourceToMinTaskPriority), + new BytesFullResponseHandler() + ), + holder -> deserialize(holder, new TypeReference>>() {}) + ); + } + @Override public OverlordClientImpl withRetryPolicy(ServiceRetryPolicy retryPolicy) { return new OverlordClientImpl(client.withRetryPolicy(retryPolicy), jsonMapper); } + private ListenableFuture> getTasksOfType(String type) + { + final String path = StringUtils.format("/druid/indexer/v1/%s", StringUtils.urlEncode(type)); + return FutureUtils.transform( + client.asyncRequest( + new RequestBuilder(HttpMethod.GET, path), + new BytesFullResponseHandler() + ), + holder -> deserialize(holder, new TypeReference>() {}) + ); + } + private T deserialize(final BytesFullResponseHolder bytesHolder, final Class clazz) { try { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index c9d334968ef2..c40ccdfce9f0 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -20,7 +20,6 @@ package org.apache.druid.server.coordinator; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; @@ -38,7 +37,6 @@ import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.ServerInventoryView; import org.apache.druid.client.coordinator.Coordinator; -import org.apache.druid.client.indexing.IndexingServiceClient; import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.curator.discovery.ServiceAnnouncer; import org.apache.druid.discovery.DruidLeaderSelector; @@ -47,6 +45,7 @@ import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; @@ -58,6 +57,7 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.metadata.SegmentsMetadataManager; +import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordinator.balancer.BalancerStrategy; import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory; @@ -99,7 +99,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -140,7 +139,7 @@ public class DruidCoordinator private final MetadataRuleManager metadataRuleManager; private final ServiceEmitter emitter; - private final IndexingServiceClient indexingServiceClient; + private final OverlordClient overlordClient; private final ScheduledExecutorService exec; private final LoadQueueTaskMaster taskMaster; private final ConcurrentHashMap loadManagementPeons = new ConcurrentHashMap<>(); @@ -187,7 +186,7 @@ public DruidCoordinator( MetadataRuleManager metadataRuleManager, ServiceEmitter emitter, ScheduledExecutorFactory scheduledExecutorFactory, - IndexingServiceClient indexingServiceClient, + OverlordClient overlordClient, LoadQueueTaskMaster taskMaster, SegmentLoadQueueManager loadQueueManager, ServiceAnnouncer serviceAnnouncer, @@ -208,7 +207,7 @@ public DruidCoordinator( this.serverInventoryView = serverInventoryView; this.metadataRuleManager = metadataRuleManager; this.emitter = emitter; - this.indexingServiceClient = indexingServiceClient; + this.overlordClient = overlordClient; this.taskMaster = taskMaster; this.serviceAnnouncer = serviceAnnouncer; this.self = self; @@ -461,7 +460,7 @@ private void becomeLeader() config.getCoordinatorPeriod() ) ); - if (indexingServiceClient != null) { + if (overlordClient != null) { dutiesRunnables.add( new DutiesRunnable( makeIndexingServiceDuties(), @@ -619,7 +618,7 @@ CompactSegments initializeCompactSegmentsDuty(CompactionSegmentSearchPolicy comp { List compactSegmentsDutyFromCustomGroups = getCompactSegmentsDutyFromCustomGroups(); if (compactSegmentsDutyFromCustomGroups.isEmpty()) { - return new CompactSegments(config, compactionSegmentSearchPolicy, indexingServiceClient); + return new CompactSegments(config, compactionSegmentSearchPolicy, overlordClient); } else { if (compactSegmentsDutyFromCustomGroups.size() > 1) { log.warn( @@ -728,7 +727,7 @@ public void run() && coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { - dutyRunTime.reset().start(); + dutyRunTime.restart(); params = duty.run(params); dutyRunTime.stop(); @@ -738,7 +737,7 @@ public void run() return; } else { final RowKey rowKey = RowKey.of(Dimension.DUTY, dutyName); - final long dutyRunMillis = dutyRunTime.elapsed(TimeUnit.MILLISECONDS); + final long dutyRunMillis = dutyRunTime.millisElapsed(); params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey, dutyRunMillis); } } @@ -764,7 +763,7 @@ public void run() } // Emit the runtime of the full DutiesRunnable - final long runMillis = groupRunTime.stop().elapsed(TimeUnit.MILLISECONDS); + final long runMillis = groupRunTime.stop().millisElapsed(); emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(), runMillis); log.info("Finished coordinator run for group [%s] in [%d] ms", dutyGroupName, runMillis); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 0381a70ba249..1b12f0c879af 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -22,20 +22,31 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; +import org.apache.druid.client.indexing.ClientCompactionIOConfig; +import org.apache.druid.client.indexing.ClientCompactionIntervalSpec; import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec; import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec; import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec; -import org.apache.druid.client.indexing.IndexingServiceClient; +import org.apache.druid.client.indexing.ClientTaskQuery; import org.apache.druid.client.indexing.TaskPayloadResponse; +import org.apache.druid.common.config.Configs; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.common.utils.IdUtils; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; -import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.CompactionStatistics; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; @@ -67,11 +78,11 @@ public class CompactSegments implements CoordinatorCustomDuty /** Must be the same as org.apache.druid.indexing.common.task.Tasks.STORE_COMPACTION_STATE_KEY */ public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState"; - private static final Logger LOG = new Logger(CompactSegments.class); + private static final EmittingLogger LOG = new EmittingLogger(CompactSegments.class); private final CompactionSegmentSearchPolicy policy; private final boolean skipLockedIntervals; - private final IndexingServiceClient indexingServiceClient; + private final OverlordClient overlordClient; // This variable is updated by the Coordinator thread executing duties and // read by HTTP threads processing Coordinator API calls. @@ -82,11 +93,11 @@ public class CompactSegments implements CoordinatorCustomDuty public CompactSegments( @JacksonInject DruidCoordinatorConfig config, @JacksonInject CompactionSegmentSearchPolicy policy, - @JacksonInject IndexingServiceClient indexingServiceClient + @JacksonInject OverlordClient overlordClient ) { this.policy = policy; - this.indexingServiceClient = indexingServiceClient; + this.overlordClient = overlordClient; this.skipLockedIntervals = config.getCompactionSkipLockedIntervals(); resetCompactionSnapshot(); @@ -128,9 +139,11 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) // Fetch currently running compaction tasks int busyCompactionTaskSlots = 0; - final List compactionTasks = filterNonCompactionTasks(indexingServiceClient.getActiveTasks()); + final List compactionTasks = filterNonCompactionTasks( + get(overlordClient.allActiveTasks()) + ); for (TaskStatusPlus status : compactionTasks) { - final TaskPayloadResponse response = indexingServiceClient.getTaskPayload(status.getId()); + final TaskPayloadResponse response = get(overlordClient.taskPayload(status.getId())); if (response == null) { throw new ISE("Could not find payload for active compaction task[%s]", status.getId()); } else if (!COMPACTION_TASK_TYPE.equals(response.getPayload().getType())) { @@ -225,7 +238,10 @@ private boolean cancelTaskIfGranularityChanged( "Cancelling task [%s] as task segmentGranularity is [%s] but compaction config segmentGranularity is [%s]", compactionTaskQuery.getId(), taskSegmentGranularity, configuredSegmentGranularity ); - indexingServiceClient.cancelTask(compactionTaskQuery.getId()); + + // Cancel task synchronously as it is a rare operation and doing it async + // may break assumptions in the subsequent flow of submitting fresh compact tasks + get(overlordClient.cancelTask(compactionTaskQuery.getId())); return true; } @@ -249,24 +265,21 @@ private Map> getLockedIntervalsToSkip( { if (!skipLockedIntervals) { LOG.info("Not skipping any locked interval for Compaction"); - return new HashMap<>(); + return Collections.emptyMap(); } - final Map minTaskPriority = compactionConfigs - .stream() - .collect( - Collectors.toMap( - DataSourceCompactionConfig::getDataSource, - DataSourceCompactionConfig::getTaskPriority - ) - ); - final Map> datasourceToLockedIntervals = - new HashMap<>(indexingServiceClient.getLockedIntervals(minTaskPriority)); - LOG.debug( - "Skipping the following intervals for Compaction as they are currently locked: %s", - datasourceToLockedIntervals + Map datasourceToMinTaskPriority = compactionConfigs.stream().collect( + Collectors.toMap( + DataSourceCompactionConfig::getDataSource, + DataSourceCompactionConfig::getTaskPriority + ) + ); + final Map> datasourceToLockedIntervals = Configs.valueOrDefault( + get(overlordClient.lockedIntervals(datasourceToMinTaskPriority)), + Collections.emptyMap() ); + LOG.debug("Skipping intervals[%s] for compaction as they are locked.", datasourceToLockedIntervals); return datasourceToLockedIntervals; } @@ -328,12 +341,12 @@ private int getCompactionTaskCapacity(CoordinatorCompactionConfig dynamicConfig) int totalWorkerCapacity; try { totalWorkerCapacity = dynamicConfig.isUseAutoScaleSlots() - ? indexingServiceClient.getTotalWorkerCapacityWithAutoScale() - : indexingServiceClient.getTotalWorkerCapacity(); + ? get(overlordClient.totalWorkerCapacityWithAutoScale()) + : get(overlordClient.totalWorkerCapacity()); } catch (Exception e) { LOG.warn("Failed to get total worker capacity with auto scale slots. Falling back to current capacity count"); - totalWorkerCapacity = indexingServiceClient.getTotalWorkerCapacity(); + totalWorkerCapacity = get(overlordClient.totalWorkerCapacity()); } return Math.min( @@ -476,8 +489,7 @@ private int submitCompactionTasks( } } - final String taskId = indexingServiceClient.compactSegments( - "coordinator-issued", + final String taskId = submitCompactionTaskAsync( segmentsToCompact, config.getTaskPriority(), ClientCompactionTaskQueryTuningConfig.from( @@ -493,7 +505,7 @@ private int submitCompactionTasks( newAutoCompactionContext(config.getTaskContext()) ); - LOG.info("Submitted a compactionTask[%s] for [%d] segments", taskId, segmentsToCompact.size()); + LOG.info("Sent request to submit a compactionTask[%s] for [%d] segments", taskId, segmentsToCompact.size()); LOG.infoSegments(segmentsToCompact, "Compacting segments"); // Count the compaction task itself + its sub tasks numSubmittedTasks++; @@ -503,6 +515,63 @@ private int submitCompactionTasks( return numSubmittedTasks; } + private String submitCompactionTaskAsync( + List segments, + int compactionTaskPriority, + @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig, + @Nullable ClientCompactionTaskGranularitySpec granularitySpec, + @Nullable ClientCompactionTaskDimensionsSpec dimensionsSpec, + @Nullable AggregatorFactory[] metricsSpec, + @Nullable ClientCompactionTaskTransformSpec transformSpec, + @Nullable Boolean dropExisting, + @Nullable Map context + ) + { + Preconditions.checkArgument(!segments.isEmpty(), "Expect non-empty segments to compact"); + + final String dataSource = segments.get(0).getDataSource(); + Preconditions.checkArgument( + segments.stream().allMatch(segment -> segment.getDataSource().equals(dataSource)), + "Segments must belong to the same DataSource" + ); + + context = Configs.valueOrDefault(context, new HashMap<>()); + context.put("priority", compactionTaskPriority); + + final String taskId = IdUtils.newTaskId("coordinator-issued", COMPACTION_TASK_TYPE, dataSource, null); + final Granularity segmentGranularity = granularitySpec == null ? null : granularitySpec.getSegmentGranularity(); + ClientCompactionIOConfig ioConfig = new ClientCompactionIOConfig( + ClientCompactionIntervalSpec.fromSegments(segments, segmentGranularity), + dropExisting + ); + + final ClientTaskQuery taskQuery = new ClientCompactionTaskQuery( + taskId, + dataSource, + ioConfig, + tuningConfig, + granularitySpec, + dimensionsSpec, + metricsSpec, + transformSpec, + context + ); + + ListenableFuture taskSubmitFuture = overlordClient.runTask(taskId, taskQuery); + final Stopwatch timeToSubmit = Stopwatch.createStarted(); + FutureUtils.addCallback( + taskSubmitFuture, + Execs.directExecutor(), + v -> LOG.debug( + "Submitted compaction task[%s] to overlord in [%d]ms", + taskId, timeToSubmit.stop().millisElapsed() + ), + t -> LOG.noStackTrace().makeAlert(t, "Error while submitting compaction task [%s]", taskId) + ); + + return taskId; + } + private Map newAutoCompactionContext(@Nullable Map configuredContext) { final Map newContext = configuredContext == null @@ -627,4 +696,9 @@ public Map getAutoCompactionSnapshot() { return autoCompactionSnapshotPerDataSource.get(); } + + private T get(ListenableFuture future) + { + return FutureUtils.getUnchecked(future, true); + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java index a0a656f38477..133b31c01332 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java @@ -575,7 +575,7 @@ private SegmentsToCompact findSegmentsToCompact( throw new ISE("No segment is found?"); } } - log.info("All segments look good! Nothing to compact"); + log.info("All segments in datasource[%s] look good. Nothing to compact.", dataSourceName); return new SegmentsToCompact(); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java index 28f5c91049fb..c808032ffb28 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java @@ -93,7 +93,7 @@ public static class Tier public static class Compaction { public static final CoordinatorStat SUBMITTED_TASKS - = CoordinatorStat.toDebugAndEmit("compactTasks", "compact/task/count"); + = CoordinatorStat.toLogAndEmit("compactTasks", "compact/task/count", CoordinatorStat.Level.INFO); public static final CoordinatorStat MAX_SLOTS = CoordinatorStat.toDebugAndEmit("compactMaxSlots", "compactTask/maxSlot/count"); public static final CoordinatorStat AVAILABLE_SLOTS diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java index dbfa1ec10473..14e57f656c92 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java +++ b/server/src/test/java/org/apache/druid/client/indexing/NoopOverlordClient.java @@ -21,9 +21,12 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.rpc.ServiceRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; +import org.joda.time.Interval; +import java.util.List; import java.util.Map; import java.util.Set; @@ -65,6 +68,30 @@ public ListenableFuture taskPayload(String taskId) throw new UnsupportedOperationException(); } + @Override + public ListenableFuture> allActiveTasks() + { + throw new UnsupportedOperationException(); + } + + @Override + public ListenableFuture>> lockedIntervals(Map datasourceToMinTaskPriority) + { + throw new UnsupportedOperationException(); + } + + @Override + public ListenableFuture totalWorkerCapacityWithAutoScale() + { + throw new UnsupportedOperationException(); + } + + @Override + public ListenableFuture totalWorkerCapacity() + { + throw new UnsupportedOperationException(); + } + @Override public OverlordClient withRetryPolicy(ServiceRetryPolicy retryPolicy) { diff --git a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java index 26fb972ac5cd..d5ac4f2b4320 100644 --- a/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java +++ b/server/src/test/java/org/apache/druid/rpc/indexing/OverlordClientImplTest.java @@ -37,7 +37,6 @@ public class OverlordClientImplTest { - @Test public void testTaskPayload() throws ExecutionException, InterruptedException, JsonProcessingException { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 7487d332d1f2..a1ba601dee7c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -27,29 +27,20 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import junitparams.converters.Nullable; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.indexing.ClientCompactionIOConfig; import org.apache.druid.client.indexing.ClientCompactionIntervalSpec; -import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec; import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec; import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; -import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec; -import org.apache.druid.client.indexing.ClientTaskQuery; -import org.apache.druid.client.indexing.HttpIndexingServiceClient; -import org.apache.druid.client.indexing.IndexingServiceClient; -import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo; -import org.apache.druid.client.indexing.IndexingWorker; -import org.apache.druid.client.indexing.IndexingWorkerInfo; +import org.apache.druid.client.indexing.NoopOverlordClient; import org.apache.druid.client.indexing.TaskPayloadResponse; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.discovery.DruidLeaderClient; -import org.apache.druid.discovery.DruidNodeDiscovery; -import org.apache.druid.discovery.DruidNodeDiscoveryProvider; -import org.apache.druid.discovery.NodeRole; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; @@ -64,14 +55,14 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.incremental.OnheapIncrementalIndex; +import org.apache.druid.segment.indexing.BatchIOConfig; import org.apache.druid.segment.transform.TransformSpec; -import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.CoordinatorCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -88,19 +79,12 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.TimelineObjectHolder; -import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.timeline.partition.SingleDimensionShardSpec; import org.apache.druid.utils.Streams; -import org.easymock.EasyMock; -import org.jboss.netty.handler.codec.http.DefaultHttpResponse; -import org.jboss.netty.handler.codec.http.HttpMethod; -import org.jboss.netty.handler.codec.http.HttpResponse; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; -import org.jboss.netty.handler.codec.http.HttpVersion; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; @@ -109,13 +93,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -import java.io.IOException; -import java.net.URL; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -124,7 +103,6 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; -import java.util.function.BooleanSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -180,7 +158,9 @@ public static Collection constructorFeeder() private final BiFunction shardSpecFactory; private DataSourcesSnapshot dataSources; - Map> datasourceToSegments = new HashMap<>(); + private TestOverlordClient overlordClient; + private CompactSegments compactSegments; + private final Map> datasourceToSegments = new HashMap<>(); public CompactSegmentsTest(PartitionsSpec partitionsSpec, BiFunction shardSpecFactory) { @@ -208,6 +188,9 @@ public void setup() } dataSources = DataSourcesSnapshot.fromUsedSegments(allSegments, ImmutableMap.of()); Mockito.when(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals()).thenReturn(true); + + overlordClient = new TestOverlordClient(); + compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient); } private DataSegment createSegment(String dataSource, int startDay, boolean beforeNoon, int partition) @@ -244,17 +227,13 @@ private DataSegment createSegment(String dataSource, int startDay, boolean befor @Test public void testSerde() throws Exception { - final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); - final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient); - JSON_MAPPER.setInjectableValues( new InjectableValues.Std() .addValue(DruidCoordinatorConfig.class, COORDINATOR_CONFIG) - .addValue(IndexingServiceClient.class, indexingServiceClient) + .addValue(OverlordClient.class, overlordClient) .addValue(CompactionSegmentSearchPolicy.class, SEARCH_POLICY) ); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient); String compactSegmentString = JSON_MAPPER.writeValueAsString(compactSegments); CompactSegments serdeCompactSegments = JSON_MAPPER.readValue(compactSegmentString, CompactSegments.class); @@ -264,22 +243,9 @@ public void testSerde() throws Exception @Test public void testRun() { - final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); - leaderClient.start(); - final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient); - - final Supplier expectedVersionSupplier = new Supplier() - { - private int i = 0; - - @Override - public String get() - { - return "newVersion_" + i++; - } - }; - int expectedCompactTaskCount = 1; + final AtomicInteger versionNumber = new AtomicInteger(0); + final Supplier expectedVersionSupplier = () -> "newVersion_" + versionNumber.getAndIncrement(); + final int expectedCompactTaskCount = 1; int expectedRemainingSegments = 400; // compact for 2017-01-08T12:00:00.000Z/2017-01-09T12:00:00.000Z @@ -342,11 +308,6 @@ public String get() @Test public void testMakeStats() { - final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); - leaderClient.start(); - final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient); - // Before any compaction, we do not have any snapshot of compactions Map autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot(); Assert.assertEquals(0, autoCompactionSnapshots.size()); @@ -438,11 +399,6 @@ public void testMakeStatsForDataSourceWithCompactedIntervalBetweenNonCompactedIn dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of()); - final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); - leaderClient.start(); - final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient); - // Before any compaction, we do not have any snapshot of compactions Map autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot(); Assert.assertEquals(0, autoCompactionSnapshots.size()); @@ -502,11 +458,6 @@ public void testMakeStatsForDataSourceWithCompactedIntervalBetweenNonCompactedIn @Test public void testMakeStatsWithDeactivatedDatasource() { - final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); - leaderClient.start(); - final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient); - // Before any compaction, we do not have any snapshot of compactions Map autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot(); Assert.assertEquals(0, autoCompactionSnapshots.size()); @@ -596,11 +547,6 @@ public void testMakeStatsForDataSourceWithSkipped() dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of()); - final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); - leaderClient.start(); - final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient); - // Before any compaction, we do not have any snapshot of compactions Map autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot(); Assert.assertEquals(0, autoCompactionSnapshots.size()); @@ -657,11 +603,6 @@ public void testMakeStatsForDataSourceWithSkipped() @Test public void testRunMultipleCompactionTaskSlots() { - final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); - leaderClient.start(); - final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient); - final CoordinatorRunStats stats = doCompactSegments(compactSegments, 3); Assert.assertEquals(3, stats.get(Stats.Compaction.AVAILABLE_SLOTS)); Assert.assertEquals(3, stats.get(Stats.Compaction.MAX_SLOTS)); @@ -671,12 +612,8 @@ public void testRunMultipleCompactionTaskSlots() @Test public void testRunMultipleCompactionTaskSlotsWithUseAutoScaleSlotsOverMaxSlot() { - int maxCompactionSlot = 3; + final int maxCompactionSlot = 3; Assert.assertTrue(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE); - final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); - leaderClient.start(); - final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient); final CoordinatorRunStats stats = doCompactSegments(compactSegments, createCompactionConfigs(), maxCompactionSlot, true); Assert.assertEquals(maxCompactionSlot, stats.get(Stats.Compaction.AVAILABLE_SLOTS)); Assert.assertEquals(maxCompactionSlot, stats.get(Stats.Compaction.MAX_SLOTS)); @@ -688,10 +625,6 @@ public void testRunMultipleCompactionTaskSlotsWithUseAutoScaleSlotsUnderMaxSlot( { int maxCompactionSlot = 100; Assert.assertFalse(maxCompactionSlot < MAXIMUM_CAPACITY_WITH_AUTO_SCALE); - final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); - leaderClient.start(); - final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient); final CoordinatorRunStats stats = doCompactSegments(compactSegments, createCompactionConfigs(), maxCompactionSlot, true); Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.get(Stats.Compaction.AVAILABLE_SLOTS)); Assert.assertEquals(MAXIMUM_CAPACITY_WITH_AUTO_SCALE, stats.get(Stats.Compaction.MAX_SLOTS)); @@ -701,8 +634,6 @@ public void testRunMultipleCompactionTaskSlotsWithUseAutoScaleSlotsUnderMaxSlot( @Test public void testCompactWithoutGranularitySpec() { - final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient); final List compactionConfigs = new ArrayList<>(); final String dataSource = DATA_SOURCE_PREFIX + 0; compactionConfigs.add( @@ -742,33 +673,23 @@ public void testCompactWithoutGranularitySpec() ) ); doCompactSegments(compactSegments, compactionConfigs); - ArgumentCaptor> segmentsCaptor = ArgumentCaptor.forClass(List.class); - ArgumentCaptor granularitySpecArgumentCaptor = ArgumentCaptor.forClass( - ClientCompactionTaskGranularitySpec.class); - Mockito.verify(mockIndexingServiceClient).compactSegments( - ArgumentMatchers.anyString(), - segmentsCaptor.capture(), - ArgumentMatchers.anyInt(), - ArgumentMatchers.any(), - granularitySpecArgumentCaptor.capture(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); + ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); + + // Only the same amount of segments as the original PARTITION_PER_TIME_INTERVAL since segment granulartity is unchanged + Assert.assertEquals( + Intervals.of("2017-01-09T12:00:00.000Z/2017-01-10T00:00:00.000Z"), + compactionTask.getIoConfig().getInputSpec().getInterval() ); - // Only the same amount of segments as the original PARTITION_PER_TIME_INTERVAL since segment granulartity is the same - Assert.assertEquals(PARTITION_PER_TIME_INTERVAL, segmentsCaptor.getValue().size()); - Assert.assertNull(granularitySpecArgumentCaptor.getValue().getSegmentGranularity()); - Assert.assertNull(granularitySpecArgumentCaptor.getValue().getQueryGranularity()); - Assert.assertNull(granularitySpecArgumentCaptor.getValue().isRollup()); + + Assert.assertNull(compactionTask.getGranularitySpec().getSegmentGranularity()); + Assert.assertNull(compactionTask.getGranularitySpec().getQueryGranularity()); + Assert.assertNull(compactionTask.getGranularitySpec().isRollup()); } @Test public void testCompactWithNotNullIOConfig() { - final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient); final List compactionConfigs = new ArrayList<>(); final String dataSource = DATA_SOURCE_PREFIX + 0; compactionConfigs.add( @@ -808,27 +729,14 @@ public void testCompactWithNotNullIOConfig() ) ); doCompactSegments(compactSegments, compactionConfigs); - ArgumentCaptor dropExistingCapture = ArgumentCaptor.forClass(Boolean.class); - Mockito.verify(mockIndexingServiceClient).compactSegments( - ArgumentMatchers.anyString(), - ArgumentMatchers.any(), - ArgumentMatchers.anyInt(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - dropExistingCapture.capture(), - ArgumentMatchers.any() - ); - Assert.assertEquals(true, dropExistingCapture.getValue()); + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); + ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); + Assert.assertTrue(compactionTask.getIoConfig().isDropExisting()); } @Test public void testCompactWithNullIOConfig() { - final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient); final List compactionConfigs = new ArrayList<>(); final String dataSource = DATA_SOURCE_PREFIX + 0; compactionConfigs.add( @@ -868,27 +776,14 @@ public void testCompactWithNullIOConfig() ) ); doCompactSegments(compactSegments, compactionConfigs); - ArgumentCaptor dropExistingCapture = ArgumentCaptor.forClass(Boolean.class); - Mockito.verify(mockIndexingServiceClient).compactSegments( - ArgumentMatchers.anyString(), - ArgumentMatchers.any(), - ArgumentMatchers.anyInt(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - dropExistingCapture.capture(), - ArgumentMatchers.any() - ); - Assert.assertNull(dropExistingCapture.getValue()); + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); + ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); + Assert.assertEquals(BatchIOConfig.DEFAULT_DROP_EXISTING, compactionTask.getIoConfig().isDropExisting()); } @Test public void testCompactWithGranularitySpec() { - final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient); final List compactionConfigs = new ArrayList<>(); final String dataSource = DATA_SOURCE_PREFIX + 0; compactionConfigs.add( @@ -928,35 +823,18 @@ public void testCompactWithGranularitySpec() ) ); doCompactSegments(compactSegments, compactionConfigs); - ArgumentCaptor> segmentsCaptor = ArgumentCaptor.forClass(List.class); - ArgumentCaptor granularitySpecArgumentCaptor = ArgumentCaptor.forClass( - ClientCompactionTaskGranularitySpec.class); - Mockito.verify(mockIndexingServiceClient).compactSegments( - ArgumentMatchers.anyString(), - segmentsCaptor.capture(), - ArgumentMatchers.anyInt(), - ArgumentMatchers.any(), - granularitySpecArgumentCaptor.capture(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - ); + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); + ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); // All segments is compact at the same time since we changed the segment granularity to YEAR and all segment // are within the same year - Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size()); - ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue(); - Assert.assertNotNull(actual); + Assert.assertEquals(Intervals.of("2017/2018"), compactionTask.getIoConfig().getInputSpec().getInterval()); ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null); - Assert.assertEquals(expected, actual); + Assert.assertEquals(expected, compactionTask.getGranularitySpec()); } @Test public void testCompactWithDimensionSpec() { - final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient); final List compactionConfigs = new ArrayList<>(); final String dataSource = DATA_SOURCE_PREFIX + 0; compactionConfigs.add( @@ -996,30 +874,17 @@ public void testCompactWithDimensionSpec() ) ); doCompactSegments(compactSegments, compactionConfigs); - ArgumentCaptor dimensionsSpecArgumentCaptor = ArgumentCaptor.forClass( - ClientCompactionTaskDimensionsSpec.class); - Mockito.verify(mockIndexingServiceClient).compactSegments( - ArgumentMatchers.anyString(), - ArgumentMatchers.any(), - ArgumentMatchers.anyInt(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - dimensionsSpecArgumentCaptor.capture(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); + ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); + Assert.assertEquals( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), + compactionTask.getDimensionsSpec().getDimensions() ); - ClientCompactionTaskDimensionsSpec actual = dimensionsSpecArgumentCaptor.getValue(); - Assert.assertNotNull(actual); - Assert.assertEquals(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), actual.getDimensions()); } @Test public void testCompactWithoutDimensionSpec() { - final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient); final List compactionConfigs = new ArrayList<>(); final String dataSource = DATA_SOURCE_PREFIX + 0; compactionConfigs.add( @@ -1059,29 +924,15 @@ public void testCompactWithoutDimensionSpec() ) ); doCompactSegments(compactSegments, compactionConfigs); - ArgumentCaptor dimensionsSpecArgumentCaptor = ArgumentCaptor.forClass( - ClientCompactionTaskDimensionsSpec.class); - Mockito.verify(mockIndexingServiceClient).compactSegments( - ArgumentMatchers.anyString(), - ArgumentMatchers.any(), - ArgumentMatchers.anyInt(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - dimensionsSpecArgumentCaptor.capture(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - ); - ClientCompactionTaskDimensionsSpec actual = dimensionsSpecArgumentCaptor.getValue(); - Assert.assertNull(actual); + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); + ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); + + Assert.assertNull(compactionTask.getDimensionsSpec()); } @Test public void testCompactWithRollupInGranularitySpec() { - final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient); final List compactionConfigs = new ArrayList<>(); final String dataSource = DATA_SOURCE_PREFIX + 0; compactionConfigs.add( @@ -1121,26 +972,11 @@ public void testCompactWithRollupInGranularitySpec() ) ); doCompactSegments(compactSegments, compactionConfigs); - ArgumentCaptor> segmentsCaptor = ArgumentCaptor.forClass(List.class); - ArgumentCaptor granularitySpecArgumentCaptor = ArgumentCaptor.forClass( - ClientCompactionTaskGranularitySpec.class); - Mockito.verify(mockIndexingServiceClient).compactSegments( - ArgumentMatchers.anyString(), - segmentsCaptor.capture(), - ArgumentMatchers.anyInt(), - ArgumentMatchers.any(), - granularitySpecArgumentCaptor.capture(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - ); - Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size()); - ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue(); - Assert.assertNotNull(actual); + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); + ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); + Assert.assertEquals(Intervals.of("2017/2018"), compactionTask.getIoConfig().getInputSpec().getInterval()); ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, true); - Assert.assertEquals(expected, actual); + Assert.assertEquals(expected, compactionTask.getGranularitySpec()); } @Test @@ -1148,7 +984,6 @@ public void testCompactWithGranularitySpecConflictWithActiveCompactionTask() { final String dataSource = DATA_SOURCE_PREFIX + 0; final String conflictTaskId = "taskIdDummy"; - final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); TaskStatusPlus runningConflictCompactionTask = new TaskStatusPlus( conflictTaskId, "groupId", @@ -1182,10 +1017,10 @@ public void testCompactWithGranularitySpecConflictWithActiveCompactionTask() null ) ); - Mockito.when(mockIndexingServiceClient.getActiveTasks()).thenReturn(ImmutableList.of(runningConflictCompactionTask)); - Mockito.when(mockIndexingServiceClient.getTaskPayload(ArgumentMatchers.eq(conflictTaskId))).thenReturn(runningConflictCompactionTaskPayload); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient); + overlordClient.activeTasks.add(runningConflictCompactionTask); + overlordClient.taskIdToPayload.put(conflictTaskId, runningConflictCompactionTaskPayload); + final List compactionConfigs = new ArrayList<>(); compactionConfigs.add( new DataSourceCompactionConfig( @@ -1225,42 +1060,21 @@ public void testCompactWithGranularitySpecConflictWithActiveCompactionTask() ); doCompactSegments(compactSegments, compactionConfigs); // Verify that conflict task was canceled - Mockito.verify(mockIndexingServiceClient).cancelTask(conflictTaskId); - // The active conflict task has interval of 2000/2099 - // Make sure that we do not skip interval of conflict task. - // Since we cancel the task and will have to compact those intervals with the new segmentGranulartity - ArgumentCaptor> segmentsCaptor = ArgumentCaptor.forClass(List.class); - ArgumentCaptor granularitySpecArgumentCaptor = ArgumentCaptor.forClass( - ClientCompactionTaskGranularitySpec.class); - Mockito.verify(mockIndexingServiceClient).compactSegments( - ArgumentMatchers.anyString(), - segmentsCaptor.capture(), - ArgumentMatchers.anyInt(), - ArgumentMatchers.any(), - granularitySpecArgumentCaptor.capture(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - ); + Assert.assertTrue(overlordClient.cancelledTaskIds.contains(conflictTaskId)); + + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); + ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); + // All segments is compact at the same time since we changed the segment granularity to YEAR and all segment // are within the same year - Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size()); - ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue(); - Assert.assertNotNull(actual); + Assert.assertEquals(Intervals.of("2017/2018"), compactionTask.getIoConfig().getInputSpec().getInterval()); ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null); - Assert.assertEquals(expected, actual); + Assert.assertEquals(expected, compactionTask.getGranularitySpec()); } @Test public void testRunParallelCompactionMultipleCompactionTaskSlots() { - final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); - leaderClient.start(); - final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient); - final CoordinatorRunStats stats = doCompactSegments(compactSegments, createCompactionConfigs(2), 4); Assert.assertEquals(4, stats.get(Stats.Compaction.AVAILABLE_SLOTS)); Assert.assertEquals(4, stats.get(Stats.Compaction.MAX_SLOTS)); @@ -1270,35 +1084,30 @@ public void testRunParallelCompactionMultipleCompactionTaskSlots() @Test public void testRunWithLockedIntervals() { - final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); - leaderClient.start(); - HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient); - // Lock all intervals for dataSource_1 and dataSource_2 final String datasource1 = DATA_SOURCE_PREFIX + 1; - leaderClient.lockedIntervals + overlordClient.datasourceToLockedIntervals .computeIfAbsent(datasource1, k -> new ArrayList<>()) .add(Intervals.of("2017/2018")); final String datasource2 = DATA_SOURCE_PREFIX + 2; - leaderClient.lockedIntervals + overlordClient.datasourceToLockedIntervals .computeIfAbsent(datasource2, k -> new ArrayList<>()) .add(Intervals.of("2017/2018")); // Lock all intervals but one for dataSource_0 final String datasource0 = DATA_SOURCE_PREFIX + 0; - leaderClient.lockedIntervals + overlordClient.datasourceToLockedIntervals .computeIfAbsent(datasource0, k -> new ArrayList<>()) .add(Intervals.of("2017-01-01T13:00:00Z/2017-02-01")); // Verify that locked intervals are skipped and only one compaction task // is submitted for dataSource_0 - CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient); final CoordinatorRunStats stats = doCompactSegments(compactSegments, createCompactionConfigs(2), 4); Assert.assertEquals(1, stats.get(Stats.Compaction.SUBMITTED_TASKS)); - Assert.assertEquals(1, leaderClient.submittedCompactionTasks.size()); + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); - final ClientCompactionTaskQuery compactionTask = leaderClient.submittedCompactionTasks.get(0); + final ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); Assert.assertEquals(datasource0, compactionTask.getDataSource()); Assert.assertEquals( Intervals.of("2017-01-01T00:00:00/2017-01-01T12:00:00"), @@ -1310,8 +1119,7 @@ public void testRunWithLockedIntervals() public void testCompactWithTransformSpec() { NullHandling.initializeForTests(); - final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient); + final List compactionConfigs = new ArrayList<>(); final String dataSource = DATA_SOURCE_PREFIX + 0; compactionConfigs.add( @@ -1351,30 +1159,18 @@ public void testCompactWithTransformSpec() ) ); doCompactSegments(compactSegments, compactionConfigs); - ArgumentCaptor transformSpecArgumentCaptor = ArgumentCaptor.forClass( - ClientCompactionTaskTransformSpec.class); - Mockito.verify(mockIndexingServiceClient).compactSegments( - ArgumentMatchers.anyString(), - ArgumentMatchers.any(), - ArgumentMatchers.anyInt(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - transformSpecArgumentCaptor.capture(), - ArgumentMatchers.any(), - ArgumentMatchers.any() + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); + ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); + + Assert.assertEquals( + new SelectorDimFilter("dim1", "foo", null), + compactionTask.getTransformSpec().getFilter() ); - ClientCompactionTaskTransformSpec actual = transformSpecArgumentCaptor.getValue(); - Assert.assertNotNull(actual); - Assert.assertEquals(new SelectorDimFilter("dim1", "foo", null), actual.getFilter()); } @Test public void testCompactWithoutCustomSpecs() { - final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient); final List compactionConfigs = new ArrayList<>(); final String dataSource = DATA_SOURCE_PREFIX + 0; compactionConfigs.add( @@ -1414,34 +1210,18 @@ public void testCompactWithoutCustomSpecs() ) ); doCompactSegments(compactSegments, compactionConfigs); - ArgumentCaptor transformSpecArgumentCaptor = ArgumentCaptor.forClass( - ClientCompactionTaskTransformSpec.class); - ArgumentCaptor metricsSpecArgumentCaptor = ArgumentCaptor.forClass(AggregatorFactory[].class); - Mockito.verify(mockIndexingServiceClient).compactSegments( - ArgumentMatchers.anyString(), - ArgumentMatchers.any(), - ArgumentMatchers.anyInt(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - metricsSpecArgumentCaptor.capture(), - transformSpecArgumentCaptor.capture(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - ); - ClientCompactionTaskTransformSpec actualTransformSpec = transformSpecArgumentCaptor.getValue(); - Assert.assertNull(actualTransformSpec); - AggregatorFactory[] actualMetricsSpec = metricsSpecArgumentCaptor.getValue(); - Assert.assertNull(actualMetricsSpec); + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); + ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); + Assert.assertNull(compactionTask.getTransformSpec()); + Assert.assertNull(compactionTask.getMetricsSpec()); } @Test public void testCompactWithMetricsSpec() { NullHandling.initializeForTests(); - AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] {new CountAggregatorFactory("cnt")}; - final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient); + final AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] {new CountAggregatorFactory("cnt")}; + final List compactionConfigs = new ArrayList<>(); final String dataSource = DATA_SOURCE_PREFIX + 0; compactionConfigs.add( @@ -1481,22 +1261,11 @@ public void testCompactWithMetricsSpec() ) ); doCompactSegments(compactSegments, compactionConfigs); - ArgumentCaptor metricsSpecArgumentCaptor = ArgumentCaptor.forClass(AggregatorFactory[].class); - Mockito.verify(mockIndexingServiceClient).compactSegments( - ArgumentMatchers.anyString(), - ArgumentMatchers.any(), - ArgumentMatchers.anyInt(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - metricsSpecArgumentCaptor.capture(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - ); - AggregatorFactory[] actual = metricsSpecArgumentCaptor.getValue(); - Assert.assertNotNull(actual); - Assert.assertArrayEquals(aggregatorFactories, actual); + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); + ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); + + Assert.assertNotNull(compactionTask.getMetricsSpec()); + Assert.assertArrayEquals(aggregatorFactories, compactionTask.getMetricsSpec()); } @Test @@ -1504,41 +1273,37 @@ public void testRunWithLockedIntervalsNoSkip() { Mockito.when(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals()).thenReturn(false); - final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER); - leaderClient.start(); - HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient); - // Lock all intervals for all the dataSources final String datasource0 = DATA_SOURCE_PREFIX + 0; - leaderClient.lockedIntervals + overlordClient.datasourceToLockedIntervals .computeIfAbsent(datasource0, k -> new ArrayList<>()) .add(Intervals.of("2017/2018")); final String datasource1 = DATA_SOURCE_PREFIX + 1; - leaderClient.lockedIntervals + overlordClient.datasourceToLockedIntervals .computeIfAbsent(datasource1, k -> new ArrayList<>()) .add(Intervals.of("2017/2018")); final String datasource2 = DATA_SOURCE_PREFIX + 2; - leaderClient.lockedIntervals + overlordClient.datasourceToLockedIntervals .computeIfAbsent(datasource2, k -> new ArrayList<>()) .add(Intervals.of("2017/2018")); // Verify that no locked intervals are skipped - CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, indexingServiceClient); + CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, overlordClient); int maxTaskSlots = partitionsSpec instanceof SingleDimensionPartitionsSpec ? 5 : 3; final CoordinatorRunStats stats = doCompactSegments(compactSegments, createCompactionConfigs(1), maxTaskSlots); Assert.assertEquals(3, stats.get(Stats.Compaction.SUBMITTED_TASKS)); - Assert.assertEquals(3, leaderClient.submittedCompactionTasks.size()); - leaderClient.submittedCompactionTasks.forEach(task -> { - System.out.println(task.getDataSource() + " : " + task.getIoConfig().getInputSpec().getInterval()); - }); + Assert.assertEquals(3, overlordClient.submittedCompactionTasks.size()); // Verify that tasks are submitted for the latest interval of each dataSource final Map datasourceToInterval = new HashMap<>(); - leaderClient.submittedCompactionTasks.forEach( + overlordClient.submittedCompactionTasks.forEach( task -> datasourceToInterval.put( - task.getDataSource(), task.getIoConfig().getInputSpec().getInterval())); + task.getDataSource(), + task.getIoConfig().getInputSpec().getInterval() + ) + ); Assert.assertEquals( Intervals.of("2017-01-09T00:00:00Z/2017-01-09T12:00:00Z"), datasourceToInterval.get(datasource0) @@ -1586,8 +1351,6 @@ public void testDetermineSegmentGranularityFromSegmentsToCompact() ); dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of()); - final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient); final List compactionConfigs = new ArrayList<>(); compactionConfigs.add( new DataSourceCompactionConfig( @@ -1626,26 +1389,17 @@ public void testDetermineSegmentGranularityFromSegmentsToCompact() ) ); doCompactSegments(compactSegments, compactionConfigs); - ArgumentCaptor> segmentsCaptor = ArgumentCaptor.forClass(List.class); - ArgumentCaptor granularitySpecArgumentCaptor = ArgumentCaptor.forClass( - ClientCompactionTaskGranularitySpec.class); - Mockito.verify(mockIndexingServiceClient).compactSegments( - ArgumentMatchers.anyString(), - segmentsCaptor.capture(), - ArgumentMatchers.anyInt(), - ArgumentMatchers.any(), - granularitySpecArgumentCaptor.capture(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); + ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); + + Assert.assertEquals( + Intervals.of("2017-01-01/2017-01-02"), + compactionTask.getIoConfig().getInputSpec().getInterval() + ); + Assert.assertEquals( + new ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null), + compactionTask.getGranularitySpec() ); - Assert.assertEquals(2, segmentsCaptor.getValue().size()); - ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue(); - Assert.assertNotNull(actual); - ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null); - Assert.assertEquals(expected, actual); } @Test @@ -1681,8 +1435,6 @@ public void testDetermineSegmentGranularityFromSegmentGranularityInCompactionCon ); dataSources = DataSourcesSnapshot.fromUsedSegments(segments, ImmutableMap.of()); - final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient); final List compactionConfigs = new ArrayList<>(); compactionConfigs.add( new DataSourceCompactionConfig( @@ -1721,33 +1473,22 @@ public void testDetermineSegmentGranularityFromSegmentGranularityInCompactionCon ) ); doCompactSegments(compactSegments, compactionConfigs); - ArgumentCaptor> segmentsCaptor = ArgumentCaptor.forClass(List.class); - ArgumentCaptor granularitySpecArgumentCaptor = ArgumentCaptor.forClass( - ClientCompactionTaskGranularitySpec.class); - Mockito.verify(mockIndexingServiceClient).compactSegments( - ArgumentMatchers.anyString(), - segmentsCaptor.capture(), - ArgumentMatchers.anyInt(), - ArgumentMatchers.any(), - granularitySpecArgumentCaptor.capture(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); + ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); + + Assert.assertEquals( + Intervals.of("2017/2018"), + compactionTask.getIoConfig().getInputSpec().getInterval() + ); + Assert.assertEquals( + new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null), + compactionTask.getGranularitySpec() ); - Assert.assertEquals(2, segmentsCaptor.getValue().size()); - ClientCompactionTaskGranularitySpec actual = granularitySpecArgumentCaptor.getValue(); - Assert.assertNotNull(actual); - ClientCompactionTaskGranularitySpec expected = new ClientCompactionTaskGranularitySpec(Granularities.YEAR, null, null); - Assert.assertEquals(expected, actual); } @Test public void testCompactWithMetricsSpecShouldSetPreserveExistingMetricsTrue() { - final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient); final List compactionConfigs = new ArrayList<>(); final String dataSource = DATA_SOURCE_PREFIX + 0; compactionConfigs.add( @@ -1787,30 +1528,20 @@ public void testCompactWithMetricsSpecShouldSetPreserveExistingMetricsTrue() ) ); doCompactSegments(compactSegments, compactionConfigs); - ArgumentCaptor clientCompactionTaskQueryTuningConfigArgumentCaptor = ArgumentCaptor.forClass( - ClientCompactionTaskQueryTuningConfig.class); - Mockito.verify(mockIndexingServiceClient).compactSegments( - ArgumentMatchers.anyString(), - ArgumentMatchers.any(), - ArgumentMatchers.anyInt(), - clientCompactionTaskQueryTuningConfigArgumentCaptor.capture(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - ); - Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue()); - Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec()); - Assert.assertTrue(((OnheapIncrementalIndex.Spec) clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec()).isPreserveExistingMetrics()); + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); + ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); + + Assert.assertNotNull(compactionTask.getTuningConfig()); + Assert.assertNotNull(compactionTask.getTuningConfig().getAppendableIndexSpec()); + + AppendableIndexSpec appendableIndexSpec = compactionTask.getTuningConfig().getAppendableIndexSpec(); + Assert.assertTrue(appendableIndexSpec instanceof OnheapIncrementalIndex.Spec); + Assert.assertTrue(((OnheapIncrementalIndex.Spec) appendableIndexSpec).isPreserveExistingMetrics()); } @Test public void testCompactWithoutMetricsSpecShouldSetPreserveExistingMetricsFalse() { - final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class); - final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, SEARCH_POLICY, mockIndexingServiceClient); final List compactionConfigs = new ArrayList<>(); final String dataSource = DATA_SOURCE_PREFIX + 0; compactionConfigs.add( @@ -1850,23 +1581,15 @@ public void testCompactWithoutMetricsSpecShouldSetPreserveExistingMetricsFalse() ) ); doCompactSegments(compactSegments, compactionConfigs); - ArgumentCaptor clientCompactionTaskQueryTuningConfigArgumentCaptor = ArgumentCaptor.forClass( - ClientCompactionTaskQueryTuningConfig.class); - Mockito.verify(mockIndexingServiceClient).compactSegments( - ArgumentMatchers.anyString(), - ArgumentMatchers.any(), - ArgumentMatchers.anyInt(), - clientCompactionTaskQueryTuningConfigArgumentCaptor.capture(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any() - ); - Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue()); - Assert.assertNotNull(clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec()); - Assert.assertFalse(((OnheapIncrementalIndex.Spec) clientCompactionTaskQueryTuningConfigArgumentCaptor.getValue().getAppendableIndexSpec()).isPreserveExistingMetrics()); + Assert.assertEquals(1, overlordClient.submittedCompactionTasks.size()); + ClientCompactionTaskQuery compactionTask = overlordClient.submittedCompactionTasks.get(0); + + Assert.assertNotNull(compactionTask.getTuningConfig()); + Assert.assertNotNull(compactionTask.getTuningConfig().getAppendableIndexSpec()); + + AppendableIndexSpec appendableIndexSpec = compactionTask.getTuningConfig().getAppendableIndexSpec(); + Assert.assertTrue(appendableIndexSpec instanceof OnheapIncrementalIndex.Spec); + Assert.assertFalse(((OnheapIncrementalIndex.Spec) appendableIndexSpec).isPreserveExistingMetrics()); } private void verifySnapshot( @@ -2128,6 +1851,11 @@ private void addMoreData(String dataSource, int day) } } + private ListenableFuture futureOf(V value) + { + return Futures.immediateFuture(value); + } + private List createCompactionConfigs() { return createCompactionConfigs(null); @@ -2178,94 +1906,66 @@ private List createCompactionConfigs(@Nullable Integ return compactionConfigs; } - private class TestDruidLeaderClient extends DruidLeaderClient + private class TestOverlordClient extends NoopOverlordClient { - private final ObjectMapper jsonMapper; + // Given values + private final Map> datasourceToLockedIntervals = new HashMap<>(); + private final List activeTasks = new ArrayList<>(); + private final Map taskIdToPayload = new HashMap<>(); - // Map from Task Id to the intervals locked by that task - private final Map> lockedIntervals = new HashMap<>(); - - // List of submitted compaction tasks for verification in the tests + // Captured values private final List submittedCompactionTasks = new ArrayList<>(); + private final List cancelledTaskIds = new ArrayList<>(); private int compactVersionSuffix = 0; - private TestDruidLeaderClient(ObjectMapper jsonMapper) + @Override + public ListenableFuture totalWorkerCapacity() { - super(null, new TestNodeDiscoveryProvider(), null, null); - this.jsonMapper = jsonMapper; + return futureOf(10); } @Override - public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException + public ListenableFuture totalWorkerCapacityWithAutoScale() { - return new Request(httpMethod, new URL("http", "host", 8090, urlPath)); + return futureOf(10); } @Override - public StringFullResponseHolder go(Request request) throws IOException + public ListenableFuture>> lockedIntervals(Map datasourceToMinTaskPriority) { - final String urlString = request.getUrl().toString(); - if (urlString.contains("/druid/indexer/v1/task")) { - return handleTask(request); - } else if (urlString.contains("/druid/indexer/v1/workers")) { - return handleWorkers(); - } else if (urlString.contains("/druid/indexer/v1/totalWorkerCapacity")) { - return handleTotalWorkerCapacity(); - } else if (urlString.contains("/druid/indexer/v1/waitingTasks") - || urlString.contains("/druid/indexer/v1/pendingTasks") - || urlString.contains("/druid/indexer/v1/runningTasks")) { - return createStringFullResponseHolder(jsonMapper.writeValueAsString(Collections.emptyList())); - } else if (urlString.contains(("/druid/indexer/v1/lockedIntervals"))) { - return handleLockedIntervals(); - } else { - throw new IAE("Cannot handle request for url[%s]", request.getUrl()); - } + return futureOf(datasourceToLockedIntervals); } - private StringFullResponseHolder createStringFullResponseHolder(String content) + @Override + public ListenableFuture> allActiveTasks() { - final HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - final StringFullResponseHolder holder = new StringFullResponseHolder( - httpResponse, - StandardCharsets.UTF_8 - ); - holder.addChunk(content); - return holder; + return futureOf(activeTasks); } - private StringFullResponseHolder handleWorkers() throws JsonProcessingException + @Override + public ListenableFuture taskPayload(String taskId) { - final List workerInfos = new ArrayList<>(); - // There are 10 workers available in this test - for (int i = 0; i < 10; i++) { - workerInfos.add( - new IndexingWorkerInfo( - new IndexingWorker("http", "host", "8091", 1, "version"), - 0, - Collections.emptySet(), - Collections.emptyList(), - DateTimes.EPOCH, - null - ) - ); - } - return createStringFullResponseHolder(jsonMapper.writeValueAsString(workerInfos)); + return futureOf(taskIdToPayload.get(taskId)); } - private StringFullResponseHolder handleTotalWorkerCapacity() throws JsonProcessingException + @Override + public ListenableFuture runTask(String taskId, Object taskObject) { - IndexingTotalWorkerCapacityInfo info = new IndexingTotalWorkerCapacityInfo(5, 10); - return createStringFullResponseHolder(jsonMapper.writeValueAsString(info)); + Assert.assertTrue(taskObject instanceof ClientCompactionTaskQuery); + handleTask((ClientCompactionTaskQuery) taskObject); + return futureOf(null); } - private StringFullResponseHolder handleTask(Request request) throws IOException + @Override + public ListenableFuture cancelTask(String taskId) + { + cancelledTaskIds.add(taskId); + return futureOf(null); + } + + private void handleTask(ClientCompactionTaskQuery compactionTaskQuery) { - final ClientTaskQuery taskQuery = jsonMapper.readValue(request.getContent().array(), ClientTaskQuery.class); - if (!(taskQuery instanceof ClientCompactionTaskQuery)) { - throw new IAE("Cannot run non-compaction task"); - } - final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) taskQuery; submittedCompactionTasks.add(compactionTaskQuery); final Interval intervalToCompact = compactionTaskQuery.getIoConfig().getInputSpec().getInterval(); @@ -2278,16 +1978,10 @@ private StringFullResponseHolder handleTask(Request request) throws IOException .collect(Collectors.toList()); compactSegments(timeline, segments, compactionTaskQuery); - return createStringFullResponseHolder(jsonMapper.writeValueAsString(ImmutableMap.of("task", taskQuery.getId()))); - } - - private StringFullResponseHolder handleLockedIntervals() throws IOException - { - return createStringFullResponseHolder(jsonMapper.writeValueAsString(lockedIntervals)); } private void compactSegments( - VersionedIntervalTimeline timeline, + SegmentTimeline timeline, List segments, ClientCompactionTaskQuery clientCompactionTaskQuery ) @@ -2302,7 +1996,6 @@ private void compactSegments( maxEnd = segment.getInterval().getEnd(); } } - Interval compactInterval = new Interval(minStart, maxEnd); segments.forEach( segment -> timeline.remove( segment.getInterval(), @@ -2316,7 +2009,8 @@ private void compactSegments( if (clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec() instanceof DynamicPartitionsSpec) { compactionPartitionsSpec = new DynamicPartitionsSpec( clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec().getMaxRowsPerSegment(), - ((DynamicPartitionsSpec) clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec()).getMaxTotalRowsOr(Long.MAX_VALUE) + ((DynamicPartitionsSpec) clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec()) + .getMaxTotalRowsOr(Long.MAX_VALUE) ); } else { compactionPartitionsSpec = clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec(); @@ -2325,12 +2019,11 @@ private void compactSegments( Map transformSpec = null; try { if (clientCompactionTaskQuery.getTransformSpec() != null) { - transformSpec = jsonMapper.readValue( - jsonMapper.writeValueAsString(new TransformSpec(clientCompactionTaskQuery.getTransformSpec() - .getFilter(), null)), - new TypeReference>() - { - } + transformSpec = JSON_MAPPER.readValue( + JSON_MAPPER.writeValueAsString( + new TransformSpec(clientCompactionTaskQuery.getTransformSpec().getFilter(), null) + ), + new TypeReference>() {} ); } } @@ -2340,9 +2033,13 @@ private void compactSegments( List metricsSpec = null; if (clientCompactionTaskQuery.getMetricsSpec() != null) { - metricsSpec = jsonMapper.convertValue(clientCompactionTaskQuery.getMetricsSpec(), new TypeReference>() {}); + metricsSpec = JSON_MAPPER.convertValue( + clientCompactionTaskQuery.getMetricsSpec(), + new TypeReference>() {} + ); } + final Interval compactInterval = new Interval(minStart, maxEnd); for (int i = 0; i < 2; i++) { DataSegment compactSegment = new DataSegment( segments.get(0).getDataSource(), @@ -2354,20 +2051,16 @@ private void compactSegments( shardSpecFactory.apply(i, 2), new CompactionState( compactionPartitionsSpec, - clientCompactionTaskQuery.getDimensionsSpec() == null ? null : new DimensionsSpec( - clientCompactionTaskQuery.getDimensionsSpec().getDimensions() - ), + clientCompactionTaskQuery.getDimensionsSpec() == null + ? null + : new DimensionsSpec(clientCompactionTaskQuery.getDimensionsSpec().getDimensions()), metricsSpec, transformSpec, ImmutableMap.of( - "bitmap", - ImmutableMap.of("type", "roaring"), - "dimensionCompression", - "lz4", - "metricCompression", - "lz4", - "longEncoding", - "longs" + "bitmap", ImmutableMap.of("type", "roaring"), + "dimensionCompression", "lz4", + "metricCompression", "lz4", + "longEncoding", "longs" ), ImmutableMap.of() ), @@ -2384,21 +2077,6 @@ private void compactSegments( } } - private static class TestNodeDiscoveryProvider extends DruidNodeDiscoveryProvider - { - @Override - public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole) - { - throw new UnsupportedOperationException(); - } - - @Override - public DruidNodeDiscovery getForNodeRole(NodeRole nodeRole) - { - return EasyMock.niceMock(DruidNodeDiscovery.class); - } - } - public static class StaticUtilsTest { @Test