From f4e3b6a0c01d3ed464da5f02d3a8417bb144aed7 Mon Sep 17 00:00:00 2001 From: Panchen Xue Date: Wed, 21 Feb 2018 18:03:10 -0500 Subject: [PATCH 1/3] Add static constructor for FileOutputStream returning pointer to base OutputStream --- cpp/src/arrow/io/file.cc | 11 +++++++++++ cpp/src/arrow/io/file.h | 15 +++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc index 65a302c8bea..d44d90cbe35 100644 --- a/cpp/src/arrow/io/file.cc +++ b/cpp/src/arrow/io/file.cc @@ -554,6 +554,17 @@ FileOutputStream::~FileOutputStream() { DCHECK(impl_->Close().ok()); } +Status FileOutputStream::Open(const std::string& path, + std::shared_ptr* file) { + return Open(path, false, file); +} + +Status FileOutputStream::Open(const std::string& path, bool append, + std::shared_ptr* out) { + *out = std::shared_ptr(new FileOutputStream()); + return std::static_pointer_cast(*out)->impl_->Open(path, append); +} + Status FileOutputStream::Open(const std::string& path, std::shared_ptr* file) { return Open(path, false, file); diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h index 3d65834640d..a1f9edc1c66 100644 --- a/cpp/src/arrow/io/file.h +++ b/cpp/src/arrow/io/file.h @@ -39,6 +39,21 @@ class ARROW_EXPORT FileOutputStream : public OutputStream { public: ~FileOutputStream() override; + /// \brief Open a local file for writing, truncating any existing file + /// \param[in] path with UTF8 encoding + /// \param[out] out a base interface OutputStream instance + /// + /// When opening a new file, any existing file with the indicated path is + /// truncated to 0 bytes, deleting any existing memory + static Status Open(const std::string& path, std::shared_ptr* out); + + /// \brief Open a local file for writing + /// \param[in] path with UTF8 encoding + /// \param[in] append append to existing file, otherwise truncate to 0 bytes + /// \param[out] out a base interface OutputStream instance + static Status Open(const std::string& path, bool append, + std::shared_ptr* out); + /// \brief Open a local file for writing, truncating any existing file /// \param[in] path with UTF8 encoding /// \param[out] file a FileOutputStream instance From d0930627ee8a34daffa958b2348cc42c405052fa Mon Sep 17 00:00:00 2001 From: Panchen Xue Date: Fri, 23 Feb 2018 16:43:53 -0500 Subject: [PATCH 2/3] Add test cases --- cpp/src/arrow/io/io-file-test.cc | 46 ++++++++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc index 7a7f3963682..2a4acab59db 100644 --- a/cpp/src/arrow/io/io-file-test.cc +++ b/cpp/src/arrow/io/io-file-test.cc @@ -103,10 +103,12 @@ class TestFileOutputStream : public FileTestFixture { public: void OpenFile(bool append = false) { ASSERT_OK(FileOutputStream::Open(path_, append, &file_)); + ASSERT_OK(FileOutputStream::Open(path_, append, &stream_)); } protected: std::shared_ptr file_; + std::shared_ptr stream_; }; #if defined(_MSC_VER) @@ -116,6 +118,9 @@ TEST_F(TestFileOutputStream, FileNameWideCharConversionRangeException) { std::string file_name = "\x80"; ASSERT_RAISES(Invalid, FileOutputStream::Open(file_name, &file)); + std::shared_ptr stream; + ASSERT_RAISES(Invalid, FileOutputStream::Open(file_name, &stream)); + std::shared_ptr rd_file; ASSERT_RAISES(Invalid, ReadableFile::Open(file_name, &rd_file)); } @@ -129,6 +134,12 @@ TEST_F(TestFileOutputStream, DestructorClosesFile) { fd = file->file_descriptor(); } ASSERT_TRUE(FileIsClosed(fd)); + { + std::shared_ptr stream; + ASSERT_OK(FileOutputStream::Open(path_, &stream)); + fd = std::static_pointer_cast(stream)->file_descriptor(); + } + ASSERT_TRUE(FileIsClosed(fd)); } TEST_F(TestFileOutputStream, Close) { @@ -139,7 +150,6 @@ TEST_F(TestFileOutputStream, Close) { int fd = file_->file_descriptor(); ASSERT_OK(file_->Close()); - ASSERT_TRUE(FileIsClosed(fd)); // Idempotent @@ -151,6 +161,19 @@ TEST_F(TestFileOutputStream, Close) { int64_t size = 0; ASSERT_OK(rd_file->GetSize(&size)); ASSERT_EQ(strlen(data), size); + + ASSERT_OK(stream_->Write(data, strlen(data))); + + fd = std::static_pointer_cast(stream_)->file_descriptor(); + ASSERT_OK(stream_->Close()); + ASSERT_TRUE(FileIsClosed(fd)); + + // Idempotent + ASSERT_OK(stream_->Close()); + + ASSERT_OK(ReadableFile::Open(path_, &rd_file)); + ASSERT_OK(rd_file->GetSize(&size)); + ASSERT_EQ(strlen(data), size); } TEST_F(TestFileOutputStream, InvalidWrites) { @@ -159,13 +182,13 @@ TEST_F(TestFileOutputStream, InvalidWrites) { const char* data = ""; ASSERT_RAISES(IOError, file_->Write(data, -1)); + ASSERT_RAISES(IOError, stream_->Write(data, -1)); } TEST_F(TestFileOutputStream, Tell) { OpenFile(); int64_t position; - ASSERT_OK(file_->Tell(&position)); ASSERT_EQ(0, position); @@ -173,6 +196,13 @@ TEST_F(TestFileOutputStream, Tell) { ASSERT_OK(file_->Write(data, 8)); ASSERT_OK(file_->Tell(&position)); ASSERT_EQ(8, position); + + ASSERT_OK(stream_->Tell(&position)); + ASSERT_EQ(0, position); + + ASSERT_OK(stream_->Write(data, 8)); + ASSERT_OK(stream_->Tell(&position)); + ASSERT_EQ(8, position); } TEST_F(TestFileOutputStream, TruncatesNewFile) { @@ -191,6 +221,18 @@ TEST_F(TestFileOutputStream, TruncatesNewFile) { int64_t size; ASSERT_OK(rd_file->GetSize(&size)); ASSERT_EQ(0, size); + + ASSERT_OK(FileOutputStream::Open(path_, &stream_)); + + ASSERT_OK(stream_->Write(data, strlen(data))); + ASSERT_OK(stream_->Close()); + + ASSERT_OK(FileOutputStream::Open(path_, &stream_)); + ASSERT_OK(stream_->Close()); + + ASSERT_OK(ReadableFile::Open(path_, &rd_file)); + ASSERT_OK(rd_file->GetSize(&size)); + ASSERT_EQ(0, size); } // ---------------------------------------------------------------------- From 43dc4834c9930f8e9b48abcbe8405f0432d7b617 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 24 Feb 2018 11:30:17 -0500 Subject: [PATCH 3/3] Update Python bindings to use new FileOutputStream API Change-Id: I4fe462e126292de1a954ca735cb2aaf02ead0e53 --- python/pyarrow/_parquet.pyx | 4 +--- python/pyarrow/includes/libarrow.pxd | 2 +- python/pyarrow/io.pxi | 5 +---- 3 files changed, 3 insertions(+), 8 deletions(-) diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx index 147af217579..9061ed53dac 100644 --- a/python/pyarrow/_parquet.pyx +++ b/python/pyarrow/_parquet.pyx @@ -821,7 +821,6 @@ cdef class ParquetWriter: use_deprecated_int96_timestamps=False, coerce_timestamps=None): cdef: - shared_ptr[FileOutputStream] filestream shared_ptr[WriterProperties] properties c_string c_where CMemoryPool* pool @@ -830,8 +829,7 @@ cdef class ParquetWriter: c_where = tobytes(where) with nogil: check_status(FileOutputStream.Open(c_where, - &filestream)) - self.sink = filestream + &self.sink)) self.own_sink = True else: get_writer(where, &self.sink) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 81cc4d2761e..8da126aafd5 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -539,7 +539,7 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil: cdef cppclass FileOutputStream(OutputStream): @staticmethod - CStatus Open(const c_string& path, shared_ptr[FileOutputStream]* file) + CStatus Open(const c_string& path, shared_ptr[OutputStream]* file) int file_descriptor() diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index aa2f7ed07a2..8b364dc7163 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -550,12 +550,9 @@ cdef class OSFile(NativeFile): self.rd_file = handle cdef _open_writable(self, c_string path): - cdef shared_ptr[FileOutputStream] handle - with nogil: - check_status(FileOutputStream.Open(path, &handle)) + check_status(FileOutputStream.Open(path, &self.wr_file)) self.is_writable = True - self.wr_file = handle cdef class FixedSizeBufferWriter(NativeFile):