From 5a5e382452ae18194c20385a514351e9cc31d0b0 Mon Sep 17 00:00:00 2001 From: Nandor Kollar Date: Wed, 6 Mar 2019 14:11:58 +0100 Subject: [PATCH 1/2] [SPARK-26509][SQL] Parquet DELTA_BYTE_ARRAY is not supported in Spark 2.x's Vectorized Reader --- .../parquet/VectorizedColumnReader.java | 18 +++- .../VectorizedDeltaBinaryPackedReader.java | 91 +++++++++++++++++ .../VectorizedDeltaByteArrayReader.java | 98 +++++++++++++++++++ .../parquet/ParquetEncodingSuite.scala | 36 +++++++ 4 files changed, 239 insertions(+), 4 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaByteArrayReader.java diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index ba26b57567e64..4aa3070d29dfc 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -580,10 +580,7 @@ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) thr this.dataColumn = new VectorizedRleValuesReader(); this.isCurrentPageDictionaryEncoded = true; } else { - if (dataEncoding != Encoding.PLAIN) { - throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding); - } - this.dataColumn = new VectorizedPlainValuesReader(); + this.dataColumn = getValuesReader(dataEncoding); this.isCurrentPageDictionaryEncoded = false; } @@ -594,6 +591,19 @@ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) thr } } + private ValuesReader getValuesReader(Encoding encoding) { + switch (encoding) { + case PLAIN: + return new VectorizedPlainValuesReader(); + case DELTA_BYTE_ARRAY: + return new VectorizedDeltaByteArrayReader(); + case DELTA_BINARY_PACKED: + return new VectorizedDeltaBinaryPackedReader(); + default: + throw new UnsupportedOperationException("Unsupported encoding: " + encoding); + } + } + private void readPageV1(DataPageV1 page) throws IOException { this.pageValueCount = page.getValueCount(); ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java new file mode 100644 index 0000000000000..f6885214489ea --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.parquet; + +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; +import org.apache.parquet.io.api.Binary; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; + +import java.io.IOException; + +/** + * An implementation of the Parquet DELTA_BINARY_PACKED decoder that supports the vectorized interface. + */ +public class VectorizedDeltaBinaryPackedReader extends ValuesReader implements VectorizedValuesReader { + private final DeltaBinaryPackingValuesReader deltaBinaryPackingValuesReader = new DeltaBinaryPackingValuesReader(); + + @Override + public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException { + deltaBinaryPackingValuesReader.initFromPage(valueCount, in); + } + + @Override + public void skip() { + throw new UnsupportedOperationException(); + } + + @Override + public byte readByte() { + throw new UnsupportedOperationException(); + } + + @Override + public Binary readBinary(int len) { + throw new UnsupportedOperationException(); + } + + @Override + public void readBooleans(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public void readBytes(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public void readIntegers(int total, WritableColumnVector c, int rowId) { + for (int i =0; i ParquetProperties.WriterVersion.PARQUET_2_0.toString, + ParquetOutputFormat.ENABLE_DICTIONARY -> "false" + ) + + val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions) + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true", + ParquetOutputFormat.JOB_SUMMARY_LEVEL -> "ALL") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/test.parquet" + val data = (1 to 3).map { i => + (i, i.toLong, s"test_${i}") + } + + spark.createDataFrame(data).write.options(extraOptions).mode("overwrite").parquet(path) + + val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head + val columnChunkMetadataList = blockMetadata.getColumns.asScala + + // Verify that indeed delta encoding is used for each column + assert(columnChunkMetadataList.length === 3) + assert(columnChunkMetadataList(0).getEncodings.contains(Encoding.DELTA_BINARY_PACKED)) + assert(columnChunkMetadataList(1).getEncodings.contains(Encoding.DELTA_BINARY_PACKED)) + assert(columnChunkMetadataList(2).getEncodings.contains(Encoding.DELTA_BYTE_ARRAY)) + + val actual = spark.read.parquet(path).collect + assert(actual.sortBy(_.getInt(0)) === data.map(Row.fromTuple)); + } + } + } } From 35fc3029de19bbd7b5326b1d962cc29af8be7cbd Mon Sep 17 00:00:00 2001 From: Nandor Kollar Date: Wed, 6 Mar 2019 17:36:26 +0100 Subject: [PATCH 2/2] Fix style errors --- .../VectorizedDeltaBinaryPackedReader.java | 18 ++++++++++-------- .../VectorizedDeltaByteArrayReader.java | 3 ++- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java index f6885214489ea..b573dadd2fe0c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaBinaryPackedReader.java @@ -25,14 +25,16 @@ import java.io.IOException; /** - * An implementation of the Parquet DELTA_BINARY_PACKED decoder that supports the vectorized interface. + * An implementation of the Parquet DELTA_BINARY_PACKED decoder + * that supports the vectorized interface. */ -public class VectorizedDeltaBinaryPackedReader extends ValuesReader implements VectorizedValuesReader { - private final DeltaBinaryPackingValuesReader deltaBinaryPackingValuesReader = new DeltaBinaryPackingValuesReader(); +public class VectorizedDeltaBinaryPackedReader extends ValuesReader + implements VectorizedValuesReader { + private final DeltaBinaryPackingValuesReader valuesReader = new DeltaBinaryPackingValuesReader(); @Override public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException { - deltaBinaryPackingValuesReader.initFromPage(valueCount, in); + valuesReader.initFromPage(valueCount, in); } @Override @@ -62,15 +64,15 @@ public void readBytes(int total, WritableColumnVector c, int rowId) { @Override public void readIntegers(int total, WritableColumnVector c, int rowId) { - for (int i =0; i