diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java index 6ed790e6a7ea..c2f07d12495b 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java @@ -23,15 +23,20 @@ import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.emitter.kafka.KafkaEmitter; import org.apache.druid.emitter.kafka.KafkaEmitterModule; +import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.indexing.compact.CompactionSupervisorSpec; import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; import org.apache.druid.indexing.kafka.simulate.KafkaResource; import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.query.DruidMetrics; import org.apache.druid.rpc.UpdateResponse; import org.apache.druid.rpc.indexing.OverlordClient; import org.apache.druid.server.DruidNode; +import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy; +import org.apache.druid.server.compaction.MostFragmentedIntervalFirstPolicy; +import org.apache.druid.server.compaction.NewestSegmentFirstPolicy; import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig; @@ -50,10 +55,14 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import 
org.junit.jupiter.params.provider.MethodSource; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Embedded test to emit cluster metrics using a {@link KafkaEmitter} and then @@ -96,7 +105,8 @@ public void stop() } }; - indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s") + indexer.setServerMemory(1_000_000_000L) + .addProperty("druid.segment.handoff.pollDuration", "PT0.1s") .addProperty("druid.worker.capacity", "10"); overlord.addProperty("druid.indexer.task.default.context", "{\"useConcurrentLocks\": true}") .addProperty("druid.manager.segments.useIncrementalCache", "ifSynced") @@ -128,6 +138,20 @@ public void stop() return cluster; } + public static Stream getCompactionSupervisorTestParams() + { + return Stream.of( + Arguments.of( + CompactionEngine.NATIVE, + new NewestSegmentFirstPolicy(null) + ), + Arguments.of( + CompactionEngine.MSQ, + new MostFragmentedIntervalFirstPolicy(1, HumanReadableBytes.valueOf(1), null, 80, null) + ) + ); + } + @Test @Timeout(20) public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues() @@ -176,9 +200,13 @@ public void test_ingest10kRows_ofSelfClusterMetrics_andVerifyValues() cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec()); } - @Test + @MethodSource("getCompactionSupervisorTestParams") + @ParameterizedTest(name = "engine={0}, policy={1}") @Timeout(120) - public void test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkipKillOfUnusedSegments() + public void test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkipKillOfUnusedSegments( + CompactionEngine engine, + CompactionCandidateSearchPolicy policy + ) { final int maxRowsPerSegment = 500; final int compactedMaxRowsPerSegment = 5000; @@ -213,7 +241,7 @@ public void test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkip ); final ClusterCompactionConfig updatedCompactionConfig - = new ClusterCompactionConfig(1.0, 10, null, 
true, null, null); + = new ClusterCompactionConfig(1.0, 10, policy, true, engine, null); final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord( o -> o.updateClusterCompactionConfig(updatedCompactionConfig) ); @@ -237,6 +265,7 @@ public void test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkip overlord.latchableEmitter().waitForEventAggregate( event -> event.hasMetricName("task/run/time") .hasDimension(DruidMetrics.TASK_TYPE, "compact") + .hasDimension(DruidMetrics.DATASOURCE, dataSource) .hasDimension(DruidMetrics.TASK_STATUS, "SUCCESS"), agg -> agg.hasCountAtLeast(10) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java index 4eae2670626e..1a8dd95179b5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionConfigBasedJobTemplate.java @@ -28,9 +28,9 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.server.compaction.CompactionCandidate; -import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy; import org.apache.druid.server.compaction.CompactionSlotManager; import org.apache.druid.server.compaction.DataSourceCompactibleSegmentIterator; +import org.apache.druid.server.compaction.Eligibility; import org.apache.druid.server.compaction.NewestSegmentFirstPolicy; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.duty.CompactSegments; @@ -94,7 +94,7 @@ public List createCompactionJobs( // Create a job for each CompactionCandidate while (segmentIterator.hasNext()) { final CompactionCandidate candidate = segmentIterator.next(); - final 
CompactionCandidateSearchPolicy.Eligibility eligibility = + final Eligibility eligibility = params.getClusterCompactionConfig() .getCompactionPolicy() .checkEligibilityForCompaction(candidate, params.getLatestTaskStatus(candidate)); @@ -126,7 +126,7 @@ public List createCompactionJobs( ); ClientCompactionTaskQuery taskPayload = CompactSegments.createCompactionTask( finalCandidate, - eligibility.getMode(), + eligibility, finalConfig, engine, indexingStateFingerprint, @@ -138,7 +138,8 @@ public List createCompactionJobs( finalCandidate, CompactionSlotManager.computeSlotsRequiredForTask(taskPayload), indexingStateFingerprint, - compactionState + compactionState, + eligibility ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJob.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJob.java index 7611485bd6d8..a64abb3b2a02 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJob.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJob.java @@ -21,8 +21,8 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.indexing.template.BatchIndexingJob; -import org.apache.druid.query.http.ClientSqlQuery; import org.apache.druid.server.compaction.CompactionCandidate; +import org.apache.druid.server.compaction.Eligibility; import org.apache.druid.timeline.CompactionState; /** @@ -34,32 +34,20 @@ public class CompactionJob extends BatchIndexingJob private final int maxRequiredTaskSlots; private final String targetIndexingStateFingerprint; private final CompactionState targetIndexingState; + private final Eligibility eligibility; public CompactionJob( ClientCompactionTaskQuery task, CompactionCandidate candidate, int maxRequiredTaskSlots, String targetIndexingStateFingerprint, - CompactionState targetIndexingState + CompactionState targetIndexingState, + Eligibility eligibility ) { super(task, null); 
this.candidate = candidate; - this.maxRequiredTaskSlots = maxRequiredTaskSlots; - this.targetIndexingStateFingerprint = targetIndexingStateFingerprint; - this.targetIndexingState = targetIndexingState; - } - - public CompactionJob( - ClientSqlQuery msqQuery, - CompactionCandidate candidate, - int maxRequiredTaskSlots, - String targetIndexingStateFingerprint, - CompactionState targetIndexingState - ) - { - super(null, msqQuery); - this.candidate = candidate; + this.eligibility = eligibility; this.maxRequiredTaskSlots = maxRequiredTaskSlots; this.targetIndexingStateFingerprint = targetIndexingStateFingerprint; this.targetIndexingState = targetIndexingState; @@ -90,6 +78,11 @@ public CompactionState getTargetIndexingState() return targetIndexingState; } + public Eligibility getEligibility() + { + return eligibility; + } + @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java index 1dc29cfde6a8..313338012251 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java @@ -64,7 +64,6 @@ import java.util.Map; import java.util.PriorityQueue; import java.util.Set; -import java.util.stream.Collectors; /** * Iterates over all eligible compaction jobs in order of their priority. 
@@ -198,7 +197,7 @@ public void removeJobs(String dataSource) final List jobsToRemove = queue .stream() .filter(job -> job.getDataSource().equals(dataSource)) - .collect(Collectors.toList()); + .toList(); queue.removeAll(jobsToRemove); log.info("Removed [%d] jobs for datasource[%s] from queue.", jobsToRemove.size(), dataSource); @@ -221,7 +220,10 @@ public void runReadyJobs() while (!queue.isEmpty()) { final CompactionJob job = queue.poll(); if (startJobIfPendingAndReady(job, pendingJobs, slotManager)) { - runStats.add(Stats.Compaction.SUBMITTED_TASKS, RowKey.of(Dimension.DATASOURCE, job.getDataSource()), 1); + final RowKey rowKey = RowKey + .with(Dimension.DATASOURCE, job.getDataSource()) + .and(Dimension.DESCRIPTION, job.getEligibility().getMode().name()); + runStats.add(Stats.Compaction.SUBMITTED_TASKS, rowKey, 1); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index aa9b006dd8ae..807f67bdb092 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -193,13 +193,14 @@ private void assertQueryToTask(ClientCompactionTaskQuery query, CompactionTask t { Assert.assertEquals(query.getId(), task.getId()); Assert.assertEquals(query.getDataSource(), task.getDataSource()); + Assert.assertTrue(query.getIoConfig().getInputSpec() instanceof ClientCompactionIntervalSpec); Assert.assertTrue(task.getIoConfig().getInputSpec() instanceof CompactionIntervalSpec); Assert.assertEquals( query.getIoConfig().getInputSpec().getInterval(), ((CompactionIntervalSpec) task.getIoConfig().getInputSpec()).getInterval() ); Assert.assertEquals( - query.getIoConfig().getInputSpec().getSha256OfSortedSegmentIds(), + 
((ClientCompactionIntervalSpec) query.getIoConfig().getInputSpec()).getSha256OfSortedSegmentIds(), ((CompactionIntervalSpec) task.getIoConfig().getInputSpec()).getSha256OfSortedSegmentIds() ); Assert.assertEquals( @@ -301,7 +302,7 @@ private ClientCompactionTaskQuery createCompactionTaskQuery(String id, Compactio id, "datasource", new ClientCompactionIOConfig( - new ClientCompactionIntervalSpec(Intervals.of("2019/2020"), null, "testSha256OfSortedSegmentIds"), true + new ClientCompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true ), new ClientCompactionTaskQueryTuningConfig( 100, diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java index 348d9d22da00..4585995e572e 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java @@ -35,12 +35,12 @@ public class ClientCompactionIOConfig { private static final String TYPE = "compact"; - private final ClientCompactionIntervalSpec inputSpec; + private final ClientCompactionInputSpec inputSpec; private final boolean dropExisting; @JsonCreator public ClientCompactionIOConfig( - @JsonProperty("inputSpec") ClientCompactionIntervalSpec inputSpec, + @JsonProperty("inputSpec") ClientCompactionInputSpec inputSpec, @JsonProperty("dropExisting") @Nullable Boolean dropExisting ) { @@ -55,7 +55,7 @@ public String getType() } @JsonProperty - public ClientCompactionIntervalSpec getInputSpec() + public ClientCompactionInputSpec getInputSpec() { return inputSpec; } diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionInputSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionInputSpec.java new file mode 100644 index 000000000000..9f13763612e4 --- /dev/null +++ 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionInputSpec.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client.indexing; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.joda.time.Interval; + +/** + * Client side equivalent of {@code CompactionInputSpec}. Required since the + * {@code CompactionInputSpec} resides in {@code indexing-service} module. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = ClientCompactionIntervalSpec.TYPE, value = ClientCompactionIntervalSpec.class), + @JsonSubTypes.Type(name = ClientMinorCompactionInputSpec.TYPE, value = ClientMinorCompactionInputSpec.class) +}) +public interface ClientCompactionInputSpec +{ + /** + * @return non-null Interval that this input spec operates on. 
+ */ + Interval getInterval(); +} diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java index 366ad089069a..f14dd55b38f3 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpec.java @@ -20,37 +20,29 @@ package org.apache.druid.client.indexing; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.java.util.common.IAE; -import org.apache.druid.query.SegmentDescriptor; import org.joda.time.Interval; import javax.annotation.Nullable; -import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; /** * InputSpec for {@link ClientCompactionIOConfig}. *
<p>
* Should be synchronized with org.apache.druid.indexing.common.task.CompactionIntervalSpec and org.apache.druid.indexing.common.task.UncompactedInputSpec. */ -public class ClientCompactionIntervalSpec +public class ClientCompactionIntervalSpec implements ClientCompactionInputSpec { - private static final String TYPE_ALL_SEGMENTS = "interval"; - private static final String TYPE_UNCOMPACTED_SEGMENTS_ONLY = "uncompacted"; + public static final String TYPE = "interval"; private final Interval interval; @Nullable - private final List uncompactedSegments; - @Nullable private final String sha256OfSortedSegmentIds; @JsonCreator public ClientCompactionIntervalSpec( @JsonProperty("interval") Interval interval, - @JsonProperty("uncompactedSegments") @Nullable List uncompactedSegments, @JsonProperty("sha256OfSortedSegmentIds") @Nullable String sha256OfSortedSegmentIds ) { @@ -58,45 +50,16 @@ public ClientCompactionIntervalSpec( throw new IAE("Interval[%s] is empty, must specify a nonempty interval", interval); } this.interval = interval; - if (uncompactedSegments == null) { - // perform a full compaction - } else if (uncompactedSegments.isEmpty()) { - throw new IAE("Can not supply empty segments as input, please use either null or non-empty segments."); - } else if (interval != null) { - List segmentsNotInInterval = - uncompactedSegments.stream().filter(s -> !interval.contains(s.getInterval())).collect(Collectors.toList()); - if (!segmentsNotInInterval.isEmpty()) { - throw new IAE( - "Can not supply segments outside interval[%s], got segments[%s].", - interval, - segmentsNotInInterval - ); - } - } - this.uncompactedSegments = uncompactedSegments; this.sha256OfSortedSegmentIds = sha256OfSortedSegmentIds; } - @JsonProperty - public String getType() - { - return (uncompactedSegments == null) ? 
TYPE_ALL_SEGMENTS : TYPE_UNCOMPACTED_SEGMENTS_ONLY; - } - + @Override @JsonProperty public Interval getInterval() { return interval; } - @Nullable - @JsonProperty - @JsonInclude(JsonInclude.Include.NON_NULL) - public List getUncompactedSegments() - { - return uncompactedSegments; - } - @Nullable @JsonProperty public String getSha256OfSortedSegmentIds() @@ -115,14 +78,13 @@ public boolean equals(Object o) } ClientCompactionIntervalSpec that = (ClientCompactionIntervalSpec) o; return Objects.equals(interval, that.interval) && - Objects.equals(uncompactedSegments, that.uncompactedSegments) && Objects.equals(sha256OfSortedSegmentIds, that.sha256OfSortedSegmentIds); } @Override public int hashCode() { - return Objects.hash(interval, uncompactedSegments, sha256OfSortedSegmentIds); + return Objects.hash(interval, sha256OfSortedSegmentIds); } @Override @@ -130,7 +92,6 @@ public String toString() { return "ClientCompactionIntervalSpec{" + "interval=" + interval + - ", uncompactedSegments=" + uncompactedSegments + ", sha256OfSortedSegmentIds='" + sha256OfSortedSegmentIds + '\'' + '}'; } diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java new file mode 100644 index 000000000000..751ec8d462a1 --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientMinorCompactionInputSpec.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client.indexing; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.SegmentDescriptor; +import org.joda.time.Interval; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Client-side equivalent of {@code MinorCompactionInputSpec}. + */ +public class ClientMinorCompactionInputSpec extends ClientCompactionIntervalSpec +{ + public static final String TYPE = "uncompacted"; + + private final List<SegmentDescriptor> uncompactedSegments; + + @JsonCreator + public ClientMinorCompactionInputSpec( + @JsonProperty("interval") Interval interval, + @JsonProperty("uncompactedSegments") List<SegmentDescriptor> uncompactedSegments + ) + { + super(interval, null); + if (uncompactedSegments == null || uncompactedSegments.isEmpty()) { + throw InvalidInput.exception("'uncompactedSegments' must be non-empty."); + } else if (interval != null) { + List<SegmentDescriptor> segmentsNotInInterval = + uncompactedSegments.stream().filter(s -> !interval.contains(s.getInterval())).collect(Collectors.toList()); + if (!segmentsNotInInterval.isEmpty()) { + throw new IAE( + "Can not supply segments outside interval[%s], got segments[%s].", + interval, + segmentsNotInInterval + ); + } + } + this.uncompactedSegments = uncompactedSegments; + } + + @JsonProperty + public List<SegmentDescriptor> getUncompactedSegments() + { + return uncompactedSegments; + } + + @Override + public boolean equals(Object 
object) + { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + if (!super.equals(object)) { + return false; + } + ClientMinorCompactionInputSpec that = (ClientMinorCompactionInputSpec) object; + return Objects.equals(uncompactedSegments, that.uncompactedSegments); + } + + @Override + public int hashCode() + { + return Objects.hash(super.hashCode(), uncompactedSegments); + } + + @Override + public String toString() + { + return "ClientMinorCompactionInputSpec{" + + "interval=" + getInterval() + + ",uncompactedSegments=" + uncompactedSegments + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java index 2a9107132623..e47d8197f8bb 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/BaseCandidateSearchPolicy.java @@ -73,7 +73,7 @@ public Eligibility checkEligibilityForCompaction( CompactionTaskStatus latestTaskStatus ) { - return Eligibility.OK; + return Eligibility.FULL; } /** diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java index 44889fb7e10c..f08997a2c6a2 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionCandidateSearchPolicy.java @@ -21,12 +21,8 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.coordinator.duty.CompactSegments; -import javax.annotation.Nullable; -import java.util.Objects; - /** * Policy used by {@link 
CompactSegments} duty to pick segments for compaction. */ @@ -53,89 +49,11 @@ public interface CompactionCandidateSearchPolicy * in the current iteration. A policy may implement this method to skip * compacting intervals or segments that do not fulfil some required criteria. * - * @return {@link Eligibility#OK} only if eligible. + * @return {@link Eligibility#FULL} only if eligible. */ Eligibility checkEligibilityForCompaction( CompactionCandidate candidate, CompactionTaskStatus latestTaskStatus ); - /** - * Describes the eligibility of an interval for compaction. - */ - class Eligibility - { - public static final Eligibility OK = new Eligibility(true, null, CompactionMode.ALL_SEGMENTS); - - private final boolean eligible; - private final String reason; - @Nullable - private final CompactionMode mode; - - private Eligibility(boolean eligible, String reason, @Nullable CompactionMode mode) - { - this.eligible = eligible; - this.reason = reason; - this.mode = mode; - } - - public boolean isEligible() - { - return eligible; - } - - public String getReason() - { - return reason; - } - - /** - * The mode of compaction (full or minor). This is non-null only when the - * candidate is considered to be eligible for compaction by the policy. - */ - @Nullable - public CompactionMode getMode() - { - return mode; - } - - public static Eligibility fail(String messageFormat, Object... args) - { - return new Eligibility(false, StringUtils.format(messageFormat, args), null); - } - - public static Eligibility incremental(String messageFormat, Object... 
args) - { - return new Eligibility(true, StringUtils.format(messageFormat, args), CompactionMode.UNCOMPACTED_SEGMENTS_ONLY); - } - - @Override - public boolean equals(Object object) - { - if (this == object) { - return true; - } - if (object == null || getClass() != object.getClass()) { - return false; - } - Eligibility that = (Eligibility) object; - return eligible == that.eligible && Objects.equals(reason, that.reason) && Objects.equals(mode, that.mode); - } - - @Override - public int hashCode() - { - return Objects.hash(eligible, reason, mode); - } - - @Override - public String toString() - { - return "Eligibility{" + - "eligible=" + eligible + - ", reason='" + reason + - ", mode='" + mode + '\'' + - '}'; - } - } } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java index cdd52a4f917c..a5933a9450bd 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java @@ -98,7 +98,7 @@ public CompactionStatus computeCompactionStatus( } // Skip intervals that have been filtered out by the policy - final CompactionCandidateSearchPolicy.Eligibility eligibility + final Eligibility eligibility = searchPolicy.checkEligibilityForCompaction(candidate, lastTaskStatus); if (eligibility.isEligible()) { return CompactionStatus.pending("Not compacted yet"); diff --git a/server/src/main/java/org/apache/druid/server/compaction/Eligibility.java b/server/src/main/java/org/apache/druid/server/compaction/Eligibility.java new file mode 100644 index 000000000000..90743308694d --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/compaction/Eligibility.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.compaction; + +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.StringUtils; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** + * Describes the eligibility of an interval for compaction. + */ +public class Eligibility +{ + /** + * Denotes that the candidate interval is eligible for a + * {@link CompactionMode#ALL_SEGMENTS full} compaction. + */ + public static final Eligibility FULL = new Eligibility(true, null, CompactionMode.ALL_SEGMENTS); + + private final boolean eligible; + private final String reason; + @Nullable + private final CompactionMode mode; + + private Eligibility(boolean eligible, String reason, @Nullable CompactionMode mode) + { + this.eligible = eligible; + this.reason = reason; + this.mode = mode; + } + + public boolean isEligible() + { + return eligible; + } + + public String getReason() + { + return reason; + } + + /** + * The mode of compaction (full or minor). This is non-null only when the + * candidate is considered to be eligible for compaction by the policy. + * + * @throws DruidException if {@link #isEligible()} returns false. 
+ */ + public CompactionMode getMode() + { + if (!isEligible()) { + throw DruidException.defensive("Cannot get mode since interval is not eligible for compaction"); + } + return mode; + } + + /** + * Denotes that the candidate interval is not eligible for compaction. + */ + public static Eligibility fail(String messageFormat, Object... args) + { + return new Eligibility(false, StringUtils.format(messageFormat, args), null); + } + + /** + * @return {@code Eligibility} denoting that the candidate interval is + * eligible for a {@link CompactionMode#UNCOMPACTED_SEGMENTS_ONLY minor} + * compaction. + */ + public static Eligibility minor(String messageFormat, Object... args) + { + return new Eligibility(true, StringUtils.format(messageFormat, args), CompactionMode.UNCOMPACTED_SEGMENTS_ONLY); + } + + @Override + public boolean equals(Object object) + { + if (this == object) { + return true; + } + if (object == null || getClass() != object.getClass()) { + return false; + } + Eligibility that = (Eligibility) object; + return eligible == that.eligible && Objects.equals(reason, that.reason) && Objects.equals(mode, that.mode); + } + + @Override + public int hashCode() + { + return Objects.hash(eligible, reason, mode); + } + + @Override + public String toString() + { + return "Eligibility{" + + "eligible=" + eligible + + ", reason='" + reason + '\'' + + ", mode='" + mode + '\'' + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java index 24a2f001afe3..c4b96aceaca4 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/FixedIntervalOrderPolicy.java @@ -62,7 +62,7 @@ public Eligibility checkEligibilityForCompaction( ) { return findIndex(candidate) < Integer.MAX_VALUE - ? Eligibility.OK + ? 
Eligibility.FULL : Eligibility.fail("Datasource/Interval is not in the list of 'eligibleCandidates'"); } diff --git a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java index 90d3ce40471e..001525a2a894 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java +++ b/server/src/main/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicy.java @@ -191,7 +191,7 @@ public Eligibility checkEligibilityForCompaction( { final CompactionStatistics uncompacted = candidate.getUncompactedStats(); if (uncompacted == null) { - return Eligibility.OK; + return Eligibility.FULL; } else if (uncompacted.getNumSegments() < 1) { return Eligibility.fail("No uncompacted segments in interval"); } else if (uncompacted.getNumSegments() < minUncompactedCount) { @@ -218,13 +218,13 @@ public Eligibility checkEligibilityForCompaction( (uncompacted.getTotalBytes() + candidate.getCompactedStats().getTotalBytes()) * 100; if (uncompactedBytesRatio < minUncompactedBytesPercentForFullCompaction) { - return Eligibility.incremental( + return Eligibility.minor( "Uncompacted bytes ratio[%.2f] is below threshold[%d]", uncompactedBytesRatio, minUncompactedBytesPercentForFullCompaction ); } else { - return Eligibility.OK; + return Eligibility.FULL; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 108e74b1921b..e60c512e6a5b 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -25,12 +25,14 @@ import com.google.common.base.Preconditions; import org.apache.druid.client.DataSourcesSnapshot; import 
org.apache.druid.client.indexing.ClientCompactionIOConfig; +import org.apache.druid.client.indexing.ClientCompactionInputSpec; import org.apache.druid.client.indexing.ClientCompactionIntervalSpec; import org.apache.druid.client.indexing.ClientCompactionRunnerInfo; import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec; import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec; import org.apache.druid.client.indexing.ClientCompactionTaskQuery; import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig; +import org.apache.druid.client.indexing.ClientMinorCompactionInputSpec; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.impl.AggregateProjectionSpec; @@ -51,6 +53,7 @@ import org.apache.druid.server.compaction.CompactionSnapshotBuilder; import org.apache.druid.server.compaction.CompactionStatus; import org.apache.druid.server.compaction.CompactionStatusTracker; +import org.apache.druid.server.compaction.Eligibility; import org.apache.druid.server.compaction.PriorityBasedCompactionSegmentIterator; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -86,6 +89,8 @@ public class CompactSegments implements CoordinatorCustomDuty public static final String INDEXING_STATE_FINGERPRINT_KEY = "indexingStateFingerprint"; private static final String COMPACTION_REASON_KEY = "compactionReason"; + private static final String COMPACTION_MODE_KEY = "compactionMode"; + private static final String COMPACTION_POLICY_RESULT = "compactionPolicyResult"; private static final Logger LOG = new Logger(CompactSegments.class); @@ -265,7 +270,7 @@ private int submitCompactionTasks( final ClientCompactionTaskQuery taskPayload = createCompactionTask( entry, - CompactionMode.ALL_SEGMENTS, + Eligibility.FULL, config, defaultEngine, null, @@ -295,7 +300,7 @@ private int 
submitCompactionTasks( */ public static ClientCompactionTaskQuery createCompactionTask( CompactionCandidate candidate, - CompactionMode compactionMode, + Eligibility eligibility, DataSourceCompactionConfig config, CompactionEngine defaultEngine, String indexingStateFingerprint, @@ -385,7 +390,7 @@ public static ClientCompactionTaskQuery createCompactionTask( return compactSegments( candidate, - compactionMode, + eligibility, config.getTaskPriority(), ClientCompactionTaskQueryTuningConfig.from( config.getTuningConfig(), @@ -445,7 +450,7 @@ public Map getAutoCompactionSnapshot() private static ClientCompactionTaskQuery compactSegments( CompactionCandidate entry, - CompactionMode compactionMode, + Eligibility eligibility, int compactionTaskPriority, ClientCompactionTaskQueryTuningConfig tuningConfig, ClientCompactionTaskGranularitySpec granularitySpec, @@ -469,18 +474,27 @@ private static ClientCompactionTaskQuery compactSegments( context.put("priority", compactionTaskPriority); - final String taskId = IdUtils.newTaskId(TASK_ID_PREFIX, ClientCompactionTaskQuery.TYPE, dataSource, null); - final ClientCompactionIntervalSpec inputSpec; + final CompactionMode compactionMode = eligibility.getMode(); + context.put(COMPACTION_MODE_KEY, compactionMode); + if (eligibility.getReason() != null) { + context.put(COMPACTION_POLICY_RESULT, eligibility.getReason()); + } + + String taskIdPrefix = compactionMode == CompactionMode.UNCOMPACTED_SEGMENTS_ONLY + ? 
TASK_ID_PREFIX + "-minor" + : TASK_ID_PREFIX; + final String taskId = IdUtils.newTaskId(taskIdPrefix, ClientCompactionTaskQuery.TYPE, dataSource, null); + final ClientCompactionInputSpec inputSpec; switch (compactionMode) { case ALL_SEGMENTS: - inputSpec = new ClientCompactionIntervalSpec(entry.getCompactionInterval(), null, null); + inputSpec = new ClientCompactionIntervalSpec(entry.getCompactionInterval(), null); break; case UNCOMPACTED_SEGMENTS_ONLY: List uncompacted = entry.getUncompactedSegments() .stream() .map(DataSegment::toDescriptor) .toList(); - inputSpec = new ClientCompactionIntervalSpec(entry.getCompactionInterval(), uncompacted, null); + inputSpec = new ClientMinorCompactionInputSpec(entry.getCompactionInterval(), uncompacted); break; default: throw DruidException.defensive("unexpected compaction mode[%s]", compactionMode); diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java index 0ab3e0593612..7e116a271eb0 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionIntervalSpecTest.java @@ -21,9 +21,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import org.apache.druid.error.DruidException; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.SegmentDescriptor; @@ -121,19 +121,30 @@ public void testFromSegmentWithFinerSegmentGranularityAndUmbrellaIntervalNotAlig } @Test - public void testClientCompactionIntervalSpec_throwsException_whenEmptySegmentsList() + public void 
testClientMinorCompactionInputSpec_throwsException_whenEmptySegmentsList() { Interval interval = Intervals.of("2015-04-11/2015-04-12"); - List emptySegments = List.of(); - Assert.assertThrows( - IAE.class, - () -> new ClientCompactionIntervalSpec(interval, emptySegments, null) + DruidException.class, + () -> new ClientMinorCompactionInputSpec(interval, null) ); } @Test public void testClientCompactionIntervalSpec_serde() throws Exception + { + ObjectMapper mapper = new DefaultObjectMapper(); + Interval interval = Intervals.of("2015-04-11/2015-04-12"); + + // Test without uncompactedSegments (full compaction) + ClientCompactionIntervalSpec withoutSegments = new ClientCompactionIntervalSpec(interval, null); + String json2 = mapper.writeValueAsString(withoutSegments); + ClientCompactionIntervalSpec deserialized2 = mapper.readValue(json2, ClientCompactionIntervalSpec.class); + Assert.assertEquals(withoutSegments, deserialized2); + } + + @Test + public void testClientMinorCompactionInputSpec_serde() throws Exception { ObjectMapper mapper = new DefaultObjectMapper(); Interval interval = Intervals.of("2015-04-11/2015-04-12"); @@ -142,17 +153,11 @@ public void testClientCompactionIntervalSpec_serde() throws Exception ); // Test with uncompactedSegments (minor compaction) - ClientCompactionIntervalSpec withSegments = new ClientCompactionIntervalSpec(interval, segments, "sha256hash"); + ClientCompactionInputSpec withSegments = new ClientMinorCompactionInputSpec(interval, segments); String json1 = mapper.writeValueAsString(withSegments); - ClientCompactionIntervalSpec deserialized1 = mapper.readValue(json1, ClientCompactionIntervalSpec.class); + ClientCompactionInputSpec deserialized1 = mapper.readValue(json1, ClientCompactionIntervalSpec.class); + Assert.assertTrue(deserialized1 instanceof ClientMinorCompactionInputSpec); Assert.assertEquals(withSegments, deserialized1); - Assert.assertEquals(segments, deserialized1.getUncompactedSegments()); - - // Test without 
uncompactedSegments (full compaction) - ClientCompactionIntervalSpec withoutSegments = new ClientCompactionIntervalSpec(interval, null, null); - String json2 = mapper.writeValueAsString(withoutSegments); - ClientCompactionIntervalSpec deserialized2 = mapper.readValue(json2, ClientCompactionIntervalSpec.class); - Assert.assertEquals(withoutSegments, deserialized2); - Assert.assertNull(deserialized2.getUncompactedSegments()); + Assert.assertEquals(segments, ((ClientMinorCompactionInputSpec) deserialized1).getUncompactedSegments()); } } diff --git a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java index f242cec16805..53e027a93d9a 100644 --- a/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java +++ b/server/src/test/java/org/apache/druid/server/compaction/MostFragmentedIntervalFirstPolicyTest.java @@ -60,13 +60,13 @@ public void test_checkEligibilityForCompaction_fails_ifUncompactedCountLessThanC ); Assertions.assertEquals( - CompactionCandidateSearchPolicy.Eligibility.fail( + Eligibility.fail( "Uncompacted segments[1] in interval must be at least [10,000]" ), policy.checkEligibilityForCompaction(createCandidate(1, 100L), null) ); Assertions.assertEquals( - CompactionCandidateSearchPolicy.Eligibility.OK, + Eligibility.FULL, policy.checkEligibilityForCompaction(createCandidate(10_001, 100L), null) ); } @@ -84,13 +84,13 @@ public void test_checkEligibilityForCompaction_fails_ifUncompactedBytesLessThanC ); Assertions.assertEquals( - CompactionCandidateSearchPolicy.Eligibility.fail( + Eligibility.fail( "Uncompacted bytes[100] in interval must be at least [10,000]" ), policy.checkEligibilityForCompaction(createCandidate(1, 100L), null) ); Assertions.assertEquals( - CompactionCandidateSearchPolicy.Eligibility.OK, + Eligibility.FULL, 
policy.checkEligibilityForCompaction(createCandidate(100, 10_000L), null) ); } @@ -108,13 +108,13 @@ public void test_checkEligibilityForCompaction_fails_ifAvgSegmentSizeGreaterThan ); Assertions.assertEquals( - CompactionCandidateSearchPolicy.Eligibility.fail( + Eligibility.fail( "Average size[10,000] of uncompacted segments in interval must be at most [100]" ), policy.checkEligibilityForCompaction(createCandidate(1, 10_000L), null) ); Assertions.assertEquals( - CompactionCandidateSearchPolicy.Eligibility.OK, + Eligibility.FULL, policy.checkEligibilityForCompaction(createCandidate(1, 100L), null) ); } @@ -256,7 +256,7 @@ public void test_compactionMode_returnsMinorCompactionMode_whenPercentageBelowTh final CompactionStatus status = CompactionStatus.pending(compacted, uncompacted, List.of(SEGMENT), ""); final CompactionCandidate candidate = CompactionCandidate.from(List.of(SEGMENT), null, status); - final CompactionCandidateSearchPolicy.Eligibility eligibility = + final Eligibility eligibility = policy.checkEligibilityForCompaction(candidate, null); Assertions.assertEquals(CompactionMode.UNCOMPACTED_SEGMENTS_ONLY, eligibility.getMode()); @@ -283,7 +283,7 @@ public void test_compactionMode_returnsFullCompaction_whenPercentageAboveThresho "" ); final CompactionCandidate candidate = CompactionCandidate.from(List.of(SEGMENT), null, status); - final CompactionCandidateSearchPolicy.Eligibility eligibility = + final Eligibility eligibility = policy.checkEligibilityForCompaction(candidate, null); Assertions.assertEquals(CompactionMode.ALL_SEGMENTS, eligibility.getMode()); @@ -311,7 +311,7 @@ public void test_compactionMode_returnsFullCompaction_whenThresholdIsDefault() "" ); final CompactionCandidate candidate = CompactionCandidate.from(List.of(SEGMENT), null, status); - final CompactionCandidateSearchPolicy.Eligibility eligibility = + final Eligibility eligibility = policy.checkEligibilityForCompaction(candidate, null); Assertions.assertEquals(CompactionMode.ALL_SEGMENTS, 
eligibility.getMode()); @@ -336,7 +336,7 @@ private CompactionCandidate createCandidate(int numSegments, long avgSizeBytes) private void verifyCandidateIsEligible(CompactionCandidate candidate, MostFragmentedIntervalFirstPolicy policy) { Assertions.assertEquals( - CompactionCandidateSearchPolicy.Eligibility.OK, + Eligibility.FULL, policy.checkEligibilityForCompaction(candidate, null) ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 1e8a8f6ea0ac..b7f01cf57d2e 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -1106,7 +1106,6 @@ public void testCompactWithGranularitySpecConflictWithActiveCompactionTask() new ClientCompactionIOConfig( new ClientCompactionIntervalSpec( Intervals.of("2000/2099"), - null, "testSha256OfSortedSegmentIds" ), null