From e6724de142cbd20be1f37a586bf0bccf92963ab2 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 11 Jul 2016 23:54:39 -0700 Subject: [PATCH 1/7] Implement alternate ctor to construct parquet::FileReader from an arrow::io::RandomAccessFile Change-Id: I7982556541a60ca03b3064a333b207fd45e323c3 --- cpp/src/arrow/parquet/io.cc | 8 +++++--- cpp/src/arrow/parquet/io.h | 7 ++++--- cpp/src/arrow/parquet/parquet-io-test.cc | 3 +-- cpp/src/arrow/parquet/reader.cc | 17 +++++++++++++++++ cpp/src/arrow/parquet/reader.h | 9 ++++++++- cpp/src/arrow/parquet/writer.cc | 1 - cpp/src/arrow/parquet/writer.h | 2 +- 7 files changed, 36 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/parquet/io.cc b/cpp/src/arrow/parquet/io.cc index c81aa8c4da9..595acfcc4e7 100644 --- a/cpp/src/arrow/parquet/io.cc +++ b/cpp/src/arrow/parquet/io.cc @@ -56,8 +56,10 @@ void ParquetAllocator::Free(uint8_t* buffer, int64_t size) { // ParquetReadSource ParquetReadSource::ParquetReadSource( - const std::shared_ptr& file, ParquetAllocator* allocator) - : file_(file), allocator_(allocator) {} + const std::shared_ptr& file, MemoryPool* pool) + : file_(file) { + allocator_.set_pool(pool); +} void ParquetReadSource::Close() { PARQUET_THROW_NOT_OK(file_->Close()); @@ -82,7 +84,7 @@ int64_t ParquetReadSource::Read(int64_t nbytes, uint8_t* out) { std::shared_ptr<::parquet::Buffer> ParquetReadSource::Read(int64_t nbytes) { // TODO(wesm): This code is duplicated from parquet/util/input.cc; suggests // that there should be more code sharing amongst file-like sources - auto result = std::make_shared<::parquet::OwnedMutableBuffer>(0, allocator_); + auto result = std::make_shared<::parquet::OwnedMutableBuffer>(0, &allocator_); result->Resize(nbytes); int64_t bytes_read = Read(nbytes, result->mutable_data()); diff --git a/cpp/src/arrow/parquet/io.h b/cpp/src/arrow/parquet/io.h index ef8871da4df..0ce7b094dd0 100644 --- a/cpp/src/arrow/parquet/io.h +++ b/cpp/src/arrow/parquet/io.h @@ -49,6 +49,8 @@ class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator { uint8_t* Malloc(int64_t size) override; void Free(uint8_t* buffer, int64_t size) override; + void set_pool(MemoryPool* pool) { pool_ = pool; } + MemoryPool* pool() { return pool_; } private: @@ -57,8 +59,7 @@ class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator { class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource { public: - ParquetReadSource( - const std::shared_ptr& file, ParquetAllocator* allocator); + ParquetReadSource(const std::shared_ptr& file, MemoryPool* pool); void Close() override; int64_t Tell() const override; @@ -71,7 +72,7 @@ class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource { std::shared_ptr file_; // The allocator is required for creating managed buffers - ParquetAllocator* allocator_; + ParquetAllocator allocator_; }; } // namespace parquet diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc index 7e724b31e38..0099d2313bb 100644 --- a/cpp/src/arrow/parquet/parquet-io-test.cc +++ b/cpp/src/arrow/parquet/parquet-io-test.cc @@ -147,9 +147,8 @@ TEST(TestParquetReadSource, Basics) { std::string data = "this is the data"; auto data_buffer = reinterpret_cast(data.c_str()); - ParquetAllocator allocator; auto file = std::make_shared(data_buffer, data.size()); - auto source = std::make_shared(file, &allocator); + auto source = std::make_shared(file, default_memory_pool()); ASSERT_EQ(0, source->Tell()); ASSERT_NO_THROW(source->Seek(5)); diff --git 
a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc index c7c400e9573..05a04f52cdb 100644 --- a/cpp/src/arrow/parquet/reader.cc +++ b/cpp/src/arrow/parquet/reader.cc @@ -23,6 +23,7 @@ #include #include "arrow/column.h" +#include "arrow/parquet/io.h" #include "arrow/parquet/schema.h" #include "arrow/parquet/utils.h" #include "arrow/schema.h" @@ -35,6 +36,10 @@ using parquet::ColumnReader; using parquet::Repetition; using parquet::TypedColumnReader; +// Help reduce verbosity +using ParquetRAS = parquet::RandomAccessSource; +using ParquetReader = parquet::ParquetFileReader; + namespace arrow { namespace parquet { @@ -181,6 +186,18 @@ FileReader::FileReader( FileReader::~FileReader() {} +// Static ctor +Status Open(MemoryPool* pool, const std::shared_ptr& file, + std::unique_ptr* reader) { + std::unique_ptr source(new ParquetReadSource(file, pool)); + + // TODO(wesm): reader properties + std::unique_ptr pq_reader; + PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(std::move(source))); + reader->reset(new FileReader(pool, std::move(pq_reader))); + return Status::OK(); +} + Status FileReader::GetFlatColumn(int i, std::unique_ptr* out) { return impl_->GetFlatColumn(i, out); } diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h index 2c8a9dfd025..d451f4fdd42 100644 --- a/cpp/src/arrow/parquet/reader.h +++ b/cpp/src/arrow/parquet/reader.h @@ -23,6 +23,8 @@ #include "parquet/api/reader.h" #include "parquet/api/schema.h" +#include "arrow/io/interfaces.h" +#include "arrow/parquet/io.h" #include "arrow/util/visibility.h" namespace arrow { @@ -81,6 +83,11 @@ class FlatColumnReader; // arrays class ARROW_EXPORT FileReader { public: + // Helper function to create a file reader from an implementation of an Arrow + // readable file + static Status Open(MemoryPool* pool, const std::shared_ptr& file, + std::unique_ptr* reader); + FileReader(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader); // Since the distribution of columns amongst a Parquet file's row groups may @@ -99,7 +106,7 @@ class ARROW_EXPORT FileReader { virtual ~FileReader(); private: - class Impl; + class PARQUET_NO_EXPORT Impl; std::unique_ptr impl_; }; diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc index 0139edd3bb8..f9514aa2ad2 100644 --- a/cpp/src/arrow/parquet/writer.cc +++ b/cpp/src/arrow/parquet/writer.cc @@ -35,7 +35,6 @@ using parquet::ParquetVersion; using parquet::schema::GroupNode; namespace arrow { - namespace parquet { class FileWriter::Impl { diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h index 45d0fd59868..3860069e547 100644 --- a/cpp/src/arrow/parquet/writer.h +++ b/cpp/src/arrow/parquet/writer.h @@ -55,7 +55,7 @@ class ARROW_EXPORT FileWriter { MemoryPool* memory_pool() const; private: - class Impl; + class PARQUET_NO_EXPORT Impl; std::unique_ptr impl_; }; From c7a913eedbb0a7e3737ec3302dec8f7e93ef21e1 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 12 Jul 2016 00:46:17 -0700 Subject: [PATCH 2/7] Provide a means to expose abstract native file handles Change-Id: I86e43b42582276302332eb3c61afffd6f7187c40 --- cpp/src/arrow/io/interfaces.h | 1 + cpp/src/arrow/parquet/io.cc | 8 ++-- cpp/src/arrow/parquet/io.h | 7 ++-- cpp/src/arrow/parquet/parquet-io-test.cc | 4 +- cpp/src/arrow/parquet/reader.cc | 10 +++-- cpp/src/arrow/parquet/reader.h | 4 +- python/pyarrow/includes/libarrow_io.pxd | 49 +++++++++++++++--------- python/pyarrow/includes/parquet.pxd | 18 ++++++++- python/pyarrow/io.pyx | 21 +++++++++- 
python/pyarrow/parquet.pyx | 6 +-- 10 files changed, 89 insertions(+), 39 deletions(-) diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index 25361d5633d..c2128525371 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -19,6 +19,7 @@ #define ARROW_IO_INTERFACES_H #include +#include namespace arrow { diff --git a/cpp/src/arrow/parquet/io.cc b/cpp/src/arrow/parquet/io.cc index 595acfcc4e7..c81aa8c4da9 100644 --- a/cpp/src/arrow/parquet/io.cc +++ b/cpp/src/arrow/parquet/io.cc @@ -56,10 +56,8 @@ void ParquetAllocator::Free(uint8_t* buffer, int64_t size) { // ParquetReadSource ParquetReadSource::ParquetReadSource( - const std::shared_ptr& file, MemoryPool* pool) - : file_(file) { - allocator_.set_pool(pool); -} + const std::shared_ptr& file, ParquetAllocator* allocator) + : file_(file), allocator_(allocator) {} void ParquetReadSource::Close() { PARQUET_THROW_NOT_OK(file_->Close()); @@ -84,7 +82,7 @@ int64_t ParquetReadSource::Read(int64_t nbytes, uint8_t* out) { std::shared_ptr<::parquet::Buffer> ParquetReadSource::Read(int64_t nbytes) { // TODO(wesm): This code is duplicated from parquet/util/input.cc; suggests // that there should be more code sharing amongst file-like sources - auto result = std::make_shared<::parquet::OwnedMutableBuffer>(0, &allocator_); + auto result = std::make_shared<::parquet::OwnedMutableBuffer>(0, allocator_); result->Resize(nbytes); int64_t bytes_read = Read(nbytes, result->mutable_data()); diff --git a/cpp/src/arrow/parquet/io.h b/cpp/src/arrow/parquet/io.h index 0ce7b094dd0..7d8a73ed2a9 100644 --- a/cpp/src/arrow/parquet/io.h +++ b/cpp/src/arrow/parquet/io.h @@ -51,7 +51,7 @@ class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator { void set_pool(MemoryPool* pool) { pool_ = pool; } - MemoryPool* pool() { return pool_; } + MemoryPool* pool() const { return pool_; } private: MemoryPool* pool_; @@ -59,7 +59,8 @@ class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator { class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource { public: - ParquetReadSource(const std::shared_ptr& file, MemoryPool* pool); + ParquetReadSource( + const std::shared_ptr& file, ParquetAllocator* allocator); void Close() override; int64_t Tell() const override; @@ -72,7 +73,7 @@ class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource { std::shared_ptr file_; // The allocator is required for creating managed buffers - ParquetAllocator allocator_; + ParquetAllocator* allocator_; }; } // namespace parquet diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc index 0099d2313bb..e7b5fbd92c6 100644 --- a/cpp/src/arrow/parquet/parquet-io-test.cc +++ b/cpp/src/arrow/parquet/parquet-io-test.cc @@ -147,8 +147,10 @@ TEST(TestParquetReadSource, Basics) { std::string data = "this is the data"; auto data_buffer = reinterpret_cast(data.c_str()); + ParquetAllocator allocator(default_memory_pool()); + auto file = std::make_shared(data_buffer, data.size()); - auto source = std::make_shared(file, default_memory_pool()); + auto source = std::make_shared(file, &allocator); ASSERT_EQ(0, source->Tell()); ASSERT_NO_THROW(source->Seek(5)); diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc index 05a04f52cdb..007c60b971b 100644 --- a/cpp/src/arrow/parquet/reader.cc +++ b/cpp/src/arrow/parquet/reader.cc @@ -187,14 +187,16 @@ FileReader::FileReader( FileReader::~FileReader() {} // Static ctor -Status Open(MemoryPool* pool, 
const std::shared_ptr& file, - std::unique_ptr* reader) { - std::unique_ptr source(new ParquetReadSource(file, pool)); +Status Open(const std::shared_ptr& file, + ParquetAllocator* allocator, std::unique_ptr* reader) { + std::unique_ptr source(new ParquetReadSource(file, allocator)); // TODO(wesm): reader properties std::unique_ptr pq_reader; PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(std::move(source))); - reader->reset(new FileReader(pool, std::move(pq_reader))); + + // Use the same memory pool as the ParquetAllocator + reader->reset(new FileReader(allocator->pool(), std::move(pq_reader))); return Status::OK(); } diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h index d451f4fdd42..50900214804 100644 --- a/cpp/src/arrow/parquet/reader.h +++ b/cpp/src/arrow/parquet/reader.h @@ -85,8 +85,8 @@ class ARROW_EXPORT FileReader { public: // Helper function to create a file reader from an implementation of an Arrow // readable file - static Status Open(MemoryPool* pool, const std::shared_ptr& file, - std::unique_ptr* reader); + static Status Open(const std::shared_ptr& file, + ParquetAllocator* allocator, std::unique_ptr* reader); FileReader(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader); diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd index d0fb8f9f000..734ace6c923 100644 --- a/python/pyarrow/includes/libarrow_io.pxd +++ b/python/pyarrow/includes/libarrow_io.pxd @@ -19,11 +19,37 @@ from pyarrow.includes.common cimport * -cdef extern from "arrow/io/interfaces.h" nogil: +cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil: + enum FileMode" arrow::io::FileMode::type": + FileMode_READ" arrow::io::FileMode::READ" + FileMode_WRITE" arrow::io::FileMode::WRITE" + FileMode_READWRITE" arrow::io::FileMode::READWRITE" + enum ObjectType" arrow::io::ObjectType::type": ObjectType_FILE" arrow::io::ObjectType::FILE" ObjectType_DIRECTORY" arrow::io::ObjectType::DIRECTORY" + cdef cppclass FileBase: + CStatus Close() + CStatus Tell(int64_t* position) + + cdef cppclass ReadableFile(FileBase): + CStatus GetSize(int64_t* size) + CStatus Read(int64_t nbytes, int64_t* bytes_read, + uint8_t* buffer) + + CStatus ReadAt(int64_t position, int64_t nbytes, + int64_t* bytes_read, uint8_t* buffer) + + cdef cppclass RandomAccessFile(ReadableFile): + CStatus Seek(int64_t position) + + cdef cppclass WriteableFile(FileBase): + CStatus Write(const uint8_t* buffer, int64_t nbytes) + # CStatus Write(const uint8_t* buffer, int64_t nbytes, + # int64_t* bytes_written) + + cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil: CStatus ConnectLibHdfs() @@ -44,24 +70,11 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil: int64_t block_size int16_t permissions - cdef cppclass CHdfsFile: - CStatus Close() - CStatus Seek(int64_t position) - CStatus Tell(int64_t* position) - - cdef cppclass HdfsReadableFile(CHdfsFile): - CStatus GetSize(int64_t* size) - CStatus Read(int64_t nbytes, int64_t* bytes_read, - uint8_t* buffer) - - CStatus ReadAt(int64_t position, int64_t nbytes, - int64_t* bytes_read, uint8_t* buffer) - - cdef cppclass HdfsWriteableFile(CHdfsFile): - CStatus Write(const uint8_t* buffer, int64_t nbytes) + cdef cppclass HdfsReadableFile(RandomAccessFile): + pass - CStatus Write(const uint8_t* buffer, int64_t nbytes, - int64_t* bytes_written) + cdef cppclass HdfsWriteableFile(WriteableFile): + pass cdef cppclass CHdfsClient" arrow::io::HdfsClient": @staticmethod diff --git 
a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd index a2f83ea5ea5..3ff5fd424ea 100644 --- a/python/pyarrow/includes/parquet.pxd +++ b/python/pyarrow/includes/parquet.pxd @@ -19,6 +19,7 @@ from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport CSchema, CStatus, CTable, MemoryPool +from pyarrow.includes.libarrow_io cimport RandomAccessFile cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil: @@ -90,8 +91,24 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil: shared_ptr[WriterProperties] build() +cdef extern from "arrow/parquet/io.h" namespace "arrow::parquet" nogil: + cdef cppclass ParquetAllocator: + ParquetAllocator() + ParquetAllocator(MemoryPool* pool) + MemoryPool* pool() + + cdef cppclass ParquetReadSource: + ParquetReadSource(const shared_ptr[RandomAccessFile]& file, + ParquetAllocator* allocator) + + cdef extern from "arrow/parquet/reader.h" namespace "arrow::parquet" nogil: cdef cppclass FileReader: + @staticmethod + CStatus Open(const shared_ptr[RandomAccessFile]& file, + ParquetAllocator* allocator, + unique_ptr[FileReader]* reader) + FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader) CStatus ReadFlatTable(shared_ptr[CTable]* out); @@ -105,4 +122,3 @@ cdef extern from "arrow/parquet/writer.h" namespace "arrow::parquet" nogil: cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool, const shared_ptr[OutputStream]& sink, int64_t chunk_size, const shared_ptr[WriterProperties]& properties) - diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index 071eea5ba6e..aa1c79afea8 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -164,7 +164,7 @@ cdef class HdfsClient: .ListDirectory(c_path, &listing)) cdef const HdfsPathInfo* info - for i in range(listing.size()): + for i in range( listing.size()): info = &listing[i] # Try to trim off the hdfs://HOST:PORT piece @@ -314,8 +314,15 @@ cdef class HdfsClient: f = self.open(path, 'rb', buffer_size=buffer_size) f.download(stream) +cdef class NativeFileInterface: -cdef class HdfsFile: + cdef int read_handle(self, shared_ptr[RandomAccessFile]* file) except -1: + raise NotImplementedError + + cdef int write_handle(self, shared_ptr[WriteableFile]* file) except -1: + raise NotImplementedError + +cdef class HdfsFile(NativeFileInterface): cdef: shared_ptr[HdfsReadableFile] rd_file shared_ptr[HdfsWriteableFile] wr_file @@ -357,6 +364,16 @@ cdef class HdfsFile: if self.is_readonly: raise IOError("only valid on writeonly files") + cdef int read_handle(self, shared_ptr[RandomAccessFile]* file) except -1: + self._assert_readable() + file[0] = self.rd_file + return 0 + + cdef int write_handle(self, shared_ptr[WriteableFile]* file) except -1: + self._assert_writeable() + file[0] = self.wr_file + return 0 + def size(self): cdef int64_t size self._assert_readable() diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx index 0b2b2088033..da9ff6dc655 100644 --- a/python/pyarrow/parquet.pyx +++ b/python/pyarrow/parquet.pyx @@ -39,10 +39,11 @@ def read_table(filename, columns=None): cdef Table table = Table() cdef shared_ptr[CTable] ctable - # Must be in one expression to avoid calling std::move which is not possible - # in Cython (due to missing rvalue support) + # Must be in one expression to avoid calling std::move which is not + # possible in Cython (due to missing rvalue support) reader = unique_ptr[FileReader](new FileReader(default_memory_pool(), ParquetFileReader.OpenFile(tobytes(filename)))) + with nogil: 
check_cstatus(reader.get().ReadFlatTable(&ctable)) @@ -84,4 +85,3 @@ def write_table(table, filename, chunk_size=None, version=None): with nogil: check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink, chunk_size_, properties_builder.build())) - From 06ddd06331bbc784dee9e78a1bccfa96f77052ff Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 12 Jul 2016 13:52:14 -0700 Subject: [PATCH 3/7] Slight refactoring of read table to be able to also handle classes wrapping C++ file interfaces Change-Id: I4a3d0c4d2a763abb02ca546df35b9556f1060c0e --- cpp/src/arrow/parquet/reader.cc | 2 +- cpp/src/arrow/parquet/reader.h | 16 +++---- cpp/src/arrow/parquet/writer.h | 2 +- python/pyarrow/includes/parquet.pxd | 12 +++--- python/pyarrow/io.pyx | 10 ++--- python/pyarrow/parquet.pyx | 66 +++++++++++++++++++++++------ 6 files changed, 74 insertions(+), 34 deletions(-) diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc index 007c60b971b..83a529c4344 100644 --- a/cpp/src/arrow/parquet/reader.cc +++ b/cpp/src/arrow/parquet/reader.cc @@ -187,7 +187,7 @@ FileReader::FileReader( FileReader::~FileReader() {} // Static ctor -Status Open(const std::shared_ptr& file, +Status OpenFile(const std::shared_ptr& file, ParquetAllocator* allocator, std::unique_ptr* reader) { std::unique_ptr source(new ParquetReadSource(file, allocator)); diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h index 50900214804..f1492f64521 100644 --- a/cpp/src/arrow/parquet/reader.h +++ b/cpp/src/arrow/parquet/reader.h @@ -83,11 +83,6 @@ class FlatColumnReader; // arrays class ARROW_EXPORT FileReader { public: - // Helper function to create a file reader from an implementation of an Arrow - // readable file - static Status Open(const std::shared_ptr& file, - ParquetAllocator* allocator, std::unique_ptr* reader); - FileReader(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader); // Since the distribution of columns amongst a Parquet file's row groups may @@ -106,7 +101,7 @@ class ARROW_EXPORT FileReader { virtual ~FileReader(); private: - class PARQUET_NO_EXPORT Impl; + class ARROW_NO_EXPORT Impl; std::unique_ptr impl_; }; @@ -132,15 +127,20 @@ class ARROW_EXPORT FlatColumnReader { Status NextBatch(int batch_size, std::shared_ptr* out); private: - class Impl; + class ARROW_NO_EXPORT Impl; std::unique_ptr impl_; explicit FlatColumnReader(std::unique_ptr impl); friend class FileReader; }; -} // namespace parquet +// Helper function to create a file reader from an implementation of an Arrow +// readable file +ARROW_EXPORT +Status OpenFile(const std::shared_ptr& file, + ParquetAllocator* allocator, std::unique_ptr* reader); +} // namespace parquet } // namespace arrow #endif // ARROW_PARQUET_READER_H diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h index 3860069e547..5aa1ba58717 100644 --- a/cpp/src/arrow/parquet/writer.h +++ b/cpp/src/arrow/parquet/writer.h @@ -55,7 +55,7 @@ class ARROW_EXPORT FileWriter { MemoryPool* memory_pool() const; private: - class PARQUET_NO_EXPORT Impl; + class ARROW_NO_EXPORT Impl; std::unique_ptr impl_; }; diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd index 3ff5fd424ea..3f7b87ed819 100644 --- a/python/pyarrow/includes/parquet.pxd +++ b/python/pyarrow/includes/parquet.pxd @@ -96,6 +96,7 @@ cdef extern from "arrow/parquet/io.h" namespace "arrow::parquet" nogil: ParquetAllocator() ParquetAllocator(MemoryPool* pool) MemoryPool* pool() + void set_pool(MemoryPool* pool) cdef 
cppclass ParquetReadSource: ParquetReadSource(const shared_ptr[RandomAccessFile]& file, @@ -103,19 +104,20 @@ cdef extern from "arrow/parquet/io.h" namespace "arrow::parquet" nogil: cdef extern from "arrow/parquet/reader.h" namespace "arrow::parquet" nogil: - cdef cppclass FileReader: - @staticmethod - CStatus Open(const shared_ptr[RandomAccessFile]& file, + CStatus OpenFile(const shared_ptr[RandomAccessFile]& file, ParquetAllocator* allocator, unique_ptr[FileReader]* reader) + cdef cppclass FileReader: FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader) CStatus ReadFlatTable(shared_ptr[CTable]* out); cdef extern from "arrow/parquet/schema.h" namespace "arrow::parquet" nogil: - CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema, shared_ptr[CSchema]* out) - CStatus ToParquetSchema(const CSchema* arrow_schema, shared_ptr[SchemaDescriptor]* out) + CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema, + shared_ptr[CSchema]* out) + CStatus ToParquetSchema(const CSchema* arrow_schema, + shared_ptr[SchemaDescriptor]* out) cdef extern from "arrow/parquet/writer.h" namespace "arrow::parquet" nogil: diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index aa1c79afea8..b8bf8835620 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -316,10 +316,10 @@ cdef class HdfsClient: cdef class NativeFileInterface: - cdef int read_handle(self, shared_ptr[RandomAccessFile]* file) except -1: + cdef read_handle(self, shared_ptr[RandomAccessFile]* file): raise NotImplementedError - cdef int write_handle(self, shared_ptr[WriteableFile]* file) except -1: + cdef write_handle(self, shared_ptr[WriteableFile]* file): raise NotImplementedError cdef class HdfsFile(NativeFileInterface): @@ -364,15 +364,13 @@ cdef class HdfsFile(NativeFileInterface): if self.is_readonly: raise IOError("only valid on writeonly files") - cdef int read_handle(self, shared_ptr[RandomAccessFile]* file) except -1: + cdef read_handle(self, shared_ptr[RandomAccessFile]* file): self._assert_readable() file[0] = self.rd_file - return 0 - cdef int write_handle(self, shared_ptr[WriteableFile]* file) except -1: + cdef write_handle(self, shared_ptr[WriteableFile]* file): self._assert_writeable() file[0] = self.wr_file - return 0 def size(self): cdef int64_t size diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx index da9ff6dc655..ebba1a17ac7 100644 --- a/python/pyarrow/parquet.pyx +++ b/python/pyarrow/parquet.pyx @@ -20,35 +20,75 @@ # cython: embedsignature = True from pyarrow.includes.libarrow cimport * -cimport pyarrow.includes.pyarrow as pyarrow from pyarrow.includes.parquet cimport * +from pyarrow.includes.libarrow_io cimport RandomAccessFile, WriteableFile +cimport pyarrow.includes.pyarrow as pyarrow from pyarrow.compat import tobytes from pyarrow.error import ArrowException from pyarrow.error cimport check_cstatus +from pyarrow.io import NativeFileInterface from pyarrow.table cimport Table -def read_table(filename, columns=None): +from pyarrow.io cimport NativeFileInterface + +import six + + +cdef class ParquetReader: + cdef: + ParquetAllocator allocator + unique_ptr[FileReader] reader + + def __cinit__(self): + self.allocator.set_pool(default_memory_pool()) + + cdef open_local_file(self, file_path): + cdef c_string path = tobytes(file_path) + + # Must be in one expression to avoid calling std::move which is not + # possible in Cython (due to missing rvalue support) + + # TODO(wesm): ParquetFileReader::OpenFIle can throw? 
+ self.reader = unique_ptr[FileReader]( + new FileReader(default_memory_pool(), + ParquetFileReader.OpenFile(path))) + + cdef open_native_file(self, NativeFileInterface file): + cdef shared_ptr[RandomAccessFile] cpp_handle + file.read_handle(&cpp_handle) + + check_cstatus(OpenFile(cpp_handle, &self.allocator, &self.reader)) + + def read_all(self): + cdef: + Table table = Table() + shared_ptr[CTable] ctable + + with nogil: + check_cstatus(self.reader.get() + .ReadFlatTable(&ctable)) + + table.init(ctable) + return table + + +def read_table(source, columns=None): """ Read a Table from Parquet format Returns ------- table: pyarrow.Table """ - cdef unique_ptr[FileReader] reader - cdef Table table = Table() - cdef shared_ptr[CTable] ctable + cdef ParquetReader reader = ParquetReader() - # Must be in one expression to avoid calling std::move which is not - # possible in Cython (due to missing rvalue support) - reader = unique_ptr[FileReader](new FileReader(default_memory_pool(), - ParquetFileReader.OpenFile(tobytes(filename)))) + if isinstance(source, six.string_types): + reader.open_local_file(source) + elif isinstance(source, NativeFileInterface): + reader.open_native_file(source) - with nogil: - check_cstatus(reader.get().ReadFlatTable(&ctable)) + return reader.read_all() - table.init(ctable) - return table def write_table(table, filename, chunk_size=None, version=None): """ From 9b9d94db6740885f599a80d3babb2c7f076219c7 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 12 Jul 2016 16:19:26 -0700 Subject: [PATCH 4/7] Barely working direct HDFS-Parquet reads Change-Id: I3f6329e159431df781959d6266b4e016e4f6fa2c --- cpp/src/arrow/parquet/io.cc | 14 +++++++++++--- cpp/src/arrow/parquet/io.h | 6 ++++-- cpp/src/arrow/parquet/parquet-io-test.cc | 5 ++++- cpp/src/arrow/parquet/reader.cc | 3 ++- python/pyarrow/includes/parquet.pxd | 4 ++-- 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/parquet/io.cc b/cpp/src/arrow/parquet/io.cc index c81aa8c4da9..5dce6a9f175 100644 --- a/cpp/src/arrow/parquet/io.cc +++ b/cpp/src/arrow/parquet/io.cc @@ -55,9 +55,17 @@ void ParquetAllocator::Free(uint8_t* buffer, int64_t size) { // ---------------------------------------------------------------------- // ParquetReadSource -ParquetReadSource::ParquetReadSource( - const std::shared_ptr& file, ParquetAllocator* allocator) - : file_(file), allocator_(allocator) {} +ParquetReadSource::ParquetReadSource(ParquetAllocator* allocator) + : file_(nullptr), allocator_(allocator) {} + +Status ParquetReadSource::Open(const std::shared_ptr& file) { + int64_t file_size; + RETURN_NOT_OK(file->GetSize(&file_size)); + + file_ = file; + size_ = file_size; + return Status::OK(); +} void ParquetReadSource::Close() { PARQUET_THROW_NOT_OK(file_->Close()); diff --git a/cpp/src/arrow/parquet/io.h b/cpp/src/arrow/parquet/io.h index 7d8a73ed2a9..54b2ca1c3e2 100644 --- a/cpp/src/arrow/parquet/io.h +++ b/cpp/src/arrow/parquet/io.h @@ -59,8 +59,10 @@ class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator { class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource { public: - ParquetReadSource( - const std::shared_ptr& file, ParquetAllocator* allocator); + ParquetReadSource(ParquetAllocator* allocator); + + // We need to ask for the file size on opening the file, and this can fail + Status Open(const std::shared_ptr& file); void Close() override; int64_t Tell() const override; diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc index 
e7b5fbd92c6..6615457c483 100644 --- a/cpp/src/arrow/parquet/parquet-io-test.cc +++ b/cpp/src/arrow/parquet/parquet-io-test.cc @@ -23,6 +23,7 @@ #include "gtest/gtest.h" #include "arrow/parquet/io.h" +#include "arrow/test-util.h" #include "arrow/util/memory-pool.h" #include "arrow/util/status.h" @@ -150,7 +151,9 @@ TEST(TestParquetReadSource, Basics) { ParquetAllocator allocator(default_memory_pool()); auto file = std::make_shared(data_buffer, data.size()); - auto source = std::make_shared(file, &allocator); + auto source = std::make_shared(&allocator); + + ASSERT_OK(source->Open(file)); ASSERT_EQ(0, source->Tell()); ASSERT_NO_THROW(source->Seek(5)); diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc index 83a529c4344..e92967e5363 100644 --- a/cpp/src/arrow/parquet/reader.cc +++ b/cpp/src/arrow/parquet/reader.cc @@ -189,7 +189,8 @@ FileReader::~FileReader() {} // Static ctor Status OpenFile(const std::shared_ptr& file, ParquetAllocator* allocator, std::unique_ptr* reader) { - std::unique_ptr source(new ParquetReadSource(file, allocator)); + std::unique_ptr source(new ParquetReadSource(allocator)); + RETURN_NOT_OK(source->Open(file)); // TODO(wesm): reader properties std::unique_ptr pq_reader; diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd index 3f7b87ed819..fe24f593e32 100644 --- a/python/pyarrow/includes/parquet.pxd +++ b/python/pyarrow/includes/parquet.pxd @@ -99,8 +99,8 @@ cdef extern from "arrow/parquet/io.h" namespace "arrow::parquet" nogil: void set_pool(MemoryPool* pool) cdef cppclass ParquetReadSource: - ParquetReadSource(const shared_ptr[RandomAccessFile]& file, - ParquetAllocator* allocator) + ParquetReadSource(ParquetAllocator* allocator) + Open(const shared_ptr[RandomAccessFile]& file) cdef extern from "arrow/parquet/reader.h" namespace "arrow::parquet" nogil: From 94bcd30cd2a7bac0a64a8c2449244ddea78987bc Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 12 Jul 2016 17:34:06 -0700 Subject: [PATCH 5/7] Do not let Parquet close an Arrow file Change-Id: I8e3a1f90907357d138d875b2761a7833b069b86f --- cpp/src/arrow/parquet/io.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/parquet/io.cc b/cpp/src/arrow/parquet/io.cc index 5dce6a9f175..b6fdd67d15b 100644 --- a/cpp/src/arrow/parquet/io.cc +++ b/cpp/src/arrow/parquet/io.cc @@ -68,7 +68,10 @@ Status ParquetReadSource::Open(const std::shared_ptr& file } void ParquetReadSource::Close() { - PARQUET_THROW_NOT_OK(file_->Close()); + // TODO(wesm): Make this a no-op for now. This leaves Python wrappers for + // these classes in a borked state. Probably better to explicitly close. + + // PARQUET_THROW_NOT_OK(file_->Close()); } int64_t ParquetReadSource::Tell() const { From f2cd77febc36e861c2064d44d7e56d04806be901 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 16 Jul 2016 20:39:33 -0700 Subject: [PATCH 6/7] Check in io.pxd Change-Id: I2df54d0dc25457055011cd8a2b798fc28b1640d1 --- python/pyarrow/io.pxd | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 python/pyarrow/io.pxd diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd new file mode 100644 index 00000000000..b92af72704a --- /dev/null +++ b/python/pyarrow/io.pxd @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.libarrow_io cimport RandomAccessFile, WriteableFile
+
+
+cdef class NativeFileInterface:
+
+    # By implementing these "virtual" functions (all functions in Cython
+    # extension classes are technically virtual in the C++ sense)m we can
+    # expose the arrow::io abstract file interfaces to other components
+    # throughout the suite of Arrow C++ libraries
+    cdef read_handle(self, shared_ptr[RandomAccessFile]* file)
+    cdef write_handle(self, shared_ptr[WriteableFile]* file)

From 73648e00e074ca58ca3d6a481bd247efbbc081c9 Mon Sep 17 00:00:00 2001
From: Wes McKinney
Date: Mon, 18 Jul 2016 15:17:34 -0700
Subject: [PATCH 7/7] cpplint

Change-Id: Icf2093b4a379bf159b3b1ecce119c7fde77c96ef
---
 cpp/src/arrow/parquet/io.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cpp/src/arrow/parquet/io.h b/cpp/src/arrow/parquet/io.h
index 54b2ca1c3e2..1c59695c6c1 100644
--- a/cpp/src/arrow/parquet/io.h
+++ b/cpp/src/arrow/parquet/io.h
@@ -59,7 +59,7 @@ class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator {
 
 class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource {
  public:
-  ParquetReadSource(ParquetAllocator* allocator);
+  explicit ParquetReadSource(ParquetAllocator* allocator);
 
   // We need to ask for the file size on opening the file, and this can fail
   Status Open(const std::shared_ptr& file);
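
Taken together, the series leaves a small reader-side API in C++: wrap any arrow::io::RandomAccessFile behind a ParquetReadSource, open it through arrow::parquet::OpenFile(), and pull the contents out with FileReader::ReadFlatTable(). The sketch below illustrates that flow using only the names visible in the diffs above (OpenFile, ParquetAllocator, FileReader, ReadFlatTable, RETURN_NOT_OK, default_memory_pool); the wrapper function itself, the exact header set, and the io::RandomAccessFile spelling of the file parameter (template arguments were lost in this copy of the patches) are assumptions for illustration, not code taken from the series.

#include <memory>

#include "arrow/io/interfaces.h"
#include "arrow/parquet/io.h"
#include "arrow/parquet/reader.h"
#include "arrow/table.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"

namespace arrow {

// Hypothetical helper (not part of the patches): read an entire Parquet file
// supplied as any arrow::io::RandomAccessFile implementation (an HDFS file,
// a memory-mapped file, an in-memory buffer, ...) into an arrow::Table.
Status ReadParquetFile(const std::shared_ptr<io::RandomAccessFile>& file,
                       std::shared_ptr<Table>* out) {
  // ParquetAllocator adapts an arrow::MemoryPool to parquet's MemoryAllocator
  // interface; the FileReader created below allocates from the same pool.
  parquet::ParquetAllocator allocator(default_memory_pool());

  // OpenFile wraps the Arrow file in a ParquetReadSource and opens a
  // parquet::ParquetFileReader on top of it.
  std::unique_ptr<parquet::FileReader> reader;
  RETURN_NOT_OK(parquet::OpenFile(file, &allocator, &reader));

  // Read all columns of the file into a flat (non-nested) table.
  return reader->ReadFlatTable(out);
}

}  // namespace arrow

On the Python side the same path is reached through pyarrow.parquet.read_table(): a plain string argument goes through ParquetReader.open_local_file(), while a NativeFileInterface instance (for example an HdfsFile) goes through ParquetReader.open_native_file(), which hands the underlying shared_ptr[RandomAccessFile] to OpenFile exactly as in the C++ sketch above.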