diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 9c008e2da32..7c7b1f3cea2 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -600,20 +600,12 @@ dataset___UnionDatasetFactory__Make <- function(children) { .Call(`_arrow_dataset___UnionDatasetFactory__Make`, children) } -dataset___FileSystemDatasetFactory__Make0 <- function(fs, paths, format) { - .Call(`_arrow_dataset___FileSystemDatasetFactory__Make0`, fs, paths, format) +dataset___FileSystemDatasetFactory__Make <- function(fs, selector, format, fsf_options) { + .Call(`_arrow_dataset___FileSystemDatasetFactory__Make`, fs, selector, format, fsf_options) } -dataset___FileSystemDatasetFactory__Make2 <- function(fs, selector, format, partitioning) { - .Call(`_arrow_dataset___FileSystemDatasetFactory__Make2`, fs, selector, format, partitioning) -} - -dataset___FileSystemDatasetFactory__Make1 <- function(fs, selector, format) { - .Call(`_arrow_dataset___FileSystemDatasetFactory__Make1`, fs, selector, format) -} - -dataset___FileSystemDatasetFactory__Make3 <- function(fs, selector, format, factory) { - .Call(`_arrow_dataset___FileSystemDatasetFactory__Make3`, fs, selector, format, factory) +dataset___FileSystemDatasetFactory__MakePaths <- function(fs, paths, format, exclude_invalid_files) { + .Call(`_arrow_dataset___FileSystemDatasetFactory__MakePaths`, fs, paths, format, exclude_invalid_files) } dataset___FileFormat__type_name <- function(format) { @@ -2011,3 +2003,4 @@ SetIOThreadPoolCapacity <- function(threads) { Array__infer_type <- function(x) { .Call(`_arrow_Array__infer_type`, x) } + diff --git a/r/R/dataset-factory.R b/r/R/dataset-factory.R index 13fe81f0c5b..c1818d8b54c 100644 --- a/r/R/dataset-factory.R +++ b/r/R/dataset-factory.R @@ -42,6 +42,7 @@ DatasetFactory$create <- function(x, format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text"), partitioning = NULL, hive_style = NA, + factory_options = list(), ...) { if (is_list_of(x, "DatasetFactory")) { return(dataset___UnionDatasetFactory__Make(x)) @@ -58,9 +59,26 @@ DatasetFactory$create <- function(x, if (length(info) > 1 || info[[1]]$type == FileType$File) { # x looks like a vector of one or more file paths (not a directory path) - return(FileSystemDatasetFactory$create(path_and_fs$fs, NULL, path_and_fs$path, format)) + return(FileSystemDatasetFactory$create( + path_and_fs$fs, + NULL, + path_and_fs$path, + format, + factory_options = factory_options + )) } + partitioning <- handle_partitioning(partitioning, path_and_fs, hive_style) + selector <- FileSelector$create( + path_and_fs$path, + allow_not_found = FALSE, + recursive = TRUE + ) + + FileSystemDatasetFactory$create(path_and_fs$fs, selector, NULL, format, partitioning, factory_options) +} + +handle_partitioning <- function(partitioning, path_and_fs, hive_style) { # Handle partitioning arg in cases where it is "character" or "Schema" if (!is.null(partitioning) && !inherits(partitioning, c("Partitioning", "PartitioningFactory"))) { if (!is_false(hive_style)) { @@ -120,14 +138,7 @@ DatasetFactory$create <- function(x, } } } - - selector <- FileSelector$create( - path_and_fs$path, - allow_not_found = FALSE, - recursive = TRUE - ) - - FileSystemDatasetFactory$create(path_and_fs$fs, selector, NULL, format, partitioning) + partitioning } #' Create a DatasetFactory @@ -161,20 +172,38 @@ DatasetFactory$create <- function(x, #' it is assumed to be "text". #' @param partitioning One of #' * A `Schema`, in which case the file paths relative to `sources` will be -#' parsed, and path segments will be matched with the schema fields. For -#' example, `schema(year = int16(), month = int8())` would create partitions -#' for file paths like "2019/01/file.parquet", "2019/02/file.parquet", etc. +#' parsed, and path segments will be matched with the schema fields. For +#' example, `schema(year = int16(), month = int8())` would create partitions +#' for file paths like "2019/01/file.parquet", "2019/02/file.parquet", etc. #' * A character vector that defines the field names corresponding to those -#' path segments (that is, you're providing the names that would correspond -#' to a `Schema` but the types will be autodetected) +#' path segments (that is, you're providing the names that would correspond +#' to a `Schema` but the types will be autodetected) #' * A `HivePartitioning` or `HivePartitioningFactory`, as returned -#' by [hive_partition()] which parses explicit or autodetected fields from -#' Hive-style path segments +#' by [hive_partition()] which parses explicit or autodetected fields from +#' Hive-style path segments #' * `NULL` for no partitioning #' @param hive_style Logical: if `partitioning` is a character vector or a #' `Schema`, should it be interpreted as specifying Hive-style partitioning? #' Default is `NA`, which means to inspect the file paths for Hive-style #' partitioning and behave accordingly. +#' @param factory_options list of optional FileSystemFactoryOptions: +#' * `partition_base_dir`: string path segment prefix to ignore when +#' discovering partition information with DirectoryPartitioning. Not +#' meaningful (ignored with a warning) for HivePartitioning, nor is it +#' valid when providing a vector of file paths. +#' * `exclude_invalid_files`: logical: should files that are not valid data +#' files be excluded? Default is `FALSE` because checking all files up +#' front incurs I/O and thus will be slower, especially on remote +#' filesystems. If false and there are invalid files, there will be an +#' error at scan time. This is the only FileSystemFactoryOption that is +#' valid for both when providing a directory path in which to discover +#' files and when providing a vector of file paths. +#' * `selector_ignore_prefixes`: character vector of file prefixes to ignore +#' when discovering files in a directory. If invalid files can be excluded +#' by a common filename prefix this way, you can avoid the I/O cost of +#' `exclude_invalid_files`. Not valid when providing a vector of file paths +#' (but if you're providing the file list, you can filter invalid files +#' yourself). #' @param ... Additional format-specific options, passed to #' `FileFormat$create()`. For CSV options, note that you can specify them either #' with the Arrow C++ library naming ("delimiter", "quoting", etc.) or the @@ -198,7 +227,8 @@ FileSystemDatasetFactory$create <- function(filesystem, selector = NULL, paths = NULL, format, - partitioning = NULL) { + partitioning = NULL, + factory_options = list()) { assert_is(filesystem, "FileSystem") is.null(selector) || assert_is(selector, "FileSelector") is.null(paths) || assert_is(paths, "character") @@ -208,23 +238,89 @@ FileSystemDatasetFactory$create <- function(filesystem, ) assert_is(format, "FileFormat") if (!is.null(paths)) { - assert_that(is.null(partitioning), msg = "Partitioning not supported with paths") + assert_that( + is.null(partitioning), + msg = "Partitioning not supported with paths" + ) + # Validate that exclude_invalid_files is only option provided + # All other options are only relevant for the FileSelector method + invalid_opts <- setdiff(names(factory_options), "exclude_invalid_files") + if (length(invalid_opts)) { + stop( + "Invalid factory_options for creating a Dataset from a vector of file paths: ", + oxford_paste(invalid_opts), + call. = FALSE + ) + } + return(dataset___FileSystemDatasetFactory__MakePaths( + filesystem, + paths, + format, + isTRUE(factory_options[["exclude_invalid_files"]]) + )) } - if (!is.null(paths)) { - ptr <- dataset___FileSystemDatasetFactory__Make0(filesystem, paths, format) - } else if (is.null(partitioning)) { - ptr <- dataset___FileSystemDatasetFactory__Make1(filesystem, selector, format) - } else if (inherits(partitioning, "PartitioningFactory")) { - ptr <- dataset___FileSystemDatasetFactory__Make3(filesystem, selector, format, partitioning) + dataset___FileSystemDatasetFactory__Make( + filesystem, + selector, + format, + fsf_options(factory_options, partitioning) + ) +} + +fsf_options <- function(factory_options, partitioning) { + # Validate FileSystemFactoryOptions and put partitioning in it + valid_opts <- c( + "partition_base_dir", + "exclude_invalid_files", + "selector_ignore_prefixes" + ) + invalid_opts <- setdiff(names(factory_options), valid_opts) + if (length(invalid_opts)) { + stop("Invalid factory_options: ", oxford_paste(invalid_opts), call. = FALSE) + } + if (!is.null(factory_options$partition_base_dir)) { + if ( + inherits(partitioning, "HivePartitioning") || + ( + inherits(partitioning, "PartitioningFactory") && + identical(partitioning$type_name, "hive") + ) + ) { + warning( + "factory_options$partition_base_dir is not meaningful for Hive partitioning", + call. = FALSE + ) + } else { + assert_that(is.string(factory_options$partition_base_dir)) + } + } + + exclude <- factory_options$exclude_invalid_files %||% FALSE + if (!(isTRUE(exclude) || is_false(exclude))) { + stop( + "factory_options$exclude_invalid_files must be TRUE/FALSE", + call. = FALSE + ) + } + + if (!is.character(factory_options$selector_ignore_prefixes %||% character())) { + stop( + "factory_options$selector_ignore_prefixes must be a character vector", + call. = FALSE + ) + } + + if (inherits(partitioning, "PartitioningFactory")) { + factory_options[["partitioning_factory"]] <- partitioning } else if (inherits(partitioning, "Partitioning")) { - ptr <- dataset___FileSystemDatasetFactory__Make2(filesystem, selector, format, partitioning) - } else { + factory_options[["partitioning"]] <- partitioning + } else if (!is.null(partitioning)) { stop( "Expected 'partitioning' to be NULL, PartitioningFactory or Partitioning", call. = FALSE ) } - ptr + factory_options } diff --git a/r/R/dataset.R b/r/R/dataset.R index 16dfcc32fc3..12765fbfc02 100644 --- a/r/R/dataset.R +++ b/r/R/dataset.R @@ -119,6 +119,7 @@ #' is a directory path/URI or vector of file paths/URIs, otherwise ignored. #' These may include `format` to indicate the file format, or other #' format-specific options (see [read_csv_arrow()], [read_parquet()] and [read_feather()] on how to specify these). +#' @inheritParams dataset_factory #' @return A [Dataset] R6 object. Use `dplyr` methods on it to query the data, #' or call [`$NewScan()`][Scanner] to construct a query directly. #' @export @@ -178,6 +179,7 @@ open_dataset <- function(sources, hive_style = NA, unify_schemas = NULL, format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text"), + factory_options = list(), ...) { stop_if_no_datasets() @@ -212,6 +214,7 @@ open_dataset <- function(sources, format = format, schema = schema, hive_style = hive_style, + factory_options = factory_options, ... ) tryCatch( diff --git a/r/man/dataset_factory.Rd b/r/man/dataset_factory.Rd index 509d5db6a15..a9d2a384687 100644 --- a/r/man/dataset_factory.Rd +++ b/r/man/dataset_factory.Rd @@ -10,6 +10,7 @@ dataset_factory( format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text"), partitioning = NULL, hive_style = NA, + factory_options = list(), ... ) } @@ -57,6 +58,27 @@ Hive-style path segments Default is \code{NA}, which means to inspect the file paths for Hive-style partitioning and behave accordingly.} +\item{factory_options}{list of optional FileSystemFactoryOptions: +\itemize{ +\item \code{partition_base_dir}: string path segment prefix to ignore when +discovering partition information with DirectoryPartitioning. Not +meaningful (ignored with a warning) for HivePartitioning, nor is it +valid when providing a vector of file paths. +\item \code{exclude_invalid_files}: logical: should files that are not valid data +files be excluded? Default is \code{FALSE} because checking all files up +front incurs I/O and thus will be slower, especially on remote +filesystems. If false and there are invalid files, there will be an +error at scan time. This is the only FileSystemFactoryOption that is +valid for both when providing a directory path in which to discover +files and when providing a vector of file paths. +\item \code{selector_ignore_prefixes}: character vector of file prefixes to ignore +when discovering files in a directory. If invalid files can be excluded +by a common filename prefix this way, you can avoid the I/O cost of +\code{exclude_invalid_files}. Not valid when providing a vector of file paths +(but if you're providing the file list, you can filter invalid files +yourself). +}} + \item{...}{Additional format-specific options, passed to \code{FileFormat$create()}. For CSV options, note that you can specify them either with the Arrow C++ library naming ("delimiter", "quoting", etc.) or the diff --git a/r/man/open_dataset.Rd b/r/man/open_dataset.Rd index eef18b06ec9..d39f6c2023e 100644 --- a/r/man/open_dataset.Rd +++ b/r/man/open_dataset.Rd @@ -11,6 +11,7 @@ open_dataset( hive_style = NA, unify_schemas = NULL, format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text"), + factory_options = list(), ... ) } @@ -78,6 +79,27 @@ delimiter for text files Default is "parquet", unless a \code{delimiter} is also specified, in which case it is assumed to be "text".} +\item{factory_options}{list of optional FileSystemFactoryOptions: +\itemize{ +\item \code{partition_base_dir}: string path segment prefix to ignore when +discovering partition information with DirectoryPartitioning. Not +meaningful (ignored with a warning) for HivePartitioning, nor is it +valid when providing a vector of file paths. +\item \code{exclude_invalid_files}: logical: should files that are not valid data +files be excluded? Default is \code{FALSE} because checking all files up +front incurs I/O and thus will be slower, especially on remote +filesystems. If false and there are invalid files, there will be an +error at scan time. This is the only FileSystemFactoryOption that is +valid for both when providing a directory path in which to discover +files and when providing a vector of file paths. +\item \code{selector_ignore_prefixes}: character vector of file prefixes to ignore +when discovering files in a directory. If invalid files can be excluded +by a common filename prefix this way, you can avoid the I/O cost of +\code{exclude_invalid_files}. Not valid when providing a vector of file paths +(but if you're providing the file list, you can filter invalid files +yourself). +}} + \item{...}{additional arguments passed to \code{dataset_factory()} when \code{sources} is a directory path/URI or vector of file paths/URIs, otherwise ignored. These may include \code{format} to indicate the file format, or other diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 5810320a057..f372e7b6e7f 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1454,71 +1454,37 @@ extern "C" SEXP _arrow_dataset___UnionDatasetFactory__Make(SEXP children_sexp){ // dataset.cpp #if defined(ARROW_R_WITH_DATASET) -std::shared_ptr dataset___FileSystemDatasetFactory__Make0(const std::shared_ptr& fs, const std::vector& paths, const std::shared_ptr& format); -extern "C" SEXP _arrow_dataset___FileSystemDatasetFactory__Make0(SEXP fs_sexp, SEXP paths_sexp, SEXP format_sexp){ -BEGIN_CPP11 - arrow::r::Input&>::type fs(fs_sexp); - arrow::r::Input&>::type paths(paths_sexp); - arrow::r::Input&>::type format(format_sexp); - return cpp11::as_sexp(dataset___FileSystemDatasetFactory__Make0(fs, paths, format)); -END_CPP11 -} -#else -extern "C" SEXP _arrow_dataset___FileSystemDatasetFactory__Make0(SEXP fs_sexp, SEXP paths_sexp, SEXP format_sexp){ - Rf_error("Cannot call dataset___FileSystemDatasetFactory__Make0(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); -} -#endif - -// dataset.cpp -#if defined(ARROW_R_WITH_DATASET) -std::shared_ptr dataset___FileSystemDatasetFactory__Make2(const std::shared_ptr& fs, const std::shared_ptr& selector, const std::shared_ptr& format, const std::shared_ptr& partitioning); -extern "C" SEXP _arrow_dataset___FileSystemDatasetFactory__Make2(SEXP fs_sexp, SEXP selector_sexp, SEXP format_sexp, SEXP partitioning_sexp){ -BEGIN_CPP11 - arrow::r::Input&>::type fs(fs_sexp); - arrow::r::Input&>::type selector(selector_sexp); - arrow::r::Input&>::type format(format_sexp); - arrow::r::Input&>::type partitioning(partitioning_sexp); - return cpp11::as_sexp(dataset___FileSystemDatasetFactory__Make2(fs, selector, format, partitioning)); -END_CPP11 -} -#else -extern "C" SEXP _arrow_dataset___FileSystemDatasetFactory__Make2(SEXP fs_sexp, SEXP selector_sexp, SEXP format_sexp, SEXP partitioning_sexp){ - Rf_error("Cannot call dataset___FileSystemDatasetFactory__Make2(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); -} -#endif - -// dataset.cpp -#if defined(ARROW_R_WITH_DATASET) -std::shared_ptr dataset___FileSystemDatasetFactory__Make1(const std::shared_ptr& fs, const std::shared_ptr& selector, const std::shared_ptr& format); -extern "C" SEXP _arrow_dataset___FileSystemDatasetFactory__Make1(SEXP fs_sexp, SEXP selector_sexp, SEXP format_sexp){ +std::shared_ptr dataset___FileSystemDatasetFactory__Make(const std::shared_ptr& fs, const std::shared_ptr& selector, const std::shared_ptr& format, cpp11::list fsf_options); +extern "C" SEXP _arrow_dataset___FileSystemDatasetFactory__Make(SEXP fs_sexp, SEXP selector_sexp, SEXP format_sexp, SEXP fsf_options_sexp){ BEGIN_CPP11 arrow::r::Input&>::type fs(fs_sexp); arrow::r::Input&>::type selector(selector_sexp); arrow::r::Input&>::type format(format_sexp); - return cpp11::as_sexp(dataset___FileSystemDatasetFactory__Make1(fs, selector, format)); + arrow::r::Input::type fsf_options(fsf_options_sexp); + return cpp11::as_sexp(dataset___FileSystemDatasetFactory__Make(fs, selector, format, fsf_options)); END_CPP11 } #else -extern "C" SEXP _arrow_dataset___FileSystemDatasetFactory__Make1(SEXP fs_sexp, SEXP selector_sexp, SEXP format_sexp){ - Rf_error("Cannot call dataset___FileSystemDatasetFactory__Make1(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +extern "C" SEXP _arrow_dataset___FileSystemDatasetFactory__Make(SEXP fs_sexp, SEXP selector_sexp, SEXP format_sexp, SEXP fsf_options_sexp){ + Rf_error("Cannot call dataset___FileSystemDatasetFactory__Make(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif // dataset.cpp #if defined(ARROW_R_WITH_DATASET) -std::shared_ptr dataset___FileSystemDatasetFactory__Make3(const std::shared_ptr& fs, const std::shared_ptr& selector, const std::shared_ptr& format, const std::shared_ptr& factory); -extern "C" SEXP _arrow_dataset___FileSystemDatasetFactory__Make3(SEXP fs_sexp, SEXP selector_sexp, SEXP format_sexp, SEXP factory_sexp){ +std::shared_ptr dataset___FileSystemDatasetFactory__MakePaths(const std::shared_ptr& fs, const std::vector& paths, const std::shared_ptr& format, bool exclude_invalid_files); +extern "C" SEXP _arrow_dataset___FileSystemDatasetFactory__MakePaths(SEXP fs_sexp, SEXP paths_sexp, SEXP format_sexp, SEXP exclude_invalid_files_sexp){ BEGIN_CPP11 arrow::r::Input&>::type fs(fs_sexp); - arrow::r::Input&>::type selector(selector_sexp); + arrow::r::Input&>::type paths(paths_sexp); arrow::r::Input&>::type format(format_sexp); - arrow::r::Input&>::type factory(factory_sexp); - return cpp11::as_sexp(dataset___FileSystemDatasetFactory__Make3(fs, selector, format, factory)); + arrow::r::Input::type exclude_invalid_files(exclude_invalid_files_sexp); + return cpp11::as_sexp(dataset___FileSystemDatasetFactory__MakePaths(fs, paths, format, exclude_invalid_files)); END_CPP11 } #else -extern "C" SEXP _arrow_dataset___FileSystemDatasetFactory__Make3(SEXP fs_sexp, SEXP selector_sexp, SEXP format_sexp, SEXP factory_sexp){ - Rf_error("Cannot call dataset___FileSystemDatasetFactory__Make3(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +extern "C" SEXP _arrow_dataset___FileSystemDatasetFactory__MakePaths(SEXP fs_sexp, SEXP paths_sexp, SEXP format_sexp, SEXP exclude_invalid_files_sexp){ + Rf_error("Cannot call dataset___FileSystemDatasetFactory__MakePaths(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif @@ -5286,10 +5252,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___DatasetFactory__Finish2", (DL_FUNC) &_arrow_dataset___DatasetFactory__Finish2, 2}, { "_arrow_dataset___DatasetFactory__Inspect", (DL_FUNC) &_arrow_dataset___DatasetFactory__Inspect, 2}, { "_arrow_dataset___UnionDatasetFactory__Make", (DL_FUNC) &_arrow_dataset___UnionDatasetFactory__Make, 1}, - { "_arrow_dataset___FileSystemDatasetFactory__Make0", (DL_FUNC) &_arrow_dataset___FileSystemDatasetFactory__Make0, 3}, - { "_arrow_dataset___FileSystemDatasetFactory__Make2", (DL_FUNC) &_arrow_dataset___FileSystemDatasetFactory__Make2, 4}, - { "_arrow_dataset___FileSystemDatasetFactory__Make1", (DL_FUNC) &_arrow_dataset___FileSystemDatasetFactory__Make1, 3}, - { "_arrow_dataset___FileSystemDatasetFactory__Make3", (DL_FUNC) &_arrow_dataset___FileSystemDatasetFactory__Make3, 4}, + { "_arrow_dataset___FileSystemDatasetFactory__Make", (DL_FUNC) &_arrow_dataset___FileSystemDatasetFactory__Make, 4}, + { "_arrow_dataset___FileSystemDatasetFactory__MakePaths", (DL_FUNC) &_arrow_dataset___FileSystemDatasetFactory__MakePaths, 4}, { "_arrow_dataset___FileFormat__type_name", (DL_FUNC) &_arrow_dataset___FileFormat__type_name, 1}, { "_arrow_dataset___FileFormat__DefaultWriteOptions", (DL_FUNC) &_arrow_dataset___FileFormat__DefaultWriteOptions, 1}, { "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 2}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 4ff30d9d941..aa44a0721e3 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -165,26 +165,29 @@ std::shared_ptr dataset___UnionDatasetFactory__Make( } // [[dataset::export]] -std::shared_ptr dataset___FileSystemDatasetFactory__Make0( - const std::shared_ptr& fs, const std::vector& paths, - const std::shared_ptr& format) { - // TODO(fsaintjacques): Make options configurable - auto options = ds::FileSystemFactoryOptions{}; - - return arrow::internal::checked_pointer_cast( - ValueOrStop(ds::FileSystemDatasetFactory::Make(fs, paths, format, options))); -} - -// [[dataset::export]] -std::shared_ptr dataset___FileSystemDatasetFactory__Make2( +std::shared_ptr dataset___FileSystemDatasetFactory__Make( const std::shared_ptr& fs, const std::shared_ptr& selector, - const std::shared_ptr& format, - const std::shared_ptr& partitioning) { - // TODO(fsaintjacques): Make options configurable + const std::shared_ptr& format, cpp11::list fsf_options) { auto options = ds::FileSystemFactoryOptions{}; - if (partitioning != nullptr) { - options.partitioning = partitioning; + if (!Rf_isNull(fsf_options["partitioning"])) { + options.partitioning = + cpp11::as_cpp>(fsf_options["partitioning"]); + } else if (!Rf_isNull(fsf_options["partitioning_factory"])) { + options.partitioning = cpp11::as_cpp>( + fsf_options["partitioning_factory"]); + } + if (!Rf_isNull(fsf_options["partition_base_dir"])) { + options.partition_base_dir = + cpp11::as_cpp(fsf_options["partition_base_dir"]); + } + if (!Rf_isNull(fsf_options["exclude_invalid_files"])) { + options.exclude_invalid_files = + cpp11::as_cpp(fsf_options["exclude_invalid_files"]); + } + if (!Rf_isNull(fsf_options["selector_ignore_prefixes"])) { + options.selector_ignore_prefixes = + cpp11::as_cpp>(fsf_options["selector_ignore_prefixes"]); } return arrow::internal::checked_pointer_cast( @@ -192,27 +195,16 @@ std::shared_ptr dataset___FileSystemDatasetFactory } // [[dataset::export]] -std::shared_ptr dataset___FileSystemDatasetFactory__Make1( - const std::shared_ptr& fs, - const std::shared_ptr& selector, - const std::shared_ptr& format) { - return dataset___FileSystemDatasetFactory__Make2(fs, selector, format, nullptr); -} - -// [[dataset::export]] -std::shared_ptr dataset___FileSystemDatasetFactory__Make3( - const std::shared_ptr& fs, - const std::shared_ptr& selector, - const std::shared_ptr& format, - const std::shared_ptr& factory) { - // TODO(fsaintjacques): Make options configurable +std::shared_ptr +dataset___FileSystemDatasetFactory__MakePaths( + const std::shared_ptr& fs, const std::vector& paths, + const std::shared_ptr& format, bool exclude_invalid_files) { + // exclude_invalid_files is the only meaningful option with a vector of paths auto options = ds::FileSystemFactoryOptions{}; - if (factory != nullptr) { - options.partitioning = factory; - } + options.exclude_invalid_files = exclude_invalid_files; return arrow::internal::checked_pointer_cast( - ValueOrStop(ds::FileSystemDatasetFactory::Make(fs, *selector, format, options))); + ValueOrStop(ds::FileSystemDatasetFactory::Make(fs, paths, format, options))); } // FileFormat, ParquetFileFormat, IpcFileFormat diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index c61226d5161..6a209f1cbc6 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -611,7 +611,9 @@ test_that("UnionDataset handles InMemoryDatasets", { ds1 <- InMemoryDataset$create(sub_df1) ds2 <- InMemoryDataset$create(sub_df2) ds <- c(ds1, ds2) - actual <- ds %>% collect(as_data_frame = FALSE) + actual <- ds %>% + arrange(x) %>% + compute() expected <- concat_tables(sub_df1, sub_df2) expect_equal(actual, expected) }) @@ -1021,3 +1023,196 @@ test_that("Filter parquet dataset with is.na ARROW-15312", { df %>% filter(is.na(z)) %>% collect() ) }) + +test_that("FileSystemFactoryOptions with DirectoryPartitioning", { + parent_path <- make_temp_dir() + ds_path <- file.path(parent_path, "a_subdir") + write_dataset(mtcars, ds_path, partitioning = "cyl", hive = FALSE) + expect_equal(dir(parent_path), "a_subdir") + expect_equal(dir(ds_path), c("4", "6", "8")) + ds <- open_dataset( + parent_path, + partitioning = "cyl", + factory_options = list(partition_base_dir = ds_path) + ) + expect_equal( + ds %>% + arrange(cyl) %>% + pull(cyl), + sort(mtcars$cyl) + ) + + # Add an invalid file + file.create(file.path(ds_path, "ASDFnotdata.pq")) + ds <- open_dataset( + parent_path, + partitioning = "cyl", + factory_options = list( + partition_base_dir = ds_path, + # open_dataset() fails if we don't exclude invalid + exclude_invalid_files = TRUE + ) + ) + expect_equal( + ds %>% + arrange(cyl) %>% + pull(cyl), + sort(mtcars$cyl) + ) + + ds <- open_dataset( + parent_path, + partitioning = "cyl", + factory_options = list( + partition_base_dir = ds_path, + # We can also ignore by prefix + selector_ignore_prefixes = "ASDF" + ) + ) + expect_equal( + ds %>% + arrange(cyl) %>% + pull(cyl), + sort(mtcars$cyl) + ) + + # Add a _$folder$ file (which Hadoop may write) + file.create(file.path(ds_path, "cyl_$folder$")) + ds <- open_dataset( + parent_path, + partitioning = "cyl", + factory_options = list( + partition_base_dir = ds_path, + # we can't ignore suffixes but we can exclude invalid + exclude_invalid_files = TRUE + ) + ) + expect_equal( + ds %>% + arrange(cyl) %>% + pull(cyl), + sort(mtcars$cyl) + ) + + # Now with a list of files + ds <- open_dataset( + dir(ds_path, recursive = TRUE, full.names = TRUE), + factory_options = list( + exclude_invalid_files = TRUE + ) + ) + expect_equal( + ds %>% + summarize(sum(gear)) %>% + collect() %>% + as.data.frame(), + mtcars %>% + summarize(sum(gear)) + ) +}) + +test_that("FileSystemFactoryOptions with HivePartitioning", { + parent_path <- make_temp_dir() + ds_path <- file.path(parent_path, "a_subdir") + write_dataset(mtcars, ds_path, partitioning = "cyl") + expect_equal(dir(parent_path), "a_subdir") + expect_equal(dir(ds_path), c("cyl=4", "cyl=6", "cyl=8")) + ds <- open_dataset(parent_path) + + # With Hive partitioning, partition_base_dir isn't needed + expect_setequal(names(ds), names(mtcars)) + expect_equal( + ds %>% + arrange(cyl) %>% + pull(cyl), + sort(mtcars$cyl) + ) + + # Add an invalid file + file.create(file.path(ds_path, "ASDFnotdata.pq")) + ds <- open_dataset( + parent_path, + factory_options = list( + # open_dataset() fails if we don't exclude invalid + exclude_invalid_files = TRUE + ) + ) + expect_equal( + ds %>% + arrange(cyl) %>% + pull(cyl), + sort(mtcars$cyl) + ) + + ds <- open_dataset( + parent_path, + factory_options = list( + # We can also ignore by prefix + selector_ignore_prefixes = "ASDF" + ) + ) + expect_equal( + ds %>% + arrange(cyl) %>% + pull(cyl), + sort(mtcars$cyl) + ) + + # Add a _$folder$ file (which Hadoop may write) + file.create(file.path(ds_path, "cyl_$folder$")) + ds <- open_dataset( + parent_path, + factory_options = list( + # we can't ignore suffixes but we can exclude invalid + exclude_invalid_files = TRUE + ) + ) + expect_equal( + ds %>% + arrange(cyl) %>% + pull(cyl), + sort(mtcars$cyl) + ) +}) + +test_that("FileSystemFactoryOptions input validation", { + expect_error( + open_dataset(dataset_dir, factory_options = list(other = TRUE)), + 'Invalid factory_options: "other"' + ) + expect_error( + open_dataset( + dataset_dir, + partitioning = "part", + factory_options = list(partition_base_dir = 42) + ), + "factory_options$partition_base_dir is not a string", + fixed = TRUE + ) + expect_error( + open_dataset(dataset_dir, factory_options = list(selector_ignore_prefixes = 42)), + "factory_options$selector_ignore_prefixes must be a character vector", + fixed = TRUE + ) + expect_error( + open_dataset(dataset_dir, factory_options = list(exclude_invalid_files = 42)), + "factory_options$exclude_invalid_files must be TRUE/FALSE", + fixed = TRUE + ) + + expect_warning( + open_dataset(hive_dir, factory_options = list(partition_base_dir = hive_dir)), + "factory_options$partition_base_dir is not meaningful for Hive partitioning", + fixed = TRUE + ) + + files <- dir(dataset_dir, full.names = TRUE, recursive = TRUE) + expect_error( + open_dataset(files, factory_options = list(selector_ignore_prefixes = "__")), + paste( + "Invalid factory_options for creating a Dataset from a vector", + 'of file paths: "selector_ignore_prefixes"' + ), + fixed = TRUE + ) +})