From dd7e72e22fff121334a4b3c49f863b9f618f817b Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Sat, 11 Apr 2020 14:10:35 -0700 Subject: [PATCH 01/10] Set up machinery for building s3 bindings if ARROW_S3 is on --- r/R/arrowExports.R | 8 +++++ r/R/filesystem.R | 13 ++++++++ r/configure | 6 ++++ r/data-raw/codegen.R | 50 ++++++++++++++++++++---------- r/src/arrowExports.cpp | 42 +++++++++++++++++++++++++ r/src/arrow_types.h | 1 + r/src/filesystem.cpp | 15 +++++++++ r/tests/testthat/helper-skip.R | 5 +-- r/tests/testthat/test-filesystem.R | 6 ++++ 9 files changed, 128 insertions(+), 18 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index c7407b305a0..2bcc14970e8 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -880,6 +880,14 @@ fs___SubTreeFileSystem__create <- function(base_path, base_fs){ .Call(`_arrow_fs___SubTreeFileSystem__create` , base_path, base_fs) } +fs___EnsureS3Initialized <- function(){ + invisible(.Call(`_arrow_fs___EnsureS3Initialized` )) +} + +fs___S3FileSystem__create <- function(){ + .Call(`_arrow_fs___S3FileSystem__create` ) +} + io___Readable__Read <- function(x, nbytes){ .Call(`_arrow_io___Readable__Read` , x, nbytes) } diff --git a/r/R/filesystem.R b/r/R/filesystem.R index 939764b80ec..f2d5294f189 100644 --- a/r/R/filesystem.R +++ b/r/R/filesystem.R @@ -236,6 +236,19 @@ LocalFileSystem$create <- function() { shared_ptr(LocalFileSystem, fs___LocalFileSystem__create()) } +#' @usage NULL +#' @format NULL +#' @rdname FileSystem +#' @export +S3FileSystem <- R6Class("S3FileSystem", inherit = FileSystem) +S3FileSystem$create <- function() { + fs___EnsureS3Initialized() + shared_ptr(S3FileSystem, fs___S3FileSystem__create()) +} + +arrow_with_s3 <- function() { + .Call(`_s3_available`) +} #' @usage NULL #' @format NULL diff --git a/r/configure b/r/configure index a8dedcb2653..3afbb28d1a2 100755 --- a/r/configure +++ b/r/configure @@ -159,6 +159,12 @@ echo "#include $PKG_TEST_HEADER" | ${TEST_CMD} >/dev/null 2>&1 if [ $? -eq 0 ] || [ "$UNAME" = "Darwin" ]; then # Always build with arrow on macOS PKG_CFLAGS="$PKG_CFLAGS -DARROW_R_WITH_ARROW" + # Check for features + LIB_DIR=`echo $PKG_LIBS | sed -e 's/ -l.*//' | sed -e 's/^-L//'` + grep 'set(ARROW_S3 "ON")' $LIB_DIR/cmake/arrow/ArrowOptions.cmake >/dev/null 2>&1 + if [ $? -eq 0 ]; then + PKG_CFLAGS="$PKG_CFLAGS -DARROW_R_WITH_S3" + fi echo "PKG_CFLAGS=$PKG_CFLAGS" echo "PKG_LIBS=$PKG_LIBS" else diff --git a/r/data-raw/codegen.R b/r/data-raw/codegen.R index b90f8543aa2..54c1d121332 100644 --- a/r/data-raw/codegen.R +++ b/r/data-raw/codegen.R @@ -48,16 +48,19 @@ if (packageVersion("decor") < '0.0.0.9001') { stop("more recent version of `decor` needed, please install with `remotes::install_github('romainfrancois/decor')`") } -decorations <- cpp_decorations() %>% - filter(decoration == "arrow::export") %>% - # the three lines below can be expressed with rap() - # more concisely - # rap( ~ decor:::parse_cpp_function(context)) - mutate(functions = map(context, decor:::parse_cpp_function)) %>% - { vec_cbind(., vec_rbind(!!!pull(., functions))) } %>% - select(-functions) - -message(glue("*** > {n} functions decorated with [[arrow::export]]", n = nrow(decorations))) +get_exported_functions <- function(decorations, export_tag) { + out <- decorations %>% + filter(decoration %in% paste0(export_tag, "::export")) %>% + # the three lines below can be expressed with rap() + # more concisely + # rap( ~ decor:::parse_cpp_function(context)) + mutate(functions = map(context, decor:::parse_cpp_function)) %>% + { vec_cbind(., vec_rbind(!!!pull(., functions))) } %>% + select(-functions) %>% + mutate(decoration = sub("::export", "", decoration)) + message(glue("*** > {n} functions decorated with [[{tags}::export]]", n = nrow(out), tags = paste0(export_tag, collapse = "|"))) + out +} glue_collapse_data <- function(data, ..., sep = ", ", last = "") { res <- glue_collapse(glue_data(data, ...), sep = sep, last = last) @@ -73,12 +76,16 @@ wrap_call <- function(name, return_type, args) { glue::glue("\treturn Rcpp::wrap({call});") } } -cpp_functions_definitions <- decorations %>% - select(name, return_type, args, file, line) %>% - pmap_chr(function(name, return_type, args, file, line){ + +all_decorations <- cpp_decorations() +arrow_exports <- get_exported_functions(all_decorations, c("arrow", "s3")) + +cpp_functions_definitions <- arrow_exports %>% + select(name, return_type, args, file, line, decoration) %>% + pmap_chr(function(name, return_type, args, file, line, decoration){ glue::glue(' // {basename(file)} - #if defined(ARROW_R_WITH_ARROW) + #if defined(ARROW_R_WITH_{toupper(decoration)}) {return_type} {name}({real_params}); RcppExport SEXP _arrow_{name}({sexp_params}){{ BEGIN_RCPP @@ -101,7 +108,7 @@ cpp_functions_definitions <- decorations %>% }) %>% glue_collapse(sep = "\n") -cpp_functions_registration <- decorations %>% +cpp_functions_registration <- arrow_exports %>% select(name, return_type, args) %>% pmap_chr(function(name, return_type, args){ glue('\t\t{{ "_arrow_{name}", (DL_FUNC) &_arrow_{name}, {nrow(args)}}}, ') @@ -127,8 +134,19 @@ return Rf_ScalarLogical( ); }} +extern "C" SEXP _s3_available() {{ +return Rf_ScalarLogical( +#if defined(ARROW_R_WITH_S3) + TRUE +#else + FALSE +#endif +); +}} + static const R_CallMethodDef CallEntries[] = {{ \t\t{{ "_arrow_available", (DL_FUNC)& _arrow_available, 0 }}, +\t\t{{ "_s3_available", (DL_FUNC)& _s3_available, 0 }}, {cpp_functions_registration} \t\t{{NULL, NULL, 0}} }}; @@ -142,7 +160,7 @@ RcppExport void R_init_arrow(DllInfo* dll){{ message("*** > generated file `src/arrowExports.cpp`") -r_functions <- decorations %>% +r_functions <- arrow_exports %>% select(name, return_type, args) %>% pmap_chr(function(name, return_type, args) { params <- if (nrow(args)) { diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 599f8d22ee4..f3c508d5d0b 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -3434,6 +3434,35 @@ RcppExport SEXP _arrow_fs___SubTreeFileSystem__create(SEXP base_path_sexp, SEXP } #endif +// filesystem.cpp +#if defined(ARROW_R_WITH_S3) +void fs___EnsureS3Initialized(); +RcppExport SEXP _arrow_fs___EnsureS3Initialized(){ +BEGIN_RCPP + fs___EnsureS3Initialized(); + return R_NilValue; +END_RCPP +} +#else +RcppExport SEXP _arrow_fs___EnsureS3Initialized(){ + Rf_error("Cannot call fs___EnsureS3Initialized(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + +// filesystem.cpp +#if defined(ARROW_R_WITH_S3) +std::shared_ptr fs___S3FileSystem__create(); +RcppExport SEXP _arrow_fs___S3FileSystem__create(){ +BEGIN_RCPP + return Rcpp::wrap(fs___S3FileSystem__create()); +END_RCPP +} +#else +RcppExport SEXP _arrow_fs___S3FileSystem__create(){ + Rf_error("Cannot call fs___S3FileSystem__create(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + // io.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr io___Readable__Read(const std::shared_ptr& x, int64_t nbytes); @@ -5810,8 +5839,19 @@ return Rf_ScalarLogical( ); } +extern "C" SEXP _s3_available() { +return Rf_ScalarLogical( +#if defined(ARROW_R_WITH_S3) + TRUE +#else + FALSE +#endif +); +} + static const R_CallMethodDef CallEntries[] = { { "_arrow_available", (DL_FUNC)& _arrow_available, 0 }, + { "_s3_available", (DL_FUNC)& _s3_available, 0 }, { "_arrow_Array__Slice1", (DL_FUNC) &_arrow_Array__Slice1, 2}, { "_arrow_Array__Slice2", (DL_FUNC) &_arrow_Array__Slice2, 3}, { "_arrow_Array__IsNull", (DL_FUNC) &_arrow_Array__IsNull, 2}, @@ -6032,6 +6072,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_fs___FileSystem__OpenAppendStream", (DL_FUNC) &_arrow_fs___FileSystem__OpenAppendStream, 2}, { "_arrow_fs___LocalFileSystem__create", (DL_FUNC) &_arrow_fs___LocalFileSystem__create, 0}, { "_arrow_fs___SubTreeFileSystem__create", (DL_FUNC) &_arrow_fs___SubTreeFileSystem__create, 2}, + { "_arrow_fs___EnsureS3Initialized", (DL_FUNC) &_arrow_fs___EnsureS3Initialized, 0}, + { "_arrow_fs___S3FileSystem__create", (DL_FUNC) &_arrow_fs___S3FileSystem__create, 0}, { "_arrow_io___Readable__Read", (DL_FUNC) &_arrow_io___Readable__Read, 2}, { "_arrow_io___InputStream__Close", (DL_FUNC) &_arrow_io___InputStream__Close, 1}, { "_arrow_io___OutputStream__Close", (DL_FUNC) &_arrow_io___OutputStream__Close, 1}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 502ee4d2e41..bcc1869ef9b 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -203,6 +203,7 @@ inline std::shared_ptr extract(SEXP x) { #include #include #include +#include #include #include #include diff --git a/r/src/filesystem.cpp b/r/src/filesystem.cpp index b1e0d61025e..20ca7762c50 100644 --- a/r/src/filesystem.cpp +++ b/r/src/filesystem.cpp @@ -210,3 +210,18 @@ std::shared_ptr fs___SubTreeFileSystem__create( } #endif + +#if defined(ARROW_R_WITH_S3) + +// [[s3::export]] +void fs___EnsureS3Initialized() { + STOP_IF_NOT_OK(fs::EnsureS3Initialized()); +} + +// [[s3::export]] +std::shared_ptr fs___S3FileSystem__create() { + fs::S3Options opts; + return VALUE_OR_STOP(fs::S3FileSystem::Make(opts)); +} + +#endif diff --git a/r/tests/testthat/helper-skip.R b/r/tests/testthat/helper-skip.R index 2026021daed..78c70eca51d 100644 --- a/r/tests/testthat/helper-skip.R +++ b/r/tests/testthat/helper-skip.R @@ -16,8 +16,9 @@ # under the License. skip_if_not_available <- function(feature) { - # This is currently for compression only but we can extend to other features - if (!codec_is_available(feature)) { + if (feature == "s3") { + skip_if_not(arrow_with_s3()) + } else if (!codec_is_available(feature)) { skip(paste("Arrow C++ not built with support for", feature)) } } diff --git a/r/tests/testthat/test-filesystem.R b/r/tests/testthat/test-filesystem.R index 8fddf7ad291..c8d2fe1d1d6 100644 --- a/r/tests/testthat/test-filesystem.R +++ b/r/tests/testthat/test-filesystem.R @@ -122,3 +122,9 @@ test_that("LocalFileSystem + Selector", { expect_equal(sum(types == FileType$File), 2L) expect_equal(sum(types == FileType$Directory), 1L) }) + +test_that("S3FileSystem", { + skip_if_not_available("s3") + s3fs <- S3FileSystem$create() + expect_is(s3fs, "S3FileSystem") +}) From 8667d12bc74d9db09e06c4fb31487fe224a94fd4 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Mon, 13 Apr 2020 10:44:23 -0700 Subject: [PATCH 02/10] Add FileSystem$from_uri --- r/R/arrowExports.R | 8 ++++++++ r/R/filesystem.R | 20 +++++++++++++++++++ r/src/arrowExports.cpp | 32 ++++++++++++++++++++++++++++++ r/src/filesystem.cpp | 13 ++++++++++++ r/tests/testthat/test-filesystem.R | 11 ++++++++-- 5 files changed, 82 insertions(+), 2 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 2bcc14970e8..581d9768543 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -872,6 +872,10 @@ fs___FileSystem__OpenAppendStream <- function(file_system, path){ .Call(`_arrow_fs___FileSystem__OpenAppendStream` , file_system, path) } +fs___FileSystem__type_name <- function(file_system){ + .Call(`_arrow_fs___FileSystem__type_name` , file_system) +} + fs___LocalFileSystem__create <- function(){ .Call(`_arrow_fs___LocalFileSystem__create` ) } @@ -880,6 +884,10 @@ fs___SubTreeFileSystem__create <- function(base_path, base_fs){ .Call(`_arrow_fs___SubTreeFileSystem__create` , base_path, base_fs) } +fs___FileSystemFromUri <- function(path){ + .Call(`_arrow_fs___FileSystemFromUri` , path) +} + fs___EnsureS3Initialized <- function(){ invisible(.Call(`_arrow_fs___EnsureS3Initialized` )) } diff --git a/r/R/filesystem.R b/r/R/filesystem.R index f2d5294f189..f0e123ac4cd 100644 --- a/r/R/filesystem.R +++ b/r/R/filesystem.R @@ -166,6 +166,18 @@ FileSelector$create <- function(base_dir, allow_not_found = FALSE, recursive = F #' @export FileSystem <- R6Class("FileSystem", inherit = ArrowObject, public = list( + ..dispatch = function() { + type_name <- self$type_name + if (type_name == "local") { + shared_ptr(LocalFileSystem, self$pointer()) + } else if (type_name == "s3") { + shared_ptr(S3FileSystem, self$pointer()) + } else if (type_name == "subtree") { + shared_ptr(SubTreeFileSystem, self$pointer()) + } else { + self + } + }, GetFileInfo = function(x) { if (inherits(x, "FileSelector")) { map( @@ -224,8 +236,16 @@ FileSystem <- R6Class("FileSystem", inherit = ArrowObject, OpenAppendStream = function(path) { shared_ptr(OutputStream, fs___FileSystem__OpenAppendStream(self, clean_path_rel(path))) } + ), + active = list( + type_name = function() fs___FileSystem__type_name(self) ) ) +FileSystem$from_uri <- function(uri) { + out <- fs___FileSystemFromUri(uri) + out$fs <- shared_ptr(FileSystem, out$fs)$..dispatch() + out +} #' @usage NULL #' @format NULL diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index f3c508d5d0b..f05cc950998 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -3404,6 +3404,21 @@ RcppExport SEXP _arrow_fs___FileSystem__OpenAppendStream(SEXP file_system_sexp, } #endif +// filesystem.cpp +#if defined(ARROW_R_WITH_ARROW) +std::string fs___FileSystem__type_name(const std::shared_ptr& file_system); +RcppExport SEXP _arrow_fs___FileSystem__type_name(SEXP file_system_sexp){ +BEGIN_RCPP + Rcpp::traits::input_parameter&>::type file_system(file_system_sexp); + return Rcpp::wrap(fs___FileSystem__type_name(file_system)); +END_RCPP +} +#else +RcppExport SEXP _arrow_fs___FileSystem__type_name(SEXP file_system_sexp){ + Rf_error("Cannot call fs___FileSystem__type_name(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + // filesystem.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr fs___LocalFileSystem__create(); @@ -3434,6 +3449,21 @@ RcppExport SEXP _arrow_fs___SubTreeFileSystem__create(SEXP base_path_sexp, SEXP } #endif +// filesystem.cpp +#if defined(ARROW_R_WITH_ARROW) +Rcpp::List fs___FileSystemFromUri(const std::string& path); +RcppExport SEXP _arrow_fs___FileSystemFromUri(SEXP path_sexp){ +BEGIN_RCPP + Rcpp::traits::input_parameter::type path(path_sexp); + return Rcpp::wrap(fs___FileSystemFromUri(path)); +END_RCPP +} +#else +RcppExport SEXP _arrow_fs___FileSystemFromUri(SEXP path_sexp){ + Rf_error("Cannot call fs___FileSystemFromUri(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + // filesystem.cpp #if defined(ARROW_R_WITH_S3) void fs___EnsureS3Initialized(); @@ -6070,8 +6100,10 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_fs___FileSystem__OpenInputFile", (DL_FUNC) &_arrow_fs___FileSystem__OpenInputFile, 2}, { "_arrow_fs___FileSystem__OpenOutputStream", (DL_FUNC) &_arrow_fs___FileSystem__OpenOutputStream, 2}, { "_arrow_fs___FileSystem__OpenAppendStream", (DL_FUNC) &_arrow_fs___FileSystem__OpenAppendStream, 2}, + { "_arrow_fs___FileSystem__type_name", (DL_FUNC) &_arrow_fs___FileSystem__type_name, 1}, { "_arrow_fs___LocalFileSystem__create", (DL_FUNC) &_arrow_fs___LocalFileSystem__create, 0}, { "_arrow_fs___SubTreeFileSystem__create", (DL_FUNC) &_arrow_fs___SubTreeFileSystem__create, 2}, + { "_arrow_fs___FileSystemFromUri", (DL_FUNC) &_arrow_fs___FileSystemFromUri, 1}, { "_arrow_fs___EnsureS3Initialized", (DL_FUNC) &_arrow_fs___EnsureS3Initialized, 0}, { "_arrow_fs___S3FileSystem__create", (DL_FUNC) &_arrow_fs___S3FileSystem__create, 0}, { "_arrow_io___Readable__Read", (DL_FUNC) &_arrow_io___Readable__Read, 2}, diff --git a/r/src/filesystem.cpp b/r/src/filesystem.cpp index 20ca7762c50..0606d9280dc 100644 --- a/r/src/filesystem.cpp +++ b/r/src/filesystem.cpp @@ -198,6 +198,12 @@ std::shared_ptr fs___FileSystem__OpenAppendStream( return VALUE_OR_STOP(file_system->OpenAppendStream(path)); } +// [[arrow::export]] +std::string fs___FileSystem__type_name( + const std::shared_ptr& file_system) { + return file_system->type_name(); +} + // [[arrow::export]] std::shared_ptr fs___LocalFileSystem__create() { return std::make_shared(); @@ -209,6 +215,13 @@ std::shared_ptr fs___SubTreeFileSystem__create( return std::make_shared(base_path, base_fs); } +// [[arrow::export]] +Rcpp::List fs___FileSystemFromUri(const std::string& path) { + std::string out_path; + auto file_system = VALUE_OR_STOP(fs::FileSystemFromUri(path, &out_path)); + return Rcpp::List::create(Rcpp::Named("fs") = file_system, Rcpp::Named("path") = out_path); +} + #endif #if defined(ARROW_R_WITH_S3) diff --git a/r/tests/testthat/test-filesystem.R b/r/tests/testthat/test-filesystem.R index c8d2fe1d1d6..b58aa459086 100644 --- a/r/tests/testthat/test-filesystem.R +++ b/r/tests/testthat/test-filesystem.R @@ -19,6 +19,7 @@ context("File system") test_that("LocalFilesystem", { fs <- LocalFileSystem$create() + expect_identical(fs$type_name, "local") DESCRIPTION <- system.file("DESCRIPTION", package = "arrow") info <- fs$GetFileInfo(DESCRIPTION)[[1]] expect_equal(info$base_name(), "DESCRIPTION") @@ -123,8 +124,14 @@ test_that("LocalFileSystem + Selector", { expect_equal(sum(types == FileType$Directory), 1L) }) +test_that("FileSystem$from_uri", { + skip_if_not_available("s3") + fs_and_path <- FileSystem$from_uri("s3://ursa-labs-taxi-data") + expect_is(fs_and_path$fs, "S3FileSystem") +}) + test_that("S3FileSystem", { skip_if_not_available("s3") - s3fs <- S3FileSystem$create() - expect_is(s3fs, "S3FileSystem") + # s3fs <- S3FileSystem$create() + # expect_is(s3fs, "S3FileSystem") }) From cce3bc5fbd440999b4d169540d52936c42195045 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Mon, 13 Apr 2020 12:39:33 -0700 Subject: [PATCH 03/10] Wire up FileSystem$from_uri in DatasetFactory$create --- r/NAMESPACE | 1 + r/R/dataset.R | 34 ++++++++++-------------------- r/man/FileSystem.Rd | 1 + r/man/cpu_count.Rd | 10 +++++++-- r/man/dataset_factory.Rd | 16 ++++---------- r/man/set_cpu_count.Rd | 14 ------------ r/tests/testthat/test-dataset.R | 17 +++++++++++++++ r/tests/testthat/test-filesystem.R | 2 ++ 8 files changed, 44 insertions(+), 51 deletions(-) delete mode 100644 r/man/set_cpu_count.Rd diff --git a/r/NAMESPACE b/r/NAMESPACE index 598a7f0b427..568c72f0300 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -119,6 +119,7 @@ export(RecordBatchFileReader) export(RecordBatchFileWriter) export(RecordBatchStreamReader) export(RecordBatchStreamWriter) +export(S3FileSystem) export(ScalarExpression) export(Scanner) export(ScannerBuilder) diff --git a/r/R/dataset.R b/r/R/dataset.R index 3ef89504b66..1baec6cd5d8 100644 --- a/r/R/dataset.R +++ b/r/R/dataset.R @@ -268,11 +268,9 @@ DatasetFactory <- R6Class("DatasetFactory", inherit = ArrowObject, ) ) DatasetFactory$create <- function(x, - filesystem = c("auto", "local"), + filesystem = NULL, format = c("parquet", "arrow", "ipc", "feather"), partitioning = NULL, - allow_not_found = FALSE, - recursive = TRUE, ...) { if (is_list_of(x, "DatasetFactory")) { return(shared_ptr(DatasetFactory, dataset___UnionDatasetFactory__Make(x))) @@ -282,21 +280,15 @@ DatasetFactory$create <- function(x, } if (!inherits(filesystem, "FileSystem")) { - filesystem <- match.arg(filesystem) - if (filesystem == "auto") { - # When there are other FileSystems supported, detect e.g. S3 from x - filesystem <- "local" + if (grepl("://", x)) { + fs_from_uri <- FileSystem$from_uri(x) + filesystem <- fs_from_uri$fs + x <- fs_from_uri$path + } else { + filesystem <- LocalFileSystem$create() } - filesystem <- list( - local = LocalFileSystem - # We'll register other file systems here - )[[filesystem]]$create(...) } - selector <- FileSelector$create( - x, - allow_not_found = allow_not_found, - recursive = recursive - ) + selector <- FileSelector$create(x, allow_not_found = FALSE, recursive = TRUE) if (is.character(format)) { format <- FileFormat$create(match.arg(format)) @@ -331,8 +323,8 @@ DatasetFactory$create <- function(x, #' a list of `DatasetFactory` objects whose datasets should be #' grouped. If this argument is specified it will be used to construct a #' `UnionDatasetFactory` and other arguments will be ignored. -#' @param filesystem A string identifier for the filesystem corresponding to -#' `x`. Currently only "local" is supported. +#' @param filesystem A [FileSystem] object; if omitted, the `FileSystem` will +#' be detected from `x` #' @param format A string identifier of the format of the files in `x`. #' Currently "parquet" and "ipc"/"arrow"/"feather" (aliases for each other) #' are supported. For Feather, only version 2 files are supported. @@ -348,11 +340,7 @@ DatasetFactory$create <- function(x, #' by [hive_partition()] which parses explicit or autodetected fields from #' Hive-style path segments #' * `NULL` for no partitioning -#' @param allow_not_found logical: is `x` allowed to not exist? Default -#' `FALSE`. See [FileSelector]. -#' @param recursive logical: should files be discovered in subdirectories of -#' `x`? Default `TRUE`. -#' @param ... Additional arguments passed to the [FileSystem] `$create()` method +#' @param ... Additional arguments, currently ignored #' @return A `DatasetFactory` object. Pass this to [open_dataset()], #' in a list potentially with other `DatasetFactory` objects, to create #' a `Dataset`. diff --git a/r/man/FileSystem.Rd b/r/man/FileSystem.Rd index 5b627f267cc..a675c5bfb4e 100644 --- a/r/man/FileSystem.Rd +++ b/r/man/FileSystem.Rd @@ -4,6 +4,7 @@ \name{FileSystem} \alias{FileSystem} \alias{LocalFileSystem} +\alias{S3FileSystem} \alias{SubTreeFileSystem} \title{FileSystem classes} \description{ diff --git a/r/man/cpu_count.Rd b/r/man/cpu_count.Rd index 5764da6db6a..f2abfc197ac 100644 --- a/r/man/cpu_count.Rd +++ b/r/man/cpu_count.Rd @@ -2,10 +2,16 @@ % Please edit documentation in R/config.R \name{cpu_count} \alias{cpu_count} -\title{Return number of threads used by the global CPU thread pool in libarrow} +\alias{set_cpu_count} +\title{Manage the global CPU thread pool in libarrow} \usage{ cpu_count() + +set_cpu_count(num_threads) +} +\arguments{ +\item{num_threads}{integer: New number of threads for thread pool} } \description{ -Return number of threads used by the global CPU thread pool in libarrow +Manage the global CPU thread pool in libarrow } diff --git a/r/man/dataset_factory.Rd b/r/man/dataset_factory.Rd index b905d3f4649..c9950531aa7 100644 --- a/r/man/dataset_factory.Rd +++ b/r/man/dataset_factory.Rd @@ -6,11 +6,9 @@ \usage{ dataset_factory( x, - filesystem = c("auto", "local"), + filesystem = NULL, format = c("parquet", "arrow", "ipc", "feather"), partitioning = NULL, - allow_not_found = FALSE, - recursive = TRUE, ... ) } @@ -20,8 +18,8 @@ a list of \code{DatasetFactory} objects whose datasets should be grouped. If this argument is specified it will be used to construct a \code{UnionDatasetFactory} and other arguments will be ignored.} -\item{filesystem}{A string identifier for the filesystem corresponding to -\code{x}. Currently only "local" is supported.} +\item{filesystem}{A \link{FileSystem} object; if omitted, the \code{FileSystem} will +be detected from \code{x}} \item{format}{A string identifier of the format of the files in \code{x}. Currently "parquet" and "ipc"/"arrow"/"feather" (aliases for each other) @@ -42,13 +40,7 @@ Hive-style path segments \item \code{NULL} for no partitioning }} -\item{allow_not_found}{logical: is \code{x} allowed to not exist? Default -\code{FALSE}. See \link{FileSelector}.} - -\item{recursive}{logical: should files be discovered in subdirectories of -\code{x}? Default \code{TRUE}.} - -\item{...}{Additional arguments passed to the \link{FileSystem} \verb{$create()} method} +\item{...}{Additional arguments, currently ignored} } \value{ A \code{DatasetFactory} object. Pass this to \code{\link[=open_dataset]{open_dataset()}}, diff --git a/r/man/set_cpu_count.Rd b/r/man/set_cpu_count.Rd deleted file mode 100644 index dc36e9140d2..00000000000 --- a/r/man/set_cpu_count.Rd +++ /dev/null @@ -1,14 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/config.R -\name{set_cpu_count} -\alias{set_cpu_count} -\title{Set number of threads used by the global CPU thread pool in libarrow} -\usage{ -set_cpu_count(num_threads) -} -\arguments{ -\item{num_threads}{integer: New number of threads for thread pool} -} -\description{ -Set number of threads used by the global CPU thread pool in libarrow -} diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 03a0a4be2a1..021ba4268eb 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -118,6 +118,23 @@ test_that("dim() correctly determine numbers of rows and columns on arrow_dplyr_ }) +test_that("dataset from URI", { + uri <- paste0("file:/", dataset_dir) + ds <- open_dataset(uri, partitioning = schema(part = uint8())) + expect_is(ds, "Dataset") + expect_equivalent( + ds %>% + select(chr, dbl) %>% + filter(dbl > 7 & dbl < 53L) %>% + collect() %>% + arrange(dbl), + rbind( + df1[8:10, c("chr", "dbl")], + df2[1:2, c("chr", "dbl")] + ) + ) +}) + test_that("Simple interface for datasets (custom ParquetFileFormat)", { ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()), format = FileFormat$create("parquet", dict_columns = c("chr"))) diff --git a/r/tests/testthat/test-filesystem.R b/r/tests/testthat/test-filesystem.R index b58aa459086..01699da9dd0 100644 --- a/r/tests/testthat/test-filesystem.R +++ b/r/tests/testthat/test-filesystem.R @@ -125,12 +125,14 @@ test_that("LocalFileSystem + Selector", { }) test_that("FileSystem$from_uri", { + skip_on_cran() skip_if_not_available("s3") fs_and_path <- FileSystem$from_uri("s3://ursa-labs-taxi-data") expect_is(fs_and_path$fs, "S3FileSystem") }) test_that("S3FileSystem", { + skip_on_cran() skip_if_not_available("s3") # s3fs <- S3FileSystem$create() # expect_is(s3fs, "S3FileSystem") From 124b502357d394d3c813d1d7f0d343e9e586d4f2 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Wed, 15 Apr 2020 13:27:59 -0700 Subject: [PATCH 04/10] Lint --- r/src/filesystem.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/r/src/filesystem.cpp b/r/src/filesystem.cpp index 0606d9280dc..4edefaff790 100644 --- a/r/src/filesystem.cpp +++ b/r/src/filesystem.cpp @@ -219,7 +219,8 @@ std::shared_ptr fs___SubTreeFileSystem__create( Rcpp::List fs___FileSystemFromUri(const std::string& path) { std::string out_path; auto file_system = VALUE_OR_STOP(fs::FileSystemFromUri(path, &out_path)); - return Rcpp::List::create(Rcpp::Named("fs") = file_system, Rcpp::Named("path") = out_path); + return Rcpp::List::create(Rcpp::Named("fs") = file_system, + Rcpp::Named("path") = out_path); } #endif @@ -227,9 +228,7 @@ Rcpp::List fs___FileSystemFromUri(const std::string& path) { #if defined(ARROW_R_WITH_S3) // [[s3::export]] -void fs___EnsureS3Initialized() { - STOP_IF_NOT_OK(fs::EnsureS3Initialized()); -} +void fs___EnsureS3Initialized() { STOP_IF_NOT_OK(fs::EnsureS3Initialized()); } // [[s3::export]] std::shared_ptr fs___S3FileSystem__create() { From 5e899f5771aaf0b5cf34349ccd652680e9a02ad2 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Wed, 15 Apr 2020 13:41:56 -0700 Subject: [PATCH 05/10] Fix test --- r/tests/testthat/test-dataset.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 021ba4268eb..5b48009da36 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -119,7 +119,7 @@ test_that("dim() correctly determine numbers of rows and columns on arrow_dplyr_ }) test_that("dataset from URI", { - uri <- paste0("file:/", dataset_dir) + uri <- paste0("file://", dataset_dir) ds <- open_dataset(uri, partitioning = schema(part = uint8())) expect_is(ds, "Dataset") expect_equivalent( From 5c04d4284ef8ab08986556ea330ee9dca4172881 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Wed, 15 Apr 2020 13:43:24 -0700 Subject: [PATCH 06/10] Turn off windows test verbosity --- .github/workflows/r.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/r.yml b/.github/workflows/r.yml index fdca3bdc18f..a9f72934b77 100644 --- a/.github/workflows/r.yml +++ b/.github/workflows/r.yml @@ -156,6 +156,9 @@ jobs: with: fetch-depth: 0 - name: Make R tests verbose + # If you get a segfault/mysterious test Execution halted, + # make this `true` to see where it dies. + if: false shell: cmd run: | cd r/tests From 3a40da46e6ca1c12460b9475f5518b290eb6e2cb Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Wed, 15 Apr 2020 16:29:09 -0700 Subject: [PATCH 07/10] Skip test on windows (not worth debugging) --- r/tests/testthat/test-dataset.R | 1 + 1 file changed, 1 insertion(+) diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 5b48009da36..acf4d73d86d 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -119,6 +119,7 @@ test_that("dim() correctly determine numbers of rows and columns on arrow_dplyr_ }) test_that("dataset from URI", { + skip_on_os("windows") uri <- paste0("file://", dataset_dir) ds <- open_dataset(uri, partitioning = schema(part = uint8())) expect_is(ds, "Dataset") From c7945119e7a1d85ab3d70780d350b4cd78b9234c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Thu, 16 Apr 2020 16:03:01 -0400 Subject: [PATCH 08/10] Fix S3FileSystem --- r/src/filesystem.cpp | 2 +- r/tests/testthat/test-filesystem.R | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/r/src/filesystem.cpp b/r/src/filesystem.cpp index 4edefaff790..54f24918225 100644 --- a/r/src/filesystem.cpp +++ b/r/src/filesystem.cpp @@ -232,7 +232,7 @@ void fs___EnsureS3Initialized() { STOP_IF_NOT_OK(fs::EnsureS3Initialized()); } // [[s3::export]] std::shared_ptr fs___S3FileSystem__create() { - fs::S3Options opts; + auto opts = fs::S3Options::Defaults(); return VALUE_OR_STOP(fs::S3FileSystem::Make(opts)); } diff --git a/r/tests/testthat/test-filesystem.R b/r/tests/testthat/test-filesystem.R index 01699da9dd0..fc0e02f34a1 100644 --- a/r/tests/testthat/test-filesystem.R +++ b/r/tests/testthat/test-filesystem.R @@ -134,6 +134,6 @@ test_that("FileSystem$from_uri", { test_that("S3FileSystem", { skip_on_cran() skip_if_not_available("s3") - # s3fs <- S3FileSystem$create() - # expect_is(s3fs, "S3FileSystem") + s3fs <- S3FileSystem$create() + expect_is(s3fs, "S3FileSystem") }) From 9e5282dc77e7817817740ad7311ffa078da14e8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Thu, 16 Apr 2020 16:06:56 -0400 Subject: [PATCH 09/10] Improve S3 tests speed --- ci/scripts/cpp_test.sh | 2 ++ ci/scripts/r_test.sh | 1 + 2 files changed, 3 insertions(+) diff --git a/ci/scripts/cpp_test.sh b/ci/scripts/cpp_test.sh index 697241e79d9..38ce4406c72 100755 --- a/ci/scripts/cpp_test.sh +++ b/ci/scripts/cpp_test.sh @@ -28,6 +28,8 @@ export ARROW_TEST_DATA=${arrow_dir}/testing/data export PARQUET_TEST_DATA=${source_dir}/submodules/parquet-testing/data export LD_LIBRARY_PATH=${ARROW_HOME}/${CMAKE_INSTALL_LIBDIR:-lib}:${LD_LIBRARY_PATH} +export AWS_EC2_METADATA_DISABLED=TRUE + case "$(uname)" in Linux) n_jobs=$(nproc) diff --git a/ci/scripts/r_test.sh b/ci/scripts/r_test.sh index e79fac5f361..c08f48ca575 100755 --- a/ci/scripts/r_test.sh +++ b/ci/scripts/r_test.sh @@ -42,6 +42,7 @@ export _R_CHECK_TESTS_NLINES_=0 export _R_CHECK_CRAN_INCOMING_REMOTE_=FALSE export _R_CHECK_LIMIT_CORES_=FALSE export VERSION=$(grep ^Version DESCRIPTION | sed s/Version:\ //) +export AWS_EC2_METADATA_DISABLED=TRUE # Make sure we aren't writing to the home dir (CRAN _hates_ this but there is no official check) BEFORE=$(ls -alh ~/) From 3f49da9be0bd95ae564357f363981c922868d447 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Thu, 16 Apr 2020 16:08:09 -0700 Subject: [PATCH 10/10] Add comments to explain AWS env var --- ci/scripts/cpp_test.sh | 2 ++ ci/scripts/r_test.sh | 2 ++ 2 files changed, 4 insertions(+) diff --git a/ci/scripts/cpp_test.sh b/ci/scripts/cpp_test.sh index 38ce4406c72..21284eef0fa 100755 --- a/ci/scripts/cpp_test.sh +++ b/ci/scripts/cpp_test.sh @@ -28,6 +28,8 @@ export ARROW_TEST_DATA=${arrow_dir}/testing/data export PARQUET_TEST_DATA=${source_dir}/submodules/parquet-testing/data export LD_LIBRARY_PATH=${ARROW_HOME}/${CMAKE_INSTALL_LIBDIR:-lib}:${LD_LIBRARY_PATH} +# By default, aws-sdk tries to contact a non-existing local ip host +# to retrieve metadata. Disable this so that S3FileSystem tests run faster. export AWS_EC2_METADATA_DISABLED=TRUE case "$(uname)" in diff --git a/ci/scripts/r_test.sh b/ci/scripts/r_test.sh index c08f48ca575..df6b1697acd 100755 --- a/ci/scripts/r_test.sh +++ b/ci/scripts/r_test.sh @@ -42,6 +42,8 @@ export _R_CHECK_TESTS_NLINES_=0 export _R_CHECK_CRAN_INCOMING_REMOTE_=FALSE export _R_CHECK_LIMIT_CORES_=FALSE export VERSION=$(grep ^Version DESCRIPTION | sed s/Version:\ //) +# By default, aws-sdk tries to contact a non-existing local ip host +# to retrieve metadata. Disable this so that S3FileSystem tests run faster. export AWS_EC2_METADATA_DISABLED=TRUE # Make sure we aren't writing to the home dir (CRAN _hates_ this but there is no official check)