Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cpp/src/arrow/filesystem/localfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -439,10 +439,11 @@ Result<std::shared_ptr<io::OutputStream>> OpenOutputStreamGeneric(const std::str
ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path));
const bool write_only = true;
ARROW_ASSIGN_OR_RAISE(
int fd, ::arrow::internal::FileOpenWritable(fn, write_only, truncate, append));
auto maybe_stream = io::FileOutputStream::Open(fd);
auto fd, ::arrow::internal::FileOpenWritable(fn, write_only, truncate, append));
int raw_fd = fd.Detach();
auto maybe_stream = io::FileOutputStream::Open(raw_fd);
if (!maybe_stream.ok()) {
ARROW_UNUSED(::arrow::internal::FileClose(fd));
ARROW_UNUSED(::arrow::internal::FileClose(raw_fd));
}
return maybe_stream;
}
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/filesystem/localfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace arrow {
namespace fs {
namespace internal {

using ::arrow::internal::FileDescriptor;
using ::arrow::internal::PlatformFilename;
using ::arrow::internal::TemporaryDir;

Expand Down Expand Up @@ -237,9 +238,8 @@ class TestLocalFS : public LocalFSTestMixin {

void CheckConcreteFile(const std::string& path, int64_t expected_size) {
ASSERT_OK_AND_ASSIGN(auto fn, PlatformFilename::FromString(path));
ASSERT_OK_AND_ASSIGN(int fd, ::arrow::internal::FileOpenReadable(fn));
auto result = ::arrow::internal::FileGetSize(fd);
ASSERT_OK(::arrow::internal::FileClose(fd));
ASSERT_OK_AND_ASSIGN(FileDescriptor fd, ::arrow::internal::FileOpenReadable(fn));
auto result = ::arrow::internal::FileGetSize(fd.fd());
ASSERT_OK_AND_ASSIGN(int64_t size, result);
ASSERT_EQ(size, expected_size);
}
Expand Down
89 changes: 19 additions & 70 deletions cpp/src/arrow/flight/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,6 @@

#include "arrow/flight/server.h"

#ifdef _WIN32
#include "arrow/util/windows_compatibility.h"

#include <io.h>
#else
#include <fcntl.h>
#include <unistd.h>
#endif
#include <atomic>
#include <cerrno>
#include <chrono>
Expand All @@ -52,22 +44,16 @@

namespace arrow {
namespace flight {

namespace {
#if (ATOMIC_INT_LOCK_FREE != 2 || ATOMIC_POINTER_LOCK_FREE != 2)
#error "atomic ints and atomic pointers not always lock-free!"
#endif

using ::arrow::internal::SelfPipe;
using ::arrow::internal::SetSignalHandler;
using ::arrow::internal::SignalHandler;

#ifdef WIN32
#define PIPE_WRITE _write
#define PIPE_READ _read
#else
#define PIPE_WRITE write
#define PIPE_READ read
#endif

/// RAII guard that manages a self-pipe and a thread that listens on
/// the self-pipe, shutting down the server when a signal handler
/// writes to the pipe.
Expand All @@ -80,51 +66,22 @@ class ServerSignalHandler {
///
/// \return the fd of the write side of the pipe.
template <typename Fn>
arrow::Result<int> Init(Fn handler) {
ARROW_ASSIGN_OR_RAISE(auto pipe, arrow::internal::CreatePipe());
#ifndef WIN32
// Make write end nonblocking
int flags = fcntl(pipe.wfd, F_GETFL);
if (flags == -1) {
RETURN_NOT_OK(arrow::internal::FileClose(pipe.rfd));
RETURN_NOT_OK(arrow::internal::FileClose(pipe.wfd));
return arrow::internal::IOErrorFromErrno(
errno, "Could not initialize self-pipe to wait for signals");
}
flags |= O_NONBLOCK;
if (fcntl(pipe.wfd, F_SETFL, flags) == -1) {
RETURN_NOT_OK(arrow::internal::FileClose(pipe.rfd));
RETURN_NOT_OK(arrow::internal::FileClose(pipe.wfd));
return arrow::internal::IOErrorFromErrno(
errno, "Could not initialize self-pipe to wait for signals");
}
#endif
self_pipe_ = pipe;
handle_signals_ = std::thread(handler, self_pipe_.rfd);
return self_pipe_.wfd;
arrow::Result<std::shared_ptr<SelfPipe>> Init(Fn handler) {
ARROW_ASSIGN_OR_RAISE(self_pipe_, SelfPipe::Make(/*signal_safe=*/true));
handle_signals_ = std::thread(handler, self_pipe_);
return self_pipe_;
}

Status Shutdown() {
if (self_pipe_.rfd == 0) {
// Already closed
return Status::OK();
}
if (PIPE_WRITE(self_pipe_.wfd, "0", 1) < 0 && errno != EAGAIN &&
errno != EWOULDBLOCK && errno != EINTR) {
return arrow::internal::IOErrorFromErrno(errno, "Could not unblock signal thread");
}
RETURN_NOT_OK(self_pipe_->Shutdown());
handle_signals_.join();
RETURN_NOT_OK(arrow::internal::FileClose(self_pipe_.rfd));
RETURN_NOT_OK(arrow::internal::FileClose(self_pipe_.wfd));
self_pipe_.rfd = 0;
self_pipe_.wfd = 0;
return Status::OK();
}

~ServerSignalHandler() { ARROW_CHECK_OK(Shutdown()); }

private:
arrow::internal::Pipe self_pipe_;
std::shared_ptr<SelfPipe> self_pipe_;
std::thread handle_signals_;
};
} // namespace
Expand All @@ -140,7 +97,7 @@ struct FlightServerBase::Impl {
static std::atomic<Impl*> running_instance_;
// We'll use the self-pipe trick to notify a thread from the signal
// handler. The thread will then shut down the server.
int self_pipe_wfd_;
std::shared_ptr<SelfPipe> self_pipe_;

// Signal handling
std::vector<int> signals_;
Expand All @@ -156,24 +113,17 @@ struct FlightServerBase::Impl {

void DoHandleSignal(int signum) {
got_signal_ = signum;
int saved_errno = errno;
if (PIPE_WRITE(self_pipe_wfd_, "0", 1) < 0) {
// Can't do much here, though, pipe is nonblocking so hopefully this doesn't happen
ARROW_LOG(WARNING) << "FlightServerBase: failed to handle signal " << signum
<< " errno: " << errno;
}
errno = saved_errno;

// Send dummy payload over self-pipe
self_pipe_->Send(/*payload=*/0);
}

static void WaitForSignals(int fd) {
// Wait for a signal handler to write to the pipe
int8_t buf[1];
while (PIPE_READ(fd, /*buf=*/buf, /*count=*/1) == -1) {
if (errno == EINTR) {
continue;
}
ARROW_CHECK_OK(arrow::internal::IOErrorFromErrno(
errno, "Error while waiting for shutdown signal"));
static void WaitForSignals(std::shared_ptr<SelfPipe> self_pipe) {
// Wait for a signal handler to wake up the pipe
auto st = self_pipe->Wait().status();
// Status::Invalid means the pipe was shutdown without any wakeup
if (!st.ok() && !st.IsInvalid()) {
ARROW_LOG(FATAL) << "Failed to wait on self-pipe: " << st.ToString();
}
auto instance = running_instance_.load();
if (instance != nullptr) {
Expand Down Expand Up @@ -232,8 +182,7 @@ Status FlightServerBase::Serve() {
impl_->running_instance_ = impl_.get();

ServerSignalHandler signal_handler;
ARROW_ASSIGN_OR_RAISE(impl_->self_pipe_wfd_,
signal_handler.Init(&Impl::WaitForSignals));
ARROW_ASSIGN_OR_RAISE(impl_->self_pipe_, signal_handler.Init(&Impl::WaitForSignals));
// Override existing signal handlers with our own handler so as to stop the server.
for (size_t i = 0; i < impl_->signals_.size(); ++i) {
int signum = impl_->signals_[i];
Expand Down
62 changes: 20 additions & 42 deletions cpp/src/arrow/io/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,13 @@

namespace arrow {

using internal::FileDescriptor;
using internal::IOErrorFromErrno;

namespace io {

class OSFile {
public:
OSFile() : fd_(-1), is_open_(false), size_(-1), need_seeking_(false) {}

~OSFile() {}

// Note: only one of the Open* methods below may be called on a given instance

Status OpenWritable(const std::string& path, bool truncate, bool append,
Expand All @@ -76,11 +73,10 @@ class OSFile {

ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenWritable(file_name_, write_only,
truncate, append));
is_open_ = true;
mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE;

if (!truncate) {
ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_));
ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_.fd()));
} else {
size_ = 0;
}
Expand All @@ -98,55 +94,42 @@ class OSFile {
size_ = -1;
}
RETURN_NOT_OK(SetFileName(fd));
is_open_ = true;
mode_ = FileMode::WRITE;
fd_ = fd;
fd_ = FileDescriptor(fd);
return Status::OK();
}

Status OpenReadable(const std::string& path) {
RETURN_NOT_OK(SetFileName(path));

ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenReadable(file_name_));
ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_));
ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_.fd()));

is_open_ = true;
mode_ = FileMode::READ;
return Status::OK();
}

Status OpenReadable(int fd) {
ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd));
RETURN_NOT_OK(SetFileName(fd));
is_open_ = true;
mode_ = FileMode::READ;
fd_ = fd;
fd_ = FileDescriptor(fd);
return Status::OK();
}

Status CheckClosed() const {
if (!is_open_) {
if (fd_.closed()) {
return Status::Invalid("Invalid operation on closed file");
}
return Status::OK();
}

Status Close() {
if (is_open_) {
// Even if closing fails, the fd will likely be closed (perhaps it's
// already closed).
is_open_ = false;
int fd = fd_;
fd_ = -1;
RETURN_NOT_OK(::arrow::internal::FileClose(fd));
}
return Status::OK();
}
Status Close() { return fd_.Close(); }

Result<int64_t> Read(int64_t nbytes, void* out) {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(CheckPositioned());
return ::arrow::internal::FileRead(fd_, reinterpret_cast<uint8_t*>(out), nbytes);
return ::arrow::internal::FileRead(fd_.fd(), reinterpret_cast<uint8_t*>(out), nbytes);
}

Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
Expand All @@ -155,16 +138,16 @@ class OSFile {
// ReadAt() leaves the file position undefined, so require that we seek
// before calling Read() or Write().
need_seeking_.store(true);
return ::arrow::internal::FileReadAt(fd_, reinterpret_cast<uint8_t*>(out), position,
nbytes);
return ::arrow::internal::FileReadAt(fd_.fd(), reinterpret_cast<uint8_t*>(out),
position, nbytes);
}

Status Seek(int64_t pos) {
RETURN_NOT_OK(CheckClosed());
if (pos < 0) {
return Status::Invalid("Invalid position");
}
Status st = ::arrow::internal::FileSeek(fd_, pos);
Status st = ::arrow::internal::FileSeek(fd_.fd(), pos);
if (st.ok()) {
need_seeking_.store(false);
}
Expand All @@ -173,7 +156,7 @@ class OSFile {

Result<int64_t> Tell() const {
RETURN_NOT_OK(CheckClosed());
return ::arrow::internal::FileTell(fd_);
return ::arrow::internal::FileTell(fd_.fd());
}

Status Write(const void* data, int64_t length) {
Expand All @@ -184,13 +167,13 @@ class OSFile {
if (length < 0) {
return Status::IOError("Length must be non-negative");
}
return ::arrow::internal::FileWrite(fd_, reinterpret_cast<const uint8_t*>(data),
return ::arrow::internal::FileWrite(fd_.fd(), reinterpret_cast<const uint8_t*>(data),
length);
}

int fd() const { return fd_; }
int fd() const { return fd_.fd(); }

bool is_open() const { return is_open_; }
bool is_open() const { return !fd_.closed(); }

int64_t size() const { return size_; }

Expand Down Expand Up @@ -221,16 +204,11 @@ class OSFile {
::arrow::internal::PlatformFilename file_name_;

std::mutex lock_;

// File descriptor
int fd_;

FileDescriptor fd_;
FileMode::type mode_;

bool is_open_;
int64_t size_;
int64_t size_{-1};
// Whether ReadAt made the file position non-deterministic.
std::atomic<bool> need_seeking_;
std::atomic<bool> need_seeking_{false};
};

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -287,7 +265,7 @@ class ReadableFile::ReadableFileImpl : public OSFile {
for (const auto& range : ranges) {
RETURN_NOT_OK(internal::ValidateRange(range.offset, range.length));
#if defined(POSIX_FADV_WILLNEED)
int ret = posix_fadvise(fd_, range.offset, range.length, POSIX_FADV_WILLNEED);
int ret = posix_fadvise(fd_.fd(), range.offset, range.length, POSIX_FADV_WILLNEED);
if (ret) {
RETURN_NOT_OK(report_error(ret, "posix_fadvise failed"));
}
Expand All @@ -296,7 +274,7 @@ class ReadableFile::ReadableFileImpl : public OSFile {
off_t ra_offset;
int ra_count;
} radvisory{range.offset, static_cast<int>(range.length)};
if (radvisory.ra_count > 0 && fcntl(fd_, F_RDADVISE, &radvisory) == -1) {
if (radvisory.ra_count > 0 && fcntl(fd_.fd(), F_RDADVISE, &radvisory) == -1) {
RETURN_NOT_OK(report_error(errno, "fcntl(fd, F_RDADVISE, ...) failed"));
}
#else
Expand Down
Loading