diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
deleted file mode 100644
index 1d9ccdad509..00000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import io.netty.buffer.DrillBuf;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.format.PageHeader;
-import org.apache.parquet.format.Util;
-import org.apache.parquet.hadoop.util.HadoopStreams;
-
-/**
- * @deprecated it is never used. So can be removed in Drill 1.21.0
- */
-public class ColumnDataReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class);
-
-  private final long endPosition;
-  public final FSDataInputStream input;
-
-  public ColumnDataReader(FSDataInputStream input, long start, long length) throws IOException{
-    this.input = input;
-    this.input.seek(start);
-    this.endPosition = start + length;
-  }
-
-  public PageHeader readPageHeader() throws IOException{
-    return Util.readPageHeader(input);
-  }
-
-  public FSDataInputStream getInputStream() {
-    return input;
-  }
-
-  public BytesInput getPageAsBytesInput(int pageLength) throws IOException{
-    byte[] b = new byte[pageLength];
-    input.read(b);
-    return new HadoopBytesInput(b);
-  }
-
-  public void loadPage(DrillBuf target, int pageLength) throws IOException {
-    target.clear();
-    HadoopStreams.wrap(input).read(target.nioBuffer(0, pageLength));
-    target.writerIndex(pageLength);
-  }
-
-  public void clear(){
-    try{
-      input.close();
-    }catch(IOException ex){
-      logger.warn("Error while closing input stream.", ex);
-    }
-  }
-
-  public boolean hasRemainder() throws IOException{
-    return input.getPos() < endPosition;
-  }
-
-  public class HadoopBytesInput extends BytesInput{
-
-    private final byte[] pageBytes;
-
-    public HadoopBytesInput(byte[] pageBytes) {
-      super();
-      this.pageBytes = pageBytes;
-    }
-
-    @Override
-    public byte[] toByteArray() throws IOException {
-      return pageBytes;
-    }
-
-    @Override
-    public long size() {
-      return pageBytes.length;
-    }
-
-    @Override
-    public void writeAllTo(OutputStream out) throws IOException {
-      out.write(pageBytes);
-    }
-  }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index ae76971a373..87b73bf842a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -55,7 +55,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -417,13 +416,10 @@ public void close() {
     recordReader = null;
     recordMaterializer = null;
    nullFilledVectors = null;
-    try {
-      if (pageReadStore != null) {
-        pageReadStore.close();
-        pageReadStore = null;
-      }
-    } catch (IOException e) {
-      logger.warn("Failure while closing PageReadStore", e);
+
+    if (pageReadStore != null) {
+      pageReadStore.close();
+      pageReadStore = null;
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 7834eaa8166..64816bfd666 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -17,13 +17,7 @@
  */
 package org.apache.parquet.hadoop;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import io.netty.buffer.ByteBuf;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -47,11 +41,16 @@
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.util.HadoopStreams;
-
-import io.netty.buffer.ByteBuf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class ColumnChunkIncReadStore implements PageReadStore {
 
   private static final Logger logger = LoggerFactory.getLogger(ColumnChunkIncReadStore.class);
@@ -295,9 +294,13 @@ public void addColumn(ColumnDescriptor descriptor, ColumnChunkMetaData metaData)
     columns.put(descriptor, reader);
   }
 
-  public void close() throws IOException {
+  public void close() {
     for (FSDataInputStream stream : streams) {
-      stream.close();
+      try {
+        stream.close();
+      } catch (IOException e) {
+        logger.warn("Error closing stream: {}", e.getMessage(), e);
+      }
     }
     for (ColumnChunkIncPageReader reader : columns.values()) {
       reader.close();
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
index 28dfc278596..04d6109d2a0 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
@@ -17,16 +17,7 @@
  */
 package org.apache.parquet.hadoop;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.zip.CRC32;
-
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -35,6 +26,7 @@
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.statistics.SizeStatistics;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
@@ -51,11 +43,20 @@
 import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.zip.CRC32;
+
 @InterfaceAudience.Private
 public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore, AutoCloseable {
 
@@ -152,6 +153,20 @@ public void writePage(BytesInput bytesInput, int valueCount, Statistics stati
       writePage(bytesInput, valueCount, -1, statistics, rlEncoding, dlEncoding, valuesEncoding);
     }
 
+    @Override
+    public void writePage(
+        BytesInput bytesInput,
+        int valueCount,
+        int rowCount,
+        Statistics<?> statistics,
+        SizeStatistics sizeStatistics,
+        Encoding rlEncoding,
+        Encoding dlEncoding,
+        Encoding valuesEncoding)
+        throws IOException {
+      writePage(bytesInput, valueCount, rowCount, statistics, rlEncoding, dlEncoding, valuesEncoding);
+    }
+
     @Override
     public void writePage(BytesInput bytes,
                           int valueCount,
@@ -399,7 +414,7 @@ public void close() {
     }
   }
 
-  private final Map writers = new HashMap();
+  private final Map writers = new HashMap();
   private final MessageType schema;
 
   public ParquetColumnChunkPageWriteStore(BytesInputCompressor compressor, MessageType schema, int initialSlabSize,
diff --git a/pom.xml b/pom.xml
index 5e96f05fe28..e292dc44a35 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
     4.9.3
     9.5
     1.23.0
-    1.11.4
+    1.12.0
     1.78.1
     2.9.3
     org.apache.calcite
@@ -125,8 +125,8 @@
     2.0.65.Final
     4.1.115.Final
-    2.9.0
-    1.12.3
+    2.11.0
+    1.15.1
     1676438963
     3.25.5
     ${project.basedir}/src/main/protobuf/