From 3b1bdf12cd2a40759e7a2959c5dcb34dc1a05200 Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Wed, 11 Sep 2024 13:41:51 +0530 Subject: [PATCH 1/8] Allow MSQ engine only for compaction supervisors and add default engine as overlord runtime property. --- .../compact/CompactionSupervisor.java | 16 ++++++++ .../compact/CompactionSupervisorSpec.java | 13 ++++--- .../compact/OverlordCompactionScheduler.java | 7 ++-- .../compact/CompactionSupervisorSpecTest.java | 18 +++++++-- .../OverlordCompactionSchedulerTest.java | 12 +++--- .../http/OverlordCompactionResourceTest.java | 4 +- .../duty/ITAutoCompactionTest.java | 2 +- .../indexing/ClientCompactionRunnerInfo.java | 3 +- .../compaction/CompactionRunSimulator.java | 7 +++- .../coordinator/ClusterCompactionConfig.java | 17 +------- .../CompactionSupervisorConfig.java | 19 +++++++-- .../coordinator/DruidCompactionConfig.java | 18 +-------- .../server/coordinator/DruidCoordinator.java | 4 +- .../coordinator/duty/CompactSegments.java | 5 ++- .../CoordinatorCompactionConfigsResource.java | 39 +++---------------- .../ClientCompactionRunnerInfoTest.java | 2 +- .../CompactionRunSimulatorTest.java | 4 +- ...aSourceCompactionConfigAuditEntryTest.java | 13 +++---- ...DataSourceCompactionConfigHistoryTest.java | 3 +- .../DruidCompactionConfigTest.java | 2 - .../coordinator/duty/CompactSegmentsTest.java | 1 - ...rdinatorCompactionConfigsResourceTest.java | 24 +++++++++--- 22 files changed, 118 insertions(+), 115 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java index d4a3c46ba0b1..65dd7a11fe2f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.compact; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorReport; @@ -55,6 +56,13 @@ public void start() if (supervisorSpec.isSuspended()) { log.info("Suspending compaction for dataSource[%s].", dataSource); scheduler.stopCompaction(dataSource); + } else if (!supervisorSpec.getValidationResult().isValid()) { + log.error( + "Failed to start compaction supervisor for datasource[%s] due to invalid compaction supervisor spec. " + + "Reason[%s].", + dataSource, + supervisorSpec.getValidationResult().getReason() + ); } else { log.info("Starting compaction for dataSource[%s].", dataSource); scheduler.startCompaction(dataSource, supervisorSpec.getSpec()); @@ -76,6 +84,11 @@ public SupervisorReport getStatus() snapshot = AutoCompactionSnapshot.builder(dataSource) .withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED) .build(); + } else if (!supervisorSpec.getValidationResult().isValid()) { + throw InvalidInput.exception( + "Compaction supervisor spec is invalid. Reason[%s].", + supervisorSpec.getValidationResult().getReason() + ); } else { snapshot = scheduler.getCompactionSnapshot(dataSource); } @@ -90,6 +103,8 @@ public SupervisorStateManager.State getState() return State.SCHEDULER_STOPPED; } else if (supervisorSpec.isSuspended()) { return State.SUSPENDED; + } else if (!supervisorSpec.getValidationResult().isValid()) { + return State.INVALID_SPEC; } else { return State.RUNNING; } @@ -132,6 +147,7 @@ public enum State implements SupervisorStateManager.State SCHEDULER_STOPPED(true), RUNNING(true), SUSPENDED(true), + INVALID_SPEC(false), UNHEALTHY(false); private final boolean healthy; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java index 6911f35f96e3..66e54e971cdb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisorSpec.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.common.config.Configs; -import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -40,6 +39,7 @@ public class CompactionSupervisorSpec implements SupervisorSpec private final boolean suspended; private final DataSourceCompactionConfig spec; private final CompactionScheduler scheduler; + private final CompactionConfigValidationResult validationResult; @JsonCreator public CompactionSupervisorSpec( @@ -48,14 +48,10 @@ public CompactionSupervisorSpec( @JacksonInject CompactionScheduler scheduler ) { - final CompactionConfigValidationResult validationResult = scheduler.validateCompactionConfig(spec); - if (!validationResult.isValid()) { - throw InvalidInput.exception("Compaction supervisor 'spec' is invalid. Reason[%s].", validationResult.getReason()); - } - this.spec = spec; this.suspended = Configs.valueOrDefault(suspended, false); this.scheduler = scheduler; + this.validationResult = scheduler.validateCompactionConfig(spec); } @JsonProperty @@ -77,6 +73,11 @@ public String getId() return ID_PREFIX + spec.getDataSource(); } + public CompactionConfigValidationResult getValidationResult() + { + return validationResult; + } + @Override public CompactionSupervisor createSupervisor() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java index e7b6440deb65..0c2c15646f76 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java @@ -202,7 +202,7 @@ public CompactionConfigValidationResult validateCompactionConfig(DataSourceCompa } else { return ClientCompactionRunnerInfo.validateCompactionConfig( compactionConfig, - compactionConfigSupplier.get().getEngine() + supervisorConfig.getDefaultEngine() ); } } @@ -272,7 +272,7 @@ private synchronized void scheduledRun() private synchronized void runCompactionDuty() { final CoordinatorRunStats stats = new CoordinatorRunStats(); - duty.run(getLatestConfig(), getCurrentDatasourceTimelines(), stats); + duty.run(getLatestConfig(), getCurrentDatasourceTimelines(), supervisorConfig.getDefaultEngine(), stats); // Emit stats only if emission period has elapsed if (!sinceStatsEmitted.isRunning() || sinceStatsEmitted.hasElapsed(METRIC_EMISSION_PERIOD)) { @@ -309,7 +309,8 @@ public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionCon if (isRunning()) { return new CompactionRunSimulator(statusTracker, overlordClient).simulateRunWithConfig( getLatestConfig().withClusterConfig(updateRequest), - getCurrentDatasourceTimelines() + getCurrentDatasourceTimelines(), + supervisorConfig.getDefaultEngine() ); } else { return new CompactionSimulateResult(Collections.emptyMap()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java index 2e6a1cf8cc37..f80599c583fc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java @@ -82,20 +82,32 @@ public void testSerdeOfSuspendedSpec() } @Test - public void testInvalidSpecThrowsException() + public void testSupervisorWithInvalidSpecThrowsExceptionForStatus() { Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any())) .thenReturn(CompactionConfigValidationResult.failure("bad spec")); final DruidException exception = Assert.assertThrows( DruidException.class, - () -> new CompactionSupervisorSpec(null, false, scheduler) + () -> new CompactionSupervisorSpec( + new DataSourceCompactionConfig.Builder().forDataSource("datasource").build(), + false, + scheduler + ).createSupervisor().getStatus() ); Assert.assertEquals( - "Compaction supervisor 'spec' is invalid. Reason[bad spec].", + "Compaction supervisor spec is invalid. Reason[bad spec].", exception.getMessage() ); } + @Test + public void testInvalidSpecReturnsInvalid() + { + Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any())) + .thenReturn(CompactionConfigValidationResult.failure("bad spec")); + Assert.assertFalse(new CompactionSupervisorSpec(null, false, scheduler).getValidationResult().isValid()); + } + @Test public void testGetIdAndDataSources() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java index 3133e2b9466a..f48c1d87a2b2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java @@ -120,7 +120,7 @@ public void setUp() serviceEmitter = new StubServiceEmitter(); segmentsMetadataManager = new TestSegmentsMetadataManager(); - supervisorConfig = new CompactionSupervisorConfig(true); + supervisorConfig = new CompactionSupervisorConfig(true, null); compactionConfig = DruidCompactionConfig.empty(); coordinatorOverlordServiceConfig = new CoordinatorOverlordServiceConfig(false, null); @@ -149,7 +149,7 @@ private void initScheduler() @Test public void testStartStopWhenSchedulerIsEnabled() { - supervisorConfig = new CompactionSupervisorConfig(true); + supervisorConfig = new CompactionSupervisorConfig(true, null); Assert.assertFalse(scheduler.isRunning()); scheduler.start(); @@ -168,7 +168,7 @@ public void testStartStopWhenSchedulerIsEnabled() @Test public void testStartStopWhenScheduledIsDisabled() { - supervisorConfig = new CompactionSupervisorConfig(false); + supervisorConfig = new CompactionSupervisorConfig(false, null); initScheduler(); Assert.assertFalse(scheduler.isRunning()); @@ -183,7 +183,7 @@ public void testStartStopWhenScheduledIsDisabled() @Test public void testSegmentsAreNotPolledWhenSchedulerIsDisabled() { - supervisorConfig = new CompactionSupervisorConfig(false); + supervisorConfig = new CompactionSupervisorConfig(false, null); initScheduler(); verifySegmentPolling(false); @@ -337,7 +337,7 @@ public void testRunSimulation() ); final CompactionSimulateResult simulateResult = scheduler.simulateRunWithConfigUpdate( - new ClusterCompactionConfig(null, null, null, null, null) + new ClusterCompactionConfig(null, null, null, null) ); Assert.assertEquals(1, simulateResult.getCompactionStates().size()); final Table pendingCompactionTable = simulateResult.getCompactionStates().get(CompactionStatus.State.PENDING); @@ -362,7 +362,7 @@ public void testRunSimulation() scheduler.stopCompaction(TestDataSource.WIKI); final CompactionSimulateResult simulateResultWhenDisabled = scheduler.simulateRunWithConfigUpdate( - new ClusterCompactionConfig(null, null, null, null, null) + new ClusterCompactionConfig(null, null, null, null) ); Assert.assertTrue(simulateResultWhenDisabled.getCompactionStates().isEmpty()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java index b93e6e7c1ac2..d0aba195c423 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordCompactionResourceTest.java @@ -44,9 +44,9 @@ public class OverlordCompactionResourceTest { private static final CompactionSupervisorConfig SUPERVISOR_ENABLED - = new CompactionSupervisorConfig(true); + = new CompactionSupervisorConfig(true, null); private static final CompactionSupervisorConfig SUPERVISOR_DISABLED - = new CompactionSupervisorConfig(false); + = new CompactionSupervisorConfig(false, null); private CompactionScheduler scheduler; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index e95d09bd5082..a1ae1526c8f5 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -118,7 +118,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest @DataProvider(name = "engine") public static Object[][] engine() { - return new Object[][]{{CompactionEngine.NATIVE}, {CompactionEngine.MSQ}}; + return new Object[][]{{CompactionEngine.NATIVE}}; } @Inject diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java index 806b35e94819..49770738610c 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java @@ -135,9 +135,8 @@ public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(Part if (!(partitionsSpec instanceof DimensionRangePartitionsSpec || partitionsSpec instanceof DynamicPartitionsSpec)) { return CompactionConfigValidationResult.failure( - "MSQ: Invalid partitioning type[%s]. Must be either 'dynamic' or 'range'", + "MSQ: Invalid partitioning type[%s]. Must be either dynamic or range", partitionsSpec.getClass().getSimpleName() - ); } if (partitionsSpec instanceof DynamicPartitionsSpec diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java index b51777c9e3b5..e3b1c66fba8e 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionRunSimulator.java @@ -27,6 +27,7 @@ import org.apache.druid.client.indexing.IndexingWorkerInfo; import org.apache.druid.client.indexing.TaskPayloadResponse; import org.apache.druid.client.indexing.TaskStatusResponse; +import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexer.report.TaskReport; @@ -75,7 +76,8 @@ public CompactionRunSimulator( */ public CompactionSimulateResult simulateRunWithConfig( DruidCompactionConfig compactionConfig, - Map datasourceTimelines + Map datasourceTimelines, + CompactionEngine defaultEngine ) { final Table compactedIntervals @@ -138,13 +140,14 @@ public void onTaskSubmitted(ClientCompactionTaskQuery taskPayload, CompactionCan // Unlimited task slots to ensure that simulator does not skip any interval final DruidCompactionConfig configWithUnlimitedTaskSlots = compactionConfig.withClusterConfig( - new ClusterCompactionConfig(1.0, Integer.MAX_VALUE, null, null, null) + new ClusterCompactionConfig(1.0, Integer.MAX_VALUE, null, null) ); final CoordinatorRunStats stats = new CoordinatorRunStats(); new CompactSegments(simulationStatusTracker, readOnlyOverlordClient).run( configWithUnlimitedTaskSlots, datasourceTimelines, + defaultEngine, stats ); diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java index e2b98a32a92c..2e0c070b1ed2 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy; import javax.annotation.Nullable; @@ -37,7 +36,6 @@ public class ClusterCompactionConfig private final Double compactionTaskSlotRatio; private final Integer maxCompactionTaskSlots; private final Boolean useAutoScaleSlots; - private final CompactionEngine engine; private final CompactionCandidateSearchPolicy compactionPolicy; @JsonCreator @@ -45,7 +43,6 @@ public ClusterCompactionConfig( @JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio, @JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots, @JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots, - @JsonProperty("engine") @Nullable CompactionEngine engine, @JsonProperty("compactionPolicy") @Nullable CompactionCandidateSearchPolicy compactionPolicy ) { @@ -53,7 +50,6 @@ public ClusterCompactionConfig( this.maxCompactionTaskSlots = maxCompactionTaskSlots; this.useAutoScaleSlots = useAutoScaleSlots; this.compactionPolicy = compactionPolicy; - this.engine = engine; } @Nullable @@ -77,13 +73,6 @@ public Boolean getUseAutoScaleSlots() return useAutoScaleSlots; } - @Nullable - @JsonProperty - public CompactionEngine getEngine() - { - return engine; - } - @Nullable @JsonProperty public CompactionCandidateSearchPolicy getCompactionPolicy() @@ -104,8 +93,7 @@ public boolean equals(Object o) return Objects.equals(compactionTaskSlotRatio, that.compactionTaskSlotRatio) && Objects.equals(maxCompactionTaskSlots, that.maxCompactionTaskSlots) && Objects.equals(useAutoScaleSlots, that.useAutoScaleSlots) - && Objects.equals(compactionPolicy, that.compactionPolicy) - && engine == that.engine; + && Objects.equals(compactionPolicy, that.compactionPolicy); } @Override @@ -115,8 +103,7 @@ public int hashCode() compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots, - compactionPolicy, - engine + compactionPolicy ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java index e738a3e7f0bb..2ed87335af72 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.common.config.Configs; +import org.apache.druid.indexer.CompactionEngine; import javax.annotation.Nullable; import java.util.Objects; @@ -33,10 +34,12 @@ */ public class CompactionSupervisorConfig { - private static final CompactionSupervisorConfig DEFAULT = new CompactionSupervisorConfig(null); + private static final CompactionSupervisorConfig DEFAULT = new CompactionSupervisorConfig(null, null); @JsonProperty private final boolean enabled; + @JsonProperty + private final CompactionEngine defaultEngine; public static CompactionSupervisorConfig defaultConfig() { @@ -45,10 +48,12 @@ public static CompactionSupervisorConfig defaultConfig() @JsonCreator public CompactionSupervisorConfig( - @JsonProperty("enabled") @Nullable Boolean enabled + @JsonProperty("enabled") @Nullable Boolean enabled, + @JsonProperty("defaultEngine") @Nullable CompactionEngine defaultEngine ) { this.enabled = Configs.valueOrDefault(enabled, false); + this.defaultEngine = Configs.valueOrDefault(defaultEngine, CompactionEngine.NATIVE); } public boolean isEnabled() @@ -56,6 +61,11 @@ public boolean isEnabled() return enabled; } + public CompactionEngine getDefaultEngine() + { + return defaultEngine; + } + @Override public boolean equals(Object o) { @@ -66,13 +76,13 @@ public boolean equals(Object o) return false; } CompactionSupervisorConfig that = (CompactionSupervisorConfig) o; - return enabled == that.enabled; + return enabled == that.enabled && defaultEngine == that.defaultEngine; } @Override public int hashCode() { - return Objects.hashCode(enabled); + return Objects.hash(enabled, defaultEngine); } @Override @@ -80,6 +90,7 @@ public String toString() { return "CompactionSchedulerConfig{" + "enabled=" + enabled + + "engine=" + defaultEngine + '}'; } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java index b35ba7d29389..96338f5b2ef6 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import org.apache.druid.common.config.Configs; -import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy; import org.apache.druid.server.compaction.NewestSegmentFirstPolicy; @@ -43,13 +42,12 @@ public class DruidCompactionConfig private static final CompactionCandidateSearchPolicy DEFAULT_COMPACTION_POLICY = new NewestSegmentFirstPolicy(null); private static final DruidCompactionConfig EMPTY_INSTANCE - = new DruidCompactionConfig(Collections.emptyList(), null, null, null, null, null); + = new DruidCompactionConfig(Collections.emptyList(), null, null, null, null); private final List compactionConfigs; private final double compactionTaskSlotRatio; private final int maxCompactionTaskSlots; private final boolean useAutoScaleSlots; - private final CompactionEngine engine; private final CompactionCandidateSearchPolicy compactionPolicy; public DruidCompactionConfig withDatasourceConfigs( @@ -61,7 +59,6 @@ public DruidCompactionConfig withDatasourceConfigs( compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots, - engine, compactionPolicy ); } @@ -75,7 +72,6 @@ public DruidCompactionConfig withClusterConfig( Configs.valueOrDefault(update.getCompactionTaskSlotRatio(), compactionTaskSlotRatio), Configs.valueOrDefault(update.getMaxCompactionTaskSlots(), maxCompactionTaskSlots), Configs.valueOrDefault(update.getUseAutoScaleSlots(), useAutoScaleSlots), - Configs.valueOrDefault(update.getEngine(), engine), Configs.valueOrDefault(update.getCompactionPolicy(), compactionPolicy) ); } @@ -98,7 +94,6 @@ public DruidCompactionConfig( @JsonProperty("compactionTaskSlotRatio") @Nullable Double compactionTaskSlotRatio, @JsonProperty("maxCompactionTaskSlots") @Nullable Integer maxCompactionTaskSlots, @JsonProperty("useAutoScaleSlots") @Nullable Boolean useAutoScaleSlots, - @JsonProperty("engine") @Nullable CompactionEngine compactionEngine, @JsonProperty("compactionPolicy") @Nullable CompactionCandidateSearchPolicy compactionPolicy ) { @@ -106,7 +101,6 @@ public DruidCompactionConfig( this.compactionTaskSlotRatio = Configs.valueOrDefault(compactionTaskSlotRatio, 0.1); this.maxCompactionTaskSlots = Configs.valueOrDefault(maxCompactionTaskSlots, Integer.MAX_VALUE); this.useAutoScaleSlots = Configs.valueOrDefault(useAutoScaleSlots, false); - this.engine = Configs.valueOrDefault(compactionEngine, CompactionEngine.NATIVE); this.compactionPolicy = Configs.valueOrDefault(compactionPolicy, DEFAULT_COMPACTION_POLICY); } @@ -134,12 +128,6 @@ public boolean isUseAutoScaleSlots() return useAutoScaleSlots; } - @JsonProperty - public CompactionEngine getEngine() - { - return engine; - } - // Null-safe getters not used for serialization public ClusterCompactionConfig clusterConfig() @@ -148,7 +136,6 @@ public ClusterCompactionConfig clusterConfig() compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots, - engine, compactionPolicy ); } @@ -189,7 +176,6 @@ public boolean equals(Object o) return Double.compare(that.compactionTaskSlotRatio, compactionTaskSlotRatio) == 0 && maxCompactionTaskSlots == that.maxCompactionTaskSlots && useAutoScaleSlots == that.useAutoScaleSlots && - engine == that.engine && Objects.equals(compactionPolicy, that.compactionPolicy) && Objects.equals(compactionConfigs, that.compactionConfigs); } @@ -202,7 +188,6 @@ public int hashCode() compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots, - engine, compactionPolicy ); } @@ -215,7 +200,6 @@ public String toString() ", compactionTaskSlotRatio=" + compactionTaskSlotRatio + ", maxCompactionTaskSlots=" + maxCompactionTaskSlots + ", useAutoScaleSlots=" + useAutoScaleSlots + - ", engine=" + engine + ", compactionPolicy=" + compactionPolicy + '}'; } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 6f907354f081..262750b527a8 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -40,6 +40,7 @@ import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; @@ -362,7 +363,8 @@ public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionCon metadataManager.configs().getCurrentCompactionConfig().withClusterConfig(updateRequest), metadataManager.segments() .getSnapshotOfDataSourcesWithAllUsedSegments() - .getUsedSegmentsTimelinesPerDataSource() + .getUsedSegmentsTimelinesPerDataSource(), + CompactionEngine.NATIVE ); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index a2f97f298afc..bf27af6358cb 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -120,9 +120,11 @@ public OverlordClient getOverlordClient() @Override public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) { + // Coordinator supports only native engine for compaction run( params.getCompactionConfig(), params.getUsedSegmentsTimelinesPerDataSource(), + CompactionEngine.NATIVE, params.getCoordinatorStats() ); return params; @@ -131,6 +133,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) public void run( DruidCompactionConfig dynamicConfig, Map dataSources, + CompactionEngine defaultEngine, CoordinatorRunStats stats ) { @@ -234,7 +237,7 @@ public void run( currentRunAutoCompactionSnapshotBuilders, availableCompactionTaskSlots, iterator, - dynamicConfig.getEngine() + defaultEngine ); stats.add(Stats.Compaction.MAX_SLOTS, compactionTaskCapacity); diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java index 8f9dfb9ca853..5215f7de4e26 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java @@ -27,22 +27,20 @@ import org.apache.druid.audit.AuditEntry; import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; -import org.apache.druid.client.indexing.ClientCompactionRunnerInfo; import org.apache.druid.common.config.ConfigManager.SetResult; import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; import org.apache.druid.error.NotFound; +import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.coordinator.ClusterCompactionConfig; -import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfigHistory; import org.apache.druid.server.coordinator.DruidCompactionConfig; import org.apache.druid.server.http.security.ConfigResourceFilter; import org.apache.druid.server.security.AuthorizationUtils; -import org.apache.druid.utils.CollectionUtils; import org.joda.time.Interval; import javax.servlet.http.HttpServletRequest; @@ -100,30 +98,7 @@ public Response updateClusterCompactionConfig( @Context HttpServletRequest req ) { - UnaryOperator operator = current -> { - final DruidCompactionConfig newConfig = current.withClusterConfig(updatePayload); - - final List datasourceConfigs = newConfig.getCompactionConfigs(); - if (CollectionUtils.isNullOrEmpty(datasourceConfigs) - || current.getEngine() == newConfig.getEngine()) { - return newConfig; - } - - // Validate all the datasource configs against the new engine - for (DataSourceCompactionConfig datasourceConfig : datasourceConfigs) { - CompactionConfigValidationResult validationResult = - ClientCompactionRunnerInfo.validateCompactionConfig(datasourceConfig, newConfig.getEngine()); - if (!validationResult.isValid()) { - throw InvalidInput.exception( - "Cannot update engine to [%s] as it does not support" - + " compaction config of DataSource[%s]. Reason[%s].", - newConfig.getEngine(), datasourceConfig.getDataSource(), validationResult.getReason() - ); - } - } - - return newConfig; - }; + UnaryOperator operator = current -> current.withClusterConfig(updatePayload); return updateConfigHelper(operator, AuthorizationUtils.buildAuditInfo(req)); } @@ -146,7 +121,6 @@ public Response setCompactionTaskLimit( compactionTaskSlotRatio, maxCompactionTaskSlots, useAutoScaleSlots, - null, null ), req @@ -161,12 +135,11 @@ public Response addOrUpdateDatasourceCompactionConfig( ) { UnaryOperator callable = current -> { - CompactionConfigValidationResult validationResult = - ClientCompactionRunnerInfo.validateCompactionConfig(newConfig, current.getEngine()); - if (validationResult.isValid()) { - return current.withDatasourceConfig(newConfig); + if (newConfig.getEngine() == CompactionEngine.MSQ) { + throw InvalidInput.exception( + "MSQ engine in compaction config only supported with supervisor-based compaction on the Overlord."); } else { - throw InvalidInput.exception("Compaction config not supported. Reason[%s].", validationResult.getReason()); + return current.withDatasourceConfig(newConfig); } }; return updateConfigHelper( diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java index 011a4640da37..59493b5e4418 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java @@ -64,7 +64,7 @@ public void testMSQEngineWithHashedPartitionsSpecIsInvalid() ); Assert.assertFalse(validationResult.isValid()); Assert.assertEquals( - "MSQ: Invalid partitioning type[HashedPartitionsSpec]. Must be either 'dynamic' or 'range'", + "MSQ: Invalid partitioning type[HashedPartitionsSpec]. Must be either dynamic or range", validationResult.getReason() ); } diff --git a/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java b/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java index 8b5c6bab6c40..aacf4216de78 100644 --- a/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java +++ b/server/src/test/java/org/apache/druid/server/compaction/CompactionRunSimulatorTest.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.druid.client.indexing.NoopOverlordClient; import org.apache.druid.client.indexing.TaskPayloadResponse; +import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.jackson.DefaultObjectMapper; @@ -76,7 +77,8 @@ public void testSimulateClusterCompactionConfigUpdate() DataSourceCompactionConfig.builder().forDataSource("wiki").build() ), segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments() - .getUsedSegmentsTimelinesPerDataSource() + .getUsedSegmentsTimelinesPerDataSource(), + CompactionEngine.NATIVE ); Assert.assertNotNull(simulateResult); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java index bdd028469211..f96ddcd84289 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java @@ -20,7 +20,6 @@ package org.apache.druid.server.coordinator; import org.apache.druid.audit.AuditInfo; -import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.segment.TestDataSource; import org.junit.Assert; @@ -34,7 +33,7 @@ public class DataSourceCompactionConfigAuditEntryTest private final AuditInfo auditInfo = new AuditInfo("author", "identity", "comment", "ip"); private final DataSourceCompactionConfigAuditEntry firstEntry = new DataSourceCompactionConfigAuditEntry( - new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.MSQ, null), + new ClusterCompactionConfig(0.1, 9, true, null), DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(), auditInfo, DateTimes.nowUtc() @@ -44,7 +43,7 @@ public class DataSourceCompactionConfigAuditEntryTest public void testhasSameConfigWithSameBaseConfigIsTrue() { final DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry( - new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.MSQ, null), + new ClusterCompactionConfig(0.1, 9, true, null), DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(), auditInfo, DateTimes.nowUtc() @@ -57,7 +56,7 @@ public void testhasSameConfigWithSameBaseConfigIsTrue() public void testhasSameConfigWithDifferentClusterConfigIsFalse() { DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry( - new ClusterCompactionConfig(0.1, 9, false, CompactionEngine.MSQ, null), + new ClusterCompactionConfig(0.1, 9, false, null), DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(), auditInfo, DateTimes.nowUtc() @@ -66,7 +65,7 @@ public void testhasSameConfigWithDifferentClusterConfigIsFalse() Assert.assertFalse(secondEntry.hasSameConfig(firstEntry)); secondEntry = new DataSourceCompactionConfigAuditEntry( - new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.NATIVE, null), + new ClusterCompactionConfig(0.1, 9, true, null), DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(), auditInfo, DateTimes.nowUtc() @@ -79,7 +78,7 @@ public void testhasSameConfigWithDifferentClusterConfigIsFalse() public void testhasSameConfigWithDifferentDatasourceConfigIsFalse() { DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry( - new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.NATIVE, null), + new ClusterCompactionConfig(0.1, 9, true, null), DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(), auditInfo, DateTimes.nowUtc() @@ -92,7 +91,7 @@ public void testhasSameConfigWithDifferentDatasourceConfigIsFalse() public void testhasSameConfigWithNullDatasourceConfigIsFalse() { final DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry( - new ClusterCompactionConfig(0.1, 9, true, CompactionEngine.NATIVE, null), + new ClusterCompactionConfig(0.1, 9, true, null), null, auditInfo, DateTimes.nowUtc() diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java index 4426d58b258e..00f50c5eff31 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java @@ -20,7 +20,6 @@ package org.apache.druid.server.coordinator; import org.apache.druid.audit.AuditInfo; -import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.segment.TestDataSource; import org.joda.time.DateTime; @@ -178,7 +177,7 @@ public void testAddAndModifyClusterConfigShouldAddTwice() wikiAuditHistory.add(originalConfig, auditInfo, DateTimes.nowUtc()); final DruidCompactionConfig updatedConfig = originalConfig.withClusterConfig( - new ClusterCompactionConfig(null, null, null, CompactionEngine.MSQ, null) + new ClusterCompactionConfig(null, null, null, null) ); wikiAuditHistory.add(updatedConfig, auditInfo, DateTimes.nowUtc()); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java index c8ed8d9ba530..2f481727fa10 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java @@ -64,7 +64,6 @@ public void testSerdeWithDatasourceConfigs() throws Exception null, null, null, - CompactionEngine.MSQ, null ); @@ -82,7 +81,6 @@ public void testCopyWithClusterConfig() 0.5, 10, false, - CompactionEngine.MSQ, new NewestSegmentFirstPolicy(null) ); final DruidCompactionConfig copy = config.withClusterConfig(clusterConfig); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 8f84e860deae..7a49222e098d 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -1888,7 +1888,6 @@ private CoordinatorRunStats doCompactSegments( numCompactionTaskSlots == null ? null : 1.0, // 100% when numCompactionTaskSlots is not null numCompactionTaskSlots, useAutoScaleSlots, - null, null ) ) diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java index 24641c81a156..bec35e8fa9cc 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java @@ -106,14 +106,13 @@ public void testGetDefaultClusterConfig() Assert.assertEquals(Integer.MAX_VALUE, defaultConfig.getMaxCompactionTaskSlots()); Assert.assertFalse(defaultConfig.isUseAutoScaleSlots()); Assert.assertTrue(defaultConfig.getCompactionConfigs().isEmpty()); - Assert.assertEquals(CompactionEngine.NATIVE, defaultConfig.getEngine()); } @Test public void testUpdateClusterConfig() { Response response = resource.updateClusterCompactionConfig( - new ClusterCompactionConfig(0.5, 10, true, CompactionEngine.MSQ, null), + new ClusterCompactionConfig(0.5, 10, true, null), mockHttpServletRequest ); verifyStatus(Response.Status.OK, response); @@ -127,7 +126,6 @@ public void testUpdateClusterConfig() Assert.assertEquals(0.5, updatedConfig.getCompactionTaskSlotRatio(), DELTA); Assert.assertEquals(10, updatedConfig.getMaxCompactionTaskSlots()); Assert.assertTrue(updatedConfig.isUseAutoScaleSlots()); - Assert.assertEquals(CompactionEngine.MSQ, updatedConfig.getEngine()); } @Test @@ -149,7 +147,6 @@ public void testSetCompactionTaskLimit() // Verify that the other fields are unchanged Assert.assertEquals(defaultConfig.getCompactionConfigs(), updatedConfig.getCompactionConfigs()); - Assert.assertEquals(defaultConfig.getEngine(), updatedConfig.getEngine()); } @Test @@ -177,6 +174,23 @@ public void testAddDatasourceConfig() Assert.assertEquals(newDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0)); } + @Test + public void testAddDatasourceConfigWithMSQEngineIsInvalid() + { + final DataSourceCompactionConfig newDatasourceConfig + = DataSourceCompactionConfig.builder() + .forDataSource(TestDataSource.WIKI) + .withEngine(CompactionEngine.MSQ) + .build(); + Response response = resource.addOrUpdateDatasourceCompactionConfig(newDatasourceConfig, mockHttpServletRequest); + verifyStatus(Response.Status.BAD_REQUEST, response); + Assert.assertTrue(response.getEntity() instanceof ErrorResponse); + Assert.assertEquals( + "MSQ engine in compaction config only supported with supervisor-based compaction on the Overlord.", + ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage() + ); + } + @Test public void testUpdateDatasourceConfig() { @@ -341,7 +355,7 @@ public void testUpdateEngineToMSQWithInvalidDatasourceConfigThrowsBadRequest() verifyStatus(Response.Status.OK, response); response = resource.updateClusterCompactionConfig( - new ClusterCompactionConfig(null, null, null, CompactionEngine.MSQ, null), + new ClusterCompactionConfig(null, null, null, null), mockHttpServletRequest ); verifyStatus(Response.Status.BAD_REQUEST, response); From 6a7567ef3e6577bd0220995850f85ab6e88e7053 Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Wed, 11 Sep 2024 16:53:43 +0530 Subject: [PATCH 2/8] Fix UTs --- ...aSourceCompactionConfigAuditEntryTest.java | 6 ++-- ...DataSourceCompactionConfigHistoryTest.java | 2 +- ...rdinatorCompactionConfigsResourceTest.java | 36 +++---------------- 3 files changed, 9 insertions(+), 35 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java index f96ddcd84289..4647ad8d9853 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigAuditEntryTest.java @@ -56,7 +56,7 @@ public void testhasSameConfigWithSameBaseConfigIsTrue() public void testhasSameConfigWithDifferentClusterConfigIsFalse() { DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry( - new ClusterCompactionConfig(0.1, 9, false, null), + new ClusterCompactionConfig(0.2, 9, false, null), DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(), auditInfo, DateTimes.nowUtc() @@ -65,7 +65,7 @@ public void testhasSameConfigWithDifferentClusterConfigIsFalse() Assert.assertFalse(secondEntry.hasSameConfig(firstEntry)); secondEntry = new DataSourceCompactionConfigAuditEntry( - new ClusterCompactionConfig(0.1, 9, true, null), + new ClusterCompactionConfig(0.1, 10, true, null), DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(), auditInfo, DateTimes.nowUtc() @@ -79,7 +79,7 @@ public void testhasSameConfigWithDifferentDatasourceConfigIsFalse() { DataSourceCompactionConfigAuditEntry secondEntry = new DataSourceCompactionConfigAuditEntry( new ClusterCompactionConfig(0.1, 9, true, null), - DataSourceCompactionConfig.builder().forDataSource(TestDataSource.WIKI).build(), + DataSourceCompactionConfig.builder().forDataSource(TestDataSource.KOALA).build(), auditInfo, DateTimes.nowUtc() ); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java index 00f50c5eff31..bcdee91ecd52 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigHistoryTest.java @@ -177,7 +177,7 @@ public void testAddAndModifyClusterConfigShouldAddTwice() wikiAuditHistory.add(originalConfig, auditInfo, DateTimes.nowUtc()); final DruidCompactionConfig updatedConfig = originalConfig.withClusterConfig( - new ClusterCompactionConfig(null, null, null, null) + new ClusterCompactionConfig(0.2, null, null, null) ); wikiAuditHistory.add(updatedConfig, auditInfo, DateTimes.nowUtc()); diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java index bec35e8fa9cc..4cef66d2ac55 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java @@ -223,16 +223,16 @@ public void testUpdateDatasourceConfig() .build(); response = resource.addOrUpdateDatasourceCompactionConfig(updatedDatasourceConfig, mockHttpServletRequest); - verifyStatus(Response.Status.OK, response); + verifyStatus(Response.Status.BAD_REQUEST, response); final DataSourceCompactionConfig latestDatasourceConfig = verifyAndGetPayload(resource.getDatasourceCompactionConfig(TestDataSource.WIKI), DataSourceCompactionConfig.class); - Assert.assertEquals(updatedDatasourceConfig, latestDatasourceConfig); + Assert.assertEquals(originalDatasourceConfig, latestDatasourceConfig); final DruidCompactionConfig fullCompactionConfig = verifyAndGetPayload(resource.getCompactionConfig(), DruidCompactionConfig.class); Assert.assertEquals(1, fullCompactionConfig.getCompactionConfigs().size()); - Assert.assertEquals(updatedDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0)); + Assert.assertEquals(originalDatasourceConfig, fullCompactionConfig.getCompactionConfigs().get(0)); } @Test @@ -299,7 +299,7 @@ public void testGetDatasourceConfigHistory() resource.addOrUpdateDatasourceCompactionConfig(configV2, mockHttpServletRequest); final DataSourceCompactionConfig configV3 = builder - .withEngine(CompactionEngine.MSQ) + .withEngine(CompactionEngine.NATIVE) .withSkipOffsetFromLatest(Period.hours(1)) .build(); resource.addOrUpdateDatasourceCompactionConfig(configV3, mockHttpServletRequest); @@ -337,36 +337,10 @@ public void testAddInvalidDatasourceConfigThrowsBadRequest() verifyStatus(Response.Status.BAD_REQUEST, response); Assert.assertTrue(response.getEntity() instanceof ErrorResponse); Assert.assertEquals( - "Compaction config not supported. Reason[MSQ: Context maxNumTasks[1]" - + " must be at least 2 (1 controller + 1 worker)].", - ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage() - ); - } - - @Test - public void testUpdateEngineToMSQWithInvalidDatasourceConfigThrowsBadRequest() - { - final DataSourceCompactionConfig datasourceConfig = DataSourceCompactionConfig - .builder() - .forDataSource(TestDataSource.WIKI) - .withTaskContext(Collections.singletonMap(ClientMSQContext.CTX_MAX_NUM_TASKS, 1)) - .build(); - Response response = resource.addOrUpdateDatasourceCompactionConfig(datasourceConfig, mockHttpServletRequest); - verifyStatus(Response.Status.OK, response); - - response = resource.updateClusterCompactionConfig( - new ClusterCompactionConfig(null, null, null, null), - mockHttpServletRequest - ); - verifyStatus(Response.Status.BAD_REQUEST, response); - Assert.assertTrue(response.getEntity() instanceof ErrorResponse); - Assert.assertEquals( - "Cannot update engine to [msq] as it does not support compaction config of DataSource[wiki]." - + " Reason[MSQ: Context maxNumTasks[1] must be at least 2 (1 controller + 1 worker)].", + "MSQ engine in compaction config only supported with supervisor-based compaction on the Overlord.", ((ErrorResponse) response.getEntity()).getUnderlyingException().getMessage() ); } - @SuppressWarnings("unchecked") private T verifyAndGetPayload(Response response, Class type) { From a4d684fb521fa7d8092dbf876477457ea77b1624 Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Thu, 12 Sep 2024 15:32:41 +0530 Subject: [PATCH 3/8] Fix coverage --- .../OverlordCompactionSchedulerTest.java | 15 +++++++++++++++ .../coordinator/DruidCoordinatorTest.java | 18 ++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java index f48c1d87a2b2..fc757c76c384 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java @@ -19,8 +19,10 @@ package org.apache.druid.indexing.compact; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import org.apache.druid.client.indexing.ClientMSQContext; import org.apache.druid.guice.IndexingServiceTuningConfigModule; import org.apache.druid.indexer.CompactionEngine; @@ -146,6 +148,19 @@ private void initScheduler() ); } + @Test + public void testCompactionSupervisorConfigSerde() throws JsonProcessingException + { + boolean enabled = true; + CompactionEngine defaultEngine = CompactionEngine.MSQ; + CompactionSupervisorConfig compactionSupervisorConfig = + OBJECT_MAPPER.readValue( + OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("enabled", enabled, "defaultEngine", defaultEngine)), + CompactionSupervisorConfig.class + ); + Assert.assertEquals(new CompactionSupervisorConfig(enabled, defaultEngine), compactionSupervisorConfig); + } + @Test public void testStartStopWhenSchedulerIsEnabled() { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 1ebeb991ccbb..c59de936afee 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -50,6 +50,7 @@ import org.apache.druid.metadata.SegmentsMetadataManager; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; import org.apache.druid.server.DruidNode; +import org.apache.druid.server.compaction.CompactionSimulateResult; import org.apache.druid.server.compaction.CompactionStatusTracker; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory; @@ -930,6 +931,23 @@ public void testCoordinatorRun_queryFromDeepStorage() throws Exception EasyMock.verify(metadataRuleManager); } + @Test + public void testSimulateRunWithEmptyDatasourceCompactionConfigs() + { + DruidDataSource dataSource = new DruidDataSource("dataSource", Collections.emptyMap()); + DataSourcesSnapshot dataSourcesSnapshot = + new DataSourcesSnapshot(ImmutableMap.of(dataSource.getName(), dataSource.toImmutableDruidDataSource())); + EasyMock + .expect(segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments()) + .andReturn(dataSourcesSnapshot) + .anyTimes(); + EasyMock.replay(segmentsMetadataManager); + CompactionSimulateResult result = coordinator.simulateRunWithConfigUpdate( + new ClusterCompactionConfig(0.2, null, null, null) + ); + Assert.assertEquals(Collections.emptyMap(), result.getCompactionStates()); + } + private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch( int latchCount, PathChildrenCache pathChildrenCache, From bb31487db902e79e7adb95ee69cf537822a2f7ca Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Mon, 23 Sep 2024 17:48:46 +0530 Subject: [PATCH 4/8] Address review comments --- .../compact/CompactionSupervisor.java | 16 ++++++++------ .../compact/OverlordCompactionScheduler.java | 6 ++--- .../compact/CompactionSupervisorSpecTest.java | 19 +++++++--------- .../OverlordCompactionSchedulerTest.java | 6 ++--- .../indexing/ClientCompactionRunnerInfo.java | 3 ++- .../coordinator/AutoCompactionSnapshot.java | 22 ++++++++++++++++++- .../CompactionSupervisorConfig.java | 16 +++++++------- .../ClientCompactionRunnerInfoTest.java | 2 +- .../AutoCompactionSnapshotTest.java | 5 ++++- .../CoordinatorCompactionResourceTest.java | 1 + 10 files changed, 60 insertions(+), 36 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java index 65dd7a11fe2f..a00a968b107a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionSupervisor.java @@ -19,13 +19,13 @@ package org.apache.druid.indexing.compact; -import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorReport; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; @@ -57,8 +57,8 @@ public void start() log.info("Suspending compaction for dataSource[%s].", dataSource); scheduler.stopCompaction(dataSource); } else if (!supervisorSpec.getValidationResult().isValid()) { - log.error( - "Failed to start compaction supervisor for datasource[%s] due to invalid compaction supervisor spec. " + log.warn( + "Cannot start compaction supervisor for datasource[%s] since the compaction supervisor spec is invalid. " + "Reason[%s].", dataSource, supervisorSpec.getValidationResult().getReason() @@ -85,10 +85,12 @@ public SupervisorReport getStatus() .withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED) .build(); } else if (!supervisorSpec.getValidationResult().isValid()) { - throw InvalidInput.exception( - "Compaction supervisor spec is invalid. Reason[%s].", - supervisorSpec.getValidationResult().getReason() - ); + snapshot = AutoCompactionSnapshot.builder(dataSource) + .withMessage(StringUtils.format( + "Compaction supervisor spec is invalid. Reason[%s].", + supervisorSpec.getValidationResult().getReason() + )) + .build(); } else { snapshot = scheduler.getCompactionSnapshot(dataSource); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java index 0c2c15646f76..9e2668c81097 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/OverlordCompactionScheduler.java @@ -202,7 +202,7 @@ public CompactionConfigValidationResult validateCompactionConfig(DataSourceCompa } else { return ClientCompactionRunnerInfo.validateCompactionConfig( compactionConfig, - supervisorConfig.getDefaultEngine() + supervisorConfig.getEngine() ); } } @@ -272,7 +272,7 @@ private synchronized void scheduledRun() private synchronized void runCompactionDuty() { final CoordinatorRunStats stats = new CoordinatorRunStats(); - duty.run(getLatestConfig(), getCurrentDatasourceTimelines(), supervisorConfig.getDefaultEngine(), stats); + duty.run(getLatestConfig(), getCurrentDatasourceTimelines(), supervisorConfig.getEngine(), stats); // Emit stats only if emission period has elapsed if (!sinceStatsEmitted.isRunning() || sinceStatsEmitted.hasElapsed(METRIC_EMISSION_PERIOD)) { @@ -310,7 +310,7 @@ public CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionCon return new CompactionRunSimulator(statusTracker, overlordClient).simulateRunWithConfig( getLatestConfig().withClusterConfig(updateRequest), getCurrentDatasourceTimelines(), - supervisorConfig.getDefaultEngine() + supervisorConfig.getEngine() ); } else { return new CompactionSimulateResult(Collections.emptyMap()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java index f80599c583fc..4b21ee7cca61 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CompactionSupervisorSpecTest.java @@ -82,30 +82,27 @@ public void testSerdeOfSuspendedSpec() } @Test - public void testSupervisorWithInvalidSpecThrowsExceptionForStatus() + public void testGetStatusWithInvalidSpec() { Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any())) .thenReturn(CompactionConfigValidationResult.failure("bad spec")); - final DruidException exception = Assert.assertThrows( - DruidException.class, - () -> new CompactionSupervisorSpec( + Assert.assertEquals( + "Compaction supervisor spec is invalid. Reason[bad spec].", new CompactionSupervisorSpec( new DataSourceCompactionConfig.Builder().forDataSource("datasource").build(), false, scheduler - ).createSupervisor().getStatus() - ); - Assert.assertEquals( - "Compaction supervisor spec is invalid. Reason[bad spec].", - exception.getMessage() + ).createSupervisor().getStatus().getPayload().getMessage() ); } @Test - public void testInvalidSpecReturnsInvalid() + public void testGetValidationResultForInvalidSpec() { Mockito.when(scheduler.validateCompactionConfig(ArgumentMatchers.any())) .thenReturn(CompactionConfigValidationResult.failure("bad spec")); - Assert.assertFalse(new CompactionSupervisorSpec(null, false, scheduler).getValidationResult().isValid()); + CompactionConfigValidationResult validationResult = new CompactionSupervisorSpec(null, false, scheduler).getValidationResult(); + Assert.assertFalse(validationResult.isValid()); + Assert.assertEquals("bad spec", validationResult.getReason()); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java index fc757c76c384..d0c9abc59e17 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java @@ -151,11 +151,11 @@ private void initScheduler() @Test public void testCompactionSupervisorConfigSerde() throws JsonProcessingException { - boolean enabled = true; - CompactionEngine defaultEngine = CompactionEngine.MSQ; + final boolean enabled = true; + final CompactionEngine defaultEngine = CompactionEngine.MSQ; CompactionSupervisorConfig compactionSupervisorConfig = OBJECT_MAPPER.readValue( - OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("enabled", enabled, "defaultEngine", defaultEngine)), + OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("enabled", enabled, "engine", defaultEngine)), CompactionSupervisorConfig.class ); Assert.assertEquals(new CompactionSupervisorConfig(enabled, defaultEngine), compactionSupervisorConfig); diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java index 49770738610c..806b35e94819 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java +++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java @@ -135,8 +135,9 @@ public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(Part if (!(partitionsSpec instanceof DimensionRangePartitionsSpec || partitionsSpec instanceof DynamicPartitionsSpec)) { return CompactionConfigValidationResult.failure( - "MSQ: Invalid partitioning type[%s]. Must be either dynamic or range", + "MSQ: Invalid partitioning type[%s]. Must be either 'dynamic' or 'range'", partitionsSpec.getClass().getSimpleName() + ); } if (partitionsSpec instanceof DynamicPartitionsSpec diff --git a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java index d6fa4835b48e..3adcc3460aa9 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.server.compaction.CompactionStatistics; +import javax.annotation.Nullable; import javax.validation.constraints.NotNull; import java.util.Objects; @@ -41,6 +42,8 @@ public enum AutoCompactionScheduleStatus @JsonProperty private final AutoCompactionScheduleStatus scheduleStatus; @JsonProperty + private final String message; + @JsonProperty private final long bytesAwaitingCompaction; @JsonProperty private final long bytesCompacted; @@ -68,6 +71,7 @@ public static Builder builder(String dataSource) public AutoCompactionSnapshot( @JsonProperty("dataSource") @NotNull String dataSource, @JsonProperty("scheduleStatus") @NotNull AutoCompactionScheduleStatus scheduleStatus, + @JsonProperty("message") @Nullable String message, @JsonProperty("bytesAwaitingCompaction") long bytesAwaitingCompaction, @JsonProperty("bytesCompacted") long bytesCompacted, @JsonProperty("bytesSkipped") long bytesSkipped, @@ -81,6 +85,7 @@ public AutoCompactionSnapshot( { this.dataSource = dataSource; this.scheduleStatus = scheduleStatus; + this.message = message; this.bytesAwaitingCompaction = bytesAwaitingCompaction; this.bytesCompacted = bytesCompacted; this.bytesSkipped = bytesSkipped; @@ -104,6 +109,12 @@ public AutoCompactionScheduleStatus getScheduleStatus() return scheduleStatus; } + @Nullable + public String getMessage() + { + return message; + } + public long getBytesAwaitingCompaction() { return bytesAwaitingCompaction; @@ -169,7 +180,8 @@ public boolean equals(Object o) intervalCountCompacted == that.intervalCountCompacted && intervalCountSkipped == that.intervalCountSkipped && dataSource.equals(that.dataSource) && - scheduleStatus == that.scheduleStatus; + scheduleStatus == that.scheduleStatus && + Objects.equals(message, that.message); } @Override @@ -178,6 +190,7 @@ public int hashCode() return Objects.hash( dataSource, scheduleStatus, + message, bytesAwaitingCompaction, bytesCompacted, bytesSkipped, @@ -194,6 +207,7 @@ public static class Builder { private final String dataSource; private AutoCompactionScheduleStatus scheduleStatus; + private String message; private final CompactionStatistics compactedStats = new CompactionStatistics(); private final CompactionStatistics skippedStats = new CompactionStatistics(); @@ -215,6 +229,11 @@ public Builder withStatus(AutoCompactionScheduleStatus status) return this; } + public Builder withMessage(String message) { + this.message = message; + return this; + } + public void incrementWaitingStats(CompactionStatistics entry) { waitingStats.increment(entry); @@ -235,6 +254,7 @@ public AutoCompactionSnapshot build() return new AutoCompactionSnapshot( dataSource, scheduleStatus, + message, waitingStats.getTotalBytes(), compactedStats.getTotalBytes(), skippedStats.getTotalBytes(), diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java index 2ed87335af72..2cc2f0d133f7 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CompactionSupervisorConfig.java @@ -39,7 +39,7 @@ public class CompactionSupervisorConfig @JsonProperty private final boolean enabled; @JsonProperty - private final CompactionEngine defaultEngine; + private final CompactionEngine engine; public static CompactionSupervisorConfig defaultConfig() { @@ -49,11 +49,11 @@ public static CompactionSupervisorConfig defaultConfig() @JsonCreator public CompactionSupervisorConfig( @JsonProperty("enabled") @Nullable Boolean enabled, - @JsonProperty("defaultEngine") @Nullable CompactionEngine defaultEngine + @JsonProperty("engine") @Nullable CompactionEngine engine ) { this.enabled = Configs.valueOrDefault(enabled, false); - this.defaultEngine = Configs.valueOrDefault(defaultEngine, CompactionEngine.NATIVE); + this.engine = Configs.valueOrDefault(engine, CompactionEngine.NATIVE); } public boolean isEnabled() @@ -61,9 +61,9 @@ public boolean isEnabled() return enabled; } - public CompactionEngine getDefaultEngine() + public CompactionEngine getEngine() { - return defaultEngine; + return engine; } @Override @@ -76,13 +76,13 @@ public boolean equals(Object o) return false; } CompactionSupervisorConfig that = (CompactionSupervisorConfig) o; - return enabled == that.enabled && defaultEngine == that.defaultEngine; + return enabled == that.enabled && engine == that.engine; } @Override public int hashCode() { - return Objects.hash(enabled, defaultEngine); + return Objects.hash(enabled, engine); } @Override @@ -90,7 +90,7 @@ public String toString() { return "CompactionSchedulerConfig{" + "enabled=" + enabled + - "engine=" + defaultEngine + + "engine=" + engine + '}'; } } diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java index 59493b5e4418..011a4640da37 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java @@ -64,7 +64,7 @@ public void testMSQEngineWithHashedPartitionsSpecIsInvalid() ); Assert.assertFalse(validationResult.isValid()); Assert.assertEquals( - "MSQ: Invalid partitioning type[HashedPartitionsSpec]. Must be either dynamic or range", + "MSQ: Invalid partitioning type[HashedPartitionsSpec]. Must be either 'dynamic' or 'range'", validationResult.getReason() ); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java index 9a8cd3cc8772..4ba65fe2df8c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java @@ -29,6 +29,7 @@ public class AutoCompactionSnapshotTest public void testAutoCompactionSnapshotBuilder() { final String expectedDataSource = "data"; + final String expectedMessage = "message"; final AutoCompactionSnapshot.Builder builder = AutoCompactionSnapshot.builder(expectedDataSource); // Increment every stat twice @@ -38,7 +39,7 @@ public void testAutoCompactionSnapshotBuilder() builder.incrementCompactedStats(CompactionStatistics.create(13, 13, 13)); } - final AutoCompactionSnapshot actual = builder.build(); + final AutoCompactionSnapshot actual = builder.withMessage(expectedMessage).build(); Assert.assertNotNull(actual); Assert.assertEquals(26, actual.getSegmentCountSkipped()); @@ -52,10 +53,12 @@ public void testAutoCompactionSnapshotBuilder() Assert.assertEquals(26, actual.getSegmentCountAwaitingCompaction()); Assert.assertEquals(AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, actual.getScheduleStatus()); Assert.assertEquals(expectedDataSource, actual.getDataSource()); + Assert.assertEquals(expectedMessage, actual.getMessage()); AutoCompactionSnapshot expected = new AutoCompactionSnapshot( expectedDataSource, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, + expectedMessage, 26, 26, 26, diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java index 4a73047d1955..91b348b72f40 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java @@ -50,6 +50,7 @@ public class CoordinatorCompactionResourceTest private final AutoCompactionSnapshot expectedSnapshot = new AutoCompactionSnapshot( dataSourceName, AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING, + null, 1, 1, 1, From f42fa0652da5bec580a0d287f4f5bf5aeba6ff1c Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Tue, 24 Sep 2024 10:47:18 +0530 Subject: [PATCH 5/8] fix style --- .../druid/server/coordinator/AutoCompactionSnapshot.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java index 3adcc3460aa9..8aa8882b1cb0 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java @@ -229,7 +229,8 @@ public Builder withStatus(AutoCompactionScheduleStatus status) return this; } - public Builder withMessage(String message) { + public Builder withMessage(String message) + { this.message = message; return this; } From e3c7bb7408287280ca025dd9f9bc61127f70efb9 Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Tue, 24 Sep 2024 12:01:34 +0530 Subject: [PATCH 6/8] fix coverage --- .../indexing/compact/OverlordCompactionSchedulerTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java index d0c9abc59e17..edede8045c32 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java @@ -158,7 +158,10 @@ public void testCompactionSupervisorConfigSerde() throws JsonProcessingException OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("enabled", enabled, "engine", defaultEngine)), CompactionSupervisorConfig.class ); - Assert.assertEquals(new CompactionSupervisorConfig(enabled, defaultEngine), compactionSupervisorConfig); + CompactionSupervisorConfig expectedConfig = new CompactionSupervisorConfig(enabled, defaultEngine); + Assert.assertEquals(expectedConfig, compactionSupervisorConfig); + // Superfluous test to meet min branch coverage for CompactionSupervisorSpec. + Assert.assertNotEquals(expectedConfig, "enabled"); } @Test From afcc0c36aadca656d0c23954b8a44a5e694ada18 Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Tue, 24 Sep 2024 12:27:59 +0530 Subject: [PATCH 7/8] Clean-up added test --- .../OverlordCompactionSchedulerTest.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java index edede8045c32..d5d12ae98911 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java @@ -158,10 +158,21 @@ public void testCompactionSupervisorConfigSerde() throws JsonProcessingException OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("enabled", enabled, "engine", defaultEngine)), CompactionSupervisorConfig.class ); - CompactionSupervisorConfig expectedConfig = new CompactionSupervisorConfig(enabled, defaultEngine); - Assert.assertEquals(expectedConfig, compactionSupervisorConfig); - // Superfluous test to meet min branch coverage for CompactionSupervisorSpec. - Assert.assertNotEquals(expectedConfig, "enabled"); + Assert.assertEquals(new CompactionSupervisorConfig(enabled, defaultEngine), compactionSupervisorConfig); + } + + @Test + public void testCompactionSupervisorConfigEquality() + { + Assert.assertEquals( + new CompactionSupervisorConfig(true, CompactionEngine.MSQ), + new CompactionSupervisorConfig(true, CompactionEngine.MSQ) + ); + Assert.assertNotEquals( + new CompactionSupervisorConfig(true, CompactionEngine.NATIVE), + new CompactionSupervisorConfig(true, CompactionEngine.MSQ) + ); + Assert.assertNotEquals(new CompactionSupervisorConfig(true, CompactionEngine.NATIVE), "true"); } @Test From 87a6438bacaf66476de852c0f9d685af58158495 Mon Sep 17 00:00:00 2001 From: Vishesh Garg Date: Tue, 24 Sep 2024 14:14:51 +0530 Subject: [PATCH 8/8] Move coverage tests to same module --- .../OverlordCompactionSchedulerTest.java | 29 --------- .../CompactionSupervisorConfigTest.java | 60 +++++++++++++++++++ 2 files changed, 60 insertions(+), 29 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/server/coordinator/CompactionSupervisorConfigTest.java diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java index d5d12ae98911..f48c1d87a2b2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java @@ -19,10 +19,8 @@ package org.apache.druid.indexing.compact; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableMap; import org.apache.druid.client.indexing.ClientMSQContext; import org.apache.druid.guice.IndexingServiceTuningConfigModule; import org.apache.druid.indexer.CompactionEngine; @@ -148,33 +146,6 @@ private void initScheduler() ); } - @Test - public void testCompactionSupervisorConfigSerde() throws JsonProcessingException - { - final boolean enabled = true; - final CompactionEngine defaultEngine = CompactionEngine.MSQ; - CompactionSupervisorConfig compactionSupervisorConfig = - OBJECT_MAPPER.readValue( - OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("enabled", enabled, "engine", defaultEngine)), - CompactionSupervisorConfig.class - ); - Assert.assertEquals(new CompactionSupervisorConfig(enabled, defaultEngine), compactionSupervisorConfig); - } - - @Test - public void testCompactionSupervisorConfigEquality() - { - Assert.assertEquals( - new CompactionSupervisorConfig(true, CompactionEngine.MSQ), - new CompactionSupervisorConfig(true, CompactionEngine.MSQ) - ); - Assert.assertNotEquals( - new CompactionSupervisorConfig(true, CompactionEngine.NATIVE), - new CompactionSupervisorConfig(true, CompactionEngine.MSQ) - ); - Assert.assertNotEquals(new CompactionSupervisorConfig(true, CompactionEngine.NATIVE), "true"); - } - @Test public void testStartStopWhenSchedulerIsEnabled() { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CompactionSupervisorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CompactionSupervisorConfigTest.java new file mode 100644 index 000000000000..59cc3ecf1718 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/CompactionSupervisorConfigTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexer.CompactionEngine; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +public class CompactionSupervisorConfigTest +{ + private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); + + @Test + public void testCompactionSupervisorConfigSerde() throws JsonProcessingException + { + final boolean enabled = true; + final CompactionEngine defaultEngine = CompactionEngine.MSQ; + CompactionSupervisorConfig compactionSupervisorConfig = + OBJECT_MAPPER.readValue( + OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("enabled", enabled, "engine", defaultEngine)), + CompactionSupervisorConfig.class + ); + Assert.assertEquals(new CompactionSupervisorConfig(enabled, defaultEngine), compactionSupervisorConfig); + } + + @Test + public void testCompactionSupervisorConfigEquality() + { + Assert.assertEquals( + new CompactionSupervisorConfig(true, CompactionEngine.MSQ), + new CompactionSupervisorConfig(true, CompactionEngine.MSQ) + ); + Assert.assertNotEquals( + new CompactionSupervisorConfig(true, CompactionEngine.NATIVE), + new CompactionSupervisorConfig(true, CompactionEngine.MSQ) + ); + Assert.assertNotEquals(new CompactionSupervisorConfig(true, CompactionEngine.NATIVE), "true"); + } +}