diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java index e20188d58294..799d86d76bbd 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java @@ -309,7 +309,10 @@ private static Integer getRowsPerSegment(CompactionTask compactionTask) private static RowSignature getRowSignature(DataSchema dataSchema) { RowSignature.Builder rowSignatureBuilder = RowSignature.builder(); - rowSignatureBuilder.add(dataSchema.getTimestampSpec().getTimestampColumn(), ColumnType.LONG); + if (dataSchema.getDimensionsSpec().isForceSegmentSortByTime() == true) { + // If sort not forced by time, __time appears as part of dimensions in DimensionsSpec + rowSignatureBuilder.add(dataSchema.getTimestampSpec().getTimestampColumn(), ColumnType.LONG); + } if (!isQueryGranularityEmptyOrNone(dataSchema)) { // A virtual column for query granularity would have been added. Add corresponding column type. rowSignatureBuilder.add(TIME_VIRTUAL_COLUMN, ColumnType.LONG); @@ -359,25 +362,31 @@ private static List getAggregateDimensions( private static ColumnMappings getColumnMappings(DataSchema dataSchema) { - List columnMappings = dataSchema.getDimensionsSpec() - .getDimensions() - .stream() - .map(dim -> new ColumnMapping( - dim.getName(), dim.getName())) - .collect(Collectors.toList()); + List columnMappings = new ArrayList<>(); + // For scan queries, a virtual column is created from __time if a custom query granularity is provided. For + // group-by queries, as insert needs __time, it will always be one of the dimensions. 
Since dimensions in groupby + aren't allowed to have time column as the output name, we map time dimension to TIME_VIRTUAL_COLUMN in + dimensions, and map it back to the time column here. + String timeColumn = (isGroupBy(dataSchema) || !isQueryGranularityEmptyOrNone(dataSchema)) + ? TIME_VIRTUAL_COLUMN + : ColumnHolder.TIME_COLUMN_NAME; + ColumnMapping timeColumnMapping = new ColumnMapping(timeColumn, ColumnHolder.TIME_COLUMN_NAME); + if (dataSchema.getDimensionsSpec().isForceSegmentSortByTime()) { + // When force-sorted by time, the __time column is missing from dimensionsSpec + columnMappings.add(timeColumnMapping); + } + columnMappings.addAll( + dataSchema.getDimensionsSpec() + .getDimensions() + .stream() + .map(dim -> dim.getName().equals(ColumnHolder.TIME_COLUMN_NAME) + ? timeColumnMapping + : new ColumnMapping(dim.getName(), dim.getName())) + .collect(Collectors.toList()) + ); columnMappings.addAll(Arrays.stream(dataSchema.getAggregators()) .map(agg -> new ColumnMapping(agg.getName(), agg.getName())) - .collect( - Collectors.toList())); - if (isGroupBy(dataSchema) || !isQueryGranularityEmptyOrNone(dataSchema)) { - // For scan queries, a virtual column is created from __time if a custom query granularity is provided. For - // group-by queries, as insert needs __time, it will always be one of the dimensions. Since dimensions in groupby - // aren't allowed to have time column as the output name, we map time dimension to TIME_VIRTUAL_COLUMN in - // dimensions, and map it back to the time column here. 
- columnMappings.add(new ColumnMapping(TIME_VIRTUAL_COLUMN, ColumnHolder.TIME_COLUMN_NAME)); - } else { - columnMappings.add(new ColumnMapping(ColumnHolder.TIME_COLUMN_NAME, ColumnHolder.TIME_COLUMN_NAME)); - } + .collect(Collectors.toList())); return new ColumnMappings(columnMappings); } @@ -392,6 +401,19 @@ private static List getOrderBySpec(PartitionsSpec partitionSp return Collections.emptyList(); } + private static Map buildQueryContext( + Map taskContext, + DataSchema dataSchema + ) + { + if (dataSchema.getDimensionsSpec().isForceSegmentSortByTime()) { + return taskContext; + } + Map queryContext = new HashMap<>(taskContext); + queryContext.put(MultiStageQueryContext.CTX_FORCE_TIME_SORT, false); + return queryContext; + } + private static Query buildScanQuery( CompactionTask compactionTask, Interval interval, @@ -408,7 +430,7 @@ private static Query buildScanQuery( .columnTypes(rowSignature.getColumnTypes()) .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(interval))) .filters(dataSchema.getTransformSpec().getFilter()) - .context(compactionTask.getContext()); + .context(buildQueryContext(compactionTask.getContext(), dataSchema)); if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) { List orderByColumnSpecs = getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec()); @@ -560,7 +582,7 @@ private Query buildGroupByQuery( .setDimensions(getAggregateDimensions(dataSchema, inputColToVirtualCol)) .setAggregatorSpecs(Arrays.asList(dataSchema.getAggregators())) .setPostAggregatorSpecs(postAggregators) - .setContext(compactionTask.getContext()) + .setContext(buildQueryContext(compactionTask.getContext(), dataSchema)) .setInterval(interval); if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java index 0b5395d727fe..3ff8317bf396 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java @@ -59,7 +59,9 @@ import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.segment.AutoTypeColumnSchema; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.NestedDataColumnSchema; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.data.CompressionFactory; @@ -71,14 +73,13 @@ import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.coordinator.CompactionConfigValidationResult; import org.apache.druid.sql.calcite.parser.DruidSqlInsert; -import org.hamcrest.MatcherAssert; -import org.hamcrest.Matchers; import org.joda.time.Interval; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -102,10 +103,14 @@ public class MSQCompactionRunnerTest private static final StringDimensionSchema STRING_DIMENSION = new StringDimensionSchema("string_dim", null, false); private static final StringDimensionSchema MV_STRING_DIMENSION = new StringDimensionSchema("mv_string_dim", null, null); private static final LongDimensionSchema LONG_DIMENSION = new LongDimensionSchema("long_dim"); + private static final NestedDataColumnSchema NESTED_DIMENSION = new NestedDataColumnSchema("nested_dim", 4); + private static final AutoTypeColumnSchema AUTO_DIMENSION = new AutoTypeColumnSchema("auto_dim", 
null); private static final List DIMENSIONS = ImmutableList.of( STRING_DIMENSION, LONG_DIMENSION, - MV_STRING_DIMENSION + MV_STRING_DIMENSION, + NESTED_DIMENSION, + AUTO_DIMENSION ); private static final Map INTERVAL_DATASCHEMAS = ImmutableMap.of( COMPACTION_INTERVAL, @@ -298,7 +303,7 @@ public void testRunCompactionTasksWithEmptyTaskListFails() throws Exception } @Test - public void testMSQControllerTaskSpecWithScanIsValid() throws JsonProcessingException + public void testCompactionConfigWithoutMetricsSpecProducesCorrectSpec() throws JsonProcessingException { DimFilter dimFilter = new SelectorDimFilter("dim1", "foo", null); @@ -318,7 +323,7 @@ public void testMSQControllerTaskSpecWithScanIsValid() throws JsonProcessingExce .withGranularity( new UniformGranularitySpec( SEGMENT_GRANULARITY.getDefaultGranularity(), - null, + QUERY_GRANULARITY.getDefaultGranularity(), false, Collections.singletonList(COMPACTION_INTERVAL) ) @@ -336,37 +341,37 @@ public void testMSQControllerTaskSpecWithScanIsValid() throws JsonProcessingExce MSQSpec actualMSQSpec = msqControllerTask.getQuerySpec(); - Assert.assertEquals( - new MSQTuningConfig( - 1, - MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY, - MAX_ROWS_PER_SEGMENT, - null, - createIndexSpec() - ), - actualMSQSpec.getTuningConfig() - ); - Assert.assertEquals( - new DataSourceMSQDestination( - DATA_SOURCE, - SEGMENT_GRANULARITY.getDefaultGranularity(), - null, - Collections.singletonList(COMPACTION_INTERVAL), - DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, Function.identity())), - null - ), - actualMSQSpec.getDestination() - ); + Assert.assertEquals(getExpectedTuningConfig(), actualMSQSpec.getTuningConfig()); + Assert.assertEquals(getExpectedDestination(), actualMSQSpec.getDestination()); Assert.assertTrue(actualMSQSpec.getQuery() instanceof ScanQuery); ScanQuery scanQuery = (ScanQuery) actualMSQSpec.getQuery(); + List expectedColumns = new ArrayList<>(); + List expectedColumnTypes = new ArrayList<>(); + // 
Add __time since this is a time-ordered query which doesn't have __time explicitly defined in dimensionsSpec + expectedColumns.add(ColumnHolder.TIME_COLUMN_NAME); + expectedColumnTypes.add(ColumnType.LONG); + + // Add TIME_VIRTUAL_COLUMN since a query granularity is specified + expectedColumns.add(MSQCompactionRunner.TIME_VIRTUAL_COLUMN); + expectedColumnTypes.add(ColumnType.LONG); + + expectedColumns.addAll(DIMENSIONS.stream().map(DimensionSchema::getName).collect(Collectors.toList())); + expectedColumnTypes.addAll(DIMENSIONS.stream().map(DimensionSchema::getColumnType).collect(Collectors.toList())); + + Assert.assertEquals(expectedColumns, scanQuery.getColumns()); + Assert.assertEquals(expectedColumnTypes, scanQuery.getColumnTypes()); + Assert.assertEquals(dimFilter, scanQuery.getFilter()); Assert.assertEquals( JSON_MAPPER.writeValueAsString(SEGMENT_GRANULARITY.toString()), msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY) ); - Assert.assertNull(msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY)); + Assert.assertEquals( + JSON_MAPPER.writeValueAsString(QUERY_GRANULARITY.toString()), + msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY) + ); Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy()); Assert.assertEquals( PARTITION_DIMENSIONS.stream().map(OrderBy::ascending).collect(Collectors.toList()), @@ -375,7 +380,60 @@ public void testMSQControllerTaskSpecWithScanIsValid() throws JsonProcessingExce } @Test - public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcessingException + public void testCompactionConfigWithSortOnNonTimeDimensionsProducesCorrectSpec() throws JsonProcessingException + { + List nonTimeSortedDimensions = ImmutableList.of( + STRING_DIMENSION, + new LongDimensionSchema(ColumnHolder.TIME_COLUMN_NAME), + LONG_DIMENSION + ); + CompactionTask taskCreatedWithTransformSpec = createCompactionTask( + new 
DynamicPartitionsSpec(TARGET_ROWS_PER_SEGMENT, null), + null, + Collections.emptyMap(), + null, + null + ); + + // Set forceSegmentSortByTime=false to enable non-time order + DimensionsSpec dimensionsSpec = DimensionsSpec.builder() + .setDimensions(nonTimeSortedDimensions) + .setForceSegmentSortByTime(false) + .build(); + DataSchema dataSchema = + DataSchema.builder() + .withDataSource(DATA_SOURCE) + .withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null)) + .withDimensions(dimensionsSpec) + .withGranularity( + new UniformGranularitySpec( + SEGMENT_GRANULARITY.getDefaultGranularity(), + null, + false, + Collections.singletonList(COMPACTION_INTERVAL) + ) + ) + .build(); + + List msqControllerTasks = MSQ_COMPACTION_RUNNER.createMsqControllerTasks( + taskCreatedWithTransformSpec, + Collections.singletonMap(COMPACTION_INTERVAL, dataSchema) + ); + + MSQSpec actualMSQSpec = Iterables.getOnlyElement(msqControllerTasks).getQuerySpec(); + + Assert.assertTrue(actualMSQSpec.getQuery() instanceof ScanQuery); + ScanQuery scanQuery = (ScanQuery) actualMSQSpec.getQuery(); + + // Dimensions should already list __time and the order should remain intact + Assert.assertEquals( + nonTimeSortedDimensions.stream().map(DimensionSchema::getName).collect(Collectors.toList()), + scanQuery.getColumns() + ); + } + + @Test + public void testCompactionConfigWithMetricsSpecProducesCorrectSpec() throws JsonProcessingException { DimFilter dimFilter = new SelectorDimFilter("dim1", "foo", null); @@ -404,7 +462,6 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess multiValuedDimensions ); - List msqControllerTasks = MSQ_COMPACTION_RUNNER.createMsqControllerTasks( taskCreatedWithTransformSpec, Collections.singletonMap(COMPACTION_INTERVAL, dataSchema) @@ -414,27 +471,8 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess MSQSpec actualMSQSpec = msqControllerTask.getQuerySpec(); - Assert.assertEquals( - new MSQTuningConfig( - 1, - 
MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY, - MAX_ROWS_PER_SEGMENT, - null, - createIndexSpec() - ), - actualMSQSpec.getTuningConfig() - ); - Assert.assertEquals( - new DataSourceMSQDestination( - DATA_SOURCE, - SEGMENT_GRANULARITY.getDefaultGranularity(), - null, - Collections.singletonList(COMPACTION_INTERVAL), - DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, Function.identity())), - null - ), - actualMSQSpec.getDestination() - ); + Assert.assertEquals(getExpectedTuningConfig(), actualMSQSpec.getTuningConfig()); + Assert.assertEquals(getExpectedDestination(), actualMSQSpec.getDestination()); Assert.assertTrue(actualMSQSpec.getQuery() instanceof GroupByQuery); GroupByQuery groupByQuery = (GroupByQuery) actualMSQSpec.getQuery(); @@ -450,30 +488,32 @@ public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws JsonProcess ); Assert.assertEquals(WorkerAssignmentStrategy.MAX, actualMSQSpec.getAssignmentStrategy()); - - // Since only MV_STRING_DIMENSION is indicated to be MVD by the CombinedSchema, conversion to array should happen - // only for that column. 
- List expectedDimensionSpec = DIMENSIONS.stream() - .filter(dim -> !MV_STRING_DIMENSION.getName() - .equals(dim.getName())) - .map(dim -> new DefaultDimensionSpec( - dim.getName(), - dim.getName(), - dim.getColumnType() - )) - .collect( - Collectors.toList()); + List expectedDimensionSpec = new ArrayList<>(); expectedDimensionSpec.add( - new DefaultDimensionSpec(MSQCompactionRunner.TIME_VIRTUAL_COLUMN, - MSQCompactionRunner.TIME_VIRTUAL_COLUMN, - ColumnType.LONG) + new DefaultDimensionSpec( + MSQCompactionRunner.TIME_VIRTUAL_COLUMN, + MSQCompactionRunner.TIME_VIRTUAL_COLUMN, + ColumnType.LONG + ) ); String mvToArrayStringDim = MSQCompactionRunner.ARRAY_VIRTUAL_COLUMN_PREFIX + MV_STRING_DIMENSION.getName(); - expectedDimensionSpec.add(new DefaultDimensionSpec(mvToArrayStringDim, mvToArrayStringDim, ColumnType.STRING_ARRAY)); - MatcherAssert.assertThat( - expectedDimensionSpec, - Matchers.containsInAnyOrder(groupByQuery.getDimensions().toArray(new DimensionSpec[0])) - ); + // Since only MV_STRING_DIMENSION is indicated to be MVD by the CombinedSchema, conversion to array should happen + // only for that column. + expectedDimensionSpec.addAll(DIMENSIONS.stream() + .map(dim -> + MV_STRING_DIMENSION.getName().equals(dim.getName()) + ? 
new DefaultDimensionSpec( + mvToArrayStringDim, + mvToArrayStringDim, + ColumnType.STRING_ARRAY + ) + : new DefaultDimensionSpec( + dim.getName(), + dim.getName(), + dim.getColumnType() + )) + .collect(Collectors.toList())); + Assert.assertEquals(expectedDimensionSpec, groupByQuery.getDimensions()); } private CompactionTask createCompactionTask( @@ -538,4 +578,27 @@ private static IndexSpec createIndexSpec() .withLongEncoding(CompressionFactory.LongEncodingStrategy.LONGS) .build(); } + + private static DataSourceMSQDestination getExpectedDestination() + { + return new DataSourceMSQDestination( + DATA_SOURCE, + SEGMENT_GRANULARITY.getDefaultGranularity(), + null, + Collections.singletonList(COMPACTION_INTERVAL), + DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName, Function.identity())), + null + ); + } + + private static MSQTuningConfig getExpectedTuningConfig() + { + return new MSQTuningConfig( + 1, + MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY, + MAX_ROWS_PER_SEGMENT, + null, + createIndexSpec() + ); + } }