diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index 19b1740b9d4d..8c0e0ff15b37 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -399,6 +399,7 @@ The following table lists the context parameters for the MSQ task engine:
| `rowsPerPage` | SELECT<br /><br />The number of rows per page to target. The actual number of rows per page may be somewhat higher or lower than this number. In most cases, use the default. This property comes into effect only when `selectDestination` is set to `durableStorage`. | 100000 |
| `skipTypeVerification` | INSERT or REPLACE<br /><br />During query validation, Druid validates that [string arrays](../querying/arrays.md) and [multi-value dimensions](../querying/multi-value-dimensions.md) are not mixed in the same column. If you are intentionally migrating from one to the other, use this context parameter to disable type validation. Provide the column list as comma-separated values or as a JSON array in string form. | empty list |
| `failOnEmptyInsert` | INSERT or REPLACE<br /><br />When set to false (the default), an INSERT query generating no output rows is a no-op, and a REPLACE query generating no output rows deletes all data that matches the OVERWRITE clause. When set to true, an ingest query generating no output rows throws an `InsertCannotBeEmpty` fault. | `false` |
+| `storeCompactionState` | REPLACE<br /><br />When set to true, a REPLACE query stores in each segment's metadata a `lastCompactionState` field that captures the specs used to create the segment. Future compaction jobs skip segments whose `lastCompactionState` matches the desired compaction state. Works the same as the [`storeCompactionState`](../ingestion/tasks.md#context-parameters) task context flag. | `false` |
## Joins
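
Reviewer note: a minimal sketch of how a client might set the new flag in the query context when submitting a REPLACE query to the MSQ task engine. The class and method names are hypothetical; only the `storeCompactionState` key and its default come from this patch.

```java
import com.google.common.collect.ImmutableMap;

import java.util.Map;

// Hypothetical helper illustrating the context payload for a REPLACE query.
public final class StoreCompactionStateContextExample
{
  private StoreCompactionStateContextExample()
  {
  }

  public static Map<String, Object> replaceQueryContext()
  {
    return ImmutableMap.of(
        // Key added by this patch; when absent, MSQ falls back to false.
        "storeCompactionState", true
    );
  }
}
```
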
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index c29259e318c6..13afc358c337 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -20,6 +20,7 @@
package org.apache.druid.msq.exec;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -41,6 +42,7 @@
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.data.input.impl.DimensionSchema;
@@ -62,6 +64,9 @@
import org.apache.druid.frame.write.InvalidNullByteException;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
@@ -76,6 +81,7 @@
import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.TooManyBucketsException;
import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
@@ -191,6 +197,7 @@
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@@ -198,6 +205,7 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.DruidNode;
@@ -207,6 +215,7 @@
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.storage.ExportStorageProvider;
+import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
@@ -230,6 +239,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
@@ -241,6 +251,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
@@ -1731,12 +1742,114 @@ private void publishSegmentsIfNeeded(
//noinspection unchecked
@SuppressWarnings("unchecked")
- final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
+ Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
+
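+ // Read the storeCompactionState flag from the query context; it defaults to false.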
+ boolean storeCompactionState = QueryContext.of(task.getQuerySpec().getQuery().getContext())
+ .getBoolean(
+ Tasks.STORE_COMPACTION_STATE_KEY,
+ Tasks.DEFAULT_STORE_COMPACTION_STATE
+ );
+
+ if (!segments.isEmpty() && storeCompactionState) {
+ DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination();
+ if (!destination.isReplaceTimeChunks()) {
+ // Store compaction state only for replace queries.
+ log.warn(
+ "storeCompactionState flag set for a non-REPLACE query [%s]. Ignoring the flag for now.",
+ queryDef.getQueryId()
+ );
+ } else {
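+ // REPLACE path: recover the data schema from the final segment-generation stage so the
+ // stored compaction state reflects the specs actually used to write the segments.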
+ DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel
+ .getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();
+
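+ // Assumes all segments in this publish batch share the same shard spec type; the first
+ // segment is therefore representative.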
+ ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec();
+
+ Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = addCompactionStateToSegments(
+ task,
+ context.jsonMapper(),
+ dataSchema,
+ shardSpec,
+ queryDef.getQueryId()
+ );
+ segments = compactionStateAnnotateFunction.apply(segments);
+ }
+ }
log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
publishAllSegments(segments);
}
}
+ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToSegments(
+ MSQControllerTask task,
+ ObjectMapper jsonMapper,
+ DataSchema dataSchema,
+ ShardSpec shardSpec,
+ String queryId
+ )
+ {
+ final MSQTuningConfig tuningConfig = task.getQuerySpec().getTuningConfig();
+ PartitionsSpec partitionSpec;
+
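+ // Translate the MSQ shard spec back into the equivalent native partitions spec, which is
+ // the form that future compaction jobs compare against their own configuration.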
+ if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
+ List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
+ partitionSpec = new DimensionRangePartitionsSpec(
+ tuningConfig.getRowsPerSegment(),
+ null,
+ partitionDimensions,
+ false
+ );
+ } else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
+ // MSQ tasks don't use maxTotalRows. Hence using Long.MAX_VALUE.
+ partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
+ } else {
+ // SingleDimensionShardSpec and other shard specs are never created in MSQ.
+ throw new MSQException(
+ UnknownFault.forMessage(
+ StringUtils.format(
+ "Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].",
+ queryId,
+ shardSpec.getType()
+ )));
+ }
+
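+ // Segment granularity comes from the REPLACE destination; query granularity and rollup
+ // come from the data schema generated for this query.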
+ Granularity segmentGranularity = ((DataSourceMSQDestination) task.getQuerySpec().getDestination())
+ .getSegmentGranularity();
+
+ GranularitySpec granularitySpec = new UniformGranularitySpec(
+ segmentGranularity,
+ dataSchema.getGranularitySpec().getQueryGranularity(),
+ dataSchema.getGranularitySpec().isRollup(),
+ dataSchema.getGranularitySpec().inputIntervals()
+ );
+
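+ // Record a transform spec only when the query applied a filter; otherwise store null.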
+ DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec();
+ Map<String, Object> transformSpec = TransformSpec.NONE.equals(dataSchema.getTransformSpec())
+ ? null
+ : new ClientCompactionTaskTransformSpec(
+ dataSchema.getTransformSpec().getFilter()
+ ).asMap(jsonMapper);
+ List