
Enhance Compaction task to be able to write to a different/new datasource#18612

Closed
maytasm wants to merge 1 commit into apache:master from maytasm:compaction_to_new_datasource

Conversation

@maytasm
Contributor

@maytasm maytasm commented Oct 9, 2025

Enhance Compaction task to be able to write to a different/new datasource

Description

Compaction tasks, unlike reindexing (which is native batch + Druid input source), are great because they allow the user to specify only the specs that are needed and can automatically infer unspecified specs from existing segments of the input datasource. For example, if a user doesn't want to change the metrics, they don't have to set the metricsSpec and the Compaction task will correctly create the metricsSpec from existing segments.

A limitation of Compaction tasks is that the datasource the task is writing to and reading from has to be the same datasource. There are multiple use cases where we may want to write to a different datasource. For example:

  1. You have an existing datasource A with all your data. You want to overwrite the datasource with a new ingestionSpec modifying x dimensions and y metrics. Before you do, you want to create a backup in case anything goes wrong. You may want to use a Compaction task to read from datasource A and write to datasource A_temp (exact copy without any change in specs) to test out the new ingestionSpec.
  2. You have an existing datasource A with all your data. You may want to do a periodic backup (a clone of the table at that point in time). You may want to use a Compaction task to read from datasource A and write to datasource A_20250429 (exact copy without any change in specs). This can be used for audit purposes or as a backup in case an accidental and/or malicious drop/overwrite/modification happens to datasource A.
  3. You have an existing datasource A with all your data. You have some queries that are not performant on this datasource due to its low rollup ratio (i.e. this datasource has a requirement for fine granularity and/or a large number of dimensions). You may want to use a Compaction task to read from datasource A and write to datasource A_optimizedForSomeGroupBy (exact copy with a change in dimensionsSpec and/or granularitySpec) to power these queries. In this case, you wouldn't need to specify the metricsSpec as you are not changing it.

This PR adds a new property to the Compaction task inputSpec: dataSource. The new field is optional. When it is not set, the Compaction task reads from and writes to the same datasource, given by the top-level dataSource field. When it is set, the Compaction task reads from the inputSpec's dataSource and writes to the top-level dataSource. I think it makes sense for this new field to live in the Compaction task inputSpec, since the inputSpec's purpose is to specify the input of the task.
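Under this proposal, a task spec for use case 1 above might look like the following. This is a hypothetical sketch based on the description in this PR: the interval and surrounding field values are illustrative assumptions, with only inputSpec.dataSource being the addition under discussion.

```json
{
  "type": "compact",
  "dataSource": "A_temp",
  "ioConfig": {
    "type": "compact",
    "inputSpec": {
      "type": "interval",
      "interval": "2025-01-01/2025-02-01",
      "dataSource": "A"
    }
  }
}
```

Here the task would read segments of A for the given interval and write the compacted segments to A_temp; omitting inputSpec.dataSource would preserve today's behavior of reading from and writing to the same datasource.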

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions bot added the Area - Batch Ingestion, Area - Ingestion, and Area - MSQ (for multi stage queries - https://github.com/apache/druid/issues/12262) labels Oct 9, 2025
@jtuglu1 jtuglu1 self-requested a review October 9, 2025 06:33
@kfaraz
Contributor

kfaraz commented Oct 9, 2025

@maytasm , how is this different from the existing re-index capability that reads from a DruidInputSource and writes into a new datasource using an index_parallel task? Will it not achieve the same result?

compact by definition means compaction of the data in the same datasource.

If we want to be able to leverage the exact same capabilities as a CompactionTask, I would advise we create a new task type which extends the CompactionTask rather than update the existing class.
Alternatively, we can try to fill in the gaps in re-indexing using index_parallel.

Contributor

@jtuglu1 jtuglu1 left a comment


first pass – looks good so far, let's just get some UTs in as well as docs.

Ideally we can add an embedded/IT test as well.

Contributor


Let's avoid writing this if possible

@@ -517,6 +520,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
emitMetric(toolbox.getEmitter(), "ingest/count", 1);

final Map<Interval, DataSchema> intervalDataSchemas = createDataSchemasForIntervals(
Contributor


Given we are now ingesting into potentially a new datasource, have we checked whether all associated tasks for ingestion (e.g. lock acquisition, segment alloc) are now working?

this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) : null;
this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
this.segmentProvider = new SegmentProvider(
this.ioConfig.getInputSpec().getDataSource() == null ? dataSource : this.ioConfig.getInputSpec().getDataSource(),
Contributor


Suggested change
this.ioConfig.getInputSpec().getDataSource() == null ? dataSource : this.ioConfig.getInputSpec().getDataSource(),
Configs.getOrDefault(this.ioConfig.getInputSpec().getDataSource(), dataSource)

boolean validateSegments(LockGranularity lockGranularityInUse, List<DataSegment> latestSegments);

/**
* Return the datasource to be used as input to the compaction task.
Contributor


Suggested change
* Return the datasource to be used as input to the compaction task.
* Returns the name of the datasource to ingest the compacted segments to.

.ofTypeCompact()
.context("storeCompactionState", true)
.ioConfig(new CompactionIntervalSpec(Intervals.of("2013-08-31/2013-09-02"), null), false);
.ioConfig(new CompactionIntervalSpec(Intervals.of("2013-08-31/2013-09-02"), null, null), false);
Contributor


Let's add tests where this isn't null

@Override
@Nullable
@JsonProperty
public String getDataSource()
Contributor


nit: let's add some javadoc to this

@jtuglu1
Contributor

jtuglu1 commented Oct 9, 2025

@maytasm , how is this different from the existing re-index capability that reads from a DruidInputSource and writes into a new datasource using an index_parallel task? Will it not achieve the same result?

@kfaraz I think the intent here is to take advantage of the schema detection, spec-autofill etc. I agree that this probably makes more sense in an index_parallel task with a dataSource input type.

@kfaraz
Contributor

kfaraz commented Oct 9, 2025

I think the intent here is to take advantage of the schema detection, spec-autofill etc.

Yeah, that's a fair point. Compaction task does provide those useful capabilities out-of-the-box.

I agree that this probably makes more sense in an index_parallel task with a dataSource input type.

👍🏻

@maytasm
Contributor Author

maytasm commented Oct 9, 2025

@kfaraz I just added more detail to the PR description. The Compaction task doesn't require you to specify all the specs (metricsSpec, etc.) and can discover them from existing segments. Specifying all the specs of a datasource is a big pain as Druid doesn't have the concept of a schema/catalog and the schema can evolve/differ between segments. For example, the metricsSpec requires you to specify the combining aggregators instead of the original aggregators. All of this is done for you in the Compaction task.
The Compaction task is just a layer that creates an index_parallel task with a dataSource input type. We can have a new task type but I think that makes Druid harder to use/understand...just like adding new runtime properties and tuning configs. You can already use the Compaction task for more than compacting. There are cases where users change the schema, drop dimensions, change the granularity level, etc. You can even give it a finer segmentGranularity and it will expand the datasource.
Re-indexing is really just native batch + Druid input source. I don't think this is really a gap in re-indexing as native batch supports various other input sources. The schema detection, spec auto-fill, etc. here only make sense for one of those input sources, the Druid input source. For the Compaction task, the input source is always Druid, but for native batch it isn't. It would be kind of weird to have native batch not require dimensionsSpec, metricsSpec, etc. when the input source is Druid but require them when it is not. Seems very specific/particular...
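For comparison, the re-indexing alternative discussed here looks roughly like the following sketch (field values are illustrative; the pain point is that dimensionsSpec, metricsSpec, and granularitySpec must all be spelled out by hand, including the combining aggregators):

```json
{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "A_temp",
      "timestampSpec": { "column": "__time", "format": "millis" },
      "dimensionsSpec": { "dimensions": ["dim1", "dim2"] },
      "metricsSpec": [ { "type": "longSum", "name": "count", "fieldName": "count" } ],
      "granularitySpec": { "segmentGranularity": "day", "queryGranularity": "hour" }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "druid",
        "dataSource": "A",
        "interval": "2025-01-01/2025-02-01"
      }
    }
  }
}
```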

@clintropolis
Member

You have an existing datasource A with all your data. You have some queries that are not performant on this datasource due to its low rollup ratio (i.e. this datasource has a requirement for fine granularity and/or a large number of dimensions). You may want to use a Compaction task to read from datasource A and write to datasource A_optimizedForSomeGroupBy (exact copy with a change in dimensionsSpec and/or granularitySpec) to power these queries. In this case, you wouldn't need to specify the metricsSpec as you are not changing it.

drive by comment (still digesting the main idea of this PR), you should check out projections for this use case, which allow building aggregated views inside the segments of the same datasource so that any query matching a projection is automatically served more efficiently. Projections can be applied to existing datasources via compaction since #17803. Projections are still experimental, and existing released versions have some bugs which will be fixed in the upcoming 35 release, where we plan to document them as part of that release since they should be a lot more stable.

@kfaraz
Contributor

kfaraz commented Oct 9, 2025

Yes, @maytasm , I agree that we should be able to leverage the auto-discover capabilities of compaction task for re-indexing too. For that reason, it makes sense to extend that feature.

We can have a new task type but I think that's making Druid harder to use/understand...just like adding new runtime properties and tuning configs.

Yeah, more runtime properties and tuning configs often make Druid harder to use and understand.
In fact, my concern is the same. Overloading the same feature to satisfy completely different use cases makes things confusing and less maintainable. The question is one of intent. There is no reason a user trying to move data from one datasource to another should have to launch a compact task.

Since the use case here is a new capability altogether, there is no harm in adding a new task type or a new input source type, whichever seems simpler to implement.

You can already use Compaction task for more than Compacting. There are case where users change the schema, drop dimensions, change granularity level, etc. You can even give it a finer segmentGranularity and it will be expanding the datasource

Absolutely, this is one of the things we have been discussing: a compact task should ideally never change the meaning of the data, only how it's laid out/partitioned (the change in this PR would only add to that discrepancy).
Since we have already added this capability, we don't want to get rid of it right now, as users may already be relying on it. In the future, the compaction templates in #18402 will have the capability to validate that a template does not change the meaning of the data and does only "compaction".

The schema detection, spec-autofill etc here only make sense for only one of those input source, the Druid input source. For compaction task, the input source is always Druid but for native batch it isn't.

Oh, absolutely. To clarify, I meant that we should try to bring the auto-detection capabilities of the compact task into native batch + Druid input source, not native batch in general. But I suppose that might be more involved to implement, and perhaps overkill anyway. We might as well just extend the compact task, as that seems simpler.

Also, to add to the suggestions from @clintropolis , you could also consider using an MSQ INSERT/REPLACE statement.
I don't know for sure if they provide all the auto-discover niceties or not but probably worth a shot.

@maytasm
Contributor Author

maytasm commented Oct 9, 2025

drive by comment (still digesting the main idea of this PR), you should check out #17481 for this use case, which allows building aggregated views inside the segments of the same datasource so that any query matching a projection is automatically served more efficiently. Projections can be applied to existing datasources via compaction since #17803. Projections are still experimental, and existing released versions have some bugs which will be fixed in the upcoming 35 release, where we plan to document them as part of that release since they should be a lot more stable.

Ah, I forgot about projections. Yep, you are right, projections can achieve the same result for this point too. Although I think projections don't allow you to have a different queryGranularity.

@maytasm
Contributor Author

maytasm commented Oct 9, 2025

Also, to add to the suggestions from @clintropolis , you could also consider using an MSQ INSERT/REPLACE statement.
I don't know for sure if they provide all the auto-discover niceties or not but probably worth a shot.

I think this actually works and can achieve all the use cases in the PR description. Thanks for the suggestion.
I think it should just be as simple as

INSERT INTO datasource_2 
SELECT * FROM datasource
PARTITIONED BY hour

@maytasm maytasm closed this Oct 9, 2025
