diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index e6b3f481e21..0314a1cbd19 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -102,7 +102,10 @@ supported_dplyr_methods <- list( configure_tzdb() } - # register extension types that we use internally + # Set interrupt handlers + SetEnableSignalStopSource(TRUE) + + # Register extension types that we use internally reregister_extension_type(vctrs_extension_type(vctrs::unspecified())) invisible() @@ -142,6 +145,19 @@ configure_tzdb <- function() { }) } +# Clean up the StopSource that was registered in .onLoad() so that if the +# package is reloaded we don't get an error from C++ informing us that +# a StopSource has already been set up. +.onUnload <- function(...) { + DeinitializeMainRThread() +} + +# While .onUnload should be sufficient, devtools::load_all() does not call it +# (but it does call .onDetach()). It is safe to call DeinitializeMainRThread() +# more than once. +.onDetach <- function(...) { + DeinitializeMainRThread() +} # True when the OS is linux + and the R version is development # helpful for skipping on Valgrind, and the sanitizer checks (clang + gcc) on cran diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 35c73e547c9..b73bef71023 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1820,6 +1820,14 @@ InitializeMainRThread <- function() { invisible(.Call(`_arrow_InitializeMainRThread`)) } +DeinitializeMainRThread <- function() { + invisible(.Call(`_arrow_DeinitializeMainRThread`)) +} + +SetEnableSignalStopSource <- function(enabled) { + .Call(`_arrow_SetEnableSignalStopSource`, enabled) +} + CanRunWithCapturedR <- function() { .Call(`_arrow_CanRunWithCapturedR`) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 26ec6e3d9b1..aa4fd01af49 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -4658,6 +4658,22 @@ BEGIN_CPP11 END_CPP11 } // safe-call-into-r-impl.cpp +void DeinitializeMainRThread(); +extern "C" SEXP 
_arrow_DeinitializeMainRThread(){ +BEGIN_CPP11 + DeinitializeMainRThread(); + return R_NilValue; +END_CPP11 +} +// safe-call-into-r-impl.cpp +bool SetEnableSignalStopSource(bool enabled); +extern "C" SEXP _arrow_SetEnableSignalStopSource(SEXP enabled_sexp){ +BEGIN_CPP11 + arrow::r::Input::type enabled(enabled_sexp); + return cpp11::as_sexp(SetEnableSignalStopSource(enabled)); +END_CPP11 +} +// safe-call-into-r-impl.cpp bool CanRunWithCapturedR(); extern "C" SEXP _arrow_CanRunWithCapturedR(){ BEGIN_CPP11 @@ -5657,6 +5673,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 4}, { "_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 4}, { "_arrow_InitializeMainRThread", (DL_FUNC) &_arrow_InitializeMainRThread, 0}, + { "_arrow_DeinitializeMainRThread", (DL_FUNC) &_arrow_DeinitializeMainRThread, 0}, + { "_arrow_SetEnableSignalStopSource", (DL_FUNC) &_arrow_SetEnableSignalStopSource, 1}, { "_arrow_CanRunWithCapturedR", (DL_FUNC) &_arrow_CanRunWithCapturedR, 0}, { "_arrow_TestSafeCallIntoR", (DL_FUNC) &_arrow_TestSafeCallIntoR, 2}, { "_arrow_Array__GetScalar", (DL_FUNC) &_arrow_Array__GetScalar, 2}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index dd0dc24449e..c29a31fd663 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -94,8 +94,8 @@ class UnwindProtectDetail : public StatusDetail { virtual std::string ToString() const { return "R code execution error"; } }; -static inline Status StatusUnwindProtect(SEXP token) { - return Status::Invalid("R code execution error") +static inline Status StatusUnwindProtect(SEXP token, std::string reason = "") { + return Status::Invalid("R code execution error (", reason, ")") .WithDetail(std::make_shared(token)); } diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 5af6450050e..ca980345dfb 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ 
-73,10 +73,14 @@ class ExecPlanReader : public arrow::RecordBatchReader { ExecPlanReader(const std::shared_ptr& plan, const std::shared_ptr& schema, arrow::AsyncGenerator> sink_gen) - : schema_(schema), plan_(plan), sink_gen_(sink_gen), status_(PLAN_NOT_STARTED) {} + : schema_(schema), + plan_(plan), + sink_gen_(sink_gen), + plan_status_(PLAN_NOT_STARTED), + stop_token_(MainRThread::GetInstance().GetStopToken()) {} std::string PlanStatus() const { - switch (status_) { + switch (plan_status_) { case PLAN_NOT_STARTED: return "PLAN_NOT_STARTED"; case PLAN_RUNNING: @@ -91,21 +95,27 @@ class ExecPlanReader : public arrow::RecordBatchReader { std::shared_ptr schema() const override { return schema_; } arrow::Status ReadNext(std::shared_ptr* batch_out) override { - // TODO(ARROW-11841) check a StopToken to potentially cancel this plan - // If this is the first batch getting pulled, tell the exec plan to // start producing - if (status_ == PLAN_NOT_STARTED) { + if (plan_status_ == PLAN_NOT_STARTED) { ARROW_RETURN_NOT_OK(StartProducing()); } // If we've closed the reader, keep sending nullptr // (consistent with what most RecordBatchReader subclasses do) - if (status_ == PLAN_FINISHED) { + if (plan_status_ == PLAN_FINISHED) { batch_out->reset(); return arrow::Status::OK(); } + // Check for cancellation and stop the plan if we have a request. When + // the ExecPlan supports passing a StopToken and handling this itself, + // this will be redundant. 
+ if (stop_token_.IsStopRequested()) { + StopProducing(); + return stop_token_.Poll(); + } + auto out = sink_gen_().result(); if (!out.ok()) { StopProducing(); @@ -141,16 +151,17 @@ class ExecPlanReader : public arrow::RecordBatchReader { std::shared_ptr schema_; std::shared_ptr plan_; arrow::AsyncGenerator> sink_gen_; - int status_; + ExecPlanReaderStatus plan_status_; + arrow::StopToken stop_token_; arrow::Status StartProducing() { ARROW_RETURN_NOT_OK(plan_->StartProducing()); - status_ = PLAN_RUNNING; + plan_status_ = PLAN_RUNNING; return arrow::Status::OK(); } void StopProducing() { - if (status_ == PLAN_RUNNING) { + if (plan_status_ == PLAN_RUNNING) { // We're done with the plan, but it may still need some time // to finish and clean up after itself. To do this, we give a // callable with its own copy of the shared_ptr so @@ -164,7 +175,7 @@ class ExecPlanReader : public arrow::RecordBatchReader { } } - status_ = PLAN_FINISHED; + plan_status_ = PLAN_FINISHED; plan_.reset(); sink_gen_ = arrow::MakeEmptyGenerator>(); } diff --git a/r/src/csv.cpp b/r/src/csv.cpp index 7ce55feb5fe..7747369300a 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -31,7 +31,7 @@ std::shared_ptr csv___WriteOptions__initialize( std::make_shared(arrow::csv::WriteOptions::Defaults()); res->include_header = cpp11::as_cpp(options["include_header"]); res->batch_size = cpp11::as_cpp(options["batch_size"]); - res->io_context = arrow::io::IOContext(gc_memory_pool()); + res->io_context = MainRThread::GetInstance().CancellableIOContext(); return res; } @@ -154,9 +154,9 @@ std::shared_ptr csv___TableReader__Make( const std::shared_ptr& read_options, const std::shared_ptr& parse_options, const std::shared_ptr& convert_options) { - return ValueOrStop(arrow::csv::TableReader::Make(arrow::io::IOContext(gc_memory_pool()), - input, *read_options, *parse_options, - *convert_options)); + return ValueOrStop(arrow::csv::TableReader::Make( + MainRThread::GetInstance().CancellableIOContext(), input, 
*read_options, + *parse_options, *convert_options)); } // [[arrow::export]] diff --git a/r/src/extension-impl.cpp b/r/src/extension-impl.cpp index e6efcf36479..a13b252b283 100644 --- a/r/src/extension-impl.cpp +++ b/r/src/extension-impl.cpp @@ -80,7 +80,7 @@ arrow::Result> RExtensionType::Deserialize( // an event loop from wherever this *might* be called is high and hard to // predict. As a compromise, just create the instance when it is safe to // do so. - if (GetMainRThread().IsMainThread()) { + if (MainRThread::GetInstance().IsMainThread()) { r6_instance(); } diff --git a/r/src/filesystem.cpp b/r/src/filesystem.cpp index f6c5499bd3d..f42fa6875e9 100644 --- a/r/src/filesystem.cpp +++ b/r/src/filesystem.cpp @@ -16,6 +16,7 @@ // under the License. #include "./arrow_types.h" +#include "./safe-call-into-r.h" #include #include @@ -239,7 +240,7 @@ std::string fs___FileSystem__type_name( // [[arrow::export]] std::shared_ptr fs___LocalFileSystem__create() { // Affects OpenInputFile/OpenInputStream - auto io_context = arrow::io::IOContext(gc_memory_pool()); + auto io_context = MainRThread::GetInstance().CancellableIOContext(); return std::make_shared(io_context); } @@ -334,7 +335,7 @@ std::shared_ptr fs___S3FileSystem__create( s3_opts.allow_bucket_creation = allow_bucket_creation; s3_opts.allow_bucket_deletion = allow_bucket_deletion; - auto io_context = arrow::io::IOContext(gc_memory_pool()); + auto io_context = MainRThread::GetInstance().CancellableIOContext(); return ValueOrStop(fs::S3FileSystem::Make(s3_opts, io_context)); } @@ -412,7 +413,7 @@ std::shared_ptr fs___GcsFileSystem__Make(bool anonymous, gcs_opts.default_metadata = strings_to_kvm(options["default_metadata"]); } - auto io_context = arrow::io::IOContext(gc_memory_pool()); + auto io_context = MainRThread::GetInstance().CancellableIOContext(); // TODO(ARROW-16884): update when this returns Result return fs::GcsFileSystem::Make(gcs_opts, io_context); } diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp 
index b37ae5df78a..aa517995856 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -296,7 +296,7 @@ class AsArrowArrayConverter : public RConverter { arrays_.push_back(std::move(array)); return Status::OK(); } catch (cpp11::unwind_exception& e) { - return StatusUnwindProtect(e.token); + return StatusUnwindProtect(e.token, "calling as_arrow_array()"); } } diff --git a/r/src/safe-call-into-r-impl.cpp b/r/src/safe-call-into-r-impl.cpp index 6b3ebc9fccb..92dce7e0ba0 100644 --- a/r/src/safe-call-into-r-impl.cpp +++ b/r/src/safe-call-into-r-impl.cpp @@ -21,18 +21,33 @@ #include #include -MainRThread& GetMainRThread() { +MainRThread& MainRThread::GetInstance() { static MainRThread main_r_thread; return main_r_thread; } // [[arrow::export]] -void InitializeMainRThread() { GetMainRThread().Initialize(); } +void InitializeMainRThread() { MainRThread::GetInstance().Initialize(); } + +// [[arrow::export]] +void DeinitializeMainRThread() { MainRThread::GetInstance().Deinitialize(); } + +// [[arrow::export]] +bool SetEnableSignalStopSource(bool enabled) { + bool was_enabled = MainRThread::GetInstance().SignalStopSourceEnabled(); + if (was_enabled && !enabled) { + MainRThread::GetInstance().DisableSignalStopSource(); + } else if (!was_enabled && enabled) { + MainRThread::GetInstance().EnableSignalStopSource(); + } + + return was_enabled; +} // [[arrow::export]] bool CanRunWithCapturedR() { #if defined(HAS_UNWIND_PROTECT) - return GetMainRThread().Executor() == nullptr; + return MainRThread::GetInstance().Executor() == nullptr; #else return false; #endif @@ -42,31 +57,28 @@ bool CanRunWithCapturedR() { std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, std::string opt) { if (opt == "async_with_executor") { - std::thread* thread_ptr; + std::thread thread; - auto result = - RunWithCapturedR([&thread_ptr, r_fun_that_returns_a_string]() { - auto fut = arrow::Future::Make(); - thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable 
{ - auto result = SafeCallIntoR([&] { - return cpp11::as_cpp(r_fun_that_returns_a_string()); - }); + auto result = RunWithCapturedR([&thread, r_fun_that_returns_a_string]() { + auto fut = arrow::Future::Make(); + thread = std::thread([&fut, r_fun_that_returns_a_string]() { + auto result = SafeCallIntoR( + [&] { return cpp11::as_cpp(r_fun_that_returns_a_string()); }); - fut.MarkFinished(result); - }); + fut.MarkFinished(result); + }); - return fut; - }); + return fut; + }); - thread_ptr->join(); - delete thread_ptr; + if (thread.joinable()) { + thread.join(); + } return arrow::ValueOrStop(result); } else if (opt == "async_without_executor") { - std::thread* thread_ptr; - auto fut = arrow::Future::Make(); - thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable { + std::thread thread([&fut, r_fun_that_returns_a_string]() { auto result = SafeCallIntoR( [&] { return cpp11::as_cpp(r_fun_that_returns_a_string()); }); @@ -77,8 +89,7 @@ std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string, } }); - thread_ptr->join(); - delete thread_ptr; + thread.join(); // We should be able to get this far, but fut will contain an error // because it tried to evaluate R code from another thread diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 5e24a3892b1..319d46d11f0 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -21,9 +21,11 @@ #include "./arrow_types.h" #include +#include #include #include +#include #include #include @@ -38,12 +40,13 @@ bool CanRunWithCapturedR(); // The MainRThread class keeps track of the thread on which it is safe // to call the R API to facilitate its safe use (or erroring // if it is not safe). The MainRThread singleton can be accessed from -// any thread using GetMainRThread(); the preferred way to call +// any thread using MainRThread::GetInstance(); the preferred way to call // the R API where it may not be safe to do so is to use // SafeCallIntoR([&]() { ... }). 
class MainRThread { public: - MainRThread() : initialized_(false), executor_(nullptr) {} + // Return a reference to the MainRThread singleton + static MainRThread& GetInstance(); // Call this method from the R thread (e.g., on package load) // to save an internal copy of the thread id. @@ -55,9 +58,51 @@ class MainRThread { bool IsInitialized() { return initialized_; } + void Deinitialize() { + initialized_ = false; + DisableSignalStopSource(); + } + // Check if the current thread is the main R thread bool IsMainThread() { return initialized_ && std::this_thread::get_id() == thread_id_; } + arrow::StopToken GetStopToken() { + if (SignalStopSourceEnabled()) { + return stop_source_->token(); + } else { + return arrow::StopToken::Unstoppable(); + } + } + + bool SignalStopSourceEnabled() { return stop_source_ != nullptr; } + + void EnableSignalStopSource() { + // Try to set up the stop source. If another library linking to + // the same libarrow shared object has already done this, this call + // will fail (which is OK, we just don't get the ability to cancel) + if (!SignalStopSourceEnabled()) { + auto maybe_stop_source = arrow::SetSignalStopSource(); + if (maybe_stop_source.ok()) { + stop_source_ = maybe_stop_source.ValueUnsafe(); + } else { + cpp11::warning("Failed to enable user cancellation: %s", + maybe_stop_source.status().message().c_str()); + } + } + } + + void DisableSignalStopSource() { + if (SignalStopSourceEnabled()) { + arrow::ResetSignalStopSource(); + stop_source_ = nullptr; + } + } + + arrow::io::IOContext CancellableIOContext() { + return arrow::io::IOContext(gc_memory_pool(), + MainRThread::GetInstance().GetStopToken()); + } + // Check if a SafeCallIntoR call is able to execute bool CanExecuteSafeCallIntoR() { return IsMainThread() || executor_ != nullptr; } @@ -75,11 +120,15 @@ class MainRThread { // Check if there is a saved error bool HasError() { return !status_.ok(); } - // Throw a cpp11::unwind_exception() if - void ClearError() { + // Resets 
this object after a RunWithCapturedR is about to return + // to the R interpreter. + arrow::Status ReraiseErrorIfExists() { + if (SignalStopSourceEnabled()) { + stop_source_->Reset(); + } arrow::Status maybe_error_status = status_; ResetError(); - arrow::StopIfNotOk(maybe_error_status); + return maybe_error_status; } private: @@ -87,10 +136,75 @@ class MainRThread { std::thread::id thread_id_; arrow::Status status_; arrow::internal::Executor* executor_; + arrow::StopSource* stop_source_; + + MainRThread() : initialized_(false), executor_(nullptr), stop_source_(nullptr) {} }; -// Retrieve the MainRThread singleton -MainRThread& GetMainRThread(); +// This object is used to ensure that signal handlers are registered when +// RunWithCapturedR launches its background thread to call Arrow and is +// cleaned up however this exits. Note that the lifecycle of the StopSource, +// which is registered at package load, is not necessarily tied to the +// lifecycle of the signal handlers. The general approach is to register +// the signal handlers only when we are evaluating code outside the R thread +// (when we are evaluating code *on* the R thread, R's signal handlers are +// sufficient and will signal an interrupt condition that will propagate +// via a cpp11::unwind_exception). +class WithSignalHandlerContext { + public: + WithSignalHandlerContext() : signal_handler_registered_(false) { + if (MainRThread::GetInstance().SignalStopSourceEnabled()) { + arrow::Status result = arrow::RegisterCancellingSignalHandler({SIGINT}); + + // If this result was not OK we don't get cancellation for the + // lifecycle of this object; however, we can still carry on. This + // can occur when forking the R process (e.g., using parallel::mclapply()). 
+ if (result.ok()) { + signal_handler_registered_ = true; + } else { + result.Warn(); + } + } + } + + ~WithSignalHandlerContext() { + if (signal_handler_registered_) { + arrow::UnregisterCancellingSignalHandler(); + } + } + + private: + bool signal_handler_registered_; +}; + +// This is an object whose scope ensures we do not register signal handlers when +// evaluating R code when that evaluation happens via SafeCallIntoR. +class WithoutSignalHandlerContext { + public: + WithoutSignalHandlerContext() : signal_handler_unregistered_(false) { + if (MainRThread::GetInstance().SignalStopSourceEnabled()) { + arrow::UnregisterCancellingSignalHandler(); + signal_handler_unregistered_ = true; + } + } + + ~WithoutSignalHandlerContext() { + if (signal_handler_unregistered_) { + arrow::Status result = arrow::RegisterCancellingSignalHandler({SIGINT}); + + // This is unlikely because the signal handlers were previously registered; + // however, it's better to warn here instead of error because it doesn't + // affect what the user tried to do (it probably just means we didn't + // anticipate a use case). + if (!result.ok()) { + result.Warn(); + } + } + } + + private: + bool signal_handler_unregistered_; +}; // Call into R and return a C++ object. Note that you can't return // a SEXP (use cpp11::as_cpp to convert it to a C++ type inside @@ -98,7 +212,7 @@ MainRThread& GetMainRThread(); template arrow::Future SafeCallIntoRAsync(std::function(void)> fun, std::string reason = "unspecified") { - MainRThread& main_r_thread = GetMainRThread(); + MainRThread& main_r_thread = MainRThread::GetInstance(); if (main_r_thread.IsMainThread()) { // If we're on the main thread, run the task immediately and let // the cpp11::unwind_exception be thrown since it will be caught @@ -108,28 +222,27 @@ arrow::Future SafeCallIntoRAsync(std::function(void)> fun, // If we are not on the main thread and have an Executor, // use it to run the task on the main R thread. 
We can't throw // a cpp11::unwind_exception here, so we need to propagate it back - // to RunWithCapturedR through the MainRThread singleton. - return DeferNotOk(main_r_thread.Executor()->Submit([fun, reason]() { + // to RunWithCapturedR through the MainRThread instance. + return DeferNotOk(main_r_thread.Executor()->Submit([fun, + reason]() -> arrow::Result { // This occurs when some other R code that was previously scheduled to run // has errored, in which case we skip execution and let the original // error surface. - if (GetMainRThread().HasError()) { - return arrow::Result( - arrow::Status::Cancelled("Previous R code execution error (", reason, ")")); + if (MainRThread::GetInstance().HasError()) { + return arrow::Status::Cancelled("Previous R code execution error (", reason, ")"); } try { + WithoutSignalHandlerContext context; return fun(); } catch (cpp11::unwind_exception& e) { - // Here we save the token and set the main R thread to an error state - GetMainRThread().SetError(arrow::StatusUnwindProtect(e.token)); - - // We also return an error although this should not surface because - // main_r_thread.ClearError() will get called before this value can be - // returned and will StopIfNotOk(). We don't save the error token here - // to ensure that it will only get thrown once. - return arrow::Result( - arrow::Status::UnknownError("R code execution error (", reason, ")")); + // Set the MainRThread error so that subsequent calls to SafeCallIntoR + // know not to execute R code. + MainRThread::GetInstance().SetError(arrow::StatusUnwindProtect(e.token, reason)); + + // Return an error Status (which is unlikely to surface since RunWithCapturedR + // will preferentially return the MainRThread error). 
+ return arrow::Status::Invalid("R code execution error (", reason, ")"); } })); } else { @@ -165,21 +278,23 @@ arrow::Result RunWithCapturedR(std::function()> make_arrow_c return arrow::Status::NotImplemented("RunWithCapturedR() without UnwindProtect"); } - if (GetMainRThread().Executor() != nullptr) { + if (MainRThread::GetInstance().Executor() != nullptr) { return arrow::Status::AlreadyExists("Attempt to use more than one R Executor()"); } - GetMainRThread().ResetError(); - + MainRThread::GetInstance().ResetError(); + WithSignalHandlerContext context; arrow::Result result = arrow::internal::SerialExecutor::RunInSerialExecutor( [make_arrow_call](arrow::internal::Executor* executor) { - GetMainRThread().Executor() = executor; + MainRThread::GetInstance().Executor() = executor; return make_arrow_call(); }); - GetMainRThread().Executor() = nullptr; - GetMainRThread().ClearError(); + MainRThread::GetInstance().Executor() = nullptr; + // A StatusUnwindProtect error, if it was thrown, lives in the MainRThread and + // should be returned if possible. + ARROW_RETURN_NOT_OK(MainRThread::GetInstance().ReraiseErrorIfExists()); return result; } @@ -212,8 +327,8 @@ static inline arrow::Status RunWithCapturedRIfPossibleVoid( ARROW_RETURN_NOT_OK(make_arrow_call()); return true; }); - ARROW_RETURN_NOT_OK(result); - return arrow::Status::OK(); + + return result.status(); } #endif