Enable rollup on multi-value dimensions for compaction with MSQ engine #16937

kfaraz merged 17 commits into apache:master
Conversation
# Conflicts: # extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
```java
public void testCombinedDataSchemaSetsMultiValuedColumnsInfo()
{
  MultiValuedColumnsInfo multiValuedColumnsInfo = MultiValuedColumnsInfo.processed();
  multiValuedColumnsInfo.addMultiValuedColumn("dimA");

  CombinedDataSchema schema = new CombinedDataSchema(
      IdUtilsTest.VALID_ID_CHARS,
      new TimestampSpec("time", "auto", null),
      DimensionsSpec.builder()
                    .setDimensions(
                        DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimA", "dimB", "metric1"))
                    )
      // ... (remaining constructor arguments elided in the diff)
  );
  Assert.assertTrue(schema.getMultiValuedColumnsInfo().isProcessed());
  Assert.assertEquals(ImmutableSet.of("dimA"), schema.getMultiValuedColumnsInfo().getMultiValuedColumns());
}
```

Code scanning / CodeQL notice: Deprecated method or constructor invocation.
| {"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900} | ||
| {"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9} | ||
| {"timestamp": "2013-09-01T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143} No newline at end of file | ||
| {"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "tags": ["t5", "t6"], "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900} |
Reviewer: The value of tags always has 2 entries in all the data files. Maybe mix it up a bit, like 0 values, 1 value, or 3 values.

Reviewer: Rather than changing existing data files, I would advise we create new ones (e.g. wikipedia_index_data1_mvd.json). These files are probably used in multiple tests and it would be better to leave those as-is.

Author: Have updated the test dataset to vary the tags length. Modified the existing dataset itself, since only one other test required an update, mainly because most of the tests selectively use columns from the dataset.
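For illustration, a few hypothetical rows (not the actual rows from the updated file) showing what varying the `tags` cardinality across 0, 1, and 3 values could look like:

```json
{"timestamp": "2013-09-01T01:02:33Z", "page": "Gypsy Danger", "tags": [], "added": 57, "deleted": 200, "delta": -143}
{"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "tags": ["t1"], "added": 1, "deleted": 10, "delta": -9}
{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "tags": ["t5", "t6", "t7"], "added": 905, "deleted": 5, "delta": 900}
```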
```diff
 forceTriggerAutoCompaction(5);
 verifyQuery(INDEX_QUERIES_RESOURCE);
-verifySegmentsCompacted(hashedPartitionsSpec, 4);
+verifySegmentsCompacted(hashedPartitionsSpec, 5);
```
Reviewer: Why do we need to change this test?

Author: There is a comment a few lines above. The change is required since numShards is only a (max) hint and the actual number of segments can change based on the data; the toy sketch below illustrates this.
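A toy illustration (plain Java, not Druid's actual partitioning code; all names and values are made up) of why a shard-count hint need not match the final count: rows are hashed into at most `numShards` buckets, and the realized bucket count depends on the data:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NumShardsHintDemo
{
  public static void main(String[] args)
  {
    final int numShards = 4; // the hint: an upper bound, not a guarantee
    final List<String> partitionKeys = List.of("a", "b", "a", "b", "a");

    // Hash each row's partition key into one of numShards buckets.
    final Map<Integer, Integer> rowsPerBucket = new HashMap<>();
    for (String key : partitionKeys) {
      final int bucket = Math.floorMod(key.hashCode(), numShards);
      rowsPerBucket.merge(bucket, 1, Integer::sum);
    }

    // Empty buckets produce no segments, so the actual count depends on the data.
    System.out.println("hint=" + numShards + ", actual=" + rowsPerBucket.size()); // hint=4, actual=2
  }
}
```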
@LakshSingla: I haven't gone through the PR yet, but grouping on String columns can make use of the indices. Converting them to arrays before grouping on them will invalidate those indices and can be a lot slower. I think this might not be as much of a concern in MSQ only, since the frames don't contain indices in the first place, but if this changes in the future, will this modification be actively harmful to query processing in MSQ? Moreover, the conversion roundtrip will inherently be slower because of the mere fact that conversion is happening. Is the overhead calculated somewhere?

Author: @LakshSingla: The change only impacts compaction using the MSQ engine, and we don't have a recourse here since, for MSQ, grouping on MVDs ends up unnesting the dimensions. The conversion overhead should be acceptable since this is only for compaction. For regular query processing post-compaction, the original indexes for each column will be preserved, since with #16864, we pass the dimensionSchema to the compaction job.
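For concreteness, a minimal sketch (assuming Druid's expression-based virtual columns and post-aggregators with the `mv_to_array` / `array_to_mv` functions; the class name, virtual-column name, and constructor shapes are illustrative, not the PR's actual code) of the MVD → Array → MVD roundtrip described above:

```java
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.post.ExpressionPostAggregator;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;

public class MvdRollupConversionSketch
{
  public static void main(String[] args)
  {
    // Before rollup: expose the MVD "tags" as a true ARRAY so that grouping treats
    // ["s1", "s2"] as a single key instead of unnesting it into one row per value.
    final ExpressionVirtualColumn toArray = new ExpressionVirtualColumn(
        "tags_as_array",                  // hypothetical virtual-column name
        "mv_to_array(\"tags\")",
        ColumnType.STRING_ARRAY,
        ExprMacroTable.nil()
    );

    // After rollup: convert the grouped ARRAY back to a multi-value string column so
    // the compacted segment keeps the original column type. (Constructor shapes differ
    // across Druid versions; the four-argument form here is an assumption.)
    final ExpressionPostAggregator toMvd = new ExpressionPostAggregator(
        "tags",
        "array_to_mv(\"tags_as_array\")",
        null,
        ExprMacroTable.nil()
    );

    System.out.println(toArray.getOutputName() + " -> " + toMvd.getName());
  }
}
```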
kfaraz left a comment: Minor suggestions, otherwise looks good.
Merging this PR as the failing test is an unrelated flaky test being fixed in a different PR.
Description
Currently, compaction with the MSQ engine doesn't work for rollup on multi-value dimensions (MVDs), the reason being the default behaviour of grouping on MVD dimensions, which unnests the dimension values; for instance, grouping on `[s1,s2]` with aggregate `a` will result in two rows: `<s1,a>` and `<s2,a>`.

This change enables rollup on MVDs (without unnest) by converting MVDs to Arrays before rollup using virtual columns, and then converting them back to MVDs using post aggregators; the sketch below illustrates the two grouping semantics. If the segment schema is available to the compaction task (when it ends up downloading segments to get existing dimensions/metrics/granularity), it selectively does the MVD-to-Array conversion only for known multi-valued columns; else it conservatively performs this conversion for all `string` columns.
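To make the unnest behaviour concrete, a small self-contained illustration in plain Java (not Druid code; names and values are made up) of the two grouping semantics:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MvdGroupingDemo
{
  public static void main(String[] args)
  {
    // One input row whose MVD value is [s1, s2], with aggregate value a = 10.
    final List<String> mvd = List.of("s1", "s2");
    final long a = 10L;

    // Unnest semantics (the default when grouping on an MVD):
    // each value becomes its own grouping key, yielding <s1,a> and <s2,a>.
    final Map<String, Long> unnested = new TreeMap<>();
    for (String value : mvd) {
      unnested.merge(value, a, Long::sum);
    }
    System.out.println(unnested); // {s1=10, s2=10}

    // Array semantics (what the MVD-to-Array conversion enables):
    // the whole value [s1, s2] is a single grouping key, yielding <[s1,s2],a>,
    // which is what rollup during compaction needs.
    final Map<List<String>, Long> grouped = new HashMap<>();
    grouped.merge(mvd, a, Long::sum);
    System.out.println(grouped); // {[s1, s2]=10}
  }
}
```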
Key changed/added classes in this PR

- `MSQCompactionRunner`
- `CompactionTask`

This PR has: