From 3d327bbda175311f2d0ccc36a502e2d0ad330300 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 25 Jan 2023 20:06:07 -0800 Subject: [PATCH 1/5] sampler + type detection = bff --- .../overlord/sampler/InputSourceSampler.java | 59 +++ .../sampler/CsvInputSourceSamplerTest.java | 25 +- .../sampler/IndexTaskSamplerSpecTest.java | 4 +- .../sampler/InputSourceSamplerTest.java | 422 +++++++++++++++++- .../overlord/sampler/SamplerResponseTest.java | 17 +- .../client/indexing/SamplerResponse.java | 26 +- .../HttpIndexingServiceClientTest.java | 13 + 7 files changed, 554 insertions(+), 12 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java index c91d8434c21b..bc4b96f81b11 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java @@ -31,7 +31,12 @@ import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.Row; +import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.DoubleDimensionSchema; +import org.apache.druid.data.input.impl.FloatDimensionSchema; +import org.apache.druid.data.input.impl.LongDimensionSchema; +import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimedShutoffInputSourceReader; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.guice.annotations.Json; @@ -43,7 +48,12 @@ import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.LongMinAggregatorFactory; +import org.apache.druid.segment.NestedDataDimensionSchema; import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.TypeSignature; +import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexAddResult; import org.apache.druid.segment.incremental.IncrementalIndexSchema; @@ -232,9 +242,39 @@ public SamplerResponse sample( int numRowsRead = responseRows.size(); + List dimensionSchemas = new ArrayList<>(); + + RowSignature.Builder signatureBuilder = RowSignature.builder(); + signatureBuilder.add( + ColumnHolder.TIME_COLUMN_NAME, + index.getColumnCapabilities(ColumnHolder.TIME_COLUMN_NAME).toColumnType() + ); + for (IncrementalIndex.DimensionDesc dimensionDesc : index.getDimensions()) { + if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionDesc.getName())) { + final ColumnType columnType = dimensionDesc.getCapabilities().toColumnType(); + signatureBuilder.add(dimensionDesc.getName(), columnType); + dimensionSchemas.add( + getDefaultSchemaForBuiltInType( + dimensionDesc.getName(), + dimensionDesc.getCapabilities() + ) + ); + } + } + for (AggregatorFactory aggregatorFactory : index.getMetricAggs()) { + if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(aggregatorFactory.getName())) { + signatureBuilder.add( + aggregatorFactory.getName(), + index.getColumnCapabilities(aggregatorFactory.getName()).toColumnType() + ); + } + } + return new SamplerResponse( numRowsRead, numRowsIndexed, + dimensionSchemas, + signatureBuilder.build(), responseRows.stream() .filter(Objects::nonNull) .filter(x -> x.getParsed() != null || x.isUnparseable() != null) @@ -280,4 +320,23 @@ private IncrementalIndex buildIncrementalIndex(SamplerConfig samplerConfig, Data .setMaxRowCount(samplerConfig.getNumRows()) .build(); } + + private static DimensionSchema getDefaultSchemaForBuiltInType(String name, TypeSignature type) + { + // this is a temporary home for this method, a future refactor should place this in a more centralized place + // for getting the default dimension schema for a given column type, or more approriately, get the default column + // "format" for a given type and with that "format" create a default dimension schema + switch (type.getType()) { + case STRING: + return new StringDimensionSchema(name); + case LONG: + return new LongDimensionSchema(name); + case FLOAT: + return new FloatDimensionSchema(name); + case DOUBLE: + return new DoubleDimensionSchema(name); + default: + return new NestedDataDimensionSchema(name); + } + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java index 464579764d11..e80e38f7d48e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java @@ -27,16 +27,20 @@ import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.InlineInputSource; +import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.testing.InitializedNullHandlingTest; import org.junit.Assert; import org.junit.Test; import java.util.List; -public class CsvInputSourceSamplerTest +public class CsvInputSourceSamplerTest extends InitializedNullHandlingTest { @Test public void testCSVColumnAllNull() @@ -73,6 +77,25 @@ public void testCSVColumnAllNull() Assert.assertEquals(4, response.getNumRowsRead()); Assert.assertEquals(4, response.getNumRowsIndexed()); Assert.assertEquals(4, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("FirstName"), + new StringDimensionSchema("LastName"), + new StringDimensionSchema("Number"), + new StringDimensionSchema("Gender") + ), + response.getDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("FirstName", ColumnType.STRING) + .add("LastName", ColumnType.STRING) + .add("Number", ColumnType.STRING) + .add("Gender", ColumnType.STRING) + .build(), + response.getSegmentSchema() + ); List data = response.getData(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpecTest.java index b305cd989ede..baf5e4a663d8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpecTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import org.apache.druid.client.indexing.SamplerResponse; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputSource; @@ -29,6 +30,7 @@ import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.guice.FirehoseModule; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.indexing.DataSchema; import org.easymock.Capture; import org.easymock.EasyMock; @@ -104,7 +106,7 @@ public void testSerde() throws IOException EasyMock.capture(capturedInputFormat), EasyMock.capture(capturedDataSchema), EasyMock.capture(capturedSamplerConfig) - )).andReturn(new SamplerResponse(0, 0, null)); + )).andReturn(new SamplerResponse(0, 0, ImmutableList.of(), RowSignature.empty(), null)); replayAll(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java index cbd58d1adfbb..fc5a830c5945 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java @@ -39,9 +39,11 @@ import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.JSONParseSpec; import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.LongDimensionSchema; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.guice.NestedDataModule; import org.apache.druid.indexing.seekablestream.RecordSupplierInputSource; import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; @@ -58,9 +60,12 @@ import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.nested.StructuredData; import org.apache.druid.segment.realtime.firehose.InlineFirehoseFactory; import org.apache.druid.segment.transform.ExpressionTransform; import org.apache.druid.segment.transform.TransformSpec; @@ -127,7 +132,9 @@ private enum ParserType @Parameterized.Parameters(name = "parserType = {0}, useInputFormatApi={1}") public static Iterable constructorFeeder() { + NestedDataModule.registerHandlersAndSerde(); OBJECT_MAPPER.registerModules(new SamplerModule().getJacksonModules()); + OBJECT_MAPPER.registerModules(NestedDataModule.getJacksonModulesList()); return ImmutableList.of( new Object[]{ParserType.STR_JSON, false}, new Object[]{ParserType.STR_JSON, true}, @@ -181,6 +188,8 @@ public void testNoDataSchema() Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(0, response.getNumRowsIndexed()); Assert.assertEquals(6, response.getData().size()); + Assert.assertEquals(ImmutableList.of(), response.getDimensions()); + Assert.assertEquals(RowSignature.builder().addTimeColumn().build(), response.getSegmentSchema()); List data = response.getData(); @@ -254,6 +263,8 @@ public void testNoDataSchemaNumRows() Assert.assertEquals(3, response.getNumRowsRead()); Assert.assertEquals(0, response.getNumRowsIndexed()); Assert.assertEquals(3, response.getData().size()); + Assert.assertEquals(ImmutableList.of(), response.getDimensions()); + Assert.assertEquals(RowSignature.builder().addTimeColumn().build(), response.getSegmentSchema()); List data = response.getData(); @@ -300,6 +311,41 @@ public void testMissingValueTimestampSpec() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(6, response.getNumRowsIndexed()); Assert.assertEquals(6, response.getData().size()); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? ImmutableList.of( + new StringDimensionSchema("t"), + new StringDimensionSchema("dim1"), + new StringDimensionSchema("met1"), + new StringDimensionSchema("dim2") + ) + : ImmutableList.of( + new StringDimensionSchema("t"), + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2"), + new StringDimensionSchema("met1") + ), + response.getDimensions() + ); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? RowSignature.builder() + .addTimeColumn() + .add("t", ColumnType.STRING) + .add("dim1", ColumnType.STRING) + .add("met1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .build() + : RowSignature.builder() + .addTimeColumn() + .add("t", ColumnType.STRING) + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.STRING) + .build() + , + response.getSegmentSchema() + ); List data = response.getData(); @@ -409,6 +455,36 @@ public void testWithTimestampSpec() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(6, response.getData().size()); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("met1"), + new StringDimensionSchema("dim2") + ) + : ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2"), + new StringDimensionSchema("met1") + ), + response.getDimensions() + ); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("met1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .build() + : RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.STRING) + .build(), + response.getSegmentSchema() + ); List data = response.getData(); @@ -493,6 +569,137 @@ public void testWithTimestampSpec() throws IOException ); } + @Test + public void testWithTimestampSpecNestedDiscovery() throws IOException + { + + final TimestampSpec timestampSpec = new TimestampSpec("t", null, null); + final DimensionsSpec dimensionsSpec = DimensionsSpec.builder() + .setUseNestedColumnIndexerForSchemaDiscovery(true) + .build(); + final DataSchema dataSchema = createDataSchema(timestampSpec, dimensionsSpec, null, null, null); + final InputSource inputSource = createInputSource(getTestRows(), dataSchema); + final InputFormat inputFormat = createInputFormat(); + + SamplerResponse response = inputSourceSampler.sample(inputSource, inputFormat, dataSchema, null); + + Assert.assertEquals(6, response.getNumRowsRead()); + Assert.assertEquals(5, response.getNumRowsIndexed()); + Assert.assertEquals(6, response.getData().size()); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? ImmutableList.of( + new StringDimensionSchema("dim1"), + new LongDimensionSchema("met1"), + new StringDimensionSchema("dim2") + ) + : ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2"), + new StringDimensionSchema("met1") + ), + response.getDimensions() + ); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .add("dim2", ColumnType.STRING) + .build() + : RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.STRING) + .build(), + response.getSegmentSchema() + ); + + List data = response.getData(); + + assertEqualsSamplerResponseRow( + new SamplerResponseRow( + getRawColumns().get(0), + new SamplerTestUtils.MapAllowingNullValuesBuilder() + .put("__time", 1555934400000L) + .put("dim2", StructuredData.wrap(null)) + .put("dim1", StructuredData.wrap("foo")) + .put("met1", StructuredData.wrap(parserType == ParserType.STR_JSON ? 1L : "1")) + .build(), + null, + null + ), + data.get(0) + ); + assertEqualsSamplerResponseRow( + new SamplerResponseRow( + getRawColumns().get(1), + new SamplerTestUtils.MapAllowingNullValuesBuilder() + .put("__time", 1555934400000L) + .put("dim2", StructuredData.wrap(null)) + .put("dim1", StructuredData.wrap("foo")) + .put("met1", StructuredData.wrap(parserType == ParserType.STR_JSON ? 2L : "2")) + .build(), + null, + null + ), + data.get(1) + ); + assertEqualsSamplerResponseRow( + new SamplerResponseRow( + getRawColumns().get(2), + new SamplerTestUtils.MapAllowingNullValuesBuilder() + .put("__time", 1555934460000L) + .put("dim2", StructuredData.wrap(null)) + .put("dim1", StructuredData.wrap("foo")) + .put("met1", StructuredData.wrap(parserType == ParserType.STR_JSON ? 3L : "3")) + .build(), + null, + null + ), + data.get(2) + ); + assertEqualsSamplerResponseRow( + new SamplerResponseRow( + getRawColumns().get(3), + new SamplerTestUtils.MapAllowingNullValuesBuilder() + .put("__time", 1555934400000L) + .put("dim2", StructuredData.wrap(null)) + .put("dim1", StructuredData.wrap("foo2")) + .put("met1", StructuredData.wrap(parserType == ParserType.STR_JSON ? 4L : "4")) + .build(), + null, + null + ), + data.get(3) + ); + assertEqualsSamplerResponseRow( + new SamplerResponseRow( + getRawColumns().get(4), + new SamplerTestUtils.MapAllowingNullValuesBuilder() + .put("__time", 1555934400000L) + .put("dim2", StructuredData.wrap("bar")) + .put("dim1", StructuredData.wrap("foo")) + .put("met1", StructuredData.wrap(parserType == ParserType.STR_JSON ? 5L : "5")) + .build(), + null, + null + ), + data.get(4) + ); + assertEqualsSamplerResponseRow( + new SamplerResponseRow( + getRawColumns().get(5), + null, + true, + getUnparseableTimestampString() + ), + data.get(5) + ); + } + @Test public void testWithDimensionSpec() throws IOException { @@ -509,6 +716,21 @@ public void testWithDimensionSpec() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(6, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("met1") + ), + response.getDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("met1", ColumnType.STRING) + .build(), + response.getSegmentSchema() + ); List data = response.getData(); @@ -615,6 +837,22 @@ public void testWithNoRollup() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(6, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2") + ), + response.getDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getSegmentSchema() + ); List data = response.getData(); @@ -726,6 +964,22 @@ public void testWithRollup() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(4, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2") + ), + response.getDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getSegmentSchema() + ); List data = response.getData(); @@ -809,6 +1063,20 @@ public void testWithMoreRollup() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(3, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1") + ), + response.getDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getSegmentSchema() + ); List data = response.getData(); @@ -880,6 +1148,22 @@ public void testWithTransformsAutoDimensions() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(4, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2") + ), + response.getDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getSegmentSchema() + ); List data = response.getData(); @@ -969,6 +1253,20 @@ public void testWithTransformsDimensionsSpec() throws IOException Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(3, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1PlusBar") + ), + response.getDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1PlusBar", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getSegmentSchema() + ); List data = response.getData(); @@ -1037,6 +1335,22 @@ public void testWithFilter() throws IOException Assert.assertEquals(5, response.getNumRowsRead()); Assert.assertEquals(4, response.getNumRowsIndexed()); Assert.assertEquals(3, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2") + ), + response.getDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getSegmentSchema() + ); List data = response.getData(); @@ -1110,7 +1424,8 @@ public void testIndexParseException() throws IOException // Map rawColumns4ParseExceptionRow = ImmutableMap.of("t", "2019-04-22T12:00", "dim1", "foo2", - "met1", "invalidNumber"); + "met1", "invalidNumber" + ); final List inputTestRows = Lists.newArrayList(getTestRows()); inputTestRows.add(ParserType.STR_CSV.equals(parserType) ? "2019-04-22T12:00,foo2,,invalidNumber" : @@ -1124,6 +1439,20 @@ public void testIndexParseException() throws IOException Assert.assertEquals(7, response.getNumRowsRead()); Assert.assertEquals(5, response.getNumRowsIndexed()); Assert.assertEquals(4, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1PlusBar") + ), + response.getDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1PlusBar", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getSegmentSchema() + ); List data = response.getData(); @@ -1167,8 +1496,8 @@ public void testIndexParseException() throws IOException // the last row has parse exception when indexing, check if rawColumns and exception message match the expected // String indexParseExceptioMessage = ParserType.STR_CSV.equals(parserType) - ? "Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row=MapBasedInputRow{timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, dim2=null, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]" - : "Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row=MapBasedInputRow{timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]"; + ? "Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row=MapBasedInputRow{timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, dim2=null, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]" + : "Found unparseable columns in row: [SamplerInputRow{row=TransformedInputRow{row=MapBasedInputRow{timestamp=2019-04-22T12:00:00.000Z, event={t=2019-04-22T12:00, dim1=foo2, met1=invalidNumber}, dimensions=[dim1PlusBar]}}}], exceptions: [Unable to parse value[invalidNumber] for field[met1]]"; assertEqualsSamplerResponseRow( new SamplerResponseRow( rawColumns4ParseExceptionRow, @@ -1181,15 +1510,13 @@ public void testIndexParseException() throws IOException } /** - * * This case tests sampling for multiple json lines in one text block * Currently only RecordSupplierInputSource supports this kind of input, see https://github.com/apache/druid/pull/10383 for more information - * + *

* This test combines illegal json block and legal json block together to verify: * 1. all lines in the illegal json block should not be parsed * 2. the illegal json block should not affect the processing of the 2nd record * 3. all lines in legal json block should be parsed successfully - * */ @Test public void testMultipleJsonStringInOneBlock() throws IOException @@ -1245,6 +1572,20 @@ public void testMultipleJsonStringInOneBlock() throws IOException Assert.assertEquals(illegalRows + legalRows, response.getNumRowsRead()); Assert.assertEquals(legalRows, response.getNumRowsIndexed()); Assert.assertEquals(illegalRows + 2, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1PlusBar") + ), + response.getDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1PlusBar", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getSegmentSchema() + ); List data = response.getData(); List> rawColumnList = this.getRawColumns(); @@ -1364,6 +1705,33 @@ public void testRowLimiting() throws IOException Assert.assertEquals(4, response.getNumRowsRead()); Assert.assertEquals(4, response.getNumRowsIndexed()); Assert.assertEquals(2, response.getData().size()); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? ImmutableList.of( + new StringDimensionSchema("dim1") + ) + : ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2") + ), + + response.getDimensions() + ); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build() + : RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getSegmentSchema() + ); } @@ -1399,6 +1767,32 @@ public void testMaxBytesInMemoryLimiting() throws IOException Assert.assertEquals(4, response.getNumRowsRead()); Assert.assertEquals(4, response.getNumRowsIndexed()); Assert.assertEquals(2, response.getData().size()); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? ImmutableList.of( + new StringDimensionSchema("dim1") + ) + : ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2") + ), + response.getDimensions() + ); + Assert.assertEquals( + parserType == ParserType.STR_JSON + ? RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build() + : RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getSegmentSchema() + ); } @Test @@ -1433,6 +1827,22 @@ public void testMaxClientResponseBytesLimiting() throws IOException Assert.assertEquals(4, response.getNumRowsRead()); Assert.assertEquals(4, response.getNumRowsIndexed()); Assert.assertEquals(2, response.getData().size()); + Assert.assertEquals( + ImmutableList.of( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim2") + ), + response.getDimensions() + ); + Assert.assertEquals( + RowSignature.builder() + .addTimeColumn() + .add("dim1", ColumnType.STRING) + .add("dim2", ColumnType.STRING) + .add("met1", ColumnType.LONG) + .build(), + response.getSegmentSchema() + ); } private List getTestRows() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java index b95f68f1856b..43cf46fa90ce 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java @@ -23,7 +23,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.client.indexing.SamplerResponse; +import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.junit.Assert; import org.junit.Test; @@ -53,8 +56,18 @@ public void testSerde() throws IOException new SamplerResponse.SamplerResponseRow(ImmutableMap.of("row3", "val3"), null, true, "Could not parse") ); - String out = MAPPER.writeValueAsString(new SamplerResponse(1123, 1112, data)); - String expected = "{\"numRowsRead\":1123,\"numRowsIndexed\":1112,\"data\":[{\"input\":{\"row1\":\"val1\"},\"parsed\":{\"t\":123456,\"dim1\":\"foo\",\"met1\":6}},{\"input\":{\"row2\":\"val2\"},\"parsed\":{\"t\":123457,\"dim1\":\"foo2\",\"met1\":7}},{\"input\":{\"row3\":\"val3\"},\"unparseable\":true,\"error\":\"Could not parse\"}]}"; + String out = MAPPER.writeValueAsString( + new SamplerResponse( + 1123, + 1112, + ImmutableList.of( + new StringDimensionSchema("dim1") + ), + RowSignature.builder().addTimeColumn().add("dim1", ColumnType.STRING).add("met1", ColumnType.LONG).build(), + data + ) + ); + String expected = "{\"numRowsRead\":1123,\"numRowsIndexed\":1112,\"dimensions\":[{\"type\":\"string\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"segmentSchema\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"dim1\",\"type\":\"STRING\"},{\"name\":\"met1\",\"type\":\"LONG\"}],\"data\":[{\"input\":{\"row1\":\"val1\"},\"parsed\":{\"t\":123456,\"dim1\":\"foo\",\"met1\":6}},{\"input\":{\"row2\":\"val2\"},\"parsed\":{\"t\":123457,\"dim1\":\"foo2\",\"met1\":7}},{\"input\":{\"row3\":\"val3\"},\"unparseable\":true,\"error\":\"Could not parse\"}]}"; Assert.assertEquals(expected, out); } diff --git a/server/src/main/java/org/apache/druid/client/indexing/SamplerResponse.java b/server/src/main/java/org/apache/druid/client/indexing/SamplerResponse.java index a7b94043dd28..cc6f5c56e309 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/SamplerResponse.java +++ b/server/src/main/java/org/apache/druid/client/indexing/SamplerResponse.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.segment.column.RowSignature; import java.util.List; import java.util.Map; @@ -32,17 +34,24 @@ public class SamplerResponse { private final int numRowsRead; private final int numRowsIndexed; + + private final List dimensions; + private final RowSignature segmentSchema; private final List data; @JsonCreator public SamplerResponse( @JsonProperty("numRowsRead") int numRowsRead, @JsonProperty("numRowsIndexed") int numRowsIndexed, + @JsonProperty("dimensions") List dimensions, + @JsonProperty("segmentSchema") RowSignature segmentSchema, @JsonProperty("data") List data ) { this.numRowsRead = numRowsRead; this.numRowsIndexed = numRowsIndexed; + this.dimensions = dimensions; + this.segmentSchema = segmentSchema; this.data = data; } @@ -58,6 +67,18 @@ public int getNumRowsIndexed() return numRowsIndexed; } + @JsonProperty + public List getDimensions() + { + return dimensions; + } + + @JsonProperty + public RowSignature getSegmentSchema() + { + return segmentSchema; + } + @JsonProperty public List getData() { @@ -76,13 +97,14 @@ public boolean equals(Object o) SamplerResponse that = (SamplerResponse) o; return getNumRowsRead() == that.getNumRowsRead() && getNumRowsIndexed() == that.getNumRowsIndexed() && - Objects.equals(getData(), that.getData()); + Objects.equals(segmentSchema, that.segmentSchema) && + Objects.equals(data, that.data); } @Override public int hashCode() { - return Objects.hash(getNumRowsRead(), getNumRowsIndexed(), getData()); + return Objects.hash(getNumRowsRead(), getNumRowsIndexed(), segmentSchema, data); } @JsonInclude(JsonInclude.Include.NON_NULL) diff --git a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java index 19fa4df0b5e0..0b502f9483a1 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; @@ -29,6 +30,8 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NoneShardSpec; import org.easymock.Capture; @@ -78,6 +81,11 @@ public void testSample() throws Exception final SamplerResponse samplerResponse = new SamplerResponse( 2, 2, + ImmutableList.of( + new StringDimensionSchema("x"), + new StringDimensionSchema("y") + ), + RowSignature.builder().addTimeColumn().add("x", ColumnType.STRING).add("y", ColumnType.STRING).build(), ImmutableList.of( new SamplerResponse.SamplerResponseRow( ImmutableMap.of("time", "2020-01-01", "x", "123", "y", "456"), @@ -132,6 +140,11 @@ public void testSampleError() throws Exception final SamplerResponse samplerResponse = new SamplerResponse( 2, 2, + ImmutableList.of( + new StringDimensionSchema("x"), + new StringDimensionSchema("y") + ), + RowSignature.builder().addTimeColumn().add("x", ColumnType.STRING).add("y", ColumnType.STRING).build(), ImmutableList.of( new SamplerResponse.SamplerResponseRow( ImmutableMap.of("time", "2020-01-01", "x", "123", "y", "456"), From 2eaf2c31c6e8318957cddeb8246267f3d4a81141 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 25 Jan 2023 22:50:20 -0800 Subject: [PATCH 2/5] fix style --- .../indexing/overlord/sampler/InputSourceSamplerTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java index fc5a830c5945..dca8c4fcea26 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java @@ -342,8 +342,7 @@ public void testMissingValueTimestampSpec() throws IOException .add("dim1", ColumnType.STRING) .add("dim2", ColumnType.STRING) .add("met1", ColumnType.STRING) - .build() - , + .build(), response.getSegmentSchema() ); From ce489bf446276a511b2dd20a013ba0278dd691e4 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Sat, 18 Feb 2023 19:46:27 -0800 Subject: [PATCH 3/5] split logical and physical dimensions, tidy up --- .../druid/data/input/avro/AvroOCFReader.java | 2 +- .../druid/data/input/avro/AvroParsers.java | 2 +- .../data/input/avro/AvroStreamReader.java | 2 +- .../input/orc/OrcHadoopInputRowParser.java | 2 +- .../druid/data/input/orc/OrcReader.java | 2 +- .../data/input/parquet/ParquetReader.java | 2 +- .../avro/ParquetAvroHadoopInputRowParser.java | 2 +- .../simple/ParquetHadoopInputRowParser.java | 2 +- .../data/input/protobuf/ProtobufReader.java | 2 +- .../overlord/sampler/InputSourceSampler.java | 42 +++--------- .../sampler/CsvInputSourceSamplerTest.java | 4 +- .../sampler/IndexTaskSamplerSpecTest.java | 2 +- .../sampler/InputSourceSamplerTest.java | 68 +++++++++---------- .../overlord/sampler/SamplerResponseTest.java | 6 +- .../data/input/impl/DimensionSchema.java | 22 ++++++ .../druid/data/input/impl/DimensionsSpec.java | 24 +++---- .../druid/data/input/impl/JsonLineReader.java | 2 +- .../druid/data/input/impl/JsonNodeReader.java | 2 +- .../druid/data/input/impl/JsonReader.java | 2 +- .../apache/druid/guice/NestedDataModule.java | 2 - .../util/common/parsers/ObjectFlatteners.java | 2 +- .../segment/incremental/IncrementalIndex.java | 8 +-- .../client/indexing/SamplerResponse.java | 55 +++++++++++---- .../HttpIndexingServiceClientTest.java | 9 +++ 24 files changed, 154 insertions(+), 114 deletions(-) diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java index 3039fbb7c843..012e2e801a0d 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java @@ -70,7 +70,7 @@ public class AvroOCFReader extends IntermediateRowParsingReader false, binaryAsString, extractUnionsByType, - inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + inputRowSchema.getDimensionsSpec().useSchemaDiscovery() ) ); } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java index 12f8a20bab7b..283286b1bbb7 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java @@ -59,7 +59,7 @@ public static ObjectFlattener makeFlattener( fromPigAvroStorage, binaryAsString, extractUnionsByType, - dimensionsSpec != null && dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery() + dimensionsSpec != null && dimensionsSpec.useSchemaDiscovery() ) ); } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java index 51bc733d4305..c63e83b065a3 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java @@ -66,7 +66,7 @@ public class AvroStreamReader extends IntermediateRowParsingReader flattenSpec, new OrcStructFlattenerMaker( binaryAsString, - inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + inputRowSchema.getDimensionsSpec().useSchemaDiscovery() ) ); } diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java index 494f66251633..0bc5f374a0cf 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java @@ -70,7 +70,7 @@ public class ParquetReader extends IntermediateRowParsingReader flattenSpec, new ParquetGroupFlattenerMaker( binaryAsString, - inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + inputRowSchema.getDimensionsSpec().useSchemaDiscovery() ) ); } diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java index 5c2dd2844f19..1bef1e9e9129 100755 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java @@ -85,7 +85,7 @@ public ParquetAvroHadoopInputRowParser( false, this.binaryAsString, this.extractUnionsByType, - dimensionsSpec != null && dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery() + dimensionsSpec != null && dimensionsSpec.useSchemaDiscovery() ) ); } diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java index f3e853968511..41db6efe2009 100644 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/simple/ParquetHadoopInputRowParser.java @@ -60,7 +60,7 @@ public ParquetHadoopInputRowParser( flattenSpec, new ParquetGroupFlattenerMaker( this.binaryAsString, - dimensionsSpec != null && dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery() + dimensionsSpec != null && dimensionsSpec.useSchemaDiscovery() ) ); this.parser = new MapInputRowParser(parseSpec); diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java index ac77a545dd7b..ecb2fbc7653c 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java @@ -59,7 +59,7 @@ public class ProtobufReader extends IntermediateRowParsingReader this.inputRowSchema = inputRowSchema; this.recordFlattener = ObjectFlatteners.create( flattenSpec, - new ProtobufFlattenerMaker(inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()) + new ProtobufFlattenerMaker(inputRowSchema.getDimensionsSpec().useSchemaDiscovery()) ); this.source = source; this.protobufBytesDecoder = protobufBytesDecoder; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java index bc4b96f81b11..be3b31be3b70 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java @@ -33,10 +33,6 @@ import org.apache.druid.data.input.Row; import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.data.input.impl.DoubleDimensionSchema; -import org.apache.druid.data.input.impl.FloatDimensionSchema; -import org.apache.druid.data.input.impl.LongDimensionSchema; -import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimedShutoffInputSourceReader; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.guice.annotations.Json; @@ -48,12 +44,9 @@ import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.LongMinAggregatorFactory; -import org.apache.druid.segment.NestedDataDimensionSchema; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.segment.column.TypeSignature; -import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexAddResult; import org.apache.druid.segment.incremental.IncrementalIndexSchema; @@ -242,7 +235,8 @@ public SamplerResponse sample( int numRowsRead = responseRows.size(); - List dimensionSchemas = new ArrayList<>(); + List logicalDimensionSchemas = new ArrayList<>(); + List physicalDimensionSchemas = new ArrayList<>(); RowSignature.Builder signatureBuilder = RowSignature.builder(); signatureBuilder.add( @@ -253,11 +247,11 @@ public SamplerResponse sample( if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionDesc.getName())) { final ColumnType columnType = dimensionDesc.getCapabilities().toColumnType(); signatureBuilder.add(dimensionDesc.getName(), columnType); - dimensionSchemas.add( - getDefaultSchemaForBuiltInType( - dimensionDesc.getName(), - dimensionDesc.getCapabilities() - ) + logicalDimensionSchemas.add( + DimensionSchema.getDefaultSchemaForBuiltInType(dimensionDesc.getName(), dimensionDesc.getCapabilities()) + ); + physicalDimensionSchemas.add( + dimensionDesc.getHandler().getDimensionSchema(dimensionDesc.getCapabilities()) ); } } @@ -273,7 +267,8 @@ public SamplerResponse sample( return new SamplerResponse( numRowsRead, numRowsIndexed, - dimensionSchemas, + logicalDimensionSchemas, + physicalDimensionSchemas, signatureBuilder.build(), responseRows.stream() .filter(Objects::nonNull) @@ -320,23 +315,4 @@ private IncrementalIndex buildIncrementalIndex(SamplerConfig samplerConfig, Data .setMaxRowCount(samplerConfig.getNumRows()) .build(); } - - private static DimensionSchema getDefaultSchemaForBuiltInType(String name, TypeSignature type) - { - // this is a temporary home for this method, a future refactor should place this in a more centralized place - // for getting the default dimension schema for a given column type, or more approriately, get the default column - // "format" for a given type and with that "format" create a default dimension schema - switch (type.getType()) { - case STRING: - return new StringDimensionSchema(name); - case LONG: - return new LongDimensionSchema(name); - case FLOAT: - return new FloatDimensionSchema(name); - case DOUBLE: - return new DoubleDimensionSchema(name); - default: - return new NestedDataDimensionSchema(name); - } - } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java index e80e38f7d48e..e788545507cd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java @@ -84,7 +84,7 @@ public void testCSVColumnAllNull() new StringDimensionSchema("Number"), new StringDimensionSchema("Gender") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( RowSignature.builder() @@ -94,7 +94,7 @@ public void testCSVColumnAllNull() .add("Number", ColumnType.STRING) .add("Gender", ColumnType.STRING) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); List data = response.getData(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpecTest.java index baf5e4a663d8..bc92ff298b49 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/IndexTaskSamplerSpecTest.java @@ -106,7 +106,7 @@ public void testSerde() throws IOException EasyMock.capture(capturedInputFormat), EasyMock.capture(capturedDataSchema), EasyMock.capture(capturedSamplerConfig) - )).andReturn(new SamplerResponse(0, 0, ImmutableList.of(), RowSignature.empty(), null)); + )).andReturn(new SamplerResponse(0, 0, ImmutableList.of(), ImmutableList.of(), RowSignature.empty(), null)); replayAll(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java index dca8c4fcea26..92078acc1a89 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java @@ -188,8 +188,8 @@ public void testNoDataSchema() Assert.assertEquals(6, response.getNumRowsRead()); Assert.assertEquals(0, response.getNumRowsIndexed()); Assert.assertEquals(6, response.getData().size()); - Assert.assertEquals(ImmutableList.of(), response.getDimensions()); - Assert.assertEquals(RowSignature.builder().addTimeColumn().build(), response.getSegmentSchema()); + Assert.assertEquals(ImmutableList.of(), response.getLogicalDimensions()); + Assert.assertEquals(RowSignature.builder().addTimeColumn().build(), response.getLogicalSegmentSchema()); List data = response.getData(); @@ -263,8 +263,8 @@ public void testNoDataSchemaNumRows() Assert.assertEquals(3, response.getNumRowsRead()); Assert.assertEquals(0, response.getNumRowsIndexed()); Assert.assertEquals(3, response.getData().size()); - Assert.assertEquals(ImmutableList.of(), response.getDimensions()); - Assert.assertEquals(RowSignature.builder().addTimeColumn().build(), response.getSegmentSchema()); + Assert.assertEquals(ImmutableList.of(), response.getLogicalDimensions()); + Assert.assertEquals(RowSignature.builder().addTimeColumn().build(), response.getLogicalSegmentSchema()); List data = response.getData(); @@ -325,7 +325,7 @@ public void testMissingValueTimestampSpec() throws IOException new StringDimensionSchema("dim2"), new StringDimensionSchema("met1") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( parserType == ParserType.STR_JSON @@ -343,7 +343,7 @@ public void testMissingValueTimestampSpec() throws IOException .add("dim2", ColumnType.STRING) .add("met1", ColumnType.STRING) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); List data = response.getData(); @@ -466,7 +466,7 @@ public void testWithTimestampSpec() throws IOException new StringDimensionSchema("dim2"), new StringDimensionSchema("met1") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( parserType == ParserType.STR_JSON @@ -482,7 +482,7 @@ public void testWithTimestampSpec() throws IOException .add("dim2", ColumnType.STRING) .add("met1", ColumnType.STRING) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); List data = response.getData(); @@ -597,7 +597,7 @@ public void testWithTimestampSpecNestedDiscovery() throws IOException new StringDimensionSchema("dim2"), new StringDimensionSchema("met1") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( parserType == ParserType.STR_JSON @@ -613,7 +613,7 @@ public void testWithTimestampSpecNestedDiscovery() throws IOException .add("dim2", ColumnType.STRING) .add("met1", ColumnType.STRING) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); List data = response.getData(); @@ -720,7 +720,7 @@ public void testWithDimensionSpec() throws IOException new StringDimensionSchema("dim1"), new StringDimensionSchema("met1") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( RowSignature.builder() @@ -728,7 +728,7 @@ public void testWithDimensionSpec() throws IOException .add("dim1", ColumnType.STRING) .add("met1", ColumnType.STRING) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); List data = response.getData(); @@ -841,7 +841,7 @@ public void testWithNoRollup() throws IOException new StringDimensionSchema("dim1"), new StringDimensionSchema("dim2") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( RowSignature.builder() @@ -850,7 +850,7 @@ public void testWithNoRollup() throws IOException .add("dim2", ColumnType.STRING) .add("met1", ColumnType.LONG) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); List data = response.getData(); @@ -968,7 +968,7 @@ public void testWithRollup() throws IOException new StringDimensionSchema("dim1"), new StringDimensionSchema("dim2") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( RowSignature.builder() @@ -977,7 +977,7 @@ public void testWithRollup() throws IOException .add("dim2", ColumnType.STRING) .add("met1", ColumnType.LONG) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); List data = response.getData(); @@ -1066,7 +1066,7 @@ public void testWithMoreRollup() throws IOException ImmutableList.of( new StringDimensionSchema("dim1") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( RowSignature.builder() @@ -1074,7 +1074,7 @@ public void testWithMoreRollup() throws IOException .add("dim1", ColumnType.STRING) .add("met1", ColumnType.LONG) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); List data = response.getData(); @@ -1152,7 +1152,7 @@ public void testWithTransformsAutoDimensions() throws IOException new StringDimensionSchema("dim1"), new StringDimensionSchema("dim2") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( RowSignature.builder() @@ -1161,7 +1161,7 @@ public void testWithTransformsAutoDimensions() throws IOException .add("dim2", ColumnType.STRING) .add("met1", ColumnType.LONG) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); List data = response.getData(); @@ -1256,7 +1256,7 @@ public void testWithTransformsDimensionsSpec() throws IOException ImmutableList.of( new StringDimensionSchema("dim1PlusBar") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( RowSignature.builder() @@ -1264,7 +1264,7 @@ public void testWithTransformsDimensionsSpec() throws IOException .add("dim1PlusBar", ColumnType.STRING) .add("met1", ColumnType.LONG) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); List data = response.getData(); @@ -1339,7 +1339,7 @@ public void testWithFilter() throws IOException new StringDimensionSchema("dim1"), new StringDimensionSchema("dim2") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( RowSignature.builder() @@ -1348,7 +1348,7 @@ public void testWithFilter() throws IOException .add("dim2", ColumnType.STRING) .add("met1", ColumnType.LONG) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); List data = response.getData(); @@ -1442,7 +1442,7 @@ public void testIndexParseException() throws IOException ImmutableList.of( new StringDimensionSchema("dim1PlusBar") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( RowSignature.builder() @@ -1450,7 +1450,7 @@ public void testIndexParseException() throws IOException .add("dim1PlusBar", ColumnType.STRING) .add("met1", ColumnType.LONG) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); List data = response.getData(); @@ -1575,7 +1575,7 @@ public void testMultipleJsonStringInOneBlock() throws IOException ImmutableList.of( new StringDimensionSchema("dim1PlusBar") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( RowSignature.builder() @@ -1583,7 +1583,7 @@ public void testMultipleJsonStringInOneBlock() throws IOException .add("dim1PlusBar", ColumnType.STRING) .add("met1", ColumnType.LONG) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); List data = response.getData(); @@ -1714,7 +1714,7 @@ public void testRowLimiting() throws IOException new StringDimensionSchema("dim2") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( parserType == ParserType.STR_JSON @@ -1729,7 +1729,7 @@ public void testRowLimiting() throws IOException .add("dim2", ColumnType.STRING) .add("met1", ColumnType.LONG) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); } @@ -1775,7 +1775,7 @@ public void testMaxBytesInMemoryLimiting() throws IOException new StringDimensionSchema("dim1"), new StringDimensionSchema("dim2") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( parserType == ParserType.STR_JSON @@ -1790,7 +1790,7 @@ public void testMaxBytesInMemoryLimiting() throws IOException .add("dim2", ColumnType.STRING) .add("met1", ColumnType.LONG) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); } @@ -1831,7 +1831,7 @@ public void testMaxClientResponseBytesLimiting() throws IOException new StringDimensionSchema("dim1"), new StringDimensionSchema("dim2") ), - response.getDimensions() + response.getLogicalDimensions() ); Assert.assertEquals( RowSignature.builder() @@ -1840,7 +1840,7 @@ public void testMaxClientResponseBytesLimiting() throws IOException .add("dim2", ColumnType.STRING) .add("met1", ColumnType.LONG) .build(), - response.getSegmentSchema() + response.getLogicalSegmentSchema() ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java index 43cf46fa90ce..d898ca75e2f3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.client.indexing.SamplerResponse; import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.segment.NestedDataDimensionSchema; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -63,11 +64,14 @@ public void testSerde() throws IOException ImmutableList.of( new StringDimensionSchema("dim1") ), + ImmutableList.of( + new NestedDataDimensionSchema("dim1") + ), RowSignature.builder().addTimeColumn().add("dim1", ColumnType.STRING).add("met1", ColumnType.LONG).build(), data ) ); - String expected = "{\"numRowsRead\":1123,\"numRowsIndexed\":1112,\"dimensions\":[{\"type\":\"string\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"segmentSchema\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"dim1\",\"type\":\"STRING\"},{\"name\":\"met1\",\"type\":\"LONG\"}],\"data\":[{\"input\":{\"row1\":\"val1\"},\"parsed\":{\"t\":123456,\"dim1\":\"foo\",\"met1\":6}},{\"input\":{\"row2\":\"val2\"},\"parsed\":{\"t\":123457,\"dim1\":\"foo2\",\"met1\":7}},{\"input\":{\"row3\":\"val3\"},\"unparseable\":true,\"error\":\"Could not parse\"}]}"; + String expected = "{\"numRowsRead\":1123,\"numRowsIndexed\":1112,\"logicalDimensions\":[{\"type\":\"string\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"physicalDimensions\":[{\"type\":\"NestedDataDimensionSchema\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"logicalSegmentSchema\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"dim1\",\"type\":\"STRING\"},{\"name\":\"met1\",\"type\":\"LONG\"}],\"data\":[{\"input\":{\"row1\":\"val1\"},\"parsed\":{\"t\":123456,\"dim1\":\"foo\",\"met1\":6}},{\"input\":{\"row2\":\"val2\"},\"parsed\":{\"t\":123457,\"dim1\":\"foo2\",\"met1\":7}},{\"input\":{\"row3\":\"val3\"},\"unparseable\":true,\"error\":\"Could not parse\"}]}"; Assert.assertEquals(expected, out); } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java index c57f4728c403..bb07c669ab79 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionSchema.java @@ -29,7 +29,11 @@ import org.apache.druid.guice.annotations.PublicApi; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.segment.NestedDataDimensionSchema; import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.TypeSignature; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; import java.util.Objects; @@ -43,9 +47,27 @@ @JsonSubTypes.Type(name = DimensionSchema.FLOAT_TYPE_NAME, value = FloatDimensionSchema.class), @JsonSubTypes.Type(name = DimensionSchema.DOUBLE_TYPE_NAME, value = DoubleDimensionSchema.class), @JsonSubTypes.Type(name = DimensionSchema.SPATIAL_TYPE_NAME, value = NewSpatialDimensionSchema.class), + @JsonSubTypes.Type(name = NestedDataComplexTypeSerde.TYPE_NAME, value = NestedDataDimensionSchema.class) }) public abstract class DimensionSchema { + public static DimensionSchema getDefaultSchemaForBuiltInType(String name, TypeSignature type) + { + switch (type.getType()) { + case STRING: + return new StringDimensionSchema(name); + case LONG: + return new LongDimensionSchema(name); + case FLOAT: + return new FloatDimensionSchema(name); + case DOUBLE: + return new DoubleDimensionSchema(name); + default: + // the nested column indexer can handle any type + return new NestedDataDimensionSchema(name); + } + } + public static final String STRING_TYPE_NAME = "string"; public static final String LONG_TYPE_NAME = "long"; public static final String FLOAT_TYPE_NAME = "float"; diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java index 6dfaa9ad3d0a..58a72f2ce474 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java @@ -49,7 +49,7 @@ public class DimensionsSpec private final Map dimensionSchemaMap; private final boolean includeAllDimensions; - private final boolean useNestedColumnIndexerForSchemaDiscovery; + private final boolean useSchemaDiscovery; public static final DimensionsSpec EMPTY = new DimensionsSpec(null, null, null, false, null); @@ -89,7 +89,7 @@ private DimensionsSpec( @JsonProperty("dimensionExclusions") List dimensionExclusions, @Deprecated @JsonProperty("spatialDimensions") List spatialDimensions, @JsonProperty("includeAllDimensions") boolean includeAllDimensions, - @JsonProperty("useNestedColumnIndexerForSchemaDiscovery") Boolean useNestedColumnIndexerForSchemaDiscovery + @JsonProperty("useSchemaDiscovery") Boolean useSchemaDiscovery ) { this.dimensions = dimensions == null @@ -118,8 +118,8 @@ private DimensionsSpec( dimensionSchemaMap.put(newSchema.getName(), newSchema); } this.includeAllDimensions = includeAllDimensions; - this.useNestedColumnIndexerForSchemaDiscovery = - useNestedColumnIndexerForSchemaDiscovery != null && useNestedColumnIndexerForSchemaDiscovery; + this.useSchemaDiscovery = + useSchemaDiscovery != null && useSchemaDiscovery; } @JsonProperty @@ -141,9 +141,9 @@ public boolean isIncludeAllDimensions() } @JsonProperty - public boolean useNestedColumnIndexerForSchemaDiscovery() + public boolean useSchemaDiscovery() { - return useNestedColumnIndexerForSchemaDiscovery; + return useSchemaDiscovery; } @Deprecated @@ -204,7 +204,7 @@ public DimensionsSpec withDimensions(List dims) ImmutableList.copyOf(dimensionExclusions), null, includeAllDimensions, - useNestedColumnIndexerForSchemaDiscovery + useSchemaDiscovery ); } @@ -215,7 +215,7 @@ public DimensionsSpec withDimensionExclusions(Set dimExs) ImmutableList.copyOf(Sets.union(dimensionExclusions, dimExs)), null, includeAllDimensions, - useNestedColumnIndexerForSchemaDiscovery + useSchemaDiscovery ); } @@ -227,7 +227,7 @@ public DimensionsSpec withSpatialDimensions(List spatial ImmutableList.copyOf(dimensionExclusions), spatials, includeAllDimensions, - useNestedColumnIndexerForSchemaDiscovery + useSchemaDiscovery ); } @@ -267,7 +267,7 @@ public boolean equals(Object o) } DimensionsSpec that = (DimensionsSpec) o; return includeAllDimensions == that.includeAllDimensions - && useNestedColumnIndexerForSchemaDiscovery == that.useNestedColumnIndexerForSchemaDiscovery + && useSchemaDiscovery == that.useSchemaDiscovery && Objects.equals(dimensions, that.dimensions) && Objects.equals(dimensionExclusions, that.dimensionExclusions); } @@ -279,7 +279,7 @@ public int hashCode() dimensions, dimensionExclusions, includeAllDimensions, - useNestedColumnIndexerForSchemaDiscovery + useSchemaDiscovery ); } @@ -290,7 +290,7 @@ public String toString() "dimensions=" + dimensions + ", dimensionExclusions=" + dimensionExclusions + ", includeAllDimensions=" + includeAllDimensions + - ", useNestedColumnIndexerForSchemaDiscovery=" + useNestedColumnIndexerForSchemaDiscovery + + ", useNestedColumnIndexerForSchemaDiscovery=" + useSchemaDiscovery + '}'; } diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java index aa5ecbb08674..02d16ffc9f5a 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonLineReader.java @@ -63,7 +63,7 @@ public class JsonLineReader extends TextReader flattenSpec, new JSONFlattenerMaker( keepNullColumns, - inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + inputRowSchema.getDimensionsSpec().useSchemaDiscovery() ) ); this.mapper = mapper; diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java index eaf42244cb32..a6ebb0a91136 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java @@ -84,7 +84,7 @@ public class JsonNodeReader extends IntermediateRowParsingReader flattenSpec, new JSONFlattenerMaker( keepNullColumns, - inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + inputRowSchema.getDimensionsSpec().useSchemaDiscovery() ) ); this.mapper = mapper; diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java b/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java index 1b1402f22a12..69f518aecad6 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/JsonReader.java @@ -81,7 +81,7 @@ public class JsonReader extends IntermediateRowParsingReader flattenSpec, new JSONFlattenerMaker( keepNullColumns, - inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery() + inputRowSchema.getDimensionsSpec().useSchemaDiscovery() ) ); this.mapper = mapper; diff --git a/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java b/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java index 47fe401f65c2..17833d23aea3 100644 --- a/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java +++ b/processing/src/main/java/org/apache/druid/guice/NestedDataModule.java @@ -27,7 +27,6 @@ import org.apache.druid.initialization.DruidModule; import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.NestedDataDimensionHandler; -import org.apache.druid.segment.NestedDataDimensionSchema; import org.apache.druid.segment.nested.NestedDataComplexTypeSerde; import org.apache.druid.segment.nested.StructuredData; import org.apache.druid.segment.nested.StructuredDataJsonSerializer; @@ -70,7 +69,6 @@ public static List getJacksonModulesList() return Collections.singletonList( new SimpleModule("NestedDataModule") .registerSubtypes( - new NamedType(NestedDataDimensionSchema.class, NestedDataComplexTypeSerde.TYPE_NAME), new NamedType(NestedFieldVirtualColumn.class, "nested-field") ) .addSerializer(StructuredData.class, new StructuredDataJsonSerializer()) diff --git a/processing/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java b/processing/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java index b2040d73db21..2c76c14c7b79 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java @@ -219,7 +219,7 @@ public interface FlattenerMaker JsonProvider getJsonProvider(); /** * List all "root" fields. If - * {@link org.apache.druid.data.input.impl.DimensionsSpec#useNestedColumnIndexerForSchemaDiscovery} is false, this + * {@link org.apache.druid.data.input.impl.DimensionsSpec#useSchemaDiscovery} is false, this * method should filter fields to include only fields that contain primitive and lists of primitive values */ Iterable discoverRootFields(T obj); diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index ddb52c309cb8..6148bf2e0883 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -242,7 +242,7 @@ public ColumnCapabilities getColumnCapabilities(String columnName) private final AtomicLong bytesInMemory = new AtomicLong(); private final boolean useMaxMemoryEstimates; - private final boolean useNestedColumnIndexerForSchemaDiscovery; + private final boolean useSchemaDiscovery; // This is modified on add() in a critical section. private final ThreadLocal in = new ThreadLocal<>(); @@ -288,8 +288,8 @@ protected IncrementalIndex( this.deserializeComplexMetrics = deserializeComplexMetrics; this.preserveExistingMetrics = preserveExistingMetrics; this.useMaxMemoryEstimates = useMaxMemoryEstimates; - this.useNestedColumnIndexerForSchemaDiscovery = incrementalIndexSchema.getDimensionsSpec() - .useNestedColumnIndexerForSchemaDiscovery(); + this.useSchemaDiscovery = incrementalIndexSchema.getDimensionsSpec() + .useSchemaDiscovery(); this.timeAndMetricsColumnCapabilities = new HashMap<>(); this.metricDescs = Maps.newLinkedHashMap(); @@ -591,7 +591,7 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row) } else { wasNewDim = true; final DimensionHandler handler; - if (useNestedColumnIndexerForSchemaDiscovery) { + if (useSchemaDiscovery) { handler = DimensionHandlerUtils.getHandlerFromCapabilities( dimension, makeDefaultCapabilitiesFromValueType(NestedDataComplexTypeSerde.TYPE), diff --git a/server/src/main/java/org/apache/druid/client/indexing/SamplerResponse.java b/server/src/main/java/org/apache/druid/client/indexing/SamplerResponse.java index cc6f5c56e309..dc12fe5e222b 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/SamplerResponse.java +++ b/server/src/main/java/org/apache/druid/client/indexing/SamplerResponse.java @@ -35,23 +35,26 @@ public class SamplerResponse private final int numRowsRead; private final int numRowsIndexed; - private final List dimensions; - private final RowSignature segmentSchema; + private final List logicalDimensions; + private final List physicalDimensions; + private final RowSignature logicalSegmentSchema; private final List data; @JsonCreator public SamplerResponse( @JsonProperty("numRowsRead") int numRowsRead, @JsonProperty("numRowsIndexed") int numRowsIndexed, - @JsonProperty("dimensions") List dimensions, - @JsonProperty("segmentSchema") RowSignature segmentSchema, + @JsonProperty("logicalDimensions") List logicalDimensions, + @JsonProperty("physicalDimensions") List physicalDimensions, + @JsonProperty("logicalSegmentSchema") RowSignature logicalSegmentSchema, @JsonProperty("data") List data ) { this.numRowsRead = numRowsRead; this.numRowsIndexed = numRowsIndexed; - this.dimensions = dimensions; - this.segmentSchema = segmentSchema; + this.logicalDimensions = logicalDimensions; + this.physicalDimensions = physicalDimensions; + this.logicalSegmentSchema = logicalSegmentSchema; this.data = data; } @@ -68,15 +71,21 @@ public int getNumRowsIndexed() } @JsonProperty - public List getDimensions() + public List getLogicalDimensions() { - return dimensions; + return logicalDimensions; } @JsonProperty - public RowSignature getSegmentSchema() + public List getPhysicalDimensions() { - return segmentSchema; + return physicalDimensions; + } + + @JsonProperty + public RowSignature getLogicalSegmentSchema() + { + return logicalSegmentSchema; } @JsonProperty @@ -97,14 +106,36 @@ public boolean equals(Object o) SamplerResponse that = (SamplerResponse) o; return getNumRowsRead() == that.getNumRowsRead() && getNumRowsIndexed() == that.getNumRowsIndexed() && - Objects.equals(segmentSchema, that.segmentSchema) && + Objects.equals(logicalDimensions, that.logicalDimensions) && + Objects.equals(physicalDimensions, that.physicalDimensions) && + Objects.equals(logicalSegmentSchema, that.logicalSegmentSchema) && Objects.equals(data, that.data); } @Override public int hashCode() { - return Objects.hash(getNumRowsRead(), getNumRowsIndexed(), segmentSchema, data); + return Objects.hash( + getNumRowsRead(), + getNumRowsIndexed(), + logicalDimensions, + physicalDimensions, + logicalSegmentSchema, + data + ); + } + + @Override + public String toString() + { + return "SamplerResponse{" + + "numRowsRead=" + numRowsRead + + ", numRowsIndexed=" + numRowsIndexed + + ", logicalDimensions=" + logicalDimensions + + ", physicalDimensions=" + physicalDimensions + + ", logicalSegmentSchema=" + logicalSegmentSchema + + ", data=" + data + + '}'; } @JsonInclude(JsonInclude.Include.NON_NULL) diff --git a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java index 0b502f9483a1..f669b4a8d872 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java @@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.segment.NestedDataDimensionSchema; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.timeline.DataSegment; @@ -85,6 +86,10 @@ public void testSample() throws Exception new StringDimensionSchema("x"), new StringDimensionSchema("y") ), + ImmutableList.of( + new NestedDataDimensionSchema("x"), + new NestedDataDimensionSchema("y") + ), RowSignature.builder().addTimeColumn().add("x", ColumnType.STRING).add("y", ColumnType.STRING).build(), ImmutableList.of( new SamplerResponse.SamplerResponseRow( @@ -144,6 +149,10 @@ public void testSampleError() throws Exception new StringDimensionSchema("x"), new StringDimensionSchema("y") ), + ImmutableList.of( + new NestedDataDimensionSchema("x"), + new NestedDataDimensionSchema("y") + ), RowSignature.builder().addTimeColumn().add("x", ColumnType.STRING).add("y", ColumnType.STRING).build(), ImmutableList.of( new SamplerResponse.SamplerResponseRow( From b43f23b3119b4f2d1a056ed5c3832b09dfd13e55 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Sat, 18 Feb 2023 19:48:08 -0800 Subject: [PATCH 4/5] fixup --- .../org/apache/druid/data/input/orc/OrcReaderTest.java | 2 +- .../input/parquet/NestedColumnParquetReaderTest.java | 4 ++-- .../data/input/protobuf/ProtobufInputFormatTest.java | 2 +- .../overlord/sampler/InputSourceSamplerTest.java | 2 +- ...pedia_local_input_source_index_task_schemaless.json | 2 +- .../apache/druid/data/input/impl/DimensionsSpec.java | 10 +++++----- .../druid/segment/NestedDataColumnIndexerTest.java | 2 +- .../src/test/resources/types-test-data-parser.json | 2 +- 8 files changed, 13 insertions(+), 13 deletions(-) diff --git a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java index cf8ba19d9b74..3013b4439ef2 100644 --- a/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java +++ b/extensions-core/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcReaderTest.java @@ -432,7 +432,7 @@ public void testNestedColumnSchemaless() throws IOException ); final InputRowSchema schema = new InputRowSchema( new TimestampSpec("ts", "millis", null), - DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(), + DimensionsSpec.builder().useSchemaDiscovery(true).build(), ColumnsFilter.all(), null ); diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java index 950039d06956..cd587f444f82 100644 --- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java +++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/NestedColumnParquetReaderTest.java @@ -169,7 +169,7 @@ public void testNestedColumnSchemalessNestedTestFileNoNested() throws IOExceptio final String file = "example/flattening/test_nested_1.parquet"; InputRowSchema schema = new InputRowSchema( new TimestampSpec("timestamp", "auto", null), - DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(false).build(), + DimensionsSpec.builder().useSchemaDiscovery(false).build(), ColumnsFilter.all(), null ); @@ -206,7 +206,7 @@ public void testNestedColumnSchemalessNestedTestFile() throws IOException final String file = "example/flattening/test_nested_1.parquet"; InputRowSchema schema = new InputRowSchema( new TimestampSpec("timestamp", "auto", null), - DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(), + DimensionsSpec.builder().useSchemaDiscovery(true).build(), ColumnsFilter.all(), null ); diff --git a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java index 9255e1e7ebd2..498174852bfc 100644 --- a/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java +++ b/extensions-core/protobuf-extensions/src/test/java/org/apache/druid/data/input/protobuf/ProtobufInputFormatTest.java @@ -345,7 +345,7 @@ public void testParseNestedDataSchemaless() throws Exception InputEntityReader reader = protobufInputFormat.createReader( new InputRowSchema( timestampSpec, - DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(), + DimensionsSpec.builder().useSchemaDiscovery(true).build(), null, null ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java index 92078acc1a89..a80aabdc34cb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java @@ -574,7 +574,7 @@ public void testWithTimestampSpecNestedDiscovery() throws IOException final TimestampSpec timestampSpec = new TimestampSpec("t", null, null); final DimensionsSpec dimensionsSpec = DimensionsSpec.builder() - .setUseNestedColumnIndexerForSchemaDiscovery(true) + .useSchemaDiscovery(true) .build(); final DataSchema dataSchema = createDataSchema(timestampSpec, dimensionsSpec, null, null, null); final InputSource inputSource = createInputSource(getTestRows(), dataSchema); diff --git a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json index bc96c730ee77..3fcce0f3ebaa 100644 --- a/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json +++ b/integration-tests-ex/cases/src/test/resources/indexer/wikipedia_local_input_source_index_task_schemaless.json @@ -8,7 +8,7 @@ }, "dimensionsSpec": { "dimensions": [], - "useNestedColumnIndexerForSchemaDiscovery": %%USE_NESTED_COLUMN_INDEXER%% + "useSchemaDiscovery": %%USE_NESTED_COLUMN_INDEXER%% }, "metricsSpec": [], "granularitySpec": { diff --git a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java index 58a72f2ce474..2a73d3cb30b2 100644 --- a/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java +++ b/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java @@ -290,7 +290,7 @@ public String toString() "dimensions=" + dimensions + ", dimensionExclusions=" + dimensionExclusions + ", includeAllDimensions=" + includeAllDimensions + - ", useNestedColumnIndexerForSchemaDiscovery=" + useSchemaDiscovery + + ", useSchemaDiscovery=" + useSchemaDiscovery + '}'; } @@ -301,7 +301,7 @@ public static final class Builder private List spatialDimensions; private boolean includeAllDimensions; - private boolean useNestedColumnIndexerForSchemaDiscovery; + private boolean useSchemaDiscovery; public Builder setDimensions(List dimensions) { @@ -334,9 +334,9 @@ public Builder setIncludeAllDimensions(boolean includeAllDimensions) return this; } - public Builder setUseNestedColumnIndexerForSchemaDiscovery(boolean useNestedColumnIndexerForSchemaDiscovery) + public Builder useSchemaDiscovery(boolean useSchemaDiscovery) { - this.useNestedColumnIndexerForSchemaDiscovery = useNestedColumnIndexerForSchemaDiscovery; + this.useSchemaDiscovery = useSchemaDiscovery; return this; } @@ -347,7 +347,7 @@ public DimensionsSpec build() dimensionExclusions, spatialDimensions, includeAllDimensions, - useNestedColumnIndexerForSchemaDiscovery + useSchemaDiscovery ); } } diff --git a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java index e77cffbc30e3..d489bb5c1cdb 100644 --- a/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java +++ b/processing/src/test/java/org/apache/druid/segment/NestedDataColumnIndexerTest.java @@ -546,7 +546,7 @@ private static IncrementalIndex makeIncrementalIndex(long minTimestamp) new TimestampSpec(TIME_COL, "millis", null), Granularities.NONE, VirtualColumns.EMPTY, - DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(), + DimensionsSpec.builder().useSchemaDiscovery(true).build(), new AggregatorFactory[0], false ) diff --git a/processing/src/test/resources/types-test-data-parser.json b/processing/src/test/resources/types-test-data-parser.json index c148d6ef2d92..5aee72a48b21 100644 --- a/processing/src/test/resources/types-test-data-parser.json +++ b/processing/src/test/resources/types-test-data-parser.json @@ -10,7 +10,7 @@ "dimensions": [], "dimensionExclusions": [], "spatialDimensions": [], - "useNestedColumnIndexerForSchemaDiscovery": true + "useSchemaDiscovery": true } } } From 0adcc38000940653738b9027dadfce32db357b7b Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Sat, 18 Feb 2023 21:22:53 -0800 Subject: [PATCH 5/5] fix --- .../druid/indexing/overlord/sampler/SamplerResponseTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java index d898ca75e2f3..446704f065a6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/SamplerResponseTest.java @@ -71,7 +71,7 @@ public void testSerde() throws IOException data ) ); - String expected = "{\"numRowsRead\":1123,\"numRowsIndexed\":1112,\"logicalDimensions\":[{\"type\":\"string\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"physicalDimensions\":[{\"type\":\"NestedDataDimensionSchema\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"logicalSegmentSchema\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"dim1\",\"type\":\"STRING\"},{\"name\":\"met1\",\"type\":\"LONG\"}],\"data\":[{\"input\":{\"row1\":\"val1\"},\"parsed\":{\"t\":123456,\"dim1\":\"foo\",\"met1\":6}},{\"input\":{\"row2\":\"val2\"},\"parsed\":{\"t\":123457,\"dim1\":\"foo2\",\"met1\":7}},{\"input\":{\"row3\":\"val3\"},\"unparseable\":true,\"error\":\"Could not parse\"}]}"; + String expected = "{\"numRowsRead\":1123,\"numRowsIndexed\":1112,\"logicalDimensions\":[{\"type\":\"string\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"physicalDimensions\":[{\"type\":\"json\",\"name\":\"dim1\",\"multiValueHandling\":\"SORTED_ARRAY\",\"createBitmapIndex\":true}],\"logicalSegmentSchema\":[{\"name\":\"__time\",\"type\":\"LONG\"},{\"name\":\"dim1\",\"type\":\"STRING\"},{\"name\":\"met1\",\"type\":\"LONG\"}],\"data\":[{\"input\":{\"row1\":\"val1\"},\"parsed\":{\"t\":123456,\"dim1\":\"foo\",\"met1\":6}},{\"input\":{\"row2\":\"val2\"},\"parsed\":{\"t\":123457,\"dim1\":\"foo2\",\"met1\":7}},{\"input\":{\"row3\":\"val3\"},\"unparseable\":true,\"error\":\"Could not parse\"}]}"; Assert.assertEquals(expected, out); }