Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cpp/src/arrow/io/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,17 @@ FileOutputStream::~FileOutputStream() {
DCHECK(impl_->Close().ok());
}

// Convenience overload: opens `path` for writing with appending disabled and
// returns the stream through the base OutputStream interface by delegating to
// the (path, append, out) overload.
Status FileOutputStream::Open(const std::string& path,
                              std::shared_ptr<OutputStream>* file) {
  const bool append = false;
  return Open(path, append, file);
}

// Opens `path` for writing and hands the stream back through the base
// OutputStream interface; `append` is forwarded to the implementation's Open.
// Note: *out is assigned before the open is attempted, so it is populated even
// when a non-OK Status is returned.
Status FileOutputStream::Open(const std::string& path, bool append,
                              std::shared_ptr<OutputStream>* out) {
  std::shared_ptr<FileOutputStream> stream(new FileOutputStream());
  *out = stream;
  return stream->impl_->Open(path, append);
}

Status FileOutputStream::Open(const std::string& path,
std::shared_ptr<FileOutputStream>* file) {
return Open(path, false, file);
Expand Down
15 changes: 15 additions & 0 deletions cpp/src/arrow/io/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ class ARROW_EXPORT FileOutputStream : public OutputStream {
public:
~FileOutputStream() override;

/// \brief Open a local file for writing, truncating any existing file
/// \param[in] path path of the file to open, with UTF8 encoding
/// \param[out] out a base interface OutputStream instance
/// \return Status
///
/// When opening a new file, any existing file with the indicated path is
/// truncated to 0 bytes, deleting any existing data
static Status Open(const std::string& path, std::shared_ptr<OutputStream>* out);

/// \brief Open a local file for writing
/// \param[in] path path of the file to open, with UTF8 encoding
/// \param[in] append if true, append to the existing file; otherwise truncate
/// it to 0 bytes
/// \param[out] out a base interface OutputStream instance
/// \return Status
static Status Open(const std::string& path, bool append,
std::shared_ptr<OutputStream>* out);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need some unit tests for these. We should also change the Python bindings to use these APIs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to change the Python bindings in this pull request?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a commit here to make the change, so we're good now


/// \brief Open a local file for writing, truncating any existing file
/// \param[in] path with UTF8 encoding
/// \param[out] file a FileOutputStream instance
Expand Down
46 changes: 44 additions & 2 deletions cpp/src/arrow/io/io-file-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,12 @@ class TestFileOutputStream : public FileTestFixture {
public:
// Opens path_ twice with the same `append` flag: once as a concrete
// FileOutputStream (file_) and once through the base OutputStream interface
// (stream_), so that tests can exercise both Open overloads.
void OpenFile(bool append = false) {
ASSERT_OK(FileOutputStream::Open(path_, append, &file_));
ASSERT_OK(FileOutputStream::Open(path_, append, &stream_));
}

protected:
std::shared_ptr<FileOutputStream> file_;
std::shared_ptr<OutputStream> stream_;
};

#if defined(_MSC_VER)
Expand All @@ -116,6 +118,9 @@ TEST_F(TestFileOutputStream, FileNameWideCharConversionRangeException) {
std::string file_name = "\x80";
ASSERT_RAISES(Invalid, FileOutputStream::Open(file_name, &file));

std::shared_ptr<OutputStream> stream;
ASSERT_RAISES(Invalid, FileOutputStream::Open(file_name, &stream));

std::shared_ptr<ReadableFile> rd_file;
ASSERT_RAISES(Invalid, ReadableFile::Open(file_name, &rd_file));
}
Expand All @@ -129,6 +134,12 @@ TEST_F(TestFileOutputStream, DestructorClosesFile) {
fd = file->file_descriptor();
}
ASSERT_TRUE(FileIsClosed(fd));
{
std::shared_ptr<OutputStream> stream;
ASSERT_OK(FileOutputStream::Open(path_, &stream));
fd = std::static_pointer_cast<FileOutputStream>(stream)->file_descriptor();
}
ASSERT_TRUE(FileIsClosed(fd));
}

TEST_F(TestFileOutputStream, Close) {
Expand All @@ -139,7 +150,6 @@ TEST_F(TestFileOutputStream, Close) {

int fd = file_->file_descriptor();
ASSERT_OK(file_->Close());

ASSERT_TRUE(FileIsClosed(fd));

// Idempotent
Expand All @@ -151,6 +161,19 @@ TEST_F(TestFileOutputStream, Close) {
int64_t size = 0;
ASSERT_OK(rd_file->GetSize(&size));
ASSERT_EQ(strlen(data), size);

ASSERT_OK(stream_->Write(data, strlen(data)));

fd = std::static_pointer_cast<FileOutputStream>(stream_)->file_descriptor();
ASSERT_OK(stream_->Close());
ASSERT_TRUE(FileIsClosed(fd));

// Idempotent
ASSERT_OK(stream_->Close());

ASSERT_OK(ReadableFile::Open(path_, &rd_file));
ASSERT_OK(rd_file->GetSize(&size));
ASSERT_EQ(strlen(data), size);
}

TEST_F(TestFileOutputStream, InvalidWrites) {
Expand All @@ -159,20 +182,27 @@ TEST_F(TestFileOutputStream, InvalidWrites) {
const char* data = "";

ASSERT_RAISES(IOError, file_->Write(data, -1));
ASSERT_RAISES(IOError, stream_->Write(data, -1));
}

// Checks that Tell() reports 0 right after opening and the number of bytes
// written after a Write(), for both the FileOutputStream handle (file_) and
// the base-interface OutputStream handle (stream_).
TEST_F(TestFileOutputStream, Tell) {
OpenFile();

int64_t position;

// Nothing has been written through file_ yet.
ASSERT_OK(file_->Tell(&position));
ASSERT_EQ(0, position);

const char* data = "testdata";
// "testdata" is 8 bytes long, so the position is expected to advance to 8.
ASSERT_OK(file_->Write(data, 8));
ASSERT_OK(file_->Tell(&position));
ASSERT_EQ(8, position);

// stream_ is a separate handle opened by OpenFile(); it is expected to report
// its own position, unaffected by the write made through file_.
ASSERT_OK(stream_->Tell(&position));
ASSERT_EQ(0, position);

ASSERT_OK(stream_->Write(data, 8));
ASSERT_OK(stream_->Tell(&position));
ASSERT_EQ(8, position);
}

TEST_F(TestFileOutputStream, TruncatesNewFile) {
Expand All @@ -191,6 +221,18 @@ TEST_F(TestFileOutputStream, TruncatesNewFile) {
int64_t size;
ASSERT_OK(rd_file->GetSize(&size));
ASSERT_EQ(0, size);

ASSERT_OK(FileOutputStream::Open(path_, &stream_));

ASSERT_OK(stream_->Write(data, strlen(data)));
ASSERT_OK(stream_->Close());

ASSERT_OK(FileOutputStream::Open(path_, &stream_));
ASSERT_OK(stream_->Close());

ASSERT_OK(ReadableFile::Open(path_, &rd_file));
ASSERT_OK(rd_file->GetSize(&size));
ASSERT_EQ(0, size);
}

// ----------------------------------------------------------------------
Expand Down
4 changes: 1 addition & 3 deletions python/pyarrow/_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,6 @@ cdef class ParquetWriter:
use_deprecated_int96_timestamps=False,
coerce_timestamps=None):
cdef:
shared_ptr[FileOutputStream] filestream
shared_ptr[WriterProperties] properties
c_string c_where
CMemoryPool* pool
Expand All @@ -830,8 +829,7 @@ cdef class ParquetWriter:
c_where = tobytes(where)
with nogil:
check_status(FileOutputStream.Open(c_where,
&filestream))
self.sink = <shared_ptr[OutputStream]> filestream
&self.sink))
self.own_sink = True
else:
get_writer(where, &self.sink)
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:

cdef cppclass FileOutputStream(OutputStream):
@staticmethod
CStatus Open(const c_string& path, shared_ptr[FileOutputStream]* file)
CStatus Open(const c_string& path, shared_ptr[OutputStream]* file)

int file_descriptor()

Expand Down
5 changes: 1 addition & 4 deletions python/pyarrow/io.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -550,12 +550,9 @@ cdef class OSFile(NativeFile):
self.rd_file = <shared_ptr[RandomAccessFile]> handle

cdef _open_writable(self, c_string path):
cdef shared_ptr[FileOutputStream] handle

with nogil:
check_status(FileOutputStream.Open(path, &handle))
check_status(FileOutputStream.Open(path, &self.wr_file))
self.is_writable = True
self.wr_file = <shared_ptr[OutputStream]> handle


cdef class FixedSizeBufferWriter(NativeFile):
Expand Down