Auto-Compaction using Multi-Stage Query Engine #16291
```
# Conflicts:
#	indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
#	server/src/main/java/org/apache/druid/server/coordinator/CoordinatorCompactionConfig.java
```
@gargvishesh , I concur with @LakshSingla on this one. We need UTs to test the entire flow with MSQ the same way we are doing with native. We may add ITs too but most Druid devs rely on UTs more heavily as the IT flow is a little flaky currently.
```java
if (ArrayUtils.isEmpty(configuredMetricsSpec)) {
  return COMPLETE;
}
final AggregatorFactory[] configuredMetricsCombiningFactorySpec =
```
Why are we comparing just the combining factory now and not the original `AggregatorFactory` itself? Please add comments (and tests, if not already added) to clarify this change.
Added it as a javadoc for the function
I would advise retaining the old version of the code and doing this change in a follow up. In the current PR, I would prefer it if we didn't modify any of the logic for native compaction. What happens if we just stick to the old logic?
With the last change dropping support for problematic aggregator factory definitions, this change is no longer required. I have reverted it.
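To illustrate the distinction discussed in this thread, here is a minimal, hypothetical stand-in for Druid's `AggregatorFactory` (the class and method names below are illustrative, not the real Druid API). A factory like `longSum("sum_x", "x")` reads input column `x` but writes output column `sum_x`; its combining factory reads and writes `sum_x`. Re-compacting an already-rolled-up segment with the original factory fails to find `x`, which is why definitions with `fieldName != name` are problematic:

```java
// Hypothetical, simplified sketch -- not the actual Druid AggregatorFactory API.
public class AggFactorySketch {
    static final class SumFactory {
        final String name;      // output column written to the segment
        final String fieldName; // input column read from the source rows
        SumFactory(String name, String fieldName) {
            this.name = name;
            this.fieldName = fieldName;
        }
        // The combining form re-aggregates already-rolled-up output columns,
        // so it reads and writes the same column: `name`.
        SumFactory getCombiningFactory() {
            return new SumFactory(name, name);
        }
    }

    // A factory is safe for re-compaction only when re-reading the rolled-up
    // segment sees the column it expects, i.e. name == fieldName.
    static boolean safeForRecompaction(SumFactory f) {
        return f.name.equals(f.fieldName);
    }

    public static void main(String[] args) {
        SumFactory plain = new SumFactory("x", "x");
        SumFactory renaming = new SumFactory("sum_x", "x");
        System.out.println(safeForRecompaction(plain));    // true
        System.out.println(safeForRecompaction(renaming)); // false: rolled-up segment has `sum_x`, not `x`
        // The combining factory of the renaming aggregator reads `sum_x` directly:
        System.out.println(renaming.getCombiningFactory().fieldName); // sum_x
    }
}
```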
@kfaraz @LakshSingla
Please resolve merge conflicts in the PR.
The conflict was introduced by a recent NPE bugfix in #16713.
I think this bugfix is also included in the current PR, but I decided to merge it separately as the issue has been reported in the community in a couple of places.
Sorry for the inconvenience!
```
# Conflicts:
#	server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
```
kfaraz left a comment
Thanks a lot for taking this PR to completion, @gargvishesh !
You have been a great sport with addressing the multiple rounds of feedback. 🚀
Thank you @kfaraz, @LakshSingla and @cryptoe for the time and effort to do all those rounds of reviews, esp. given the size of the PR.
Follow-up to #16291, this commit enables a subset of the existing native compaction ITs on the MSQ engine. In the process, the following changes have been introduced in the MSQ compaction flow:
- Populate `metricsSpec` in `CompactionState` from `querySpec` in `MSQControllerTask` instead of `dataSchema`
- Add a check for pre-rolled-up segments having an `AggregatorFactory` with different input and output column names
- Fix the missing cluster-by clause passed in scan queries
- Add annotation of `CompactionState` to tombstone segments
Description:
Compaction operations issued by the Coordinator currently run using the native query engine. As the majority of the advancements we are making in batch ingestion are in MSQ, it is imperative that we support compaction on MSQ to make compaction more robust and possibly faster. For instance, we have seen OOM errors in native compaction that MSQ could have handled via its auto-calculation of tuning parameters. This commit enables compaction on MSQ to remove the dependency on the native engine.

Main changes:
* `DataSourceCompactionConfig` now has an additional field `engine` that can be one of `[native, msq]`, with `native` being the default.
* If the engine is MSQ, the `CompactSegments` duty assigns all available compaction task slots to the launched `CompactionTask` to ensure full capacity is available to MSQ. This avoids the stalling that could occur if only a fraction of the task slots were allotted and they eventually fell short of the number of tasks required by the MSQ engine to run the compaction.
* `ClientCompactionTaskQuery` has a new field `compactionRunner` with just one `engine` field.
* `CompactionTask` now holds a `CompactionRunner` interface instance, with implementations `NativeCompactionRunner` and `MSQCompactionRunner` (the latter in the `druid-multi-stage-query` extension). The object mapper deserializes the `ClientCompactionRunnerInfo` in `ClientCompactionTaskQuery` to the `CompactionRunner` instance mapped to the specified type (`native` or `msq`).
* `CompactionTask` uses the `CompactionRunner` instance it receives to create the indexing tasks.
* The `CompactionTask`-to-`MSQControllerTask` conversion logic checks whether metrics are present in the segment schema. If present, the task is created with a native group-by query; if not, the task is issued with a scan query. The `storeCompactionState` flag is set in the context.
* Each created `MSQControllerTask` is launched in-place and its `TaskStatus` is tracked to determine the final status of the `CompactionTask`.
The id of each of these tasks is the same as that of the `CompactionTask`, since otherwise the workers would be unable to determine the controller task's location for communication (as they haven't been launched via the Overlord).
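The runner-selection step described above can be sketched as follows. This is a hypothetical, heavily simplified illustration of the dispatch pattern (an interface with one implementation per engine, chosen from the `engine` value with `native` as the default); the class and method signatures are illustrative, not the actual Druid classes:

```java
// Minimal sketch of engine-based runner dispatch -- illustrative names only.
public class RunnerDispatchSketch {
    interface CompactionRunner {
        String runCompaction(String dataSource);
    }

    static final class NativeCompactionRunner implements CompactionRunner {
        public String runCompaction(String dataSource) {
            return "native:" + dataSource; // stands in for launching native index tasks
        }
    }

    static final class MSQCompactionRunner implements CompactionRunner {
        public String runCompaction(String dataSource) {
            return "msq:" + dataSource; // stands in for launching an MSQControllerTask in-place
        }
    }

    // Mirrors the deserialization step: the engine type in the client payload
    // picks the concrete runner, defaulting to native.
    static CompactionRunner forEngine(String engine) {
        return "msq".equals(engine) ? new MSQCompactionRunner() : new NativeCompactionRunner();
    }

    public static void main(String[] args) {
        System.out.println(forEngine("msq").runCompaction("wikipedia"));    // msq:wikipedia
        System.out.println(forEngine("native").runCompaction("wikipedia")); // native:wikipedia
    }
}
```

In the real PR, this selection happens via Jackson polymorphic deserialization of the runner type rather than an explicit conditional.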
Some things to note:
* `DataSourceCompactionConfig` is passed as-is to the `MSQControllerTask` and hence can contain MSQ context params as well, with the exception of `rowsPerSegment`, which will be overridden by either `targetRowsPerSegment` or `maxRowsPerSegment` if specified in a `partitionsSpec`.
* The `maxRowsInMemory` param is only considered if specified in the context. The value in `DataSourceCompactionConfig.tuningConfig` is not considered, as it is set to a default value (1M) if unspecified by the user, so it is indistinguishable whether it came from the user or via the default.
* Unless a `maxNumTasks` value is specified in the `taskContext`, `min(availableCompactionTaskSlots, 5)` is allotted to MSQ compaction tasks.
* `rollup: true` without any `metricsSpec` de-duplicates rows, since all columns are then treated as dimensions, just as in native compaction.

Currently unsupported for MSQ Compaction:
* Changing the engine other than at a per-datasource level: the default engine is `native`, which can be updated per datasource. The cluster-level update API will come in a follow-up PR.
* `groupByEnableMultiValueUnnesting` is disabled; only array-type columns are supported.
* `partitionsSpec` of type `HashedPartitionsSpec`. Only `DimensionRangePartitionsSpec` and `DynamicPartitionsSpec` work.
* `maxTotalRows` in `DynamicPartitionsSpec`. Only `maxRowsPerSegment` works.
* `rollup` set to `false` in `granularitySpec` when `metricsSpec` is specified. Only `rollup` set to `true` works with a non-empty `metricsSpec`.
* `metricsSpec` where `fieldName != name` OR (internally) `aggregatorFactory.getClass() != aggregatorFactory.getCombiningFactory().getClass()`. This is a major limitation that will be addressed in a follow-up PR.
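The `partitionsSpec`-related restrictions above could be checked roughly as in the sketch below. This is a hypothetical validation helper, not code from the PR; the enum stands in for Druid's actual `PartitionsSpec` class hierarchy:

```java
// Hypothetical sketch of the partitionsSpec restrictions -- illustrative only.
public class PartitionsSpecSupportSketch {
    // Stand-in for the PartitionsSpec subclasses mentioned in the limitations.
    enum SpecType { DYNAMIC, DIMENSION_RANGE, HASHED }

    // Hashed partitioning is rejected outright, and for dynamic partitioning
    // only maxRowsPerSegment (not maxTotalRows) is honored.
    static boolean supportedByMsqCompaction(SpecType type, Long maxTotalRows) {
        if (type == SpecType.HASHED) {
            return false;
        }
        if (type == SpecType.DYNAMIC && maxTotalRows != null) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(supportedByMsqCompaction(SpecType.DIMENSION_RANGE, null)); // true
        System.out.println(supportedByMsqCompaction(SpecType.HASHED, null));          // false
        System.out.println(supportedByMsqCompaction(SpecType.DYNAMIC, 1_000_000L));   // false
    }
}
```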