Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit. Hold Shift + click to select a range
7a3f46d
Add catalog templates for cascading compaction
kfaraz Aug 14, 2025
ce0b28f
Remove TODOs
kfaraz Aug 14, 2025
fa566ce
Fix up tests
kfaraz Aug 14, 2025
f3e19d2
Add template for creating MSQ compaction jobs
kfaraz Aug 14, 2025
939e96a
Add field CompactionCandidate.compactionInterval
kfaraz Aug 15, 2025
0f96366
Add more test coverage for templates
kfaraz Aug 16, 2025
032729e
Adjust rule boundaries to ensure maximum compaction
kfaraz Aug 17, 2025
34af71e
Merge branch 'master' of github.com:apache/druid into cascade_compact…
kfaraz Aug 17, 2025
6dea564
Fix tests, comments
kfaraz Aug 17, 2025
69a5425
Add test with mixed templates
kfaraz Aug 17, 2025
db2aebd
Clean up API for CompactionJobTemplate, fix test
kfaraz Aug 17, 2025
3e084bf
Use SQL to query template definitions
kfaraz Aug 17, 2025
45fe7cb
Add CatalogCoreModule to quidem test setup
kfaraz Aug 18, 2025
ec51aec
Fix SqlModuleTest
kfaraz Aug 18, 2025
03cefa5
Merge branch 'master' of github.com:apache/druid into cascade_compact…
kfaraz Aug 21, 2025
e0aa77d
Merge branch 'master' of github.com:apache/druid into cascade_compact…
kfaraz Sep 6, 2025
59ec104
Merge branch 'master' of github.com:apache/druid into cascade_compact…
kfaraz Sep 29, 2025
fcac675
Add javadocs, simplify JobParams
kfaraz Oct 1, 2025
52741a9
Merge branch 'master' of github.com:apache/druid into cascade_compact…
kfaraz Oct 6, 2025
4e53ab0
Merge branch 'master' of github.com:apache/druid into cascade_compact…
kfaraz Oct 27, 2025
bd490e1
Minor fixes
kfaraz Oct 27, 2025
ae17f67
Minor fixes
kfaraz Oct 27, 2025
b6423f8
Add metric, remove redundant checks
kfaraz Oct 27, 2025
e0d919c
Merge branch 'master' of github.com:apache/druid into cascade_compact…
kfaraz Oct 27, 2025
c830ced
Update compaction status when job finishes
kfaraz Oct 30, 2025
2263cb6
Merge branch 'master' of github.com:apache/druid into cascade_compact…
kfaraz Oct 30, 2025
e6c68b3
Remove catalog changes
kfaraz Oct 30, 2025
558e0b5
Remove extra changes
kfaraz Oct 30, 2025
d8ce2a7
Fix up test, remove extra files
kfaraz Oct 30, 2025
a67f408
Update javadocs
kfaraz Oct 30, 2025
738f4f8
Fix up tests
kfaraz Oct 30, 2025
5fcdc43
Merge branch 'master' of github.com:apache/druid into cascade_compact…
kfaraz Oct 30, 2025
1be4e04
Remove new template implementations
kfaraz Oct 30, 2025
3cbf955
Remove extra file
kfaraz Oct 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@

import com.google.common.collect.ImmutableList;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
import org.apache.druid.server.compaction.CompactionSegmentIterator;
import org.apache.druid.server.compaction.CompactionStatusTracker;
import org.apache.druid.server.compaction.NewestSegmentFirstPolicy;
import org.apache.druid.server.compaction.PriorityBasedCompactionSegmentIterator;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -137,8 +135,7 @@ public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
policy,
compactionConfigs,
dataSources,
Collections.emptyMap(),
new CompactionStatusTracker(new DefaultObjectMapper())
Collections.emptyMap()
);
for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
blackhole.consume(iterator.next());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.embedded.compact;

import org.apache.druid.catalog.guice.CatalogClientModule;
import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.guice.IndexerMemoryManagementModule;
import org.apache.druid.msq.guice.MSQDurableStorageModule;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.msq.guice.MSQSqlModule;
import org.apache.druid.msq.guice.SqlTaskModule;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.rpc.UpdateResponse;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedHistorical;
import org.apache.druid.testing.embedded.EmbeddedIndexer;
import org.apache.druid.testing.embedded.EmbeddedOverlord;
import org.apache.druid.testing.embedded.EmbeddedRouter;
import org.apache.druid.testing.embedded.indexing.MoreResources;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Period;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.Map;

/**
* Embedded test that runs compaction supervisors of various types.
*/
public class CompactionSupervisorTest extends EmbeddedClusterTestBase
{
private final EmbeddedBroker broker = new EmbeddedBroker();
private final EmbeddedIndexer indexer = new EmbeddedIndexer()
.setServerMemory(2_000_000_000L)
.addProperty("druid.worker.capacity", "20");
private final EmbeddedOverlord overlord = new EmbeddedOverlord()
.addProperty("druid.manager.segments.pollDuration", "PT1s")
.addProperty("druid.manager.segments.useIncrementalCache", "always");
private final EmbeddedHistorical historical = new EmbeddedHistorical();
private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator()
.addProperty("druid.manager.segments.useIncrementalCache", "always");

@Override
public EmbeddedDruidCluster createCluster()
{
return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
.useLatchableEmitter()
.useDefaultTimeoutForLatchableEmitter(600)
.addExtensions(
CatalogClientModule.class,
CatalogCoordinatorModule.class,
IndexerMemoryManagementModule.class,
MSQDurableStorageModule.class,
MSQIndexingModule.class,
MSQSqlModule.class,
SqlTaskModule.class
)
.addServer(coordinator)
.addServer(overlord)
.addServer(indexer)
.addServer(historical)
.addServer(broker)
.addServer(new EmbeddedRouter());
}

@BeforeAll
public void enableCompactionSupervisors()
{
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 100, null, true, null))
);
Assertions.assertTrue(updateResponse.isSuccess());
}

@Test
public void test_ingestDayGranularity_andCompactToMonthGranularity_withInlineConfig()
{
// Ingest data at DAY granularity and verify
runIngestionAtGranularity(
"DAY",
"2025-06-01T00:00:00.000Z,shirt,105"
+ "\n2025-06-02T00:00:00.000Z,trousers,210"
+ "\n2025-06-03T00:00:00.000Z,jeans,150"
);
Assertions.assertEquals(3, getNumSegmentsWith(Granularities.DAY));

// Create a compaction config with MONTH granularity
InlineSchemaDataSourceCompactionConfig compactionConfig =
InlineSchemaDataSourceCompactionConfig
.builder()
.forDataSource(dataSource)
.withSkipOffsetFromLatest(Period.seconds(0))
.withGranularitySpec(
new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null)
)
.build();

runCompactionWithSpec(compactionConfig);
waitForAllCompactionTasksToFinish();

Assertions.assertEquals(0, getNumSegmentsWith(Granularities.DAY));
Assertions.assertEquals(1, getNumSegmentsWith(Granularities.MONTH));
}

private void runCompactionWithSpec(DataSourceCompactionConfig config)
{
final CompactionSupervisorSpec compactionSupervisor
= new CompactionSupervisorSpec(config, false, null);
cluster.callApi().postSupervisor(compactionSupervisor);
}

private void waitForAllCompactionTasksToFinish()
{
// Wait for all intervals to be compacted
overlord.latchableEmitter().waitForEvent(
event -> event.hasMetricName("interval/waitCompact/count")
.hasDimension(DruidMetrics.DATASOURCE, dataSource)
.hasValueMatching(Matchers.equalTo(0L))
);

// Wait for all submitted compaction jobs to finish
int numSubmittedTasks = overlord.latchableEmitter().getMetricValues(
"compact/task/count",
Map.of(DruidMetrics.DATASOURCE, dataSource)
).stream().mapToInt(Number::intValue).sum();

final Matcher<Object> taskTypeMatcher = Matchers.anyOf(
Matchers.equalTo("query_controller"),
Matchers.equalTo("compact")
);
overlord.latchableEmitter().waitForEventAggregate(
event -> event.hasMetricName("task/run/time")
.hasDimensionMatching(DruidMetrics.TASK_TYPE, taskTypeMatcher)
.hasDimension(DruidMetrics.DATASOURCE, dataSource),
agg -> agg.hasCountAtLeast(numSubmittedTasks)
);
}

private int getNumSegmentsWith(Granularity granularity)
{
return (int) overlord
.bindings()
.segmentsMetadataStorage()
.retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
.stream()
.filter(segment -> granularity.isAligned(segment.getInterval()))
.count();
}

private void runIngestionAtGranularity(
String granularity,
String inlineDataCsv
)
{
final IndexTask task = MoreResources.Task.BASIC_INDEX
.get()
.segmentGranularity(granularity)
.inlineInputSourceWithData(inlineDataCsv)
.dataSource(dataSource)
.withId(IdUtils.getRandomId());
cluster.callApi().runTask(task, overlord);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public void test_ingestClusterMetrics_withConcurrentCompactionSupervisor_andSkip
event -> event.hasMetricName("task/run/time")
.hasDimension(DruidMetrics.TASK_TYPE, "compact")
.hasDimension(DruidMetrics.TASK_STATUS, "SUCCESS"),
agg -> agg.hasCountAtLeast(2)
agg -> agg.hasCountAtLeast(10)
);

// Verify that some segments have been upgraded due to Concurrent Append and Replace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.druid.testing.embedded.EmbeddedRouter;
import org.apache.druid.testing.embedded.indexing.Resources;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.hamcrest.Matchers;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -128,7 +129,7 @@ public void test_switchLeader_andVerifyUsingSysTables()
coordinator1.latchableEmitter().waitForEvent(
event -> event.hasMetricName("segment/metadataCache/used/count")
.hasDimension(DruidMetrics.DATASOURCE, dataSource)
.hasValueAtLeast(10)
.hasValueMatching(Matchers.greaterThanOrEqualTo(10L))
);

// Run sys queries, switch leaders, repeat
Expand Down
5 changes: 5 additions & 0 deletions extensions-core/druid-catalog/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.rpc.UpdateResponse;
import org.apache.druid.server.coordinator.CatalogDataSourceCompactionConfig;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
Expand All @@ -52,7 +53,8 @@
public class CatalogCompactionTest extends EmbeddedClusterTestBase
{
private final EmbeddedOverlord overlord = new EmbeddedOverlord()
.addProperty("druid.catalog.client.maxSyncRetries", "0");
.addProperty("druid.catalog.client.maxSyncRetries", "0")
.addProperty("druid.manager.segments.pollDuration", "PT1s");
private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator()
.addProperty("druid.manager.segments.useIncrementalCache", "always");
private final EmbeddedBroker broker = new EmbeddedBroker()
Expand Down Expand Up @@ -119,7 +121,8 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity()
// Wait for compaction to finish
overlord.latchableEmitter().waitForEvent(
event -> event.hasMetricName("task/run/time")
.hasDimension("taskType", "compact")
.hasDimension(DruidMetrics.TASK_TYPE, "compact")
.hasDimension(DruidMetrics.DATASOURCE, dataSource)
);

// Verify that segments are now compacted to MONTH granularity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -95,8 +94,8 @@
import org.apache.druid.segment.transform.CompactionTransformSpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.compaction.CompactionSlotManager;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -133,7 +132,7 @@
*/
public class CompactionTask extends AbstractBatchIndexTask implements PendingSegmentAllocatingTask
{
public static final String TYPE = "compact";
public static final String TYPE = CompactionSlotManager.COMPACTION_TASK_TYPE;
private static final Logger log = new Logger(CompactionTask.class);

/**
Expand All @@ -148,10 +147,6 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
*/
public static final String CTX_KEY_APPENDERATOR_TRACKING_TASK_ID = "appenderatorTrackingTaskId";

static {
Verify.verify(TYPE.equals(CompactSegments.COMPACTION_TASK_TYPE));
}

private final CompactionIOConfig ioConfig;
@Nullable
private final DimensionsSpec dimensionsSpec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public class Tasks
public static final long DEFAULT_SUB_TASK_TIMEOUT_MILLIS = 0;
public static final boolean DEFAULT_FORCE_TIME_CHUNK_LOCK = true;
public static final boolean DEFAULT_STORE_COMPACTION_STATE = false;
public static final boolean DEFAULT_USE_MAX_MEMORY_ESTIMATES = false;
public static final TaskLockType DEFAULT_TASK_LOCK_TYPE = TaskLockType.EXCLUSIVE;
public static final boolean DEFAULT_USE_CONCURRENT_LOCKS = false;

Expand Down
Loading
Loading