Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class AvroOCFReader extends IntermediateRowParsingReader<GenericRecord>
false,
binaryAsString,
extractUnionsByType,
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
inputRowSchema.getDimensionsSpec().useSchemaDiscovery()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static ObjectFlattener<GenericRecord> makeFlattener(
fromPigAvroStorage,
binaryAsString,
extractUnionsByType,
dimensionsSpec != null && dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery()
dimensionsSpec != null && dimensionsSpec.useSchemaDiscovery()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class AvroStreamReader extends IntermediateRowParsingReader<GenericRecord
false,
binaryAsString,
extractUnionsByType,
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
inputRowSchema.getDimensionsSpec().useSchemaDiscovery()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public OrcHadoopInputRowParser(
flattenSpec,
new OrcStructFlattenerMaker(
this.binaryAsString,
dimensionsSpec != null && dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery()
dimensionsSpec != null && dimensionsSpec.useSchemaDiscovery()
)
);
this.parser = new MapInputRowParser(parseSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class OrcReader extends IntermediateRowParsingReader<OrcStruct>
flattenSpec,
new OrcStructFlattenerMaker(
binaryAsString,
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
inputRowSchema.getDimensionsSpec().useSchemaDiscovery()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ public void testNestedColumnSchemaless() throws IOException
);
final InputRowSchema schema = new InputRowSchema(
new TimestampSpec("ts", "millis", null),
DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(),
DimensionsSpec.builder().useSchemaDiscovery(true).build(),
ColumnsFilter.all(),
null
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class ParquetReader extends IntermediateRowParsingReader<Group>
flattenSpec,
new ParquetGroupFlattenerMaker(
binaryAsString,
inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery()
inputRowSchema.getDimensionsSpec().useSchemaDiscovery()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public ParquetAvroHadoopInputRowParser(
false,
this.binaryAsString,
this.extractUnionsByType,
dimensionsSpec != null && dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery()
dimensionsSpec != null && dimensionsSpec.useSchemaDiscovery()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public ParquetHadoopInputRowParser(
flattenSpec,
new ParquetGroupFlattenerMaker(
this.binaryAsString,
dimensionsSpec != null && dimensionsSpec.useNestedColumnIndexerForSchemaDiscovery()
dimensionsSpec != null && dimensionsSpec.useSchemaDiscovery()
)
);
this.parser = new MapInputRowParser(parseSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void testNestedColumnSchemalessNestedTestFileNoNested() throws IOExceptio
final String file = "example/flattening/test_nested_1.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(false).build(),
DimensionsSpec.builder().useSchemaDiscovery(false).build(),
ColumnsFilter.all(),
null
);
Expand Down Expand Up @@ -206,7 +206,7 @@ public void testNestedColumnSchemalessNestedTestFile() throws IOException
final String file = "example/flattening/test_nested_1.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(),
DimensionsSpec.builder().useSchemaDiscovery(true).build(),
ColumnsFilter.all(),
null
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class ProtobufReader extends IntermediateRowParsingReader<DynamicMessage>
this.inputRowSchema = inputRowSchema;
this.recordFlattener = ObjectFlatteners.create(
flattenSpec,
new ProtobufFlattenerMaker(inputRowSchema.getDimensionsSpec().useNestedColumnIndexerForSchemaDiscovery())
new ProtobufFlattenerMaker(inputRowSchema.getDimensionsSpec().useSchemaDiscovery())
);
this.source = source;
this.protobufBytesDecoder = protobufBytesDecoder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public void testParseNestedDataSchemaless() throws Exception
InputEntityReader reader = protobufInputFormat.createReader(
new InputRowSchema(
timestampSpec,
DimensionsSpec.builder().setUseNestedColumnIndexerForSchemaDiscovery(true).build(),
DimensionsSpec.builder().useSchemaDiscovery(true).build(),
null,
null
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.Row;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimedShutoffInputSourceReader;
import org.apache.druid.data.input.impl.TimestampSpec;
Expand All @@ -44,6 +45,8 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
Expand Down Expand Up @@ -232,9 +235,41 @@ public SamplerResponse sample(

int numRowsRead = responseRows.size();

List<DimensionSchema> logicalDimensionSchemas = new ArrayList<>();
List<DimensionSchema> physicalDimensionSchemas = new ArrayList<>();

RowSignature.Builder signatureBuilder = RowSignature.builder();
signatureBuilder.add(
ColumnHolder.TIME_COLUMN_NAME,
index.getColumnCapabilities(ColumnHolder.TIME_COLUMN_NAME).toColumnType()
);
for (IncrementalIndex.DimensionDesc dimensionDesc : index.getDimensions()) {
if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(dimensionDesc.getName())) {
final ColumnType columnType = dimensionDesc.getCapabilities().toColumnType();
signatureBuilder.add(dimensionDesc.getName(), columnType);
logicalDimensionSchemas.add(
DimensionSchema.getDefaultSchemaForBuiltInType(dimensionDesc.getName(), dimensionDesc.getCapabilities())
);
physicalDimensionSchemas.add(
dimensionDesc.getHandler().getDimensionSchema(dimensionDesc.getCapabilities())
);
}
}
for (AggregatorFactory aggregatorFactory : index.getMetricAggs()) {
if (!SamplerInputRow.SAMPLER_ORDERING_COLUMN.equals(aggregatorFactory.getName())) {
signatureBuilder.add(
aggregatorFactory.getName(),
index.getColumnCapabilities(aggregatorFactory.getName()).toColumnType()
);
}
}

return new SamplerResponse(
numRowsRead,
numRowsIndexed,
logicalDimensionSchemas,
physicalDimensionSchemas,
signatureBuilder.build(),
responseRows.stream()
.filter(Objects::nonNull)
.filter(x -> x.getParsed() != null || x.isUnparseable() != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,20 @@
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;

public class CsvInputSourceSamplerTest
public class CsvInputSourceSamplerTest extends InitializedNullHandlingTest
{
@Test
public void testCSVColumnAllNull()
Expand Down Expand Up @@ -73,6 +77,25 @@ public void testCSVColumnAllNull()
Assert.assertEquals(4, response.getNumRowsRead());
Assert.assertEquals(4, response.getNumRowsIndexed());
Assert.assertEquals(4, response.getData().size());
Assert.assertEquals(
ImmutableList.of(
new StringDimensionSchema("FirstName"),
new StringDimensionSchema("LastName"),
new StringDimensionSchema("Number"),
new StringDimensionSchema("Gender")
),
response.getLogicalDimensions()
);
Assert.assertEquals(
RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("FirstName", ColumnType.STRING)
.add("LastName", ColumnType.STRING)
.add("Number", ColumnType.STRING)
.add("Gender", ColumnType.STRING)
.build(),
response.getLogicalSegmentSchema()
);

List<SamplerResponseRow> data = response.getData();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.indexing.SamplerResponse;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.indexing.DataSchema;
import org.easymock.Capture;
import org.easymock.EasyMock;
Expand Down Expand Up @@ -104,7 +106,7 @@ public void testSerde() throws IOException
EasyMock.capture(capturedInputFormat),
EasyMock.capture(capturedDataSchema),
EasyMock.capture(capturedSamplerConfig)
)).andReturn(new SamplerResponse(0, 0, null));
)).andReturn(new SamplerResponse(0, 0, ImmutableList.of(), ImmutableList.of(), RowSignature.empty(), null));

replayAll();

Expand Down
Loading