From 07f49b43de45b74f5ce93725f6f6e9996d386616 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 8 Jun 2022 15:29:17 +0200 Subject: [PATCH 1/3] ARROW-16799: [C++] Create a self-pipe abstraction. Also create a FileDescriptor RAII wrapper to automate the chore of closing file descriptors, and make it more robust. --- cpp/src/arrow/filesystem/localfs.cc | 7 +- cpp/src/arrow/filesystem/localfs_test.cc | 6 +- cpp/src/arrow/flight/server.cc | 89 ++---- cpp/src/arrow/io/file.cc | 62 ++-- cpp/src/arrow/io/file_benchmark.cc | 33 +- cpp/src/arrow/io/file_test.cc | 72 ++--- cpp/src/arrow/io/test_common.cc | 37 +-- cpp/src/arrow/io/test_common.h | 2 - cpp/src/arrow/testing/gtest_util.cc | 40 ++- cpp/src/arrow/testing/gtest_util.h | 2 + cpp/src/arrow/util/io_util.cc | 389 +++++++++++++++++------ cpp/src/arrow/util/io_util.h | 68 +++- cpp/src/arrow/util/io_util_test.cc | 357 +++++++++++++++++++-- 13 files changed, 819 insertions(+), 345 deletions(-) diff --git a/cpp/src/arrow/filesystem/localfs.cc b/cpp/src/arrow/filesystem/localfs.cc index e4595491093..889775d7250 100644 --- a/cpp/src/arrow/filesystem/localfs.cc +++ b/cpp/src/arrow/filesystem/localfs.cc @@ -439,10 +439,11 @@ Result> OpenOutputStreamGeneric(const std::str ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path)); const bool write_only = true; ARROW_ASSIGN_OR_RAISE( - int fd, ::arrow::internal::FileOpenWritable(fn, write_only, truncate, append)); - auto maybe_stream = io::FileOutputStream::Open(fd); + auto fd, ::arrow::internal::FileOpenWritable(fn, write_only, truncate, append)); + int raw_fd = fd.Detach(); + auto maybe_stream = io::FileOutputStream::Open(raw_fd); if (!maybe_stream.ok()) { - ARROW_UNUSED(::arrow::internal::FileClose(fd)); + ARROW_UNUSED(::arrow::internal::FileClose(raw_fd)); } return maybe_stream; } diff --git a/cpp/src/arrow/filesystem/localfs_test.cc b/cpp/src/arrow/filesystem/localfs_test.cc index 795c476c3db..748c832ddd4 100644 --- a/cpp/src/arrow/filesystem/localfs_test.cc +++ b/cpp/src/arrow/filesystem/localfs_test.cc @@ -36,6 +36,7 @@ namespace arrow { namespace fs { namespace internal { +using ::arrow::internal::FileDescriptor; using ::arrow::internal::PlatformFilename; using ::arrow::internal::TemporaryDir; @@ -237,9 +238,8 @@ class TestLocalFS : public LocalFSTestMixin { void CheckConcreteFile(const std::string& path, int64_t expected_size) { ASSERT_OK_AND_ASSIGN(auto fn, PlatformFilename::FromString(path)); - ASSERT_OK_AND_ASSIGN(int fd, ::arrow::internal::FileOpenReadable(fn)); - auto result = ::arrow::internal::FileGetSize(fd); - ASSERT_OK(::arrow::internal::FileClose(fd)); + ASSERT_OK_AND_ASSIGN(FileDescriptor fd, ::arrow::internal::FileOpenReadable(fn)); + auto result = ::arrow::internal::FileGetSize(fd.fd()); ASSERT_OK_AND_ASSIGN(int64_t size, result); ASSERT_EQ(size, expected_size); } diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc index c64bd1c98e3..ae15585621f 100644 --- a/cpp/src/arrow/flight/server.cc +++ b/cpp/src/arrow/flight/server.cc @@ -23,14 +23,6 @@ #include "arrow/flight/server.h" -#ifdef _WIN32 -#include "arrow/util/windows_compatibility.h" - -#include -#else -#include -#include -#endif #include #include #include @@ -52,22 +44,16 @@ namespace arrow { namespace flight { + namespace { #if (ATOMIC_INT_LOCK_FREE != 2 || ATOMIC_POINTER_LOCK_FREE != 2) #error "atomic ints and atomic pointers not always lock-free!" #endif +using ::arrow::internal::SelfPipe; using ::arrow::internal::SetSignalHandler; using ::arrow::internal::SignalHandler; -#ifdef WIN32 -#define PIPE_WRITE _write -#define PIPE_READ _read -#else -#define PIPE_WRITE write -#define PIPE_READ read -#endif - /// RAII guard that manages a self-pipe and a thread that listens on /// the self-pipe, shutting down the server when a signal handler /// writes to the pipe. @@ -80,51 +66,22 @@ class ServerSignalHandler { /// /// \return the fd of the write side of the pipe. template - arrow::Result Init(Fn handler) { - ARROW_ASSIGN_OR_RAISE(auto pipe, arrow::internal::CreatePipe()); -#ifndef WIN32 - // Make write end nonblocking - int flags = fcntl(pipe.wfd, F_GETFL); - if (flags == -1) { - RETURN_NOT_OK(arrow::internal::FileClose(pipe.rfd)); - RETURN_NOT_OK(arrow::internal::FileClose(pipe.wfd)); - return arrow::internal::IOErrorFromErrno( - errno, "Could not initialize self-pipe to wait for signals"); - } - flags |= O_NONBLOCK; - if (fcntl(pipe.wfd, F_SETFL, flags) == -1) { - RETURN_NOT_OK(arrow::internal::FileClose(pipe.rfd)); - RETURN_NOT_OK(arrow::internal::FileClose(pipe.wfd)); - return arrow::internal::IOErrorFromErrno( - errno, "Could not initialize self-pipe to wait for signals"); - } -#endif - self_pipe_ = pipe; - handle_signals_ = std::thread(handler, self_pipe_.rfd); - return self_pipe_.wfd; + arrow::Result> Init(Fn handler) { + ARROW_ASSIGN_OR_RAISE(self_pipe_, SelfPipe::Make(/*signal_safe=*/true)); + handle_signals_ = std::thread(handler, self_pipe_); + return self_pipe_; } Status Shutdown() { - if (self_pipe_.rfd == 0) { - // Already closed - return Status::OK(); - } - if (PIPE_WRITE(self_pipe_.wfd, "0", 1) < 0 && errno != EAGAIN && - errno != EWOULDBLOCK && errno != EINTR) { - return arrow::internal::IOErrorFromErrno(errno, "Could not unblock signal thread"); - } + RETURN_NOT_OK(self_pipe_->Shutdown()); handle_signals_.join(); - RETURN_NOT_OK(arrow::internal::FileClose(self_pipe_.rfd)); - RETURN_NOT_OK(arrow::internal::FileClose(self_pipe_.wfd)); - self_pipe_.rfd = 0; - self_pipe_.wfd = 0; return Status::OK(); } ~ServerSignalHandler() { ARROW_CHECK_OK(Shutdown()); } private: - arrow::internal::Pipe self_pipe_; + std::shared_ptr self_pipe_; std::thread handle_signals_; }; } // namespace @@ -140,7 +97,7 @@ struct FlightServerBase::Impl { static std::atomic running_instance_; // We'll use the self-pipe trick to notify a thread from the signal // handler. The thread will then shut down the server. - int self_pipe_wfd_; + std::shared_ptr self_pipe_; // Signal handling std::vector signals_; @@ -156,24 +113,17 @@ struct FlightServerBase::Impl { void DoHandleSignal(int signum) { got_signal_ = signum; - int saved_errno = errno; - if (PIPE_WRITE(self_pipe_wfd_, "0", 1) < 0) { - // Can't do much here, though, pipe is nonblocking so hopefully this doesn't happen - ARROW_LOG(WARNING) << "FlightServerBase: failed to handle signal " << signum - << " errno: " << errno; - } - errno = saved_errno; + + // Send dummy payload over self-pipe + self_pipe_->Send(/*payload=*/0); } - static void WaitForSignals(int fd) { - // Wait for a signal handler to write to the pipe - int8_t buf[1]; - while (PIPE_READ(fd, /*buf=*/buf, /*count=*/1) == -1) { - if (errno == EINTR) { - continue; - } - ARROW_CHECK_OK(arrow::internal::IOErrorFromErrno( - errno, "Error while waiting for shutdown signal")); + static void WaitForSignals(std::shared_ptr self_pipe) { + // Wait for a signal handler to wake up the pipe + auto st = self_pipe->Wait().status(); + // Status::Invalid means the pipe was shutdown without any wakeup + if (!st.ok() && !st.IsInvalid()) { + ARROW_LOG(FATAL) << "Failed to wait on self-pipe: " << st.ToString(); } auto instance = running_instance_.load(); if (instance != nullptr) { @@ -232,8 +182,7 @@ Status FlightServerBase::Serve() { impl_->running_instance_ = impl_.get(); ServerSignalHandler signal_handler; - ARROW_ASSIGN_OR_RAISE(impl_->self_pipe_wfd_, - signal_handler.Init(&Impl::WaitForSignals)); + ARROW_ASSIGN_OR_RAISE(impl_->self_pipe_, signal_handler.Init(&Impl::WaitForSignals)); // Override existing signal handlers with our own handler so as to stop the server. for (size_t i = 0; i < impl_->signals_.size(); ++i) { int signum = impl_->signals_[i]; diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc index 55bb3bc392f..e57f93ad96e 100644 --- a/cpp/src/arrow/io/file.cc +++ b/cpp/src/arrow/io/file.cc @@ -58,16 +58,13 @@ namespace arrow { +using internal::FileDescriptor; using internal::IOErrorFromErrno; namespace io { class OSFile { public: - OSFile() : fd_(-1), is_open_(false), size_(-1), need_seeking_(false) {} - - ~OSFile() {} - // Note: only one of the Open* methods below may be called on a given instance Status OpenWritable(const std::string& path, bool truncate, bool append, @@ -76,11 +73,10 @@ class OSFile { ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenWritable(file_name_, write_only, truncate, append)); - is_open_ = true; mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE; if (!truncate) { - ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_)); + ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_.fd())); } else { size_ = 0; } @@ -98,9 +94,8 @@ class OSFile { size_ = -1; } RETURN_NOT_OK(SetFileName(fd)); - is_open_ = true; mode_ = FileMode::WRITE; - fd_ = fd; + fd_ = FileDescriptor(fd); return Status::OK(); } @@ -108,9 +103,8 @@ class OSFile { RETURN_NOT_OK(SetFileName(path)); ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenReadable(file_name_)); - ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_)); + ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_.fd())); - is_open_ = true; mode_ = FileMode::READ; return Status::OK(); } @@ -118,35 +112,24 @@ class OSFile { Status OpenReadable(int fd) { ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd)); RETURN_NOT_OK(SetFileName(fd)); - is_open_ = true; mode_ = FileMode::READ; - fd_ = fd; + fd_ = FileDescriptor(fd); return Status::OK(); } Status CheckClosed() const { - if (!is_open_) { + if (fd_.closed()) { return Status::Invalid("Invalid operation on closed file"); } return Status::OK(); } - Status Close() { - if (is_open_) { - // Even if closing fails, the fd will likely be closed (perhaps it's - // already closed). - is_open_ = false; - int fd = fd_; - fd_ = -1; - RETURN_NOT_OK(::arrow::internal::FileClose(fd)); - } - return Status::OK(); - } + Status Close() { return fd_.Close(); } Result Read(int64_t nbytes, void* out) { RETURN_NOT_OK(CheckClosed()); RETURN_NOT_OK(CheckPositioned()); - return ::arrow::internal::FileRead(fd_, reinterpret_cast(out), nbytes); + return ::arrow::internal::FileRead(fd_.fd(), reinterpret_cast(out), nbytes); } Result ReadAt(int64_t position, int64_t nbytes, void* out) { @@ -155,8 +138,8 @@ class OSFile { // ReadAt() leaves the file position undefined, so require that we seek // before calling Read() or Write(). need_seeking_.store(true); - return ::arrow::internal::FileReadAt(fd_, reinterpret_cast(out), position, - nbytes); + return ::arrow::internal::FileReadAt(fd_.fd(), reinterpret_cast(out), + position, nbytes); } Status Seek(int64_t pos) { @@ -164,7 +147,7 @@ class OSFile { if (pos < 0) { return Status::Invalid("Invalid position"); } - Status st = ::arrow::internal::FileSeek(fd_, pos); + Status st = ::arrow::internal::FileSeek(fd_.fd(), pos); if (st.ok()) { need_seeking_.store(false); } @@ -173,7 +156,7 @@ class OSFile { Result Tell() const { RETURN_NOT_OK(CheckClosed()); - return ::arrow::internal::FileTell(fd_); + return ::arrow::internal::FileTell(fd_.fd()); } Status Write(const void* data, int64_t length) { @@ -184,13 +167,13 @@ class OSFile { if (length < 0) { return Status::IOError("Length must be non-negative"); } - return ::arrow::internal::FileWrite(fd_, reinterpret_cast(data), + return ::arrow::internal::FileWrite(fd_.fd(), reinterpret_cast(data), length); } - int fd() const { return fd_; } + int fd() const { return fd_.fd(); } - bool is_open() const { return is_open_; } + bool is_open() const { return !fd_.closed(); } int64_t size() const { return size_; } @@ -221,16 +204,11 @@ class OSFile { ::arrow::internal::PlatformFilename file_name_; std::mutex lock_; - - // File descriptor - int fd_; - + FileDescriptor fd_; FileMode::type mode_; - - bool is_open_; - int64_t size_; + int64_t size_{-1}; // Whether ReadAt made the file position non-deterministic. - std::atomic need_seeking_; + std::atomic need_seeking_{false}; }; // ---------------------------------------------------------------------- @@ -287,7 +265,7 @@ class ReadableFile::ReadableFileImpl : public OSFile { for (const auto& range : ranges) { RETURN_NOT_OK(internal::ValidateRange(range.offset, range.length)); #if defined(POSIX_FADV_WILLNEED) - int ret = posix_fadvise(fd_, range.offset, range.length, POSIX_FADV_WILLNEED); + int ret = posix_fadvise(fd_.fd(), range.offset, range.length, POSIX_FADV_WILLNEED); if (ret) { RETURN_NOT_OK(report_error(ret, "posix_fadvise failed")); } @@ -296,7 +274,7 @@ class ReadableFile::ReadableFileImpl : public OSFile { off_t ra_offset; int ra_count; } radvisory{range.offset, static_cast(range.length)}; - if (radvisory.ra_count > 0 && fcntl(fd_, F_RDADVISE, &radvisory) == -1) { + if (radvisory.ra_count > 0 && fcntl(fd_.fd(), F_RDADVISE, &radvisory) == -1) { RETURN_NOT_OK(report_error(errno, "fcntl(fd, F_RDADVISE, ...) failed")); } #else diff --git a/cpp/src/arrow/io/file_benchmark.cc b/cpp/src/arrow/io/file_benchmark.cc index b8e8ee5a269..7fd10a0a0e6 100644 --- a/cpp/src/arrow/io/file_benchmark.cc +++ b/cpp/src/arrow/io/file_benchmark.cc @@ -45,6 +45,9 @@ namespace arrow { +using internal::FileDescriptor; +using internal::Pipe; + std::string GetNullFile() { #ifdef _WIN32 return "NUL"; @@ -128,31 +131,24 @@ class BackgroundReader { } void Stop() { const uint8_t data[] = "x"; - ABORT_NOT_OK(internal::FileWrite(wakeup_w_, data, 1)); + ABORT_NOT_OK(internal::FileWrite(wakeup_pipe_.wfd.fd(), data, 1)); } void Join() { worker_->join(); } - ~BackgroundReader() { - for (int fd : {fd_, wakeup_r_, wakeup_w_}) { - ABORT_NOT_OK(internal::FileClose(fd)); - } - } - protected: - explicit BackgroundReader(int fd) : fd_(fd), total_bytes_(0) { - // Prepare self-pipe trick - auto pipe = *internal::CreatePipe(); - wakeup_r_ = pipe.rfd; - wakeup_w_ = pipe.wfd; + explicit BackgroundReader(int fd) + : fd_(fd), wakeup_pipe_(*internal::CreatePipe()), total_bytes_(0) { // Put fd in non-blocking mode fcntl(fd, F_SETFL, O_NONBLOCK); + // Note the wakeup pipe itself does not need to be non-blocking, + // since we're not actually reading from it. } void LoopReading() { struct pollfd pollfds[2]; - pollfds[0].fd = fd_; + pollfds[0].fd = fd_.fd(); pollfds[0].events = POLLIN; - pollfds[1].fd = wakeup_r_; + pollfds[1].fd = wakeup_pipe_.rfd.fd(); pollfds[1].events = POLLIN; while (true) { int ret = poll(pollfds, 2, -1 /* timeout */); @@ -167,7 +163,7 @@ class BackgroundReader { if (!(pollfds[0].revents & POLLIN)) { continue; } - auto result = internal::FileRead(fd_, buffer_, buffer_size_); + auto result = internal::FileRead(fd_.fd(), buffer_, buffer_size_); // There could be a spurious wakeup followed by EAGAIN if (result.ok()) { total_bytes_ += *result; @@ -175,7 +171,8 @@ class BackgroundReader { } } - int fd_, wakeup_r_, wakeup_w_; + FileDescriptor fd_; + Pipe wakeup_pipe_; int64_t total_bytes_; static const int64_t buffer_size_ = 16384; @@ -191,8 +188,8 @@ class BackgroundReader { static void SetupPipeWriter(std::shared_ptr* stream, std::shared_ptr* reader) { auto pipe = *internal::CreatePipe(); - *stream = *io::FileOutputStream::Open(pipe.wfd); - *reader = BackgroundReader::StartReader(pipe.rfd); + *stream = *io::FileOutputStream::Open(pipe.wfd.Detach()); + *reader = BackgroundReader::StartReader(pipe.rfd.Detach()); } static void BenchmarkStreamingWrites(benchmark::State& state, diff --git a/cpp/src/arrow/io/file_test.cc b/cpp/src/arrow/io/file_test.cc index 7d3d1c621ce..8165c9c0b49 100644 --- a/cpp/src/arrow/io/file_test.cc +++ b/cpp/src/arrow/io/file_test.cc @@ -48,6 +48,7 @@ namespace arrow { using internal::CreatePipe; using internal::FileClose; +using internal::FileDescriptor; using internal::FileGetSize; using internal::FileOpenReadable; using internal::FileOpenWritable; @@ -93,11 +94,11 @@ class TestFileOutputStream : public FileTestFixture { } void OpenFileDescriptor() { - int fd_file; ASSERT_OK_AND_ASSIGN(auto file_name, PlatformFilename::FromString(path_)); - ASSERT_OK_AND_ASSIGN(fd_file, FileOpenWritable(file_name, true /* write_only */, - false /* truncate */)); - ASSERT_OK_AND_ASSIGN(file_, FileOutputStream::Open(fd_file)); + ASSERT_OK_AND_ASSIGN( + FileDescriptor fd, + FileOpenWritable(file_name, true /* write_only */, false /* truncate */)); + ASSERT_OK_AND_ASSIGN(file_, FileOutputStream::Open(fd.Detach())); } protected: @@ -155,18 +156,20 @@ TEST_F(TestFileOutputStream, FromFileDescriptor) { std::string data1 = "test"; ASSERT_OK(file_->Write(data1.data(), data1.size())); - int fd = file_->file_descriptor(); + int raw_fd = file_->file_descriptor(); ASSERT_OK(file_->Close()); - ASSERT_TRUE(FileIsClosed(fd)); + ASSERT_TRUE(FileIsClosed(raw_fd)); AssertFileContents(path_, data1); // Re-open at end of file ASSERT_OK_AND_ASSIGN(auto file_name, PlatformFilename::FromString(path_)); ASSERT_OK_AND_ASSIGN( - fd, FileOpenWritable(file_name, true /* write_only */, false /* truncate */)); - ASSERT_OK(FileSeek(fd, 0, SEEK_END)); - ASSERT_OK_AND_ASSIGN(file_, FileOutputStream::Open(fd)); + FileDescriptor fd, + FileOpenWritable(file_name, true /* write_only */, false /* truncate */)); + raw_fd = fd.Detach(); + ASSERT_OK(FileSeek(raw_fd, 0, SEEK_END)); + ASSERT_OK_AND_ASSIGN(file_, FileOutputStream::Open(raw_fd)); std::string data2 = "data"; ASSERT_OK(file_->Write(data2.data(), data2.size())); @@ -270,24 +273,24 @@ TEST_F(TestReadableFile, Close) { TEST_F(TestReadableFile, FromFileDescriptor) { MakeTestFile(); - int fd = -2; ASSERT_OK_AND_ASSIGN(auto file_name, PlatformFilename::FromString(path_)); - ASSERT_OK_AND_ASSIGN(fd, FileOpenReadable(file_name)); - ASSERT_GE(fd, 0); - ASSERT_OK(FileSeek(fd, 4)); + ASSERT_OK_AND_ASSIGN(FileDescriptor fd, FileOpenReadable(file_name)); + int raw_fd = fd.fd(); + ASSERT_GE(raw_fd, 0); + ASSERT_OK(FileSeek(raw_fd, 4)); - ASSERT_OK_AND_ASSIGN(file_, ReadableFile::Open(fd)); - ASSERT_EQ(file_->file_descriptor(), fd); + ASSERT_OK_AND_ASSIGN(file_, ReadableFile::Open(fd.Detach())); + ASSERT_EQ(file_->file_descriptor(), raw_fd); ASSERT_OK_AND_ASSIGN(auto buf, file_->Read(5)); ASSERT_EQ(buf->size(), 4); ASSERT_TRUE(buf->Equals(Buffer("data"))); - ASSERT_FALSE(FileIsClosed(fd)); + ASSERT_FALSE(FileIsClosed(raw_fd)); ASSERT_OK(file_->Close()); - ASSERT_TRUE(FileIsClosed(fd)); + ASSERT_TRUE(FileIsClosed(raw_fd)); // Idempotent ASSERT_OK(file_->Close()); - ASSERT_TRUE(FileIsClosed(fd)); + ASSERT_TRUE(FileIsClosed(raw_fd)); } TEST_F(TestReadableFile, Peek) { @@ -500,26 +503,18 @@ TEST_F(TestReadableFile, ThreadSafety) { class TestPipeIO : public ::testing::Test { public: void MakePipe() { - ASSERT_OK_AND_ASSIGN(auto pipe, CreatePipe()); - r_ = pipe.rfd; - w_ = pipe.wfd; - ASSERT_GE(r_, 0); - ASSERT_GE(w_, 0); + ASSERT_OK_AND_ASSIGN(pipe_, CreatePipe()); + ASSERT_GE(pipe_.rfd.fd(), 0); + ASSERT_GE(pipe_.rfd.fd(), 0); } void ClosePipe() { - if (r_ != -1) { - ASSERT_OK(FileClose(r_)); - r_ = -1; - } - if (w_ != -1) { - ASSERT_OK(FileClose(w_)); - w_ = -1; - } + ASSERT_OK(pipe_.rfd.Close()); + ASSERT_OK(pipe_.wfd.Close()); } void TearDown() { ClosePipe(); } protected: - int r_ = -1, w_ = -1; + ::arrow::internal::Pipe pipe_; }; TEST_F(TestPipeIO, TestWrite) { @@ -529,33 +524,32 @@ TEST_F(TestPipeIO, TestWrite) { int64_t bytes_read; MakePipe(); - ASSERT_OK_AND_ASSIGN(file, FileOutputStream::Open(w_)); - w_ = -1; // now owned by FileOutputStream + ASSERT_OK_AND_ASSIGN(file, FileOutputStream::Open(pipe_.wfd.Detach())); ASSERT_OK(file->Write(data1.data(), data1.size())); - ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(r_, buffer, 4)); + ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(pipe_.rfd.fd(), buffer, 4)); ASSERT_EQ(bytes_read, 4); ASSERT_EQ(0, std::memcmp(buffer, "test", 4)); ASSERT_OK(file->Write(Buffer::FromString(std::string(data2)))); - ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(r_, buffer, 4)); + ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(pipe_.rfd.fd(), buffer, 4)); ASSERT_EQ(bytes_read, 4); ASSERT_EQ(0, std::memcmp(buffer, "data", 4)); ASSERT_FALSE(file->closed()); ASSERT_OK(file->Close()); ASSERT_TRUE(file->closed()); - ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(r_, buffer, 2)); + ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(pipe_.rfd.fd(), buffer, 2)); ASSERT_EQ(bytes_read, 1); ASSERT_EQ(0, std::memcmp(buffer, "!", 1)); // EOF reached - ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(r_, buffer, 2)); + ASSERT_OK_AND_ASSIGN(bytes_read, FileRead(pipe_.rfd.fd(), buffer, 2)); ASSERT_EQ(bytes_read, 0); } TEST_F(TestPipeIO, ReadableFileFails) { // ReadableFile fails on non-seekable fd - ASSERT_RAISES(IOError, ReadableFile::Open(r_)); + ASSERT_RAISES(IOError, ReadableFile::Open(pipe_.rfd.fd())); } // ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/test_common.cc b/cpp/src/arrow/io/test_common.cc index 2da6b4a1c5a..7cd6e518414 100644 --- a/cpp/src/arrow/io/test_common.cc +++ b/cpp/src/arrow/io/test_common.cc @@ -21,10 +21,7 @@ #include #include // IWYU pragma: keep -#ifdef _WIN32 -#include -#include -#else +#ifndef _WIN32 #include #endif @@ -74,38 +71,6 @@ Status PurgeLocalFileFromOsCache(const std::string& path) { #endif } -#if defined(_WIN32) -static void InvalidParamHandler(const wchar_t* expr, const wchar_t* func, - const wchar_t* source_file, unsigned int source_line, - uintptr_t reserved) { - wprintf(L"Invalid parameter in function '%s'. Source: '%s' line %d expression '%s'\n", - func, source_file, source_line, expr); -} -#endif - -bool FileIsClosed(int fd) { -#if defined(_WIN32) - // Disables default behavior on wrong params which causes the application to crash - // https://msdn.microsoft.com/en-us/library/ksazx244.aspx - _set_invalid_parameter_handler(InvalidParamHandler); - - // Disables possible assertion alert box on invalid input arguments - _CrtSetReportMode(_CRT_ASSERT, 0); - - int new_fd = _dup(fd); - if (new_fd == -1) { - return errno == EBADF; - } - _close(new_fd); - return false; -#else - if (-1 != fcntl(fd, F_GETFD)) { - return false; - } - return errno == EBADF; -#endif -} - Status ZeroMemoryMap(MemoryMappedFile* file) { constexpr int64_t kBufferSize = 512; static constexpr uint8_t kZeroBytes[kBufferSize] = {0}; diff --git a/cpp/src/arrow/io/test_common.h b/cpp/src/arrow/io/test_common.h index 9b68c8104a7..149ee987d7c 100644 --- a/cpp/src/arrow/io/test_common.h +++ b/cpp/src/arrow/io/test_common.h @@ -36,8 +36,6 @@ ARROW_TESTING_EXPORT bool FileExists(const std::string& path); ARROW_TESTING_EXPORT Status PurgeLocalFileFromOsCache(const std::string& path); -ARROW_TESTING_EXPORT bool FileIsClosed(int fd); - ARROW_TESTING_EXPORT Status ZeroMemoryMap(MemoryMappedFile* file); diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index c1cdfb137a8..c5ab367befe 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -19,7 +19,11 @@ #include "arrow/testing/extension_type.h" -#ifndef _WIN32 +#ifdef _WIN32 +#include +#include +#else +#include // IWYU pragma: keep #include // IWYU pragma: keep #include // IWYU pragma: keep #include // IWYU pragma: keep @@ -574,6 +578,40 @@ std::shared_ptr TweakValidityBit(const std::shared_ptr& array, return MakeArray(data); } +// XXX create a testing/io.{h,cc}? + +#if defined(_WIN32) +static void InvalidParamHandler(const wchar_t* expr, const wchar_t* func, + const wchar_t* source_file, unsigned int source_line, + uintptr_t reserved) { + wprintf(L"Invalid parameter in function '%s'. Source: '%s' line %d expression '%s'\n", + func, source_file, source_line, expr); +} +#endif + +bool FileIsClosed(int fd) { +#if defined(_WIN32) + // Disables default behavior on wrong params which causes the application to crash + // https://msdn.microsoft.com/en-us/library/ksazx244.aspx + _set_invalid_parameter_handler(InvalidParamHandler); + + // Disables possible assertion alert box on invalid input arguments + _CrtSetReportMode(_CRT_ASSERT, 0); + + int new_fd = _dup(fd); + if (new_fd == -1) { + return errno == EBADF; + } + _close(new_fd); + return false; +#else + if (-1 != fcntl(fd, F_GETFD)) { + return false; + } + return errno == EBADF; +#endif +} + bool LocaleExists(const char* locale) { try { std::locale loc(locale); diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index c8cb6af986e..8ce5049452a 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -373,6 +373,8 @@ Future<> SleepAsync(double seconds); ARROW_TESTING_EXPORT Future<> SleepABitAsync(); +ARROW_TESTING_EXPORT bool FileIsClosed(int fd); + template std::vector IteratorToVector(Iterator iterator) { EXPECT_OK_AND_ASSIGN(auto out, iterator.ToVector()); diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index c225656cb8f..0592b4864a9 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -95,6 +95,7 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" +#include "arrow/util/ubsan.h" // For filename conversion #if defined(_WIN32) @@ -989,82 +990,97 @@ Result FileExists(const PlatformFilename& path) { } // -// Functions for creating file descriptors +// Creating and destroying file descriptors // -#define CHECK_LSEEK(retval) \ - if ((retval) == -1) return Status::IOError("lseek failed"); +FileDescriptor::FileDescriptor(FileDescriptor&& other) : fd_(other.fd_.exchange(-1)) {} -static inline int64_t lseek64_compat(int fd, int64_t pos, int whence) { -#if defined(_WIN32) - return _lseeki64(fd, pos, whence); -#else - return lseek(fd, pos, whence); -#endif +FileDescriptor& FileDescriptor::operator=(FileDescriptor&& other) { + int old_fd = fd_.exchange(other.fd_.exchange(-1)); + if (old_fd != -1) { + CloseFromDestructor(old_fd); + } + return *this; } -static inline Result CheckFileOpResult(int fd_ret, int errno_actual, - const PlatformFilename& file_name, - const char* opname) { - if (fd_ret == -1) { -#ifdef _WIN32 - int winerr = GetLastError(); - if (winerr != ERROR_SUCCESS) { - return IOErrorFromWinError(GetLastError(), "Failed to ", opname, " file '", - file_name.ToString(), "'"); - } +void FileDescriptor::CloseFromDestructor(int fd) { + auto st = FileClose(fd); + if (!st.ok()) { + ARROW_LOG(WARNING) << "Failed to close file descriptor: " << st.ToString(); + } +} + +FileDescriptor::~FileDescriptor() { + int fd = fd_.load(); + if (fd != -1) { + CloseFromDestructor(fd); + } +} + +Status FileDescriptor::Close() { + int fd = fd_.exchange(-1); + if (fd != -1) { + return FileClose(fd); + } + return Status::OK(); +} + +int FileDescriptor::Detach() { return fd_.exchange(-1); } + +static Result lseek64_compat(int fd, int64_t pos, int whence) { +#if defined(_WIN32) + int64_t ret = _lseeki64(fd, pos, whence); +#else + int64_t ret = lseek(fd, pos, whence); #endif - return IOErrorFromErrno(errno_actual, "Failed to ", opname, " file '", - file_name.ToString(), "'"); + if (ret == -1) { + return Status::IOError("lseek failed"); } - return fd_ret; + return ret; } -Result FileOpenReadable(const PlatformFilename& file_name) { - int fd, errno_actual; +Result FileOpenReadable(const PlatformFilename& file_name) { + FileDescriptor fd; #if defined(_WIN32) - SetLastError(0); HANDLE file_handle = CreateFileW(file_name.ToNative().c_str(), GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); - - DWORD last_error = GetLastError(); - if (last_error == ERROR_SUCCESS) { - errno_actual = 0; - fd = _open_osfhandle(reinterpret_cast(file_handle), - _O_RDONLY | _O_BINARY | _O_NOINHERIT); - } else { - return IOErrorFromWinError(last_error, "Failed to open local file '", + if (file_handle == INVALID_HANDLE_VALUE) { + return IOErrorFromWinError(GetLastError(), "Failed to open local file '", file_name.ToString(), "'"); } + int ret = _open_osfhandle(reinterpret_cast(file_handle), + _O_RDONLY | _O_BINARY | _O_NOINHERIT); + if (ret == -1) { + CloseHandle(file_handle); + return IOErrorFromErrno(errno, "Failed to open local file '", file_name.ToString(), + "'"); + } + fd = FileDescriptor(ret); #else - fd = open(file_name.ToNative().c_str(), O_RDONLY); - errno_actual = errno; - - if (fd >= 0) { - // open(O_RDONLY) succeeds on directories, check for it - struct stat st; - int ret = fstat(fd, &st); - if (ret == -1) { - ARROW_UNUSED(FileClose(fd)); - // Will propagate error below - } else if (S_ISDIR(st.st_mode)) { - ARROW_UNUSED(FileClose(fd)); - return Status::IOError("Cannot open for reading: path '", file_name.ToString(), - "' is a directory"); - } + int ret = open(file_name.ToNative().c_str(), O_RDONLY); + if (ret < 0) { + return IOErrorFromErrno(errno, "Failed to open local file '", file_name.ToString(), + "'"); + } + // open(O_RDONLY) succeeds on directories, check for it + fd = FileDescriptor(ret); + struct stat st; + ret = fstat(fd.fd(), &st); + if (ret == 0 && S_ISDIR(st.st_mode)) { + return Status::IOError("Cannot open for reading: path '", file_name.ToString(), + "' is a directory"); } #endif - return CheckFileOpResult(fd, errno_actual, file_name, "open local"); + return fd; } -Result FileOpenWritable(const PlatformFilename& file_name, bool write_only, - bool truncate, bool append) { - int fd, errno_actual; +Result FileOpenWritable(const PlatformFilename& file_name, + bool write_only, bool truncate, bool append) { + FileDescriptor fd; #if defined(_WIN32) - SetLastError(0); int oflag = _O_CREAT | _O_BINARY | _O_NOINHERIT; DWORD desired_access = GENERIC_WRITE; DWORD share_mode = FILE_SHARE_READ | FILE_SHARE_WRITE; @@ -1089,15 +1105,19 @@ Result FileOpenWritable(const PlatformFilename& file_name, bool write_only, HANDLE file_handle = CreateFileW(file_name.ToNative().c_str(), desired_access, share_mode, NULL, creation_disposition, FILE_ATTRIBUTE_NORMAL, NULL); - - DWORD last_error = GetLastError(); - if (last_error == ERROR_SUCCESS || last_error == ERROR_ALREADY_EXISTS) { - errno_actual = 0; - fd = _open_osfhandle(reinterpret_cast(file_handle), oflag); - } else { - return IOErrorFromWinError(last_error, "Failed to open local file '", + if (file_handle == INVALID_HANDLE_VALUE) { + return IOErrorFromWinError(GetLastError(), "Failed to open local file '", file_name.ToString(), "'"); } + + int ret = _open_osfhandle(reinterpret_cast(file_handle), + _O_RDONLY | _O_BINARY | _O_NOINHERIT); + if (ret == -1) { + CloseHandle(file_handle); + return IOErrorFromErrno(errno, "Failed to open local file '", file_name.ToString(), + "'"); + } + fd = FileDescriptor(ret); #else int oflag = O_CREAT; @@ -1114,60 +1134,221 @@ Result FileOpenWritable(const PlatformFilename& file_name, bool write_only, oflag |= O_RDWR; } - fd = open(file_name.ToNative().c_str(), oflag, 0666); - errno_actual = errno; + int ret = open(file_name.ToNative().c_str(), oflag, 0666); + if (ret == -1) { + return IOErrorFromErrno(errno, "Failed to open local file '", file_name.ToString(), + "'"); + } + fd = FileDescriptor(ret); #endif - RETURN_NOT_OK(CheckFileOpResult(fd, errno_actual, file_name, "open local")); if (append) { // Seek to end, as O_APPEND does not necessarily do it - auto ret = lseek64_compat(fd, 0, SEEK_END); - if (ret == -1) { - ARROW_UNUSED(FileClose(fd)); - return Status::IOError("lseek failed"); - } + RETURN_NOT_OK(lseek64_compat(fd.fd(), 0, SEEK_END)); } return fd; } Result FileTell(int fd) { - int64_t current_pos; #if defined(_WIN32) - current_pos = _telli64(fd); + int64_t current_pos = _telli64(fd); if (current_pos == -1) { return Status::IOError("_telli64 failed"); } + return current_pos; #else - current_pos = lseek64_compat(fd, 0, SEEK_CUR); - CHECK_LSEEK(current_pos); + return lseek64_compat(fd, 0, SEEK_CUR); #endif - return current_pos; } Result CreatePipe() { int ret; - int fd[2]; + int fds[2]; + #if defined(_WIN32) - ret = _pipe(fd, 4096, _O_BINARY); + ret = _pipe(fds, 4096, _O_BINARY); #else - ret = pipe(fd); + ret = ::pipe(fds); #endif - if (ret == -1) { return IOErrorFromErrno(errno, "Error creating pipe"); } - return Pipe{fd[0], fd[1]}; + + return Pipe{FileDescriptor(fds[0]), FileDescriptor(fds[1])}; } -static Status StatusFromMmapErrno(const char* prefix) { +Status SetPipeFileDescriptorNonBlocking(int fd) { +#if defined(_WIN32) + const auto handle = reinterpret_cast(_get_osfhandle(fd)); + DWORD mode = PIPE_NOWAIT; + if (!SetNamedPipeHandleState(handle, &mode, nullptr, nullptr)) { + return IOErrorFromWinError(GetLastError(), "Error making pipe non-blocking"); + } +#else + int flags = fcntl(fd, F_GETFL); + if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { + return IOErrorFromErrno(errno, "Error making pipe non-blocking"); + } +#endif + return Status::OK(); +} + +namespace { + +#ifdef WIN32 +#define PIPE_WRITE _write +#define PIPE_READ _read +#else +#define PIPE_WRITE write +#define PIPE_READ read +#endif + +class SelfPipeImpl : public SelfPipe { + using PayloadBytes = std::array; + + static constexpr uint64_t kEofPayload = 5804561806345822987ULL; + + public: + explicit SelfPipeImpl(bool signal_safe) : signal_safe_(signal_safe) {} + + Status Init() { + ARROW_ASSIGN_OR_RAISE(pipe_, CreatePipe()); + if (signal_safe_) { + if (!please_shutdown_.is_lock_free()) { + return Status::IOError("Cannot use non-lock-free atomic in a signal handler"); + } + // We cannot afford blocking writes in a signal handler + RETURN_NOT_OK(SetPipeFileDescriptorNonBlocking(pipe_.wfd.fd())); + } + return Status::OK(); + } + + Result Wait() override { + if (pipe_.rfd.closed()) { + // Already closed + return ClosedPipe(); + } + PayloadBytes bytes; + char* buf = bytes.data(); + auto buf_size = static_cast(bytes.size()); + while (buf_size > 0) { + int64_t n_read = PIPE_READ(pipe_.rfd.fd(), buf, static_cast(buf_size)); + if (n_read < 0) { + if (errno == EINTR) { + continue; + } + if (pipe_.rfd.closed()) { + return ClosedPipe(); + } + return IOErrorFromErrno(errno, "Failed reading from self-pipe"); + } + buf += n_read; + buf_size -= n_read; + } + const auto payload = PayloadFromBytes(bytes); + if (payload == kEofPayload && please_shutdown_.load()) { + RETURN_NOT_OK(pipe_.rfd.Close()); + return ClosedPipe(); + } + return payload; + } + + // XXX return StatusCode from here? + void Send(uint64_t payload) override { + if (signal_safe_) { + int saved_errno = errno; + DoSend(payload); + errno = saved_errno; + } else { + DoSend(payload); + } + } + + Status Shutdown() override { + please_shutdown_.store(true); + errno = 0; + if (!DoSend(kEofPayload)) { + if (errno) { + return IOErrorFromErrno(errno, "Could not shutdown self-pipe"); + } else if (!pipe_.wfd.closed()) { + return Status::UnknownError("Could not shutdown self-pipe"); + } + } + return pipe_.wfd.Close(); + } + + ~SelfPipeImpl() { + auto st = Shutdown(); + if (!st.ok()) { + ARROW_LOG(WARNING) << "On self-pipe destruction: " << st.ToString(); + } + } + + protected: + PayloadBytes PayloadToBytes(uint64_t payload) const { + return arrow::util::SafeCopy(payload); + } + + uint64_t PayloadFromBytes(PayloadBytes bytes) const { + return arrow::util::SafeCopy(bytes); + } + + Status ClosedPipe() const { return Status::Invalid("Self-pipe closed"); } + + bool DoSend(uint64_t payload) { + // This needs to be async-signal safe as it's called from Send() + if (pipe_.wfd.closed()) { + // Already closed + return false; + } + const auto bytes = PayloadToBytes(payload); + const char* buf = bytes.data(); + auto buf_size = static_cast(bytes.size()); + while (buf_size > 0) { + int64_t n_written = + PIPE_WRITE(pipe_.wfd.fd(), buf, static_cast(buf_size)); + if (n_written < 0) { + if (errno == EINTR) { + continue; + } else { + // Perhaps EAGAIN if non-blocking, or EBADF if closed in the meantime? + // In any case, we can't do anything more here. + break; + } + } + buf += n_written; + buf_size -= n_written; + } + return buf_size == 0; + } + + const bool signal_safe_; + Pipe pipe_; + std::atomic please_shutdown_{false}; +}; + +#undef PIPE_WRITE +#undef PIPE_READ + +} // namespace + +Result> SelfPipe::Make(bool signal_safe) { + auto ptr = std::make_shared(signal_safe); + RETURN_NOT_OK(ptr->Init()); + return ptr; +} + +SelfPipe::~SelfPipe() = default; + +namespace { + +Status StatusFromMmapErrno(const char* prefix) { #ifdef _WIN32 errno = __map_mman_error(GetLastError(), EPERM); #endif return IOErrorFromErrno(errno, prefix); } -namespace { - int64_t GetPageSizeInternal() { #if defined(__APPLE__) return getpagesize(); @@ -1342,9 +1523,7 @@ Status FileClose(int fd) { // Status FileSeek(int fd, int64_t pos, int whence) { - int64_t ret = lseek64_compat(fd, pos, whence); - CHECK_LSEEK(ret); - return Status::OK(); + return lseek64_compat(fd, pos, whence).status(); } Status FileSeek(int fd, int64_t pos) { return FileSeek(fd, pos, SEEK_SET); } @@ -1408,32 +1587,44 @@ static inline int64_t pread_compat(int fd, void* buf, int64_t nbytes, int64_t po } Result FileRead(int fd, uint8_t* buffer, int64_t nbytes) { - int64_t bytes_read = 0; +#if defined(_WIN32) + HANDLE handle = reinterpret_cast(_get_osfhandle(fd)); +#endif + int64_t total_bytes_read = 0; - while (bytes_read < nbytes) { + while (total_bytes_read < nbytes) { const int64_t chunksize = - std::min(static_cast(ARROW_MAX_IO_CHUNKSIZE), nbytes - bytes_read); + std::min(static_cast(ARROW_MAX_IO_CHUNKSIZE), nbytes - total_bytes_read); + int64_t bytes_read = 0; #if defined(_WIN32) - int64_t ret = - static_cast(_read(fd, buffer, static_cast(chunksize))); + DWORD dwBytesRead = 0; + if (!ReadFile(handle, buffer, static_cast(chunksize), &dwBytesRead, + nullptr)) { + auto errnum = GetLastError(); + // Return a normal EOF when the write end of a pipe was closed + if (errnum != ERROR_HANDLE_EOF && errnum != ERROR_BROKEN_PIPE) { + return IOErrorFromWinError(GetLastError(), "Error reading bytes from file"); + } + } + bytes_read = dwBytesRead; #else - int64_t ret = static_cast(read(fd, buffer, static_cast(chunksize))); - if (ret == -1 && errno == EINTR) { - continue; + bytes_read = static_cast(read(fd, buffer, static_cast(chunksize))); + if (bytes_read == -1) { + if (errno == EINTR) { + continue; + } + return IOErrorFromErrno(errno, "Error reading bytes from file"); } #endif - if (ret == -1) { - return IOErrorFromErrno(errno, "Error reading bytes from file"); - } - if (ret == 0) { + if (bytes_read == 0) { // EOF break; } - buffer += ret; - bytes_read += ret; + buffer += bytes_read; + total_bytes_read += bytes_read; } - return bytes_read; + return total_bytes_read; } Result FileReadAt(int fd, uint8_t* buffer, int64_t position, int64_t nbytes) { diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h index 30dfb2ba677..df63de47e83 100644 --- a/cpp/src/arrow/util/io_util.h +++ b/cpp/src/arrow/util/io_util.h @@ -21,6 +21,7 @@ #define ARROW_HAVE_SIGACTION 1 #endif +#include #include #include #include @@ -124,14 +125,46 @@ Result DeleteFile(const PlatformFilename& file_path, bool allow_not_found ARROW_EXPORT Result FileExists(const PlatformFilename& path); +// TODO expose this more publicly to make it available from io/file.h? +/// A RAII wrapper for a file descriptor. +/// +/// The underlying file descriptor is automatically closed on destruction. +/// Moving is supported with well-defined semantics. +/// Furthermore, closing is idempotent. +class ARROW_EXPORT FileDescriptor { + public: + FileDescriptor() = default; + explicit FileDescriptor(int fd) : fd_(fd) {} + FileDescriptor(FileDescriptor&&); + FileDescriptor& operator=(FileDescriptor&&); + + ~FileDescriptor(); + + Status Close(); + + /// May return -1 if closed or default-initialized + int fd() const { return fd_.load(); } + + /// Detach and return the underlying file descriptor + int Detach(); + + bool closed() const { return fd_.load() == -1; } + + protected: + static void CloseFromDestructor(int fd); + + std::atomic fd_{-1}; +}; + /// Open a file for reading and return a file descriptor. ARROW_EXPORT -Result FileOpenReadable(const PlatformFilename& file_name); +Result FileOpenReadable(const PlatformFilename& file_name); /// Open a file for writing and return a file descriptor. ARROW_EXPORT -Result FileOpenWritable(const PlatformFilename& file_name, bool write_only = true, - bool truncate = true, bool append = false); +Result FileOpenWritable(const PlatformFilename& file_name, + bool write_only = true, bool truncate = true, + bool append = false); /// Read from current file position. Return number of bytes read. ARROW_EXPORT @@ -158,13 +191,38 @@ ARROW_EXPORT Status FileClose(int fd); struct Pipe { - int rfd; - int wfd; + FileDescriptor rfd; + FileDescriptor wfd; + + Status Close() { return rfd.Close() & wfd.Close(); } }; ARROW_EXPORT Result CreatePipe(); +ARROW_EXPORT +Status SetPipeFileDescriptorNonBlocking(int fd); + +class ARROW_EXPORT SelfPipe { + public: + static Result> Make(bool signal_safe); + virtual ~SelfPipe(); + + /// \brief Wait for a wakeup. + /// + /// Status::Invalid is returned if the pipe has been shutdown. + /// Otherwise the next sent payload is returned. + virtual Result Wait() = 0; + + /// \brief Wake up the pipe by sending a payload. + /// + /// This method is async-signal-safe if `signal_safe` was set to true. + virtual void Send(uint64_t payload) = 0; + + /// \brief Wake up the pipe and shut it down. + virtual Status Shutdown() = 0; +}; + ARROW_EXPORT int64_t GetPageSize(); diff --git a/cpp/src/arrow/util/io_util_test.cc b/cpp/src/arrow/util/io_util_test.cc index a38699dfd82..67011d02b16 100644 --- a/cpp/src/arrow/util/io_util_test.cc +++ b/cpp/src/arrow/util/io_util_test.cc @@ -20,13 +20,16 @@ #include #include #include +#include #include +#include #include #include #ifndef _WIN32 #include +#include #endif #include @@ -38,9 +41,18 @@ #include "arrow/util/cpu_info.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" +#include "arrow/util/optional.h" #include "arrow/util/windows_compatibility.h" #include "arrow/util/windows_fixup.h" +#ifdef WIN32 +#define PIPE_WRITE _write +#define PIPE_READ _read +#else +#define PIPE_WRITE write +#define PIPE_READ read +#endif + namespace arrow { namespace internal { @@ -57,9 +69,8 @@ void AssertNotExists(const PlatformFilename& path) { } void TouchFile(const PlatformFilename& path) { - int fd = -1; - ASSERT_OK_AND_ASSIGN(fd, FileOpenWritable(path)); - ASSERT_OK(FileClose(fd)); + ASSERT_OK_AND_ASSIGN(FileDescriptor fd, FileOpenWritable(path)); + ASSERT_OK(fd.Close()); } TEST(ErrnoFromStatus, Basics) { @@ -159,6 +170,294 @@ TEST(WinErrorFromStatus, Basics) { } #endif +class TestFileDescriptor : public ::testing::Test { + public: + Result NewFileDescriptor() { + // Make a new fd by dup'ing C stdout (why not?) + int new_fd = dup(1); + if (new_fd < 0) { + return IOErrorFromErrno(errno, "Failed to dup() C stdout"); + } + return new_fd; + } + + void AssertValidFileDescriptor(int fd) { + ASSERT_FALSE(FileIsClosed(fd)) << "Not a valid file descriptor: " << fd; + } + + void AssertInvalidFileDescriptor(int fd) { + ASSERT_TRUE(FileIsClosed(fd)) << "Unexpectedly valid file descriptor: " << fd; + } + + Result IsValidFileDescriptor(int fd) { + int new_fd = dup(fd); + if (new_fd >= 0) { + close(new_fd); + return true; + } else if (errno == EBADF) { + return false; + } else { + return IOErrorFromErrno(errno, "Failed to dup fd ", fd); + } + } +}; + +TEST_F(TestFileDescriptor, Basics) { + int new_fd, new_fd2; + + // Default initialization + FileDescriptor a; + ASSERT_EQ(a.fd(), -1); + ASSERT_TRUE(a.closed()); + ASSERT_OK(a.Close()); + ASSERT_OK(a.Close()); + + // Assignment + ASSERT_OK_AND_ASSIGN(new_fd, NewFileDescriptor()); + AssertValidFileDescriptor(new_fd); + a = FileDescriptor(new_fd); + ASSERT_FALSE(a.closed()); + ASSERT_GT(a.fd(), 2); + ASSERT_OK(a.Close()); + AssertInvalidFileDescriptor(new_fd); // underlying fd was actually closed + ASSERT_TRUE(a.closed()); + ASSERT_EQ(a.fd(), -1); + ASSERT_OK(a.Close()); + ASSERT_TRUE(a.closed()); + + ASSERT_OK_AND_ASSIGN(new_fd, NewFileDescriptor()); + ASSERT_OK_AND_ASSIGN(new_fd2, NewFileDescriptor()); + + // Move assignment + FileDescriptor b(new_fd); + FileDescriptor c(new_fd2); + AssertValidFileDescriptor(new_fd); + AssertValidFileDescriptor(new_fd2); + c = std::move(b); + ASSERT_TRUE(b.closed()); + ASSERT_EQ(b.fd(), -1); + ASSERT_FALSE(c.closed()); + ASSERT_EQ(c.fd(), new_fd); + AssertValidFileDescriptor(new_fd); + AssertInvalidFileDescriptor(new_fd2); + + // Move constructor + FileDescriptor d(std::move(c)); + ASSERT_TRUE(c.closed()); + ASSERT_EQ(c.fd(), -1); + ASSERT_FALSE(d.closed()); + ASSERT_EQ(d.fd(), new_fd); + AssertValidFileDescriptor(new_fd); + + // Detaching + { + FileDescriptor e(d.Detach()); + ASSERT_TRUE(d.closed()); + ASSERT_EQ(d.fd(), -1); + ASSERT_FALSE(e.closed()); + ASSERT_EQ(e.fd(), new_fd); + AssertValidFileDescriptor(new_fd); + } + AssertInvalidFileDescriptor(new_fd); // e was closed +} + +class TestCreatePipe : public ::testing::Test { + public: + void TearDown() override { ASSERT_OK(pipe_.Close()); } + + protected: + Pipe pipe_; +}; + +TEST_F(TestCreatePipe, Blocking) { + ASSERT_OK_AND_ASSIGN(pipe_, CreatePipe()); + + std::string buf("abcd"); + ASSERT_OK(FileWrite(pipe_.wfd.fd(), reinterpret_cast(buf.data()), + buf.size())); + buf = "xxxx"; + ASSERT_OK_AND_EQ( + 4, FileRead(pipe_.rfd.fd(), reinterpret_cast(&buf[0]), buf.size())); + ASSERT_EQ(buf, "abcd"); +} + +TEST_F(TestCreatePipe, NonBlocking) { + ASSERT_OK_AND_ASSIGN(pipe_, CreatePipe()); + ASSERT_OK(SetPipeFileDescriptorNonBlocking(pipe_.rfd.fd())); + ASSERT_OK(SetPipeFileDescriptorNonBlocking(pipe_.wfd.fd())); + + std::string buf("abcd"); + ASSERT_OK(FileWrite(pipe_.wfd.fd(), reinterpret_cast(buf.data()), + buf.size())); + buf = "xxxx"; + ASSERT_OK_AND_EQ( + 4, FileRead(pipe_.rfd.fd(), reinterpret_cast(&buf[0]), buf.size())); + ASSERT_EQ(buf, "abcd"); + + auto st = + FileRead(pipe_.rfd.fd(), reinterpret_cast(&buf[0]), buf.size()).status(); + ASSERT_RAISES(IOError, st); +#ifdef _WIN32 + ASSERT_EQ(WinErrorFromStatus(st), ERROR_NO_DATA); +#else + ASSERT_EQ(ErrnoFromStatus(st), EAGAIN); +#endif +} + +class TestSelfPipe : public ::testing::Test { + public: + void SetUp() override { + instance_ = this; + ASSERT_OK_AND_ASSIGN(self_pipe_, SelfPipe::Make(/*signal_safe=*/true)); + } + + void StartReading() { + read_thread_ = std::thread([this]() { ReadUntilEof(); }); + } + + void FinishReading() { read_thread_.join(); } + + void TearDown() override { + ASSERT_OK(self_pipe_->Shutdown()); + if (read_thread_.joinable()) { + read_thread_.join(); + } + instance_ = nullptr; + } + + Status ReadStatus() { + std::lock_guard lock(mutex_); + return status_; + } + + std::vector ReadPayloads() { + std::lock_guard lock(mutex_); + return payloads_; + } + + void AssertPayloadsEventually(const std::vector& expected) { + BusyWait(1.0, [&]() { return ReadPayloads().size() == expected.size(); }); + ASSERT_EQ(ReadPayloads(), expected); + } + + protected: + void ReadUntilEof() { + while (true) { + auto maybe_payload = self_pipe_->Wait(); + std::lock_guard lock(mutex_); + if (maybe_payload.ok()) { + payloads_.push_back(*maybe_payload); + } else if (maybe_payload.status().IsInvalid()) { + // EOF + break; + } else { + status_ = maybe_payload.status(); + // Since we got an error, we may not be able to ever detect EOF, + // so bail out? + break; + } + } + } + + static void HandleSignal(int signum) { + instance_->signal_received_.store(signum); + instance_->self_pipe_->Send(123); + } + + std::mutex mutex_; + std::shared_ptr self_pipe_; + std::thread read_thread_; + std::vector payloads_; + Status status_; + std::atomic signal_received_; + + static TestSelfPipe* instance_; +}; + +TestSelfPipe* TestSelfPipe::instance_; + +TEST_F(TestSelfPipe, MakeAndShutdown) {} + +TEST_F(TestSelfPipe, WaitAndSend) { + StartReading(); + SleepABit(); + AssertPayloadsEventually({}); + ASSERT_OK(ReadStatus()); + + self_pipe_->Send(123456789123456789ULL); + self_pipe_->Send(987654321987654321ULL); + AssertPayloadsEventually({123456789123456789ULL, 987654321987654321ULL}); + ASSERT_OK(ReadStatus()); +} + +TEST_F(TestSelfPipe, SendAndWait) { + self_pipe_->Send(123456789123456789ULL); + StartReading(); + SleepABit(); + self_pipe_->Send(987654321987654321ULL); + + AssertPayloadsEventually({123456789123456789ULL, 987654321987654321ULL}); + ASSERT_OK(ReadStatus()); +} + +TEST_F(TestSelfPipe, WaitAndShutdown) { + StartReading(); + SleepABit(); + ASSERT_OK(self_pipe_->Shutdown()); + FinishReading(); + + ASSERT_THAT(ReadPayloads(), testing::ElementsAre()); + ASSERT_OK(ReadStatus()); + ASSERT_OK(self_pipe_->Shutdown()); // idempotent +} + +TEST_F(TestSelfPipe, ShutdownAndWait) { + self_pipe_->Send(123456789123456789ULL); + ASSERT_OK(self_pipe_->Shutdown()); + StartReading(); + SleepABit(); + FinishReading(); + + ASSERT_THAT(ReadPayloads(), testing::ElementsAre(123456789123456789ULL)); + ASSERT_OK(ReadStatus()); + ASSERT_OK(self_pipe_->Shutdown()); // idempotent +} + +TEST_F(TestSelfPipe, WaitAndSendFromSignal) { + signal_received_.store(0); + SignalHandlerGuard guard(SIGINT, &HandleSignal); + + StartReading(); + SleepABit(); + + self_pipe_->Send(456); + ASSERT_OK(SendSignal(SIGINT)); // will send 123 + self_pipe_->Send(789); + BusyWait(1.0, [&]() { return signal_received_.load() != 0; }); + ASSERT_EQ(signal_received_.load(), SIGINT); + + BusyWait(1.0, [&]() { return ReadPayloads().size() == 3; }); + ASSERT_THAT(ReadPayloads(), testing::UnorderedElementsAre(123, 456, 789)); + ASSERT_OK(ReadStatus()); +} + +TEST_F(TestSelfPipe, SendFromSignalAndWait) { + signal_received_.store(0); + SignalHandlerGuard guard(SIGINT, &HandleSignal); + + self_pipe_->Send(456); + ASSERT_OK(SendSignal(SIGINT)); // will send 123 + self_pipe_->Send(789); + BusyWait(1.0, [&]() { return signal_received_.load() != 0; }); + ASSERT_EQ(signal_received_.load(), SIGINT); + + StartReading(); + + BusyWait(1.0, [&]() { return ReadPayloads().size() == 3; }); + ASSERT_THAT(ReadPayloads(), testing::UnorderedElementsAre(123, 456, 789)); + ASSERT_OK(ReadStatus()); +} + TEST(PlatformFilename, RoundtripAscii) { PlatformFilename fn; ASSERT_OK_AND_ASSIGN(fn, PlatformFilename::FromString("a/b")); @@ -648,7 +947,6 @@ TEST(FileUtils, LongPaths) { const std::string BASE = "xxx-io-util-test-dir-long"; PlatformFilename base_path, long_path, long_filename; - int fd = -1; std::stringstream fs; fs << BASE; for (int i = 0; i < 64; ++i) { @@ -665,9 +963,9 @@ TEST(FileUtils, LongPaths) { PlatformFilename::FromString(fs.str() + "/file.txt")); TouchFile(long_filename); AssertExists(long_filename); - fd = -1; - ASSERT_OK_AND_ASSIGN(fd, FileOpenReadable(long_filename)); - ASSERT_OK(FileClose(fd)); + + ASSERT_OK_AND_ASSIGN(FileDescriptor fd, FileOpenReadable(long_filename)); + ASSERT_OK(fd.Close()); ASSERT_OK_AND_ASSIGN(deleted, DeleteDirContents(long_path)); ASSERT_TRUE(deleted); ASSERT_OK_AND_ASSIGN(deleted, DeleteDirTree(long_path)); @@ -679,44 +977,49 @@ TEST(FileUtils, LongPaths) { } #endif -static std::atomic signal_received; +class TestSendSignal : public ::testing::Test { + protected: + static std::atomic signal_received_; -static void handle_signal(int signum) { - ReinstateSignalHandler(signum, &handle_signal); - signal_received.store(signum); -} + static void HandleSignal(int signum) { + ReinstateSignalHandler(signum, &HandleSignal); + signal_received_.store(signum); + } +}; + +std::atomic TestSendSignal::signal_received_; -TEST(SendSignal, Generic) { - signal_received.store(0); - SignalHandlerGuard guard(SIGINT, &handle_signal); +TEST_F(TestSendSignal, Generic) { + signal_received_.store(0); + SignalHandlerGuard guard(SIGINT, &HandleSignal); - ASSERT_EQ(signal_received.load(), 0); + ASSERT_EQ(signal_received_.load(), 0); ASSERT_OK(SendSignal(SIGINT)); - BusyWait(1.0, [&]() { return signal_received.load() != 0; }); - ASSERT_EQ(signal_received.load(), SIGINT); + BusyWait(1.0, [&]() { return signal_received_.load() != 0; }); + ASSERT_EQ(signal_received_.load(), SIGINT); // Re-try (exercise ReinstateSignalHandler) - signal_received.store(0); + signal_received_.store(0); ASSERT_OK(SendSignal(SIGINT)); - BusyWait(1.0, [&]() { return signal_received.load() != 0; }); - ASSERT_EQ(signal_received.load(), SIGINT); + BusyWait(1.0, [&]() { return signal_received_.load() != 0; }); + ASSERT_EQ(signal_received_.load(), SIGINT); } -TEST(SendSignal, ToThread) { +TEST_F(TestSendSignal, ToThread) { #ifdef _WIN32 uint64_t dummy_thread_id = 42; ASSERT_RAISES(NotImplemented, SendSignalToThread(SIGINT, dummy_thread_id)); #else // Have to use a C-style cast because pthread_t can be a pointer *or* integer type uint64_t thread_id = (uint64_t)(pthread_self()); // NOLINT readability-casting - signal_received.store(0); - SignalHandlerGuard guard(SIGINT, &handle_signal); + signal_received_.store(0); + SignalHandlerGuard guard(SIGINT, &HandleSignal); - ASSERT_EQ(signal_received.load(), 0); + ASSERT_EQ(signal_received_.load(), 0); ASSERT_OK(SendSignalToThread(SIGINT, thread_id)); - BusyWait(1.0, [&]() { return signal_received.load() != 0; }); + BusyWait(1.0, [&]() { return signal_received_.load() != 0; }); - ASSERT_EQ(signal_received.load(), SIGINT); + ASSERT_EQ(signal_received_.load(), SIGINT); #endif } From ddd73b3b3849751225cda78bf051419299f71fc4 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 9 Jun 2022 18:48:24 +0200 Subject: [PATCH 2/3] Fix compilation failure on old gccs --- cpp/src/arrow/util/io_util.cc | 4 ++-- cpp/src/arrow/util/io_util_test.cc | 12 ------------ 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 0592b4864a9..32e250f4b9e 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -1073,7 +1073,7 @@ Result FileOpenReadable(const PlatformFilename& file_name) { } #endif - return fd; + return std::move(fd); } Result FileOpenWritable(const PlatformFilename& file_name, @@ -1146,7 +1146,7 @@ Result FileOpenWritable(const PlatformFilename& file_name, // Seek to end, as O_APPEND does not necessarily do it RETURN_NOT_OK(lseek64_compat(fd.fd(), 0, SEEK_END)); } - return fd; + return std::move(fd); } Result FileTell(int fd) { diff --git a/cpp/src/arrow/util/io_util_test.cc b/cpp/src/arrow/util/io_util_test.cc index 67011d02b16..57c75fff3c7 100644 --- a/cpp/src/arrow/util/io_util_test.cc +++ b/cpp/src/arrow/util/io_util_test.cc @@ -188,18 +188,6 @@ class TestFileDescriptor : public ::testing::Test { void AssertInvalidFileDescriptor(int fd) { ASSERT_TRUE(FileIsClosed(fd)) << "Unexpectedly valid file descriptor: " << fd; } - - Result IsValidFileDescriptor(int fd) { - int new_fd = dup(fd); - if (new_fd >= 0) { - close(new_fd); - return true; - } else if (errno == EBADF) { - return false; - } else { - return IOErrorFromErrno(errno, "Failed to dup fd ", fd); - } - } }; TEST_F(TestFileDescriptor, Basics) { From dcf11356c22772b791d843453eb9ee59d0e6bb5c Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 9 Jun 2022 21:58:30 +0200 Subject: [PATCH 3/3] Fix test failure on test-debian-10-cpp-i386 --- cpp/src/arrow/util/io_util.cc | 23 +++++------------------ 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 32e250f4b9e..8d393d733dc 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -95,7 +95,6 @@ #include "arrow/util/checked_cast.h" #include "arrow/util/io_util.h" #include "arrow/util/logging.h" -#include "arrow/util/ubsan.h" // For filename conversion #if defined(_WIN32) @@ -1204,8 +1203,6 @@ namespace { #endif class SelfPipeImpl : public SelfPipe { - using PayloadBytes = std::array; - static constexpr uint64_t kEofPayload = 5804561806345822987ULL; public: @@ -1228,9 +1225,9 @@ class SelfPipeImpl : public SelfPipe { // Already closed return ClosedPipe(); } - PayloadBytes bytes; - char* buf = bytes.data(); - auto buf_size = static_cast(bytes.size()); + uint64_t payload = 0; + char* buf = reinterpret_cast(&payload); + auto buf_size = static_cast(sizeof(payload)); while (buf_size > 0) { int64_t n_read = PIPE_READ(pipe_.rfd.fd(), buf, static_cast(buf_size)); if (n_read < 0) { @@ -1245,7 +1242,6 @@ class SelfPipeImpl : public SelfPipe { buf += n_read; buf_size -= n_read; } - const auto payload = PayloadFromBytes(bytes); if (payload == kEofPayload && please_shutdown_.load()) { RETURN_NOT_OK(pipe_.rfd.Close()); return ClosedPipe(); @@ -1285,14 +1281,6 @@ class SelfPipeImpl : public SelfPipe { } protected: - PayloadBytes PayloadToBytes(uint64_t payload) const { - return arrow::util::SafeCopy(payload); - } - - uint64_t PayloadFromBytes(PayloadBytes bytes) const { - return arrow::util::SafeCopy(bytes); - } - Status ClosedPipe() const { return Status::Invalid("Self-pipe closed"); } bool DoSend(uint64_t payload) { @@ -1301,9 +1289,8 @@ class SelfPipeImpl : public SelfPipe { // Already closed return false; } - const auto bytes = PayloadToBytes(payload); - const char* buf = bytes.data(); - auto buf_size = static_cast(bytes.size()); + const char* buf = reinterpret_cast(&payload); + auto buf_size = static_cast(sizeof(payload)); while (buf_size > 0) { int64_t n_written = PIPE_WRITE(pipe_.wfd.fd(), buf, static_cast(buf_size));