From a37a311ded5790f2d8333db1b6e8436481456f38 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 25 Mar 2022 09:39:23 -0300 Subject: [PATCH 01/22] resolve merge conflicts --- r/R/arrowExports.R | 12 ++ r/R/io.R | 43 +++++- r/R/parquet.R | 3 +- r/src/arrowExports.cpp | 27 ++++ r/src/io.cpp | 220 ++++++++++++++++++++++++++++- r/tests/testthat/test-csv.R | 11 ++ r/tests/testthat/test-feather.R | 11 ++ r/tests/testthat/test-io.R | 96 +++++++++++++ r/tests/testthat/test-ipc_stream.R | 26 ++++ r/tests/testthat/test-parquet.R | 17 +++ 10 files changed, 458 insertions(+), 8 deletions(-) create mode 100644 r/tests/testthat/test-ipc_stream.R diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 6b969336c93..81c29a3da04 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1384,6 +1384,18 @@ io___BufferOutputStream__Write <- function(stream, bytes) { invisible(.Call(`_arrow_io___BufferOutputStream__Write`, stream, bytes)) } +MakeRConnectionInputStream <- function(con) { + .Call(`_arrow_MakeRConnectionInputStream`, con) +} + +MakeRConnectionOutputStream <- function(con) { + .Call(`_arrow_MakeRConnectionOutputStream`, con) +} + +MakeRConnectionRandomAccessFile <- function(con) { + .Call(`_arrow_MakeRConnectionRandomAccessFile`, con) +} + MakeReencodeInputStream <- function(wrapped, from) { .Call(`_arrow_MakeReencodeInputStream`, wrapped, from) } diff --git a/r/R/io.R b/r/R/io.R index eafa24fc655..0587e520ec9 100644 --- a/r/R/io.R +++ b/r/R/io.R @@ -200,6 +200,7 @@ BufferReader$create <- function(x) { io___BufferReader__initialize(x) } + #' Create a new read/write memory mapped file of a given size #' #' @param path file path @@ -244,32 +245,59 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem } if (is.string(file)) { if (is_url(file)) { - fs_and_path <- FileSystem$from_uri(file) - filesystem <- fs_and_path$fs - file <- fs_and_path$path + file <- tryCatch({ + fs_and_path <- FileSystem$from_uri(file) + filesystem <- fs_and_path$fs + fs_and_path$path + }, error = function(e) { + MakeRConnectionInputStream(url(file, open = "rb")) + }) } + if (is.null(compression)) { # Infer compression from the file path compression <- detect_compression(file) } + if (!is.null(filesystem)) { file <- filesystem$OpenInputFile(file) - } else if (isTRUE(mmap)) { + } else if (is.string(file) && isTRUE(mmap)) { file <- mmap_open(file) - } else { + } else if (is.string(file)) { file <- ReadableFile$create(file) } + if (!identical(compression, "uncompressed")) { file <- CompressedInputStream$create(file, compression) } } else if (inherits(file, c("raw", "Buffer"))) { file <- BufferReader$create(file) + } else if (inherits(file, "connection")) { + if (!isOpen(file)) { + open(file, "rb") + } + + # isSeekable() is not sufficient to check for seekability + # because we rely on seek(whence = "end") to get the size + # of the stream and a gzfile() is "seekable". + file <- tryCatch( + MakeRConnectionRandomAccessFile(file), + error = function(e) MakeRConnectionInputStream(file) + ) } assert_is(file, "InputStream") file } make_output_stream <- function(x, filesystem = NULL) { + if (inherits(x, "connection")) { + if (!isOpen(x)) { + open(x, "wb") + } + + return(MakeRConnectionOutputStream(x)) + } + if (inherits(x, "SubTreeFileSystem")) { filesystem <- x$base_fs x <- x$base_path @@ -287,7 +315,10 @@ make_output_stream <- function(x, filesystem = NULL) { } detect_compression <- function(path) { - assert_that(is.string(path)) + if (!is.string(path)) { + return("uncompressed") + } + switch(tools::file_ext(path), bz2 = "bz2", gz = "gzip", diff --git a/r/R/parquet.R b/r/R/parquet.R index c6c00ed3a48..4d63791a4f5 100644 --- a/r/R/parquet.R +++ b/r/R/parquet.R @@ -38,7 +38,7 @@ read_parquet <- function(file, as_data_frame = TRUE, props = ParquetArrowReaderProperties$create(), ...) { - if (is.string(file)) { + if (!inherits(file, "RandomAccessFile")) { file <- make_readable_file(file) on.exit(file$close()) } @@ -541,6 +541,7 @@ ParquetFileReader$create <- function(file, ...) { file <- make_readable_file(file, mmap) assert_is(props, "ParquetArrowReaderProperties") + assert_is(file, "RandomAccessFile") parquet___arrow___FileReader__OpenFile(file, props) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index fb9f3b94d18..d101647a858 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -3442,6 +3442,30 @@ BEGIN_CPP11 END_CPP11 } // io.cpp +std::shared_ptr MakeRConnectionInputStream(cpp11::sexp con); +extern "C" SEXP _arrow_MakeRConnectionInputStream(SEXP con_sexp){ +BEGIN_CPP11 + arrow::r::Input::type con(con_sexp); + return cpp11::as_sexp(MakeRConnectionInputStream(con)); +END_CPP11 +} +// io.cpp +std::shared_ptr MakeRConnectionOutputStream(cpp11::sexp con); +extern "C" SEXP _arrow_MakeRConnectionOutputStream(SEXP con_sexp){ +BEGIN_CPP11 + arrow::r::Input::type con(con_sexp); + return cpp11::as_sexp(MakeRConnectionOutputStream(con)); +END_CPP11 +} +// io.cpp +std::shared_ptr MakeRConnectionRandomAccessFile(cpp11::sexp con); +extern "C" SEXP _arrow_MakeRConnectionRandomAccessFile(SEXP con_sexp){ +BEGIN_CPP11 + arrow::r::Input::type con(con_sexp); + return cpp11::as_sexp(MakeRConnectionRandomAccessFile(con)); +END_CPP11 +} +// io.cpp std::shared_ptr MakeReencodeInputStream(const std::shared_ptr& wrapped, std::string from); extern "C" SEXP _arrow_MakeReencodeInputStream(SEXP wrapped_sexp, SEXP from_sexp){ BEGIN_CPP11 @@ -5440,6 +5464,9 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_io___BufferOutputStream__Finish", (DL_FUNC) &_arrow_io___BufferOutputStream__Finish, 1}, { "_arrow_io___BufferOutputStream__Tell", (DL_FUNC) &_arrow_io___BufferOutputStream__Tell, 1}, { "_arrow_io___BufferOutputStream__Write", (DL_FUNC) &_arrow_io___BufferOutputStream__Write, 2}, + { "_arrow_MakeRConnectionInputStream", (DL_FUNC) &_arrow_MakeRConnectionInputStream, 1}, + { "_arrow_MakeRConnectionOutputStream", (DL_FUNC) &_arrow_MakeRConnectionOutputStream, 1}, + { "_arrow_MakeRConnectionRandomAccessFile", (DL_FUNC) &_arrow_MakeRConnectionRandomAccessFile, 1}, { "_arrow_MakeReencodeInputStream", (DL_FUNC) &_arrow_MakeReencodeInputStream, 2}, { "_arrow_json___ReadOptions__initialize", (DL_FUNC) &_arrow_json___ReadOptions__initialize, 2}, { "_arrow_json___ParseOptions__initialize1", (DL_FUNC) &_arrow_json___ParseOptions__initialize1, 1}, diff --git a/r/src/io.cpp b/r/src/io.cpp index 7127cbe9ee3..baec087b602 100644 --- a/r/src/io.cpp +++ b/r/src/io.cpp @@ -27,6 +27,8 @@ #include #include +#include + // ------ arrow::io::Readable // [[arrow::export]] @@ -207,7 +209,223 @@ void io___BufferOutputStream__Write( StopIfNotOk(stream->Write(RAW(bytes), bytes.size())); } -// TransformInputStream::TransformFunc wrapper +// ------ RConnectionInputStream / RConnectionOutputStream + +class RConnectionFileInterface : public virtual arrow::io::FileInterface { + public: + explicit RConnectionFileInterface(cpp11::sexp connection_sexp) + : connection_sexp_(connection_sexp), + closed_(false), + thread_id_(std::this_thread::get_id()) { + check_closed(); + } + + arrow::Status Close() { + if (closed_) { + return arrow::Status::OK(); + } + + RETURN_NOT_OK(check_thread_is_r_main()); + cpp11::package("base")["close"](connection_sexp_); + closed_ = true; + return arrow::Status::OK(); + } + + arrow::Result Tell() const { + if (closed()) { + return arrow::Status::IOError("R connection is closed"); + } + + cpp11::sexp result = cpp11::package("base")["seek"](connection_sexp_); + return cpp11::as_cpp(result); + } + + bool closed() const { return closed_; } + + protected: + cpp11::sexp connection_sexp_; + + // Define the logic here because multiple inheritance makes it difficult + // for this base class, the InputStream and the RandomAccessFile + // interfaces to co-exist. + arrow::Result ReadBase(int64_t nbytes, void* out) { + if (closed()) { + return arrow::Status::IOError("R connection is closed"); + } + + try { + RETURN_NOT_OK(check_thread_is_r_main()); + cpp11::function read_bin = cpp11::package("base")["readBin"]; + cpp11::writable::raws ptype((R_xlen_t)0); + cpp11::integers n = cpp11::as_sexp(nbytes); + + cpp11::sexp result = read_bin(connection_sexp_, ptype, n); + + int64_t result_size = cpp11::safe[Rf_xlength](result); + memcpy(out, cpp11::safe[RAW](result), result_size); + return result_size; + } catch (std::exception& e) { + return arrow::Status::IOError(e.what()); + } + } + + arrow::Result> ReadBase(int64_t nbytes) { + arrow::BufferBuilder builder; + RETURN_NOT_OK(builder.Reserve(nbytes)); + + arrow::Result result; + RETURN_NOT_OK(result = ReadBase(nbytes, builder.mutable_data())); + + builder.UnsafeAdvance(result.ValueOrDie()); + return builder.Finish(); + } + + arrow::Status WriteBase(const void* data, int64_t nbytes) { + if (closed()) { + return arrow::Status::IOError("R connection is closed"); + } + + try { + RETURN_NOT_OK(check_thread_is_r_main()); + cpp11::writable::raws data_raw(nbytes); + memcpy(cpp11::safe[RAW](data_raw), data, nbytes); + + cpp11::function write_bin = cpp11::package("base")["writeBin"]; + write_bin(data_raw, connection_sexp_); + return arrow::Status::OK(); + } catch (std::exception& e) { + return arrow::Status::IOError(e.what()); + } + } + + arrow::Status SeekBase(int64_t pos) { + if (closed()) { + return arrow::Status::IOError("R connection is closed"); + } + + try { + RETURN_NOT_OK(check_thread_is_r_main()); + cpp11::package("base")["seek"](connection_sexp_, cpp11::as_sexp(pos)); + return arrow::Status::OK(); + } catch (std::exception& e) { + return arrow::Status::IOError(e.what()); + } + } + + arrow::Status check_thread_is_r_main() { + if (std::this_thread::get_id() != thread_id_) { + return arrow::Status::IOError("Attempt to call into R from a non-R thread"); + } else { + return arrow::Status::OK(); + } + } + + private: + bool closed_; + std::thread::id thread_id_; + + bool check_closed() { + if (closed_) { + return true; + } + + // safer than maybe calling into R and crashing it while checking + if (!check_thread_is_r_main().ok()) { + closed_ = true; + return true; + } + + try { + cpp11::sexp result = cpp11::package("base")["isOpen"](connection_sexp_); + closed_ = !cpp11::as_cpp(result); + } catch (std::exception& e) { + closed_ = true; + } + + return closed_; + } +}; + +class RConnectionInputStream : public virtual arrow::io::InputStream, + public RConnectionFileInterface { + public: + explicit RConnectionInputStream(cpp11::sexp connection_sexp) + : RConnectionFileInterface(connection_sexp) {} + + arrow::Result Read(int64_t nbytes, void* out) { return ReadBase(nbytes, out); } + + arrow::Result> Read(int64_t nbytes) { + return ReadBase(nbytes); + } +}; + +class RConnectionRandomAccessFile : public arrow::io::RandomAccessFile, + public RConnectionFileInterface { + public: + explicit RConnectionRandomAccessFile(cpp11::sexp connection_sexp) + : RConnectionFileInterface(connection_sexp) { + // save the current position to seek back to it + auto current_pos = Tell(); + if (!current_pos.ok()) { + cpp11::stop("Tell() returned an error"); + } + int64_t initial_pos = current_pos.ValueUnsafe(); + + cpp11::package("base")["seek"](connection_sexp_, 0, "end"); + current_pos = Tell(); + if (!current_pos.ok()) { + cpp11::stop("Tell() returned an error"); + } + size_ = current_pos.ValueUnsafe(); + + auto status = Seek(initial_pos); + if (!status.ok()) { + cpp11::stop("Seek() reutrned an error"); + } + } + + arrow::Result GetSize() { return size_; } + + arrow::Status Seek(int64_t pos) { return SeekBase(pos); } + + arrow::Result Read(int64_t nbytes, void* out) { return ReadBase(nbytes, out); } + + arrow::Result> Read(int64_t nbytes) { + return ReadBase(nbytes); + } + + private: + int64_t size_; +}; + +class RConnectionOutputStream : public arrow::io::OutputStream, + public RConnectionFileInterface { + public: + explicit RConnectionOutputStream(cpp11::sexp connection_sexp) + : RConnectionFileInterface(connection_sexp) {} + + arrow::Status Write(const void* data, int64_t nbytes) { + return WriteBase(data, nbytes); + } +}; + +// [[arrow::export]] +std::shared_ptr MakeRConnectionInputStream(cpp11::sexp con) { + return std::make_shared(con); +} + +// [[arrow::export]] +std::shared_ptr MakeRConnectionOutputStream(cpp11::sexp con) { + return std::make_shared(con); +} + +// [[arrow::export]] +std::shared_ptr MakeRConnectionRandomAccessFile( + cpp11::sexp con) { + return std::make_shared(con); +} + +// ------ MakeReencodeInputStream() class RIconvWrapper { public: diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index 08075de8b27..32b7934a98c 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -292,6 +292,17 @@ test_that("more informative error when reading a CSV with headers and schema", { ) }) +test_that("read_csv_arrow() and write_csv_arrow() accept connection objects", { + tf <- tempfile() + on.exit(unlink(tf)) + write_csv_arrow(tibble::tibble(x = 1:5), file(tf)) + expect_identical(read_csv_arrow(tf), tibble::tibble(x = 1:5)) + + # read_csv_arrow() on a connection may error because it can call + # the stream's Read() method from another thread + # expect_identical(read_csv_arrow(file(tf)), read_csv_arrow(tf)) +}) + test_that("CSV reader works on files with non-UTF-8 encoding", { strings <- c("a", "\u00e9", "\U0001f4a9") file_string <- paste0( diff --git a/r/tests/testthat/test-feather.R b/r/tests/testthat/test-feather.R index 80e9d09d8d3..644b64dc3b2 100644 --- a/r/tests/testthat/test-feather.R +++ b/r/tests/testthat/test-feather.R @@ -181,6 +181,17 @@ test_that("read_feather requires RandomAccessFile and errors nicely otherwise (A ) }) +test_that("read_feather() and write_feather() accept connection objects", { + tf <- tempfile() + on.exit(unlink(tf)) + write_feather(tibble::tibble(x = 1:5), file(tf)) + expect_identical(read_feather(tf), tibble::tibble(x = 1:5)) + + # read_feather() on a connection may error because it can call + # the stream's Read() method from another thread + # expect_identical(read_feather(file(feather_file)), read_feather(feather_file)) +}) + test_that("read_feather closes connection to file", { tf <- tempfile() on.exit(unlink(tf)) diff --git a/r/tests/testthat/test-io.R b/r/tests/testthat/test-io.R index 8c1d0b7928f..39cfe6e5e68 100644 --- a/r/tests/testthat/test-io.R +++ b/r/tests/testthat/test-io.R @@ -27,6 +27,102 @@ test_that("RandomAccessFile$ReadMetadata() works for LocalFileSystem", { ) }) +test_that("RConnectionInputStream can read from R connections", { + con <- rawConnection(as.raw(1:100)) + seek(con, 12) + stream <- MakeRConnectionRandomAccessFile(con) + expect_identical(stream$GetSize(), 100L) + expect_identical(stream$tell(), 12L) + + expect_identical(as.raw(stream$ReadAt(50, 50)), as.raw(51:100)) + expect_identical(as.raw(stream$ReadAt(0, 50)), as.raw(1:50)) + stream$close() + expect_error(isOpen(con), "invalid connection") +}) + +test_that("RConnectionRandomAccessFile can read from R connections", { + con <- rawConnection(as.raw(1:100)) + stream <- MakeRConnectionInputStream(con) + + expect_identical(as.raw(stream$Read(50)), as.raw(1:50)) + expect_identical(as.raw(stream$Read(50)), as.raw(51:100)) + stream$close() + expect_error(isOpen(con), "invalid connection") +}) + +test_that("RConnectionOutputStream can write to R connections", { + tf <- tempfile() + on.exit(unlink(tf)) + + con <- file(tf, open = "wb") + stream <- MakeRConnectionOutputStream(con) + stream$write(as.raw(1:50)) + stream$write(as.raw(51:100)) + stream$close() + expect_error(isOpen(con), "invalid connection") + + con <- file(tf, open = "rb") + expect_identical(readBin(con, raw(), 100), as.raw(1:100)) + expect_identical(readBin(con, raw(), 100), raw()) + close(con) +}) + +test_that("make_readable_file() works for non-filesystem URLs", { + skip_if_offline() + + readable_file <- make_readable_file( + "https://github.com/apache/arrow/raw/master/r/inst/v0.7.1.parquet" + ) + expect_r6_class(readable_file, "InputStream") + expect_identical(rawToChar(as.raw(readable_file$Read(3))), "PAR") + readable_file$close() +}) + +test_that("make_readable_file() works for seekable connection objects", { + con <- rawConnection(as.raw(1:100)) + readable_file <- make_readable_file(con) + expect_r6_class(readable_file, "RandomAccessFile") + expect_identical(as.raw(readable_file$Read(100)), as.raw(1:100)) + readable_file$close() +}) + +test_that("make_readable_file() and make_writable_file() open connections", { + tf <- tempfile() + on.exit(unlink(tf)) + + # check a seekable connection + write("abcdefg", tf) + readable_file <- make_readable_file(file(tf)) + expect_r6_class(readable_file, "RandomAccessFile") + expect_identical( + rawToChar(as.raw(readable_file$Read(7))), + "abcdefg" + ) + readable_file$close() + + # check output stream/non-seekable connection + con <- gzfile(tf) + stream <- make_output_stream(con) + stream$write(as.raw(1:100)) + stream$close() + + readable_file <- make_readable_file(gzfile(tf)) + expect_identical( + as.raw(readable_file$Read(100)), + as.raw(1:100) + ) + readable_file$close() +}) + +test_that("make_output_stream() works for connection objects", { + tf <- tempfile() + on.exit(unlink(tf)) + + con <- rawConnection(as.raw(1:100)) + expect_r6_class(make_readable_file(con), "InputStream") + close(con) +}) + test_that("reencoding input stream works for windows-1252", { string <- "province_name\nQu\u00e9bec" bytes_windows1252 <- iconv( diff --git a/r/tests/testthat/test-ipc_stream.R b/r/tests/testthat/test-ipc_stream.R new file mode 100644 index 00000000000..927f14780eb --- /dev/null +++ b/r/tests/testthat/test-ipc_stream.R @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +test_that("read_ipc_stream() and write_ipc_stream() accept connection objects", { + tf <- tempfile() + on.exit(unlink(tf)) + write_ipc_stream(tibble::tibble(x = 1:5), file(tf)) + expect_identical(read_ipc_stream(tf), tibble::tibble(x = 1:5)) + + expect_identical(read_ipc_stream(file(tf)), read_ipc_stream(tf)) +}) diff --git a/r/tests/testthat/test-parquet.R b/r/tests/testthat/test-parquet.R index d4b088928ad..deadb5555d1 100644 --- a/r/tests/testthat/test-parquet.R +++ b/r/tests/testthat/test-parquet.R @@ -199,6 +199,15 @@ test_that("Maps are preserved when writing/reading from Parquet", { expect_equal(df, df_read, ignore_attr = TRUE) }) +test_that("read_parquet() and write_parquet() accept connection objects", { + expect_identical(read_parquet(file(pq_file)), read_parquet(pq_file)) + + tf <- tempfile() + on.exit(unlink(tf)) + write_parquet(tibble::tibble(x = 1:5), file(tf)) + expect_identical(read_parquet(tf), tibble::tibble(x = 1:5)) +}) + test_that("write_parquet() to stream", { df <- tibble::tibble(x = 1:5) tf <- tempfile() @@ -231,6 +240,14 @@ test_that("write_parquet() handles version argument", { }) }) +test_that("ParquetFileReader raises an error for non-RandomAccessFile source", { + skip_if_not_available("gzip") + expect_error( + ParquetFileReader$create(CompressedInputStream$create(pq_file)), + 'file must be a "RandomAccessFile"' + ) +}) + test_that("ParquetFileWriter raises an error for non-OutputStream sink", { sch <- schema(a = float32()) # ARROW-9946 From 230854568c932a0e2db59aa3b5ad58f6b7485d5f Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 8 Apr 2022 09:20:57 -0300 Subject: [PATCH 02/22] start on using safecallintor --- r/src/io.cpp | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/r/src/io.cpp b/r/src/io.cpp index baec087b602..572b6d738a3 100644 --- a/r/src/io.cpp +++ b/r/src/io.cpp @@ -19,6 +19,8 @@ #if defined(ARROW_R_WITH_ARROW) +#include "./safe-call-into-r.h" + #include #include @@ -27,8 +29,6 @@ #include #include -#include - // ------ arrow::io::Readable // [[arrow::export]] @@ -215,8 +215,7 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface { public: explicit RConnectionFileInterface(cpp11::sexp connection_sexp) : connection_sexp_(connection_sexp), - closed_(false), - thread_id_(std::this_thread::get_id()) { + closed_(false) { check_closed(); } @@ -225,6 +224,8 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface { return arrow::Status::OK(); } + auto result = SafeCallIntoR([&]() {}); + RETURN_NOT_OK(check_thread_is_r_main()); cpp11::package("base")["close"](connection_sexp_); closed_ = true; @@ -312,17 +313,8 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface { } } - arrow::Status check_thread_is_r_main() { - if (std::this_thread::get_id() != thread_id_) { - return arrow::Status::IOError("Attempt to call into R from a non-R thread"); - } else { - return arrow::Status::OK(); - } - } - private: bool closed_; - std::thread::id thread_id_; bool check_closed() { if (closed_) { From c47564fb4b3074bdb0f57e3b30374c1a1148df8d Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 8 Apr 2022 14:07:12 -0300 Subject: [PATCH 03/22] actually use safecallintor --- r/src/io.cpp | 56 +++++++++++++++++++++++----------------------------- 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/r/src/io.cpp b/r/src/io.cpp index 572b6d738a3..29fb9a68cd2 100644 --- a/r/src/io.cpp +++ b/r/src/io.cpp @@ -214,8 +214,7 @@ void io___BufferOutputStream__Write( class RConnectionFileInterface : public virtual arrow::io::FileInterface { public: explicit RConnectionFileInterface(cpp11::sexp connection_sexp) - : connection_sexp_(connection_sexp), - closed_(false) { + : connection_sexp_(connection_sexp), closed_(false) { check_closed(); } @@ -224,10 +223,12 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface { return arrow::Status::OK(); } - auto result = SafeCallIntoR([&]() {}); + auto result = SafeCallIntoR([&]() { + cpp11::package("base")["close"](connection_sexp_); + return true; + }); - RETURN_NOT_OK(check_thread_is_r_main()); - cpp11::package("base")["close"](connection_sexp_); + RETURN_NOT_OK(result); closed_ = true; return arrow::Status::OK(); } @@ -254,8 +255,7 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface { return arrow::Status::IOError("R connection is closed"); } - try { - RETURN_NOT_OK(check_thread_is_r_main()); + return SafeCallIntoR([&] { cpp11::function read_bin = cpp11::package("base")["readBin"]; cpp11::writable::raws ptype((R_xlen_t)0); cpp11::integers n = cpp11::as_sexp(nbytes); @@ -265,9 +265,7 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface { int64_t result_size = cpp11::safe[Rf_xlength](result); memcpy(out, cpp11::safe[RAW](result), result_size); return result_size; - } catch (std::exception& e) { - return arrow::Status::IOError(e.what()); - } + }); } arrow::Result> ReadBase(int64_t nbytes) { @@ -286,17 +284,16 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface { return arrow::Status::IOError("R connection is closed"); } - try { - RETURN_NOT_OK(check_thread_is_r_main()); + auto result = SafeCallIntoR([&]() { cpp11::writable::raws data_raw(nbytes); memcpy(cpp11::safe[RAW](data_raw), data, nbytes); cpp11::function write_bin = cpp11::package("base")["writeBin"]; write_bin(data_raw, connection_sexp_); - return arrow::Status::OK(); - } catch (std::exception& e) { - return arrow::Status::IOError(e.what()); - } + return true; + }); + + return result.status(); } arrow::Status SeekBase(int64_t pos) { @@ -304,13 +301,12 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface { return arrow::Status::IOError("R connection is closed"); } - try { - RETURN_NOT_OK(check_thread_is_r_main()); + auto result = SafeCallIntoR([&]() { cpp11::package("base")["seek"](connection_sexp_, cpp11::as_sexp(pos)); - return arrow::Status::OK(); - } catch (std::exception& e) { - return arrow::Status::IOError(e.what()); - } + return true; + }); + + return result.status(); } private: @@ -321,17 +317,15 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface { return true; } - // safer than maybe calling into R and crashing it while checking - if (!check_thread_is_r_main().ok()) { - closed_ = true; - return true; - } - - try { + auto is_open_result = SafeCallIntoR([&]() { cpp11::sexp result = cpp11::package("base")["isOpen"](connection_sexp_); - closed_ = !cpp11::as_cpp(result); - } catch (std::exception& e) { + return cpp11::as_cpp(result); + }); + + if (!is_open_result.ok()) { closed_ = true; + } else { + closed_ = !is_open_result.ValueUnsafe(); } return closed_; From 94d40a93015833251a9113862f0d23d20168f6f0 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 11 Apr 2022 08:53:31 -0300 Subject: [PATCH 04/22] uncomment tests that should pass --- r/tests/testthat/test-csv.R | 2 +- r/tests/testthat/test-feather.R | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index 32b7934a98c..e978a59694a 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -300,7 +300,7 @@ test_that("read_csv_arrow() and write_csv_arrow() accept connection objects", { # read_csv_arrow() on a connection may error because it can call # the stream's Read() method from another thread - # expect_identical(read_csv_arrow(file(tf)), read_csv_arrow(tf)) + expect_identical(read_csv_arrow(file(tf)), read_csv_arrow(tf)) }) test_that("CSV reader works on files with non-UTF-8 encoding", { diff --git a/r/tests/testthat/test-feather.R b/r/tests/testthat/test-feather.R index 644b64dc3b2..3055d9b13ca 100644 --- a/r/tests/testthat/test-feather.R +++ b/r/tests/testthat/test-feather.R @@ -189,7 +189,7 @@ test_that("read_feather() and write_feather() accept connection objects", { # read_feather() on a connection may error because it can call # the stream's Read() method from another thread - # expect_identical(read_feather(file(feather_file)), read_feather(feather_file)) + expect_identical(read_feather(file(feather_file)), read_feather(feather_file)) }) test_that("read_feather closes connection to file", { From 4e4dd72092de5a9f7d5f58401da280dd9cdeb52b Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Mon, 11 Apr 2022 14:25:29 -0300 Subject: [PATCH 05/22] use RunWithCapturedR for some specific functions --- r/src/arrowExports.cpp | 4 +-- r/src/csv.cpp | 16 +++++++++- r/src/feather.cpp | 72 ++++++++++++++++++++++++++++++------------ 3 files changed, 69 insertions(+), 23 deletions(-) diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index d101647a858..837abe4af7b 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -2814,11 +2814,11 @@ BEGIN_CPP11 END_CPP11 } // feather.cpp -std::shared_ptr ipc___feather___Reader__Read(const std::shared_ptr& reader, SEXP columns); +std::shared_ptr ipc___feather___Reader__Read(const std::shared_ptr& reader, cpp11::sexp columns); extern "C" SEXP _arrow_ipc___feather___Reader__Read(SEXP reader_sexp, SEXP columns_sexp){ BEGIN_CPP11 arrow::r::Input&>::type reader(reader_sexp); - arrow::r::Input::type columns(columns_sexp); + arrow::r::Input::type columns(columns_sexp); return cpp11::as_sexp(ipc___feather___Reader__Read(reader, columns)); END_CPP11 } diff --git a/r/src/csv.cpp b/r/src/csv.cpp index bb901e798d7..b9bd7b394b3 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -19,6 +19,8 @@ #if defined(ARROW_R_WITH_ARROW) +#include "./safe-call-into-r.h" + #include #include #include @@ -162,7 +164,19 @@ std::shared_ptr csv___TableReader__Make( // [[arrow::export]] std::shared_ptr csv___TableReader__Read( const std::shared_ptr& table_reader) { - return ValueOrStop(table_reader->Read()); + std::thread* thread_ptr; + auto result = RunWithCapturedR>([&]() { + auto fut = arrow::Future>::Make(); + + thread_ptr = new std::thread([&] { fut.MarkFinished(table_reader->Read()); }); + + return fut; + }); + + thread_ptr->join(); + delete thread_ptr; + + return ValueOrStop(result); } // [[arrow::export]] diff --git a/r/src/feather.cpp b/r/src/feather.cpp index 1df992baaa7..df69bddd994 100644 --- a/r/src/feather.cpp +++ b/r/src/feather.cpp @@ -18,6 +18,9 @@ #include "./arrow_types.h" #if defined(ARROW_R_WITH_ARROW) + +#include "./safe-call-into-r.h" + #include #include @@ -48,34 +51,63 @@ int ipc___feather___Reader__version( // [[arrow::export]] std::shared_ptr ipc___feather___Reader__Read( - const std::shared_ptr& reader, SEXP columns) { - std::shared_ptr table; - - switch (TYPEOF(columns)) { - case STRSXP: { - R_xlen_t n = XLENGTH(columns); - std::vector names(n); - for (R_xlen_t i = 0; i < n; i++) { - names[i] = CHAR(STRING_ELT(columns, i)); - } - StopIfNotOk(reader->Read(names, &table)); - break; + const std::shared_ptr& reader, cpp11::sexp columns) { + bool use_names = columns != R_NilValue; + std::vector names; + if (use_names) { + cpp11::strings columns_chr(columns); + names.reserve(columns_chr.size()); + for (const auto& name : columns_chr) { + names.push_back(name); } - case NILSXP: - StopIfNotOk(reader->Read(&table)); - break; - default: - cpp11::stop("incompatible column specification"); - break; } - return table; + std::thread* thread_ptr; + auto result = RunWithCapturedR>([&]() { + auto fut = arrow::Future>::Make(); + + thread_ptr = new std::thread([&] { + std::shared_ptr table; + arrow::Status read_result; + if (use_names) { + read_result = reader->Read(names, &table); + } else { + read_result = reader->Read(&table); + } + + if (read_result.ok()) { + fut.MarkFinished(table); + } else { + fut.MarkFinished(read_result); + } + }); + + return fut; + }); + + thread_ptr->join(); + delete thread_ptr; + + return ValueOrStop(result); } // [[arrow::export]] std::shared_ptr ipc___feather___Reader__Open( const std::shared_ptr& stream) { - return ValueOrStop(arrow::ipc::feather::Reader::Open(stream)); + std::thread* thread_ptr; + auto result = RunWithCapturedR>([&]() { + auto fut = arrow::Future>::Make(); + + thread_ptr = new std::thread( + [&] { fut.MarkFinished(arrow::ipc::feather::Reader::Open(stream)); }); + + return fut; + }); + + thread_ptr->join(); + delete thread_ptr; + + return ValueOrStop(result); } // [[arrow::export]] From 37f9376e31d57beca4249923e4da2e5f501e1dac Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 12 Apr 2022 08:11:09 -0300 Subject: [PATCH 06/22] clarify comment --- r/R/io.R | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/r/R/io.R b/r/R/io.R index 0587e520ec9..db6a8cd5d33 100644 --- a/r/R/io.R +++ b/r/R/io.R @@ -277,9 +277,9 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem open(file, "rb") } - # isSeekable() is not sufficient to check for seekability - # because we rely on seek(whence = "end") to get the size - # of the stream and a gzfile() is "seekable". + # Try to create a RandomAccessFile first because some readers need this + # (e.g., feather, parquet) but fall back on an InputStream for the readers + # that don't. file <- tryCatch( MakeRConnectionRandomAccessFile(file), error = function(e) MakeRConnectionInputStream(file) From 395ee506f4073354f8692386f845a0cdce6468be Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 12 Apr 2022 08:27:01 -0300 Subject: [PATCH 07/22] use the io thread pool for RunWithCapturedR in the feather reader --- r/src/feather.cpp | 33 ++++++++++----------------------- 1 file changed, 10 insertions(+), 23 deletions(-) diff --git a/r/src/feather.cpp b/r/src/feather.cpp index df69bddd994..2f4ab0b0382 100644 --- a/r/src/feather.cpp +++ b/r/src/feather.cpp @@ -62,11 +62,10 @@ std::shared_ptr ipc___feather___Reader__Read( } } - std::thread* thread_ptr; - auto result = RunWithCapturedR>([&]() { - auto fut = arrow::Future>::Make(); + const auto& io_context = arrow::io::default_io_context(); - thread_ptr = new std::thread([&] { + auto result = RunWithCapturedR>([&]() { + return DeferNotOk(io_context.executor()->Submit([&]() { std::shared_ptr table; arrow::Status read_result; if (use_names) { @@ -76,37 +75,25 @@ std::shared_ptr ipc___feather___Reader__Read( } if (read_result.ok()) { - fut.MarkFinished(table); + return arrow::Result>(table); } else { - fut.MarkFinished(read_result); + return arrow::Result>(read_result); } - }); - - return fut; + })); }); - thread_ptr->join(); - delete thread_ptr; - return ValueOrStop(result); } // [[arrow::export]] std::shared_ptr ipc___feather___Reader__Open( const std::shared_ptr& stream) { - std::thread* thread_ptr; + const auto& io_context = arrow::io::default_io_context(); auto result = RunWithCapturedR>([&]() { - auto fut = arrow::Future>::Make(); - - thread_ptr = new std::thread( - [&] { fut.MarkFinished(arrow::ipc::feather::Reader::Open(stream)); }); - - return fut; + return DeferNotOk(io_context.executor()->Submit([&]() { + return arrow::ipc::feather::Reader::Open(stream); + })); }); - - thread_ptr->join(); - delete thread_ptr; - return ValueOrStop(result); } From f26a44929d7f79174f834a248f91df3e3fc3cf92 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 12 Apr 2022 08:28:52 -0300 Subject: [PATCH 08/22] use TableReader->ReadAsync() for csv reader --- r/src/csv.cpp | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/r/src/csv.cpp b/r/src/csv.cpp index b9bd7b394b3..761baa1da7a 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -164,18 +164,10 @@ std::shared_ptr csv___TableReader__Make( // [[arrow::export]] std::shared_ptr csv___TableReader__Read( const std::shared_ptr& table_reader) { - std::thread* thread_ptr; auto result = RunWithCapturedR>([&]() { - auto fut = arrow::Future>::Make(); - - thread_ptr = new std::thread([&] { fut.MarkFinished(table_reader->Read()); }); - - return fut; + return table_reader->ReadAsync(); }); - thread_ptr->join(); - delete thread_ptr; - return ValueOrStop(result); } From 4c1dee171a096582bc91361a6d2e3ef6ddcd3899 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 12 Apr 2022 08:43:34 -0300 Subject: [PATCH 09/22] more realistic test for connection objects with readers --- r/tests/testthat/test-csv.R | 14 ++++++++++---- r/tests/testthat/test-feather.R | 16 +++++++++++----- r/tests/testthat/test-parquet.R | 16 ++++++++++++---- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index e978a59694a..5e17ef0b1c2 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -295,11 +295,17 @@ test_that("more informative error when reading a CSV with headers and schema", { test_that("read_csv_arrow() and write_csv_arrow() accept connection objects", { tf <- tempfile() on.exit(unlink(tf)) - write_csv_arrow(tibble::tibble(x = 1:5), file(tf)) - expect_identical(read_csv_arrow(tf), tibble::tibble(x = 1:5)) - # read_csv_arrow() on a connection may error because it can call - # the stream's Read() method from another thread + # make this big enough that we might expose concurrency problems, + # but not so big that it slows down the tests + test_tbl <- tibble::tibble( + x = 1:1e4, + y = vapply(x, rlang::hash, character(1), USE.NAMES = FALSE), + z = vapply(y, rlang::hash, character(1), USE.NAMES = FALSE) + ) + + write_csv_arrow(test_tbl, file(tf)) + expect_identical(read_csv_arrow(tf), test_tbl) expect_identical(read_csv_arrow(file(tf)), read_csv_arrow(tf)) }) diff --git a/r/tests/testthat/test-feather.R b/r/tests/testthat/test-feather.R index 3055d9b13ca..035acd2b078 100644 --- a/r/tests/testthat/test-feather.R +++ b/r/tests/testthat/test-feather.R @@ -184,12 +184,18 @@ test_that("read_feather requires RandomAccessFile and errors nicely otherwise (A test_that("read_feather() and write_feather() accept connection objects", { tf <- tempfile() on.exit(unlink(tf)) - write_feather(tibble::tibble(x = 1:5), file(tf)) - expect_identical(read_feather(tf), tibble::tibble(x = 1:5)) - # read_feather() on a connection may error because it can call - # the stream's Read() method from another thread - expect_identical(read_feather(file(feather_file)), read_feather(feather_file)) + # make this big enough that we might expose concurrency problems, + # but not so big that it slows down the tests + test_tbl <- tibble::tibble( + x = 1:1e4, + y = vapply(x, rlang::hash, character(1), USE.NAMES = FALSE), + z = vapply(y, rlang::hash, character(1), USE.NAMES = FALSE) + ) + + write_feather(test_tbl, file(tf)) + expect_identical(read_feather(tf), test_tbl) + expect_identical(read_feather(file(tf)), read_feather(tf)) }) test_that("read_feather closes connection to file", { diff --git a/r/tests/testthat/test-parquet.R b/r/tests/testthat/test-parquet.R index deadb5555d1..3f43c3d8941 100644 --- a/r/tests/testthat/test-parquet.R +++ b/r/tests/testthat/test-parquet.R @@ -200,12 +200,20 @@ test_that("Maps are preserved when writing/reading from Parquet", { }) test_that("read_parquet() and write_parquet() accept connection objects", { - expect_identical(read_parquet(file(pq_file)), read_parquet(pq_file)) - tf <- tempfile() on.exit(unlink(tf)) - write_parquet(tibble::tibble(x = 1:5), file(tf)) - expect_identical(read_parquet(tf), tibble::tibble(x = 1:5)) + + # make this big enough that we might expose concurrency problems, + # but not so big that it slows down the tests + test_tbl <- tibble::tibble( + x = 1:1e4, + y = vapply(x, rlang::hash, character(1), USE.NAMES = FALSE), + z = vapply(y, rlang::hash, character(1), USE.NAMES = FALSE) + ) + + write_parquet(test_tbl, file(tf)) + expect_identical(read_parquet(tf), test_tbl) + expect_identical(read_parquet(file(tf)), read_parquet(tf)) }) test_that("write_parquet() to stream", { From 7e1593f701611b30f6998d46ea0f1507360d04b2 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 12 Apr 2022 08:48:00 -0300 Subject: [PATCH 10/22] clang-format --- r/src/csv.cpp | 5 ++--- r/src/feather.cpp | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/r/src/csv.cpp b/r/src/csv.cpp index 761baa1da7a..bb2e59e3551 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -164,9 +164,8 @@ std::shared_ptr csv___TableReader__Make( // [[arrow::export]] std::shared_ptr csv___TableReader__Read( const std::shared_ptr& table_reader) { - auto result = RunWithCapturedR>([&]() { - return table_reader->ReadAsync(); - }); + auto result = RunWithCapturedR>( + [&]() { return table_reader->ReadAsync(); }); return ValueOrStop(result); } diff --git a/r/src/feather.cpp b/r/src/feather.cpp index 2f4ab0b0382..1c986c282d0 100644 --- a/r/src/feather.cpp +++ b/r/src/feather.cpp @@ -90,9 +90,8 @@ std::shared_ptr ipc___feather___Reader__Open( const std::shared_ptr& stream) { const auto& io_context = arrow::io::default_io_context(); auto result = RunWithCapturedR>([&]() { - return DeferNotOk(io_context.executor()->Submit([&]() { - return arrow::ipc::feather::Reader::Open(stream); - })); + return DeferNotOk(io_context.executor()->Submit( + [&]() { return arrow::ipc::feather::Reader::Open(stream); })); }); return ValueOrStop(result); } From 7e745cc37acb6b4fd64c3ba1af1c441fb3eee3b2 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 12 Apr 2022 10:10:31 -0300 Subject: [PATCH 11/22] Update r/src/io.cpp Co-authored-by: Weston Pace --- r/src/io.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/r/src/io.cpp b/r/src/io.cpp index 29fb9a68cd2..c7f7af575b7 100644 --- a/r/src/io.cpp +++ b/r/src/io.cpp @@ -272,10 +272,8 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface { arrow::BufferBuilder builder; RETURN_NOT_OK(builder.Reserve(nbytes)); - arrow::Result result; - RETURN_NOT_OK(result = ReadBase(nbytes, builder.mutable_data())); - - builder.UnsafeAdvance(result.ValueOrDie()); + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadBase(nbytes, builder.mutable_data())); + builder.UnsafeAdvance(bytes_read); return builder.Finish(); } From db9cecdfbc31543fd497af9d94acd9778d136ad4 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 12 Apr 2022 10:24:11 -0300 Subject: [PATCH 12/22] implement and use SafeCallIntoRVoid() --- r/src/io.cpp | 19 +++++-------------- r/src/safe-call-into-r.h | 8 ++++++++ 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/r/src/io.cpp b/r/src/io.cpp index c7f7af575b7..f637948bcad 100644 --- a/r/src/io.cpp +++ b/r/src/io.cpp @@ -223,14 +223,11 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface { return arrow::Status::OK(); } - auto result = SafeCallIntoR([&]() { - cpp11::package("base")["close"](connection_sexp_); - return true; - }); + auto result = + SafeCallIntoRVoid([&]() { cpp11::package("base")["close"](connection_sexp_); }); - RETURN_NOT_OK(result); closed_ = true; - return arrow::Status::OK(); + return result; } arrow::Result Tell() const { @@ -282,16 +279,13 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface { return arrow::Status::IOError("R connection is closed"); } - auto result = SafeCallIntoR([&]() { + return SafeCallIntoRVoid([&]() { cpp11::writable::raws data_raw(nbytes); memcpy(cpp11::safe[RAW](data_raw), data, nbytes); cpp11::function write_bin = cpp11::package("base")["writeBin"]; write_bin(data_raw, connection_sexp_); - return true; }); - - return result.status(); } arrow::Status SeekBase(int64_t pos) { @@ -299,12 +293,9 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface { return arrow::Status::IOError("R connection is closed"); } - auto result = SafeCallIntoR([&]() { + return SafeCallIntoRVoid([&]() { cpp11::package("base")["seek"](connection_sexp_, cpp11::as_sexp(pos)); - return true; }); - - return result.status(); } private: diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 1a27507b788..9ac7230258f 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -122,6 +122,14 @@ arrow::Result SafeCallIntoR(std::function fun) { return future.result(); } +static inline arrow::Status SafeCallIntoRVoid(std::function fun) { + arrow::Future future = SafeCallIntoRAsync([&fun]() { + fun(); + return true; + }); + return future.status(); +} + template arrow::Result RunWithCapturedR(std::function()> make_arrow_call) { if (GetMainRThread().Executor() != nullptr) { From 1751eaa53ce90e67c14759a4ef46b48210c3aca2 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 12 Apr 2022 11:22:46 -0300 Subject: [PATCH 13/22] improve SafeCallIntoR usage in io.cpp --- r/src/io.cpp | 15 ++++++++------- r/src/safe-call-into-r.h | 4 ++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/r/src/io.cpp b/r/src/io.cpp index f637948bcad..36d8dfa5375 100644 --- a/r/src/io.cpp +++ b/r/src/io.cpp @@ -223,11 +223,10 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface { return arrow::Status::OK(); } - auto result = - SafeCallIntoRVoid([&]() { cpp11::package("base")["close"](connection_sexp_); }); - closed_ = true; - return result; + + return SafeCallIntoRVoid( + [&]() { cpp11::package("base")["close"](connection_sexp_); }); } arrow::Result Tell() const { @@ -235,8 +234,10 @@ class RConnectionFileInterface : public virtual arrow::io::FileInterface { return arrow::Status::IOError("R connection is closed"); } - cpp11::sexp result = cpp11::package("base")["seek"](connection_sexp_); - return cpp11::as_cpp(result); + return SafeCallIntoR([&]() { + cpp11::sexp result = cpp11::package("base")["seek"](connection_sexp_); + return cpp11::as_cpp(result); + }); } bool closed() const { return closed_; } @@ -355,7 +356,7 @@ class RConnectionRandomAccessFile : public arrow::io::RandomAccessFile, auto status = Seek(initial_pos); if (!status.ok()) { - cpp11::stop("Seek() reutrned an error"); + cpp11::stop("Seek() returned an error"); } } diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 9ac7230258f..8bff153da62 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -86,7 +86,7 @@ MainRThread& GetMainRThread(); // a SEXP (use cpp11::as_cpp to convert it to a C++ type inside // `fun`). template -arrow::Future SafeCallIntoRAsync(std::function fun) { +arrow::Future SafeCallIntoRAsync(std::function(void)> fun) { MainRThread& main_r_thread = GetMainRThread(); if (main_r_thread.IsMainThread()) { // If we're on the main thread, run the task immediately and let @@ -104,7 +104,7 @@ arrow::Future SafeCallIntoRAsync(std::function fun) { } try { - return arrow::Result(fun()); + return fun(); } catch (cpp11::unwind_exception& e) { GetMainRThread().SetError(e.token); return arrow::Result(arrow::Status::UnknownError("R code execution error")); From d4e24b10f97917e2c18d43db2b103ba7221ffe7c Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 12 Apr 2022 11:45:06 -0300 Subject: [PATCH 14/22] also use bigger table for ipc stream test --- r/tests/testthat/test-ipc_stream.R | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/r/tests/testthat/test-ipc_stream.R b/r/tests/testthat/test-ipc_stream.R index 927f14780eb..905a22f679c 100644 --- a/r/tests/testthat/test-ipc_stream.R +++ b/r/tests/testthat/test-ipc_stream.R @@ -19,8 +19,14 @@ test_that("read_ipc_stream() and write_ipc_stream() accept connection objects", { tf <- tempfile() on.exit(unlink(tf)) - write_ipc_stream(tibble::tibble(x = 1:5), file(tf)) - expect_identical(read_ipc_stream(tf), tibble::tibble(x = 1:5)) + test_tbl <- tibble::tibble( + x = 1:1e4, + y = vapply(x, rlang::hash, character(1), USE.NAMES = FALSE), + z = vapply(y, rlang::hash, character(1), USE.NAMES = FALSE) + ) + + write_ipc_stream(test_tbl, file(tf)) + expect_identical(read_ipc_stream(tf), test_tbl) expect_identical(read_ipc_stream(file(tf)), read_ipc_stream(tf)) }) From 29828d11fb0b06fcebe9eecc7264cc3563f4709d Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Tue, 12 Apr 2022 15:07:25 -0300 Subject: [PATCH 15/22] complete thought on comment --- r/R/io.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/R/io.R b/r/R/io.R index db6a8cd5d33..5f332ad8dda 100644 --- a/r/R/io.R +++ b/r/R/io.R @@ -279,7 +279,7 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem # Try to create a RandomAccessFile first because some readers need this # (e.g., feather, parquet) but fall back on an InputStream for the readers - # that don't. + # that don't (e.g., IPC, CSV) file <- tryCatch( MakeRConnectionRandomAccessFile(file), error = function(e) MakeRConnectionInputStream(file) From 4f136b25cb0531f64adf9a8fe5e47a5bbb3a9b39 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 14 Apr 2022 09:05:03 -0300 Subject: [PATCH 16/22] try the other async trick for the csv reader to see if it works on windows --- r/src/csv.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/r/src/csv.cpp b/r/src/csv.cpp index bb2e59e3551..78f5eb6d7b6 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -164,9 +164,11 @@ std::shared_ptr csv___TableReader__Make( // [[arrow::export]] std::shared_ptr csv___TableReader__Read( const std::shared_ptr& table_reader) { - auto result = RunWithCapturedR>( - [&]() { return table_reader->ReadAsync(); }); - + const auto& io_context = arrow::io::default_io_context(); + auto result = RunWithCapturedR>([&]() { + return DeferNotOk( + io_context.executor()->Submit([&]() { return table_reader->Read(); })); + }); return ValueOrStop(result); } From 220b79c1d199d7413690cf4128e5fb7c0b925b7a Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 14 Apr 2022 10:33:04 -0300 Subject: [PATCH 17/22] don't use RunWithCapturedR for feather reading on old windows --- r/R/arrowExports.R | 8 ++--- r/R/feather.R | 4 +-- r/src/arrowExports.cpp | 18 +++++----- r/src/feather.cpp | 61 +++++++++++++++++++-------------- r/tests/testthat/test-feather.R | 3 ++ 5 files changed, 54 insertions(+), 40 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 81c29a3da04..6bf9a75d0fe 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1112,12 +1112,12 @@ ipc___feather___Reader__version <- function(reader) { .Call(`_arrow_ipc___feather___Reader__version`, reader) } -ipc___feather___Reader__Read <- function(reader, columns) { - .Call(`_arrow_ipc___feather___Reader__Read`, reader, columns) +ipc___feather___Reader__Read <- function(reader, columns, on_old_windows) { + .Call(`_arrow_ipc___feather___Reader__Read`, reader, columns, on_old_windows) } -ipc___feather___Reader__Open <- function(stream) { - .Call(`_arrow_ipc___feather___Reader__Open`, stream) +ipc___feather___Reader__Open <- function(stream, on_old_windows) { + .Call(`_arrow_ipc___feather___Reader__Open`, stream, on_old_windows) } ipc___feather___Reader__schema <- function(reader) { diff --git a/r/R/feather.R b/r/R/feather.R index 70a270bbe02..6065c285e8d 100644 --- a/r/R/feather.R +++ b/r/R/feather.R @@ -194,7 +194,7 @@ FeatherReader <- R6Class("FeatherReader", inherit = ArrowObject, public = list( Read = function(columns) { - ipc___feather___Reader__Read(self, columns) + ipc___feather___Reader__Read(self, columns, on_old_windows()) }, print = function(...) { cat("FeatherReader:\n") @@ -215,5 +215,5 @@ names.FeatherReader <- function(x) x$column_names FeatherReader$create <- function(file) { assert_is(file, "RandomAccessFile") - ipc___feather___Reader__Open(file) + ipc___feather___Reader__Open(file, on_old_windows()) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 837abe4af7b..760b71a5be3 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -2814,20 +2814,22 @@ BEGIN_CPP11 END_CPP11 } // feather.cpp -std::shared_ptr ipc___feather___Reader__Read(const std::shared_ptr& reader, cpp11::sexp columns); -extern "C" SEXP _arrow_ipc___feather___Reader__Read(SEXP reader_sexp, SEXP columns_sexp){ +std::shared_ptr ipc___feather___Reader__Read(const std::shared_ptr& reader, cpp11::sexp columns, bool on_old_windows); +extern "C" SEXP _arrow_ipc___feather___Reader__Read(SEXP reader_sexp, SEXP columns_sexp, SEXP on_old_windows_sexp){ BEGIN_CPP11 arrow::r::Input&>::type reader(reader_sexp); arrow::r::Input::type columns(columns_sexp); - return cpp11::as_sexp(ipc___feather___Reader__Read(reader, columns)); + arrow::r::Input::type on_old_windows(on_old_windows_sexp); + return cpp11::as_sexp(ipc___feather___Reader__Read(reader, columns, on_old_windows)); END_CPP11 } // feather.cpp -std::shared_ptr ipc___feather___Reader__Open(const std::shared_ptr& stream); -extern "C" SEXP _arrow_ipc___feather___Reader__Open(SEXP stream_sexp){ +std::shared_ptr ipc___feather___Reader__Open(const std::shared_ptr& stream, bool on_old_windows); +extern "C" SEXP _arrow_ipc___feather___Reader__Open(SEXP stream_sexp, SEXP on_old_windows_sexp){ BEGIN_CPP11 arrow::r::Input&>::type stream(stream_sexp); - return cpp11::as_sexp(ipc___feather___Reader__Open(stream)); + arrow::r::Input::type on_old_windows(on_old_windows_sexp); + return cpp11::as_sexp(ipc___feather___Reader__Open(stream, on_old_windows)); END_CPP11 } // feather.cpp @@ -5396,8 +5398,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_arrow__UnregisterRExtensionType", (DL_FUNC) &_arrow_arrow__UnregisterRExtensionType, 1}, { "_arrow_ipc___WriteFeather__Table", (DL_FUNC) &_arrow_ipc___WriteFeather__Table, 6}, { "_arrow_ipc___feather___Reader__version", (DL_FUNC) &_arrow_ipc___feather___Reader__version, 1}, - { "_arrow_ipc___feather___Reader__Read", (DL_FUNC) &_arrow_ipc___feather___Reader__Read, 2}, - { "_arrow_ipc___feather___Reader__Open", (DL_FUNC) &_arrow_ipc___feather___Reader__Open, 1}, + { "_arrow_ipc___feather___Reader__Read", (DL_FUNC) &_arrow_ipc___feather___Reader__Read, 3}, + { "_arrow_ipc___feather___Reader__Open", (DL_FUNC) &_arrow_ipc___feather___Reader__Open, 2}, { "_arrow_ipc___feather___Reader__schema", (DL_FUNC) &_arrow_ipc___feather___Reader__schema, 1}, { "_arrow_Field__initialize", (DL_FUNC) &_arrow_Field__initialize, 3}, { "_arrow_Field__ToString", (DL_FUNC) &_arrow_Field__ToString, 1}, diff --git a/r/src/feather.cpp b/r/src/feather.cpp index 1c986c282d0..1bb42cf7faf 100644 --- a/r/src/feather.cpp +++ b/r/src/feather.cpp @@ -51,7 +51,8 @@ int ipc___feather___Reader__version( // [[arrow::export]] std::shared_ptr ipc___feather___Reader__Read( - const std::shared_ptr& reader, cpp11::sexp columns) { + const std::shared_ptr& reader, cpp11::sexp columns, + bool on_old_windows) { bool use_names = columns != R_NilValue; std::vector names; if (use_names) { @@ -62,38 +63,46 @@ std::shared_ptr ipc___feather___Reader__Read( } } - const auto& io_context = arrow::io::default_io_context(); - - auto result = RunWithCapturedR>([&]() { - return DeferNotOk(io_context.executor()->Submit([&]() { - std::shared_ptr table; - arrow::Status read_result; - if (use_names) { - read_result = reader->Read(names, &table); - } else { - read_result = reader->Read(&table); - } + auto read_table = [&]() { + std::shared_ptr table; + arrow::Status read_result; + if (use_names) { + read_result = reader->Read(names, &table); + } else { + read_result = reader->Read(&table); + } - if (read_result.ok()) { - return arrow::Result>(table); - } else { - return arrow::Result>(read_result); - } - })); - }); + if (read_result.ok()) { + return arrow::Result>(table); + } else { + return arrow::Result>(read_result); + } + }; + if (!on_old_windows) { + const auto& io_context = arrow::io::default_io_context(); + auto result = RunWithCapturedR>([&]() { + return DeferNotOk(io_context.executor()->Submit(read_table)); + }); return ValueOrStop(result); + } else { + return ValueOrStop(read_table()); + } } // [[arrow::export]] std::shared_ptr ipc___feather___Reader__Open( - const std::shared_ptr& stream) { - const auto& io_context = arrow::io::default_io_context(); - auto result = RunWithCapturedR>([&]() { - return DeferNotOk(io_context.executor()->Submit( - [&]() { return arrow::ipc::feather::Reader::Open(stream); })); - }); - return ValueOrStop(result); + const std::shared_ptr& stream, bool on_old_windows) { + if (!on_old_windows) { + const auto& io_context = arrow::io::default_io_context(); + auto result = RunWithCapturedR>([&]() { + return DeferNotOk(io_context.executor()->Submit( + [&]() { return arrow::ipc::feather::Reader::Open(stream); })); + }); + return ValueOrStop(result); + } else { + return ValueOrStop(arrow::ipc::feather::Reader::Open(stream)); + } } // [[arrow::export]] diff --git a/r/tests/testthat/test-feather.R b/r/tests/testthat/test-feather.R index 035acd2b078..c6b536a3ffd 100644 --- a/r/tests/testthat/test-feather.R +++ b/r/tests/testthat/test-feather.R @@ -182,6 +182,9 @@ test_that("read_feather requires RandomAccessFile and errors nicely otherwise (A }) test_that("read_feather() and write_feather() accept connection objects", { + # connection object don't work on Windows i386 before R 4.0 + skip_if(on_old_windows()) + tf <- tempfile() on.exit(unlink(tf)) From 4acdbf4d560dac054a8e85f96cf43e4c4d617884 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 14 Apr 2022 10:33:32 -0300 Subject: [PATCH 18/22] clang-format --- r/src/feather.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/r/src/feather.cpp b/r/src/feather.cpp index 1bb42cf7faf..f48a1b96bca 100644 --- a/r/src/feather.cpp +++ b/r/src/feather.cpp @@ -81,10 +81,9 @@ std::shared_ptr ipc___feather___Reader__Read( if (!on_old_windows) { const auto& io_context = arrow::io::default_io_context(); - auto result = RunWithCapturedR>([&]() { - return DeferNotOk(io_context.executor()->Submit(read_table)); - }); - return ValueOrStop(result); + auto result = RunWithCapturedR>( + [&]() { return DeferNotOk(io_context.executor()->Submit(read_table)); }); + return ValueOrStop(result); } else { return ValueOrStop(read_table()); } From 76ae1ab0aca93aeb5f2f05736752d4ec4ce9a16c Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Thu, 21 Apr 2022 13:50:01 -0300 Subject: [PATCH 19/22] try to avoid segfault on R 3.4 --- r/src/safe-call-into-r.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 8bff153da62..15799c8248d 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -132,6 +132,12 @@ static inline arrow::Status SafeCallIntoRVoid(std::function fun) { template arrow::Result RunWithCapturedR(std::function()> make_arrow_call) { + // Unwind protection was added in R 3.5 and some calls here crash R + // in older versions (ARROW-16201). Rather than crash, just return an error result + // when this is attempted. +#if !defined(HAS_UNWIND_PROTECT) + return arrow::Status::NotImplemented("RunWithCapturedR() without UnwindProtect"); +#else if (GetMainRThread().Executor() != nullptr) { return arrow::Status::AlreadyExists("Attempt to use more than one R Executor()"); } @@ -148,6 +154,7 @@ arrow::Result RunWithCapturedR(std::function()> make_arrow_c GetMainRThread().ClearError(); return result; +#endif } #endif From eac17c587b5bc755a1ffe12465602832de26decf Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 22 Apr 2022 09:22:36 -0300 Subject: [PATCH 20/22] skip one more SafeCallIntoR test on R 3.4 --- r/tests/testthat/test-safe-call-into-r.R | 1 + 1 file changed, 1 insertion(+) diff --git a/r/tests/testthat/test-safe-call-into-r.R b/r/tests/testthat/test-safe-call-into-r.R index 55cb68abdd3..ab69c339c5c 100644 --- a/r/tests/testthat/test-safe-call-into-r.R +++ b/r/tests/testthat/test-safe-call-into-r.R @@ -32,6 +32,7 @@ test_that("SafeCallIntoR works from the main R thread", { }) test_that("SafeCallIntoR works within RunWithCapturedR", { + skip_if_r_version("3.4.4") skip_on_cran() expect_identical( From 13ddd2074162650bddeb4c4cc91d4073caf9660e Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 22 Apr 2022 09:44:05 -0300 Subject: [PATCH 21/22] maybe pass CMD check on R 3.4 --- r/src/csv.cpp | 4 ++++ r/src/feather.cpp | 8 ++++++++ r/src/safe-call-into-r.h | 12 ++++++++---- r/tests/testthat/test-csv.R | 4 ++++ r/tests/testthat/test-feather.R | 3 +++ 5 files changed, 27 insertions(+), 4 deletions(-) diff --git a/r/src/csv.cpp b/r/src/csv.cpp index 78f5eb6d7b6..4f09aa12f69 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -164,12 +164,16 @@ std::shared_ptr csv___TableReader__Make( // [[arrow::export]] std::shared_ptr csv___TableReader__Read( const std::shared_ptr& table_reader) { +#if !defined(HAS_SAFE_CALL_INTO_R) + return table_reader->Read(); +#else const auto& io_context = arrow::io::default_io_context(); auto result = RunWithCapturedR>([&]() { return DeferNotOk( io_context.executor()->Submit([&]() { return table_reader->Read(); })); }); return ValueOrStop(result); +#endif } // [[arrow::export]] diff --git a/r/src/feather.cpp b/r/src/feather.cpp index f48a1b96bca..ab66c4bb32c 100644 --- a/r/src/feather.cpp +++ b/r/src/feather.cpp @@ -79,6 +79,9 @@ std::shared_ptr ipc___feather___Reader__Read( } }; +#if !defined(HAS_SAFE_CALL_INTO_R) + return ValueOrStop(read_table()); +#else if (!on_old_windows) { const auto& io_context = arrow::io::default_io_context(); auto result = RunWithCapturedR>( @@ -87,11 +90,15 @@ std::shared_ptr ipc___feather___Reader__Read( } else { return ValueOrStop(read_table()); } +#endif } // [[arrow::export]] std::shared_ptr ipc___feather___Reader__Open( const std::shared_ptr& stream, bool on_old_windows) { +#if !defined(HAS_SAFE_CALL_INTO_R) + return ValueOrStop(arrow::ipc::feather::Reader::Open(stream)); +#else if (!on_old_windows) { const auto& io_context = arrow::io::default_io_context(); auto result = RunWithCapturedR>([&]() { @@ -102,6 +109,7 @@ std::shared_ptr ipc___feather___Reader__Open( } else { return ValueOrStop(arrow::ipc::feather::Reader::Open(stream)); } +#endif } // [[arrow::export]] diff --git a/r/src/safe-call-into-r.h b/r/src/safe-call-into-r.h index 15799c8248d..0555628d7d5 100644 --- a/r/src/safe-call-into-r.h +++ b/r/src/safe-call-into-r.h @@ -26,6 +26,13 @@ #include #include +// Unwind protection was added in R 3.5 and some calls here use it +// and crash R in older versions (ARROW-16201). We use this define +// to make sure we don't crash on R 3.4 and lower. +#if defined(HAS_UNWIND_PROTECT) +#define HAS_SAFE_CALL_INTO_R +#endif + // The MainRThread class keeps track of the thread on which it is safe // to call the R API to facilitate its safe use (or erroring // if it is not safe). The MainRThread singleton can be accessed from @@ -132,10 +139,7 @@ static inline arrow::Status SafeCallIntoRVoid(std::function fun) { template arrow::Result RunWithCapturedR(std::function()> make_arrow_call) { - // Unwind protection was added in R 3.5 and some calls here crash R - // in older versions (ARROW-16201). Rather than crash, just return an error result - // when this is attempted. -#if !defined(HAS_UNWIND_PROTECT) +#if !defined(HAS_SAFE_CALL_INTO_R) return arrow::Status::NotImplemented("RunWithCapturedR() without UnwindProtect"); #else if (GetMainRThread().Executor() != nullptr) { diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index 5e17ef0b1c2..631e75fd74a 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -293,6 +293,10 @@ test_that("more informative error when reading a CSV with headers and schema", { }) test_that("read_csv_arrow() and write_csv_arrow() accept connection objects", { + # connections with csv need RunWithCapturedR, which is not available + # in R <= 3.4.4 + skip_if_r_version("3.4.4") + tf <- tempfile() on.exit(unlink(tf)) diff --git a/r/tests/testthat/test-feather.R b/r/tests/testthat/test-feather.R index c6b536a3ffd..65fb2f03759 100644 --- a/r/tests/testthat/test-feather.R +++ b/r/tests/testthat/test-feather.R @@ -184,6 +184,9 @@ test_that("read_feather requires RandomAccessFile and errors nicely otherwise (A test_that("read_feather() and write_feather() accept connection objects", { # connection object don't work on Windows i386 before R 4.0 skip_if(on_old_windows()) + # connections with feather need RunWithCapturedR, which is not available + # in R <= 3.4.4 + skip_if_r_version("3.4.4") tf <- tempfile() on.exit(unlink(tf)) From 82e6b6d364f15af20dbdead17e767f0fe927a507 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 22 Apr 2022 11:01:05 -0300 Subject: [PATCH 22/22] fix csv reading on R 3.4 --- r/src/csv.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/src/csv.cpp b/r/src/csv.cpp index 4f09aa12f69..2902462b50e 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -165,7 +165,7 @@ std::shared_ptr csv___TableReader__Make( std::shared_ptr csv___TableReader__Read( const std::shared_ptr& table_reader) { #if !defined(HAS_SAFE_CALL_INTO_R) - return table_reader->Read(); + return ValueOrStop(table_reader->Read()); #else const auto& io_context = arrow::io::default_io_context(); auto result = RunWithCapturedR>([&]() {