Add compaction templates and CompactionJobQueue #18402

Merged
kfaraz merged 34 commits into apache:master from kfaraz:cascade_compact_with_templates
Oct 31, 2025

Conversation

@kfaraz (Contributor) commented Aug 14, 2025

Note

A lot of the changes related to compaction template implementations and persisting templates in Druid catalog were once a part of this PR but have been removed until there is consensus on the best approach.

This PR now deals with only refactoring the OverlordCompactionScheduler to use the CompactionJobQueue and other related changes.

Changes

Functionality

  • Reset the compaction job queue every 15 minutes by default.
  • Whenever a compaction task finishes, check if jobs pending in the queue can be launched.
  • When a compaction supervisor is stopped, remove its jobs from the queue.
  • When a compaction supervisor is started (or restarted), create jobs for it and add them to the queue.

Classes

  • Add BatchIndexingJob which may contain either a ClientTaskQuery or a ClientSqlQuery (for MSQ jobs).
  • Add BatchIndexingJobTemplate that can create jobs for a given source and destination.
  • Add CompactionConfigBasedJobTemplate which implements CompactionJobTemplate.
  • Update CompactionSupervisor to create jobs using templates.
  • Add CompactionJobQueue to create and submit compaction jobs to the Overlord.
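The relationships between the classes listed above could be sketched roughly as follows. This is a minimal, hypothetical sketch inferred only from this description; the actual Druid interfaces and method signatures may differ.

```java
import java.util.List;

// Hypothetical shapes inferred from the PR description above; the actual
// Druid interfaces and method signatures may differ.
interface BatchIndexingJob
{
  String getDataSource();
}

interface BatchIndexingJobTemplate
{
  // Creates jobs for a given source and destination.
  List<BatchIndexingJob> createJobs(String source, String destination);
}

interface CompactionJobTemplate extends BatchIndexingJobTemplate
{
}

// Stand-in for CompactionConfigBasedJobTemplate: a real template would derive
// one job per eligible time chunk from a DataSourceCompactionConfig.
class CompactionConfigBasedJobTemplateSketch implements CompactionJobTemplate
{
  @Override
  public List<BatchIndexingJob> createJobs(String source, String destination)
  {
    BatchIndexingJob job = () -> source;  // illustrative single job
    return List.of(job);
  }
}
```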

Refactor for reuse

  • Move common code from CompactSegments to CompactionSlotManager, CompactionSnapshotBuilder
  • Update CompactionStatus, CompactionStatusTracker and DataSourceCompactibleSegmentIterator

Future work

  • Have a common BatchIndexingSupervisor which uses templates to create jobs.
    • It could be implemented by ScheduledBatchSupervisor and CompactionSupervisor.
    • This change was originally included in this patch but has been left out to keep the changes small.
  • Reset the job queue (if required) when the cluster-level compaction config changes
  • Allow force reset of the job queue by calling an API per supervisor (e.g. supervisor reset) and overall
  • Cancel tasks which are not aligned with the supervisor spec anymore
  • Do not queue up jobs for recently compacted intervals even if the segment timeline has been updated
  • Determine if any change was made to a time chunk of the segment timeline in the latest metadata store poll

Release note

TODO


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

```java
config.getTaskPriority(),
ClientCompactionTaskQueryTuningConfig.from(
    config.getTuningConfig(),
    config.getMaxRowsPerSegment(),
```

Check notice — Code scanning / CodeQL: Deprecated method or constructor invocation. Invoking DataSourceCompactionConfig.getMaxRowsPerSegment should be avoided because it has been deprecated.
```java
if (partitionsSpecFromTuningConfig == null) {
  final long maxTotalRows = Configs.valueOrDefault(tuningConfig.getMaxTotalRows(), Long.MAX_VALUE);
  return new DynamicPartitionsSpec(tuningConfig.getMaxRowsPerSegment(), maxTotalRows);
final Long maxTotalRows = tuningConfig.getMaxTotalRows();
```


Check notice — Code scanning / CodeQL: Deprecated method or constructor invocation. Invoking ClientCompactionTaskQueryTuningConfig.getMaxTotalRows should be avoided because it has been deprecated.
```java
final long maxTotalRows = Configs.valueOrDefault(tuningConfig.getMaxTotalRows(), Long.MAX_VALUE);
return new DynamicPartitionsSpec(tuningConfig.getMaxRowsPerSegment(), maxTotalRows);
final Long maxTotalRows = tuningConfig.getMaxTotalRows();
final Integer maxRowsPerSegment = tuningConfig.getMaxRowsPerSegment();
```


Check notice — Code scanning / CodeQL: Deprecated method or constructor invocation. Invoking ClientCompactionTaskQueryTuningConfig.getMaxRowsPerSegment should be avoided because it has been deprecated.
@uds5501 (Contributor) left a comment:

Had a qq -

Comment on lines +143 to +150
```java
// Filter out jobs if they are outside the search interval
final List<CompactionJob> validJobs = new ArrayList<>();
for (CompactionJob job : allJobs) {
  final Interval compactionInterval = job.getCandidate().getCompactionInterval();
  if (searchInterval.contains(compactionInterval)) {
    validJobs.add(job);
  }
}
```
Contributor:

Why would you get jobs outside of the interval? That seems like a problem with the contract or the specific implementation rather than a concern that everybody who ever calls the method needs to apply?

Contributor (Author):

Thanks for catching this. This should not be needed anymore.

Comment on lines +97 to +101
```java
final BatchIndexingJobTemplate delegate
    = resolvedTable.decodeProperty(IndexingTemplateDefn.PROPERTY_PAYLOAD);
if (delegate instanceof CompactionJobTemplate) {
  return (CompactionJobTemplate) delegate;
} else {
```
Contributor:

Perhaps this is an established pattern. But I would never have thought that you are supposed to get the table from the catalog and then pass a magical parameter to "decodeProperty" in order to get it to become an object that is a template.

Why isn't there a method that's like asJobTemplate() or as() or something like that? What's PROPERTY_PAYLOAD and why is it special? Will a decoded property always generate a BatchIndexingJobTemplate? What does "decode property" have to do with generating a BatchIndexingJobTemplate?

Maybe as I read more of the code I'll understand, but only reading the usage-side of this is not very intuitive.

Contributor (Author):

Yes, this felt hacky to me too. Unfortunately, the Druid catalog currently only understands a "table" as the top-level object, so I just stuck to that model for the time being. I have also put some relevant points in the PR description under "Open questions".

@clintropolis , do you have any suggestions on what would be the preferred approach to store an indexing/compaction template in the catalog?

```java
{
  final CompactionJobTemplate delegate = getDelegate();
  if (delegate == null) {
    return List.of();
```
Contributor:

This would mean that the table doesn't actually exist right? Should we offer some sort of indication that there is a reference to a table that doesn't exist? an exception? a log line? Something that can help figure out that there's an issue with how things are setup?

Contributor (Author):

Fixed to throw a NotFound exception with the proper error message.

Comment on lines +108 to +112
```java
@Override
public String getType()
{
  throw new UnsupportedOperationException("This template type cannot be serialized");
}
```
Contributor:

What is someone supposed to do if this exception gets thrown? Why would it have happened? Are there any hints that can be provided to the developer who sees this and needs to fix it? Also, it should probably be either a DruidException.defensive() or a NotYetImplemented exception.

Contributor (Author):

Converted to a defensive exception with a better error message.

```java
/**
 * Parameters used while creating a {@link CompactionJob} using a {@link CompactionJobTemplate}.
 */
public class CompactionJobParams implements JobParams
```
Contributor:

In some other code, you are adjusting the input with a searchInterval in order to limit the time interval seen. When I initially read that, I wondered "isn't that a param? why isn't it on the param object?". After seeing this class, I still don't have a good answer: why isn't that a param on the param object?

@kfaraz (Contributor, Author) commented Oct 27, 2025:

The search interval seemed like a better fit for the InputSource since it defines the time range of the data that is the input for a job template.

It seemed redundant to include it in CompactionJobParams too since the passed DruidInputSource already contains the search interval.

Comment on lines +51 to +70
```java
@Override
public DateTime getScheduleStartTime()
{
  return scheduleStartTime;
}

public ClusterCompactionConfig getClusterCompactionConfig()
{
  return clusterCompactionConfig;
}

public SegmentTimeline getTimeline(String dataSource)
{
  return timelineProvider.getTimelineForDataSource(dataSource);
}

public CompactionSnapshotBuilder getSnapshotBuilder()
{
  return snapshotBuilder;
}
```
Contributor:

Where is the right place to find the contract of what these objects are and what they do and why they exist? If I'm just a lowly developer trying to create a new CompactionTask thingie and I'm passing in a CompactionJobParams how do I go about figuring out what the semantics of the things that were given to me are and what I need to do with them?

Contributor (Author):

Added javadocs for these methods. Please let me know if they do not seem adequate.

Comment on lines +55 to +59
```java
/**
 * Iterates over all eligible compaction jobs in order of their priority.
 * A fresh instance of this class must be used in every run of the
 * {@link CompactionScheduler}.
 */
```
Contributor:

What's the scale of compaction jobs? Like, how many do we expect this to be iterating at any point in time?

Contributor (Author):

Each time chunk (using the target segment granularity) of each datasource would translate to a single compaction job.

On a large cluster with, say, 1 year of hourly segment data for 10 datasources, this number could easily be 365 * 24 * 10 = 87,600, i.e. close to 90k.

Every time the queue is reset, we re-create all of the jobs and check which ones are already done and which ones need to be queued up.
Some of this work is wasteful, and we may optimize it in follow-up PRs to simply not re-create jobs for intervals which we know to be already compacted.

I have added some metrics to easily monitor the size of the job queue.

Comment on lines +153 to +155
```java
final long segmentPollPeriodSeconds =
    segmentManagerConfig.getPollDuration().toStandardDuration().getMillis();
this.schedulePeriodMillis = Math.min(5_000, segmentPollPeriodSeconds);
```
Contributor:

A 5 second poll is pretty rapid, what's the logic behind the need for such a rapid poll and why it won't be a significant resource burden?

Contributor (Author):

Yes, this value has been carried over from the original iteration of the OverlordCompactionScheduler.
I had intended the scheduler to be able to pick up compaction jobs as soon as compaction task slots become available, but it is wasteful to recompute the entire queue for that, especially since, on large clusters, recomputation of the entire queue may take up to a couple of minutes (as seen from the coordinator/time metric).

We can do the following instead:

  • Keep the schedule period at 5 minutes (or maybe even higher?)
  • When the scheduler kicks in, reset the job queue.
  • We already receive a callback whenever a task completes.
  • When a task completes and slots become available, check if there is any pending job still in the queue from the last scheduled run. If there is a pending job, launch that.

Please let me know if this makes sense.
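The flow proposed in the steps above (a periodic full reset, plus launching pending jobs from the task-completion callback) could be sketched roughly like this. This is an illustrative sketch, not the actual OverlordCompactionScheduler code; the class and method names are made up.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch of the proposed scheduling flow; names are hypothetical.
class JobQueueSketch
{
  private final Queue<String> pendingJobs = new ArrayDeque<>();

  // Runs on the schedule period (e.g. every 5 minutes): recompute the full queue.
  void reset(Iterable<String> recomputedJobs)
  {
    pendingJobs.clear();
    recomputedJobs.forEach(pendingJobs::add);
  }

  // Task-completion callback: if a slot has freed up, launch the next pending
  // job from the last scheduled run instead of recomputing the whole queue.
  String launchNextIfSlotAvailable(int availableSlots)
  {
    return availableSlots > 0 ? pendingJobs.poll() : null;
  }
}
```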

```java
/**
 * Provides parameters required to create a {@link BatchIndexingJob}.
 */
public interface JobParams
```
Contributor:

Why interface instead of class? It seems like this interface is very likely to only ever carry getters and it's unclear to me why it's important that different classes can implement this instead of just having a reference to one of these lying around.

Contributor (Author):

Fixed, made JobParams a concrete class.
We would still need to have a CompactionJobParams which extends JobParams since compaction job templates use some extra stuff like CompactionSnapshotBuilder and ClusterCompactionConfig.

@capistrant (Contributor) left a comment:

looking really cool. minor comments/questions

```java
 * separate to allow:
 * <ul>
 * <li>fields to be nullable so that only non-null fields are used for matching</li>
 * <li>legacy "compaction-incompatible" fields to be removed</li>
```
Contributor:

I think this may be me coming late to the party on compaction. For my sake, could you elaborate on this list item to get me up to speed. I guess I'm unclear on what you refer to as compaction-incompatible fields

Contributor (Author):

Updated the javadoc to clarify this point. But it mostly refers to things like the transformSpec which can filter out data or even the dimensionsSpec where we may choose to drop some dimensions and thus cause an irreversible aggregation of the data.

(Side note: the more desirable way to achieve this pre-aggregation to improve query perf would be to use projections).

```java
  return jobs;
}

private ClientSqlQuery createQueryForJob(String dataSource, Interval compactionInterval)
```
Contributor:

wondering aloud on if there is a way to make the query and formatting more flexible/extensible if folks want to use more than these standard format vars. Perhaps if someone wanted to do some filtering during compaction? Or would you suggest that if a person desires that, that it would be in some custom template type that they roll on their own?

Contributor:

I guess since this is just inline, the filter could be defined inline. But if someone changed the query after it existed for some time that filter wouldn't be re-applied to already compacted segments since it is not persisted in the target state.

Contributor (Author):

Yes, the filter can simply be included in the SQL.

But if someone changed the query after it existed for some time that filter wouldn't be re-applied to already compacted segments since it is not persisted in the target state.

Yes, that is by design. We don't want compaction to be triggered as long as the compaction state has not changed.
This ensures that every minor tweak made to the SQL query does not end up recompacting everything.
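The match-before-recompact behavior described here could be sketched as follows. This is a hypothetical helper, assuming the target state exposes its fields as a map and that null target fields are ignored for matching (as the nullable-fields javadoc above suggests); it is not the actual CompactionStatus implementation.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical helper: a segment needs recompaction only if some non-null
// field of the target state differs from the persisted compaction state.
// The SQL text itself is not part of the matched state, so minor query
// tweaks alone would not trigger recompaction.
class StateMatcherSketch
{
  static boolean needsCompaction(Map<String, String> persistedState, Map<String, String> targetState)
  {
    for (Map.Entry<String, String> field : targetState.entrySet()) {
      if (field.getValue() != null
          && !Objects.equals(persistedState.get(field.getKey()), field.getValue())) {
        return true;
      }
    }
    return false;
  }
}
```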

Comment on a newly added file (191 lines):
Contributor:

when I was playing with this on my local I accidentally got into a state where I had defined an expected granularity of MONTH, but the query in the template was DAY. It looked like it just went into an infinite compact loop. Could we force the templated query to honor segment granularity from the state matcher by injecting it? I guess this forces everyone creating a rule to select a segment gran though, which is probably not desired.

Contributor (Author):

Allowing the state matcher to be distinct from the query is by design, as it allows us to make improvements to the query without worrying about all the intervals being recompacted when not desired.

For the time being, it is up to the user to ensure that the state matcher and the query are compatible with each other.
In future versions, we will include some kind of template validation.


kfaraz commented Oct 30, 2025

Since there is a lack of clarity on how the catalog would be used to persist compaction templates, the catalog bits have been removed from this PR for the time being.

For posterity, some possible options for storing compaction templates are:

a. As a table inside a new schema index_template (currently used in this PR)
b. OR as a table inside the druid schema: Currently used for datasources only
c. OR as a single row inside sys.templates: probably not preferable since the catalog models everything as tables and their properties, but this would be neither.

Note: In all of the above cases, the template is always physically stored as a single row in druid_tableDefs in the metadata store.

d. A separate metadata table druid_indexingTemplates. The schema used to access the contents would still be one of a, b, or c.

@kfaraz kfaraz changed the title Add catalog templates to power cascading compaction Add compaction templates and CompactionJobQueue Oct 30, 2025
@gianm (Contributor) left a comment:

LGTM

@kfaraz kfaraz merged commit 30d98b0 into apache:master Oct 31, 2025
51 checks passed
@kfaraz kfaraz deleted the cascade_compact_with_templates branch October 31, 2025 02:14

kfaraz commented Oct 31, 2025

Thanks for the reviews, @capistrant, @gianm, @uds5501, @cheddar!

@kgyrtkirk kgyrtkirk added this to the 36.0.0 milestone Jan 19, 2026