diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index 25361d5633d..c2128525371 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -19,6 +19,7 @@ #define ARROW_IO_INTERFACES_H #include +#include namespace arrow { diff --git a/cpp/src/arrow/parquet/io.cc b/cpp/src/arrow/parquet/io.cc index c81aa8c4da9..b6fdd67d15b 100644 --- a/cpp/src/arrow/parquet/io.cc +++ b/cpp/src/arrow/parquet/io.cc @@ -55,12 +55,23 @@ void ParquetAllocator::Free(uint8_t* buffer, int64_t size) { // ---------------------------------------------------------------------- // ParquetReadSource -ParquetReadSource::ParquetReadSource( - const std::shared_ptr& file, ParquetAllocator* allocator) - : file_(file), allocator_(allocator) {} +ParquetReadSource::ParquetReadSource(ParquetAllocator* allocator) + : file_(nullptr), allocator_(allocator) {} + +Status ParquetReadSource::Open(const std::shared_ptr& file) { + int64_t file_size; + RETURN_NOT_OK(file->GetSize(&file_size)); + + file_ = file; + size_ = file_size; + return Status::OK(); +} void ParquetReadSource::Close() { - PARQUET_THROW_NOT_OK(file_->Close()); + // TODO(wesm): Make this a no-op for now. This leaves Python wrappers for + // these classes in a borked state. Probably better to explicitly close. + + // PARQUET_THROW_NOT_OK(file_->Close()); } int64_t ParquetReadSource::Tell() const { diff --git a/cpp/src/arrow/parquet/io.h b/cpp/src/arrow/parquet/io.h index ef8871da4df..1c59695c6c1 100644 --- a/cpp/src/arrow/parquet/io.h +++ b/cpp/src/arrow/parquet/io.h @@ -49,7 +49,9 @@ class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator { uint8_t* Malloc(int64_t size) override; void Free(uint8_t* buffer, int64_t size) override; - MemoryPool* pool() { return pool_; } + void set_pool(MemoryPool* pool) { pool_ = pool; } + + MemoryPool* pool() const { return pool_; } private: MemoryPool* pool_; @@ -57,8 +59,10 @@ class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator { class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource { public: - ParquetReadSource( - const std::shared_ptr& file, ParquetAllocator* allocator); + explicit ParquetReadSource(ParquetAllocator* allocator); + + // We need to ask for the file size on opening the file, and this can fail + Status Open(const std::shared_ptr& file); void Close() override; int64_t Tell() const override; diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc index 7e724b31e38..6615457c483 100644 --- a/cpp/src/arrow/parquet/parquet-io-test.cc +++ b/cpp/src/arrow/parquet/parquet-io-test.cc @@ -23,6 +23,7 @@ #include "gtest/gtest.h" #include "arrow/parquet/io.h" +#include "arrow/test-util.h" #include "arrow/util/memory-pool.h" #include "arrow/util/status.h" @@ -147,9 +148,12 @@ TEST(TestParquetReadSource, Basics) { std::string data = "this is the data"; auto data_buffer = reinterpret_cast(data.c_str()); - ParquetAllocator allocator; + ParquetAllocator allocator(default_memory_pool()); + auto file = std::make_shared(data_buffer, data.size()); - auto source = std::make_shared(file, &allocator); + auto source = std::make_shared(&allocator); + + ASSERT_OK(source->Open(file)); ASSERT_EQ(0, source->Tell()); ASSERT_NO_THROW(source->Seek(5)); diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc index c7c400e9573..e92967e5363 100644 --- a/cpp/src/arrow/parquet/reader.cc +++ b/cpp/src/arrow/parquet/reader.cc @@ -23,6 +23,7 @@ #include #include "arrow/column.h" +#include "arrow/parquet/io.h" #include "arrow/parquet/schema.h" #include "arrow/parquet/utils.h" #include "arrow/schema.h" @@ -35,6 +36,10 @@ using parquet::ColumnReader; using parquet::Repetition; using parquet::TypedColumnReader; +// Help reduce verbosity +using ParquetRAS = parquet::RandomAccessSource; +using ParquetReader = parquet::ParquetFileReader; + namespace arrow { namespace parquet { @@ -181,6 +186,21 @@ FileReader::FileReader( FileReader::~FileReader() {} +// Static ctor +Status OpenFile(const std::shared_ptr& file, + ParquetAllocator* allocator, std::unique_ptr* reader) { + std::unique_ptr source(new ParquetReadSource(allocator)); + RETURN_NOT_OK(source->Open(file)); + + // TODO(wesm): reader properties + std::unique_ptr pq_reader; + PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(std::move(source))); + + // Use the same memory pool as the ParquetAllocator + reader->reset(new FileReader(allocator->pool(), std::move(pq_reader))); + return Status::OK(); +} + Status FileReader::GetFlatColumn(int i, std::unique_ptr* out) { return impl_->GetFlatColumn(i, out); } diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h index 2c8a9dfd025..f1492f64521 100644 --- a/cpp/src/arrow/parquet/reader.h +++ b/cpp/src/arrow/parquet/reader.h @@ -23,6 +23,8 @@ #include "parquet/api/reader.h" #include "parquet/api/schema.h" +#include "arrow/io/interfaces.h" +#include "arrow/parquet/io.h" #include "arrow/util/visibility.h" namespace arrow { @@ -99,7 +101,7 @@ class ARROW_EXPORT FileReader { virtual ~FileReader(); private: - class Impl; + class ARROW_NO_EXPORT Impl; std::unique_ptr impl_; }; @@ -125,15 +127,20 @@ class ARROW_EXPORT FlatColumnReader { Status NextBatch(int batch_size, std::shared_ptr* out); private: - class Impl; + class ARROW_NO_EXPORT Impl; std::unique_ptr impl_; explicit FlatColumnReader(std::unique_ptr impl); friend class FileReader; }; -} // namespace parquet +// Helper function to create a file reader from an implementation of an Arrow +// readable file +ARROW_EXPORT +Status OpenFile(const std::shared_ptr& file, + ParquetAllocator* allocator, std::unique_ptr* reader); +} // namespace parquet } // namespace arrow #endif // ARROW_PARQUET_READER_H diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc index 0139edd3bb8..f9514aa2ad2 100644 --- a/cpp/src/arrow/parquet/writer.cc +++ b/cpp/src/arrow/parquet/writer.cc @@ -35,7 +35,6 @@ using parquet::ParquetVersion; using parquet::schema::GroupNode; namespace arrow { - namespace parquet { class FileWriter::Impl { diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h index 45d0fd59868..5aa1ba58717 100644 --- a/cpp/src/arrow/parquet/writer.h +++ b/cpp/src/arrow/parquet/writer.h @@ -55,7 +55,7 @@ class ARROW_EXPORT FileWriter { MemoryPool* memory_pool() const; private: - class Impl; + class ARROW_NO_EXPORT Impl; std::unique_ptr impl_; }; diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd index d0fb8f9f000..734ace6c923 100644 --- a/python/pyarrow/includes/libarrow_io.pxd +++ b/python/pyarrow/includes/libarrow_io.pxd @@ -19,11 +19,37 @@ from pyarrow.includes.common cimport * -cdef extern from "arrow/io/interfaces.h" nogil: +cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil: + enum FileMode" arrow::io::FileMode::type": + FileMode_READ" arrow::io::FileMode::READ" + FileMode_WRITE" arrow::io::FileMode::WRITE" + FileMode_READWRITE" arrow::io::FileMode::READWRITE" + enum ObjectType" arrow::io::ObjectType::type": ObjectType_FILE" arrow::io::ObjectType::FILE" ObjectType_DIRECTORY" arrow::io::ObjectType::DIRECTORY" + cdef cppclass FileBase: + CStatus Close() + CStatus Tell(int64_t* position) + + cdef cppclass ReadableFile(FileBase): + CStatus GetSize(int64_t* size) + CStatus Read(int64_t nbytes, int64_t* bytes_read, + uint8_t* buffer) + + CStatus ReadAt(int64_t position, int64_t nbytes, + int64_t* bytes_read, uint8_t* buffer) + + cdef cppclass RandomAccessFile(ReadableFile): + CStatus Seek(int64_t position) + + cdef cppclass WriteableFile(FileBase): + CStatus Write(const uint8_t* buffer, int64_t nbytes) + # CStatus Write(const uint8_t* buffer, int64_t nbytes, + # int64_t* bytes_written) + + cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil: CStatus ConnectLibHdfs() @@ -44,24 +70,11 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil: int64_t block_size int16_t permissions - cdef cppclass CHdfsFile: - CStatus Close() - CStatus Seek(int64_t position) - CStatus Tell(int64_t* position) - - cdef cppclass HdfsReadableFile(CHdfsFile): - CStatus GetSize(int64_t* size) - CStatus Read(int64_t nbytes, int64_t* bytes_read, - uint8_t* buffer) - - CStatus ReadAt(int64_t position, int64_t nbytes, - int64_t* bytes_read, uint8_t* buffer) - - cdef cppclass HdfsWriteableFile(CHdfsFile): - CStatus Write(const uint8_t* buffer, int64_t nbytes) + cdef cppclass HdfsReadableFile(RandomAccessFile): + pass - CStatus Write(const uint8_t* buffer, int64_t nbytes, - int64_t* bytes_written) + cdef cppclass HdfsWriteableFile(WriteableFile): + pass cdef cppclass CHdfsClient" arrow::io::HdfsClient": @staticmethod diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd index a2f83ea5ea5..fe24f593e32 100644 --- a/python/pyarrow/includes/parquet.pxd +++ b/python/pyarrow/includes/parquet.pxd @@ -19,6 +19,7 @@ from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport CSchema, CStatus, CTable, MemoryPool +from pyarrow.includes.libarrow_io cimport RandomAccessFile cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil: @@ -90,19 +91,36 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil: shared_ptr[WriterProperties] build() +cdef extern from "arrow/parquet/io.h" namespace "arrow::parquet" nogil: + cdef cppclass ParquetAllocator: + ParquetAllocator() + ParquetAllocator(MemoryPool* pool) + MemoryPool* pool() + void set_pool(MemoryPool* pool) + + cdef cppclass ParquetReadSource: + ParquetReadSource(ParquetAllocator* allocator) + Open(const shared_ptr[RandomAccessFile]& file) + + cdef extern from "arrow/parquet/reader.h" namespace "arrow::parquet" nogil: + CStatus OpenFile(const shared_ptr[RandomAccessFile]& file, + ParquetAllocator* allocator, + unique_ptr[FileReader]* reader) + cdef cppclass FileReader: FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader) CStatus ReadFlatTable(shared_ptr[CTable]* out); cdef extern from "arrow/parquet/schema.h" namespace "arrow::parquet" nogil: - CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema, shared_ptr[CSchema]* out) - CStatus ToParquetSchema(const CSchema* arrow_schema, shared_ptr[SchemaDescriptor]* out) + CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema, + shared_ptr[CSchema]* out) + CStatus ToParquetSchema(const CSchema* arrow_schema, + shared_ptr[SchemaDescriptor]* out) cdef extern from "arrow/parquet/writer.h" namespace "arrow::parquet" nogil: cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool, const shared_ptr[OutputStream]& sink, int64_t chunk_size, const shared_ptr[WriterProperties]& properties) - diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd new file mode 100644 index 00000000000..b92af72704a --- /dev/null +++ b/python/pyarrow/io.pxd @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# distutils: language = c++ + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport * +from pyarrow.includes.libarrow_io cimport RandomAccessFile, WriteableFile + + +cdef class NativeFileInterface: + + # By implementing these "virtual" functions (all functions in Cython + # extension classes are technically virtual in the C++ sense)m we can + # expose the arrow::io abstract file interfaces to other components + # throughout the suite of Arrow C++ libraries + cdef read_handle(self, shared_ptr[RandomAccessFile]* file) + cdef write_handle(self, shared_ptr[WriteableFile]* file) diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index 071eea5ba6e..b8bf8835620 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -164,7 +164,7 @@ cdef class HdfsClient: .ListDirectory(c_path, &listing)) cdef const HdfsPathInfo* info - for i in range(listing.size()): + for i in range( listing.size()): info = &listing[i] # Try to trim off the hdfs://HOST:PORT piece @@ -314,8 +314,15 @@ cdef class HdfsClient: f = self.open(path, 'rb', buffer_size=buffer_size) f.download(stream) +cdef class NativeFileInterface: -cdef class HdfsFile: + cdef read_handle(self, shared_ptr[RandomAccessFile]* file): + raise NotImplementedError + + cdef write_handle(self, shared_ptr[WriteableFile]* file): + raise NotImplementedError + +cdef class HdfsFile(NativeFileInterface): cdef: shared_ptr[HdfsReadableFile] rd_file shared_ptr[HdfsWriteableFile] wr_file @@ -357,6 +364,14 @@ cdef class HdfsFile: if self.is_readonly: raise IOError("only valid on writeonly files") + cdef read_handle(self, shared_ptr[RandomAccessFile]* file): + self._assert_readable() + file[0] = self.rd_file + + cdef write_handle(self, shared_ptr[WriteableFile]* file): + self._assert_writeable() + file[0] = self.wr_file + def size(self): cdef int64_t size self._assert_readable() diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx index 0b2b2088033..ebba1a17ac7 100644 --- a/python/pyarrow/parquet.pyx +++ b/python/pyarrow/parquet.pyx @@ -20,34 +20,75 @@ # cython: embedsignature = True from pyarrow.includes.libarrow cimport * -cimport pyarrow.includes.pyarrow as pyarrow from pyarrow.includes.parquet cimport * +from pyarrow.includes.libarrow_io cimport RandomAccessFile, WriteableFile +cimport pyarrow.includes.pyarrow as pyarrow from pyarrow.compat import tobytes from pyarrow.error import ArrowException from pyarrow.error cimport check_cstatus +from pyarrow.io import NativeFileInterface from pyarrow.table cimport Table -def read_table(filename, columns=None): +from pyarrow.io cimport NativeFileInterface + +import six + + +cdef class ParquetReader: + cdef: + ParquetAllocator allocator + unique_ptr[FileReader] reader + + def __cinit__(self): + self.allocator.set_pool(default_memory_pool()) + + cdef open_local_file(self, file_path): + cdef c_string path = tobytes(file_path) + + # Must be in one expression to avoid calling std::move which is not + # possible in Cython (due to missing rvalue support) + + # TODO(wesm): ParquetFileReader::OpenFIle can throw? + self.reader = unique_ptr[FileReader]( + new FileReader(default_memory_pool(), + ParquetFileReader.OpenFile(path))) + + cdef open_native_file(self, NativeFileInterface file): + cdef shared_ptr[RandomAccessFile] cpp_handle + file.read_handle(&cpp_handle) + + check_cstatus(OpenFile(cpp_handle, &self.allocator, &self.reader)) + + def read_all(self): + cdef: + Table table = Table() + shared_ptr[CTable] ctable + + with nogil: + check_cstatus(self.reader.get() + .ReadFlatTable(&ctable)) + + table.init(ctable) + return table + + +def read_table(source, columns=None): """ Read a Table from Parquet format Returns ------- table: pyarrow.Table """ - cdef unique_ptr[FileReader] reader - cdef Table table = Table() - cdef shared_ptr[CTable] ctable - - # Must be in one expression to avoid calling std::move which is not possible - # in Cython (due to missing rvalue support) - reader = unique_ptr[FileReader](new FileReader(default_memory_pool(), - ParquetFileReader.OpenFile(tobytes(filename)))) - with nogil: - check_cstatus(reader.get().ReadFlatTable(&ctable)) + cdef ParquetReader reader = ParquetReader() + + if isinstance(source, six.string_types): + reader.open_local_file(source) + elif isinstance(source, NativeFileInterface): + reader.open_native_file(source) + + return reader.read_all() - table.init(ctable) - return table def write_table(table, filename, chunk_size=None, version=None): """ @@ -84,4 +125,3 @@ def write_table(table, filename, chunk_size=None, version=None): with nogil: check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink, chunk_size_, properties_builder.build())) -