exec/java-exec/pom.xml (4 changes: 0 additions & 4 deletions)

@@ -257,10 +257,6 @@
       <exclusions>
       </exclusions>
     </dependency>
-    <dependency>
-      <groupId>org.apache.parquet</groupId>
-      <artifactId>parquet-format</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-common</artifactId>

exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java

@@ -57,7 +57,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
@@ -251,10 +250,6 @@ private void newSchema() throws IOException {
     // We don't want this number to be too small either. Ideally, slightly bigger than the page size,
     // but not bigger than the block buffer
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
-    // TODO: Use initialSlabSize from ParquetProperties once drill will be updated to the latest version of Parquet library
-    int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(64, pageSize, 10);
-    // TODO: Replace ParquetColumnChunkPageWriteStore with ColumnChunkPageWriteStore from parquet library
-    // once PARQUET-1006 will be resolved
     ParquetProperties parquetProperties = ParquetProperties.builder()
         .withPageSize(pageSize)
         .withDictionaryEncoding(enableDictionary)
@@ -263,10 +258,11 @@
         .withAllocator(new ParquetDirectByteBufferAllocator(oContext))
         .withValuesWriterFactory(new DefaultV1ValuesWriterFactory())
         .build();
-    pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, initialSlabSize,
-        pageSize, parquetProperties.getAllocator(), parquetProperties.getPageWriteChecksumEnabled(),
-        parquetProperties.getColumnIndexTruncateLength()
-    );
+    // TODO: Replace ParquetColumnChunkPageWriteStore with ColumnChunkPageWriteStore from parquet library
+    // once DRILL-7906 (PARQUET-1006) will be resolved
+    pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema,
+        parquetProperties.getInitialSlabSize(), pageSize, parquetProperties.getAllocator(),
+        parquetProperties.getColumnIndexTruncateLength(), parquetProperties.getPageWriteChecksumEnabled());
     store = new ColumnWriteStoreV1(pageStore, parquetProperties);
     MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
     consumer = columnIO.getRecordWriter(store);
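
For context on the hunk above: the slab-size heuristic that the removed lines computed by hand is exposed directly on ParquetProperties in newer Parquet releases. A minimal sketch, assuming Parquet 1.12.x APIs (the page size value is illustrative, not Drill's configured default):

    import org.apache.parquet.column.ParquetProperties;

    public class SlabSizeSketch {
      public static void main(String[] args) {
        int pageSize = 1024 * 1024; // illustrative page size, not Drill's configured value
        ParquetProperties props = ParquetProperties.builder()
            .withPageSize(pageSize)
            .build();
        // Replaces the manual call the diff removes:
        // CapacityByteArrayOutputStream.initialSlabSizeHeuristic(64, pageSize, 10)
        System.out.println("initial slab size: " + props.getInitialSlabSize());
      }
    }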

exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java

@@ -201,13 +201,15 @@ static ColumnReader<?> createFixedColumnReader(ParquetRecordReader recordReader,
         } else if (convertedType == ConvertedType.INTERVAL) {
           return new NullableFixedByteAlignedReaders.NullableIntervalReader(recordReader, descriptor,
               columnChunkMetaData, fixedLength, (NullableIntervalVector) v, schemaElement);
+        } else {
+          return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(recordReader, descriptor,
+              columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement);
         }
       } else {
         return getNullableColumnReader(recordReader, descriptor,
             columnChunkMetaData, fixedLength, v, schemaElement);
       }
     }
-    throw new Exception("Unexpected parquet metadata configuration.");
   }

   static VarLengthValuesColumn<?> getReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,

exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java

@@ -22,6 +22,7 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.BiFunction;
 import java.util.function.Function;

@@ -71,6 +72,7 @@
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Type.Repetition;
@@ -328,23 +330,30 @@ protected PrimitiveConverter getConverterForType(String name, PrimitiveType type
           }
         }
       case FIXED_LEN_BYTE_ARRAY:
-        switch (type.getOriginalType()) {
-          case DECIMAL: {
+        LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveConverter> typeAnnotationVisitor = new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveConverter>() {
+          @Override
+          public Optional<PrimitiveConverter> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
             ParquetReaderUtility.checkDecimalTypeEnabled(options);
-            return getVarDecimalConverter(name, type);
+            return Optional.of(getVarDecimalConverter(name, type));
           }
-          case INTERVAL: {
+
+          @Override
+          public Optional<PrimitiveConverter> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) {
             IntervalWriter writer = type.isRepetition(Repetition.REPEATED)
                 ? getWriter(name, (m, f) -> m.list(f).interval(), l -> l.list().interval())
-                : getWriter(name, (m, f) -> m.interval(f), l -> l.interval());
-            return new DrillFixedLengthByteArrayToInterval(writer);
+                : getWriter(name, MapWriter::interval, ListWriter::interval);
+            return Optional.of(new DrillFixedLengthByteArrayToInterval(writer));
           }
-          default: {
+        };
+
+        LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
+        if (logicalTypeAnnotation != null) {
+          return logicalTypeAnnotation.accept(typeAnnotationVisitor).orElseGet(() -> {
             VarBinaryWriter writer = type.isRepetition(Repetition.REPEATED)
                 ? getWriter(name, (m, f) -> m.list(f).varBinary(), l -> l.list().varBinary())
-                : getWriter(name, (m, f) -> m.varBinary(f), l -> l.varBinary());
+                : getWriter(name, MapWriter::varBinary, ListWriter::varBinary);
             return new DrillFixedBinaryToVarbinaryConverter(writer, type.getTypeLength(), mutator.getManagedBuffer());
-          }
+          });
         }
       default:
         throw new UnsupportedOperationException("Unsupported type: " + type.getPrimitiveTypeName());
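
As background for the hunk above, a minimal self-contained sketch of the LogicalTypeAnnotation visitor API that replaces switching on the deprecated getOriginalType() (assuming Parquet 1.11+; the column name and decimal parameters are illustrative, not taken from Drill):

    import java.util.Optional;
    import org.apache.parquet.schema.LogicalTypeAnnotation;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Types;

    public class LogicalTypeSketch {
      public static void main(String[] args) {
        // A FIXED_LEN_BYTE_ARRAY(16) column annotated as decimal(38, 2).
        PrimitiveType type = Types.required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
            .length(16)
            .as(LogicalTypeAnnotation.decimalType(2, 38)) // (scale, precision)
            .named("price");

        // Each visit overload returns Optional; annotations without an override
        // fall through as Optional.empty(), which the caller can map to a fallback,
        // the same shape as the orElseGet branch in the hunk above.
        String description = type.getLogicalTypeAnnotation()
            .accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<String>() {
              @Override
              public Optional<String> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) {
                return Optional.of("decimal(" + decimalType.getPrecision() + ", " + decimalType.getScale() + ")");
              }
            })
            .orElse("unhandled logical type");
        System.out.println(description); // prints: decimal(38, 2)
      }
    }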