From 23ea578b788664284f57ebcf4e7d83ff032dea2f Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Sat, 25 Feb 2023 19:40:39 -0800 Subject: [PATCH 1/3] Continuous automatic compaction This change introduces the ability to have auto-compaction continuously schedule compaction tasks as slots become available. Previously, each run of the CompactSegments duty built an iterator based on the latest segment metadata available. This meant that if the compact tasks that were scheduled ran into any issues, like task lock contention, or an interval which cannot be compacted because of a bug, auto-compaction would be stuck on the cluster. With this change, CompactSegments refreshes its view of the segments based on the `druid.coordinator.compaction.searchPolicyRefreshPeriod` property. This allows auto-compaction to continue to make progress if any interval fails to compact until the search policy is refreshed. Compaction statistics for the cluster are only refreshed when the search policy is refreshed. This is because to collect statistics, the task has to run through the entire list of available segments on the cluster, which can take a long time on large clusters. To enable this behavior on the cluster, add something like this to the coordinator runtime properties ``` druid.coordinator.dutyGroups=["compaction"] druid.coordinator.compaction.duties=["compactSegments"] druid.coordinator.compaction.period=PT60S ``` Key changes * CompactionSegmentSearchPolicy#resetIfNeeded * CompactSegments#makeStats Future changes: - With this change an operator still needs to place the CompactSegments duty in a custom duty group with a faster period than the Indexing duty period to take advantage of this change. A future change should change this so that auto-compaction is scheduled to run continuously by default. - A new API should be introduced to enable a cluster operator to force reset the CompactionSegmentSearchPolicy at the next CompactSegments run. 
This will be useful in demo situations, or if a large ingestion has just taken place and the cluster operator wants to get auto-compaction to start processing these segments immediately. --- .../NewestSegmentFirstPolicyBenchmark.java | 11 +++- .../coordinator/DruidCoordinatorConfig.java | 3 ++ .../coordinator/duty/CompactSegments.java | 28 +++++++---- .../duty/CompactionSegmentIterator.java | 6 +++ .../duty/CompactionSegmentSearchPolicy.java | 13 +++++ .../duty/NewestSegmentFirstIterator.java | 17 +++++++ .../duty/NewestSegmentFirstPolicy.java | 50 +++++++++++++++++-- .../druid/server/http/CompactionResource.java | 4 +- .../coordinator/DruidCoordinatorTest.java | 5 +- .../TestDruidCoordinatorConfig.java | 22 +++++++- .../coordinator/duty/CompactSegmentsTest.java | 5 +- .../duty/NewestSegmentFirstPolicyTest.java | 5 +- .../CoordinatorSimulationBuilder.java | 8 +-- .../org/apache/druid/cli/CliCoordinator.java | 4 +- 14 files changed, 154 insertions(+), 27 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java index 531003352123..2a17564f87d6 100644 --- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java @@ -32,6 +32,7 @@ import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.ShardSpec; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Interval; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -45,6 +46,7 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.infra.Blackhole; +import java.time.Clock; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -61,7 +63,14 @@ 
public class NewestSegmentFirstPolicyBenchmark { private static final String DATA_SOURCE_PREFIX = "dataSource_"; - private final CompactionSegmentSearchPolicy policy = new NewestSegmentFirstPolicy(new DefaultObjectMapper()); + private final CompactionSegmentSearchPolicy policy = + new NewestSegmentFirstPolicy( + new DefaultObjectMapper(), + new TestDruidCoordinatorConfig.Builder() + .withCompactionSearchPolicyRefreshPeriod(Duration.standardMinutes(5)) + .build(), + Clock.systemUTC() + ); @Param("100") private int numDataSources; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java index 9495f30f2c87..63152fd50ee5 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java @@ -137,4 +137,7 @@ public boolean getCompactionSkipLockedIntervals() return true; } + @Config("druid.coordinator.compaction.searchPolicyRefreshPeriod") + @Default("PT5M") + public abstract Duration getCompactionSearchPolicyRefreshPeriod(); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 47456a28dbc4..c279e3bedc7b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -34,6 +34,7 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; import 
org.apache.druid.java.util.common.logger.Logger; @@ -185,8 +186,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) .addAll(intervals) ); - final CompactionSegmentIterator iterator = - policy.reset(compactionConfigs, dataSources, intervalsToSkipCompaction); + final Pair iteratorAndReset = + policy.resetIfNeeded(compactionConfigs, dataSources, intervalsToSkipCompaction); int totalCapacity; if (dynamicConfig.isUseAutoScaleSlots()) { @@ -232,11 +233,15 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) compactionConfigs, currentRunAutoCompactionSnapshotBuilders, numAvailableCompactionTaskSlots, - iterator + iteratorAndReset.lhs, + iteratorAndReset.rhs ) ); } else { - stats.accumulate(makeStats(currentRunAutoCompactionSnapshotBuilders, 0, iterator)); + stats.addToGlobalStat(COMPACTION_TASK_COUNT, 0); + if (iteratorAndReset.rhs){ + stats.accumulate(makeStats(currentRunAutoCompactionSnapshotBuilders, iteratorAndReset.lhs)); + } } } else { LOG.info("compactionConfig is empty. Skip."); @@ -350,7 +355,8 @@ private CoordinatorStats doRun( Map compactionConfigs, Map currentRunAutoCompactionSnapshotBuilders, int numAvailableCompactionTaskSlots, - CompactionSegmentIterator iterator + CompactionSegmentIterator iterator, + boolean calculateStats ) { int numSubmittedTasks = 0; @@ -474,7 +480,9 @@ private CoordinatorStats doRun( } } - return makeStats(currentRunAutoCompactionSnapshotBuilders, numSubmittedTasks, iterator); + final CoordinatorStats stats = new CoordinatorStats(); + stats.addToGlobalStat(COMPACTION_TASK_COUNT, numSubmittedTasks); + return calculateStats ? 
stats.accumulate(makeStats(currentRunAutoCompactionSnapshotBuilders, iterator)) : stats; } private Map newAutoCompactionContext(@Nullable Map configuredContext) @@ -488,19 +496,17 @@ private Map newAutoCompactionContext(@Nullable Map currentRunAutoCompactionSnapshotBuilders, - int numCompactionTasks, CompactionSegmentIterator iterator ) { final Map currentAutoCompactionSnapshotPerDataSource = new HashMap<>(); final CoordinatorStats stats = new CoordinatorStats(); - stats.addToGlobalStat(COMPACTION_TASK_COUNT, numCompactionTasks); // Iterate through all the remaining segments in the iterator. // As these segments could be compacted but were not compacted due to lack of task slot, we will aggregates // the statistic to the AwaitingCompaction statistics - while (iterator.hasNext()) { - final List segmentsToCompact = iterator.next(); + + iterator.forEachRemainingSegmentsToCompact(segmentsToCompact -> { if (!segmentsToCompact.isEmpty()) { final String dataSourceName = segmentsToCompact.get(0).getDataSource(); AutoCompactionSnapshot.Builder snapshotBuilder = currentRunAutoCompactionSnapshotBuilders.computeIfAbsent( @@ -520,7 +526,7 @@ private CoordinatorStats makeStats( ); snapshotBuilder.incrementSegmentCountAwaitingCompaction(segmentsToCompact.size()); } - } + }); // Statistics of all segments considered compacted after this run Map allCompactedStatistics = iterator.totalCompactedStatistics(); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java index 64f5c16a17ca..fd6b83774c2d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentIterator.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.Consumer; /** * Segments in 
the lists which are the elements of this iterator are sorted according to the natural segment order @@ -48,4 +49,9 @@ public interface CompactionSegmentIterator extends Iterator> */ Map totalSkippedStatistics(); + /** + * Performs the given action over the remaining segments to compact in the iterator. Similar to + * {@link #forEachRemaining} however, this iteration does not advance the iterator. + */ + void forEachRemainingSegmentsToCompact(Consumer> action); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java index 2cbaf31d69e1..5e90b126ae3c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java @@ -19,6 +19,9 @@ package org.apache.druid.server.coordinator.duty; +import com.google.common.base.Suppliers; +import com.google.common.cache.CacheBuilder; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.timeline.SegmentTimeline; import org.joda.time.Interval; @@ -31,6 +34,16 @@ */ public interface CompactionSegmentSearchPolicy { + /** + * @return An iterator over the intervals that need to be compacted. The iterator may not be reset on every call. + * Callers who want to guarantee that the iterator will be reset, should call {@link #reset} instead. + */ + Pair resetIfNeeded( + Map compactionConfigs, + Map dataSources, + Map> skipIntervals + ); + /** * Reset the current states of this policy. This method should be called whenever iterating starts. 
*/ diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java index d746d91c346a..dbf314f452af 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java @@ -73,6 +73,7 @@ import java.util.Objects; import java.util.PriorityQueue; import java.util.Set; +import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -195,6 +196,17 @@ public Map totalSkippedStatistics() return skippedSegments; } + @Override + public void forEachRemainingSegmentsToCompact(Consumer> action) + { + for (CompactibleTimelineObjectHolderCursor compactibleTimelineObjectHolderCursor : timelineIterators.values()) { + for (TimelineObjectHolder holder : compactibleTimelineObjectHolderCursor.holders) { + List candidates = compactibleTimelineObjectHolderCursor.getCandidates(holder); + action.accept(candidates); + } + } + } + @Override public boolean hasNext() { @@ -310,6 +322,11 @@ public List next() throw new NoSuchElementException(); } TimelineObjectHolder timelineObjectHolder = holders.remove(holders.size() - 1); + return getCandidates(timelineObjectHolder); + } + + private List getCandidates(TimelineObjectHolder timelineObjectHolder) + { List candidates = Streams.sequentialStreamFrom(timelineObjectHolder.getObject()) .map(PartitionChunk::getObject) .collect(Collectors.toList()); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java index ce4f0e1066e3..4989bbcaf559 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java +++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java @@ -20,34 +20,78 @@ package org.apache.druid.server.coordinator.duty; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import com.google.inject.Inject; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.timeline.SegmentTimeline; import org.joda.time.Interval; +import java.time.Clock; import java.util.List; import java.util.Map; /** * This policy searches segments for compaction from the newest one to oldest one. + * The {@link #resetIfNeeded} functionality is inspired by {@link com.google.common.base.Suppliers.ExpiringMemoizingSupplier}. */ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy { private final ObjectMapper objectMapper; + private final long durationMillis; + private transient volatile NewestSegmentFirstIterator iterator; + // The special value 0 means "not yet initialized". 
+ private transient volatile long expirationMillis; + private final Clock clock; @Inject - public NewestSegmentFirstPolicy(ObjectMapper objectMapper) + public NewestSegmentFirstPolicy(ObjectMapper objectMapper, DruidCoordinatorConfig config, Clock clock) { this.objectMapper = objectMapper; + this.durationMillis = config.getCompactionSearchPolicyRefreshPeriod().getMillis(); + this.clock = clock; + Preconditions.checkArgument(durationMillis > 0); } @Override - public CompactionSegmentIterator reset( + public Pair resetIfNeeded( Map compactionConfigs, Map dataSources, Map> skipIntervals ) { - return new NewestSegmentFirstIterator(objectMapper, compactionConfigs, dataSources, skipIntervals); + long millis = expirationMillis; + long now = clock.millis(); + if (millis == 0 || now - millis >= 0) { + synchronized (this) { + if (millis == expirationMillis) { + NewestSegmentFirstIterator t = reset(compactionConfigs, dataSources, skipIntervals); + iterator = t; + // reset can be slow + expirationMillis = clock.millis() + durationMillis; + return Pair.of(t, true); + } + } + } + return Pair.of(iterator, false); + } + + @Override + public synchronized NewestSegmentFirstIterator reset( + Map compactionConfigs, + Map dataSources, + Map> skipIntervals + ) + { + NewestSegmentFirstIterator t = new NewestSegmentFirstIterator( + objectMapper, + compactionConfigs, + dataSources, + skipIntervals + ); + iterator = t; + expirationMillis = clock.millis() + durationMillis; + return t; } } diff --git a/server/src/main/java/org/apache/druid/server/http/CompactionResource.java b/server/src/main/java/org/apache/druid/server/http/CompactionResource.java index e88d0cdacfb2..e0da556798e3 100644 --- a/server/src/main/java/org/apache/druid/server/http/CompactionResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CompactionResource.java @@ -52,7 +52,9 @@ public CompactionResource( } /** - * This API is meant to only be used by Druid's integration tests. 
+ * This API schedules an out-of-band compaction task. Its primary purpose was for testing, but it can be used in production + * clusters. If you are using this API excessively, you should look at scheduling the CompactSegments duty in a custom + * duty group at a shorter period. */ @POST @Path("/compact") diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 4076ce9d3a87..cd740d2853cd 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -76,6 +76,7 @@ import org.junit.Test; import javax.annotation.Nullable; +import java.time.Clock; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -146,13 +147,13 @@ public void setUp() throws Exception curator.blockUntilConnected(); curator.create().creatingParentsIfNeeded().forPath(LOADPATH); objectMapper = new DefaultObjectMapper(); - newestSegmentFirstPolicy = new NewestSegmentFirstPolicy(objectMapper); druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder() .withCoordinatorStartDelay(new Duration(COORDINATOR_START_DELAY)) .withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD)) .withCoordinatorKillPeriod(new Duration(COORDINATOR_PERIOD)) .withCoordinatorKillIgnoreDurationToRetain(false) .build(); + newestSegmentFirstPolicy = new NewestSegmentFirstPolicy(objectMapper, druidCoordinatorConfig, Clock.systemUTC()); pathChildrenCache = new PathChildrenCache( curator, LOADPATH, @@ -214,7 +215,7 @@ public void testMoveSegment() EasyMock.replay(segment); loadQueuePeon = EasyMock.createNiceMock(LoadQueuePeon.class); - EasyMock.expect(loadQueuePeon.getLoadQueueSize()).andReturn(new Long(1)); + EasyMock.expect(loadQueuePeon.getLoadQueueSize()).andReturn(1L); loadQueuePeon.markSegmentToDrop(segment); EasyMock.expectLastCall().once(); Capture 
loadCallbackCapture = Capture.newInstance(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java index 271bda113143..05fc8f5b2f16 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java @@ -47,6 +47,7 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig private final int curatorLoadQueuePeonNumCallbackThreads; private final Duration httpLoadQueuePeonHostTimeout; private final int httpLoadQueuePeonBatchSize; + private final Duration compactionSearchPolicyRefreshPeriod; public TestDruidCoordinatorConfig( Duration coordinatorStartDelay, @@ -72,7 +73,8 @@ public TestDruidCoordinatorConfig( Duration httpLoadQueuePeonRepeatDelay, Duration httpLoadQueuePeonHostTimeout, int httpLoadQueuePeonBatchSize, - int curatorLoadQueuePeonNumCallbackThreads + int curatorLoadQueuePeonNumCallbackThreads, + Duration compactionSearchPolicyRefreshPeriod ) { this.coordinatorStartDelay = coordinatorStartDelay; @@ -99,6 +101,7 @@ public TestDruidCoordinatorConfig( this.httpLoadQueuePeonHostTimeout = httpLoadQueuePeonHostTimeout; this.httpLoadQueuePeonBatchSize = httpLoadQueuePeonBatchSize; this.curatorLoadQueuePeonNumCallbackThreads = curatorLoadQueuePeonNumCallbackThreads; + this.compactionSearchPolicyRefreshPeriod = compactionSearchPolicyRefreshPeriod; } @Override @@ -209,6 +212,12 @@ public boolean getCompactionSkipLockedIntervals() return compactionSkipLockedIntervals; } + @Override + public Duration getCompactionSearchPolicyRefreshPeriod() + { + return compactionSearchPolicyRefreshPeriod; + } + @Override public boolean getCoordinatorKillIgnoreDurationToRetain() { @@ -271,6 +280,7 @@ public static class Builder private static final boolean DEFAULT_COMPACTION_SKIP_LOCKED_INTERVALS = true; 
private static final Duration DEFAULT_COORDINATOR_AUDIT_KILL_PERIOD = new Duration("PT86400s"); private static final Duration DEFAULT_COORDINATOR_AUTIT_KILL_DURATION_TO_RETAIN = new Duration("PT7776000s"); + private static final Duration DEFAULT_COMPACTION_SEARCH_POLICY_REFRESH_PERIOD = new Duration("PT5M"); private Duration coordinatorStartDelay; @@ -297,6 +307,7 @@ public static class Builder private Boolean compactionSkippedLockedIntervals; private Duration coordinatorAuditKillPeriod; private Duration coordinatorAuditKillDurationToRetain; + private Duration compactionSearchPolicyRefreshPeriod; public Builder() { @@ -446,6 +457,12 @@ public Builder withCoordinatorAuditKillDurationToRetain(Duration coordinatorAudi return this; } + public Builder withCompactionSearchPolicyRefreshPeriod(Duration compactionSearchPolicyRefreshPeriod) + { + this.compactionSearchPolicyRefreshPeriod = compactionSearchPolicyRefreshPeriod; + return this; + } + public TestDruidCoordinatorConfig build() { return new TestDruidCoordinatorConfig( @@ -473,7 +490,8 @@ public TestDruidCoordinatorConfig build() httpLoadQueuePeonHostTimeout == null ? DEFAULT_HTTP_LOAD_QUEUE_PEON_HOST_TIMEOUT : httpLoadQueuePeonHostTimeout, httpLoadQueuePeonBatchSize == null ? DEFAULT_HTTP_LOAD_QUEUE_PEON_BATCH_SIZE : httpLoadQueuePeonBatchSize, curatorLoadQueuePeonNumCallbackThreads == null ? DEFAULT_CURATOR_LOAD_QUEUE_PEON_NUM_CALLBACK_THREADS - : curatorLoadQueuePeonNumCallbackThreads + : curatorLoadQueuePeonNumCallbackThreads, + compactionSearchPolicyRefreshPeriod == null ? 
DEFAULT_COMPACTION_SEARCH_POLICY_REFRESH_PERIOD : compactionSearchPolicyRefreshPeriod ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 510689c9ee6d..20626434b6c5 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -79,6 +79,7 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DruidCoordinatorConfig; import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig; @@ -116,6 +117,7 @@ import java.io.IOException; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.time.Clock; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -139,7 +141,8 @@ public class CompactSegmentsTest private static final int TOTAL_SEGMENT_PER_DATASOURCE = 44; private static final int TOTAL_INTERVAL_PER_DATASOURCE = 11; private static final int MAXIMUM_CAPACITY_WITH_AUTO_SCALE = 10; - private static final NewestSegmentFirstPolicy SEARCH_POLICY = new NewestSegmentFirstPolicy(JSON_MAPPER); + private static final NewestSegmentFirstPolicy SEARCH_POLICY = + new NewestSegmentFirstPolicy(JSON_MAPPER, new TestDruidCoordinatorConfig.Builder().build(), Clock.systemUTC()); @Parameterized.Parameters(name = "{0}") public static Collection constructorFeeder() diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java index 73abcb8d5eac..ad1d87499a0b 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java @@ -48,6 +48,7 @@ import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; +import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig; import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig; import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig; import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig; @@ -65,6 +66,7 @@ import org.junit.Assert; import org.junit.Test; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -81,7 +83,8 @@ public class NewestSegmentFirstPolicyTest private static final long DEFAULT_SEGMENT_SIZE = 1000; private static final int DEFAULT_NUM_SEGMENTS_PER_SHARD = 4; private ObjectMapper mapper = new DefaultObjectMapper(); - private final NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy(mapper); + private final NewestSegmentFirstPolicy policy = + new NewestSegmentFirstPolicy(mapper, new TestDruidCoordinatorConfig.Builder().build(), Clock.systemUTC()); @Test public void testLargeOffsetAndSmallSegmentInterval() diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java index 054d34422d9e..f13a5b8f3984 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java +++ 
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java @@ -56,6 +56,7 @@ import org.easymock.EasyMock; import org.joda.time.Duration; +import java.time.Clock; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -80,8 +81,6 @@ public class CoordinatorSimulationBuilder DataSegment.PruneSpecsHolder.DEFAULT ) ); - private static final CompactionSegmentSearchPolicy COMPACTION_SEGMENT_SEARCH_POLICY = - new NewestSegmentFirstPolicy(OBJECT_MAPPER); private String balancerStrategy; private CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig.builder() @@ -193,7 +192,8 @@ public CoordinatorSimulation build() (datasource, rules) -> env.ruleManager.overrideRule(datasource, rules, null) ); - + CompactionSegmentSearchPolicy compactionSegmentSearchPolicy = + new NewestSegmentFirstPolicy(OBJECT_MAPPER, env.coordinatorConfig, Clock.systemUTC()); // Build the coordinator final DruidCoordinator coordinator = new DruidCoordinator( env.coordinatorConfig, @@ -213,7 +213,7 @@ public CoordinatorSimulation build() createBalancerStrategy(env), env.lookupCoordinatorManager, env.leaderSelector, - COMPACTION_SEGMENT_SEARCH_POLICY + compactionSegmentSearchPolicy ); return new SimulationImpl(coordinator, env); diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 5684c3871c39..04ff5d066012 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -119,6 +119,7 @@ import org.eclipse.jetty.server.Server; import org.joda.time.Duration; +import java.time.Clock; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -315,7 +316,8 @@ public void configure(Binder binder) ); //TODO: make this configurable when there are multiple search policies - 
binder.bind(CompactionSegmentSearchPolicy.class).to(NewestSegmentFirstPolicy.class); + binder.bind(CompactionSegmentSearchPolicy.class).to(NewestSegmentFirstPolicy.class).in(LazySingleton.class); + binder.bind(Clock.class).toInstance(Clock.systemUTC()); bindAnnouncer( binder, From a40a4060e94308504695bfdba34157a11f37f134 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Mon, 27 Feb 2023 19:25:10 -0800 Subject: [PATCH 2/3] fix checkstyle --- .../coordinator/duty/CompactSegments.java | 45 +++++++++++++------ .../duty/CompactionSegmentSearchPolicy.java | 2 - 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 584495047348..7e91934956f4 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -75,9 +75,13 @@ public class CompactSegments implements CoordinatorCustomDuty static final String TOTAL_COUNT_OF_SEGMENTS_COMPACTED = "segmentCountCompacted"; static final String TOTAL_INTERVAL_OF_SEGMENTS_COMPACTED = "segmentIntervalCompacted"; - /** Must be synced with org.apache.druid.indexing.common.task.CompactionTask.TYPE. */ + /** + * Must be synced with org.apache.druid.indexing.common.task.CompactionTask.TYPE. 
+ */ public static final String COMPACTION_TASK_TYPE = "compact"; - /** Must be synced with org.apache.druid.indexing.common.task.Tasks.STORE_COMPACTION_STATE_KEY */ + /** + * Must be synced with org.apache.druid.indexing.common.task.Tasks.STORE_COMPACTION_STATE_KEY + */ public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState"; private static final Logger LOG = new Logger(CompactSegments.class); @@ -148,17 +152,21 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) response.getPayload(); DataSourceCompactionConfig dataSourceCompactionConfig = compactionConfigs.get(status.getDataSource()); if (dataSourceCompactionConfig != null && dataSourceCompactionConfig.getGranularitySpec() != null) { - Granularity configuredSegmentGranularity = dataSourceCompactionConfig.getGranularitySpec().getSegmentGranularity(); + Granularity configuredSegmentGranularity = dataSourceCompactionConfig.getGranularitySpec() + .getSegmentGranularity(); if (configuredSegmentGranularity != null && compactionTaskQuery.getGranularitySpec() != null - && !configuredSegmentGranularity.equals(compactionTaskQuery.getGranularitySpec().getSegmentGranularity())) { + && !configuredSegmentGranularity.equals(compactionTaskQuery.getGranularitySpec() + .getSegmentGranularity())) { // We will cancel active compaction task if segmentGranularity changes and we will need to // re-compact the interval - LOG.info("Canceled task[%s] as task segmentGranularity is [%s] but compaction config " - + "segmentGranularity is [%s]", - status.getId(), - compactionTaskQuery.getGranularitySpec().getSegmentGranularity(), - configuredSegmentGranularity); + LOG.info( + "Canceled task[%s] as task segmentGranularity is [%s] but compaction config " + + "segmentGranularity is [%s]", + status.getId(), + compactionTaskQuery.getGranularitySpec().getSegmentGranularity(), + configuredSegmentGranularity + ); 
indexingServiceClient.cancelTask(status.getId()); continue; } @@ -238,7 +246,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) ); } else { stats.addToGlobalStat(COMPACTION_TASK_COUNT, 0); - if (iteratorAndReset.rhs){ + if (iteratorAndReset.rhs) { stats.accumulate(makeStats(currentRunAutoCompactionSnapshotBuilders, iteratorAndReset.lhs)); } } @@ -305,7 +313,8 @@ private Map> getLockedIntervalsToSkip( static int findMaxNumTaskSlotsUsedByOneCompactionTask(@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig) { if (isParallelMode(tuningConfig)) { - @Nullable Integer maxNumConcurrentSubTasks = tuningConfig.getMaxNumConcurrentSubTasks(); + @Nullable + Integer maxNumConcurrentSubTasks = tuningConfig.getMaxNumConcurrentSubTasks(); // Max number of task slots used in parallel mode = maxNumConcurrentSubTasks + 1 (supervisor task) return (maxNumConcurrentSubTasks == null ? 1 : maxNumConcurrentSubTasks) + 1; } else { @@ -373,7 +382,10 @@ private CoordinatorStats doRun( k -> new AutoCompactionSnapshot.Builder(k, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING) ); snapshotBuilder.incrementBytesCompacted(segmentsToCompact.stream().mapToLong(DataSegment::getSize).sum()); - snapshotBuilder.incrementIntervalCountCompacted(segmentsToCompact.stream().map(DataSegment::getInterval).distinct().count()); + snapshotBuilder.incrementIntervalCountCompacted(segmentsToCompact.stream() + .map(DataSegment::getInterval) + .distinct() + .count()); snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size()); final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName); @@ -396,7 +408,8 @@ private CoordinatorStats doRun( LOG.warn("Cannot determine segmentGranularity from interval [%s]", interval); } } else { - LOG.warn("segmentsToCompact does not have the same interval. Fallback to not setting segmentGranularity for auto compaction task"); + LOG.warn( + "segmentsToCompact does not have the same interval. 
Fallback to not setting segmentGranularity for auto compaction task"); } } else { segmentGranularityToUse = config.getGranularitySpec().getSegmentGranularity(); @@ -456,7 +469,11 @@ private CoordinatorStats doRun( "coordinator-issued", segmentsToCompact, config.getTaskPriority(), - ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), config.getMetricsSpec() != null), + ClientCompactionTaskQueryTuningConfig.from( + config.getTuningConfig(), + config.getMaxRowsPerSegment(), + config.getMetricsSpec() != null + ), granularitySpec, dimensionsSpec, config.getMetricsSpec(), diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java index 5e90b126ae3c..707a2145a66b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java @@ -19,8 +19,6 @@ package org.apache.druid.server.coordinator.duty; -import com.google.common.base.Suppliers; -import com.google.common.cache.CacheBuilder; import org.apache.druid.java.util.common.Pair; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.timeline.SegmentTimeline; From b67c7f2ce0b71a468d7a9f23fd816693f60ad5af Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Tue, 28 Feb 2023 21:09:20 -0800 Subject: [PATCH 3/3] code review + tests --- .../duty/CompactionSegmentSearchPolicy.java | 4 ++++ .../coordinator/duty/NewestSegmentFirstPolicy.java | 11 ++++++++--- .../coordinator/TestDruidCoordinatorConfig.java | 2 +- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java index 707a2145a66b..41fb14c11b38 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactionSegmentSearchPolicy.java @@ -35,6 +35,8 @@ public interface CompactionSegmentSearchPolicy /** * @return An iterator over the intervals that need to be compacted. The iterator may not be reset on every call. * Callers who want to guarantee that the iterator will be reset, should call {@link #reset} instead. + * + * Implementations SHOULD ensure that calls to this method and {@link #reset} are thread safe. */ Pair resetIfNeeded( Map compactionConfigs, @@ -44,6 +46,8 @@ Pair resetIfNeeded( /** * Reset the current states of this policy. This method should be called whenever iterating starts. + * + * Implementations SHOULD ensure that calls to this method and {@link #resetIfNeeded} are thread safe. 
*/ CompactionSegmentIterator reset( Map compactionConfigs, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java index 4989bbcaf559..7dbe7b9cfb4d 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicy.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; +import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; import org.apache.druid.java.util.common.Pair; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -40,9 +41,11 @@ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy { private final ObjectMapper objectMapper; private final long durationMillis; - private transient volatile NewestSegmentFirstIterator iterator; + @GuardedBy("this") + private volatile NewestSegmentFirstIterator iterator; // The special value 0 means "not yet initialized". - private transient volatile long expirationMillis; + @GuardedBy("this") + private volatile long expirationMillis; private final Clock clock; @Inject @@ -61,6 +64,8 @@ public Pair resetIfNeeded( Map> skipIntervals ) { + // This implementation was inspired by Suppliers.memoizeWithExpiration. + // resetIfNeeded and reset can be called from different threads. 
long millis = expirationMillis; long now = clock.millis(); if (millis == 0 || now - millis >= 0) { @@ -68,7 +73,7 @@ public Pair resetIfNeeded( if (millis == expirationMillis) { NewestSegmentFirstIterator t = reset(compactionConfigs, dataSources, skipIntervals); iterator = t; - // reset can be slow + // reset can be slow, so use the current time to set the new expiry expirationMillis = clock.millis() + durationMillis; return Pair.of(t, true); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java index 05fc8f5b2f16..826cbaac62d6 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java @@ -280,7 +280,7 @@ public static class Builder private static final boolean DEFAULT_COMPACTION_SKIP_LOCKED_INTERVALS = true; private static final Duration DEFAULT_COORDINATOR_AUDIT_KILL_PERIOD = new Duration("PT86400s"); private static final Duration DEFAULT_COORDINATOR_AUTIT_KILL_DURATION_TO_RETAIN = new Duration("PT7776000s"); - private static final Duration DEFAULT_COMPACTION_SEARCH_POLICY_REFRESH_PERIOD = new Duration("PT5M"); + private static final Duration DEFAULT_COMPACTION_SEARCH_POLICY_REFRESH_PERIOD = new Duration("PT300s"); private Duration coordinatorStartDelay;