From 20a3f28744d437dcc677fb7b346ffcea7df0ad40 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 1 Oct 2016 16:53:43 -0400 Subject: [PATCH 01/11] Draft API for Arrow IO wrappers for Python files Change-Id: I712800f68cd0c9b0de1cca0d511f3f7d297c05b2 --- python/src/pyarrow/common.h | 2 - python/src/pyarrow/io.cc | 90 +++++++++++++++++++++++++++++++++++++ python/src/pyarrow/io.h | 63 ++++++++++++++++++++++++++ 3 files changed, 153 insertions(+), 2 deletions(-) create mode 100644 python/src/pyarrow/io.cc create mode 100644 python/src/pyarrow/io.h diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h index fb0ba3e4822..3640f63bfcd 100644 --- a/python/src/pyarrow/common.h +++ b/python/src/pyarrow/common.h @@ -19,9 +19,7 @@ #define PYARROW_COMMON_H #include "pyarrow/config.h" - #include "arrow/util/buffer.h" - #include "pyarrow/visibility.h" namespace arrow { class MemoryPool; } diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc new file mode 100644 index 00000000000..8608a0e75c4 --- /dev/null +++ b/python/src/pyarrow/io.cc @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
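In this first patch the wrapper methods below are only stubs; later commits in the series fill them in by calling the Python file object's seek/read/tell/write methods through the CPython C API while holding the GIL. A rough sketch of that call pattern (not part of the patch; ReadAtSketch and its error handling are illustrative only):

// Illustrative only: forward a positioned read to a Python file-like object.
// The caller must hold the GIL; a real implementation would translate the
// CPython exception state into an arrow::Status instead of returning bool.
#include <Python.h>
#include <cstdint>
#include <cstring>

static bool ReadAtSketch(PyObject* file, int64_t position, int64_t nbytes,
                         int64_t* bytes_read, uint8_t* out) {
  PyObject* res = PyObject_CallMethod(file, "seek", "(L)", (long long)position);
  Py_XDECREF(res);
  if (PyErr_Occurred()) return false;

  PyObject* data = PyObject_CallMethod(file, "read", "(L)", (long long)nbytes);
  if (data == nullptr) return false;

  *bytes_read = PyBytes_GET_SIZE(data);
  std::memcpy(out, PyBytes_AS_STRING(data), static_cast<size_t>(*bytes_read));
  Py_DECREF(data);
  return true;
}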
+ +#include "pyarrow/common.h" + +#include +#include + +#include +#include +#include "pyarrow/status.h" + +namespace pyarrow { + +// ---------------------------------------------------------------------- +// Seekable input stream + +PyReadableFile::PyReadableFile(PyObject* file) + : file_(file) { + Py_INCREF(file); +} + +PyReadableFile::~PyReadableFile() { + Py_DECREF(); +} + +arrow::Status PyReadableFile::ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) override { + + return Status::OK(); +} + +arrow::Status PyReadableFile::GetSize(int64_t* size) override { + return Status::OK(); +} + + // Does not copy if not necessary +Status PyReadableFile::ReadAt( + int64_t position, int64_t nbytes, std::shared_ptr* out) override { + + return Status::OK(); +} + +bool PyReadableFile::supports_zero_copy() const { + return false; +} + +// ---------------------------------------------------------------------- +// Output stream + +PyOutputStream::PyOutputStream(PyObject* file) + : file_(file) { + Py_INCREF(file); +} + +PyOutputStream::~PyOutputStream() { + Py_DECREF(file_); +} + +Status PyOutputStream::Close() override { + return arrow::Status::OK(); +} + +Status PyOutputStream::Tell(int64_t* position) { + return arrow::Status::OK(); +} + +Status PyOutputStream::Write(const uint8_t* data, int64_t nbytes) { + return arrow::Status::OK(); +} + +private: + PyObject* file_; +}; + +} // namespace pyarrow diff --git a/python/src/pyarrow/io.h b/python/src/pyarrow/io.h new file mode 100644 index 00000000000..3220b3667cd --- /dev/null +++ b/python/src/pyarrow/io.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
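Because both classes declared in this header derive from the arrow::io interfaces, any consumer that accepts an arrow::io::ReadableFileInterface or OutputStream can be handed a wrapped Python file object. A minimal usage sketch, assuming the caller already holds a PyObject* file handle (the WrapPythonFile helper is hypothetical, not part of the patch):

// Hypothetical helper: adapt a Python file-like object for Arrow consumers.
// PyReadableFile takes its own reference to `fo`, so the Python object stays
// alive for as long as the returned shared_ptr is held.
#include <Python.h>
#include <memory>
#include "pyarrow/io.h"

std::shared_ptr<arrow::io::ReadableFileInterface> WrapPythonFile(PyObject* fo) {
  return std::make_shared<pyarrow::PyReadableFile>(fo);
}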
+ +#ifndef PYARROW_COMMON_H +#define PYARROW_COMMON_H + +#include "arrow/io/interfaces.h" + +#include "pyarrow/config.h" +#include "pyarrow/visibility.h" + +namespace arrow { class MemoryPool; } + +namespace pyarrow { + +class PYARROW_EXPORT PyReadableFile : public arrow::io::ReadableFileInterface { +public: + explicit PyReadableFile(PyObject* file); + ~PyReadableFile(); + + arrow::Status ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) override; + + arrow::Status GetSize(int64_t* size) override; + + // Does not copy if not necessary + arrow::Status ReadAt( + int64_t position, int64_t nbytes, std::shared_ptr* out) override; + + bool supports_zero_copy() const override; + +private: + PyObject* file_; +}; + +class PYARROW_EXPORT PyOutputStream : public arrow::io::OutputStream { +public: + explicit PyOutputStream(PyObject* file); + ~PyOutputStream(); + + arrow::Status Close() override; + arrow::Status Tell(int64_t* position) override; + arrow::Status Write(const uint8_t* data, int64_t nbytes) override; + +private: + PyObject* file_; +}; + +} // namespace pyarrow From 8be433fd94056c06a3dace970a0ae3e6ea588a3d Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 1 Oct 2016 18:29:01 -0400 Subject: [PATCH 02/11] Draft PyReadableFile implementation, not yet tested Change-Id: I2feee0fd6a6faa5c486ab4699a7cdc8c042d5726 --- python/CMakeLists.txt | 1 + python/src/pyarrow/common.cc | 15 +++++ python/src/pyarrow/common.h | 28 ++++++++- python/src/pyarrow/io.cc | 111 +++++++++++++++++++++++++++++------ python/src/pyarrow/io.h | 10 +++- 5 files changed, 142 insertions(+), 23 deletions(-) diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 6357e3c1725..77a771ab21c 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -432,6 +432,7 @@ set(PYARROW_SRCS src/pyarrow/common.cc src/pyarrow/config.cc src/pyarrow/helpers.cc + src/pyarrow/io.cc src/pyarrow/status.cc src/pyarrow/adapters/builtin.cc diff --git a/python/src/pyarrow/common.cc b/python/src/pyarrow/common.cc index a2748f99b67..a940149efa0 100644 --- a/python/src/pyarrow/common.cc +++ b/python/src/pyarrow/common.cc @@ -68,4 +68,19 @@ arrow::MemoryPool* GetMemoryPool() { return &memory_pool; } +// ---------------------------------------------------------------------- +// PyBytesBuffer + +PyBytesBuffer::PyBytesBuffer(PyObject* obj) + : Buffer(reinterpret_cast(PyBytes_AS_STRING(obj_)), + PyBytes_GET_SIZE(obj_)), + obj_(obj) { + Py_INCREF(obj_); +} + +PyBytesBuffer::~PyBytesBuffer() { + PyAcquireGIL_RAII lock; + Py_DECREF(obj_); +} + } // namespace pyarrow diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h index 3640f63bfcd..132abd8c217 100644 --- a/python/src/pyarrow/common.h +++ b/python/src/pyarrow/common.h @@ -20,6 +20,7 @@ #include "pyarrow/config.h" #include "arrow/util/buffer.h" +#include "arrow/util/macros.h" #include "pyarrow/visibility.h" namespace arrow { class MemoryPool; } @@ -81,6 +82,20 @@ struct PyObjectStringify { } }; +class PyAcquireGIL_RAII { + public: + PyAcquireGIL_RAII() { + state_ = PyGILState_Ensure(); + } + + ~PyAcquireGIL_RAII() { + PyGILState_Release(state_); + } + private: + PyGILState_STATE state_; + DISALLOW_COPY_AND_ASSIGN(PyAcquireGIL_RAII); +}; + // TODO(wesm): We can just let errors pass through. 
To be explored later #define RETURN_IF_PYERROR() \ if (PyErr_Occurred()) { \ @@ -98,8 +113,8 @@ PYARROW_EXPORT arrow::MemoryPool* GetMemoryPool(); class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer { public: - NumPyBuffer(PyArrayObject* arr) : - Buffer(nullptr, 0) { + NumPyBuffer(PyArrayObject* arr) + : Buffer(nullptr, 0) { arr_ = arr; Py_INCREF(arr); @@ -115,6 +130,15 @@ class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer { PyArrayObject* arr_; }; +class PYARROW_EXPORT PyBytesBuffer : public arrow::Buffer { + public: + PyBytesBuffer(PyObject* obj); + virtual ~PyBytesBuffer(); + + private: + PyObject* obj_; +}; + } // namespace pyarrow #endif // PYARROW_COMMON_H diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc index 8608a0e75c4..5667045142f 100644 --- a/python/src/pyarrow/io.cc +++ b/python/src/pyarrow/io.cc @@ -15,13 +15,15 @@ // specific language governing permissions and limitations // under the License. -#include "pyarrow/common.h" +#include "pyarrow/io.h" #include #include #include #include + +#include "pyarrow/common.h" #include "pyarrow/status.h" namespace pyarrow { @@ -31,28 +33,105 @@ namespace pyarrow { PyReadableFile::PyReadableFile(PyObject* file) : file_(file) { - Py_INCREF(file); + Py_INCREF(file_); } PyReadableFile::~PyReadableFile() { - Py_DECREF(); + Py_DECREF(file_); +} + +#define ARROW_RETURN_IF_PYERROR() \ + if (PyErr_Occurred()) { \ + PyObject *exc_type, *exc_value, *traceback; \ + PyErr_Fetch(&exc_type, &exc_value, &traceback); \ + PyObjectStringify stringified(exc_value); \ + std::string message(stringified.bytes); \ + Py_DECREF(exc_type); \ + Py_DECREF(exc_value); \ + Py_DECREF(traceback); \ + PyErr_Clear(); \ + return arrow::Status::IOError(message); \ + } + +static arrow::Status SeekNoGIL(PyObject* file, int64_t position, int whence) { + // whence: 0 for relative to start of file, 2 for end of file + PyObject* result = PyObject_CallMethod(file, "seek", "(i)", position); + Py_XDECREF(result); + ARROW_RETURN_IF_PYERROR(); + return arrow::Status::OK(); +} + +static arrow::Status ReadNoGIL(PyObject* file, int64_t nbytes, PyObject** out) { + PyObject* result = PyObject_CallMethod(file, "read", "(i)", nbytes); + ARROW_RETURN_IF_PYERROR(); + *out = result; + return arrow::Status::OK(); +} + +static arrow::Status TellNoGIL(PyObject* file, int64_t* position) { + PyObject* result = PyObject_CallMethod(file, "tell", "()"); + ARROW_RETURN_IF_PYERROR(); + + *position = PyLong_AsLongLong(result); + Py_DECREF(result); + + // PyLong_AsLongLong can raise OverflowError + ARROW_RETURN_IF_PYERROR(); + + return arrow::Status::OK(); +} + +arrow::Status PyReadableFile::Seek(int64_t position) { + PyAcquireGIL_RAII lock; + return SeekNoGIL(file_, position, 0); } arrow::Status PyReadableFile::ReadAt( - int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) override { + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) { + PyAcquireGIL_RAII lock; + ARROW_RETURN_NOT_OK(SeekNoGIL(file_, position, 0)); + + PyObject* bytes_obj; + ARROW_RETURN_NOT_OK(ReadNoGIL(file_, nbytes, &bytes_obj)); + + *bytes_read = PyBytes_GET_SIZE(bytes_obj); + std::memcpy(out, PyBytes_AS_STRING(bytes_obj), *bytes_read); + Py_DECREF(bytes_obj); - return Status::OK(); + return arrow::Status::OK(); } -arrow::Status PyReadableFile::GetSize(int64_t* size) override { - return Status::OK(); +arrow::Status PyReadableFile::GetSize(int64_t* size) { + PyAcquireGIL_RAII lock; + + int64_t current_position;; + ARROW_RETURN_NOT_OK(TellNoGIL(file_, ¤t_position)); + + 
ARROW_RETURN_NOT_OK(SeekNoGIL(file_, 0, 2)); + + int64_t file_size; + ARROW_RETURN_NOT_OK(TellNoGIL(file_, &file_size)); + + // Restore previous file position + ARROW_RETURN_NOT_OK(SeekNoGIL(file_, current_position, 0)); + + *size = file_size; + return arrow::Status::OK(); } - // Does not copy if not necessary -Status PyReadableFile::ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr* out) override { +// Does not copy if not necessary +arrow::Status PyReadableFile::ReadAt( + int64_t position, int64_t nbytes, std::shared_ptr* out) { + PyAcquireGIL_RAII lock; + ARROW_RETURN_NOT_OK(SeekNoGIL(file_, position, 0)); + + PyObject* bytes_obj; + ARROW_RETURN_NOT_OK(ReadNoGIL(file_, nbytes, &bytes_obj)); + + *out = std::make_shared(bytes_obj); + Py_DECREF(bytes_obj); - return Status::OK(); + return arrow::Status::OK(); } bool PyReadableFile::supports_zero_copy() const { @@ -71,20 +150,16 @@ PyOutputStream::~PyOutputStream() { Py_DECREF(file_); } -Status PyOutputStream::Close() override { +arrow::Status PyOutputStream::Close() { return arrow::Status::OK(); } -Status PyOutputStream::Tell(int64_t* position) { +arrow::Status PyOutputStream::Tell(int64_t* position) { return arrow::Status::OK(); } -Status PyOutputStream::Write(const uint8_t* data, int64_t nbytes) { +arrow::Status PyOutputStream::Write(const uint8_t* data, int64_t nbytes) { return arrow::Status::OK(); } -private: - PyObject* file_; -}; - } // namespace pyarrow diff --git a/python/src/pyarrow/io.h b/python/src/pyarrow/io.h index 3220b3667cd..c2211434ef7 100644 --- a/python/src/pyarrow/io.h +++ b/python/src/pyarrow/io.h @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -#ifndef PYARROW_COMMON_H -#define PYARROW_COMMON_H +#ifndef PYARROW_IO_H +#define PYARROW_IO_H #include "arrow/io/interfaces.h" @@ -39,7 +39,9 @@ class PYARROW_EXPORT PyReadableFile : public arrow::io::ReadableFileInterface { // Does not copy if not necessary arrow::Status ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr* out) override; + int64_t position, int64_t nbytes, std::shared_ptr* out) override; + + arrow::Status Seek(int64_t position) override; bool supports_zero_copy() const override; @@ -61,3 +63,5 @@ class PYARROW_EXPORT PyOutputStream : public arrow::io::OutputStream { }; } // namespace pyarrow + +#endif // PYARROW_IO_H From 737a8dbee4ebff904e9aa5ac36e4fbc55b561370 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 2 Oct 2016 12:48:12 -0400 Subject: [PATCH 03/11] Refactoring, more code sharing with native file interfaces Change-Id: Ie9f6f13d7e1fb55e948da2a7709864978df7306a --- python/pyarrow/__init__.py | 5 +- python/pyarrow/array.pyx | 31 ------- python/pyarrow/error.pxd | 4 +- python/pyarrow/error.pyx | 2 +- python/pyarrow/includes/libarrow_io.pxd | 19 ++++ python/pyarrow/includes/pyarrow.pxd | 31 +++++-- python/pyarrow/io.pxd | 13 ++- python/pyarrow/io.pyx | 79 ++++++++--------- python/pyarrow/parquet.pyx | 8 +- python/pyarrow/table.pyx | 37 +++++++- python/src/pyarrow/io.cc | 111 +++++++++++++++--------- python/src/pyarrow/io.h | 25 +++++- 12 files changed, 226 insertions(+), 139 deletions(-) diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 40a09c2feae..7561f6d46df 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -41,6 +41,5 @@ list_, struct, field, DataType, Field, Schema, schema) -from pyarrow.array import RowBatch, from_pandas_dataframe - -from pyarrow.table import Column, Table +from pyarrow.array import RowBatch +from 
pyarrow.table import Column, Table, from_pandas_dataframe diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx index 5229b429f58..cdbe73ad21f 100644 --- a/python/pyarrow/array.pyx +++ b/python/pyarrow/array.pyx @@ -35,7 +35,6 @@ from pyarrow.scalar import NA from pyarrow.schema cimport Schema import pyarrow.schema as schema -from pyarrow.table cimport Table def total_allocated_bytes(): cdef MemoryPool* pool = pyarrow.GetMemoryPool() @@ -254,35 +253,6 @@ def from_pandas_series(object series, object mask=None, timestamps_to_ms=False): return box_arrow_array(out) -def from_pandas_dataframe(object df, name=None, timestamps_to_ms=False): - """ - Convert pandas.DataFrame to an Arrow Table - - Parameters - ---------- - df: pandas.DataFrame - - name: str - - timestamps_to_ms: bool - Convert datetime columns to ms resolution. This is needed for - compability with other functionality like Parquet I/O which - only supports milliseconds. - """ - cdef: - list names = [] - list arrays = [] - - for name in df.columns: - col = df[name] - arr = from_pandas_series(col, timestamps_to_ms=timestamps_to_ms) - - names.append(name) - arrays.append(arr) - - return Table.from_arrays(names, arrays, name=name) - - cdef object series_as_ndarray(object obj): import pandas as pd @@ -324,4 +294,3 @@ cdef class RowBatch: def __getitem__(self, i): return self.arrays[i] - diff --git a/python/pyarrow/error.pxd b/python/pyarrow/error.pxd index 1fb6fad396a..891d1ac1c7e 100644 --- a/python/pyarrow/error.pxd +++ b/python/pyarrow/error.pxd @@ -16,7 +16,7 @@ # under the License. from pyarrow.includes.libarrow cimport CStatus -from pyarrow.includes.pyarrow cimport * +from pyarrow.includes.pyarrow cimport PyStatus cdef int check_cstatus(const CStatus& status) nogil except -1 -cdef int check_status(const Status& status) nogil except -1 +cdef int check_status(const PyStatus& status) nogil except -1 diff --git a/python/pyarrow/error.pyx b/python/pyarrow/error.pyx index 244019321a7..a2c53fed8c6 100644 --- a/python/pyarrow/error.pyx +++ b/python/pyarrow/error.pyx @@ -30,7 +30,7 @@ cdef int check_cstatus(const CStatus& status) nogil except -1: with gil: raise ArrowException(frombytes(c_message)) -cdef int check_status(const Status& status) nogil except -1: +cdef int check_status(const PyStatus& status) nogil except -1: if status.ok(): return 0 diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd index f338a436814..d31c017a1eb 100644 --- a/python/pyarrow/includes/libarrow_io.pxd +++ b/python/pyarrow/includes/libarrow_io.pxd @@ -18,6 +18,7 @@ # distutils: language = c++ from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport MemoryPool cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil: enum FileMode" arrow::io::FileMode::type": @@ -66,6 +67,24 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil: pass +cdef extern from "arrow/io/file.h" namespace "arrow::io" nogil: + cdef cppclass FileOutputStream(OutputStream): + @staticmethod + CStatus Open(const c_string& path, shared_ptr[FileOutputStream]* file) + + int file_descriptor() + + cdef cppclass ReadableFile(ReadableFileInterface): + @staticmethod + CStatus Open(const c_string& path, shared_ptr[ReadableFile]* file) + + @staticmethod + CStatus Open(const c_string& path, MemoryPool* memory_pool, + shared_ptr[ReadableFile]* file) + + int file_descriptor() + + cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil: CStatus ConnectLibHdfs() diff --git 
a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd index 92c814706fd..4f86ecf492e 100644 --- a/python/pyarrow/includes/pyarrow.pxd +++ b/python/pyarrow/includes/pyarrow.pxd @@ -18,15 +18,18 @@ # distutils: language = c++ from pyarrow.includes.common cimport * -from pyarrow.includes.libarrow cimport (CArray, CColumn, CDataType, +from pyarrow.includes.libarrow cimport (CArray, CColumn, CDataType, CStatus, Type, MemoryPool) +cimport pyarrow.includes.libarrow_io as arrow_io + + cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil: # We can later add more of the common status factory methods as needed - cdef Status Status_OK "Status::OK"() + cdef PyStatus PyStatus_OK "Status::OK"() - cdef cppclass Status: - Status() + cdef cppclass PyStatus "pyarrow::Status": + PyStatus() c_string ToString() @@ -40,12 +43,22 @@ cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil: c_bool IsArrowError() shared_ptr[CDataType] GetPrimitiveType(Type type) - Status ConvertPySequence(object obj, shared_ptr[CArray]* out) + PyStatus ConvertPySequence(object obj, shared_ptr[CArray]* out) - Status PandasToArrow(MemoryPool* pool, object ao, shared_ptr[CArray]* out) - Status PandasMaskedToArrow(MemoryPool* pool, object ao, object mo, - shared_ptr[CArray]* out) + PyStatus PandasToArrow(MemoryPool* pool, object ao, + shared_ptr[CArray]* out) + PyStatus PandasMaskedToArrow(MemoryPool* pool, object ao, object mo, + shared_ptr[CArray]* out) - Status ArrowToPandas(const shared_ptr[CColumn]& arr, object py_ref, PyObject** out) + PyStatus ArrowToPandas(const shared_ptr[CColumn]& arr, object py_ref, + PyObject** out) MemoryPool* GetMemoryPool() + + +cdef extern from "pyarrow/io.h" namespace "pyarrow" nogil: + cdef cppclass PyReadableFile(arrow_io.ReadableFileInterface): + PyReadableFile(object fo) + + cdef cppclass PyOutputStream(arrow_io.OutputStream): + PyOutputStream(object fo) diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd index f55fc0ab53a..1dbb3fd76bb 100644 --- a/python/pyarrow/io.pxd +++ b/python/pyarrow/io.pxd @@ -23,11 +23,16 @@ from pyarrow.includes.libarrow_io cimport (ReadableFileInterface, OutputStream) -cdef class NativeFileInterface: +cdef class NativeFile: + cdef: + shared_ptr[ReadableFileInterface] rd_file + shared_ptr[OutputStream] wr_file + bint is_readonly + bint is_open # By implementing these "virtual" functions (all functions in Cython - # extension classes are technically virtual in the C++ sense)m we can - # expose the arrow::io abstract file interfaces to other components - # throughout the suite of Arrow C++ libraries + # extension classes are technically virtual in the C++ sense) we can expose + # the arrow::io abstract file interfaces to other components throughout the + # suite of Arrow C++ libraries cdef read_handle(self, shared_ptr[ReadableFileInterface]* file) cdef write_handle(self, shared_ptr[OutputStream]* file) diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index f2eee260c33..3c2727cb00d 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -242,6 +242,9 @@ cdef class HdfsClient: cdef int16_t c_replication = replication or 0 cdef int64_t c_default_block_size = default_block_size or 0 + cdef shared_ptr[HdfsOutputStream] wr_handle + cdef shared_ptr[HdfsReadableFile] rd_handle + if mode in ('wb', 'ab'): if mode == 'ab': append = True @@ -251,13 +254,17 @@ cdef class HdfsClient: self.client.get() .OpenWriteable(c_path, append, c_buffer_size, c_replication, c_default_block_size, - &out.wr_file)) + &wr_handle)) + + out.wr_file = 
wr_handle out.is_readonly = False else: with nogil: check_cstatus(self.client.get() - .OpenReadable(c_path, &out.rd_file)) + .OpenReadable(c_path, &rd_handle)) + + out.rd_file = rd_handle out.is_readonly = True if c_buffer_size == 0: @@ -314,25 +321,8 @@ cdef class HdfsClient: f = self.open(path, 'rb', buffer_size=buffer_size) f.download(stream) -cdef class NativeFileInterface: - - cdef read_handle(self, shared_ptr[ReadableFileInterface]* file): - raise NotImplementedError - cdef write_handle(self, shared_ptr[OutputStream]* file): - raise NotImplementedError - -cdef class HdfsFile(NativeFileInterface): - cdef: - shared_ptr[HdfsReadableFile] rd_file - shared_ptr[HdfsOutputStream] wr_file - bint is_readonly - bint is_open - object parent - - cdef readonly: - int32_t buffer_size - object mode +cdef class NativeFile: def __cinit__(self): self.is_open = False @@ -356,14 +346,6 @@ cdef class HdfsFile(NativeFileInterface): check_cstatus(self.wr_file.get().Close()) self.is_open = False - cdef _assert_readable(self): - if not self.is_readonly: - raise IOError("only valid on readonly files") - - cdef _assert_writeable(self): - if self.is_readonly: - raise IOError("only valid on writeonly files") - cdef read_handle(self, shared_ptr[ReadableFileInterface]* file): self._assert_readable() file[0] = self.rd_file @@ -372,6 +354,14 @@ cdef class HdfsFile(NativeFileInterface): self._assert_writeable() file[0] = self.wr_file + def _assert_readable(self): + if not self.is_readonly: + raise IOError("only valid on readonly files") + + def _assert_writeable(self): + if self.is_readonly: + raise IOError("only valid on writeonly files") + def size(self): cdef int64_t size self._assert_readable() @@ -393,6 +383,26 @@ cdef class HdfsFile(NativeFileInterface): with nogil: check_cstatus(self.rd_file.get().Seek(position)) + def write(self, data): + """ + Write bytes-like (unicode, encoded to UTF-8) to file + """ + self._assert_writeable() + + data = tobytes(data) + + cdef const uint8_t* buf = cp.PyBytes_AS_STRING(data) + cdef int64_t bufsize = len(data) + with nogil: + check_cstatus(self.wr_file.get().Write(buf, bufsize)) + + +cdef class HdfsFile(NativeFile): + cdef readonly: + int32_t buffer_size + object mode + object parent + def read(self, int nbytes): """ Read indicated number of bytes from the file, up to EOF @@ -504,16 +514,3 @@ cdef class HdfsFile(NativeFileInterface): writer_thread.join() if exc_info is not None: raise exc_info[0], exc_info[1], exc_info[2] - - def write(self, data): - """ - Write bytes-like (unicode, encoded to UTF-8) to file - """ - self._assert_writeable() - - data = tobytes(data) - - cdef const uint8_t* buf = cp.PyBytes_AS_STRING(data) - cdef int64_t bufsize = len(data) - with nogil: - check_cstatus(self.wr_file.get().Write(buf, bufsize)) diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx index 099e148abc1..ca0176a7c04 100644 --- a/python/pyarrow/parquet.pyx +++ b/python/pyarrow/parquet.pyx @@ -27,10 +27,10 @@ cimport pyarrow.includes.pyarrow as pyarrow from pyarrow.compat import tobytes from pyarrow.error import ArrowException from pyarrow.error cimport check_cstatus -from pyarrow.io import NativeFileInterface +from pyarrow.io import NativeFile from pyarrow.table cimport Table -from pyarrow.io cimport NativeFileInterface +from pyarrow.io cimport NativeFile import six @@ -54,7 +54,7 @@ cdef class ParquetReader: new FileReader(default_memory_pool(), ParquetFileReader.OpenFile(path))) - cdef open_native_file(self, NativeFileInterface file): + cdef open_native_file(self, 
NativeFile file): cdef shared_ptr[ReadableFileInterface] cpp_handle file.read_handle(&cpp_handle) @@ -84,7 +84,7 @@ def read_table(source, columns=None): if isinstance(source, six.string_types): reader.open_local_file(source) - elif isinstance(source, NativeFileInterface): + elif isinstance(source, NativeFile): reader.open_native_file(source) return reader.read_all() diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx index f02d36f520b..ade82aa6761 100644 --- a/python/pyarrow/table.pyx +++ b/python/pyarrow/table.pyx @@ -25,10 +25,12 @@ cimport pyarrow.includes.pyarrow as pyarrow import pyarrow.config from pyarrow.array cimport Array, box_arrow_array -from pyarrow.compat import frombytes, tobytes from pyarrow.error cimport check_status from pyarrow.schema cimport box_data_type, box_schema +from pyarrow.compat import frombytes, tobytes + + cdef class ChunkedArray: ''' Do not call this class's constructor directly. @@ -161,7 +163,7 @@ cdef class Table: @staticmethod def from_pandas(df, name=None): - pass + return from_pandas_dataframe(df, name=name) @staticmethod def from_arrays(names, arrays, name=None): @@ -264,3 +266,34 @@ cdef class Table: def __get__(self): return (self.num_rows, self.num_columns) + + +def from_pandas_dataframe(object df, name=None, timestamps_to_ms=False): + """ + Convert pandas.DataFrame to an Arrow Table + + Parameters + ---------- + df: pandas.DataFrame + + name: str + + timestamps_to_ms: bool + Convert datetime columns to ms resolution. This is needed for + compability with other functionality like Parquet I/O which + only supports milliseconds. + """ + from pyarrow.array import from_pandas_series + + cdef: + list names = [] + list arrays = [] + + for name in df.columns: + col = df[name] + arr = from_pandas_series(col, timestamps_to_ms=timestamps_to_ms) + + names.append(name) + arrays.append(arr) + + return Table.from_arrays(names, arrays, name=name) diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc index 5667045142f..681470f69c1 100644 --- a/python/src/pyarrow/io.cc +++ b/python/src/pyarrow/io.cc @@ -29,70 +29,104 @@ namespace pyarrow { // ---------------------------------------------------------------------- -// Seekable input stream +// Python file -PyReadableFile::PyReadableFile(PyObject* file) - : file_(file) { +PythonFile::PythonFile(PyObject* file) { Py_INCREF(file_); } -PyReadableFile::~PyReadableFile() { +PythonFile::~PythonFile() { Py_DECREF(file_); } -#define ARROW_RETURN_IF_PYERROR() \ - if (PyErr_Occurred()) { \ - PyObject *exc_type, *exc_value, *traceback; \ - PyErr_Fetch(&exc_type, &exc_value, &traceback); \ - PyObjectStringify stringified(exc_value); \ - std::string message(stringified.bytes); \ - Py_DECREF(exc_type); \ - Py_DECREF(exc_value); \ - Py_DECREF(traceback); \ - PyErr_Clear(); \ - return arrow::Status::IOError(message); \ +static arrow::Status CheckPyError() { + if (PyErr_Occurred()) { + PyObject *exc_type, *exc_value, *traceback; + PyErr_Fetch(&exc_type, &exc_value, &traceback); + PyObjectStringify stringified(exc_value); + std::string message(stringified.bytes); + Py_DECREF(exc_type); + Py_DECREF(exc_value); + Py_DECREF(traceback); + PyErr_Clear(); + return arrow::Status::IOError(message); } + return arrow::Status::OK(); +} + +arrow::Status PythonFile::Close() { + // whence: 0 for relative to start of file, 2 for end of file + PyObject* result = PyObject_CallMethod(file_, "close", "()"); + Py_XDECREF(result); + ARROW_RETURN_NOT_OK(CheckPyError()); + return arrow::Status::OK(); +} -static arrow::Status 
SeekNoGIL(PyObject* file, int64_t position, int whence) { +arrow::Status PythonFile::Seek(int64_t position, int whence) { // whence: 0 for relative to start of file, 2 for end of file - PyObject* result = PyObject_CallMethod(file, "seek", "(i)", position); + PyObject* result = PyObject_CallMethod(file_, "seek", "(i)", position); Py_XDECREF(result); - ARROW_RETURN_IF_PYERROR(); + ARROW_RETURN_NOT_OK(CheckPyError()); return arrow::Status::OK(); } -static arrow::Status ReadNoGIL(PyObject* file, int64_t nbytes, PyObject** out) { - PyObject* result = PyObject_CallMethod(file, "read", "(i)", nbytes); - ARROW_RETURN_IF_PYERROR(); +arrow::Status PythonFile::Read(int64_t nbytes, PyObject** out) { + PyObject* result = PyObject_CallMethod(file_, "read", "(i)", nbytes); + ARROW_RETURN_NOT_OK(CheckPyError()); *out = result; return arrow::Status::OK(); } -static arrow::Status TellNoGIL(PyObject* file, int64_t* position) { - PyObject* result = PyObject_CallMethod(file, "tell", "()"); - ARROW_RETURN_IF_PYERROR(); +arrow::Status PythonFile::Write(const uint8_t* data, int64_t nbytes) { + PyObject* py_data = PyBytes_FromStringAndSize( + reinterpret_cast(data), nbytes); + ARROW_RETURN_NOT_OK(CheckPyError()); + + PyObject* result = PyObject_CallMethod(file_, "write", "(O)", py_data); + Py_DECREF(py_data); + ARROW_RETURN_NOT_OK(CheckPyError()); + return arrow::Status::OK(); +} + +arrow::Status PythonFile::Tell(int64_t* position) { + PyObject* result = PyObject_CallMethod(file_, "tell", "()"); + ARROW_RETURN_NOT_OK(CheckPyError()); *position = PyLong_AsLongLong(result); Py_DECREF(result); // PyLong_AsLongLong can raise OverflowError - ARROW_RETURN_IF_PYERROR(); + ARROW_RETURN_NOT_OK(CheckPyError()); return arrow::Status::OK(); } +// ---------------------------------------------------------------------- +// Seekable input stream + +PyReadableFile::PyReadableFile(PyObject* file) { + file_.reset(new PythonFile(file)); +} + +PyReadableFile::~PyReadableFile() {} + +arrow::Status PyReadableFile::Close() { + PyAcquireGIL_RAII lock; + return file_->Close(); +} + arrow::Status PyReadableFile::Seek(int64_t position) { PyAcquireGIL_RAII lock; - return SeekNoGIL(file_, position, 0); + return file_->Seek(position, 0); } arrow::Status PyReadableFile::ReadAt( int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) { PyAcquireGIL_RAII lock; - ARROW_RETURN_NOT_OK(SeekNoGIL(file_, position, 0)); + ARROW_RETURN_NOT_OK(file_->Seek(position, 0)); PyObject* bytes_obj; - ARROW_RETURN_NOT_OK(ReadNoGIL(file_, nbytes, &bytes_obj)); + ARROW_RETURN_NOT_OK(file_->Read(nbytes, &bytes_obj)); *bytes_read = PyBytes_GET_SIZE(bytes_obj); std::memcpy(out, PyBytes_AS_STRING(bytes_obj), *bytes_read); @@ -105,15 +139,15 @@ arrow::Status PyReadableFile::GetSize(int64_t* size) { PyAcquireGIL_RAII lock; int64_t current_position;; - ARROW_RETURN_NOT_OK(TellNoGIL(file_, ¤t_position)); + ARROW_RETURN_NOT_OK(file_->Tell(¤t_position)); - ARROW_RETURN_NOT_OK(SeekNoGIL(file_, 0, 2)); + ARROW_RETURN_NOT_OK(file_->Seek(0, 2)); int64_t file_size; - ARROW_RETURN_NOT_OK(TellNoGIL(file_, &file_size)); + ARROW_RETURN_NOT_OK(file_->Tell(&file_size)); // Restore previous file position - ARROW_RETURN_NOT_OK(SeekNoGIL(file_, current_position, 0)); + ARROW_RETURN_NOT_OK(file_->Seek(current_position, 0)); *size = file_size; return arrow::Status::OK(); @@ -123,10 +157,10 @@ arrow::Status PyReadableFile::GetSize(int64_t* size) { arrow::Status PyReadableFile::ReadAt( int64_t position, int64_t nbytes, std::shared_ptr* out) { PyAcquireGIL_RAII lock; - 
ARROW_RETURN_NOT_OK(SeekNoGIL(file_, position, 0)); + ARROW_RETURN_NOT_OK(file_->Seek(position, 0)); PyObject* bytes_obj; - ARROW_RETURN_NOT_OK(ReadNoGIL(file_, nbytes, &bytes_obj)); + ARROW_RETURN_NOT_OK(file_->Read(nbytes, &bytes_obj)); *out = std::make_shared(bytes_obj); Py_DECREF(bytes_obj); @@ -141,14 +175,11 @@ bool PyReadableFile::supports_zero_copy() const { // ---------------------------------------------------------------------- // Output stream -PyOutputStream::PyOutputStream(PyObject* file) - : file_(file) { - Py_INCREF(file); +PyOutputStream::PyOutputStream(PyObject* file) { + file_.reset(new PythonFile(file)); } -PyOutputStream::~PyOutputStream() { - Py_DECREF(file_); -} +PyOutputStream::~PyOutputStream() {} arrow::Status PyOutputStream::Close() { return arrow::Status::OK(); diff --git a/python/src/pyarrow/io.h b/python/src/pyarrow/io.h index c2211434ef7..de57181a8ce 100644 --- a/python/src/pyarrow/io.h +++ b/python/src/pyarrow/io.h @@ -27,11 +27,30 @@ namespace arrow { class MemoryPool; } namespace pyarrow { +// A common interface to a Python file-like object. Must acquire GIL before +// calling any methods +class PythonFile { + public: + PythonFile(PyObject* file); + ~PythonFile(); + + arrow::Status Close(); + arrow::Status Seek(int64_t position, int whence); + arrow::Status Read(int64_t nbytes, PyObject** out); + arrow::Status Tell(int64_t* position); + arrow::Status Write(const uint8_t* data, int64_t nbytes); + + private: + PyObject* file_; +}; + class PYARROW_EXPORT PyReadableFile : public arrow::io::ReadableFileInterface { public: explicit PyReadableFile(PyObject* file); ~PyReadableFile(); + arrow::Status Close() override; + arrow::Status ReadAt( int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) override; @@ -46,7 +65,7 @@ class PYARROW_EXPORT PyReadableFile : public arrow::io::ReadableFileInterface { bool supports_zero_copy() const override; private: - PyObject* file_; + std::unique_ptr file_; }; class PYARROW_EXPORT PyOutputStream : public arrow::io::OutputStream { @@ -59,9 +78,11 @@ class PYARROW_EXPORT PyOutputStream : public arrow::io::OutputStream { arrow::Status Write(const uint8_t* data, int64_t nbytes) override; private: - PyObject* file_; + std::unique_ptr file_; }; +// TODO(wesm): seekable output files + } // namespace pyarrow #endif // PYARROW_IO_H From d4701335e7efa252768e8a1ddbc2e1c224fe90e1 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 2 Oct 2016 14:23:49 -0400 Subject: [PATCH 04/11] Share default implementations of ReadAt, add Buffer-based Read API Change-Id: If86ff0ce8b25bd28c9028e93624af62ba01f8a60 --- cpp/src/arrow/io/CMakeLists.txt | 1 + cpp/src/arrow/io/file.cc | 10 +------ cpp/src/arrow/io/file.h | 6 ++-- cpp/src/arrow/io/hdfs.cc | 53 ++++++++++++++++++++++++++++++--- cpp/src/arrow/io/hdfs.h | 12 +++++--- cpp/src/arrow/io/interfaces.cc | 48 +++++++++++++++++++++++++++++ cpp/src/arrow/io/interfaces.h | 27 ++++++++++------- cpp/src/arrow/io/memory.cc | 40 ++++++++++--------------- cpp/src/arrow/io/memory.h | 21 +++++++------ python/pyarrow/io.pyx | 39 ++++++++++++++++++++++++ 10 files changed, 190 insertions(+), 67 deletions(-) create mode 100644 cpp/src/arrow/io/interfaces.cc diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt index d2e3491b75f..47bb0893863 100644 --- a/cpp/src/arrow/io/CMakeLists.txt +++ b/cpp/src/arrow/io/CMakeLists.txt @@ -39,6 +39,7 @@ set(ARROW_IO_TEST_LINK_LIBS set(ARROW_IO_SRCS file.cc + interfaces.cc memory.cc ) diff --git a/cpp/src/arrow/io/file.cc 
b/cpp/src/arrow/io/file.cc index 87bae7f3928..93f0ad91ee8 100644 --- a/cpp/src/arrow/io/file.cc +++ b/cpp/src/arrow/io/file.cc @@ -413,15 +413,7 @@ Status ReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) { return impl_->Read(nbytes, bytes_read, out); } -Status ReadableFile::ReadAt( - int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) { - RETURN_NOT_OK(Seek(position)); - return impl_->Read(nbytes, bytes_read, out); -} - -Status ReadableFile::ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr* out) { - RETURN_NOT_OK(Seek(position)); +Status ReadableFile::Read(int64_t nbytes, std::shared_ptr* out) { return impl_->ReadBuffer(nbytes, out); } diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h index 5e714ea9667..10fe16e5112 100644 --- a/cpp/src/arrow/io/file.h +++ b/cpp/src/arrow/io/file.h @@ -71,11 +71,9 @@ class ARROW_EXPORT ReadableFile : public ReadableFileInterface { Status Close() override; Status Tell(int64_t* position) override; - Status ReadAt( - int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; - Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; - Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; + Status Read(int64_t nbytes, std::shared_ptr* out) override; + Status GetSize(int64_t* size) override; Status Seek(int64_t position) override; diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index a6b4b2f3846..a5b2c33df39 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -22,6 +22,8 @@ #include #include "arrow/io/hdfs.h" +#include "arrow/util/buffer.h" +#include "arrow/util/memory-pool.h" #include "arrow/util/status.h" namespace arrow { @@ -89,7 +91,8 @@ class HdfsAnyFileImpl { // Private implementation for read-only files class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { public: - HdfsReadableFileImpl() {} + HdfsReadableFileImpl(MemoryPool* pool) + : pool_(pool) {} Status Close() { if (is_open_) { @@ -108,6 +111,20 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { return Status::OK(); } + Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) { + auto buffer = std::make_shared(pool_); + RETURN_NOT_OK(buffer->Resize(nbytes)); + + int64_t bytes_read = 0; + RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data())); + + // XXX: heuristic + if (bytes_read < nbytes / 2) { RETURN_NOT_OK(buffer->Resize(bytes_read)); } + + *out = buffer; + return Status::OK(); + } + Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { tSize ret = hdfsRead(fs_, file_, reinterpret_cast(buffer), nbytes); RETURN_NOT_OK(CheckReadResult(ret)); @@ -115,6 +132,20 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { return Status::OK(); } + Status Read(int64_t nbytes, std::shared_ptr* out) { + auto buffer = std::make_shared(pool_); + RETURN_NOT_OK(buffer->Resize(nbytes)); + + int64_t bytes_read = 0; + RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data())); + + // XXX: heuristic + if (bytes_read < nbytes / 2) { RETURN_NOT_OK(buffer->Resize(bytes_read)); } + + *out = buffer; + return Status::OK(); + } + Status GetSize(int64_t* size) { hdfsFileInfo* entry = hdfsGetPathInfo(fs_, path_.c_str()); if (entry == nullptr) { return Status::IOError("HDFS: GetPathInfo failed"); } @@ -123,10 +154,20 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { hdfsFreeFileInfo(entry, 1); return Status::OK(); } + + void 
set_memory_pool(MemoryPool* pool) { + pool_ = pool; + } + + private: + MemoryPool* pool_; }; -HdfsReadableFile::HdfsReadableFile() { - impl_.reset(new HdfsReadableFileImpl()); +HdfsReadableFile::HdfsReadableFile(MemoryPool* pool) { + if (pool == nullptr) { + pool = default_memory_pool(); + } + impl_.reset(new HdfsReadableFileImpl(pool)); } HdfsReadableFile::~HdfsReadableFile() { @@ -144,7 +185,7 @@ Status HdfsReadableFile::ReadAt( Status HdfsReadableFile::ReadAt( int64_t position, int64_t nbytes, std::shared_ptr* out) { - return Status::NotImplemented("Not yet implemented"); + return impl_->ReadAt(position, nbytes, out); } bool HdfsReadableFile::supports_zero_copy() const { @@ -155,6 +196,10 @@ Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buff return impl_->Read(nbytes, bytes_read, buffer); } +Status HdfsReadableFile::Read(int64_t nbytes, std::shared_ptr* buffer) { + return impl_->Read(nbytes, buffer); +} + Status HdfsReadableFile::GetSize(int64_t* size) { return impl_->GetSize(size); } diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h index 39720cc17e4..c860e1baa94 100644 --- a/cpp/src/arrow/io/hdfs.h +++ b/cpp/src/arrow/io/hdfs.h @@ -164,6 +164,12 @@ class ARROW_EXPORT HdfsReadableFile : public ReadableFileInterface { Status GetSize(int64_t* size) override; + // NOTE: If you wish to read a particular range of a file in a multithreaded + // context, you may prefer to use ReadAt to avoid locking issues + Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; + + Status Read(int64_t nbytes, std::shared_ptr* out) override; + Status ReadAt( int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; @@ -174,9 +180,7 @@ class ARROW_EXPORT HdfsReadableFile : public ReadableFileInterface { Status Seek(int64_t position) override; Status Tell(int64_t* position) override; - // NOTE: If you wish to read a particular range of a file in a multithreaded - // context, you may prefer to use ReadAt to avoid locking issues - Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; + void set_memory_pool(MemoryPool* pool); private: class ARROW_NO_EXPORT HdfsReadableFileImpl; @@ -184,7 +188,7 @@ class ARROW_EXPORT HdfsReadableFile : public ReadableFileInterface { friend class HdfsClient::HdfsClientImpl; - HdfsReadableFile(); + HdfsReadableFile(MemoryPool* pool = nullptr); DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile); }; diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc new file mode 100644 index 00000000000..44986cee1af --- /dev/null +++ b/cpp/src/arrow/io/interfaces.cc @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
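This commit moves ReadAt into the base class as a default built from Seek plus the new Buffer-returning Read, so concrete files only need to implement the two Read overloads. A caller-side sketch using the signatures declared here (ReadSlice is an illustrative name, not part of the patch):

// Read a slice through the shared ReadAt default: Seek to `position`, then
// the Buffer-returning Read, which may be zero-copy (e.g. BufferReader) or
// copy into a freshly allocated Buffer (e.g. HdfsReadableFile).
#include <cstdint>
#include <memory>
#include "arrow/io/interfaces.h"
#include "arrow/util/buffer.h"
#include "arrow/util/status.h"

arrow::Status ReadSlice(arrow::io::ReadableFileInterface* file, int64_t position,
                        int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) {
  return file->ReadAt(position, nbytes, out);
}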
+ +#include "arrow/io/interfaces.h" + +#include +#include + +#include "arrow/util/buffer.h" +#include "arrow/util/status.h" + +namespace arrow { +namespace io { + +FileInterface::~FileInterface() {} + +ReadableFileInterface::ReadableFileInterface() { + set_mode(FileMode::READ); +} + +Status ReadableFileInterface::ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) { + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, bytes_read, out); +} + +Status ReadableFileInterface::ReadAt( + int64_t position, int64_t nbytes, std::shared_ptr* out) { + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, out); +} + +} // namespace io +} // namespace arrow diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index fa34b43b2c9..e8114fe14ea 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -22,10 +22,12 @@ #include #include "arrow/util/macros.h" +#include "arrow/util/visibility.h" namespace arrow { class Buffer; +class MemoryPool; class Status; namespace io { @@ -43,9 +45,9 @@ class FileSystemClient { virtual ~FileSystemClient() {} }; -class FileInterface { +class ARROW_EXPORT FileInterface { public: - virtual ~FileInterface() {} + virtual ~FileInterface() = 0; virtual Status Close() = 0; virtual Status Tell(int64_t* position) = 0; @@ -54,7 +56,6 @@ class FileInterface { protected: FileInterface() {} FileMode::type mode_; - void set_mode(FileMode::type mode) { mode_ = mode; } private: @@ -74,6 +75,9 @@ class Writeable { class Readable { public: virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0; + + // Does not copy if not necessary + virtual Status Read(int64_t nbytes, std::shared_ptr* out) = 0; }; class OutputStream : public FileInterface, public Writeable { @@ -86,21 +90,22 @@ class InputStream : public FileInterface, public Readable { InputStream() {} }; -class ReadableFileInterface : public InputStream, public Seekable { +class ARROW_EXPORT ReadableFileInterface : public InputStream, public Seekable { public: - virtual Status ReadAt( - int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0; - virtual Status GetSize(int64_t* size) = 0; - // Does not copy if not necessary + virtual bool supports_zero_copy() const = 0; + + // Read at position, provide default implementations using Read(...), but can + // be overridden virtual Status ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr* out) = 0; + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out); - virtual bool supports_zero_copy() const = 0; + virtual Status ReadAt( + int64_t position, int64_t nbytes, std::shared_ptr* out); protected: - ReadableFileInterface() { set_mode(FileMode::READ); } + ReadableFileInterface(); }; class WriteableFileInterface : public OutputStream, public Seekable { diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index c168c91c5f8..2834cffc9f3 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -123,6 +123,8 @@ MemoryMappedFile::MemoryMappedFile(FileMode::type mode) { ReadableFileInterface::set_mode(mode); } +MemoryMappedFile::~MemoryMappedFile() {} + Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode, std::shared_ptr* out) { std::shared_ptr result(new MemoryMappedFile(mode)); @@ -161,16 +163,8 @@ Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) return Status::OK(); } -Status MemoryMappedFile::ReadAt( - int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) { - 
RETURN_NOT_OK(impl_->Seek(position)); - return Read(nbytes, bytes_read, out); -} - -Status MemoryMappedFile::ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr* out) { - nbytes = std::min(nbytes, impl_->size() - position); - RETURN_NOT_OK(impl_->Seek(position)); +Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr* out) { + nbytes = std::min(nbytes, impl_->size() - impl_->position()); *out = std::make_shared(impl_->head(), nbytes); impl_->advance(nbytes); return Status::OK(); @@ -246,6 +240,11 @@ Status BufferOutputStream::Reserve(int64_t nbytes) { // ---------------------------------------------------------------------- // In-memory buffer reader +BufferReader::BufferReader(const uint8_t* buffer, int buffer_size) + : buffer_(buffer), buffer_size_(buffer_size), position_(0) {} + +BufferReader::~BufferReader() {} + Status BufferReader::Close() { // no-op return Status::OK(); @@ -256,20 +255,6 @@ Status BufferReader::Tell(int64_t* position) { return Status::OK(); } -Status BufferReader::ReadAt( - int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { - RETURN_NOT_OK(Seek(position)); - return Read(nbytes, bytes_read, buffer); -} - -Status BufferReader::ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr* out) { - int64_t size = std::min(nbytes, buffer_size_ - position_); - *out = std::make_shared(buffer_ + position, size); - position_ += nbytes; - return Status::OK(); -} - bool BufferReader::supports_zero_copy() const { return true; } @@ -281,6 +266,13 @@ Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) return Status::OK(); } +Status BufferReader::Read(int64_t nbytes, std::shared_ptr* out) { + int64_t size = std::min(nbytes, buffer_size_ - position_); + *out = std::make_shared(buffer_ + position_, size); + position_ += nbytes; + return Status::OK(); +} + Status BufferReader::GetSize(int64_t* size) { *size = buffer_size_; return Status::OK(); diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index 6989d732ca7..df2fe8d6efb 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -61,6 +61,8 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream { // A memory source that uses memory-mapped files for memory interactions class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface { public: + ~MemoryMappedFile(); + static Status Open(const std::string& path, FileMode::type mode, std::shared_ptr* out); @@ -73,11 +75,8 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface { // Required by ReadableFileInterface, copies memory into out Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override; - Status ReadAt( - int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) override; - - // Read into a buffer, zero copy if possible - Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; + // Zero copy read + Status Read(int64_t nbytes, std::shared_ptr* out) override; bool supports_zero_copy() const override; @@ -100,17 +99,17 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface { class ARROW_EXPORT BufferReader : public ReadableFileInterface { public: - BufferReader(const uint8_t* buffer, int buffer_size) - : buffer_(buffer), buffer_size_(buffer_size), position_(0) {} + BufferReader(const uint8_t* buffer, int buffer_size); + ~BufferReader(); Status Close() override; Status Tell(int64_t* position) override; - Status ReadAt( - int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) 
override; - Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; - Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; + + // Zero copy read + Status Read(int64_t nbytes, std::shared_ptr* out) override; + Status GetSize(int64_t* size) override; Status Seek(int64_t position) override; diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index 3c2727cb00d..b42571661de 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -396,6 +396,45 @@ cdef class NativeFile: with nogil: check_cstatus(self.wr_file.get().Write(buf, bufsize)) + def read(self, int nbytes): + cdef: + int64_t bytes_read = 0 + uint8_t* buf + shared_ptr[Buffer] out + + self._assert_readable() + + with nogil: + check_cstatus(self.rd_file.get() + .Read(nbytes, &out)) + + result = cp.PyBytes_FromStringAndSize( + out.get().data(), out.get().size()) + + return result + + +# ---------------------------------------------------------------------- +# Python file-like objects + +cdef class PythonFileInterface(NativeFile): + cdef: + object handle + + def __cinit__(self, handle, mode='w'): + self.handle = handle + + if mode.startswith('w'): + self.wr_file.reset(new PyOutputStream(handle)) + elif mode.startswith('r'): + self.wr_file.reset(new PyReadableFile(handle)) + else: + raise ValueError('Invalid file mode: {0}'.format(mode)) + + +# ---------------------------------------------------------------------- +# Specialization for HDFS + cdef class HdfsFile(NativeFile): cdef readonly: From 7e357eba7b554d05dcdc88ba3b9d630343f7b3d2 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 2 Oct 2016 19:34:06 -0400 Subject: [PATCH 05/11] Get basic Python file read/write working Change-Id: I9d6e656ea152ef678ac01e22e4e24b3783ccacc0 --- python/pyarrow/includes/libarrow_io.pxd | 1 + python/pyarrow/io.pyx | 10 +- python/pyarrow/tests/test_hdfs.py | 128 ++++++++++++++++++++++++ python/pyarrow/tests/test_io.py | 109 +++++--------------- python/src/pyarrow/common.cc | 6 +- python/src/pyarrow/common.h | 8 +- python/src/pyarrow/io.cc | 56 ++++++----- python/src/pyarrow/io.h | 10 +- 8 files changed, 204 insertions(+), 124 deletions(-) create mode 100644 python/pyarrow/tests/test_hdfs.py diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd index d31c017a1eb..f1363611a25 100644 --- a/python/pyarrow/includes/libarrow_io.pxd +++ b/python/pyarrow/includes/libarrow_io.pxd @@ -36,6 +36,7 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil: FileMode mode() cdef cppclass Readable: + CStatus ReadB" Read"(int64_t nbytes, shared_ptr[Buffer]* out) CStatus Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) cdef cppclass Seekable: diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index b42571661de..a04d0e112f6 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -406,7 +406,7 @@ cdef class NativeFile: with nogil: check_cstatus(self.rd_file.get() - .Read(nbytes, &out)) + .ReadB(nbytes, &out)) result = cp.PyBytes_FromStringAndSize( out.get().data(), out.get().size()) @@ -425,12 +425,16 @@ cdef class PythonFileInterface(NativeFile): self.handle = handle if mode.startswith('w'): - self.wr_file.reset(new PyOutputStream(handle)) + self.wr_file.reset(new pyarrow.PyOutputStream(handle)) + self.is_readonly = 0 elif mode.startswith('r'): - self.wr_file.reset(new PyReadableFile(handle)) + self.rd_file.reset(new pyarrow.PyReadableFile(handle)) + self.is_readonly = 1 else: raise ValueError('Invalid file mode: {0}'.format(mode)) + 
self.is_open = True + # ---------------------------------------------------------------------- # Specialization for HDFS diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py new file mode 100644 index 00000000000..ed8d41994cd --- /dev/null +++ b/python/pyarrow/tests/test_hdfs.py @@ -0,0 +1,128 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from io import BytesIO +from os.path import join as pjoin +import os +import random + +import pytest + +import pyarrow.io as io + +# ---------------------------------------------------------------------- +# HDFS tests + + +def hdfs_test_client(): + host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost') + user = os.environ['ARROW_HDFS_TEST_USER'] + try: + port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 20500)) + except ValueError: + raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not ' + 'an integer') + + return io.HdfsClient.connect(host, port, user) + + +libhdfs = pytest.mark.skipif(not io.have_libhdfs(), + reason='No libhdfs available on system') + + +HDFS_TMP_PATH = '/tmp/pyarrow-test-{0}'.format(random.randint(0, 1000)) + + +@pytest.fixture(scope='session') +def hdfs(request): + fixture = hdfs_test_client() + + def teardown(): + fixture.delete(HDFS_TMP_PATH, recursive=True) + fixture.close() + request.addfinalizer(teardown) + return fixture + + +@libhdfs +def test_hdfs_close(): + client = hdfs_test_client() + assert client.is_open + client.close() + assert not client.is_open + + with pytest.raises(Exception): + client.ls('/') + + +@libhdfs +def test_hdfs_mkdir(hdfs): + path = pjoin(HDFS_TMP_PATH, 'test-dir/test-dir') + parent_path = pjoin(HDFS_TMP_PATH, 'test-dir') + + hdfs.mkdir(path) + assert hdfs.exists(path) + + hdfs.delete(parent_path, recursive=True) + assert not hdfs.exists(path) + + +@libhdfs +def test_hdfs_ls(hdfs): + base_path = pjoin(HDFS_TMP_PATH, 'ls-test') + hdfs.mkdir(base_path) + + dir_path = pjoin(base_path, 'a-dir') + f1_path = pjoin(base_path, 'a-file-1') + + hdfs.mkdir(dir_path) + + f = hdfs.open(f1_path, 'wb') + f.write('a' * 10) + + contents = sorted(hdfs.ls(base_path, False)) + assert contents == [dir_path, f1_path] + + +@libhdfs +def test_hdfs_download_upload(hdfs): + base_path = pjoin(HDFS_TMP_PATH, 'upload-test') + + data = b'foobarbaz' + buf = BytesIO(data) + buf.seek(0) + + hdfs.upload(base_path, buf) + + out_buf = BytesIO() + hdfs.download(base_path, out_buf) + out_buf.seek(0) + assert out_buf.getvalue() == data + + +@libhdfs +def test_hdfs_file_context_manager(hdfs): + path = pjoin(HDFS_TMP_PATH, 'ctx-manager') + + data = b'foo' + with hdfs.open(path, 'wb') as f: + f.write(data) + + with hdfs.open(path, 'rb') as f: + assert f.size() == 3 + result = f.read(10) + assert result == data diff --git a/python/pyarrow/tests/test_io.py 
b/python/pyarrow/tests/test_io.py index eb92e8ea93a..096a313159b 100644 --- a/python/pyarrow/tests/test_io.py +++ b/python/pyarrow/tests/test_io.py @@ -24,104 +24,49 @@ import pyarrow.io as io -#---------------------------------------------------------------------- -# HDFS tests +# ---------------------------------------------------------------------- +# Python file-like objects -def hdfs_test_client(): - host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost') - user = os.environ['ARROW_HDFS_TEST_USER'] - try: - port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 20500)) - except ValueError: - raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not ' - 'an integer') +def test_python_file_write(): + buf = BytesIO() - return io.HdfsClient.connect(host, port, user) + f = io.PythonFileInterface(buf) + assert f.tell() == 0 -libhdfs = pytest.mark.skipif(not io.have_libhdfs(), - reason='No libhdfs available on system') + s1 = b'enga\xc3\xb1ado' + s2 = b'foobar' + f.write(s1.decode('utf8')) + assert f.tell() == len(s1) -HDFS_TMP_PATH = '/tmp/pyarrow-test-{0}'.format(random.randint(0, 1000)) + f.write(s2) + expected = s1 + s2 -@pytest.fixture(scope='session') -def hdfs(request): - fixture = hdfs_test_client() - def teardown(): - fixture.delete(HDFS_TMP_PATH, recursive=True) - fixture.close() - request.addfinalizer(teardown) - return fixture + result = buf.getvalue() + assert result == expected + f.close() -@libhdfs -def test_hdfs_close(): - client = hdfs_test_client() - assert client.is_open - client.close() - assert not client.is_open - with pytest.raises(Exception): - client.ls('/') +def test_python_file_read(): + buf = BytesIO(b'some sample data') + f = io.PythonFileInterface(buf, mode='r') -@libhdfs -def test_hdfs_mkdir(hdfs): - path = pjoin(HDFS_TMP_PATH, 'test-dir/test-dir') - parent_path = pjoin(HDFS_TMP_PATH, 'test-dir') + assert f.tell() == 0 - hdfs.mkdir(path) - assert hdfs.exists(path) + assert f.read(4) == b'some' + assert f.tell() == 4 - hdfs.delete(parent_path, recursive=True) - assert not hdfs.exists(path) + f.seek(0) + assert f.tell() == 0 + f.seek(5) + assert f.tell() == 5 -@libhdfs -def test_hdfs_ls(hdfs): - base_path = pjoin(HDFS_TMP_PATH, 'ls-test') - hdfs.mkdir(base_path) + assert f.read(50) == b'sample data' - dir_path = pjoin(base_path, 'a-dir') - f1_path = pjoin(base_path, 'a-file-1') - - hdfs.mkdir(dir_path) - - f = hdfs.open(f1_path, 'wb') - f.write('a' * 10) - - contents = sorted(hdfs.ls(base_path, False)) - assert contents == [dir_path, f1_path] - - -@libhdfs -def test_hdfs_download_upload(hdfs): - base_path = pjoin(HDFS_TMP_PATH, 'upload-test') - - data = b'foobarbaz' - buf = BytesIO(data) - buf.seek(0) - - hdfs.upload(base_path, buf) - - out_buf = BytesIO() - hdfs.download(base_path, out_buf) - out_buf.seek(0) - assert out_buf.getvalue() == data - - -@libhdfs -def test_hdfs_file_context_manager(hdfs): - path = pjoin(HDFS_TMP_PATH, 'ctx-manager') - - data = b'foo' - with hdfs.open(path, 'wb') as f: - f.write(data) - - with hdfs.open(path, 'rb') as f: - assert f.size() == 3 - result = f.read(10) - assert result == data + f.close() diff --git a/python/src/pyarrow/common.cc b/python/src/pyarrow/common.cc index a940149efa0..82b14fdf401 100644 --- a/python/src/pyarrow/common.cc +++ b/python/src/pyarrow/common.cc @@ -72,14 +72,14 @@ arrow::MemoryPool* GetMemoryPool() { // PyBytesBuffer PyBytesBuffer::PyBytesBuffer(PyObject* obj) - : Buffer(reinterpret_cast(PyBytes_AS_STRING(obj_)), - PyBytes_GET_SIZE(obj_)), + : Buffer(reinterpret_cast(PyBytes_AS_STRING(obj)), + 
PyBytes_GET_SIZE(obj)), obj_(obj) { Py_INCREF(obj_); } PyBytesBuffer::~PyBytesBuffer() { - PyAcquireGIL_RAII lock; + PyGILGuard lock; Py_DECREF(obj_); } diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h index 132abd8c217..fd5b4effb1b 100644 --- a/python/src/pyarrow/common.h +++ b/python/src/pyarrow/common.h @@ -82,18 +82,18 @@ struct PyObjectStringify { } }; -class PyAcquireGIL_RAII { +class PyGILGuard { public: - PyAcquireGIL_RAII() { + PyGILGuard() { state_ = PyGILState_Ensure(); } - ~PyAcquireGIL_RAII() { + ~PyGILGuard() { PyGILState_Release(state_); } private: PyGILState_STATE state_; - DISALLOW_COPY_AND_ASSIGN(PyAcquireGIL_RAII); + DISALLOW_COPY_AND_ASSIGN(PyGILGuard); }; // TODO(wesm): We can just let errors pass through. To be explored later diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc index 681470f69c1..3e469cb7ef8 100644 --- a/python/src/pyarrow/io.cc +++ b/python/src/pyarrow/io.cc @@ -31,7 +31,8 @@ namespace pyarrow { // ---------------------------------------------------------------------- // Python file -PythonFile::PythonFile(PyObject* file) { +PythonFile::PythonFile(PyObject* file) + : file_(file) { Py_INCREF(file_); } @@ -84,6 +85,7 @@ arrow::Status PythonFile::Write(const uint8_t* data, int64_t nbytes) { PyObject* result = PyObject_CallMethod(file_, "write", "(O)", py_data); Py_DECREF(py_data); + Py_XDECREF(result); ARROW_RETURN_NOT_OK(CheckPyError()); return arrow::Status::OK(); } @@ -111,20 +113,22 @@ PyReadableFile::PyReadableFile(PyObject* file) { PyReadableFile::~PyReadableFile() {} arrow::Status PyReadableFile::Close() { - PyAcquireGIL_RAII lock; + PyGILGuard lock; return file_->Close(); } arrow::Status PyReadableFile::Seek(int64_t position) { - PyAcquireGIL_RAII lock; + PyGILGuard lock; return file_->Seek(position, 0); } -arrow::Status PyReadableFile::ReadAt( - int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) { - PyAcquireGIL_RAII lock; - ARROW_RETURN_NOT_OK(file_->Seek(position, 0)); +arrow::Status PyReadableFile::Tell(int64_t* position) { + PyGILGuard lock; + return file_->Tell(position); +} +arrow::Status PyReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) { + PyGILGuard lock; PyObject* bytes_obj; ARROW_RETURN_NOT_OK(file_->Read(nbytes, &bytes_obj)); @@ -135,8 +139,20 @@ arrow::Status PyReadableFile::ReadAt( return arrow::Status::OK(); } +arrow::Status PyReadableFile::Read(int64_t nbytes, std::shared_ptr* out) { + PyGILGuard lock; + + PyObject* bytes_obj; + ARROW_RETURN_NOT_OK(file_->Read(nbytes, &bytes_obj)); + + *out = std::make_shared(bytes_obj); + Py_DECREF(bytes_obj); + + return arrow::Status::OK(); +} + arrow::Status PyReadableFile::GetSize(int64_t* size) { - PyAcquireGIL_RAII lock; + PyGILGuard lock; int64_t current_position;; ARROW_RETURN_NOT_OK(file_->Tell(¤t_position)); @@ -153,21 +169,6 @@ arrow::Status PyReadableFile::GetSize(int64_t* size) { return arrow::Status::OK(); } -// Does not copy if not necessary -arrow::Status PyReadableFile::ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr* out) { - PyAcquireGIL_RAII lock; - ARROW_RETURN_NOT_OK(file_->Seek(position, 0)); - - PyObject* bytes_obj; - ARROW_RETURN_NOT_OK(file_->Read(nbytes, &bytes_obj)); - - *out = std::make_shared(bytes_obj); - Py_DECREF(bytes_obj); - - return arrow::Status::OK(); -} - bool PyReadableFile::supports_zero_copy() const { return false; } @@ -182,15 +183,18 @@ PyOutputStream::PyOutputStream(PyObject* file) { PyOutputStream::~PyOutputStream() {} arrow::Status PyOutputStream::Close() { - 
return arrow::Status::OK(); + PyGILGuard lock; + return file_->Close(); } arrow::Status PyOutputStream::Tell(int64_t* position) { - return arrow::Status::OK(); + PyGILGuard lock; + return file_->Tell(position); } arrow::Status PyOutputStream::Write(const uint8_t* data, int64_t nbytes) { - return arrow::Status::OK(); + PyGILGuard lock; + return file_->Write(data, nbytes); } } // namespace pyarrow diff --git a/python/src/pyarrow/io.h b/python/src/pyarrow/io.h index de57181a8ce..e764b2319a2 100644 --- a/python/src/pyarrow/io.h +++ b/python/src/pyarrow/io.h @@ -51,17 +51,15 @@ class PYARROW_EXPORT PyReadableFile : public arrow::io::ReadableFileInterface { arrow::Status Close() override; - arrow::Status ReadAt( - int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) override; + arrow::Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override; + arrow::Status Read(int64_t nbytes, std::shared_ptr* out) override; arrow::Status GetSize(int64_t* size) override; - // Does not copy if not necessary - arrow::Status ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr* out) override; - arrow::Status Seek(int64_t position) override; + arrow::Status Tell(int64_t* position) override; + bool supports_zero_copy() const override; private: From 6481e91b2e6eb36247c5a6804e4842f122bb3b93 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 2 Oct 2016 19:54:07 -0400 Subject: [PATCH 06/11] Add a zero-copy reader for PyBytes Change-Id: I131f49d73c73097c5ac250bfc0c60639358c0954 --- python/pyarrow/includes/libarrow_io.pxd | 9 ++++++++ python/pyarrow/includes/pyarrow.pxd | 3 +++ python/pyarrow/io.pyx | 14 ++++++++++++ python/pyarrow/tests/test_io.py | 29 +++++++++++++++++++++---- python/src/pyarrow/io.cc | 17 ++++++++++++++- python/src/pyarrow/io.h | 19 ++++++++++++---- 6 files changed, 82 insertions(+), 9 deletions(-) diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd index f1363611a25..56d8d4cf614 100644 --- a/python/pyarrow/includes/libarrow_io.pxd +++ b/python/pyarrow/includes/libarrow_io.pxd @@ -140,3 +140,12 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil: int32_t buffer_size, int16_t replication, int64_t default_block_size, shared_ptr[HdfsOutputStream]* handle) + + +cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil: + cdef cppclass BufferReader(ReadableFileInterface): + BufferReader(const uint8_t* data, int64_t nbytes) + + cdef cppclass BufferOutputStream(OutputStream): + # TODO(wesm) + pass diff --git a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd index 4f86ecf492e..4c971665ff6 100644 --- a/python/pyarrow/includes/pyarrow.pxd +++ b/python/pyarrow/includes/pyarrow.pxd @@ -62,3 +62,6 @@ cdef extern from "pyarrow/io.h" namespace "pyarrow" nogil: cdef cppclass PyOutputStream(arrow_io.OutputStream): PyOutputStream(object fo) + + cdef cppclass PyBytesReader(arrow_io.BufferReader): + PyBytesReader(object fo) diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index a04d0e112f6..e6e2b625e87 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -436,6 +436,20 @@ cdef class PythonFileInterface(NativeFile): self.is_open = True +cdef class BytesReader(NativeFile): + cdef: + object obj + + def __cinit__(self, obj): + if not isinstance(obj, bytes): + raise ValueError('Must pass bytes object') + + self.obj = obj + self.is_readonly = 1 + self.is_open = True + + self.rd_file.reset(new pyarrow.PyBytesReader(obj)) + # 
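A short sketch of how the new reader is meant to be used, mirroring the test added below; the bytes object is wrapped zero-copy rather than copied into a new buffer:

    import pyarrow.io as io

    f = io.BytesReader(b'some sample data')
    assert f.read(4) == b'some'      # reads directly against the PyBytes storage
    f.seek(5)
    assert f.read(50) == b'sample data'
    f.close()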
---------------------------------------------------------------------- # Specialization for HDFS diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py index 096a313159b..f49c13fc396 100644 --- a/python/pyarrow/tests/test_io.py +++ b/python/pyarrow/tests/test_io.py @@ -16,10 +16,6 @@ # under the License. from io import BytesIO -from os.path import join as pjoin -import os -import random - import pytest import pyarrow.io as io @@ -70,3 +66,28 @@ def test_python_file_read(): assert f.read(50) == b'sample data' f.close() + + +def test_bytes_reader(): + # Like a BytesIO, but zero-copy underneath for C++ consumers + f = io.BytesReader(b'some sample data') + + assert f.tell() == 0 + + assert f.read(4) == b'some' + assert f.tell() == 4 + + f.seek(0) + assert f.tell() == 0 + + f.seek(5) + assert f.tell() == 5 + + assert f.read(50) == b'sample data' + + f.close() + + +def test_bytes_reader_non_bytes(): + with pytest.raises(ValueError): + io.BytesReader('some sample data') diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc index 3e469cb7ef8..f7b1903e685 100644 --- a/python/src/pyarrow/io.cc +++ b/python/src/pyarrow/io.cc @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -197,4 +198,18 @@ arrow::Status PyOutputStream::Write(const uint8_t* data, int64_t nbytes) { return file_->Write(data, nbytes); } -} // namespace pyarrow +// ---------------------------------------------------------------------- +// A readable file that is backed by a PyBytes + +PyBytesReader::PyBytesReader(PyObject* obj) + : arrow::io::BufferReader(reinterpret_cast(PyBytes_AS_STRING(obj)), + PyBytes_GET_SIZE(obj)), + obj_(obj) { + Py_INCREF(obj_); +} + +PyBytesReader::~PyBytesReader() { + Py_DECREF(obj_); +} + +} // namespace pyarrow diff --git a/python/src/pyarrow/io.h b/python/src/pyarrow/io.h index e764b2319a2..4b1a18a9bc4 100644 --- a/python/src/pyarrow/io.h +++ b/python/src/pyarrow/io.h @@ -19,6 +19,7 @@ #define PYARROW_IO_H #include "arrow/io/interfaces.h" +#include "arrow/io/memory.h" #include "pyarrow/config.h" #include "pyarrow/visibility.h" @@ -45,7 +46,7 @@ class PythonFile { }; class PYARROW_EXPORT PyReadableFile : public arrow::io::ReadableFileInterface { -public: + public: explicit PyReadableFile(PyObject* file); ~PyReadableFile(); @@ -62,12 +63,12 @@ class PYARROW_EXPORT PyReadableFile : public arrow::io::ReadableFileInterface { bool supports_zero_copy() const override; -private: + private: std::unique_ptr file_; }; class PYARROW_EXPORT PyOutputStream : public arrow::io::OutputStream { -public: + public: explicit PyOutputStream(PyObject* file); ~PyOutputStream(); @@ -75,10 +76,20 @@ class PYARROW_EXPORT PyOutputStream : public arrow::io::OutputStream { arrow::Status Tell(int64_t* position) override; arrow::Status Write(const uint8_t* data, int64_t nbytes) override; -private: + private: std::unique_ptr file_; }; +// A zero-copy reader backed by a PyBytes object +class PYARROW_EXPORT PyBytesReader : public arrow::io::BufferReader { + public: + explicit PyBytesReader(PyObject* obj); + ~PyBytesReader(); + + private: + PyObject* obj_; +}; + // TODO(wesm): seekable output files } // namespace pyarrow From e9b8c60441927e7529a9255e2dbf3c4b3795af24 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 2 Oct 2016 20:03:09 -0400 Subject: [PATCH 07/11] Test the size() method and fix bug with missing whence Change-Id: I2ef9faca323f333e7f828d757d8740d20e5e467a --- python/pyarrow/tests/test_io.py | 10 ++++++++-- python/src/pyarrow/common.h | 2 +- python/src/pyarrow/io.cc | 2 +- 
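Because PyBytesReader takes its own reference (the Py_INCREF in the constructor above), the caller does not need to keep the source bytes object alive; a small illustration of that contract:

    import pyarrow.io as io

    data = b'some sample data'
    f = io.BytesReader(data)
    del data                     # the reader still holds a reference to the bytes
    assert f.read(4) == b'some'
    f.close()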
3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py index f49c13fc396..c188a413363 100644 --- a/python/pyarrow/tests/test_io.py +++ b/python/pyarrow/tests/test_io.py @@ -48,10 +48,13 @@ def test_python_file_write(): def test_python_file_read(): - buf = BytesIO(b'some sample data') + data = b'some sample data' + buf = BytesIO(data) f = io.PythonFileInterface(buf, mode='r') + assert f.size() == len(data) + assert f.tell() == 0 assert f.read(4) == b'some' @@ -70,10 +73,13 @@ def test_python_file_read(): def test_bytes_reader(): # Like a BytesIO, but zero-copy underneath for C++ consumers - f = io.BytesReader(b'some sample data') + data = b'some sample data' + f = io.BytesReader(data) assert f.tell() == 0 + assert f.size() == len(data) + assert f.read(4) == b'some' assert f.tell() == 4 diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h index fd5b4effb1b..bc599f84fab 100644 --- a/python/src/pyarrow/common.h +++ b/python/src/pyarrow/common.h @@ -133,7 +133,7 @@ class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer { class PYARROW_EXPORT PyBytesBuffer : public arrow::Buffer { public: PyBytesBuffer(PyObject* obj); - virtual ~PyBytesBuffer(); + ~PyBytesBuffer(); private: PyObject* obj_; diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc index f7b1903e685..8c7af40e384 100644 --- a/python/src/pyarrow/io.cc +++ b/python/src/pyarrow/io.cc @@ -66,7 +66,7 @@ arrow::Status PythonFile::Close() { arrow::Status PythonFile::Seek(int64_t position, int whence) { // whence: 0 for relative to start of file, 2 for end of file - PyObject* result = PyObject_CallMethod(file_, "seek", "(i)", position); + PyObject* result = PyObject_CallMethod(file_, "seek", "(ii)", position, whence); Py_XDECREF(result); ARROW_RETURN_NOT_OK(CheckPyError()); return arrow::Status::OK(); From 0fc4cf12c313c3b0d06a0dd6ca4b4aaf7430cef7 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 2 Oct 2016 20:08:12 -0400 Subject: [PATCH 08/11] cpplint Change-Id: I87b6281d7a42ce7c3d0a9b9ae8744afe530a84d6 --- cpp/src/arrow/io/hdfs.cc | 2 +- cpp/src/arrow/io/hdfs.h | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index a5b2c33df39..a11575f563b 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -91,7 +91,7 @@ class HdfsAnyFileImpl { // Private implementation for read-only files class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { public: - HdfsReadableFileImpl(MemoryPool* pool) + explicit HdfsReadableFileImpl(MemoryPool* pool) : pool_(pool) {} Status Close() { diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h index c860e1baa94..4a4e3ec5f51 100644 --- a/cpp/src/arrow/io/hdfs.h +++ b/cpp/src/arrow/io/hdfs.h @@ -183,12 +183,13 @@ class ARROW_EXPORT HdfsReadableFile : public ReadableFileInterface { void set_memory_pool(MemoryPool* pool); private: + explicit HdfsReadableFile(MemoryPool* pool = nullptr); + class ARROW_NO_EXPORT HdfsReadableFileImpl; std::unique_ptr impl_; friend class HdfsClient::HdfsClientImpl; - HdfsReadableFile(MemoryPool* pool = nullptr); DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile); }; From e79189385095fde342dc7e872a947810e2ff01ce Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 2 Oct 2016 21:43:46 -0400 Subject: [PATCH 09/11] Python 2.7 fix Change-Id: Icd397820eb6d7d9aa36d617c3f50900ec7d04030 --- python/pyarrow/tests/test_io.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git 
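The seek fix above matters because the size() path presumably works by seeking to the end of the file and reading back the position; with the old "(i)" format string the whence argument was silently dropped, so seek(0, 2) behaved like seek(0) and size() reported the wrong value. In pure Python the intended logic is roughly:

    # rough Python equivalent of PyReadableFile::GetSize (an assumption based on
    # the whence comment in PythonFile::Seek, not a literal transcription)
    def get_size(f):
        current = f.tell()
        f.seek(0, 2)             # whence=2: seek relative to end of file
        size = f.tell()
        f.seek(current, 0)       # restore the original position
        return size

    from io import BytesIO
    assert get_size(BytesIO(b'some sample data')) == 16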
a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py index c188a413363..9a41ebe3e8c 100644 --- a/python/pyarrow/tests/test_io.py +++ b/python/pyarrow/tests/test_io.py @@ -18,6 +18,7 @@ from io import BytesIO import pytest +from pyarrow.compat import u import pyarrow.io as io # ---------------------------------------------------------------------- @@ -96,4 +97,4 @@ def test_bytes_reader(): def test_bytes_reader_non_bytes(): with pytest.raises(ValueError): - io.BytesReader('some sample data') + io.BytesReader(u('some sample data')) From 316b8451f9544d4534f0d0ff14ed66df194fae82 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 3 Oct 2016 16:32:14 -0400 Subject: [PATCH 10/11] Code review comments Change-Id: Ib67ce254d8a2aeff3df12e8d57525f3383a3c344 --- cpp/CMakeLists.txt | 2 ++ cpp/src/arrow/io/hdfs.cc | 17 +++++------------ cpp/src/arrow/io/interfaces.h | 3 +-- cpp/src/arrow/io/memory.cc | 2 +- python/src/pyarrow/io.cc | 2 +- python/src/pyarrow/io.h | 6 +++--- 6 files changed, 13 insertions(+), 19 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index d65c7153196..f70c8ab4bcc 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -166,6 +166,8 @@ else() message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}") endif () +message(STATUS "Build Type: ${CMAKE_BUILD_TYPE}") + # Add common flags set(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}") diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc index a11575f563b..b74f84604f1 100644 --- a/cpp/src/arrow/io/hdfs.cc +++ b/cpp/src/arrow/io/hdfs.cc @@ -91,8 +91,7 @@ class HdfsAnyFileImpl { // Private implementation for read-only files class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { public: - explicit HdfsReadableFileImpl(MemoryPool* pool) - : pool_(pool) {} + explicit HdfsReadableFileImpl(MemoryPool* pool) : pool_(pool) {} Status Close() { if (is_open_) { @@ -118,8 +117,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { int64_t bytes_read = 0; RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data())); - // XXX: heuristic - if (bytes_read < nbytes / 2) { RETURN_NOT_OK(buffer->Resize(bytes_read)); } + if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); } *out = buffer; return Status::OK(); @@ -139,8 +137,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { int64_t bytes_read = 0; RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data())); - // XXX: heuristic - if (bytes_read < nbytes / 2) { RETURN_NOT_OK(buffer->Resize(bytes_read)); } + if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); } *out = buffer; return Status::OK(); @@ -155,18 +152,14 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl { return Status::OK(); } - void set_memory_pool(MemoryPool* pool) { - pool_ = pool; - } + void set_memory_pool(MemoryPool* pool) { pool_ = pool; } private: MemoryPool* pool_; }; HdfsReadableFile::HdfsReadableFile(MemoryPool* pool) { - if (pool == nullptr) { - pool = default_memory_pool(); - } + if (pool == nullptr) { pool = default_memory_pool(); } impl_.reset(new HdfsReadableFileImpl(pool)); } diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index e8114fe14ea..db0c059c6e2 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -101,8 +101,7 @@ class ARROW_EXPORT ReadableFileInterface : public InputStream, public Seekable { virtual Status ReadAt( int64_t position, int64_t nbytes, int64_t* 
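Dropping the bytes_read < nbytes / 2 heuristic above means a short read on the HDFS file now always shrinks the returned buffer to the number of bytes actually read. The same short-read contract is visible at the Python level; a sketch using the in-memory reader:

    import pyarrow.io as io

    f = io.BytesReader(b'some sample data')
    f.seek(5)
    chunk = f.read(50)           # only 11 bytes remain after position 5
    assert chunk == b'sample data'
    assert len(chunk) == 11
    f.close()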
bytes_read, uint8_t* out); - virtual Status ReadAt( - int64_t position, int64_t nbytes, std::shared_ptr* out); + virtual Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out); protected: ReadableFileInterface(); diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index 2834cffc9f3..7d6e02e25b4 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -241,7 +241,7 @@ Status BufferOutputStream::Reserve(int64_t nbytes) { // In-memory buffer reader BufferReader::BufferReader(const uint8_t* buffer, int buffer_size) - : buffer_(buffer), buffer_size_(buffer_size), position_(0) {} + : buffer_(buffer), buffer_size_(buffer_size), position_(0) {} BufferReader::~BufferReader() {} diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc index 8c7af40e384..35054e9025a 100644 --- a/python/src/pyarrow/io.cc +++ b/python/src/pyarrow/io.cc @@ -17,8 +17,8 @@ #include "pyarrow/io.h" +#include #include -#include #include #include diff --git a/python/src/pyarrow/io.h b/python/src/pyarrow/io.h index 4b1a18a9bc4..e14aa8cfb27 100644 --- a/python/src/pyarrow/io.h +++ b/python/src/pyarrow/io.h @@ -48,7 +48,7 @@ class PythonFile { class PYARROW_EXPORT PyReadableFile : public arrow::io::ReadableFileInterface { public: explicit PyReadableFile(PyObject* file); - ~PyReadableFile(); + virtual ~PyReadableFile(); arrow::Status Close() override; @@ -70,7 +70,7 @@ class PYARROW_EXPORT PyReadableFile : public arrow::io::ReadableFileInterface { class PYARROW_EXPORT PyOutputStream : public arrow::io::OutputStream { public: explicit PyOutputStream(PyObject* file); - ~PyOutputStream(); + virtual ~PyOutputStream(); arrow::Status Close() override; arrow::Status Tell(int64_t* position) override; @@ -84,7 +84,7 @@ class PYARROW_EXPORT PyOutputStream : public arrow::io::OutputStream { class PYARROW_EXPORT PyBytesReader : public arrow::io::BufferReader { public: explicit PyBytesReader(PyObject* obj); - ~PyBytesReader(); + virtual ~PyBytesReader(); private: PyObject* obj_; From 2de9f97fd7b97eeadb7df234efdedb9ee58be277 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 3 Oct 2016 16:44:29 -0400 Subject: [PATCH 11/11] Fix compiler warning / bug from OS X Change-Id: I498fb6e26642065c32c8425e5e6564bfb7b75c78 --- python/src/pyarrow/adapters/pandas.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index a4e7fb6f3bb..d224074d652 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -618,7 +618,7 @@ class ArrowDeserializer { Status OutputFromData(int type, void* data) { // Zero-Copy. We can pass the data pointer directly to NumPy. Py_INCREF(py_ref_); - OwnedRef py_ref(py_ref); + OwnedRef py_ref(py_ref_); npy_intp dims[1] = {col_->length()}; out_ = reinterpret_cast(PyArray_SimpleNewFromData(1, dims, type, data));