From 5f86beae7fe308cf30408c53fe560bf4cb4f72d4 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 17 Jun 2024 21:43:37 +0530 Subject: [PATCH 1/6] init commit --- .../scan/ScanResultValueFramesIterable.java | 66 +++++++++++++--- .../ScanResultValueFramesIterableTest.java | 79 +++++++++++++++++++ 2 files changed, 135 insertions(+), 10 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java index 42f57628461b..43f3a03e84d2 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java @@ -22,6 +22,9 @@ import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.ints.IntList; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; import org.apache.druid.frame.allocation.MemoryAllocatorFactory; @@ -36,6 +39,7 @@ import org.apache.druid.query.FrameSignaturePair; import org.apache.druid.query.IterableRowsCursorHelper; import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import java.io.Closeable; @@ -161,6 +165,20 @@ private static class ScanResultValueFramesIterator implements Iterator currentRows = null; + public ScanResultValueFramesIterator( Sequence resultSequence, @@ -200,26 +218,33 @@ public FrameSignaturePair next() // start all the processing populateCursor(); boolean firstRowWritten = false; - // While calling populateCursor() repeatedly, currentRowSignature might change. Therefore we store the signature + // While calling populateCursor() repeatedly, currentRowSignature might change. Therefore, we store the signature // with which we have written the frames - final RowSignature writtenSignature = currentRowSignature; + final RowSignature writtenSignature = trimmedRowSignature; FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory( FrameType.COLUMNAR, memoryAllocatorFactory, - currentRowSignature, + trimmedRowSignature, Collections.emptyList() ); Frame frame; - try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(new SettableCursorColumnSelectorFactory( - () -> currentCursor, - currentRowSignature - ))) { + try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter( + new SettableCursorColumnSelectorFactory(() -> currentCursor, currentRowSignature))) { while (populateCursor()) { // Do till we don't have any more rows, or the next row isn't compatible with the current row if (!frameWriter.addSelection()) { // Add the cursor's row to the frame, till the frame is full break; } + + for (Integer columnNumber : nullTypedColumns) { + if (currentRows.get(currentRowIndex)[columnNumber] != null) { + throw DruidException.defensive("Expected a null value"); + } + } + firstRowWritten = true; + // Check that the columns with the null types are actually null before advancing currentCursor.advance(); + currentRowIndex++; } if (!firstRowWritten) { @@ -276,19 +301,34 @@ private boolean populateCursor() // At this point, we know that we need to move to the next non-empty cursor, AND it exists, because // done() is not false ScanResultValue scanResultValue = resultSequenceIterator.next(); + final RowSignature rowSignature = scanResultValue.getRowSignature() != null ? scanResultValue.getRowSignature() : defaultRowSignature; + RowSignature modifiedRowSignature = useNestedForUnknownTypes ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature) : rowSignature; + IntList currentNullTypedColumns = new IntArrayList(); + RowSignature.Builder modifiedTrimmedRowSignatureBuilder = RowSignature.builder(); + + for (int i = 0; i < modifiedRowSignature.size(); ++i) { + ColumnType columnType = modifiedRowSignature.getColumnType(i).orElse(null); + if (columnType == null) { + currentNullTypedColumns.add(i); + } else { + modifiedTrimmedRowSignatureBuilder.add(modifiedRowSignature.getColumnName(i), columnType); + } + } + + RowSignature modifiedTrimmedRowSignature = modifiedTrimmedRowSignatureBuilder.build(); + // currentRowSignature at this time points to the previous row's signature - final boolean compatible = modifiedRowSignature != null - && modifiedRowSignature.equals(currentRowSignature); + final boolean compatible = modifiedTrimmedRowSignature.equals(trimmedRowSignature); final List rows = (List) scanResultValue.getEvents(); - final Iterable formattedRows = Lists.newArrayList(Iterables.transform( + final List formattedRows = Lists.newArrayList(Iterables.transform( rows, (Function) resultFormatMapper.apply(modifiedRowSignature) )); @@ -307,6 +347,12 @@ private boolean populateCursor() } currentRowSignature = modifiedRowSignature; + trimmedRowSignature = modifiedTrimmedRowSignature; + nullTypedColumns = currentNullTypedColumns; + currentRows = formattedRows; + currentRowIndex = 0; + + return compatible; } } diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java index bdd64c1c8bd8..827fc351e5d5 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java @@ -55,6 +55,18 @@ public class ScanResultValueFramesIterableTest extends InitializedNullHandlingTe .add("col2", ColumnType.LONG) .build(); + private static final RowSignature SIGNATURE3 = RowSignature.builder() + .add("col1", ColumnType.DOUBLE) + .add("col2", ColumnType.LONG) + .add("col3", null) + .build(); + + private static final RowSignature SIGNATURE4 = RowSignature.builder() + .add("col1", ColumnType.DOUBLE) + .add("col3", null) + .add("col2", ColumnType.LONG) + .build(); + @Test public void testEmptySequence() @@ -191,6 +203,32 @@ public void testBatchingWithHeterogenousScanResultValues() ); } + @Test + public void testBatchingWithHeterogenousScanResultValuesAndNullTypes() + { + List frames = Lists.newArrayList( + createIterable( + scanResultValue1(2), + scanResultValue3(2) + ) + ); + Assert.assertEquals(2, frames.size()); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{1L, 1.0D}, + new Object[]{2L, 2.0D} + ), + new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE1).getRowsAsSequence() + ); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{5.0D, 5L}, + new Object[]{6.0D, 6L} + ), + new FrameBasedInlineDataSource(Collections.singletonList(frames.get(1)), SIGNATURE2).getRowsAsSequence() + ); + } + @Test public void testBatchingWithHeterogenousAndEmptyScanResultValues() { @@ -222,6 +260,37 @@ public void testBatchingWithHeterogenousAndEmptyScanResultValues() ); } + @Test + public void testBatchingWithHeterogenousAndEmptyScanResultValuesAndNullTypes() + { + List frames = Lists.newArrayList( + createIterable( + scanResultValue1(0), + scanResultValue2(0), + scanResultValue1(2), + scanResultValue1(0), + scanResultValue2(2), + scanResultValue2(0), + scanResultValue2(0) + ) + ); + Assert.assertEquals(2, frames.size()); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{1L, 1.0D}, + new Object[]{2L, 2.0D} + ), + new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE1).getRowsAsSequence() + ); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{3.0D, 3L}, + new Object[]{4.0D, 4L} + ), + new FrameBasedInlineDataSource(Collections.singletonList(frames.get(1)), SIGNATURE2).getRowsAsSequence() + ); + } + @Test public void testSplitting() { @@ -267,4 +336,14 @@ private static ScanResultValue scanResultValue2(int numRows) SIGNATURE2 ); } + + private static ScanResultValue scanResultValue3(int numRows) + { + return new ScanResultValue( + "dummy", + ImmutableList.of("col1", "col2", "col3"), + IntStream.range(5, 5 + numRows).mapToObj(i -> new Object[]{(double) i, i, null}).collect(Collectors.toList()), + SIGNATURE3 + ); + } } From a95807d1e40a7e72d5d8dd2212a3a5549831eb17 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 18 Jun 2024 18:29:45 +0530 Subject: [PATCH 2/6] more tests --- .../scan/ScanResultValueFramesIterable.java | 37 ++++++++---- .../ScanResultValueFramesIterableTest.java | 59 +++++++++++++++++++ 2 files changed, 85 insertions(+), 11 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java index 43f3a03e84d2..817c66e27104 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java @@ -161,24 +161,35 @@ private static class ScanResultValueFramesIterator implements Iterator currentRows = null; /** - * Current row index + * Row index pointing to the current row in {@link #currentRows}. This is the exact same row that the {@link #currentCursor} + * is also pointing at. Therefore {@link #currentRows} + {@link #currentCursor} represent the same information as presented + * by {@link #currentCursor}. */ - int currentRowIndex = 0; + int currentRowIndex = -1; /** - * + * Row signature of the current cursor. This is used to create the cursor out of the ScanResultValue. We have to use + * the full signature because the ScanResultValue will have */ - IntList nullTypedColumns = null; + RowSignature currentRowSignature = null; - List currentRows = null; + /** + * Row signature of the current cursor, with columns having unknown (null) types trimmed out. This is used to write + * the rows onto the frame. There's an implicit assumption (that we verify), that columns with null typed only + * contain null values, because the underlying segment didn't have the column. + */ + RowSignature trimmedRowSignature = null; + /** + * Columns of the currentRows with missing type information. As we materialize the rows onto the frames, we also + * verify that these columns only contain null values. + */ + IntList nullTypedColumns = null; public ScanResultValueFramesIterator( Sequence resultSequence, @@ -282,7 +293,9 @@ private boolean done() * if (hasNext()) was true before calling the method - * 1. {@link #currentCursor} - Points to the cursor with non-empty value (i.e. isDone()) is false, and the cursor points * to the next row present in the sequence of the scan result values. This row would get materialized to frame - * 2. {@link #currentRowSignature} - Row signature of the row. + * 2. {@link #currentRowSignature} - Row signature of the row + * 3. {@link #currentRows} - Points to the group of rows underlying the currentCursor + * 4. {@link #currentRowIndex} - Reset to 0 if we modified the cursor, else untouched *

* Return value - * if (hasNext()) is false before calling the method - returns false @@ -324,7 +337,9 @@ private boolean populateCursor() RowSignature modifiedTrimmedRowSignature = modifiedTrimmedRowSignatureBuilder.build(); - // currentRowSignature at this time points to the previous row's signature + // currentRowSignature at this time points to the previous row's signature. We look at the trimmed signature + // because that is the one used to write onto the frames, and if two rows have same trimmed signature, we can + // write both the rows onto the same frame final boolean compatible = modifiedTrimmedRowSignature.equals(trimmedRowSignature); final List rows = (List) scanResultValue.getEvents(); diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java index 827fc351e5d5..2d6922863826 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java @@ -21,7 +21,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory; +import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.FrameBasedInlineDataSource; import org.apache.druid.query.FrameSignaturePair; @@ -291,6 +293,40 @@ public void testBatchingWithHeterogenousAndEmptyScanResultValuesAndNullTypes() ); } + @Test + public void testBatchingWithDifferentRowSignaturesButSameTrimmedRowSignature() + { + List frames = Lists.newArrayList( + createIterable( + scanResultValue3(0), + scanResultValue4(0), + scanResultValue3(2), + scanResultValue3(0), + scanResultValue4(2), + scanResultValue4(0), + scanResultValue3(0) + ) + ); + Assert.assertEquals(1, frames.size()); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{5.0D, 5L}, + new Object[]{6.0D, 6L}, + new Object[]{7.0D, 7L}, + new Object[]{8.0D, 8L} + ), + new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE2).getRowsAsSequence() + ); + } + + @Test + public void testExceptionThrownWithMissingType() + { + Sequence frames = Sequences.simple(createIterable(incompleteTypeScanResultValue(1))); + Assert.assertThrows(DruidException.class, frames::toList); + } + + @Test public void testSplitting() { @@ -337,6 +373,7 @@ private static ScanResultValue scanResultValue2(int numRows) ); } + // Signature: col1: DOUBLE, col2: LONG, col3: null private static ScanResultValue scanResultValue3(int numRows) { return new ScanResultValue( @@ -346,4 +383,26 @@ private static ScanResultValue scanResultValue3(int numRows) SIGNATURE3 ); } + + // Signature: col1: DOUBLE, col3: null, col2: LONG + private static ScanResultValue scanResultValue4(int numRows) + { + return new ScanResultValue( + "dummy", + ImmutableList.of("col1", "col3", "col2"), + IntStream.range(7, 7 + numRows).mapToObj(i -> new Object[]{(double) i, null, i}).collect(Collectors.toList()), + SIGNATURE4 + ); + } + + // Contains ScanResultValue with incomplete type, and non-null row + private static ScanResultValue incompleteTypeScanResultValue(int numRows) + { + return new ScanResultValue( + "dummy", + ImmutableList.of("col1", "col3", "col2"), + IntStream.range(7, 7 + numRows).mapToObj(i -> new Object[]{(double) i, i, i}).collect(Collectors.toList()), + SIGNATURE4 + ); + } } From e66b9487a4763052e103bd975ec903aec8964fe1 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 18 Jun 2024 19:08:04 +0530 Subject: [PATCH 3/6] add calcite tests --- .../sql/calcite/BaseCalciteQueryTest.java | 2 + .../sql/calcite/CalciteSubqueryTest.java | 202 ++++++++++++++++++ 2 files changed, 204 insertions(+) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index dfee7d0e3a22..606710ff53b5 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -256,6 +256,8 @@ public static void setupNullValues() ImmutableMap.builder() .putAll(QUERY_CONTEXT_DEFAULT) .put(QueryContexts.MAX_SUBQUERY_BYTES_KEY, "100000") + // Disallows the fallback to row based limiting + .put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, "1") .build(); // Add additional context to the given context map for when the diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java index 808ab5c36630..f566aa6556ab 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java @@ -20,7 +20,15 @@ package org.apache.druid.sql.calcite; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Injector; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; @@ -31,6 +39,7 @@ import org.apache.druid.query.JoinDataSource; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.ResourceLimitExceededException; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -54,12 +63,21 @@ import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.topn.DimensionTopNMetricSpec; import org.apache.druid.query.topn.TopNQueryBuilder; +import org.apache.druid.segment.IndexBuilder; +import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.join.JoinType; +import org.apache.druid.segment.join.JoinableFactoryWrapper; +import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.sql.calcite.expression.DruidExpression; import org.apache.druid.sql.calcite.filtration.Filtration; import org.apache.druid.sql.calcite.util.CalciteTests; +import org.apache.druid.sql.calcite.util.SqlTestFramework; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.LinearShardSpec; import org.hamcrest.CoreMatchers; import org.joda.time.DateTimeZone; import org.joda.time.Period; @@ -67,12 +85,14 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -84,6 +104,7 @@ * 1. Where the memory limit is not set. The intermediate results are materialized as inline rows * 2. Where the memory limit is set. The intermediate results are materialized as frames */ +@SqlTestFrameworkConfig.ComponentSupplier(CalciteSubqueryTest.SubqueryComponentSupplier.class) public class CalciteSubqueryTest extends BaseCalciteQueryTest { public static Iterable constructorFeeder() @@ -147,6 +168,57 @@ public void testExactCountDistinctUsingSubqueryWithWhereToOuterFilter(String tes ); } + @MethodSource("constructorFeeder") + @ParameterizedTest(name = "{0}") + public void testSubqueryOnDataSourceWithMissingColumnsInSegments(String testName, Map queryContext) + { + if (!queryContext.containsKey(QueryContexts.MAX_SUBQUERY_BYTES_KEY)) { + cannotVectorize(); + } + testQuery( + "SELECT\n" + + " __time,\n" + + " col1,\n" + + " col2,\n" + + " col3,\n" + + " COUNT(*)\n" + + "FROM (SELECT * FROM dsMissingCol LIMIT 10)\n" + + "GROUP BY 1, 2, 3, 4", + queryContext, + ImmutableList.of( + GroupByQuery.builder() + .setDataSource( + new QueryDataSource( + newScanQueryBuilder() + .dataSource("dsMissingCol") + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__time", "col1", "col2", "col3") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .limit(10) + .build() + ) + ) + .setInterval(querySegmentSpec(Filtration.eternity())) + .setGranularity(Granularities.ALL) + .setDimensions( + new DefaultDimensionSpec("__time", "d0", ColumnType.LONG), + new DefaultDimensionSpec("col1", "d1", ColumnType.STRING), + new DefaultDimensionSpec("col2", "d2", ColumnType.STRING), + new DefaultDimensionSpec("col3", "d3", ColumnType.STRING) + ) + .setAggregatorSpecs(aggregators( + new CountAggregatorFactory("a0") + )) + .setContext(queryContext) + .build() + ), + ImmutableList.of( + new Object[]{946684800000L, "abc", null, "def", 1L}, + new Object[]{946684800000L, "foo", "bar", null, 1L} + ) + ); + } + @MethodSource("constructorFeeder") @ParameterizedTest(name = "{0}") public void testExactCountDistinctOfSemiJoinResult(String testName, Map queryContext) @@ -1315,4 +1387,134 @@ public void testSingleValueEmptyInnerAgg(String testName, Map qu ImmutableList.of() ); } + + public static class SubqueryComponentSupplier extends SqlTestFramework.StandardComponentSupplier + { + + private final TempDirProducer tmpDirProducer; + + public SubqueryComponentSupplier(TempDirProducer tempDirProducer) + { + super(tempDirProducer); + this.tmpDirProducer = tempDirProducer; + } + + @Override + public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( + QueryRunnerFactoryConglomerate conglomerate, + JoinableFactoryWrapper joinableFactory, + Injector injector + ) + { + SpecificSegmentsQuerySegmentWalker walker = + super.createQuerySegmentWalker(conglomerate, joinableFactory, injector); + + final String datasource1 = "dsMissingCol"; + final File tmpFolder = tempDirProducer.newTempFolder(); + + final List> rawRows1 = ImmutableList.of( + ImmutableMap.builder() + .put("t", "2000-01-01") + .put("col1", "foo") + .put("col2", "bar") + .build() + ); + final List rows1 = + rawRows1 + .stream() + .map(mapInputRow -> MapInputRowParser.parse( + new InputRowSchema( + new TimestampSpec("t", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("col1", "col2")) + ), + null + ), + mapInputRow + )) + .collect(Collectors.toList()); + final QueryableIndex queryableIndex1 = IndexBuilder + .create() + .tmpDir(new File(tmpFolder, datasource1)) + .segmentWriteOutMediumFactory(OnHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema(new IncrementalIndexSchema.Builder() + .withRollup(false) + .withDimensionsSpec( + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("col1"), + new StringDimensionSchema("col2") + ) + ) + ) + .build() + ) + .rows(rows1) + .buildMMappedIndex(); + + final List> rawRows2 = ImmutableList.of( + ImmutableMap.builder() + .put("t", "2000-01-01") + .put("col1", "abc") + .put("col3", "def") + .build() + ); + final List rows2 = + rawRows2 + .stream() + .map(mapInputRow -> MapInputRowParser.parse( + new InputRowSchema( + new TimestampSpec("t", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(ImmutableList.of("col1", "col3")) + ), + null + ), + mapInputRow + )) + .collect(Collectors.toList()); + final QueryableIndex queryableIndex2 = IndexBuilder + .create() + .tmpDir(new File(tmpFolder, datasource1)) + .segmentWriteOutMediumFactory(OnHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema(new IncrementalIndexSchema.Builder() + .withRollup(false) + .withDimensionsSpec( + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("col1"), + new StringDimensionSchema("col3") + ) + ) + ) + .build() + ) + .rows(rows2) + .buildMMappedIndex(); + + walker.add( + DataSegment.builder() + .dataSource(datasource1) + .interval(Intervals.ETERNITY) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(), + queryableIndex1 + ); + + walker.add( + DataSegment.builder() + .dataSource(datasource1) + .interval(Intervals.ETERNITY) + .version("1") + .shardSpec(new LinearShardSpec(1)) + .size(0) + .build(), + queryableIndex2 + ); + + return walker; + } + } } From 591e25cbf8a4dbecd418c7114c15ff88a7f1727d Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 18 Jun 2024 20:29:39 +0530 Subject: [PATCH 4/6] review comments --- .../scan/ScanResultValueFramesIterable.java | 31 +++--- .../ScanResultValueFramesIterableTest.java | 94 ++++++++++--------- .../sql/calcite/CalciteSubqueryTest.java | 2 +- 3 files changed, 68 insertions(+), 59 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java index 817c66e27104..a9fb108d71a6 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java @@ -173,17 +173,16 @@ private static class ScanResultValueFramesIterator implements Iterator currentCursor, currentRowSignature))) { + new SettableCursorColumnSelectorFactory(() -> currentCursor, currentInputRowSignature))) { while (populateCursor()) { // Do till we don't have any more rows, or the next row isn't compatible with the current row if (!frameWriter.addSelection()) { // Add the cursor's row to the frame, till the frame is full break; @@ -264,7 +261,9 @@ public FrameSignaturePair next() frame = Frame.wrap(frameWriter.toByteArray()); } - return new FrameSignaturePair(frame, writtenSignature); + // While calling populateCursor() repeatedly, currentRowSignature might change. Therefore, we store the signature + // with which we have written the frames + return new FrameSignaturePair(frame, frameWriterFactory.signature()); } /** @@ -280,7 +279,7 @@ private boolean done() /** * This is the most important method of this iterator. This determines if two consecutive scan result values can - * be batched or not, populates the value of the {@link #currentCursor} and {@link #currentRowSignature}, + * be batched or not, populates the value of the {@link #currentCursor} and {@link #currentInputRowSignature}, * during the course of the iterator, and facilitates the {@link #next()} *

* Multiple calls to populateCursor, without advancing the {@link #currentCursor} is idempotent. This allows successive @@ -293,7 +292,7 @@ private boolean done() * if (hasNext()) was true before calling the method - * 1. {@link #currentCursor} - Points to the cursor with non-empty value (i.e. isDone()) is false, and the cursor points * to the next row present in the sequence of the scan result values. This row would get materialized to frame - * 2. {@link #currentRowSignature} - Row signature of the row + * 2. {@link #currentInputRowSignature} - Row signature of the row * 3. {@link #currentRows} - Points to the group of rows underlying the currentCursor * 4. {@link #currentRowIndex} - Reset to 0 if we modified the cursor, else untouched *

@@ -340,7 +339,7 @@ private boolean populateCursor() // currentRowSignature at this time points to the previous row's signature. We look at the trimmed signature // because that is the one used to write onto the frames, and if two rows have same trimmed signature, we can // write both the rows onto the same frame - final boolean compatible = modifiedTrimmedRowSignature.equals(trimmedRowSignature); + final boolean compatible = modifiedTrimmedRowSignature.equals(currentOutputRowSignature); final List rows = (List) scanResultValue.getEvents(); final List formattedRows = Lists.newArrayList(Iterables.transform( @@ -361,8 +360,8 @@ private boolean populateCursor() return populateCursor(); } - currentRowSignature = modifiedRowSignature; - trimmedRowSignature = modifiedTrimmedRowSignature; + currentInputRowSignature = modifiedRowSignature; + currentOutputRowSignature = modifiedTrimmedRowSignature; nullTypedColumns = currentNullTypedColumns; currentRows = formattedRows; currentRowIndex = 0; diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java index 2d6922863826..8ffaa45de797 100644 --- a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java @@ -111,10 +111,10 @@ public void testBatchingWithHomogenousScanResultValues() Assert.assertEquals(1, frames.size()); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{1L, 1.0D}, - new Object[]{2L, 2.0D}, - new Object[]{1L, 1.0D}, - new Object[]{2L, 2.0D} + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D}, + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D} ), new FrameBasedInlineDataSource(frames, SIGNATURE1).getRowsAsSequence() ); @@ -167,10 +167,10 @@ public void testBatchingWithHomogenousAndEmptyScanResultValues() Assert.assertEquals(1, frames.size()); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{1L, 1.0D}, - new Object[]{2L, 2.0D}, - new Object[]{1L, 1.0D}, - new Object[]{2L, 2.0D} + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D}, + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D} ), new FrameBasedInlineDataSource(frames, SIGNATURE1).getRowsAsSequence() ); @@ -191,17 +191,17 @@ public void testBatchingWithHeterogenousScanResultValues() Assert.assertEquals(2, frames.size()); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{1L, 1.0D}, - new Object[]{2L, 2.0D} + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D} ), - new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE1).getRowsAsSequence() + new FrameBasedInlineDataSource(frames.subList(0, 1), SIGNATURE1).getRowsAsSequence() ); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{3.0D, 3L}, - new Object[]{4.0D, 4L} + new Object[]{2000.0D, 2100L}, + new Object[]{2001.0D, 2101L} ), - new FrameBasedInlineDataSource(Collections.singletonList(frames.get(1)), SIGNATURE2).getRowsAsSequence() + new FrameBasedInlineDataSource(frames.subList(1, 2), SIGNATURE2).getRowsAsSequence() ); } @@ -217,17 +217,17 @@ public void testBatchingWithHeterogenousScanResultValuesAndNullTypes() Assert.assertEquals(2, frames.size()); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{1L, 1.0D}, - new Object[]{2L, 2.0D} + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D} ), - new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE1).getRowsAsSequence() + new FrameBasedInlineDataSource(frames.subList(0, 1), SIGNATURE1).getRowsAsSequence() ); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{5.0D, 5L}, - new Object[]{6.0D, 6L} + new Object[]{3000.0D, 3100L}, + new Object[]{3001.0D, 3101L} ), - new FrameBasedInlineDataSource(Collections.singletonList(frames.get(1)), SIGNATURE2).getRowsAsSequence() + new FrameBasedInlineDataSource(frames.subList(1, 2), SIGNATURE2).getRowsAsSequence() ); } @@ -248,17 +248,17 @@ public void testBatchingWithHeterogenousAndEmptyScanResultValues() Assert.assertEquals(2, frames.size()); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{1L, 1.0D}, - new Object[]{2L, 2.0D} + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D} ), - new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE1).getRowsAsSequence() + new FrameBasedInlineDataSource(frames.subList(0, 1), SIGNATURE1).getRowsAsSequence() ); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{3.0D, 3L}, - new Object[]{4.0D, 4L} + new Object[]{2000.0D, 2100L}, + new Object[]{2001.0D, 2101L} ), - new FrameBasedInlineDataSource(Collections.singletonList(frames.get(1)), SIGNATURE2).getRowsAsSequence() + new FrameBasedInlineDataSource(frames.subList(1, 2), SIGNATURE2).getRowsAsSequence() ); } @@ -279,17 +279,17 @@ public void testBatchingWithHeterogenousAndEmptyScanResultValuesAndNullTypes() Assert.assertEquals(2, frames.size()); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{1L, 1.0D}, - new Object[]{2L, 2.0D} + new Object[]{1000L, 1100.0D}, + new Object[]{1001L, 1101.0D} ), - new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE1).getRowsAsSequence() + new FrameBasedInlineDataSource(frames.subList(0, 1), SIGNATURE1).getRowsAsSequence() ); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{3.0D, 3L}, - new Object[]{4.0D, 4L} + new Object[]{2000.0D, 2100L}, + new Object[]{2001.0D, 2101L} ), - new FrameBasedInlineDataSource(Collections.singletonList(frames.get(1)), SIGNATURE2).getRowsAsSequence() + new FrameBasedInlineDataSource(frames.subList(1, 2), SIGNATURE2).getRowsAsSequence() ); } @@ -310,12 +310,12 @@ public void testBatchingWithDifferentRowSignaturesButSameTrimmedRowSignature() Assert.assertEquals(1, frames.size()); QueryToolChestTestHelper.assertArrayResultsEquals( ImmutableList.of( - new Object[]{5.0D, 5L}, - new Object[]{6.0D, 6L}, - new Object[]{7.0D, 7L}, - new Object[]{8.0D, 8L} + new Object[]{3000.0D, 3100L}, + new Object[]{3001.0D, 3101L}, + new Object[]{4000.0D, 4100L}, + new Object[]{4001.0D, 4101L} ), - new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE2).getRowsAsSequence() + new FrameBasedInlineDataSource(frames, SIGNATURE2).getRowsAsSequence() ); } @@ -357,7 +357,9 @@ private static ScanResultValue scanResultValue1(int numRows) return new ScanResultValue( "dummy", ImmutableList.of("col1", "col2"), - IntStream.range(1, 1 + numRows).mapToObj(i -> new Object[]{i, (double) i}).collect(Collectors.toList()), + IntStream.range(1000, 1000 + numRows) + .mapToObj(i -> new Object[]{i, (double) i + 100}) + .collect(Collectors.toList()), SIGNATURE1 ); } @@ -368,7 +370,9 @@ private static ScanResultValue scanResultValue2(int numRows) return new ScanResultValue( "dummy", ImmutableList.of("col1", "col2"), - IntStream.range(3, 3 + numRows).mapToObj(i -> new Object[]{(double) i, i}).collect(Collectors.toList()), + IntStream.range(2000, 2000 + numRows) + .mapToObj(i -> new Object[]{(double) i, i + 100}) + .collect(Collectors.toList()), SIGNATURE2 ); } @@ -379,7 +383,9 @@ private static ScanResultValue scanResultValue3(int numRows) return new ScanResultValue( "dummy", ImmutableList.of("col1", "col2", "col3"), - IntStream.range(5, 5 + numRows).mapToObj(i -> new Object[]{(double) i, i, null}).collect(Collectors.toList()), + IntStream.range(3000, 3000 + numRows) + .mapToObj(i -> new Object[]{(double) i, i + 100, null}) + .collect(Collectors.toList()), SIGNATURE3 ); } @@ -390,7 +396,9 @@ private static ScanResultValue scanResultValue4(int numRows) return new ScanResultValue( "dummy", ImmutableList.of("col1", "col3", "col2"), - IntStream.range(7, 7 + numRows).mapToObj(i -> new Object[]{(double) i, null, i}).collect(Collectors.toList()), + IntStream.range(4000, 4000 + numRows) + .mapToObj(i -> new Object[]{(double) i, null, i + 100}) + .collect(Collectors.toList()), SIGNATURE4 ); } @@ -401,7 +409,9 @@ private static ScanResultValue incompleteTypeScanResultValue(int numRows) return new ScanResultValue( "dummy", ImmutableList.of("col1", "col3", "col2"), - IntStream.range(7, 7 + numRows).mapToObj(i -> new Object[]{(double) i, i, i}).collect(Collectors.toList()), + IntStream.range(5000, 5000 + numRows) + .mapToObj(i -> new Object[]{(double) i, i + 100, i + 200}) + .collect(Collectors.toList()), SIGNATURE4 ); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java index f566aa6556ab..cf496354ff3d 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java @@ -1392,7 +1392,7 @@ public static class SubqueryComponentSupplier extends SqlTestFramework.StandardC { private final TempDirProducer tmpDirProducer; - + public SubqueryComponentSupplier(TempDirProducer tempDirProducer) { super(tempDirProducer); From 180f9fe9a2c84ba719a02d1e325a657a9eb3eaa9 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 21 Jun 2024 09:49:39 +0530 Subject: [PATCH 5/6] review comments --- .../scan/ScanResultValueFramesIterable.java | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java index a9fb108d71a6..2f3b988d34aa 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java @@ -191,11 +191,11 @@ private static class ScanResultValueFramesIterator implements Iterator resultSequence, - MemoryAllocatorFactory memoryAllocatorFactory, - boolean useNestedForUnknownTypes, - RowSignature defaultRowSignature, - Function> resultFormatMapper + final Sequence resultSequence, + final MemoryAllocatorFactory memoryAllocatorFactory, + final boolean useNestedForUnknownTypes, + final RowSignature defaultRowSignature, + final Function> resultFormatMapper ) { this.memoryAllocatorFactory = memoryAllocatorFactory; @@ -229,13 +229,13 @@ public FrameSignaturePair next() populateCursor(); boolean firstRowWritten = false; - FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory( + final FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory( FrameType.COLUMNAR, memoryAllocatorFactory, currentOutputRowSignature, Collections.emptyList() ); - Frame frame; + final Frame frame; try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter( new SettableCursorColumnSelectorFactory(() -> currentCursor, currentInputRowSignature))) { while (populateCursor()) { // Do till we don't have any more rows, or the next row isn't compatible with the current row @@ -243,14 +243,18 @@ public FrameSignaturePair next() break; } + // Check that the columns with the null types are actually null before advancing + final Object[] currentRow = currentRows.get(currentRowIndex); for (Integer columnNumber : nullTypedColumns) { - if (currentRows.get(currentRowIndex)[columnNumber] != null) { - throw DruidException.defensive("Expected a null value"); + if (currentRow[columnNumber] != null) { + throw DruidException.defensive( + "Expected a null value for column [%s]", + frameWriterFactory.signature().getColumnName(columnNumber) + ); } } firstRowWritten = true; - // Check that the columns with the null types are actually null before advancing currentCursor.advance(); currentRowIndex++; } @@ -312,18 +316,18 @@ private boolean populateCursor() // At this point, we know that we need to move to the next non-empty cursor, AND it exists, because // done() is not false - ScanResultValue scanResultValue = resultSequenceIterator.next(); + final ScanResultValue scanResultValue = resultSequenceIterator.next(); final RowSignature rowSignature = scanResultValue.getRowSignature() != null ? scanResultValue.getRowSignature() : defaultRowSignature; - RowSignature modifiedRowSignature = useNestedForUnknownTypes + final RowSignature modifiedRowSignature = useNestedForUnknownTypes ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature) : rowSignature; - IntList currentNullTypedColumns = new IntArrayList(); - RowSignature.Builder modifiedTrimmedRowSignatureBuilder = RowSignature.builder(); + final IntList currentNullTypedColumns = new IntArrayList(); + final RowSignature.Builder modifiedTrimmedRowSignatureBuilder = RowSignature.builder(); for (int i = 0; i < modifiedRowSignature.size(); ++i) { ColumnType columnType = modifiedRowSignature.getColumnType(i).orElse(null); @@ -334,7 +338,7 @@ private boolean populateCursor() } } - RowSignature modifiedTrimmedRowSignature = modifiedTrimmedRowSignatureBuilder.build(); + final RowSignature modifiedTrimmedRowSignature = modifiedTrimmedRowSignatureBuilder.build(); // currentRowSignature at this time points to the previous row's signature. We look at the trimmed signature // because that is the one used to write onto the frames, and if two rows have same trimmed signature, we can @@ -347,7 +351,7 @@ private boolean populateCursor() (Function) resultFormatMapper.apply(modifiedRowSignature) )); - Pair cursorAndCloseable = IterableRowsCursorHelper.getCursorFromIterable( + final Pair cursorAndCloseable = IterableRowsCursorHelper.getCursorFromIterable( formattedRows, modifiedRowSignature ); From 9d64ed4a43521c33fc3c415df343bd8ad64812f8 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Sun, 23 Jun 2024 17:38:15 +0530 Subject: [PATCH 6/6] fix tests --- .../org/apache/druid/sql/calcite/CalciteSubqueryTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java index cf496354ff3d..6269e2a5c8cc 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java @@ -213,8 +213,8 @@ public void testSubqueryOnDataSourceWithMissingColumnsInSegments(String testName .build() ), ImmutableList.of( - new Object[]{946684800000L, "abc", null, "def", 1L}, - new Object[]{946684800000L, "foo", "bar", null, 1L} + new Object[]{946684800000L, "abc", NullHandling.defaultStringValue(), "def", 1L}, + new Object[]{946684800000L, "foo", "bar", NullHandling.defaultStringValue(), 1L} ) ); }