From 335e04f69f3ac45d4b32a79c49efbc4104667e4b Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 3 Apr 2025 16:42:49 +0530 Subject: [PATCH 01/15] Do not emit metrics from integration testing client --- .../testsEx/cluster/ZooKeeperClient.java | 85 ------------------- .../druid/testing/guice/DruidTestModule.java | 5 +- 2 files changed, 4 insertions(+), 86 deletions(-) delete mode 100644 integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/ZooKeeperClient.java diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/ZooKeeperClient.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/ZooKeeperClient.java deleted file mode 100644 index 32f6cb1fdfce..000000000000 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/cluster/ZooKeeperClient.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.testsEx.cluster; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.druid.curator.CuratorConfig; -import org.apache.druid.curator.CuratorModule; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.testsEx.config.ResolvedConfig; -import org.apache.druid.testsEx.config.ResolvedService.ResolvedZk; - -import java.util.concurrent.TimeUnit; - -/** - * Test oriented ZooKeeper client. - *

- * Currently contains just enough functionality to verify that - * ZK is ready. - */ -public class ZooKeeperClient -{ - private final ResolvedConfig clusterConfig; - private final ResolvedZk config; - private CuratorFramework curatorFramework; - - public ZooKeeperClient(ResolvedConfig config) - { - this.clusterConfig = config; - this.config = config.zk(); - if (this.config == null) { - throw new ISE("ZooKeeper not configured"); - } - prepare(); - awaitReady(); - } - - private void prepare() - { - CuratorConfig curatorConfig = clusterConfig.toCuratorConfig(); - curatorFramework = CuratorModule.createCurator(curatorConfig); - } - - private void awaitReady() - { - int timeoutSec = config.startTimeoutSecs(); - if (timeoutSec == 0) { - timeoutSec = 5; - } - try { - curatorFramework.start(); - curatorFramework.blockUntilConnected(timeoutSec, TimeUnit.SECONDS); - } - catch (InterruptedException e) { - throw new ISE("ZooKeeper timed out waiting for connect"); - } - } - - public CuratorFramework curator() - { - return curatorFramework; - } - - public void close() - { - curatorFramework.close(); - curatorFramework = null; - } -} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java index be80a5ddc58b..3efea1a09782 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java @@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.emitter.core.LoggingEmitter; import org.apache.druid.java.util.emitter.core.LoggingEmitterConfig; +import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.CredentialedHttpClient; import org.apache.druid.java.util.http.client.HttpClient; @@ -85,6 +86,8 @@ public HttpClient getHttpClient( @ManageLifecycle public ServiceEmitter getServiceEmitter(Supplier config, ObjectMapper jsonMapper) { - return new ServiceEmitter("", "", new LoggingEmitter(config.get(), jsonMapper)); + // Disabling metric emission since no useful metrics are emitted by the integration testing client + // Use a LoggingEmitter here if needed in the future + return new ServiceEmitter("", "", new NoopEmitter()); } } From 631a71df7dcf62bda2f94f77ce1cdc37eeaf884d Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 3 Apr 2025 16:48:51 +0530 Subject: [PATCH 02/15] Remove unused import --- .../java/org/apache/druid/testing/guice/DruidTestModule.java | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java index 3efea1a09782..b86c49eed2e0 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java @@ -32,7 +32,6 @@ import org.apache.druid.guice.annotations.EscalatedClient; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.lifecycle.Lifecycle; -import org.apache.druid.java.util.emitter.core.LoggingEmitter; import org.apache.druid.java.util.emitter.core.LoggingEmitterConfig; import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter; From 80f18859c96009ec6e47b71786da847e1cfd70f9 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 3 Apr 2025 17:06:51 +0530 Subject: [PATCH 03/15] Do not print complete/incomplete tasks --- .../ITAutoCompactionLockContentionTest.java | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java index 4fbbbdbb92a4..11e4ec6cad30 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java @@ -324,32 +324,12 @@ private void ensureCompactionTaskCount(int expectedCount) */ private int getNumberOfCompletedCompactionTasks() { - List incompleteTasks = indexer - .getUncompletedTasksForDataSource(fullDatasourceName); List completeTasks = indexer .getCompleteTasksForDataSource(fullDatasourceName); - printTasks(incompleteTasks, "Incomplete"); - printTasks(completeTasks, "Complete"); - return (int) completeTasks.stream().filter(this::isCompactionTask).count(); } - private void printTasks(List tasks, String taskState) - { - StringBuilder sb = new StringBuilder(); - tasks.forEach( - task -> sb.append("{") - .append(task.getType()) - .append(", ") - .append(task.getStatus()) - .append(", ") - .append(task.getCreatedTime()) - .append("}, ") - ); - LOG.info("%s Tasks: %s", taskState, sb); - } - /** * Retries until the total row count is as expected. */ From 888f63409a00c1eb6c849b62f53d8bed98aed4fb Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 3 Apr 2025 17:27:28 +0530 Subject: [PATCH 04/15] Further cleanup --- .../clients/CompactionResourceTestClient.java | 24 +++++++++++++++---- .../duty/ITAutoCompactionTest.java | 22 ++++++++--------- 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java index 9649a2cce707..daaa561423e5 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java @@ -31,6 +31,8 @@ import org.apache.druid.java.util.http.client.response.StatusResponseHandler; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.server.compaction.CompactionSimulateResult; +import org.apache.druid.server.compaction.CompactionStatusResponse; +import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.DruidCompactionConfig; @@ -38,6 +40,7 @@ import org.apache.druid.testing.guice.TestClient; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.testng.Assert; import java.net.URL; import java.util.List; @@ -253,7 +256,7 @@ public ClusterCompactionConfig getClusterConfig() throws Exception } /** - * This API is currently only to force the coordinator to refresh its config. + * This API is used only to force the coordinator to refresh its config. * For all other purposes, use {@link #updateClusterConfig}. */ @Deprecated @@ -273,6 +276,19 @@ private void updateCompactionTaskSlot(Double compactionTaskSlotRatio, Integer ma response.getContent() ); } + + // Verify that coordinator has the latest config now + final DruidCompactionConfig configOnCoordinator = getCoordinatorCompactionConfig(); + Assert.assertEquals( + maxCompactionTaskSlots, + Integer.valueOf(configOnCoordinator.getMaxCompactionTaskSlots()), + "Could not update compaction task slots on the Coordinator" + ); + Assert.assertEquals( + compactionTaskSlotRatio, + Double.valueOf(configOnCoordinator.getCompactionTaskSlotRatio()), + "Could not update compaction task slot ratio on the Coordinator" + ); } public Map getCompactionProgress(String dataSource) throws Exception @@ -291,7 +307,7 @@ public Map getCompactionProgress(String dataSource) throws Excep return jsonMapper.readValue(response.getContent(), new TypeReference<>() {}); } - public Map getCompactionStatus(String dataSource) throws Exception + public AutoCompactionSnapshot getCompactionStatus(String dataSource) throws Exception { String url = StringUtils.format("%scompaction/status?dataSource=%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource)); StatusResponseHolder response = httpClient.go( @@ -306,8 +322,8 @@ public Map getCompactionStatus(String dataSource) throws Excepti response.getContent() ); } - Map>> latestSnapshots = jsonMapper.readValue(response.getContent(), new TypeReference<>() {}); - return latestSnapshots.get("latestStatus").get(0); + final CompactionStatusResponse latestSnapshots = jsonMapper.readValue(response.getContent(), new TypeReference<>() {}); + return latestSnapshots.getLatestStatus().get(0); } public CompactionSimulateResult simulateRunOnCoordinator() throws Exception diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 48ef5994177b..cdafb7fa48eb 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -2132,17 +2132,17 @@ private void getAndAssertCompactionStatus( long intervalCountSkipped ) throws Exception { - Map actualStatus = compactionResource.getCompactionStatus(fullDatasourceName); + AutoCompactionSnapshot actualStatus = compactionResource.getCompactionStatus(fullDatasourceName); Assert.assertNotNull(actualStatus); - Assert.assertEquals(actualStatus.get("scheduleStatus"), scheduleStatus.toString()); - MatcherAssert.assertThat(Long.parseLong(actualStatus.get("bytesAwaitingCompaction")), bytesAwaitingCompactionMatcher); - MatcherAssert.assertThat(Long.parseLong(actualStatus.get("bytesCompacted")), bytesCompactedMatcher); - MatcherAssert.assertThat(Long.parseLong(actualStatus.get("bytesSkipped")), bytesSkippedMatcher); - Assert.assertEquals(Long.parseLong(actualStatus.get("segmentCountAwaitingCompaction")), segmentCountAwaitingCompaction); - Assert.assertEquals(Long.parseLong(actualStatus.get("segmentCountCompacted")), segmentCountCompacted); - Assert.assertEquals(Long.parseLong(actualStatus.get("segmentCountSkipped")), segmentCountSkipped); - Assert.assertEquals(Long.parseLong(actualStatus.get("intervalCountAwaitingCompaction")), intervalCountAwaitingCompaction); - Assert.assertEquals(Long.parseLong(actualStatus.get("intervalCountCompacted")), intervalCountCompacted); - Assert.assertEquals(Long.parseLong(actualStatus.get("intervalCountSkipped")), intervalCountSkipped); + Assert.assertEquals(actualStatus.getScheduleStatus(), scheduleStatus); + MatcherAssert.assertThat(actualStatus.getBytesAwaitingCompaction(), bytesAwaitingCompactionMatcher); + MatcherAssert.assertThat(actualStatus.getBytesCompacted(), bytesCompactedMatcher); + MatcherAssert.assertThat(actualStatus.getBytesSkipped(), bytesSkippedMatcher); + Assert.assertEquals(actualStatus.getSegmentCountAwaitingCompaction(), segmentCountAwaitingCompaction); + Assert.assertEquals(actualStatus.getSegmentCountCompacted(), segmentCountCompacted); + Assert.assertEquals(actualStatus.getSegmentCountSkipped(), segmentCountSkipped); + Assert.assertEquals(actualStatus.getIntervalCountAwaitingCompaction(), intervalCountAwaitingCompaction); + Assert.assertEquals(actualStatus.getIntervalCountCompacted(), intervalCountCompacted); + Assert.assertEquals(actualStatus.getIntervalCountSkipped(), intervalCountSkipped); } } From 9a4564ce6c9b23b4fe86b2dcbba068be568e0d26 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 3 Apr 2025 18:38:48 +0530 Subject: [PATCH 05/15] Fix pom --- integration-tests-ex/cases/pom.xml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/integration-tests-ex/cases/pom.xml b/integration-tests-ex/cases/pom.xml index 1a565d5fb853..1dcf6664702f 100644 --- a/integration-tests-ex/cases/pom.xml +++ b/integration-tests-ex/cases/pom.xml @@ -83,10 +83,6 @@ com.google.inject guice - - org.apache.curator - curator-framework - com.google.guava guava From 46f59dc83f324e9fe7af3ed1f4535ff41323e705 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 3 Apr 2025 20:50:36 +0530 Subject: [PATCH 06/15] More checks around the compaction tests --- .../clients/CompactionResourceTestClient.java | 14 +++++- .../druid/testing/utils/ITRetryUtil.java | 2 + .../duty/ITAutoCompactionTest.java | 14 ++++++ .../compaction/CompactionStatusTracker.java | 2 +- .../compaction/CompactionTriggerResponse.java | 44 +++++++++++++++++++ .../server/coordinator/DruidCoordinator.java | 11 ++++- .../http/CoordinatorCompactionResource.java | 4 +- 7 files changed, 85 insertions(+), 6 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/server/compaction/CompactionTriggerResponse.java diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java index daaa561423e5..39c788d9cc56 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.server.compaction.CompactionSimulateResult; import org.apache.druid.server.compaction.CompactionStatusResponse; +import org.apache.druid.server.compaction.CompactionTriggerResponse; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -200,9 +201,9 @@ public void forceTriggerAutoCompaction() throws Exception clusterConfig.getCompactionTaskSlotRatio(), clusterConfig.getMaxCompactionTaskSlots() ); - final CompactionSimulateResult simulateResult = simulateRunOnCoordinator(); + CompactionSimulateResult simulateResult = simulateRunOnCoordinator(); log.info( - "Triggering compaction duty on Coordinator. Expected jobs: %s", + "Before triggering compaction duty on Coordinator, expected jobs are %s", simulateResult.getCompactionStates() ); @@ -215,6 +216,15 @@ public void forceTriggerAutoCompaction() throws Exception response.getContent() ); } + + final CompactionTriggerResponse result = jsonMapper.readValue(response.getContent(), new TypeReference<>() {}); + log.info("Triggered compaction. Submitted tasks[%s].", result.getSubmittedTaskIds()); + + simulateResult = simulateRunOnCoordinator(); + log.info( + "After triggering compaction duty on Coordinator, expected jobs are %s", + simulateResult.getCompactionStates() + ); } public void updateClusterConfig(ClusterCompactionConfig config) throws Exception diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java index 02fd960e13e1..63dbbea0b426 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java @@ -120,6 +120,8 @@ public static void retryUntilEquals( } } + System.out.printf("Retries[%d] exhausted.%n", retryCount); + if (lastException != null) { throw new ISE( lastException, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index cdafb7fa48eb..48c042e8e82e 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -2004,6 +2004,20 @@ private void forceTriggerAutoCompaction( private void forceTriggerAutoCompaction(int numExpectedSegmentsAfterCompaction) throws Exception { compactionResource.forceTriggerAutoCompaction(); + + final AutoCompactionSnapshot snapshot = compactionResource.getCompactionStatus(fullDatasourceName); + if (snapshot == null) { + LOG.warn("Compaction could not be triggered for datasource."); + } else { + LOG.info( + "Triggered compaction with status[%s], message[%s]." + + " Interval counts: skipped[%d], compacted[%d], pending[%d].", + snapshot.getScheduleStatus(), snapshot.getMessage(), + snapshot.getIntervalCountSkipped(), snapshot.getIntervalCountCompacted(), + snapshot.getIntervalCountAwaitingCompaction() + ); + } + waitForCompactionToFinish(numExpectedSegmentsAfterCompaction); } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java index cbf5f25f9d7b..ba84ca99ff3b 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java @@ -83,7 +83,7 @@ public CompactionTaskStatus getLatestTaskStatus(CompactionCandidate candidates) */ public Set getSubmittedTaskIds() { - return submittedTaskIdToSegments.keySet(); + return Set.copyOf(submittedTaskIdToSegments.keySet()); } public CompactionStatus computeCompactionStatus( diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionTriggerResponse.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionTriggerResponse.java new file mode 100644 index 000000000000..32bd38b1f2ad --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionTriggerResponse.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.compaction; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Set; + +public class CompactionTriggerResponse +{ + private final Set submittedTaskIds; + + @JsonCreator + public CompactionTriggerResponse( + @JsonProperty("submittedTaskIds") Set submittedTaskIds + ) + { + this.submittedTaskIds = submittedTaskIds; + } + + @JsonProperty + public Set getSubmittedTaskIds() + { + return submittedTaskIds; + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 61b56e582298..6fd432f0966f 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -58,6 +58,7 @@ import org.apache.druid.server.compaction.CompactionRunSimulator; import org.apache.druid.server.compaction.CompactionSimulateResult; import org.apache.druid.server.compaction.CompactionStatusTracker; +import org.apache.druid.server.compaction.CompactionTriggerResponse; import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory; import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs; import org.apache.druid.server.coordinator.config.DruidCoordinatorConfig; @@ -105,6 +106,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -399,8 +401,10 @@ public void stop() } } - public void runCompactSegmentsDuty() + public CompactionTriggerResponse runCompactSegmentsDuty() { + final Set taskIdsBeforeDuty = compactionStatusTracker.getSubmittedTaskIds(); + final int startingLeaderCounter = coordLeaderSelector.localTerm(); DutiesRunnable compactSegmentsDuty = new DutiesRunnable( ImmutableList.of(compactSegments), @@ -409,6 +413,11 @@ public void runCompactSegmentsDuty() null ); compactSegmentsDuty.run(); + + final Set submittedTaskIds = new HashSet<>(compactionStatusTracker.getSubmittedTaskIds()); + submittedTaskIds.removeAll(taskIdsBeforeDuty); + + return new CompactionTriggerResponse(submittedTaskIds); } private Map> computeUnderReplicated( diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionResource.java index 0e465abbc5c1..abc03d5f1d36 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionResource.java @@ -61,11 +61,11 @@ public CoordinatorCompactionResource( @POST @Path("/compact") @ResourceFilters(ConfigResourceFilter.class) + @Produces(MediaType.APPLICATION_JSON) @VisibleForTesting public Response forceTriggerCompaction() { - coordinator.runCompactSegmentsDuty(); - return Response.ok().build(); + return Response.ok(coordinator.runCompactSegmentsDuty()).build(); } @GET From af74ba09b0b94f27bbbe1be740224ecdf536dd6f Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 3 Apr 2025 21:37:51 +0530 Subject: [PATCH 07/15] Fix coverage --- .../coordinator/DruidCoordinatorTest.java | 4 ++++ .../CoordinatorCompactionResourceTest.java | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index fa6a4e9bdc56..9c4be44d794d 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -48,6 +48,7 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.compaction.CompactionSimulateResult; import org.apache.druid.server.compaction.CompactionStatusTracker; +import org.apache.druid.server.compaction.CompactionTriggerResponse; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory; import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs; @@ -824,6 +825,9 @@ public void testSimulateRunWithEmptyDatasourceCompactionConfigs() new ClusterCompactionConfig(0.2, null, null, null, null) ); Assert.assertEquals(Collections.emptyMap(), result.getCompactionStates()); + + final CompactionTriggerResponse response = coordinator.runCompactSegmentsDuty(); + Assert.assertTrue(response.getSubmittedTaskIds().isEmpty()); } private void setupSegmentsMetadataMock(DruidDataSource dataSource) diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java index 943f9a6ccf7d..c908918bc8c0 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java @@ -23,6 +23,7 @@ import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.error.ErrorResponse; import org.apache.druid.server.compaction.CompactionStatusResponse; +import org.apache.druid.server.compaction.CompactionTriggerResponse; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.DruidCoordinator; import org.easymock.EasyMock; @@ -36,6 +37,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; public class CoordinatorCompactionResourceTest { @@ -152,4 +154,21 @@ public void testGetProgressForNullDatasourceReturnsBadRequest() DruidExceptionMatcher.invalidInput().expectMessageIs("No DataSource specified") ); } + + @Test + public void testForceTriggerCompaction() + { + EasyMock.expect(mock.runCompactSegmentsDuty()) + .andReturn(new CompactionTriggerResponse(Set.of())); + EasyMock.replay(mock); + + final Response response = new CoordinatorCompactionResource(mock) + .forceTriggerCompaction(); + Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + + Assert.assertTrue(response.getEntity() instanceof CompactionTriggerResponse); + + final CompactionTriggerResponse payload = (CompactionTriggerResponse) response.getEntity(); + Assert.assertTrue(payload.getSubmittedTaskIds().isEmpty()); + } } From 7fc77c3f6b92b60e9c114bfc625c9a9b508c18da Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 4 Apr 2025 09:26:44 +0530 Subject: [PATCH 08/15] Include skip reason in compaction snapshot --- .../ITAutoCompactionLockContentionTest.java | 24 ++++++++++++-- .../server/compaction/CompactionStatus.java | 2 +- .../coordinator/AutoCompactionSnapshot.java | 33 +++++++++++++++---- .../coordinator/duty/CompactSegments.java | 2 +- .../AutoCompactionSnapshotTest.java | 17 ++++++++-- .../CoordinatorCompactionResourceTest.java | 3 +- 6 files changed, 67 insertions(+), 14 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java index 11e4ec6cad30..56023526f1e6 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig; @@ -151,9 +152,9 @@ public void testAutoCompactionSkipsLockedIntervals(boolean transactionEnabled) t rowsForMinute2 += generateData(minute2, streamEventWriter); ensureLockedIntervals(minute2); - // Trigger auto compaction + // Trigger auto compaction and verify that only minute1 and minute3 are compacted submitAndVerifyCompactionConfig(); - compactionResource.forceTriggerAutoCompaction(); + forceTriggerAutoCompaction(); // Wait for segments to be loaded ensureRowCount(rowsForMinute1 + rowsForMinute2 + rowsForMinute3); @@ -165,7 +166,7 @@ public void testAutoCompactionSkipsLockedIntervals(boolean transactionEnabled) t verifyCompactedIntervals(minute1, minute3); // Trigger auto compaction again - compactionResource.forceTriggerAutoCompaction(); + forceTriggerAutoCompaction(); // Verify that all the segments are now compacted ensureCompactionTaskCount(3); @@ -175,6 +176,23 @@ public void testAutoCompactionSkipsLockedIntervals(boolean transactionEnabled) t } } + private void forceTriggerAutoCompaction() throws Exception + { + compactionResource.forceTriggerAutoCompaction(); + final AutoCompactionSnapshot snapshot = compactionResource.getCompactionStatus(fullDatasourceName); + if (snapshot == null) { + LOG.warn("Compaction could not be triggered for datasource."); + } else { + LOG.info( + "Triggered compaction with status[%s], message[%s]." + + " Interval counts: skipped[%d], compacted[%d], pending[%d].", + snapshot.getScheduleStatus(), snapshot.getMessage(), + snapshot.getIntervalCountSkipped(), snapshot.getIntervalCountCompacted(), + snapshot.getIntervalCountAwaitingCompaction() + ); + } + } + /** * Retries until the segment count is as expected. */ diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java index 77786993ac99..268f39781cf9 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java @@ -186,7 +186,7 @@ private static String asString(PartitionsSpec partitionsSpec) } } - static CompactionStatus skipped(String reasonFormat, Object... args) + public static CompactionStatus skipped(String reasonFormat, Object... args) { return new CompactionStatus(State.SKIPPED, StringUtils.format(reasonFormat, args)); } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java index e31a7919f24f..28aa06cabbb4 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java @@ -23,10 +23,13 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.server.compaction.CompactionCandidate; import org.apache.druid.server.compaction.CompactionStatistics; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; public class AutoCompactionSnapshot @@ -62,6 +65,8 @@ public enum ScheduleStatus private final long intervalCountCompacted; @JsonProperty private final long intervalCountSkipped; + @JsonProperty + private final List skippedReasons; public static Builder builder(String dataSource) { @@ -81,7 +86,8 @@ public AutoCompactionSnapshot( @JsonProperty("segmentCountSkipped") long segmentCountSkipped, @JsonProperty("intervalCountAwaitingCompaction") long intervalCountAwaitingCompaction, @JsonProperty("intervalCountCompacted") long intervalCountCompacted, - @JsonProperty("intervalCountSkipped") long intervalCountSkipped + @JsonProperty("intervalCountSkipped") long intervalCountSkipped, + @JsonProperty("skippedReasons") @Nullable List skippedReasons ) { this.dataSource = dataSource; @@ -96,6 +102,7 @@ public AutoCompactionSnapshot( this.intervalCountAwaitingCompaction = intervalCountAwaitingCompaction; this.intervalCountCompacted = intervalCountCompacted; this.intervalCountSkipped = intervalCountSkipped; + this.skippedReasons = skippedReasons; } @NotNull @@ -161,6 +168,12 @@ public long getIntervalCountSkipped() return intervalCountSkipped; } + @Nullable + public List getSkippedReasons() + { + return skippedReasons; + } + @Override public boolean equals(Object o) { @@ -182,7 +195,8 @@ public boolean equals(Object o) intervalCountSkipped == that.intervalCountSkipped && dataSource.equals(that.dataSource) && scheduleStatus == that.scheduleStatus && - Objects.equals(message, that.message); + Objects.equals(message, that.message) && + Objects.equals(skippedReasons, that.skippedReasons); } @Override @@ -200,7 +214,8 @@ public int hashCode() segmentCountSkipped, intervalCountAwaitingCompaction, intervalCountCompacted, - intervalCountSkipped + intervalCountSkipped, + skippedReasons ); } @@ -214,6 +229,8 @@ public static class Builder private final CompactionStatistics skippedStats = new CompactionStatistics(); private final CompactionStatistics waitingStats = new CompactionStatistics(); + private final List skippedReasons = new ArrayList<>(); + private Builder( @NotNull String dataSource ) @@ -246,9 +263,12 @@ public void incrementCompactedStats(CompactionStatistics entry) compactedStats.increment(entry); } - public void incrementSkippedStats(CompactionStatistics entry) + public void incrementSkippedStats(CompactionCandidate candidate) { - skippedStats.increment(entry); + skippedStats.increment(candidate.getStats()); + if (candidate.getCurrentStatus() != null) { + skippedReasons.add(candidate.getUmbrellaInterval() + ": " + candidate.getCurrentStatus().getReason()); + } } public AutoCompactionSnapshot build() @@ -265,7 +285,8 @@ public AutoCompactionSnapshot build() skippedStats.getNumSegments(), waitingStats.getNumIntervals(), compactedStats.getNumIntervals(), - skippedStats.getNumIntervals() + skippedStats.getNumIntervals(), + skippedReasons.size() > 10 ? skippedReasons.subList(0, 10) : skippedReasons ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 543bc2b82139..1e971f30b416 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -631,7 +631,7 @@ private void updateCompactionSnapshotStats( iterator.getSkippedSegments().forEach( candidateSegments -> currentRunAutoCompactionSnapshotBuilders .computeIfAbsent(candidateSegments.getDataSource(), AutoCompactionSnapshot::builder) - .incrementSkippedStats(candidateSegments.getStats()) + .incrementSkippedStats(candidateSegments) ); final Map currentAutoCompactionSnapshotPerDataSource = new HashMap<>(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java index a6eb127f854c..a8a4b363fd74 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java @@ -19,10 +19,16 @@ package org.apache.druid.server.coordinator; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.server.compaction.CompactionCandidate; import org.apache.druid.server.compaction.CompactionStatistics; +import org.apache.druid.server.compaction.CompactionStatus; +import org.apache.druid.timeline.DataSegment; import org.junit.Assert; import org.junit.Test; +import java.util.List; + public class AutoCompactionSnapshotTest { @Test @@ -32,9 +38,15 @@ public void testAutoCompactionSnapshotBuilder() final String expectedMessage = "message"; final AutoCompactionSnapshot.Builder builder = AutoCompactionSnapshot.builder(expectedDataSource); + final List segments = CreateDataSegments.ofDatasource(expectedDataSource) + .forIntervals(13, Granularities.HOUR) + .eachOfSize(1); + // Increment every stat twice for (int i = 0; i < 2; i++) { - builder.incrementSkippedStats(CompactionStatistics.create(13, 13, 13)); + final CompactionCandidate skippedCandidate = + CompactionCandidate.from(segments).withCurrentStatus(CompactionStatus.skipped("skip reason")); + builder.incrementSkippedStats(skippedCandidate); builder.incrementWaitingStats(CompactionStatistics.create(13, 13, 13)); builder.incrementCompactedStats(CompactionStatistics.create(13, 13, 13)); } @@ -67,7 +79,8 @@ public void testAutoCompactionSnapshotBuilder() 26, 26, 26, - 26 + 26, + List.of("skip reason") ); Assert.assertEquals(expected, actual); } diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java index c908918bc8c0..c5e456e32d2c 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java @@ -55,7 +55,8 @@ public class CoordinatorCompactionResourceTest 1, 1, 1, - 1 + 1, + null ); @Before From 3aed0dac2389e38c6fb348e1f68c6b0c7cf049d4 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 4 Apr 2025 09:47:10 +0530 Subject: [PATCH 09/15] Fix test --- .../coordinator/AutoCompactionSnapshotTest.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java index a8a4b363fd74..b3ae4804618b 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java @@ -40,12 +40,13 @@ public void testAutoCompactionSnapshotBuilder() final List segments = CreateDataSegments.ofDatasource(expectedDataSource) .forIntervals(13, Granularities.HOUR) + .startingAt("2010-01-01") .eachOfSize(1); // Increment every stat twice for (int i = 0; i < 2; i++) { final CompactionCandidate skippedCandidate = - CompactionCandidate.from(segments).withCurrentStatus(CompactionStatus.skipped("skip reason")); + CompactionCandidate.from(segments).withCurrentStatus(CompactionStatus.skipped("reason " + i)); builder.incrementSkippedStats(skippedCandidate); builder.incrementWaitingStats(CompactionStatistics.create(13, 13, 13)); builder.incrementCompactedStats(CompactionStatistics.create(13, 13, 13)); @@ -67,6 +68,12 @@ public void testAutoCompactionSnapshotBuilder() Assert.assertEquals(expectedDataSource, actual.getDataSource()); Assert.assertEquals(expectedMessage, actual.getMessage()); + final List skipReasons = List.of( + "2010-01-01T00:00:00.000Z/2010-01-01T13:00:00.000Z: reason 0", + "2010-01-01T00:00:00.000Z/2010-01-01T13:00:00.000Z: reason 1" + ); + Assert.assertEquals(skipReasons, actual.getSkippedReasons()); + AutoCompactionSnapshot expected = new AutoCompactionSnapshot( expectedDataSource, AutoCompactionSnapshot.ScheduleStatus.RUNNING, @@ -80,7 +87,7 @@ public void testAutoCompactionSnapshotBuilder() 26, 26, 26, - List.of("skip reason") + skipReasons ); Assert.assertEquals(expected, actual); } From 9c3e4a2ba962eba55c0f2cf76d79360ba0eccdb0 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 4 Apr 2025 11:15:24 +0530 Subject: [PATCH 10/15] Log skip reasons --- .../duty/ITAutoCompactionLockContentionTest.java | 6 ++++-- .../druid/tests/coordinator/duty/ITAutoCompactionTest.java | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java index 56023526f1e6..d7205e2fc2a8 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java @@ -185,10 +185,12 @@ private void forceTriggerAutoCompaction() throws Exception } else { LOG.info( "Triggered compaction with status[%s], message[%s]." - + " Interval counts: skipped[%d], compacted[%d], pending[%d].", + + " Interval counts: skipped[%d], compacted[%d], pending[%d]." + + " Skipped reasons: %s", snapshot.getScheduleStatus(), snapshot.getMessage(), snapshot.getIntervalCountSkipped(), snapshot.getIntervalCountCompacted(), - snapshot.getIntervalCountAwaitingCompaction() + snapshot.getIntervalCountAwaitingCompaction(), + snapshot.getSkippedReasons() ); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 48c042e8e82e..77f1d3d90845 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -2011,10 +2011,12 @@ private void forceTriggerAutoCompaction(int numExpectedSegmentsAfterCompaction) } else { LOG.info( "Triggered compaction with status[%s], message[%s]." - + " Interval counts: skipped[%d], compacted[%d], pending[%d].", + + " Interval counts: skipped[%d], compacted[%d], pending[%d]." + + " Skip reasons: %s", snapshot.getScheduleStatus(), snapshot.getMessage(), snapshot.getIntervalCountSkipped(), snapshot.getIntervalCountCompacted(), - snapshot.getIntervalCountAwaitingCompaction() + snapshot.getIntervalCountAwaitingCompaction(), + snapshot.getSkippedReasons() ); } From 7af4ac5e4abeb10fe940384663bff5ca3e57daa0 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 4 Apr 2025 17:17:18 +0530 Subject: [PATCH 11/15] Do not use stale config while updating compaction task slots --- .../clients/CompactionResourceTestClient.java | 3 ++- .../coordinator/CoordinatorConfigManager.java | 23 +++++++++++++++++++ .../CoordinatorCompactionConfigsResource.java | 13 +---------- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java index 39c788d9cc56..c27bb984fbac 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java @@ -191,7 +191,8 @@ public DataSourceCompactionConfig getDataSourceCompactionConfig(String dataSourc public void forceTriggerAutoCompaction() throws Exception { - // Perform a dummy update of task slots to force the coordinator to refresh its compaction config + // Fetch cluster config from Overlord and perform a dummy update of task slots + // to force the coordinator to refresh its compaction config final ClusterCompactionConfig clusterConfig = getClusterConfig(); updateCompactionTaskSlot( clusterConfig.getCompactionTaskSlotRatio(), diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java index 9313ba1ff15b..81e57bc5b9b8 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java @@ -27,6 +27,7 @@ import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; import org.apache.druid.common.config.ConfigManager; +import org.apache.druid.common.config.Configs; import org.apache.druid.common.config.JacksonConfigManager; import org.apache.druid.error.DruidException; import org.apache.druid.error.InternalServerError; @@ -152,6 +153,28 @@ public DruidCompactionConfig convertBytesToCompactionConfig(byte[] bytes) ); } + public boolean updateCompactionTaskSlots( + @Nullable Double compactionTaskSlotRatio, + @Nullable Integer maxCompactionTaskSlots, + AuditInfo auditInfo + ) + { + UnaryOperator operator = current -> { + final ClusterCompactionConfig currentClusterConfig = current.clusterConfig(); + final ClusterCompactionConfig updatedClusterConfig = new ClusterCompactionConfig( + Configs.valueOrDefault(compactionTaskSlotRatio, currentClusterConfig.getCompactionTaskSlotRatio()), + Configs.valueOrDefault(maxCompactionTaskSlots, currentClusterConfig.getMaxCompactionTaskSlots()), + currentClusterConfig.getCompactionPolicy(), + currentClusterConfig.isUseSupervisors(), + currentClusterConfig.getEngine() + ); + + return current.withClusterConfig(updatedClusterConfig); + }; + + return updateConfigHelper(operator, auditInfo); + } + public boolean updateClusterCompactionConfig( ClusterCompactionConfig config, AuditInfo auditInfo diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java index f24aae7baa58..561670ae628b 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResource.java @@ -22,10 +22,8 @@ import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; import org.apache.druid.audit.AuditInfo; -import org.apache.druid.common.config.Configs; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.CompactionEngine; -import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.CoordinatorConfigManager; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.http.security.ConfigResourceFilter; @@ -85,18 +83,9 @@ public Response setCompactionTaskLimit( return ServletResourceUtils.buildUpdateResponse(() -> true); } - final ClusterCompactionConfig currentConfig = configManager.getCurrentCompactionConfig().clusterConfig(); - final ClusterCompactionConfig updatedClusterConfig = new ClusterCompactionConfig( - Configs.valueOrDefault(compactionTaskSlotRatio, currentConfig.getCompactionTaskSlotRatio()), - Configs.valueOrDefault(maxCompactionTaskSlots, currentConfig.getMaxCompactionTaskSlots()), - currentConfig.getCompactionPolicy(), - currentConfig.isUseSupervisors(), - currentConfig.getEngine() - ); - final AuditInfo auditInfo = AuthorizationUtils.buildAuditInfo(req); return ServletResourceUtils.buildUpdateResponse( - () -> configManager.updateClusterCompactionConfig(updatedClusterConfig, auditInfo) + () -> configManager.updateCompactionTaskSlots(compactionTaskSlotRatio, maxCompactionTaskSlots, auditInfo) ); } From bdd7cf9e51ca02bc15acacc8f7f754a78dca18c3 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 4 Apr 2025 18:42:32 +0530 Subject: [PATCH 12/15] Fix checkstyle --- .../druid/server/coordinator/CoordinatorConfigManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java index 81e57bc5b9b8..5d435549bacd 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorConfigManager.java @@ -154,9 +154,9 @@ public DruidCompactionConfig convertBytesToCompactionConfig(byte[] bytes) } public boolean updateCompactionTaskSlots( - @Nullable Double compactionTaskSlotRatio, - @Nullable Integer maxCompactionTaskSlots, - AuditInfo auditInfo + @Nullable Double compactionTaskSlotRatio, + @Nullable Integer maxCompactionTaskSlots, + AuditInfo auditInfo ) { UnaryOperator operator = current -> { From 7e66d9d32464bcdc9f8d4b33d9872680548a9e38 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 4 Apr 2025 20:51:33 +0530 Subject: [PATCH 13/15] Revert extra changes --- .../clients/CompactionResourceTestClient.java | 33 ++------------ .../ITAutoCompactionLockContentionTest.java | 26 ++--------- .../duty/ITAutoCompactionTest.java | 16 ------- .../compaction/CompactionStatusTracker.java | 2 +- .../compaction/CompactionTriggerResponse.java | 44 ------------------- .../coordinator/AutoCompactionSnapshot.java | 33 +++----------- .../server/coordinator/DruidCoordinator.java | 11 +---- .../coordinator/duty/CompactSegments.java | 2 +- .../http/CoordinatorCompactionResource.java | 4 +- .../AutoCompactionSnapshotTest.java | 24 +--------- .../coordinator/DruidCoordinatorTest.java | 4 -- .../CoordinatorCompactionResourceTest.java | 19 -------- 12 files changed, 20 insertions(+), 198 deletions(-) delete mode 100644 server/src/main/java/org/apache/druid/server/compaction/CompactionTriggerResponse.java diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java index c27bb984fbac..e05e72b12ffa 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CompactionResourceTestClient.java @@ -32,7 +32,6 @@ import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.server.compaction.CompactionSimulateResult; import org.apache.druid.server.compaction.CompactionStatusResponse; -import org.apache.druid.server.compaction.CompactionTriggerResponse; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; @@ -41,7 +40,6 @@ import org.apache.druid.testing.guice.TestClient; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; -import org.testng.Assert; import java.net.URL; import java.util.List; @@ -191,8 +189,7 @@ public DataSourceCompactionConfig getDataSourceCompactionConfig(String dataSourc public void forceTriggerAutoCompaction() throws Exception { - // Fetch cluster config from Overlord and perform a dummy update of task slots - // to force the coordinator to refresh its compaction config + // Perform a dummy update of task slots to force the coordinator to refresh its compaction config final ClusterCompactionConfig clusterConfig = getClusterConfig(); updateCompactionTaskSlot( clusterConfig.getCompactionTaskSlotRatio(), @@ -202,9 +199,9 @@ public void forceTriggerAutoCompaction() throws Exception clusterConfig.getCompactionTaskSlotRatio(), clusterConfig.getMaxCompactionTaskSlots() ); - CompactionSimulateResult simulateResult = simulateRunOnCoordinator(); + final CompactionSimulateResult simulateResult = simulateRunOnCoordinator(); log.info( - "Before triggering compaction duty on Coordinator, expected jobs are %s", + "Triggering compaction duty on Coordinator. Expected jobs: %s", simulateResult.getCompactionStates() ); @@ -217,15 +214,6 @@ public void forceTriggerAutoCompaction() throws Exception response.getContent() ); } - - final CompactionTriggerResponse result = jsonMapper.readValue(response.getContent(), new TypeReference<>() {}); - log.info("Triggered compaction. Submitted tasks[%s].", result.getSubmittedTaskIds()); - - simulateResult = simulateRunOnCoordinator(); - log.info( - "After triggering compaction duty on Coordinator, expected jobs are %s", - simulateResult.getCompactionStates() - ); } public void updateClusterConfig(ClusterCompactionConfig config) throws Exception @@ -267,7 +255,7 @@ public ClusterCompactionConfig getClusterConfig() throws Exception } /** - * This API is used only to force the coordinator to refresh its config. + * This API is currently only to force the coordinator to refresh its config. * For all other purposes, use {@link #updateClusterConfig}. */ @Deprecated @@ -287,19 +275,6 @@ private void updateCompactionTaskSlot(Double compactionTaskSlotRatio, Integer ma response.getContent() ); } - - // Verify that coordinator has the latest config now - final DruidCompactionConfig configOnCoordinator = getCoordinatorCompactionConfig(); - Assert.assertEquals( - maxCompactionTaskSlots, - Integer.valueOf(configOnCoordinator.getMaxCompactionTaskSlots()), - "Could not update compaction task slots on the Coordinator" - ); - Assert.assertEquals( - compactionTaskSlotRatio, - Double.valueOf(configOnCoordinator.getCompactionTaskSlotRatio()), - "Could not update compaction task slot ratio on the Coordinator" - ); } public Map getCompactionProgress(String dataSource) throws Exception diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java index d7205e2fc2a8..11e4ec6cad30 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java @@ -25,7 +25,6 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; -import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.ClusterCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig; @@ -152,9 +151,9 @@ public void testAutoCompactionSkipsLockedIntervals(boolean transactionEnabled) t rowsForMinute2 += generateData(minute2, streamEventWriter); ensureLockedIntervals(minute2); - // Trigger auto compaction and verify that only minute1 and minute3 are compacted + // Trigger auto compaction submitAndVerifyCompactionConfig(); - forceTriggerAutoCompaction(); + compactionResource.forceTriggerAutoCompaction(); // Wait for segments to be loaded ensureRowCount(rowsForMinute1 + rowsForMinute2 + rowsForMinute3); @@ -166,7 +165,7 @@ public void testAutoCompactionSkipsLockedIntervals(boolean transactionEnabled) t verifyCompactedIntervals(minute1, minute3); // Trigger auto compaction again - forceTriggerAutoCompaction(); + compactionResource.forceTriggerAutoCompaction(); // Verify that all the segments are now compacted ensureCompactionTaskCount(3); @@ -176,25 +175,6 @@ public void testAutoCompactionSkipsLockedIntervals(boolean transactionEnabled) t } } - private void forceTriggerAutoCompaction() throws Exception - { - compactionResource.forceTriggerAutoCompaction(); - final AutoCompactionSnapshot snapshot = compactionResource.getCompactionStatus(fullDatasourceName); - if (snapshot == null) { - LOG.warn("Compaction could not be triggered for datasource."); - } else { - LOG.info( - "Triggered compaction with status[%s], message[%s]." - + " Interval counts: skipped[%d], compacted[%d], pending[%d]." - + " Skipped reasons: %s", - snapshot.getScheduleStatus(), snapshot.getMessage(), - snapshot.getIntervalCountSkipped(), snapshot.getIntervalCountCompacted(), - snapshot.getIntervalCountAwaitingCompaction(), - snapshot.getSkippedReasons() - ); - } - } - /** * Retries until the segment count is as expected. */ diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java index 77f1d3d90845..cdafb7fa48eb 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java @@ -2004,22 +2004,6 @@ private void forceTriggerAutoCompaction( private void forceTriggerAutoCompaction(int numExpectedSegmentsAfterCompaction) throws Exception { compactionResource.forceTriggerAutoCompaction(); - - final AutoCompactionSnapshot snapshot = compactionResource.getCompactionStatus(fullDatasourceName); - if (snapshot == null) { - LOG.warn("Compaction could not be triggered for datasource."); - } else { - LOG.info( - "Triggered compaction with status[%s], message[%s]." - + " Interval counts: skipped[%d], compacted[%d], pending[%d]." - + " Skip reasons: %s", - snapshot.getScheduleStatus(), snapshot.getMessage(), - snapshot.getIntervalCountSkipped(), snapshot.getIntervalCountCompacted(), - snapshot.getIntervalCountAwaitingCompaction(), - snapshot.getSkippedReasons() - ); - } - waitForCompactionToFinish(numExpectedSegmentsAfterCompaction); } diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java index ba84ca99ff3b..cbf5f25f9d7b 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatusTracker.java @@ -83,7 +83,7 @@ public CompactionTaskStatus getLatestTaskStatus(CompactionCandidate candidates) */ public Set getSubmittedTaskIds() { - return Set.copyOf(submittedTaskIdToSegments.keySet()); + return submittedTaskIdToSegments.keySet(); } public CompactionStatus computeCompactionStatus( diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionTriggerResponse.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionTriggerResponse.java deleted file mode 100644 index 32bd38b1f2ad..000000000000 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionTriggerResponse.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.server.compaction; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.Set; - -public class CompactionTriggerResponse -{ - private final Set submittedTaskIds; - - @JsonCreator - public CompactionTriggerResponse( - @JsonProperty("submittedTaskIds") Set submittedTaskIds - ) - { - this.submittedTaskIds = submittedTaskIds; - } - - @JsonProperty - public Set getSubmittedTaskIds() - { - return submittedTaskIds; - } -} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java index 28aa06cabbb4..e31a7919f24f 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/AutoCompactionSnapshot.java @@ -23,13 +23,10 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.server.compaction.CompactionCandidate; import org.apache.druid.server.compaction.CompactionStatistics; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; -import java.util.ArrayList; -import java.util.List; import java.util.Objects; public class AutoCompactionSnapshot @@ -65,8 +62,6 @@ public enum ScheduleStatus private final long intervalCountCompacted; @JsonProperty private final long intervalCountSkipped; - @JsonProperty - private final List skippedReasons; public static Builder builder(String dataSource) { @@ -86,8 +81,7 @@ public AutoCompactionSnapshot( @JsonProperty("segmentCountSkipped") long segmentCountSkipped, @JsonProperty("intervalCountAwaitingCompaction") long intervalCountAwaitingCompaction, @JsonProperty("intervalCountCompacted") long intervalCountCompacted, - @JsonProperty("intervalCountSkipped") long intervalCountSkipped, - @JsonProperty("skippedReasons") @Nullable List skippedReasons + @JsonProperty("intervalCountSkipped") long intervalCountSkipped ) { this.dataSource = dataSource; @@ -102,7 +96,6 @@ public AutoCompactionSnapshot( this.intervalCountAwaitingCompaction = intervalCountAwaitingCompaction; this.intervalCountCompacted = intervalCountCompacted; this.intervalCountSkipped = intervalCountSkipped; - this.skippedReasons = skippedReasons; } @NotNull @@ -168,12 +161,6 @@ public long getIntervalCountSkipped() return intervalCountSkipped; } - @Nullable - public List getSkippedReasons() - { - return skippedReasons; - } - @Override public boolean equals(Object o) { @@ -195,8 +182,7 @@ public boolean equals(Object o) intervalCountSkipped == that.intervalCountSkipped && dataSource.equals(that.dataSource) && scheduleStatus == that.scheduleStatus && - Objects.equals(message, that.message) && - Objects.equals(skippedReasons, that.skippedReasons); + Objects.equals(message, that.message); } @Override @@ -214,8 +200,7 @@ public int hashCode() segmentCountSkipped, intervalCountAwaitingCompaction, intervalCountCompacted, - intervalCountSkipped, - skippedReasons + intervalCountSkipped ); } @@ -229,8 +214,6 @@ public static class Builder private final CompactionStatistics skippedStats = new CompactionStatistics(); private final CompactionStatistics waitingStats = new CompactionStatistics(); - private final List skippedReasons = new ArrayList<>(); - private Builder( @NotNull String dataSource ) @@ -263,12 +246,9 @@ public void incrementCompactedStats(CompactionStatistics entry) compactedStats.increment(entry); } - public void incrementSkippedStats(CompactionCandidate candidate) + public void incrementSkippedStats(CompactionStatistics entry) { - skippedStats.increment(candidate.getStats()); - if (candidate.getCurrentStatus() != null) { - skippedReasons.add(candidate.getUmbrellaInterval() + ": " + candidate.getCurrentStatus().getReason()); - } + skippedStats.increment(entry); } public AutoCompactionSnapshot build() @@ -285,8 +265,7 @@ public AutoCompactionSnapshot build() skippedStats.getNumSegments(), waitingStats.getNumIntervals(), compactedStats.getNumIntervals(), - skippedStats.getNumIntervals(), - skippedReasons.size() > 10 ? skippedReasons.subList(0, 10) : skippedReasons + skippedStats.getNumIntervals() ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java index 6fd432f0966f..61b56e582298 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java @@ -58,7 +58,6 @@ import org.apache.druid.server.compaction.CompactionRunSimulator; import org.apache.druid.server.compaction.CompactionSimulateResult; import org.apache.druid.server.compaction.CompactionStatusTracker; -import org.apache.druid.server.compaction.CompactionTriggerResponse; import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory; import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs; import org.apache.druid.server.coordinator.config.DruidCoordinatorConfig; @@ -106,7 +105,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -401,10 +399,8 @@ public void stop() } } - public CompactionTriggerResponse runCompactSegmentsDuty() + public void runCompactSegmentsDuty() { - final Set taskIdsBeforeDuty = compactionStatusTracker.getSubmittedTaskIds(); - final int startingLeaderCounter = coordLeaderSelector.localTerm(); DutiesRunnable compactSegmentsDuty = new DutiesRunnable( ImmutableList.of(compactSegments), @@ -413,11 +409,6 @@ public CompactionTriggerResponse runCompactSegmentsDuty() null ); compactSegmentsDuty.run(); - - final Set submittedTaskIds = new HashSet<>(compactionStatusTracker.getSubmittedTaskIds()); - submittedTaskIds.removeAll(taskIdsBeforeDuty); - - return new CompactionTriggerResponse(submittedTaskIds); } private Map> computeUnderReplicated( diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index 1e971f30b416..543bc2b82139 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -631,7 +631,7 @@ private void updateCompactionSnapshotStats( iterator.getSkippedSegments().forEach( candidateSegments -> currentRunAutoCompactionSnapshotBuilders .computeIfAbsent(candidateSegments.getDataSource(), AutoCompactionSnapshot::builder) - .incrementSkippedStats(candidateSegments) + .incrementSkippedStats(candidateSegments.getStats()) ); final Map currentAutoCompactionSnapshotPerDataSource = new HashMap<>(); diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionResource.java index abc03d5f1d36..0e465abbc5c1 100644 --- a/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionResource.java +++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorCompactionResource.java @@ -61,11 +61,11 @@ public CoordinatorCompactionResource( @POST @Path("/compact") @ResourceFilters(ConfigResourceFilter.class) - @Produces(MediaType.APPLICATION_JSON) @VisibleForTesting public Response forceTriggerCompaction() { - return Response.ok(coordinator.runCompactSegmentsDuty()).build(); + coordinator.runCompactSegmentsDuty(); + return Response.ok().build(); } @GET diff --git a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java index b3ae4804618b..a6eb127f854c 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/AutoCompactionSnapshotTest.java @@ -19,16 +19,10 @@ package org.apache.druid.server.coordinator; -import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.server.compaction.CompactionCandidate; import org.apache.druid.server.compaction.CompactionStatistics; -import org.apache.druid.server.compaction.CompactionStatus; -import org.apache.druid.timeline.DataSegment; import org.junit.Assert; import org.junit.Test; -import java.util.List; - public class AutoCompactionSnapshotTest { @Test @@ -38,16 +32,9 @@ public void testAutoCompactionSnapshotBuilder() final String expectedMessage = "message"; final AutoCompactionSnapshot.Builder builder = AutoCompactionSnapshot.builder(expectedDataSource); - final List segments = CreateDataSegments.ofDatasource(expectedDataSource) - .forIntervals(13, Granularities.HOUR) - .startingAt("2010-01-01") - .eachOfSize(1); - // Increment every stat twice for (int i = 0; i < 2; i++) { - final CompactionCandidate skippedCandidate = - CompactionCandidate.from(segments).withCurrentStatus(CompactionStatus.skipped("reason " + i)); - builder.incrementSkippedStats(skippedCandidate); + builder.incrementSkippedStats(CompactionStatistics.create(13, 13, 13)); builder.incrementWaitingStats(CompactionStatistics.create(13, 13, 13)); builder.incrementCompactedStats(CompactionStatistics.create(13, 13, 13)); } @@ -68,12 +55,6 @@ public void testAutoCompactionSnapshotBuilder() Assert.assertEquals(expectedDataSource, actual.getDataSource()); Assert.assertEquals(expectedMessage, actual.getMessage()); - final List skipReasons = List.of( - "2010-01-01T00:00:00.000Z/2010-01-01T13:00:00.000Z: reason 0", - "2010-01-01T00:00:00.000Z/2010-01-01T13:00:00.000Z: reason 1" - ); - Assert.assertEquals(skipReasons, actual.getSkippedReasons()); - AutoCompactionSnapshot expected = new AutoCompactionSnapshot( expectedDataSource, AutoCompactionSnapshot.ScheduleStatus.RUNNING, @@ -86,8 +67,7 @@ public void testAutoCompactionSnapshotBuilder() 26, 26, 26, - 26, - skipReasons + 26 ); Assert.assertEquals(expected, actual); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java index 9c4be44d794d..fa6a4e9bdc56 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java @@ -48,7 +48,6 @@ import org.apache.druid.server.DruidNode; import org.apache.druid.server.compaction.CompactionSimulateResult; import org.apache.druid.server.compaction.CompactionStatusTracker; -import org.apache.druid.server.compaction.CompactionTriggerResponse; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory; import org.apache.druid.server.coordinator.config.CoordinatorKillConfigs; @@ -825,9 +824,6 @@ public void testSimulateRunWithEmptyDatasourceCompactionConfigs() new ClusterCompactionConfig(0.2, null, null, null, null) ); Assert.assertEquals(Collections.emptyMap(), result.getCompactionStates()); - - final CompactionTriggerResponse response = coordinator.runCompactSegmentsDuty(); - Assert.assertTrue(response.getSubmittedTaskIds().isEmpty()); } private void setupSegmentsMetadataMock(DruidDataSource dataSource) diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java index c5e456e32d2c..df0fb07c4325 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java @@ -23,7 +23,6 @@ import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.error.ErrorResponse; import org.apache.druid.server.compaction.CompactionStatusResponse; -import org.apache.druid.server.compaction.CompactionTriggerResponse; import org.apache.druid.server.coordinator.AutoCompactionSnapshot; import org.apache.druid.server.coordinator.DruidCoordinator; import org.easymock.EasyMock; @@ -37,7 +36,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; public class CoordinatorCompactionResourceTest { @@ -155,21 +153,4 @@ public void testGetProgressForNullDatasourceReturnsBadRequest() DruidExceptionMatcher.invalidInput().expectMessageIs("No DataSource specified") ); } - - @Test - public void testForceTriggerCompaction() - { - EasyMock.expect(mock.runCompactSegmentsDuty()) - .andReturn(new CompactionTriggerResponse(Set.of())); - EasyMock.replay(mock); - - final Response response = new CoordinatorCompactionResource(mock) - .forceTriggerCompaction(); - Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); - - Assert.assertTrue(response.getEntity() instanceof CompactionTriggerResponse); - - final CompactionTriggerResponse payload = (CompactionTriggerResponse) response.getEntity(); - Assert.assertTrue(payload.getSubmittedTaskIds().isEmpty()); - } } From ed7e42379a1fdbdaf3260e737a4f60179277a9f4 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 4 Apr 2025 20:52:55 +0530 Subject: [PATCH 14/15] More cleanup --- .../org/apache/druid/server/compaction/CompactionStatus.java | 2 +- .../druid/server/http/CoordinatorCompactionResourceTest.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java index 268f39781cf9..77786993ac99 100644 --- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java +++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java @@ -186,7 +186,7 @@ private static String asString(PartitionsSpec partitionsSpec) } } - public static CompactionStatus skipped(String reasonFormat, Object... args) + static CompactionStatus skipped(String reasonFormat, Object... args) { return new CompactionStatus(State.SKIPPED, StringUtils.format(reasonFormat, args)); } diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java index df0fb07c4325..943f9a6ccf7d 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionResourceTest.java @@ -53,8 +53,7 @@ public class CoordinatorCompactionResourceTest 1, 1, 1, - 1, - null + 1 ); @Before From cc104b9b57ddc5db76dc12e19b5dd96e026b6373 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 4 Apr 2025 21:04:31 +0530 Subject: [PATCH 15/15] Remove unused args --- .../java/org/apache/druid/testing/guice/DruidTestModule.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java index b86c49eed2e0..f71ca38fbf3f 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModule.java @@ -19,8 +19,6 @@ package org.apache.druid.testing.guice; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Supplier; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; @@ -32,7 +30,6 @@ import org.apache.druid.guice.annotations.EscalatedClient; import org.apache.druid.guice.annotations.Self; import org.apache.druid.java.util.common.lifecycle.Lifecycle; -import org.apache.druid.java.util.emitter.core.LoggingEmitterConfig; import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.CredentialedHttpClient; @@ -83,7 +80,7 @@ public HttpClient getHttpClient( @Provides @ManageLifecycle - public ServiceEmitter getServiceEmitter(Supplier config, ObjectMapper jsonMapper) + public ServiceEmitter getServiceEmitter() { // Disabling metric emission since no useful metrics are emitted by the integration testing client // Use a LoggingEmitter here if needed in the future