diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java index 3039fbb7c843..012e2e801a0d 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java @@ -70,7 +70,7 @@ public class AvroOCFReader extends IntermediateRowParsingReader false, binaryAsString, extractUnionsByType, - inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + inputRowSchema.getDimensionsSpec().useSchemaDiscovery() ) ); } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java index 12f8a20bab7b..283286b1bbb7 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java @@ -59,7 +59,7 @@ public static ObjectFlattener makeFlattener( fromPigAvroStorage, binaryAsString, extractUnionsByType, - dimensionsSpec != null && dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery() + dimensionsSpec != null && dimensionsSpec.useSchemaDiscovery() ) ); } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java index 51bc733d4305..c63e83b065a3 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java @@ -66,7 +66,7 @@ public class AvroStreamReader extends IntermediateRowParsingReader flattenSpec, new OrcStructFlattenerMaker( binaryAsString, - inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + inputRowSchema.getDimensionsSpec().useSchemaDiscovery() ) ); } diff --git a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java index cf8ba19d9b74..3013b4439ef2 100644 --- a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java +++ b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java @@ -432,7 +432,7 @@ public void testNestedColumnSchemaless() throws IOException ); final InputRowSchema schema = new InputRowSchema( new TimestampSpec("ts", "millis", null), - DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(), + DimensionsSpec.builder().useSchemaDiscovery(true).build(), ColumnsFilter.all(), null ); diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java index 494f66251633..0bc5f374a0cf 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java @@ -70,7 +70,7 @@ public class ParquetReader extends IntermediateRowParsingReader flattenSpec, new ParquetGroupFlattenerMaker( binaryAsString, - inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + inputRowSchema.getDimensionsSpec().useSchemaDiscovery() ) ); } diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java index 5c2dd2844f19..1bef1e9e9129 100755 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java @@ -85,7 +85,7 @@ public ParquetAvroHadoopInputRowParser( false, this.binaryAsString, this.extractUnionsByType, - dimensionsSpec != null && dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery() + dimensionsSpec != null && dimensionsSpec.useSchemaDiscovery() ) ); } diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java index f3e853968511..41db6efe2009 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java @@ -60,7 +60,7 @@ public ParquetHadoopInputRowParser( flattenSpec, new ParquetGroupFlattenerMaker( this.binaryAsString, - dimensionsSpec != null && dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery() + dimensionsSpec != null && dimensionsSpec.useSchemaDiscovery() ) ); this.parser = new MapInputRowParser(parseSpec); diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java index 950039d06956..cd587f444f82 100644 --- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java @@ -169,7 +169,7 @@ public void testNestedColumnSchemalessNestedTestFileNoNested() throws IOExceptio final String file = "example/flattening/test_nested_1.parquet"; InputRowSchema schema = new InputRowSchema( new TimestampSpec("timestamp", "auto", null), - DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(false).build(), + DimensionsSpec.builder().useSchemaDiscovery(false).build(), ColumnsFilter.all(), null ); @@ -206,7 +206,7 @@ public void testNestedColumnSchemalessNestedTestFile() throws IOException final String file = "example/flattening/test_nested_1.parquet"; InputRowSchema schema = new InputRowSchema( new TimestampSpec("timestamp", "auto", null), - DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(), + DimensionsSpec.builder().useSchemaDiscovery(true).build(), ColumnsFilter.all(), null ); diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java index ac77a545dd7b..ecb2fbc7653c 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java @@ -59,7 +59,7 @@ public class ProtobufReader extends IntermediateRowParsingReader this.inputRowSchema = inputRowSchema; this.recordFlattener = ObjectFlatteners.create( flattenSpec, - new ProtobufFlattenerMaker(inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()) + new ProtobufFlattenerMaker(inputRowSchema.getDimensionsSpec().useSchemaDiscovery()) ); this.source = source; this.protobufBytesDecoder = protobufBytesDecoder; diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java index 9255e1e7ebd2..498174852bfc 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java @@ -345,7 +345,7 @@ public void testParseNestedDataSchemaless() throws Exception InputEntityReader reader = protobufInputFormat.createReader( new InputRowSchema( timestampSpec, - DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(), + DimensionsSpec.builder().useSchemaDiscovery(true).build(), null, null ), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java index c91d8434c21b..be3b31be3b70 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java @@ -31,6 +31,7 @@ import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.Row; +import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.TimedShutoffInputSourceReader; import org.apache.druid.data.input.impl.TimestampSpec; @@ -44,6 +45,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.LongMinAggregatorFactory; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexAddResult; import org.apache.druid.segment.incremental.IncrementalIndexSchema; @@ -232,9 +235,41 @@ public SamplerResponse sample( int numRowsRead = responseRows.size(); + List logicalDimensionSchemas = new ArrayList<>(); + List physicalDimensionSchemas = new ArrayList<>(); + + RowSignature.Builder signatureBuilder = RowSignature.builder(); + signatureBuilder.add( + ColumnHolder.TIME_COLUMN_NAME, + index.getColumnCapabilities(ColumnHolder.TIME_COLUMN_NAME).toColumnType() + ); + for (IncrementalIndex.DimensionDesc dimensionDesc : index.getDimensions()) { + if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionDesc.getName())) { + final ColumnType columnType = dimensionDesc.getCapabilities().toColumnType(); + signatureBuilder.add(dimensionDesc.getName(), columnType); + logicalDimensionSchemas.add( + DimensionSchema.getDefaultSchemaForBuiltInType(dimensionDesc.getName(), dimensionDesc.getCapabilities()) + ); + physicalDimensionSchemas.add( + dimensionDesc.getHandler().getDimensionSchema(dimensionDesc.getCapabilities()) + ); + } + } + for (AggregatorFactory aggregatorFactory : index.getMetricAggs()) { + if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(aggregatorFactory.getName())) { + signatureBuilder.add( + aggregatorFactory.getName(), + index.getColumnCapabilities(aggregatorFactory.getName()).toColumnType() + ); + } + } + return new SamplerResponse( numRowsRead, numRowsIndexed, + logicalDimensionSchemas, + physicalDimensionSchemas, + signatureBuilder.build(), responseRows.stream() .filter(Objects::nonNull) .filter(x -> x.getParsed() != null || x.isUnparseable() != null) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java index 464579764d11..e788545507cd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java @@ -27,16 +27,20 @@ import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InlineInputSource; +import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.Assert; import org.junit.Test; import java.util.List; -public class CsvInputSourceSamplerTest +public class CsvInputSourceSamplerTest extends InitializedNullHandlingTest { @Test public void testCSVColumnAllNull() @@ -73,6 +77,25 @@ public void testCSVColumnAllNull() Assert.assertEquals(4, response.getNumRowsRead()); Assert.assertEquals(4, response.getNumRowsIndexed()); Assert.assertEquals(4, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("FirstName"), + new StringDimensionSchema("LastName"), + new StringDimensionSchema("Number"), + new StringDimensionSchema("Gender") + ), + response.getLogicalDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("FirstName", ColumnType.STRING) + .add("LastName", ColumnType.STRING) + .add("Number", ColumnType.STRING) + .add("Gender", ColumnType.STRING) + .build(), + response.getLogicalSegmentSchema() + ); List data = response.getData(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpecTest.java index b305cd989ede..bc92ff298b49 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpecTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import org.apache.druid.client.indexing.SamplerResponse; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputSource; @@ -29,6 +30,7 @@ import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.guice.FirehoseModule; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.indexing.DataSchema; import org.easymock.Capture; import org.easymock.EasyMock; @@ -104,7 +106,7 @@ public void testSerde() throws IOException EasyMock.capture(capturedInputFormat), EasyMock.capture(capturedDataSchema), EasyMock.capture(capturedSamplerConfig) - )).andReturn(new SamplerResponse(0, 0, null)); + )).andReturn(new SamplerResponse(0, 0, ImmutableList.of(), ImmutableList.of(), RowSignature.empty(), null)); replayAll(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java index cbd58d1adfbb..a80aabdc34cb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java @@ -39,9 +39,11 @@ import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.JSONParseSpec; import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.guice.NestedDataModule; import org.apache.druid.indexing.seekablestream.RecordSupplierInputSource; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; @@ -58,9 +60,12 @@ import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.nested.StructuredData; import org.apache.druid.segment.realtime.firehose.InlineFirehoseFactory; import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.TransformSpec; @@ -127,7 +132,9 @@ private enum ParserType @Parameterized.Parameters(name = "parserType = {0}, useInputFormatApi={1}") public static Iterable constructorFeeder() { + NestedDataModule.registerHandlersAndSerde(); OBJECT_MAPPER.registerModules(new SamplerModule().getJacksonModules()); + OBJECT_MAPPER.registerModules(NestedDataModule.getJacksonModulesList()); return ImmutableList.of( new Object[]{ParserType.STR_JSON, false}, new Object[]{ParserType.STR_JSON, true}, @@ -181,6 +188,8 @@ public void testNoDataSchema() Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(0, response.getNumRowsIndexed()); Assert.assertEquals(6, response.getData().size()); + Assert.assertEquals(ImmutableList.of(), response.getLogicalDimensions()); + Assert.assertEquals(RowSignature.builder().addTimeColumn().build(), response.getLogicalSegmentSchema()); List data = response.getData(); @@ -254,6 +263,8 @@ public void testNoDataSchemaNumRows() Assert.assertEquals(3, response.getNumRowsRead()); Assert.assertEquals(0, response.getNumRowsIndexed()); Assert.assertEquals(3, response.getData().size()); + Assert.assertEquals(ImmutableList.of(), response.getLogicalDimensions()); + Assert.assertEquals(RowSignature.builder().addTimeColumn().build(), response.getLogicalSegmentSchema()); List data = response.getData(); @@ -300,6 +311,40 @@ public void testMissingValueTimestampSpec() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(6, response.getNumRowsIndexed()); Assert.assertEquals(6, response.getData().size()); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? ImmutableList.of( + new StringDimensionSchema("t"), + new StringDimensionSchema("dim1"), + new StringDimensionSchema("met1"), + new StringDimensionSchema("dim2") + ) + : ImmutableList.of( + new StringDimensionSchema("t"), + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2"), + new StringDimensionSchema("met1") + ), + response.getLogicalDimensions() + ); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? RowSignature.builder() + .addTimeColumn() + .add("t", ColumnType.STRING) + .add("dim1", ColumnType.STRING) + .add("met1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .build() + : RowSignature.builder() + .addTimeColumn() + .add("t", ColumnType.STRING) + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.STRING) + .build(), + response.getLogicalSegmentSchema() + ); List data = response.getData(); @@ -409,6 +454,36 @@ public void testWithTimestampSpec() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(6, response.getData().size()); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("met1"), + new StringDimensionSchema("dim2") + ) + : ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2"), + new StringDimensionSchema("met1") + ), + response.getLogicalDimensions() + ); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("met1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .build() + : RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.STRING) + .build(), + response.getLogicalSegmentSchema() + ); List data = response.getData(); @@ -493,6 +568,137 @@ public void testWithTimestampSpec() throws IOException ); } + @Test + public void testWithTimestampSpecNestedDiscovery() throws IOException + { + + final TimestampSpec timestampSpec = new TimestampSpec("t", null, null); + final DimensionsSpec dimensionsSpec = DimensionsSpec.builder() + .useSchemaDiscovery(true) + .build(); + final DataSchema dataSchema = createDataSchema(timestampSpec, dimensionsSpec, null, null, null); + final InputSource inputSource = createInputSource(getTestRows(), dataSchema); + final InputFormat inputFormat = createInputFormat(); + + SamplerResponse response = inputSourceSampler.sample(inputSource, inputFormat, dataSchema, null); + + Assert.assertEquals(6, response.getNumRowsRead()); + Assert.assertEquals(5, response.getNumRowsIndexed()); + Assert.assertEquals(6, response.getData().size()); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? ImmutableList.of( + new StringDimensionSchema("dim1"), + new LongDimensionSchema("met1"), + new StringDimensionSchema("dim2") + ) + : ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2"), + new StringDimensionSchema("met1") + ), + response.getLogicalDimensions() + ); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .add("dim2", ColumnType.STRING) + .build() + : RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.STRING) + .build(), + response.getLogicalSegmentSchema() + ); + + List data = response.getData(); + + assertEqualsSamplerResponseRow( + new SamplerResponseRow( + getRawColumns().get(0), + new SamplerTestUtils.MapAllowingNullValuesBuilder() + .put("__time", 1555934400000L) + .put("dim2", StructuredData.wrap(null)) + .put("dim1", StructuredData.wrap("foo")) + .put("met1", StructuredData.wrap(parserType == ParserType.STR_JSON ? 1L : "1")) + .build(), + null, + null + ), + data.get(0) + ); + assertEqualsSamplerResponseRow( + new SamplerResponseRow( + getRawColumns().get(1), + new SamplerTestUtils.MapAllowingNullValuesBuilder() + .put("__time", 1555934400000L) + .put("dim2", StructuredData.wrap(null)) + .put("dim1", StructuredData.wrap("foo")) + .put("met1", StructuredData.wrap(parserType == ParserType.STR_JSON ? 2L : "2")) + .build(), + null, + null + ), + data.get(1) + ); + assertEqualsSamplerResponseRow( + new SamplerResponseRow( + getRawColumns().get(2), + new SamplerTestUtils.MapAllowingNullValuesBuilder() + .put("__time", 1555934460000L) + .put("dim2", StructuredData.wrap(null)) + .put("dim1", StructuredData.wrap("foo")) + .put("met1", StructuredData.wrap(parserType == ParserType.STR_JSON ? 3L : "3")) + .build(), + null, + null + ), + data.get(2) + ); + assertEqualsSamplerResponseRow( + new SamplerResponseRow( + getRawColumns().get(3), + new SamplerTestUtils.MapAllowingNullValuesBuilder() + .put("__time", 1555934400000L) + .put("dim2", StructuredData.wrap(null)) + .put("dim1", StructuredData.wrap("foo2")) + .put("met1", StructuredData.wrap(parserType == ParserType.STR_JSON ? 4L : "4")) + .build(), + null, + null + ), + data.get(3) + ); + assertEqualsSamplerResponseRow( + new SamplerResponseRow( + getRawColumns().get(4), + new SamplerTestUtils.MapAllowingNullValuesBuilder() + .put("__time", 1555934400000L) + .put("dim2", StructuredData.wrap("bar")) + .put("dim1", StructuredData.wrap("foo")) + .put("met1", StructuredData.wrap(parserType == ParserType.STR_JSON ? 5L : "5")) + .build(), + null, + null + ), + data.get(4) + ); + assertEqualsSamplerResponseRow( + new SamplerResponseRow( + getRawColumns().get(5), + null, + true, + getUnparseableTimestampString() + ), + data.get(5) + ); + } + @Test public void testWithDimensionSpec() throws IOException { @@ -509,6 +715,21 @@ public void testWithDimensionSpec() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(6, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("met1") + ), + response.getLogicalDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("met1", ColumnType.STRING) + .build(), + response.getLogicalSegmentSchema() + ); List data = response.getData(); @@ -615,6 +836,22 @@ public void testWithNoRollup() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(6, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2") + ), + response.getLogicalDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getLogicalSegmentSchema() + ); List data = response.getData(); @@ -726,6 +963,22 @@ public void testWithRollup() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(4, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2") + ), + response.getLogicalDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getLogicalSegmentSchema() + ); List data = response.getData(); @@ -809,6 +1062,20 @@ public void testWithMoreRollup() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(3, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1") + ), + response.getLogicalDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getLogicalSegmentSchema() + ); List data = response.getData(); @@ -880,6 +1147,22 @@ public void testWithTransformsAutoDimensions() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(4, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2") + ), + response.getLogicalDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getLogicalSegmentSchema() + ); List data = response.getData(); @@ -969,6 +1252,20 @@ public void testWithTransformsDimensionsSpec() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(3, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1PlusBar") + ), + response.getLogicalDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1PlusBar", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getLogicalSegmentSchema() + ); List data = response.getData(); @@ -1037,6 +1334,22 @@ public void testWithFilter() throws IOException Assert.assertEquals(5, response.getNumRowsRead()); Assert.assertEquals(4, response.getNumRowsIndexed()); Assert.assertEquals(3, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2") + ), + response.getLogicalDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getLogicalSegmentSchema() + ); List data = response.getData(); @@ -1110,7 +1423,8 @@ public void testIndexParseException() throws IOException // Map rawColumns4ParseExceptionRow = ImmutableMap.of("t", "2019-04-22T12:00", "dim1", "foo2", - "met1", "invalidNumber"); + "met1", "invalidNumber" + ); final List inputTestRows = Lists.newArrayList(getTestRows()); inputTestRows.add(ParserType.STR_CSV.equals(parserType) ? "2019-04-22T12:00,foo2,,invalidNumber" : @@ -1124,6 +1438,20 @@ public void testIndexParseException() throws IOException Assert.assertEquals(7, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(4, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1PlusBar") + ), + response.getLogicalDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1PlusBar", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getLogicalSegmentSchema() + ); List data = response.getData(); @@ -1167,8 +1495,8 @@ public void testIndexParseException() throws IOException // the last row has parse exception when indexing, check if rawColumns and exception message match the expected // String indexParseExceptioMessage = ParserType.STR_CSV.equals(parserType) - ? "Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row=MapBasedInputRow{timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, dim2=null, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]" - : "Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row=MapBasedInputRow{timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]"; + ? "Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row=MapBasedInputRow{timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, dim2=null, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]" + : "Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row=MapBasedInputRow{timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]"; assertEqualsSamplerResponseRow( new SamplerResponseRow( rawColumns4ParseExceptionRow, @@ -1181,15 +1509,13 @@ public void testIndexParseException() throws IOException } /** - * * This case tests sampling for multiple json lines in one text block * Currently only RecordSupplierInputSource supports this kind of input, see https://github.com/apache/druid/pull/10383 for more information - * + *

* This test combines illegal json block and legal json block together to verify: * 1. all lines in the illegal json block should not be parsed * 2. the illegal json block should not affect the processing of the 2nd record * 3. all lines in legal json block should be parsed successfully - * */ @Test public void testMultipleJsonStringInOneBlock() throws IOException @@ -1245,6 +1571,20 @@ public void testMultipleJsonStringInOneBlock() throws IOException Assert.assertEquals(illegalRows + legalRows, response.getNumRowsRead()); Assert.assertEquals(legalRows, response.getNumRowsIndexed()); Assert.assertEquals(illegalRows + 2, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1PlusBar") + ), + response.getLogicalDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1PlusBar", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getLogicalSegmentSchema() + ); List data = response.getData(); List> rawColumnList = this.getRawColumns(); @@ -1364,6 +1704,33 @@ public void testRowLimiting() throws IOException Assert.assertEquals(4, response.getNumRowsRead()); Assert.assertEquals(4, response.getNumRowsIndexed()); Assert.assertEquals(2, response.getData().size()); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? ImmutableList.of( + new StringDimensionSchema("dim1") + ) + : ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2") + ), + + response.getLogicalDimensions() + ); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build() + : RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getLogicalSegmentSchema() + ); } @@ -1399,6 +1766,32 @@ public void testMaxBytesInMemoryLimiting() throws IOException Assert.assertEquals(4, response.getNumRowsRead()); Assert.assertEquals(4, response.getNumRowsIndexed()); Assert.assertEquals(2, response.getData().size()); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? ImmutableList.of( + new StringDimensionSchema("dim1") + ) + : ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2") + ), + response.getLogicalDimensions() + ); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build() + : RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getLogicalSegmentSchema() + ); } @Test @@ -1433,6 +1826,22 @@ public void testMaxClientResponseBytesLimiting() throws IOException Assert.assertEquals(4, response.getNumRowsRead()); Assert.assertEquals(4, response.getNumRowsIndexed()); Assert.assertEquals(2, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2") + ), + response.getLogicalDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getLogicalSegmentSchema() + ); } private List getTestRows() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java index b95f68f1856b..446704f065a6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java @@ -23,7 +23,11 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.client.indexing.SamplerResponse; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.segment.NestedDataDimensionSchema; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.junit.Assert; import org.junit.Test; @@ -53,8 +57,21 @@ public void testSerde() throws IOException new SamplerResponse.SamplerResponseRow(ImmutableMap.of("row3", "val3"), null, true, "Could not parse") ); - String out = MAPPER.writeValueAsString(new SamplerResponse(1123, 1112, data)); - String expected = "{\"numRowsRead\":1123,\"numRowsIndexed\":1112,\"data\":[{\"input\":{\"row1\":\"val1\"},\"parsed\":{\"t\":123456,\"dim1\":\"foo\",\"met1\":6}},{\"input\":{\"row2\":\"val2\"},\"parsed\":{\"t\":123457,\"dim1\":\"foo2\",\"met1\":7}},{\"input\":{\"row3\":\"val3\"},\"unparseable\":true,\"error\":\"Could not parse\"}]}"; + String out = MAPPER.writeValueAsString( + new SamplerResponse( + 1123, + 1112, + ImmutableList.of( + new StringDimensionSchema("dim1") + ), + ImmutableList.of( + new NestedDataDimensionSchema("dim1") + ), + RowSignature.builder().addTimeColumn().add("dim1", ColumnType.STRING).add("met1", ColumnType.LONG).build(), + data + ) + ); + String expected = "{\"numRowsRead\":1123,\"numRowsIndexed\":1112,\"logicalDimensions\":[{\"type\":\"string\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"physicalDimensions\":[{\"type\":\"json\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"logicalSegmentSchema\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"dim1\",\"type\":\"STRING\"},{\"name\":\"met1\",\"type\":\"LONG\"}],\"data\":[{\"input\":{\"row1\":\"val1\"},\"parsed\":{\"t\":123456,\"dim1\":\"foo\",\"met1\":6}},{\"input\":{\"row2\":\"val2\"},\"parsed\":{\"t\":123457,\"dim1\":\"foo2\",\"met1\":7}},{\"input\":{\"row3\":\"val3\"},\"unparseable\":true,\"error\":\"Could not parse\"}]}"; Assert.assertEquals(expected, out); } diff --git a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json index bc96c730ee77..3fcce0f3ebaa 100644 --- a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json +++ b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json @@ -8,7 +8,7 @@ }, "dimensionsSpec": { "dimensions": [], - "useNestedColumnIndexerForSchemaDiscovery": %%USE_NESTED_COLUMN_INDEXER%% + "useSchemaDiscovery": %%USE_NESTED_COLUMN_INDEXER%% }, "metricsSpec": [], "granularitySpec": { diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java index c57f4728c403..bb07c669ab79 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java @@ -29,7 +29,11 @@ import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.segment.NestedDataDimensionSchema; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.TypeSignature; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; import java.util.Objects; @@ -43,9 +47,27 @@ @JsonSubTypes.Type(name = DimensionSchema.FLOAT_TYPE_NAME, value = FloatDimensionSchema.class), @JsonSubTypes.Type(name = DimensionSchema.DOUBLE_TYPE_NAME, value = DoubleDimensionSchema.class), @JsonSubTypes.Type(name = DimensionSchema.SPATIAL_TYPE_NAME, value = NewSpatialDimensionSchema.class), + @JsonSubTypes.Type(name = NestedDataComplexTypeSerde.TYPE_NAME, value = NestedDataDimensionSchema.class) }) public abstract class DimensionSchema { + public static DimensionSchema getDefaultSchemaForBuiltInType(String name, TypeSignature type) + { + switch (type.getType()) { + case STRING: + return new StringDimensionSchema(name); + case LONG: + return new LongDimensionSchema(name); + case FLOAT: + return new FloatDimensionSchema(name); + case DOUBLE: + return new DoubleDimensionSchema(name); + default: + // the nested column indexer can handle any type + return new NestedDataDimensionSchema(name); + } + } + public static final String STRING_TYPE_NAME = "string"; public static final String LONG_TYPE_NAME = "long"; public static final String FLOAT_TYPE_NAME = "float"; diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java index 6dfaa9ad3d0a..2a73d3cb30b2 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java @@ -49,7 +49,7 @@ public class DimensionsSpec private final Map dimensionSchemaMap; private final boolean includeAllDimensions; - private final boolean useNestedColumnIndexerForSchemaDiscovery; + private final boolean useSchemaDiscovery; public static final DimensionsSpec EMPTY = new DimensionsSpec(null, null, null, false, null); @@ -89,7 +89,7 @@ private DimensionsSpec( @JsonProperty("dimensionExclusions") List dimensionExclusions, @Deprecated @JsonProperty("spatialDimensions") List spatialDimensions, @JsonProperty("includeAllDimensions") boolean includeAllDimensions, - @JsonProperty("useNestedColumnIndexerForSchemaDiscovery") Boolean useNestedColumnIndexerForSchemaDiscovery + @JsonProperty("useSchemaDiscovery") Boolean useSchemaDiscovery ) { this.dimensions = dimensions == null @@ -118,8 +118,8 @@ private DimensionsSpec( dimensionSchemaMap.put(newSchema.getName(), newSchema); } this.includeAllDimensions = includeAllDimensions; - this.useNestedColumnIndexerForSchemaDiscovery = - useNestedColumnIndexerForSchemaDiscovery != null && useNestedColumnIndexerForSchemaDiscovery; + this.useSchemaDiscovery = + useSchemaDiscovery != null && useSchemaDiscovery; } @JsonProperty @@ -141,9 +141,9 @@ public boolean isIncludeAllDimensions() } @JsonProperty - public boolean useNestedColumnIndexerForSchemaDiscovery() + public boolean useSchemaDiscovery() { - return useNestedColumnIndexerForSchemaDiscovery; + return useSchemaDiscovery; } @Deprecated @@ -204,7 +204,7 @@ public DimensionsSpec withDimensions(List dims) ImmutableList.copyOf(dimensionExclusions), null, includeAllDimensions, - useNestedColumnIndexerForSchemaDiscovery + useSchemaDiscovery ); } @@ -215,7 +215,7 @@ public DimensionsSpec withDimensionExclusions(Set dimExs) ImmutableList.copyOf(Sets.union(dimensionExclusions, dimExs)), null, includeAllDimensions, - useNestedColumnIndexerForSchemaDiscovery + useSchemaDiscovery ); } @@ -227,7 +227,7 @@ public DimensionsSpec withSpatialDimensions(List spatial ImmutableList.copyOf(dimensionExclusions), spatials, includeAllDimensions, - useNestedColumnIndexerForSchemaDiscovery + useSchemaDiscovery ); } @@ -267,7 +267,7 @@ public boolean equals(Object o) } DimensionsSpec that = (DimensionsSpec) o; return includeAllDimensions == that.includeAllDimensions - && useNestedColumnIndexerForSchemaDiscovery == that.useNestedColumnIndexerForSchemaDiscovery + && useSchemaDiscovery == that.useSchemaDiscovery && Objects.equals(dimensions, that.dimensions) && Objects.equals(dimensionExclusions, that.dimensionExclusions); } @@ -279,7 +279,7 @@ public int hashCode() dimensions, dimensionExclusions, includeAllDimensions, - useNestedColumnIndexerForSchemaDiscovery + useSchemaDiscovery ); } @@ -290,7 +290,7 @@ public String toString() "dimensions=" + dimensions + ", dimensionExclusions=" + dimensionExclusions + ", includeAllDimensions=" + includeAllDimensions + - ", useNestedColumnIndexerForSchemaDiscovery=" + useNestedColumnIndexerForSchemaDiscovery + + ", useSchemaDiscovery=" + useSchemaDiscovery + '}'; } @@ -301,7 +301,7 @@ public static final class Builder private List spatialDimensions; private boolean includeAllDimensions; - private boolean useNestedColumnIndexerForSchemaDiscovery; + private boolean useSchemaDiscovery; public Builder setDimensions(List dimensions) { @@ -334,9 +334,9 @@ public Builder setIncludeAllDimensions(boolean includeAllDimensions) return this; } - public Builder setUseNestedColumnIndexerForSchemaDiscovery(boolean useNestedColumnIndexerForSchemaDiscovery) + public Builder useSchemaDiscovery(boolean useSchemaDiscovery) { - this.useNestedColumnIndexerForSchemaDiscovery = useNestedColumnIndexerForSchemaDiscovery; + this.useSchemaDiscovery = useSchemaDiscovery; return this; } @@ -347,7 +347,7 @@ public DimensionsSpec build() dimensionExclusions, spatialDimensions, includeAllDimensions, - useNestedColumnIndexerForSchemaDiscovery + useSchemaDiscovery ); } } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java index aa5ecbb08674..02d16ffc9f5a 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java @@ -63,7 +63,7 @@ public class JsonLineReader extends TextReader flattenSpec, new JSONFlattenerMaker( keepNullColumns, - inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + inputRowSchema.getDimensionsSpec().useSchemaDiscovery() ) ); this.mapper = mapper; diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java index eaf42244cb32..a6ebb0a91136 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java @@ -84,7 +84,7 @@ public class JsonNodeReader extends IntermediateRowParsingReader flattenSpec, new JSONFlattenerMaker( keepNullColumns, - inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + inputRowSchema.getDimensionsSpec().useSchemaDiscovery() ) ); this.mapper = mapper; diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java index 1b1402f22a12..69f518aecad6 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java @@ -81,7 +81,7 @@ public class JsonReader extends IntermediateRowParsingReader flattenSpec, new JSONFlattenerMaker( keepNullColumns, - inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + inputRowSchema.getDimensionsSpec().useSchemaDiscovery() ) ); this.mapper = mapper; diff --git a/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java b/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java index 47fe401f65c2..17833d23aea3 100644 --- a/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java +++ b/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java @@ -27,7 +27,6 @@ import org.apache.druid.initialization.DruidModule; import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.NestedDataDimensionHandler; -import org.apache.druid.segment.NestedDataDimensionSchema; import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; import org.apache.druid.segment.nested.StructuredData; import org.apache.druid.segment.nested.StructuredDataJsonSerializer; @@ -70,7 +69,6 @@ public static List getJacksonModulesList() return Collections.singletonList( new SimpleModule("NestedDataModule") .registerSubtypes( - new NamedType(NestedDataDimensionSchema.class, NestedDataComplexTypeSerde.TYPE_NAME), new NamedType(NestedFieldVirtualColumn.class, "nested-field") ) .addSerializer(StructuredData.class, new StructuredDataJsonSerializer()) diff --git a/processing/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java b/processing/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java index b2040d73db21..2c76c14c7b79 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java @@ -219,7 +219,7 @@ public interface FlattenerMaker JsonProvider getJsonProvider(); /** * List all "root" fields. If - * {@link org.apache.druid.data.input.impl.DimensionsSpec#useNestedColumnIndexerForSchemaDiscovery} is false, this + * {@link org.apache.druid.data.input.impl.DimensionsSpec#useSchemaDiscovery} is false, this * method should filter fields to include only fields that contain primitive and lists of primitive values */ Iterable discoverRootFields(T obj); diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index ddb52c309cb8..6148bf2e0883 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -242,7 +242,7 @@ public ColumnCapabilities getColumnCapabilities(String columnName) private final AtomicLong bytesInMemory = new AtomicLong(); private final boolean useMaxMemoryEstimates; - private final boolean useNestedColumnIndexerForSchemaDiscovery; + private final boolean useSchemaDiscovery; // This is modified on add() in a critical section. private final ThreadLocal in = new ThreadLocal<>(); @@ -288,8 +288,8 @@ protected IncrementalIndex( this.deserializeComplexMetrics = deserializeComplexMetrics; this.preserveExistingMetrics = preserveExistingMetrics; this.useMaxMemoryEstimates = useMaxMemoryEstimates; - this.useNestedColumnIndexerForSchemaDiscovery = incrementalIndexSchema.getDimensionsSpec() - .useNestedColumnIndexerForSchemaDiscovery(); + this.useSchemaDiscovery = incrementalIndexSchema.getDimensionsSpec() + .useSchemaDiscovery(); this.timeAndMetricsColumnCapabilities = new HashMap<>(); this.metricDescs = Maps.newLinkedHashMap(); @@ -591,7 +591,7 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row) } else { wasNewDim = true; final DimensionHandler handler; - if (useNestedColumnIndexerForSchemaDiscovery) { + if (useSchemaDiscovery) { handler = DimensionHandlerUtils.getHandlerFromCapabilities( dimension, makeDefaultCapabilitiesFromValueType(NestedDataComplexTypeSerde.TYPE), diff --git a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java index e77cffbc30e3..d489bb5c1cdb 100644 --- a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java @@ -546,7 +546,7 @@ private static IncrementalIndex makeIncrementalIndex(long minTimestamp) new TimestampSpec(TIME_COL, "millis", null), Granularities.NONE, VirtualColumns.EMPTY, - DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(), + DimensionsSpec.builder().useSchemaDiscovery(true).build(), new AggregatorFactory[0], false ) diff --git a/processing/src/test/resources/types-test-data-parser.json b/processing/src/test/resources/types-test-data-parser.json index c148d6ef2d92..5aee72a48b21 100644 --- a/processing/src/test/resources/types-test-data-parser.json +++ b/processing/src/test/resources/types-test-data-parser.json @@ -10,7 +10,7 @@ "dimensions": [], "dimensionExclusions": [], "spatialDimensions": [], - "useNestedColumnIndexerForSchemaDiscovery": true + "useSchemaDiscovery": true } } } diff --git a/server/src/main/java/org/apache/druid/client/indexing/SamplerResponse.java b/server/src/main/java/org/apache/druid/client/indexing/SamplerResponse.java index a7b94043dd28..dc12fe5e222b 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/SamplerResponse.java +++ b/server/src/main/java/org/apache/druid/client/indexing/SamplerResponse.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.segment.column.RowSignature; import java.util.List; import java.util.Map; @@ -32,17 +34,27 @@ public class SamplerResponse { private final int numRowsRead; private final int numRowsIndexed; + + private final List logicalDimensions; + private final List physicalDimensions; + private final RowSignature logicalSegmentSchema; private final List data; @JsonCreator public SamplerResponse( @JsonProperty("numRowsRead") int numRowsRead, @JsonProperty("numRowsIndexed") int numRowsIndexed, + @JsonProperty("logicalDimensions") List logicalDimensions, + @JsonProperty("physicalDimensions") List physicalDimensions, + @JsonProperty("logicalSegmentSchema") RowSignature logicalSegmentSchema, @JsonProperty("data") List data ) { this.numRowsRead = numRowsRead; this.numRowsIndexed = numRowsIndexed; + this.logicalDimensions = logicalDimensions; + this.physicalDimensions = physicalDimensions; + this.logicalSegmentSchema = logicalSegmentSchema; this.data = data; } @@ -58,6 +70,24 @@ public int getNumRowsIndexed() return numRowsIndexed; } + @JsonProperty + public List getLogicalDimensions() + { + return logicalDimensions; + } + + @JsonProperty + public List getPhysicalDimensions() + { + return physicalDimensions; + } + + @JsonProperty + public RowSignature getLogicalSegmentSchema() + { + return logicalSegmentSchema; + } + @JsonProperty public List getData() { @@ -76,13 +106,36 @@ public boolean equals(Object o) SamplerResponse that = (SamplerResponse) o; return getNumRowsRead() == that.getNumRowsRead() && getNumRowsIndexed() == that.getNumRowsIndexed() && - Objects.equals(getData(), that.getData()); + Objects.equals(logicalDimensions, that.logicalDimensions) && + Objects.equals(physicalDimensions, that.physicalDimensions) && + Objects.equals(logicalSegmentSchema, that.logicalSegmentSchema) && + Objects.equals(data, that.data); } @Override public int hashCode() { - return Objects.hash(getNumRowsRead(), getNumRowsIndexed(), getData()); + return Objects.hash( + getNumRowsRead(), + getNumRowsIndexed(), + logicalDimensions, + physicalDimensions, + logicalSegmentSchema, + data + ); + } + + @Override + public String toString() + { + return "SamplerResponse{" + + "numRowsRead=" + numRowsRead + + ", numRowsIndexed=" + numRowsIndexed + + ", logicalDimensions=" + logicalDimensions + + ", physicalDimensions=" + physicalDimensions + + ", logicalSegmentSchema=" + logicalSegmentSchema + + ", data=" + data + + '}'; } @JsonInclude(JsonInclude.Include.NON_NULL) diff --git a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java index 19fa4df0b5e0..f669b4a8d872 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; @@ -29,6 +30,9 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.segment.NestedDataDimensionSchema; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.easymock.Capture; @@ -78,6 +82,15 @@ public void testSample() throws Exception final SamplerResponse samplerResponse = new SamplerResponse( 2, 2, + ImmutableList.of( + new StringDimensionSchema("x"), + new StringDimensionSchema("y") + ), + ImmutableList.of( + new NestedDataDimensionSchema("x"), + new NestedDataDimensionSchema("y") + ), + RowSignature.builder().addTimeColumn().add("x", ColumnType.STRING).add("y", ColumnType.STRING).build(), ImmutableList.of( new SamplerResponse.SamplerResponseRow( ImmutableMap.of("time", "2020-01-01", "x", "123", "y", "456"), @@ -132,6 +145,15 @@ public void testSampleError() throws Exception final SamplerResponse samplerResponse = new SamplerResponse( 2, 2, + ImmutableList.of( + new StringDimensionSchema("x"), + new StringDimensionSchema("y") + ), + ImmutableList.of( + new NestedDataDimensionSchema("x"), + new NestedDataDimensionSchema("y") + ), + RowSignature.builder().addTimeColumn().add("x", ColumnType.STRING).add("y", ColumnType.STRING).build(), ImmutableList.of( new SamplerResponse.SamplerResponseRow( ImmutableMap.of("time", "2020-01-01", "x", "123", "y", "456"),