Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
6af6955
backport improvements from UDFs PR
paleolimbot Jul 18, 2022
a3d2567
maybe basic signal handling overriding
paleolimbot Jul 18, 2022
dfc551c
maybe use signal handler api from arrow?
paleolimbot Jul 18, 2022
6dfb517
newline at end of file
paleolimbot Jul 18, 2022
77bf53d
maybe do the whole stop token thing properly
paleolimbot Jul 21, 2022
adf9907
remove trash, fix registering signal handler when there is no stop so…
paleolimbot Jul 21, 2022
4a73042
pkgdown
paleolimbot Jul 21, 2022
89c9ae8
don't rely on transitive include for SIGINT
paleolimbot Jul 21, 2022
8157472
nix the need for arrow_cancellable()
paleolimbot Jul 21, 2022
e7aa849
revert new pkgdown page
paleolimbot Jul 21, 2022
2a54856
Update r/src/safe-call-into-r.h
paleolimbot Jul 21, 2022
4eda9c2
more idiomatic storage of MainRThread singleton, better name for Clea…
paleolimbot Jul 21, 2022
a982529
helper for cancellable io context
paleolimbot Jul 21, 2022
f8318b6
properly clean up the stop token on package unload
paleolimbot Jul 21, 2022
38c9e7d
make cancellation via signal handler override opt-in
paleolimbot Jul 21, 2022
57e7817
synchronize when we can and cannot runwithcapturedr
paleolimbot Jul 21, 2022
20494b9
better name, pkgdown section
paleolimbot Jul 21, 2022
576c665
fix merges
paleolimbot Jul 22, 2022
18813ff
fix merges
paleolimbot Sep 16, 2022
0c361f5
check stop token in exec plan reader
paleolimbot Sep 16, 2022
dd3c810
actually enable the stop source
paleolimbot Sep 16, 2022
57b51b5
remove user-facing option to enable handlers since it's on by default…
paleolimbot Sep 16, 2022
4ed6641
only use cancelling in interactive session
paleolimbot Sep 20, 2022
ef6fdd2
propagate R code execution errors via Status instead of via exception
paleolimbot Sep 20, 2022
a3d7a10
don't use raw thread pointers in tests
paleolimbot Sep 20, 2022
0201112
fix include
paleolimbot Sep 20, 2022
f4480f2
always set the interrupt handlers
paleolimbot Sep 22, 2022
e110159
polish some cpp
paleolimbot Sep 23, 2022
c1d16d2
make sure the stop source always gets reset
paleolimbot Sep 23, 2022
64d88a5
clang-format
paleolimbot Sep 23, 2022
a855d75
re-remove on old windows
paleolimbot Sep 23, 2022
8965e38
Update r/src/compute-exec.cpp
paleolimbot Sep 28, 2022
4ccd840
Update r/src/safe-call-into-r.h
paleolimbot Sep 28, 2022
e6aaa74
improvements to signal handling
paleolimbot Sep 28, 2022
9c965f2
improvements to compute-exec
paleolimbot Sep 28, 2022
bb617fd
don't join thread that may not have been assigned
paleolimbot Sep 28, 2022
f68ab9e
fix thread syntax
paleolimbot Sep 28, 2022
6d4f5cd
Update r/src/compute-exec.cpp
paleolimbot Sep 28, 2022
e30e59a
remove unnecessary if()
paleolimbot Sep 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion r/R/arrow-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ supported_dplyr_methods <- list(
configure_tzdb()
}

# register extension types that we use internally
# Set interrupt handlers
SetEnableSignalStopSource(TRUE)

# Register extension types that we use internally
reregister_extension_type(vctrs_extension_type(vctrs::unspecified()))

invisible()
Expand Down Expand Up @@ -142,6 +145,19 @@ configure_tzdb <- function() {
})
}

# Clean up the StopSource that was registered in .onLoad() so that if the
# package is reloaded we don't get an error from C++ informing us that
# a StopSource has already been set up.
.onUnload <- function(...) {
DeinitializeMainRThread()
}

# While .onUnload should be sufficient, devtools::load_all() does not call it
# (but it does call .onDetach()). It is safe to call DeinitializeMainRThread()
# more than once.
.onDetach <- function(...) {
DeinitializeMainRThread()
}

# True when the OS is linux + and the R version is development
# helpful for skipping on Valgrind, and the sanitizer checks (clang + gcc) on cran
Expand Down
8 changes: 8 additions & 0 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions r/src/arrow_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ class UnwindProtectDetail : public StatusDetail {
virtual std::string ToString() const { return "R code execution error"; }
};

static inline Status StatusUnwindProtect(SEXP token) {
return Status::Invalid("R code execution error")
static inline Status StatusUnwindProtect(SEXP token, std::string reason = "") {
return Status::Invalid("R code execution error (", reason, ")")
.WithDetail(std::make_shared<UnwindProtectDetail>(token));
}

Expand Down
31 changes: 21 additions & 10 deletions r/src/compute-exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,14 @@ class ExecPlanReader : public arrow::RecordBatchReader {
ExecPlanReader(const std::shared_ptr<arrow::compute::ExecPlan>& plan,
const std::shared_ptr<arrow::Schema>& schema,
arrow::AsyncGenerator<std::optional<compute::ExecBatch>> sink_gen)
: schema_(schema), plan_(plan), sink_gen_(sink_gen), status_(PLAN_NOT_STARTED) {}
: schema_(schema),
plan_(plan),
sink_gen_(sink_gen),
plan_status_(PLAN_NOT_STARTED),
stop_token_(MainRThread::GetInstance().GetStopToken()) {}

std::string PlanStatus() const {
switch (status_) {
switch (plan_status_) {
case PLAN_NOT_STARTED:
return "PLAN_NOT_STARTED";
case PLAN_RUNNING:
Expand All @@ -91,21 +95,27 @@ class ExecPlanReader : public arrow::RecordBatchReader {
std::shared_ptr<arrow::Schema> schema() const override { return schema_; }

arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch_out) override {
// TODO(ARROW-11841) check a StopToken to potentially cancel this plan

// If this is the first batch getting pulled, tell the exec plan to
// start producing
if (status_ == PLAN_NOT_STARTED) {
if (plan_status_ == PLAN_NOT_STARTED) {
ARROW_RETURN_NOT_OK(StartProducing());
}

// If we've closed the reader, keep sending nullptr
// (consistent with what most RecordBatchReader subclasses do)
if (status_ == PLAN_FINISHED) {
if (plan_status_ == PLAN_FINISHED) {
batch_out->reset();
return arrow::Status::OK();
}

// Check for cancellation and stop the plan if we have a request. When
// the ExecPlan supports passing a StopToken and handling this itself,
// this will be redundant.
if (stop_token_.IsStopRequested()) {
StopProducing();
return stop_token_.Poll();
}

auto out = sink_gen_().result();
if (!out.ok()) {
StopProducing();
Expand Down Expand Up @@ -141,16 +151,17 @@ class ExecPlanReader : public arrow::RecordBatchReader {
std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::compute::ExecPlan> plan_;
arrow::AsyncGenerator<std::optional<compute::ExecBatch>> sink_gen_;
int status_;
ExecPlanReaderStatus plan_status_;
arrow::StopToken stop_token_;

arrow::Status StartProducing() {
ARROW_RETURN_NOT_OK(plan_->StartProducing());
status_ = PLAN_RUNNING;
plan_status_ = PLAN_RUNNING;
return arrow::Status::OK();
}

void StopProducing() {
if (status_ == PLAN_RUNNING) {
if (plan_status_ == PLAN_RUNNING) {
// We're done with the plan, but it may still need some time
// to finish and clean up after itself. To do this, we give a
// callable with its own copy of the shared_ptr<ExecPlan> so
Expand All @@ -164,7 +175,7 @@ class ExecPlanReader : public arrow::RecordBatchReader {
}
}

status_ = PLAN_FINISHED;
plan_status_ = PLAN_FINISHED;
plan_.reset();
sink_gen_ = arrow::MakeEmptyGenerator<std::optional<compute::ExecBatch>>();
}
Expand Down
8 changes: 4 additions & 4 deletions r/src/csv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ std::shared_ptr<arrow::csv::WriteOptions> csv___WriteOptions__initialize(
std::make_shared<arrow::csv::WriteOptions>(arrow::csv::WriteOptions::Defaults());
res->include_header = cpp11::as_cpp<bool>(options["include_header"]);
res->batch_size = cpp11::as_cpp<int>(options["batch_size"]);
res->io_context = arrow::io::IOContext(gc_memory_pool());
res->io_context = MainRThread::GetInstance().CancellableIOContext();
return res;
}

Expand Down Expand Up @@ -154,9 +154,9 @@ std::shared_ptr<arrow::csv::TableReader> csv___TableReader__Make(
const std::shared_ptr<arrow::csv::ReadOptions>& read_options,
const std::shared_ptr<arrow::csv::ParseOptions>& parse_options,
const std::shared_ptr<arrow::csv::ConvertOptions>& convert_options) {
return ValueOrStop(arrow::csv::TableReader::Make(arrow::io::IOContext(gc_memory_pool()),
input, *read_options, *parse_options,
*convert_options));
return ValueOrStop(arrow::csv::TableReader::Make(
MainRThread::GetInstance().CancellableIOContext(), input, *read_options,
*parse_options, *convert_options));
}

// [[arrow::export]]
Expand Down
2 changes: 1 addition & 1 deletion r/src/extension-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ arrow::Result<std::shared_ptr<arrow::DataType>> RExtensionType::Deserialize(
// an event loop from wherever this *might* be called is high and hard to
// predict. As a compromise, just create the instance when it is safe to
// do so.
if (GetMainRThread().IsMainThread()) {
if (MainRThread::GetInstance().IsMainThread()) {
r6_instance();
}

Expand Down
7 changes: 4 additions & 3 deletions r/src/filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include "./arrow_types.h"
#include "./safe-call-into-r.h"

#include <arrow/filesystem/filesystem.h>
#include <arrow/filesystem/localfs.h>
Expand Down Expand Up @@ -239,7 +240,7 @@ std::string fs___FileSystem__type_name(
// [[arrow::export]]
std::shared_ptr<fs::LocalFileSystem> fs___LocalFileSystem__create() {
// Affects OpenInputFile/OpenInputStream
auto io_context = arrow::io::IOContext(gc_memory_pool());
auto io_context = MainRThread::GetInstance().CancellableIOContext();
return std::make_shared<fs::LocalFileSystem>(io_context);
}

Expand Down Expand Up @@ -334,7 +335,7 @@ std::shared_ptr<fs::S3FileSystem> fs___S3FileSystem__create(
s3_opts.allow_bucket_creation = allow_bucket_creation;
s3_opts.allow_bucket_deletion = allow_bucket_deletion;

auto io_context = arrow::io::IOContext(gc_memory_pool());
auto io_context = MainRThread::GetInstance().CancellableIOContext();
return ValueOrStop(fs::S3FileSystem::Make(s3_opts, io_context));
}

Expand Down Expand Up @@ -412,7 +413,7 @@ std::shared_ptr<fs::GcsFileSystem> fs___GcsFileSystem__Make(bool anonymous,
gcs_opts.default_metadata = strings_to_kvm(options["default_metadata"]);
}

auto io_context = arrow::io::IOContext(gc_memory_pool());
auto io_context = MainRThread::GetInstance().CancellableIOContext();
// TODO(ARROW-16884): update when this returns Result
return fs::GcsFileSystem::Make(gcs_opts, io_context);
}
Expand Down
2 changes: 1 addition & 1 deletion r/src/r_to_arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ class AsArrowArrayConverter : public RConverter {
arrays_.push_back(std::move(array));
return Status::OK();
} catch (cpp11::unwind_exception& e) {
return StatusUnwindProtect(e.token);
return StatusUnwindProtect(e.token, "calling as_arrow_array()");
}
}

Expand Down
55 changes: 33 additions & 22 deletions r/src/safe-call-into-r-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,33 @@
#include <functional>
#include <thread>

MainRThread& GetMainRThread() {
MainRThread& MainRThread::GetInstance() {
static MainRThread main_r_thread;
return main_r_thread;
}

// [[arrow::export]]
void InitializeMainRThread() { GetMainRThread().Initialize(); }
void InitializeMainRThread() { MainRThread::GetInstance().Initialize(); }

// [[arrow::export]]
void DeinitializeMainRThread() { MainRThread::GetInstance().Deinitialize(); }

// [[arrow::export]]
bool SetEnableSignalStopSource(bool enabled) {
bool was_enabled = MainRThread::GetInstance().SignalStopSourceEnabled();
if (was_enabled && !enabled) {
MainRThread::GetInstance().DisableSignalStopSource();
} else if (!was_enabled && enabled) {
MainRThread::GetInstance().EnableSignalStopSource();
}

return was_enabled;
}

// [[arrow::export]]
bool CanRunWithCapturedR() {
#if defined(HAS_UNWIND_PROTECT)
return GetMainRThread().Executor() == nullptr;
return MainRThread::GetInstance().Executor() == nullptr;
#else
return false;
#endif
Expand All @@ -42,31 +57,28 @@ bool CanRunWithCapturedR() {
std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string,
std::string opt) {
if (opt == "async_with_executor") {
std::thread* thread_ptr;
std::thread thread;

auto result =
RunWithCapturedR<std::string>([&thread_ptr, r_fun_that_returns_a_string]() {
auto fut = arrow::Future<std::string>::Make();
thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable {
auto result = SafeCallIntoR<std::string>([&] {
return cpp11::as_cpp<std::string>(r_fun_that_returns_a_string());
});
auto result = RunWithCapturedR<std::string>([&thread, r_fun_that_returns_a_string]() {
auto fut = arrow::Future<std::string>::Make();
thread = std::thread([&fut, r_fun_that_returns_a_string]() {
auto result = SafeCallIntoR<std::string>(
[&] { return cpp11::as_cpp<std::string>(r_fun_that_returns_a_string()); });

fut.MarkFinished(result);
});
fut.MarkFinished(result);
});

return fut;
});
return fut;
});

thread_ptr->join();
delete thread_ptr;
if (thread.joinable()) {
thread.join();
}

return arrow::ValueOrStop(result);
} else if (opt == "async_without_executor") {
std::thread* thread_ptr;

auto fut = arrow::Future<std::string>::Make();
thread_ptr = new std::thread([fut, r_fun_that_returns_a_string]() mutable {
std::thread thread([&fut, r_fun_that_returns_a_string]() {
auto result = SafeCallIntoR<std::string>(
[&] { return cpp11::as_cpp<std::string>(r_fun_that_returns_a_string()); });

Expand All @@ -77,8 +89,7 @@ std::string TestSafeCallIntoR(cpp11::function r_fun_that_returns_a_string,
}
});

thread_ptr->join();
delete thread_ptr;
thread.join();

// We should be able to get this far, but fut will contain an error
// because it tried to evaluate R code from another thread
Expand Down
Loading