Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a317db0
Add API to get cluster compaction config
kfaraz Mar 26, 2025
006ece9
Return non-null compaction snapshot from scheduler
kfaraz Mar 26, 2025
295deab
Fix test
kfaraz Mar 26, 2025
b7a37aa
Use supervisors with an IT
kfaraz Mar 26, 2025
d9efc7b
Add delay in IT
kfaraz Mar 26, 2025
872bfb6
Add coverage for FixedIntervalOrderPolicy
kfaraz Mar 26, 2025
179f361
Add some new APIs, allow for smooth transition
kfaraz Mar 27, 2025
406f0c1
Add more compaction APIs
kfaraz Mar 28, 2025
b86d286
Re-organize compaction APIs
kfaraz Mar 30, 2025
0e2d40c
Remove unused methods
kfaraz Mar 30, 2025
8ba7720
Cleanup
kfaraz Mar 30, 2025
a912201
Fix APIs and dependencies
kfaraz Mar 30, 2025
eba67d6
Add tests, handle filters
kfaraz Mar 31, 2025
07f45b2
Fix /status/datasources API
kfaraz Apr 1, 2025
7e33720
Fix up the ITs
kfaraz Apr 1, 2025
43ea46f
Merge branch 'master' of github.com:apache/druid into fix_compaction_…
kfaraz Apr 1, 2025
a127f46
Fix and rerun ITs
kfaraz Apr 1, 2025
c78595c
Use new APIs in ITs
kfaraz Apr 1, 2025
3cc2efc
Yet another attempt at the fix
kfaraz Apr 1, 2025
baf9fad
Merge branch 'master' of github.com:apache/druid into fix_compaction_…
kfaraz Apr 2, 2025
16cd7f4
Fix compilation after upstream changes
kfaraz Apr 2, 2025
ca81071
Add docs for new compaction APIs
kfaraz Apr 2, 2025
6997889
Minor typo
kfaraz Apr 2, 2025
05d310c
Add check for MSQ engine
kfaraz Apr 2, 2025
0d7d9e7
Remove unused import
kfaraz Apr 2, 2025
bf10f7e
Merge branch 'master' of github.com:apache/druid into fix_compaction_…
kfaraz Apr 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
823 changes: 727 additions & 96 deletions docs/api-reference/automatic-compaction-api.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,46 @@
*/
public interface CompactionScheduler
{
void start();
void becomeLeader();

void stop();
void stopBeingLeader();
Comment thread
kfaraz marked this conversation as resolved.

/**
* @return true if the scheduler is enabled i.e. when
* {@link DruidCompactionConfig#isUseSupervisors()} is true.
*/
boolean isEnabled();

/**
* @return true if the scheduler is currently running and submitting compaction
* tasks.
*/
boolean isRunning();

CompactionConfigValidationResult validateCompactionConfig(DataSourceCompactionConfig compactionConfig);

/**
* Starts compaction for a datasource if not already running.
*/
void startCompaction(String dataSourceName, DataSourceCompactionConfig compactionConfig);

/**
* Stops compaction for a datasource if currently running.
*/
void stopCompaction(String dataSourceName);

Map<String, AutoCompactionSnapshot> getAllCompactionSnapshots();

/**
* @return Non-null snapshot of the current status of compaction for the datasource.
*/
AutoCompactionSnapshot getCompactionSnapshot(String dataSource);

/**
* Simulates a compaction run with the given cluster config.
*
* @return Result of the simulation
*/
CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionConfig updateRequest);

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public SupervisorReport<AutoCompactionSnapshot> getStatus()
final AutoCompactionSnapshot snapshot;
if (supervisorSpec.isSuspended()) {
snapshot = AutoCompactionSnapshot.builder(dataSource)
.withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
.withStatus(AutoCompactionSnapshot.ScheduleStatus.NOT_ENABLED)
.build();
} else if (!supervisorSpec.getValidationResult().isValid()) {
snapshot = AutoCompactionSnapshot.builder(dataSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class CompactionSupervisorSpec implements SupervisorSpec
{
Expand All @@ -41,6 +42,11 @@ public class CompactionSupervisorSpec implements SupervisorSpec
private final CompactionScheduler scheduler;
private final CompactionConfigValidationResult validationResult;

/**
 * Builds the supervisor ID for the given datasource by prepending
 * {@code ID_PREFIX} to the datasource name.
 */
public static String getSupervisorIdForDatasource(String dataSource)
{
  final String supervisorId = ID_PREFIX + dataSource;
  return supervisorId;
}

@JsonCreator
public CompactionSupervisorSpec(
@JsonProperty("spec") DataSourceCompactionConfig spec,
Expand Down Expand Up @@ -70,7 +76,7 @@ public boolean isSuspended()
/**
 * @return Supervisor ID derived from the datasource of the underlying
 * compaction spec.
 */
@Override
public String getId()
{
  final String dataSource = spec.getDataSource();
  return getSupervisorIdForDatasource(dataSource);
}

public CompactionConfigValidationResult getValidationResult()
Expand Down Expand Up @@ -113,4 +119,23 @@ public String getSource()
{
return "";
}

/**
 * Two specs are considered equal iff they have the same suspended flag and
 * equal underlying compaction specs.
 */
@Override
public boolean equals(Object o)
{
  if (o == this) {
    return true;
  }
  if (o == null || o.getClass() != getClass()) {
    return false;
  }
  final CompactionSupervisorSpec other = (CompactionSupervisorSpec) o;
  return this.suspended == other.suspended
         && Objects.equals(this.spec, other.spec);
}

/**
 * Hashes the same fields that are compared in {@link #equals}: the suspended
 * flag and the compaction spec.
 */
@Override
public int hashCode()
{
return Objects.hash(suspended, spec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
Expand Down Expand Up @@ -103,6 +105,7 @@ public class OverlordCompactionScheduler implements CompactionScheduler
*/
private final TaskRunnerListener taskRunnerListener;

private final AtomicBoolean isLeader = new AtomicBoolean(false);
private final AtomicBoolean started = new AtomicBoolean(false);
private final CompactSegments duty;

Expand Down Expand Up @@ -165,29 +168,36 @@ public void statusChanged(String taskId, TaskStatus status)
};
}

/**
 * Lifecycle start hook. Intentionally a no-op: scheduling of compaction work
 * is driven by leadership changes (becomeLeader), not by lifecycle start.
 */
@LifecycleStart
public synchronized void start()
{
// Do nothing
}

/**
 * Lifecycle stop hook. Shuts down the scheduling executor immediately,
 * interrupting any scheduled run that is in flight.
 */
@LifecycleStop
public synchronized void stop()
{
executor.shutdownNow();
}

@Override
public void start()
public void becomeLeader()
{
if (isEnabled() && started.compareAndSet(false, true)) {
log.info("Starting compaction scheduler.");
initState();
scheduleOnExecutor(this::scheduledRun);
if (isLeader.compareAndSet(false, true)) {
scheduleOnExecutor(this::scheduledRun, SCHEDULE_PERIOD_SECONDS);
}
}

@Override
public void stop()
public void stopBeingLeader()
{
if (isEnabled() && started.compareAndSet(true, false)) {
log.info("Stopping compaction scheduler.");
cleanupState();
}
isLeader.set(false);
}

/**
 * @return true if the scheduler has been started (i.e. initState has run and
 * cleanupState has not yet run).
 */
@Override
public boolean isRunning()
{
  return started.get();
}

@Override
Expand Down Expand Up @@ -220,8 +230,16 @@ public void stopCompaction(String dataSourceName)
statusTracker.removeDatasource(dataSourceName);
}

/**
* Initializes scheduler state if required.
*/
private synchronized void initState()
{
if (!started.compareAndSet(false, true)) {
return;
}

log.info("Starting compaction scheduler.");
final Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
if (taskRunnerOptional.isPresent()) {
taskRunnerOptional.get().registerListener(taskRunnerListener, Execs.directExecutor());
Expand All @@ -231,8 +249,16 @@ private synchronized void initState()
}
}

/**
* Cleans up scheduler state if required.
*/
private synchronized void cleanupState()
{
if (!started.compareAndSet(true, false)) {
return;
}

log.info("Stopping compaction scheduler.");
final Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
if (taskRunnerOptional.isPresent()) {
taskRunnerOptional.get().unregisterListener(taskRunnerListener.getListenerId());
Expand All @@ -251,21 +277,36 @@ public boolean isEnabled()
return compactionConfigSupplier.get().isUseSupervisors();
}

/**
* Periodic task which runs the compaction duty if we are leader and
* useSupervisors is true. Otherwise, the scheduler state is cleaned up.
*/
private synchronized void scheduledRun()
{
if (isRunning()) {
if (!isLeader.get()) {
cleanupState();
return;
}

if (isEnabled()) {
initState();
try {
runCompactionDuty();
}
catch (Exception e) {
log.error(e, "Error processing compaction queue. Continuing schedule.");
}
scheduleOnExecutor(this::scheduledRun);
scheduleOnExecutor(this::scheduledRun, SCHEDULE_PERIOD_SECONDS);
} else {
cleanupState();
scheduleOnExecutor(this::scheduledRun, SCHEDULE_PERIOD_SECONDS * 4);
}
}

/**
* Runs the compaction duty and emits stats if {@link #METRIC_EMISSION_PERIOD}
* has elapsed.
*/
private synchronized void runCompactionDuty()
{
final CoordinatorRunStats stats = new CoordinatorRunStats();
Expand All @@ -291,7 +332,22 @@ private synchronized void runCompactionDuty()
/**
 * Returns a non-null snapshot of the current compaction status for the given
 * datasource:
 * <ul>
 * <li>NOT_ENABLED if the datasource has no active compaction config;</li>
 * <li>AWAITING_FIRST_RUN if the duty has not produced a snapshot yet and the
 * scheduler is enabled, NOT_ENABLED if it is disabled;</li>
 * <li>otherwise, the snapshot from the latest duty run.</li>
 * </ul>
 */
@Override
public AutoCompactionSnapshot getCompactionSnapshot(String dataSource)
{
  if (!activeDatasourceConfigs.containsKey(dataSource)) {
    return AutoCompactionSnapshot.builder(dataSource)
                                 .withStatus(AutoCompactionSnapshot.ScheduleStatus.NOT_ENABLED)
                                 .build();
  }

  final AutoCompactionSnapshot snapshot = duty.getAutoCompactionSnapshot(dataSource);
  if (snapshot == null) {
    final AutoCompactionSnapshot.ScheduleStatus status =
        isEnabled()
        ? AutoCompactionSnapshot.ScheduleStatus.AWAITING_FIRST_RUN
        : AutoCompactionSnapshot.ScheduleStatus.NOT_ENABLED;
    return AutoCompactionSnapshot.builder(dataSource).withStatus(status).build();
  } else {
    return snapshot;
  }
}

@Override
Expand Down Expand Up @@ -336,7 +392,7 @@ private DataSourcesSnapshot getDatasourceSnapshot()
return segmentManager.getSnapshotOfDataSourcesWithAllUsedSegments();
}

private void scheduleOnExecutor(Runnable runnable)
private void scheduleOnExecutor(Runnable runnable, long delaySeconds)
{
executor.schedule(
() -> {
Expand All @@ -347,7 +403,7 @@ private void scheduleOnExecutor(Runnable runnable)
log.error(t, "Error while executing runnable");
}
},
SCHEDULE_PERIOD_SECONDS,
delaySeconds,
TimeUnit.SECONDS
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void stop()
public void start()
{
taskMaster.becomeFullLeader();
compactionScheduler.start();
compactionScheduler.becomeLeader();
scheduledBatchTaskManager.start();

// Announce the node only after all the services have been initialized
Expand All @@ -181,7 +181,7 @@ public void stop()
{
serviceAnnouncer.unannounce(node);
scheduledBatchTaskManager.stop();
compactionScheduler.stop();
compactionScheduler.stopBeingLeader();
taskMaster.downgradeToHalfLeader();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.overlord.http;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.server.coordinator.DataSourceCompactionConfigAuditEntry;

import java.util.List;
import java.util.Objects;

public class CompactionConfigHistoryResponse
Comment thread
adarshsanjeev marked this conversation as resolved.
{
private final List<DataSourceCompactionConfigAuditEntry> entries;

public CompactionConfigHistoryResponse(
@JsonProperty("entries") List<DataSourceCompactionConfigAuditEntry> entries
)
{
this.entries = entries;
}

@JsonProperty
public List<DataSourceCompactionConfigAuditEntry> getEntries()
{
return entries;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CompactionConfigHistoryResponse that = (CompactionConfigHistoryResponse) o;
return Objects.equals(entries, that.entries);
}

@Override
public int hashCode()
{
return Objects.hashCode(entries);
}
}
Loading
Loading