Add storeCompactionState flag support to MSQ #15965
Conversation
cryptoe
left a comment
Left some comments.
We should also add an assert for the compaction config in MSQReplaceTests.
```java
    List<String> partitionDimensions
)
{
  final boolean storeCompactionState = task.getContextValue(
```
We should document this in the SQL-based ingestion context parameter docs, saying that we support this context parameter in MSQ, and link to the original documentation of storeCompactionState.

I'm just wondering whether it would be better to move it to the query context instead of the task context, to enable setting it from the web console. Any thoughts on that?

Moved this to the query context.
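For illustration, setting the flag from the query context when submitting an MSQ query through the SQL task API might look like the following (the query text and datasource names here are hypothetical, not from this PR):

```json
{
  "query": "REPLACE INTO wikipedia OVERWRITE ALL SELECT * FROM source_table PARTITIONED BY DAY",
  "context": {
    "storeCompactionState": true
  }
}
```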
```java
IndexSpec indexSpec = task.getQuerySpec().getTuningConfig().getIndexSpec();
GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec();
Map<String, Object> transformSpec = dataSchema.getTransformSpec() == null
```
Curious to know why this is required for the compaction state.

All these fields are captured when setting the compaction state in the native flow.
```java
    Tasks.DEFAULT_STORE_COMPACTION_STATE
);

if (storeCompactionState) {
```
Where are we checking that the SQL statement is a REPLACE?

That would be part of the logic to set this flag itself. Currently, this PR doesn't incorporate the logic to implicitly set it for REPLACE commands, so it has to be set explicitly.

If the flag is set and the user issues an INSERT command, it should be an error.

You can check if it is a replace query using:

A clarification: this PR doesn't implicitly set the compaction state for all MSQ replace tasks
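A minimal sketch of the suggested validation, using hypothetical stand-in names rather than the real MSQ task classes; it assumes a REPLACE query can be recognized by the presence of replace time chunks in the destination:

```java
import java.util.List;

public class StoreCompactionStateCheck
{
    // Stand-in: a REPLACE query is one whose destination overwrites existing time chunks.
    static boolean isReplaceQuery(List<String> replaceTimeChunks)
    {
        return replaceTimeChunks != null && !replaceTimeChunks.isEmpty();
    }

    // Fail fast when storeCompactionState=true is combined with a plain INSERT,
    // as suggested in the review above.
    static void validateContext(boolean storeCompactionState, List<String> replaceTimeChunks)
    {
        if (storeCompactionState && !isReplaceQuery(replaceTimeChunks)) {
            throw new IllegalArgumentException(
                "storeCompactionState context flag is supported only for REPLACE queries"
            );
        }
    }

    public static void main(String[] args)
    {
        // REPLACE with the flag set: accepted.
        validateContext(true, List.of("2020-01-01/2021-01-01"));

        // INSERT (no replace time chunks) with the flag set: rejected.
        boolean threw = false;
        try {
            validateContext(true, null);
        } catch (IllegalArgumentException e) {
            threw = true;
        }
        System.out.println(threw); // prints "true"
    }
}
```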
… granularity spec accordingly
AmatyaAvadhanula
left a comment
Would it be better to extract the dataSchema, indexSpec, granularitySpec, (compute) the partitionsSpec, and use a common utility for both native batch and MSQ?

The bulk of the code is extracting/computing these values themselves, so I think a common utility would be of little value beyond just creating the CompactionState object and the transform function.

I think we are interested in storing the compaction state to prevent additional compactions from running on an already compacted interval.
```java
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceSegmentsWithQuarterSegmentGranularity(String contextName, Map<String, Object> context)
```
Check notice — Code scanning / CodeQL: Useless parameter

This parameter needs to be removed.

It is required and is there in every other test, though unused.

I see, thanks for the clarification. The second parameter is used just to name the test in JUnit. There also seems to be a lot of repetition of the annotations.

Edit: Apparently this is just the way JUnit 5 works. There is no way (yet) to parameterize the constructor of the entire test class.
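Since the display-name pattern came up above, here is a self-contained sketch of how JUnit 5-style name patterns expand per invocation; the resolve helper is a simplified stand-in, not the real JUnit API:

```java
import java.util.List;

public class DisplayNames
{
    // Simplified stand-in for JUnit 5's @ParameterizedTest(name = ...) resolution:
    // "{index}" is the 1-based invocation index, "{0}", "{1}", ... are the arguments.
    static String resolve(String pattern, int index, Object... args)
    {
        String name = pattern.replace("{index}", Integer.toString(index));
        for (int i = 0; i < args.length; i++) {
            name = name.replace("{" + i + "}", String.valueOf(args[i]));
        }
        return name;
    }

    public static void main(String[] args)
    {
        List<String> contexts = List.of("default", "faultTolerance");
        for (int i = 0; i < contexts.size(); i++) {
            // The contextName argument exists solely so it can appear in the display name.
            System.out.println(resolve("{index}:with context {0}", i + 1, contexts.get(i)));
        }
    }
}
```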
…er minor changes.
I've moved the annotation function calculation to a common place now.
cryptoe
left a comment
Left some comments. Overall LGTM.

@gargvishesh Please test this on a local dev cluster, confirming that compaction does not trigger if we set this flag, or does trigger if there is a change in the compaction spec.
```java
} else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
  partitionSpec = new DynamicPartitionsSpec(task.getQuerySpec().getTuningConfig().getRowsPerSegment(), null);
} else {
  log.error(
```
I feel this should be an MSQ error, i.e., throw an MSQ fault and fail the job: if we add new shard specs to MSQ, we should also add support to store the compaction state. If we do not add code here, the user's jobs would pass with only this error message in the logs, and it would require a lot of debugging to figure out that we missed adding the handling here.

Throwing an MSQException now.
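A rough sketch of the fail-fast behavior suggested above, with stand-in types and string results rather than the real Druid classes:

```java
import java.util.List;

public class CompactionStateShardMapping
{
    enum ShardType { RANGE, NUMBERED, HASHED }

    static String partitionsSpecFor(ShardType type, List<String> dims, Integer rowsPerSegment)
    {
        switch (type) {
            case RANGE:
                // Range-partitioned segments carry their partition dimensions.
                return "range" + dims;
            case NUMBERED:
                // Dynamic partitioning; rowsPerSegment comes from the tuning config.
                return "dynamic[" + rowsPerSegment + "]";
            default:
                // Fail fast: a silent log line here would let the job succeed without
                // storing the compaction state, which is very hard to debug later.
                throw new IllegalStateException(
                    "Cannot store compaction state for unsupported shard type: " + type
                );
        }
    }

    public static void main(String[] args)
    {
        System.out.println(partitionsSpecFor(ShardType.RANGE, List.of("country", "city"), null));
        System.out.println(partitionsSpecFor(ShardType.NUMBERED, null, 3000000));
    }
}
```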
```java
@SuppressWarnings("unchecked")
final Set<DataSegment> segments = (Set<DataSegment>) queryKernel.getResultObjectForStage(finalStageId);

Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction = Function.identity();
```
Rather than declaring this function outside the if-else and assigning identity() to it, it should just be declared where necessary. You can make segments non-final.

Applying the function in the branch itself now, before sending the segments to publish.
kfaraz
left a comment
@gargvishesh, thanks for the changes.

I have left some comments. There are also some from @cryptoe that need to be addressed before this PR can be merged.

I guess at some point we would need a test that verifies that segments written by MSQ REPLACE are indeed not picked up by compaction if the desired state matches. But maybe we can do that in an integration test in a follow-up PR.
```java
if ((Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE)
     || Objects.equals(shardSpec.getType(), ShardSpec.Type.RANGE))) {
  List<String> partitionDimensions = ((DimensionRangeShardSpec) shardSpec).getDimensions();
  partitionSpec = new DimensionRangePartitionsSpec(
```
Since we are using DimensionRangePartitionsSpec for single-dim segments, is it possible that segments partitioned by single-dim would get re-picked by compaction if the compaction config has single as the desired state? I am not entirely sure whether we still allow users to use single in the compaction config.

From the code, the equals method compares the class type first, before comparing the fields themselves. So you are right: a single-type spec should be stored in the corresponding instance. Have updated the handling now. Thanks!
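To illustrate the point about equals comparing the class type, here is a self-contained sketch with stand-in spec classes (not the real Druid partitions specs): equal field values are not enough when the concrete classes differ, so the stored compaction state must use the exact spec class the user would configure.

```java
import java.util.Objects;

class RangeSpec
{
    final String dimension;

    RangeSpec(String dimension)
    {
        this.dimension = dimension;
    }

    @Override
    public boolean equals(Object o)
    {
        if (o == null || getClass() != o.getClass()) {
            return false; // class compared before fields, as in the Druid specs
        }
        return Objects.equals(dimension, ((RangeSpec) o).dimension);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(getClass(), dimension);
    }
}

class SingleDimSpec extends RangeSpec
{
    SingleDimSpec(String dimension)
    {
        super(dimension);
    }
}

public class SpecEquality
{
    public static void main(String[] args)
    {
        // Same field value, different concrete class: not equal, so compaction
        // would treat the stored state as stale and re-compact the interval.
        System.out.println(new RangeSpec("country").equals(new SingleDimSpec("country"))); // prints "false"
    }
}
```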
@cryptoe @kfaraz

Very important catch, @gargvishesh. Thank you.
```java
final MSQTuningConfig tuningConfig = task.getQuerySpec().getTuningConfig();
PartitionsSpec partitionSpec;

// There is currently no way of specifying either maxRowsPerSegment or maxTotalRows for an MSQ task.
```
Please add another line of comment to explain the implications of this fact for the code here.

Added, and relocated to the appropriate line.
```java
} else if (Objects.equals(shardSpec.getType(), ShardSpec.Type.NUMBERED)) {
  // There is currently no way of specifying either maxRowsPerSegment or maxTotalRows for an MSQ task.
  // Hence using null for both which ends up translating to DEFAULT_MAX_ROWS_PER_SEGMENT for maxRowsPerSegment.
  partitionSpec = new DynamicPartitionsSpec(null, null);
```
maxRowsPerSegment = numRowsPerSegment, no?

Maybe use this spec:
cryptoe
left a comment
Left 2 comments. LGTM otherwise.
```java
PartitionsSpec partitionSpec;

if (Objects.equals(shardSpec.getType(), ShardSpec.Type.SINGLE)) {
  String partitionDimension = ((SingleDimensionShardSpec) shardSpec).getDimension();
```
I think the shard spec cannot be single in MSQ. Let's just check for the range shard spec.
# Conflicts:
#	extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@gargvishesh Could you please add the release notes to the PR description?
Compaction in the native engine by default records the state of compaction for each segment in the lastCompactionState segment field. This PR adds support for doing the same in the MSQ engine, targeted at future use cases such as REPLACE and compaction done via MSQ. Note that this PR doesn't implicitly store the compaction state for MSQ replace tasks; it is stored only when the flag "storeCompactionState": true is set in the query context.

Release Note

The storeCompactionState context flag is now supported for MSQ REPLACE tasks.