Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
d2c28d4
Add storeCompactionState annotation function
gargvishesh Feb 23, 2024
555d5d5
Add flag and change some config sources
gargvishesh Feb 26, 2024
0ac20e4
Add type check for shard spec before casting
gargvishesh Feb 26, 2024
a6d3dc0
Check if there is a segment granularity in the context and revise the…
gargvishesh Feb 26, 2024
b218280
Check if there is a segment granularity in the context and revise the…
gargvishesh Feb 26, 2024
33b5a82
Address review comments
gargvishesh Feb 26, 2024
cf37c65
Add tests for compaction state
gargvishesh Mar 5, 2024
f877b91
Corrections
gargvishesh Mar 5, 2024
6464605
Address review comments
gargvishesh Mar 22, 2024
b24a7c9
Remove unused var
gargvishesh Mar 22, 2024
ac2d8f5
Merge branch 'master' into add-store-compaction-state-to-msq
gargvishesh Mar 22, 2024
1a0517c
Fix compilation errors due to junit5 migration
gargvishesh Mar 22, 2024
f402523
Separate compactionStateAnnotationFunction to a common place, and oth…
gargvishesh Mar 22, 2024
13f2c99
Checkstyle fixes
gargvishesh Mar 22, 2024
3b57dfa
Try again
gargvishesh Mar 22, 2024
454cb09
Update doc
gargvishesh Mar 22, 2024
e59c1bc
Revert additional indentation changes
gargvishesh Mar 22, 2024
cbc582d
Resolve coverage test for druid-processing
gargvishesh Mar 23, 2024
316e378
Suppress spelling error
gargvishesh Apr 1, 2024
c87ff9f
Address review comments
gargvishesh Apr 2, 2024
f18650d
Resolve checkstyle errors
gargvishesh Apr 2, 2024
49053db
Remove redundant comment
gargvishesh Apr 2, 2024
29ea760
Revert maxTotalRows to null
gargvishesh Apr 4, 2024
7e43b5d
Address review comments and fix tests
gargvishesh Apr 5, 2024
a282e32
Correct values in DynamicPartitionSpec.
gargvishesh Apr 8, 2024
b6bc0a5
Fix checkstyle
gargvishesh Apr 8, 2024
6a4edc9
Fix tests
gargvishesh Apr 9, 2024
be761ed
Merge branch 'master' into add-store-compaction-state-to-msq
gargvishesh Apr 9, 2024
659fe07
Merge branch 'master' into add-store-compaction-state-to-msq
gargvishesh Apr 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ The following table lists the context parameters for the MSQ task engine:
| `rowsPerPage` | SELECT<br /><br />The number of rows per page to target. The actual number of rows per page may be somewhat higher or lower than this number. In most cases, use the default.<br /> This property comes into effect only when `selectDestination` is set to `durableStorage` | 100000 |
| `skipTypeVerification` | INSERT or REPLACE<br /><br />During query validation, Druid validates that [string arrays](../querying/arrays.md) and [multi-value dimensions](../querying/multi-value-dimensions.md) are not mixed in the same column. If you are intentionally migrating from one to the other, use this context parameter to disable type validation.<br /><br />Provide the column list as comma-separated values or as a JSON array in string form.| empty list |
| `failOnEmptyInsert` | INSERT or REPLACE<br /><br /> When set to false (the default), an INSERT query generating no output rows will be no-op, and a REPLACE query generating no output rows will delete all data that matches the OVERWRITE clause. When set to true, an ingest query generating no output rows will throw an `InsertCannotBeEmpty` fault. | `false` |
| `storeCompactionState` | REPLACE<br /><br /> When set to true, a REPLACE query stores a `lastCompactionState` field in each segment's metadata, capturing the various specs used to create the segment. Future compaction jobs skip segments whose `lastCompactionState` matches the desired compaction state. Works the same as the [`storeCompactionState`](../ingestion/tasks.md#context-parameters) task context flag. | `false` |

## Joins

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.msq.exec;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand All @@ -41,6 +42,7 @@
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.data.input.impl.DimensionSchema;
Expand All @@ -62,6 +64,9 @@
import org.apache.druid.frame.write.InvalidNullByteException;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
Expand All @@ -76,6 +81,7 @@
import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.TooManyBucketsException;
import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
Expand Down Expand Up @@ -191,13 +197,15 @@
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.DruidNode;
Expand All @@ -207,6 +215,7 @@
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
Expand All @@ -230,6 +239,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
Expand All @@ -241,6 +251,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -1731,12 +1742,114 @@ private void publishSegmentsIfNeeded(

//noinspection unchecked
@SuppressWarnings("unchecked")
final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);
Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);

boolean storeCompactionState = QueryContext.of(task.getQuerySpec().getQuery().getContext())
.getBoolean(
Tasks.STORE_COMPACTION_STATE_KEY,
Tasks.DEFAULT_STORE_COMPACTION_STATE
);

if (!segments.isEmpty() && storeCompactionState) {
DataSourceMSQDestination destination = (DataSourceMSQDestination) task.getQuerySpec().getDestination();
if (!destination.isReplaceTimeChunks()) {
// Store compaction state only for replace queries.
log.warn(
"storeCompactionState flag set for a non-REPLACE query [%s]. Ignoring the flag for now.",
queryDef.getQueryId()
);
} else {
DataSchema dataSchema = ((SegmentGeneratorFrameProcessorFactory) queryKernel
.getStageDefinition(finalStageId).getProcessorFactory()).getDataSchema();

ShardSpec shardSpec = segments.stream().findFirst().get().getShardSpec();

Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = addCompactionStateToSegments(
task(),
context.jsonMapper(),
dataSchema,
shardSpec,
queryDef.getQueryId()
);
segments = compactionStateAnnotateFunction.apply(segments);
}
}
log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
publishAllSegments(segments);
}
}

private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToSegments(
MSQControllerTask task,
ObjectMapper jsonMapper,
DataSchema dataSchema,
ShardSpec shardSpec,
String queryId
)
{
final MSQTuningConfig tuningConfig = task.getQuerySpec().getTuningConfig();
PartitionsSpec partitionSpec;

if (Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE)) {
List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
partitionSpec = new DimensionRangePartitionsSpec(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are using DimensionRangePartitionsSpec for single-dim segments, is it possible that segments partitioned by single-dim would get re-picked by compaction if compaction config has single as the desired state?
I am not entirely sure if we still allow users to use single in the compaction config.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the code, the equals method compares the class type first before comparing the fields themselves. So you are right: a single type spec should be stored in the corresponding instance. Have updated the handling now. Thanks!

tuningConfig.getRowsPerSegment(),
null,
partitionDimensions,
false
);
} else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
// MSQ tasks don't use maxTotalRows. Hence using LONG.MAX_VALUE.
partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
} else {
// SingleDimenionShardSpec and other shard specs are never created in MSQ.
throw new MSQException(
UnknownFault.forMessage(
StringUtils.format(
"Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].",
queryId,
shardSpec.getType()
)));
}

Granularity segmentGranularity = ((DataSourceMSQDestination) task.getQuerySpec().getDestination())
.getSegmentGranularity();

GranularitySpec granularitySpec = new UniformGranularitySpec(
segmentGranularity,
dataSchema.getGranularitySpec().getQueryGranularity(),
dataSchema.getGranularitySpec().isRollup(),
dataSchema.getGranularitySpec().inputIntervals()
);

DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec();
Map<String, Object> transformSpec = TransformSpec.NONE.equals(dataSchema.getTransformSpec())
? null
: new ClientCompactionTaskTransformSpec(
dataSchema.getTransformSpec().getFilter()
).asMap(jsonMapper);
List<Object> metricsSpec = dataSchema.getAggregators() == null
? null
: jsonMapper.convertValue(
dataSchema.getAggregators(), new TypeReference<List<Object>>()
{
});


IndexSpec indexSpec = tuningConfig.getIndexSpec();

log.info("Query[%s] storing compaction state in segments.", queryId);

return CompactionState.addCompactionStateToSegments(
partitionSpec,
dimensionsSpec,
metricsSpec,
transformSpec,
indexSpec.asMap(jsonMapper),
granularitySpec.asMap(jsonMapper)
);
}

/**
* Clean up durable storage, if used for stage output.
* <p>
Expand Down Expand Up @@ -1797,7 +1910,9 @@ private static QueryDefinition makeQueryDefinition(
}
} else {
shuffleSpecFactory = querySpec.getDestination()
.getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()));
.getShuffleSpecFactory(
MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
);
queryToPlan = querySpec.getQuery();
}

Expand Down Expand Up @@ -1899,9 +2014,11 @@ private static QueryDefinition makeQueryDefinition(
if (filesIterator.hasNext()) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Found files at provided export destination[%s]. Export is only allowed to "
+ "an empty path. Please provide an empty path/subdirectory or move the existing files.",
exportStorageProvider.getBasePath());
.build(
"Found files at provided export destination[%s]. Export is only allowed to "
+ "an empty path. Please provide an empty path/subdirectory or move the existing files.",
exportStorageProvider.getBasePath()
);
}
}
catch (IOException e) {
Expand Down Expand Up @@ -1933,7 +2050,6 @@ private static QueryDefinition makeQueryDefinition(
}



private static DataSchema generateDataSchema(
MSQSpec querySpec,
RowSignature querySignature,
Expand Down Expand Up @@ -2388,7 +2504,9 @@ private static MSQStatusReport makeStatusReport(
workerStatsMap = taskLauncher.getWorkerStats();
}

SegmentLoadStatusFetcher.SegmentLoadWaiterStatus status = segmentLoadWaiter == null ? null : segmentLoadWaiter.status();
SegmentLoadStatusFetcher.SegmentLoadWaiterStatus status = segmentLoadWaiter == null
? null
: segmentLoadWaiter.status();

return new MSQStatusReport(
taskState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public class MultiStageQueryContext
public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = ClusterStatisticsMergeMode.SEQUENTIAL.toString();

public static final String CTX_ROWS_PER_SEGMENT = "rowsPerSegment";
static final int DEFAULT_ROWS_PER_SEGMENT = 3000000;
public static final int DEFAULT_ROWS_PER_SEGMENT = 3000000;

public static final String CTX_ROWS_PER_PAGE = "rowsPerPage";
static final int DEFAULT_ROWS_PER_PAGE = 100000;
Expand Down
Loading