diff --git a/r/NAMESPACE b/r/NAMESPACE
index 656f4619df2..9bb3e150179 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -3,6 +3,7 @@ S3method("!=",ArrowObject)
 S3method("$",RecordBatch)
 S3method("$",Schema)
+S3method("$",SubTreeFileSystem)
 S3method("$",Table)
 S3method("==",ArrowObject)
 S3method("[",Array)
@@ -188,6 +189,7 @@ export(TimestampParser)
 export(Type)
 export(UnionDataset)
 export(arrow_available)
+export(arrow_with_s3)
 export(binary)
 export(bool)
 export(boolean)
@@ -196,6 +198,7 @@ export(cast_options)
 export(chunked_array)
 export(codec_is_available)
 export(contains)
+export(copy_files)
 export(cpu_count)
 export(dataset_factory)
 export(date32)
@@ -249,6 +252,7 @@ export(read_parquet)
 export(read_schema)
 export(read_tsv_arrow)
 export(record_batch)
+export(s3_bucket)
 export(schema)
 export(set_cpu_count)
 export(starts_with)
diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R
index 613f2ac0d3f..bb32efd33d8 100644
--- a/r/R/arrow-package.R
+++ b/r/R/arrow-package.R
@@ -53,18 +53,28 @@
 #' Is the C++ Arrow library available?
 #'
-#' You won't generally need to call this function, but it's here in case it
-#' helps for development purposes.
+#' You won't generally need to call these functions, but they're made available
+#' for diagnostic purposes.
 #' @return `TRUE` or `FALSE` depending on whether the package was installed
-#' with the Arrow C++ library. If `FALSE`, you'll need to install the C++
-#' library and then reinstall the R package. See [install_arrow()] for help.
+#' with the Arrow C++ library (check with `arrow_available()`) or with S3
+#' support enabled (check with `arrow_with_s3()`).
 #' @export
 #' @examples
 #' arrow_available()
+#' arrow_with_s3()
+#' @seealso If either of these is `FALSE`, see
+#' `vignette("install", package = "arrow")` for guidance on reinstalling the
+#' package.
 arrow_available <- function() {
   .Call(`_arrow_available`)
 }
 
+#' @rdname arrow_available
+#' @export
+arrow_with_s3 <- function() {
+  .Call(`_s3_available`)
+}
+
 option_use_threads <- function() {
   !is_false(getOption("arrow.use_threads"))
 }
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index a79cbe74fd8..7c037327660 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -956,6 +956,14 @@ fs___SubTreeFileSystem__create <- function(base_path, base_fs){
     .Call(`_arrow_fs___SubTreeFileSystem__create` , base_path, base_fs)
 }
 
+fs___SubTreeFileSystem__base_fs <- function(file_system){
+    .Call(`_arrow_fs___SubTreeFileSystem__base_fs` , file_system)
+}
+
+fs___SubTreeFileSystem__base_path <- function(file_system){
+    .Call(`_arrow_fs___SubTreeFileSystem__base_path` , file_system)
+}
+
 fs___FileSystemFromUri <- function(path){
     .Call(`_arrow_fs___FileSystemFromUri` , path)
 }
@@ -968,6 +976,10 @@ fs___S3FileSystem__create <- function(anonymous, access_key, secret_key, session
     .Call(`_arrow_fs___S3FileSystem__create` , anonymous, access_key, secret_key, session_token, role_arn, session_name, external_id, load_frequency, region, endpoint_override, scheme, background_writes)
 }
 
+fs___S3FileSystem__region <- function(fs){
+    .Call(`_arrow_fs___S3FileSystem__region` , fs)
+}
+
 io___Readable__Read <- function(x, nbytes){
     .Call(`_arrow_io___Readable__Read` , x, nbytes)
 }
diff --git a/r/R/csv.R b/r/R/csv.R
index a7da10b244b..24a0f11312f 100644
--- a/r/R/csv.R
+++ b/r/R/csv.R
@@ -77,7 +77,8 @@
 #' `col_names`, and the CSV file has a header row that would otherwise be used
 #' to idenfity column names, you'll need to add `skip = 1` to skip that row.
 #'
-#' @param file A character file name or URI, `raw` vector, or an Arrow input stream.
+#' @param file A character file name or URI, `raw` vector, an Arrow input stream,
+#' or a `FileSystem` with path (`SubTreeFileSystem`).
 #' If a file name, a memory-mapped Arrow [InputStream] will be opened and
 #' closed when finished; compression will be detected from the file extension
 #' and handled automatically. If an input stream is provided, it will be left
@@ -123,8 +124,6 @@
 #' parsing options provided in other arguments (e.g. `delim`, `quote`, etc.).
 #' @param convert_options see [file reader options][CsvReadOptions]
 #' @param read_options see [file reader options][CsvReadOptions]
-#' @param filesystem A [FileSystem] where `file` can be found if it is a
-#' string file path; default is the local file system
 #' @param as_data_frame Should the function return a `data.frame` (default) or
 #' an Arrow [Table]?
 #'
@@ -156,7 +155,6 @@ read_delim_arrow <- function(file,
                              parse_options = NULL,
                              convert_options = NULL,
                              read_options = NULL,
-                             filesystem = NULL,
                              as_data_frame = TRUE,
                              timestamp_parsers = NULL) {
   if (inherits(schema, "Schema")) {
@@ -186,7 +184,7 @@ read_delim_arrow <- function(file,
   }
 
   if (!inherits(file, "InputStream")) {
-    file <- make_readable_file(file, filesystem = filesystem)
+    file <- make_readable_file(file)
     on.exit(file$close())
   }
   reader <- CsvTableReader$create(
diff --git a/r/R/dataset-factory.R b/r/R/dataset-factory.R
index 00039faed0f..8b3439c419d 100644
--- a/r/R/dataset-factory.R
+++ b/r/R/dataset-factory.R
@@ -44,9 +44,6 @@ DatasetFactory$create <- function(x,
   if (is_list_of(x, "DatasetFactory")) {
     return(shared_ptr(DatasetFactory, dataset___UnionDatasetFactory__Make(x)))
   }
-  if (!is.string(x)) {
-    stop("'x' must be a string or a list of DatasetFactory", call. = FALSE)
-  }
   path_and_fs <- get_path_and_filesystem(x, filesystem)
   selector <- FileSelector$create(path_and_fs$path, allow_not_found = FALSE,
                                   recursive = TRUE)
diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R
index abeb0ce4393..b97a4e3b724 100644
--- a/r/R/dataset-write.R
+++ b/r/R/dataset-write.R
@@ -27,8 +27,8 @@
 #' and `group_by()` operations done on the dataset. `filter()` queries will be
 #' applied to restrict written rows.
 #' Note that `select()`-ed columns may not be renamed.
-#' @param path string path or URI to a directory to write to (directory will be
-#' created if it does not exist)
+#' @param path string path, URI, or `SubTreeFileSystem` referencing a directory
+#' to write to (directory will be created if it does not exist)
 #' @param format file format to write the dataset to. Currently supported
 #' formats are "feather" (aka "ipc") and "parquet". Default is to write to the
 #' same format as `dataset`.
@@ -41,8 +41,6 @@
 #' will yield `"part-0.feather", ...`.
 #' @param hive_style logical: write partition segments as Hive-style
 #' (`key1=value1/key2=value2/file.ext`) or as just bare values. Default is `TRUE`.
-#' @param filesystem A [FileSystem] where the dataset should be written if it is a
-#' string file path; default is the local file system
 #' @param ... additional format-specific arguments. For available Parquet
 #' options, see [write_parquet()].
 #' @return The input `dataset`, invisibly
@@ -53,7 +51,6 @@ write_dataset <- function(dataset,
                           partitioning = dplyr::group_vars(dataset),
                           basename_template = paste0("part-{i}.", as.character(format)),
                           hive_style = TRUE,
-                          filesystem = NULL,
                           ...) {
   if (inherits(dataset, "arrow_dplyr_query")) {
     # We can select a subset of columns but we can't rename them
@@ -79,7 +76,7 @@ write_dataset <- function(dataset,
     }
   }
 
-  path_and_fs <- get_path_and_filesystem(path, filesystem)
+  path_and_fs <- get_path_and_filesystem(path)
   options <- FileWriteOptions$create(format, table = scanner, ...)
 
   dataset___Dataset__Write(options, path_and_fs$fs, path_and_fs$path,
diff --git a/r/R/feather.R b/r/R/feather.R
index e9656aa0901..52f8b59ece6 100644
--- a/r/R/feather.R
+++ b/r/R/feather.R
@@ -24,9 +24,8 @@
 #' and the version 2 specification, which is the Apache Arrow IPC file format.
 #'
 #' @param x `data.frame`, [RecordBatch], or [Table]
-#' @param sink A string file path, URI, or [OutputStream]
-#' @param filesystem A [FileSystem] where `sink` should be written if it is a
-#' string file path; default is the local file system
+#' @param sink A string file path, URI, or [OutputStream], or path in a file
+#' system (`SubTreeFileSystem`)
 #' @param version integer Feather file version. Version 2 is the current.
 #' Version 1 is the more limited legacy format.
 #' @param chunk_size For V2 files, the number of rows that each chunk of data
@@ -54,7 +53,6 @@
 #' @include arrow-package.R
 write_feather <- function(x,
                           sink,
-                          filesystem = NULL,
                           version = 2,
                           chunk_size = 65536L,
                           compression = c("default", "lz4", "uncompressed", "zstd"),
@@ -108,11 +106,10 @@ write_feather <- function(x,
   }
 
   assert_is(x, "Table")
 
-  if (is.string(sink)) {
-    sink <- make_output_stream(sink, filesystem)
+  if (!inherits(sink, "OutputStream")) {
+    sink <- make_output_stream(sink)
     on.exit(sink$close())
   }
-  assert_is(sink, "OutputStream")
 
   ipc___WriteFeather__Table(sink, x, version, chunk_size, compression, compression_level)
   invisible(x_out)
 }
@@ -144,9 +141,9 @@ write_feather <- function(x,
 #' # Can select columns
 #' df <- read_feather(tf, col_select = starts_with("d"))
 #' }
-read_feather <- function(file, col_select = NULL, as_data_frame = TRUE, filesystem = NULL, ...) {
+read_feather <- function(file, col_select = NULL, as_data_frame = TRUE, ...) {
   if (!inherits(file, "RandomAccessFile")) {
-    file <- make_readable_file(file, filesystem = filesystem)
+    file <- make_readable_file(file)
     on.exit(file$close())
   }
   reader <- FeatherReader$create(file, ...)
diff --git a/r/R/filesystem.R b/r/R/filesystem.R
index 0e346ff248b..f8b32b8b312 100644
--- a/r/R/filesystem.R
+++ b/r/R/filesystem.R
@@ -185,6 +185,15 @@ FileSelector$create <- function(base_dir, allow_not_found = FALSE, recursive = F
 #' - `$OpenAppendStream(path)`: Open an [output stream][OutputStream] for
 #' appending.
 #'
+#' @section Active bindings:
+#'
+#' - `$type_name`: string filesystem type name, such as "local", "s3", etc.
+#' - `$region`: string AWS region, for `S3FileSystem` and `SubTreeFileSystem`
+#' containing an `S3FileSystem`
+#' - `$base_fs`: for `SubTreeFileSystem`, the `FileSystem` it contains
+#' - `$base_path`: for `SubTreeFileSystem`, the path in `$base_fs` which is considered
+#' root in this `SubTreeFileSystem`
+#'
 #' @usage NULL
 #' @format NULL
 #' @docType class
@@ -263,6 +272,19 @@ FileSystem <- R6Class("FileSystem", inherit = ArrowObject,
     },
     OpenAppendStream = function(path) {
       shared_ptr(OutputStream, fs___FileSystem__OpenAppendStream(self, clean_path_rel(path)))
+    },
+
+    # Friendlier R user interface
+    path = function(x) SubTreeFileSystem$create(x, self),
+    cd = function(x) SubTreeFileSystem$create(x, self),
+    ls = function(path = "", ...) {
+      selector <- FileSelector$create(path, ...) # ... for recursive = TRUE
+      infos <- self$GetFileInfo(selector)
+      map_chr(infos, ~.$path)
+      # TODO: add full.names argument like base::dir() (default right now is TRUE)
+      # TODO: see fs package for glob/regexp filtering
+      # TODO: verbose method that shows other attributes as df
+      # TODO: print methods for FileInfo, SubTreeFileSystem, S3FileSystem
     }
   ),
   active = list(
@@ -279,6 +301,9 @@ FileSystem$from_uri <- function(uri) {
 get_path_and_filesystem <- function(x, filesystem = NULL) {
   # Wrapper around FileSystem$from_uri that handles local paths
   # and an optional explicit filesystem
+  if (inherits(x, "SubTreeFileSystem")) {
+    return(list(fs = x$base_fs, path = x$base_path))
+  }
   assert_that(is.string(x))
   if (is_url(x)) {
     if (!is.null(filesystem)) {
@@ -293,7 +318,7 @@ get_path_and_filesystem <- function(x, filesystem = NULL) {
   }
 }
 
-is_url <- function(x) grepl("://", x)
+is_url <- function(x) is.string(x) && grepl("://", x)
 
 #' @usage NULL
 #' @format NULL
@@ -309,7 +334,11 @@ LocalFileSystem$create <- function() {
 #' @rdname FileSystem
 #' @importFrom utils modifyList
 #' @export
-S3FileSystem <- R6Class("S3FileSystem", inherit = FileSystem)
+S3FileSystem <- R6Class("S3FileSystem", inherit = FileSystem,
+  active = list(
+    region = function() fs___S3FileSystem__region(self)
+  )
+)
 S3FileSystem$create <- function(anonymous = FALSE, ...) {
   args <- list2(...)
   if (anonymous) {
@@ -357,15 +386,54 @@ default_s3_options <- list(
   background_writes = TRUE
 )
 
-arrow_with_s3 <- function() {
-  .Call(`_s3_available`)
+#' Connect to an AWS S3 bucket
+#'
+#' `s3_bucket()` is a convenience function to create an `S3FileSystem` object
+#' that automatically detects the bucket's AWS region and holds onto its
+#' relative path.
+#'
+#' @param bucket string S3 bucket name or path
+#' @param ... Additional connection options, passed to `S3FileSystem$create()`
+#' @return A `SubTreeFileSystem` containing an `S3FileSystem` and the bucket's
+#' relative path. Note that this function's success does not guarantee that you
+#' are authorized to access the bucket's contents.
+#' @examples
+#' if (arrow_with_s3()) {
+#'   bucket <- s3_bucket("ursa-labs-taxi-data")
+#' }
+#' @export
+s3_bucket <- function(bucket, ...) {
+  assert_that(is.string(bucket))
+  args <- list2(...)
+
+  # Use FileSystemFromUri to detect the bucket's region
+  if (!is_url(bucket)) {
+    bucket <- paste0("s3://", bucket)
+  }
+  fs_and_path <- FileSystem$from_uri(bucket)
+  fs <- fs_and_path$fs
+  # If there are no additional S3Options, we can use that filesystem
+  # Otherwise, take the region that was detected and make a new fs with the args
+  if (length(args)) {
+    args$region <- fs$region
+    fs <- exec(S3FileSystem$create, !!!args)
+  }
+  # Return a subtree pointing at that bucket path
+  SubTreeFileSystem$create(fs_and_path$path, fs)
 }
 
 #' @usage NULL
 #' @format NULL
 #' @rdname FileSystem
 #' @export
-SubTreeFileSystem <- R6Class("SubTreeFileSystem", inherit = FileSystem)
+SubTreeFileSystem <- R6Class("SubTreeFileSystem", inherit = FileSystem,
+  active = list(
+    base_fs = function() {
+      shared_ptr(FileSystem, fs___SubTreeFileSystem__base_fs(self))$..dispatch()
+    },
+    base_path = function() fs___SubTreeFileSystem__base_path(self)
+  )
+)
 SubTreeFileSystem$create <- function(base_path, base_fs = NULL) {
   fs_and_path <- get_path_and_filesystem(base_path, base_fs)
   shared_ptr(
@@ -374,28 +442,48 @@ SubTreeFileSystem$create <- function(base_path, base_fs = NULL) {
   )
 }
 
-#' Copy files, including between FileSystems
+#' @export
+`$.SubTreeFileSystem` <- function(x, name, ...) {
+  # This is to allow delegating methods/properties to the base_fs
+  assert_that(is.string(name))
+  if (name %in% ls(x)) {
+    get(name, x)
+  } else {
+    get(name, x$base_fs)
+  }
+}
+
+#' Copy files between FileSystems
 #'
-#' @param src_fs The FileSystem from which files will be copied.
-#' @param src_sel A FileSelector indicating which files should be copied.
-#' A string may also be passed, which is used as the base dir for recursive
-#' selection.
-#' @param dest_fs The FileSystem into which files will be copied.
-#' @param dest_base_dir Where the copied files should be placed.
-#' Directories will be created as necessary.
+#' @param from A string path to a local directory or file, a URI, or a
+#' `SubTreeFileSystem`. Files will be copied recursively from this path.
+#' @param to A string path to a local directory or file, a URI, or a
+#' `SubTreeFileSystem`. Directories will be created as necessary
 #' @param chunk_size The maximum size of block to read before flushing
 #' to the destination file. A larger chunk_size will use more memory while
 #' copying but may help accommodate high latency FileSystems.
-copy_files <- function(src_fs = LocalFileSystem$create(),
-                       src_sel,
-                       dest_fs = LocalFileSystem$create(),
-                       dest_base_dir,
-                       chunk_size = 1024L * 1024L) {
-  if (!inherits(src_sel, "FileSelector")) {
-    src_sel <- FileSelector$create(src_sel, recursive = TRUE)
-  }
-  fs___CopyFiles(src_fs, src_sel, dest_fs, dest_base_dir,
-                 chunk_size, option_use_threads())
+#' @return Nothing: called for side effects in the file system
+#' @export
+#' @examples
+#' \dontrun{
+#' # Copy an S3 bucket's files to a local directory:
+#' copy_files("s3://your-bucket-name", "local-directory")
+#' # Using a FileSystem object
+#' copy_files(s3_bucket("your-bucket-name"), "local-directory")
+#' # Or go the other way, from local to S3
+#' copy_files("local-directory", s3_bucket("your-bucket-name"))
+#' }
+copy_files <- function(from, to, chunk_size = 1024L * 1024L) {
+  from <- get_path_and_filesystem(from)
+  to <- get_path_and_filesystem(to)
+  invisible(fs___CopyFiles(
+    from$fs,
+    FileSelector$create(from$path, recursive = TRUE),
+    to$fs,
+    to$path,
+    chunk_size,
+    option_use_threads()
+  ))
 }
 
 clean_path_abs <- function(path) {
diff --git a/r/R/io.R b/r/R/io.R
index 98b89f79bd7..b4dbbeb6a5f 100644
--- a/r/R/io.R
+++ b/r/R/io.R
@@ -230,6 +230,10 @@ mmap_open <- function(path, mode = c("read", "write", "readwrite")) {
 #' @return An `InputStream` or a subclass of one.
 #' @keywords internal
 make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem = NULL) {
+  if (inherits(file, "SubTreeFileSystem")) {
+    filesystem <- file$base_fs
+    file <- file$base_path
+  }
   if (is.string(file)) {
     if (is_url(file)) {
       fs_and_path <- FileSystem$from_uri(file)
@@ -258,11 +262,15 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem
 }
 
 make_output_stream <- function(x, filesystem = NULL) {
-  if (is_url(x)) {
+  if (inherits(x, "SubTreeFileSystem")) {
+    filesystem <- x$base_fs
+    x <- x$base_path
+  } else if (is_url(x)) {
     fs_and_path <- FileSystem$from_uri(x)
     filesystem = fs_and_path$fs
     x <- fs_and_path$path
   }
+  assert_that(is.string(x))
   if (is.null(filesystem)) {
     FileOutputStream$create(x)
   } else {
diff --git a/r/R/ipc_stream.R b/r/R/ipc_stream.R
index be21157e292..4f506f3332b 100644
--- a/r/R/ipc_stream.R
+++ b/r/R/ipc_stream.R
@@ -35,16 +35,15 @@
 #' serialize data to a buffer.
 #' [RecordBatchWriter] for a lower-level interface.
 #' @export
-write_ipc_stream <- function(x, sink, filesystem = NULL, ...) {
+write_ipc_stream <- function(x, sink, ...) {
   x_out <- x # So we can return the data we got
   if (is.data.frame(x)) {
     x <- Table$create(x)
   }
-  if (is.string(sink)) {
-    sink <- make_output_stream(sink, filesystem)
+  if (!inherits(sink, "OutputStream")) {
+    sink <- make_output_stream(sink)
     on.exit(sink$close())
   }
-  assert_is(sink, "OutputStream")
 
   writer <- RecordBatchStreamWriter$create(sink, x$schema)
   writer$write(x)
@@ -84,14 +83,13 @@ write_to_raw <- function(x, format = c("stream", "file")) {
 #' the function that will read the desired IPC format (stream or file) since
 #' a file or `InputStream` may contain either.
 #'
-#' @param file A character file name or URI, `raw` vector, or an Arrow input stream.
+#' @param file A character file name or URI, `raw` vector, an Arrow input stream,
+#' or a `FileSystem` with path (`SubTreeFileSystem`).
 #' If a file name or URI, an Arrow [InputStream] will be opened and
 #' closed when finished. If an input stream is provided, it will be left
 #' open.
 #' @param as_data_frame Should the function return a `data.frame` (default) or
 #' an Arrow [Table]?
-#' @param filesystem A [FileSystem] where `file` can be found if it is a
-#' string file path; default is the local file system
 #' @param ... extra parameters passed to `read_feather()`.
 #'
 #' @return A `data.frame` if `as_data_frame` is `TRUE` (the default), or an
 #' Arrow [Table] otherwise
 #' @seealso [read_feather()] for writing IPC files. [RecordBatchReader] for a
 #' lower-level interface.
 #' @export
-read_ipc_stream <- function(file, as_data_frame = TRUE, filesystem = NULL, ...) {
+read_ipc_stream <- function(file, as_data_frame = TRUE, ...) {
   if (!inherits(file, "InputStream")) {
-    file <- make_readable_file(file, filesystem = filesystem)
+    file <- make_readable_file(file)
     on.exit(file$close())
   }
diff --git a/r/R/json.R b/r/R/json.R
index 18c2888588d..1cc39fa42d0 100644
--- a/r/R/json.R
+++ b/r/R/json.R
@@ -38,10 +38,9 @@ read_json_arrow <- function(file,
                             col_select = NULL,
                             as_data_frame = TRUE,
-                            filesystem = NULL,
                             ...) {
   if (!inherits(file, "InputStream")) {
-    file <- make_readable_file(file, filesystem = filesystem)
+    file <- make_readable_file(file)
     on.exit(file$close())
   }
   tab <- JsonTableReader$create(file, ...)$Read()
diff --git a/r/R/parquet.R b/r/R/parquet.R
index c95154e6693..acf7c2cf737 100644
--- a/r/R/parquet.R
+++ b/r/R/parquet.R
@@ -39,10 +39,9 @@ read_parquet <- function(file,
                          col_select = NULL,
                          as_data_frame = TRUE,
                          props = ParquetReaderProperties$create(),
-                         filesystem = NULL,
                          ...) {
   if (is.string(file)) {
-    file <- make_readable_file(file, filesystem = filesystem)
+    file <- make_readable_file(file)
     on.exit(file$close())
   }
   reader <- ParquetFileReader$create(file, props = props, ...)
@@ -71,9 +70,8 @@ read_parquet <- function(file,
 #' This function enables you to write Parquet files from R.
 #'
 #' @param x `data.frame`, [RecordBatch], or [Table]
-#' @param sink A string file path, URI, or [OutputStream]
-#' @param filesystem A [FileSystem] where `sink` should be written if it is a
-#' string file path; default is the local file system
+#' @param sink A string file path, URI, or [OutputStream], or path in a file
+#' system (`SubTreeFileSystem`)
 #' @param chunk_size chunk size in number of rows. If NULL, the total number of rows is used.
 #' @param version parquet version, "1.0" or "2.0". Default "1.0". Numeric values
 #' are coerced to character.
@@ -125,7 +123,6 @@ read_parquet <- function(file,
 #' @export
 write_parquet <- function(x,
                           sink,
-                          filesystem = NULL,
                           chunk_size = NULL,
                           # writer properties
                           version = NULL,
@@ -143,11 +140,9 @@ write_parquet <- function(x,
     x <- Table$create(x)
   }
 
-  if (is.string(sink)) {
-    sink <- make_output_stream(sink, filesystem)
+  if (!inherits(sink, "OutputStream")) {
+    sink <- make_output_stream(sink)
     on.exit(sink$close())
-  } else if (!inherits(sink, "OutputStream")) {
-    abort("sink must be a file path or an OutputStream")
   }
 
   writer <- ParquetFileWriter$create(
diff --git a/r/man/FileSystem.Rd b/r/man/FileSystem.Rd
index 3ca945fa14a..2f3dcff670b 100644
--- a/r/man/FileSystem.Rd
+++ b/r/man/FileSystem.Rd
@@ -85,3 +85,15 @@ appending.
 }
 }
 
+\section{Active bindings}{
+
+\itemize{
+\item \verb{$type_name}: string filesystem type name, such as "local", "s3", etc.
+\item \verb{$region}: string AWS region, for \code{S3FileSystem} and \code{SubTreeFileSystem}
+containing an \code{S3FileSystem}
+\item \verb{$base_fs}: for \code{SubTreeFileSystem}, the \code{FileSystem} it contains
+\item \verb{$base_path}: for \code{SubTreeFileSystem}, the path in \verb{$base_fs} which is considered
+root in this \code{SubTreeFileSystem}
+}
+}
+
diff --git a/r/man/arrow_available.Rd b/r/man/arrow_available.Rd
index 9e696d530d0..bca7e684654 100644
--- a/r/man/arrow_available.Rd
+++ b/r/man/arrow_available.Rd
@@ -2,19 +2,28 @@
 % Please edit documentation in R/arrow-package.R
 \name{arrow_available}
 \alias{arrow_available}
+\alias{arrow_with_s3}
 \title{Is the C++ Arrow library available?}
 \usage{
 arrow_available()
+
+arrow_with_s3()
 }
 \value{
 \code{TRUE} or \code{FALSE} depending on whether the package was installed
-with the Arrow C++ library. If \code{FALSE}, you'll need to install the C++
-library and then reinstall the R package. See \code{\link[=install_arrow]{install_arrow()}} for help.
+with the Arrow C++ library (check with \code{arrow_available()}) or with S3
+support enabled (check with \code{arrow_with_s3()}).
 }
 \description{
-You won't generally need to call this function, but it's here in case it
-helps for development purposes.
+You won't generally need to call these functions, but they're made available
+for diagnostic purposes.
 }
 \examples{
 arrow_available()
+arrow_with_s3()
+}
+\seealso{
+If either of these is \code{FALSE}, see
+\code{vignette("install", package = "arrow")} for guidance on reinstalling the
+package.
 }
diff --git a/r/man/copy_files.Rd b/r/man/copy_files.Rd
index f99c9fb489b..65edf56cb48 100644
--- a/r/man/copy_files.Rd
+++ b/r/man/copy_files.Rd
@@ -2,32 +2,34 @@
 % Please edit documentation in R/filesystem.R
 \name{copy_files}
 \alias{copy_files}
-\title{Copy files, including between FileSystems}
+\title{Copy files between FileSystems}
 \usage{
-copy_files(
-  src_fs = LocalFileSystem$create(),
-  src_sel,
-  dest_fs = LocalFileSystem$create(),
-  dest_base_dir,
-  chunk_size = 1024L * 1024L
-)
+copy_files(from, to, chunk_size = 1024L * 1024L)
 }
 \arguments{
-\item{src_fs}{The FileSystem from which files will be copied.}
+\item{from}{A string path to a local directory or file, a URI, or a
+\code{SubTreeFileSystem}. Files will be copied recursively from this path.}
 
-\item{src_sel}{A FileSelector indicating which files should be copied.
-A string may also be passed, which is used as the base dir for recursive
-selection.}
-
-\item{dest_fs}{The FileSystem into which files will be copied.}
-
-\item{dest_base_dir}{Where the copied files should be placed.
-Directories will be created as necessary.}
+\item{to}{A string path to a local directory or file, a URI, or a
+\code{SubTreeFileSystem}. Directories will be created as necessary}
 
 \item{chunk_size}{The maximum size of block to read before flushing
 to the destination file. A larger chunk_size will use more memory while
 copying but may help accommodate high latency FileSystems.}
 }
+\value{
+Nothing: called for side effects in the file system
+}
 \description{
-Copy files, including between FileSystems
+Copy files between FileSystems
+}
+\examples{
+\dontrun{
+# Copy an S3 bucket's files to a local directory:
+copy_files("s3://your-bucket-name", "local-directory")
+# Using a FileSystem object
+copy_files(s3_bucket("your-bucket-name"), "local-directory")
+# Or go the other way, from local to S3
+copy_files("local-directory", s3_bucket("your-bucket-name"))
+}
+}
diff --git a/r/man/read_delim_arrow.Rd b/r/man/read_delim_arrow.Rd
index 335549e35a8..f676b9fc75d 100644
--- a/r/man/read_delim_arrow.Rd
+++ b/r/man/read_delim_arrow.Rd
@@ -23,7 +23,6 @@ read_delim_arrow(
   parse_options = NULL,
   convert_options = NULL,
   read_options = NULL,
-  filesystem = NULL,
   as_data_frame = TRUE,
   timestamp_parsers = NULL
 )
@@ -69,7 +68,8 @@ read_tsv_arrow(
 )
 }
 \arguments{
-\item{file}{A character file name or URI, \code{raw} vector, or an Arrow input stream.
+\item{file}{A character file name or URI, \code{raw} vector, an Arrow input stream,
+or a \code{FileSystem} with path (\code{SubTreeFileSystem}).
 If a file name, a memory-mapped Arrow \link{InputStream} will be opened and
 closed when finished; compression will be detected from the file extension
 and handled automatically. If an input stream is provided, it will be left
@@ -125,9 +125,6 @@ parsing options provided in other arguments (e.g. \code{delim}, \code{quote}, et
 
 \item{read_options}{see \link[=CsvReadOptions]{file reader options}}
 
-\item{filesystem}{A \link{FileSystem} where \code{file} can be found if it is a
-string file path; default is the local file system}
-
 \item{as_data_frame}{Should the function return a \code{data.frame} (default) or
 an Arrow \link{Table}?}
diff --git a/r/man/read_feather.Rd b/r/man/read_feather.Rd
index 99c1845e2aa..c5467c3a22f 100644
--- a/r/man/read_feather.Rd
+++ b/r/man/read_feather.Rd
@@ -4,16 +4,11 @@
 \alias{read_feather}
 \title{Read a Feather file}
 \usage{
-read_feather(
-  file,
-  col_select = NULL,
-  as_data_frame = TRUE,
-  filesystem = NULL,
-  ...
-)
+read_feather(file, col_select = NULL, as_data_frame = TRUE, ...)
 }
 \arguments{
-\item{file}{A character file name or URI, \code{raw} vector, or an Arrow input stream.
+\item{file}{A character file name or URI, \code{raw} vector, an Arrow input stream,
+or a \code{FileSystem} with path (\code{SubTreeFileSystem}).
 If a file name or URI, an Arrow \link{InputStream} will be opened and
 closed when finished. If an input stream is provided, it will be left
 open.}
@@ -26,9 +21,6 @@
 of columns, as used in \code{dplyr::select()}.}
 
 \item{as_data_frame}{Should the function return a \code{data.frame} (default) or
 an Arrow \link{Table}?}
 
-\item{filesystem}{A \link{FileSystem} where \code{file} can be found if it is a
-string file path; default is the local file system}
-
 \item{...}{additional parameters, passed to \link[=FeatherReader]{FeatherReader$create()}}
 }
 \value{
diff --git a/r/man/read_ipc_stream.Rd b/r/man/read_ipc_stream.Rd
index 020f605aa65..d4dd7831421 100644
--- a/r/man/read_ipc_stream.Rd
+++ b/r/man/read_ipc_stream.Rd
@@ -7,10 +7,11 @@
 \usage{
 read_arrow(file, ...)
 
-read_ipc_stream(file, as_data_frame = TRUE, filesystem = NULL, ...)
+read_ipc_stream(file, as_data_frame = TRUE, ...)
 }
 \arguments{
-\item{file}{A character file name or URI, \code{raw} vector, or an Arrow input stream.
+\item{file}{A character file name or URI, \code{raw} vector, an Arrow input stream,
+or a \code{FileSystem} with path (\code{SubTreeFileSystem}).
 If a file name or URI, an Arrow \link{InputStream} will be opened and
 closed when finished. If an input stream is provided, it will be left
 open.}
@@ -19,9 +20,6 @@ open.}
 
 \item{as_data_frame}{Should the function return a \code{data.frame} (default) or
 an Arrow \link{Table}?}
-
-\item{filesystem}{A \link{FileSystem} where \code{file} can be found if it is a
-string file path; default is the local file system}
 }
 \value{
 A \code{data.frame} if \code{as_data_frame} is \code{TRUE} (the default), or an
diff --git a/r/man/read_json_arrow.Rd b/r/man/read_json_arrow.Rd
index d835ec86e6e..8894a5d389e 100644
--- a/r/man/read_json_arrow.Rd
+++ b/r/man/read_json_arrow.Rd
@@ -4,16 +4,11 @@
 \alias{read_json_arrow}
 \title{Read a JSON file}
 \usage{
-read_json_arrow(
-  file,
-  col_select = NULL,
-  as_data_frame = TRUE,
-  filesystem = NULL,
-  ...
-)
+read_json_arrow(file, col_select = NULL, as_data_frame = TRUE, ...)
 }
 \arguments{
-\item{file}{A character file name or URI, \code{raw} vector, or an Arrow input stream.
+\item{file}{A character file name or URI, \code{raw} vector, an Arrow input stream,
+or a \code{FileSystem} with path (\code{SubTreeFileSystem}).
 If a file name, a memory-mapped Arrow \link{InputStream} will be opened and
 closed when finished; compression will be detected from the file extension
 and handled automatically. If an input stream is provided, it will be left
@@ -27,9 +22,6 @@
 of columns, as used in \code{dplyr::select()}.}
 
 \item{as_data_frame}{Should the function return a \code{data.frame} (default) or
 an Arrow \link{Table}?}
 
-\item{filesystem}{A \link{FileSystem} where \code{file} can be found if it is a
-string file path; default is the local file system}
-
 \item{...}{Additional options, passed to \code{json_table_reader()}}
 }
 \value{
diff --git a/r/man/read_parquet.Rd b/r/man/read_parquet.Rd
index 89cae7809f0..ec36734e599 100644
--- a/r/man/read_parquet.Rd
+++ b/r/man/read_parquet.Rd
@@ -9,12 +9,12 @@ read_parquet(
   col_select = NULL,
   as_data_frame = TRUE,
   props = ParquetReaderProperties$create(),
-  filesystem = NULL,
   ...
 )
 }
 \arguments{
-\item{file}{A character file name or URI, \code{raw} vector, or an Arrow input stream.
+\item{file}{A character file name or URI, \code{raw} vector, an Arrow input stream,
+or a \code{FileSystem} with path (\code{SubTreeFileSystem}).
 If a file name or URI, an Arrow \link{InputStream} will be opened and
 closed when finished. If an input stream is provided, it will be left
 open.}
@@ -29,9 +29,6 @@ an Arrow \link{Table}?}
 
 \item{props}{\link{ParquetReaderProperties}}
 
-\item{filesystem}{A \link{FileSystem} where \code{file} can be found if it is a
-string file path; default is the local file system}
-
 \item{...}{Additional arguments passed to \code{ParquetFileReader$create()}}
 }
 \value{
diff --git a/r/man/s3_bucket.Rd b/r/man/s3_bucket.Rd
new file mode 100644
index 00000000000..7791e9bc5f2
--- /dev/null
+++ b/r/man/s3_bucket.Rd
@@ -0,0 +1,28 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/filesystem.R
+\name{s3_bucket}
+\alias{s3_bucket}
+\title{Connect to an AWS S3 bucket}
+\usage{
+s3_bucket(bucket, ...)
+}
+\arguments{
+\item{bucket}{string S3 bucket name or path}
+
+\item{...}{Additional connection options, passed to \code{S3FileSystem$create()}}
+}
+\value{
+A \code{SubTreeFileSystem} containing an \code{S3FileSystem} and the bucket's
+relative path. Note that this function's success does not guarantee that you
+are authorized to access the bucket's contents.
+}
+\description{
+\code{s3_bucket()} is a convenience function to create an \code{S3FileSystem} object
+that automatically detects the bucket's AWS region and holds onto its
+relative path.
+}
+\examples{
+if (arrow_with_s3()) {
+  bucket <- s3_bucket("ursa-labs-taxi-data")
+}
+}
diff --git a/r/man/write_dataset.Rd b/r/man/write_dataset.Rd
index e12c4287266..cb3853a730f 100644
--- a/r/man/write_dataset.Rd
+++ b/r/man/write_dataset.Rd
@@ -11,7 +11,6 @@ write_dataset(
   partitioning = dplyr::group_vars(dataset),
   basename_template = paste0("part-{i}.", as.character(format)),
   hive_style = TRUE,
-  filesystem = NULL,
   ...
 )
 }
@@ -23,8 +22,8 @@ and \code{group_by()} operations done on the dataset. \code{filter()} queries wi
 applied to restrict written rows.
 Note that \code{select()}-ed columns may not be renamed.}
 
-\item{path}{string path or URI to a directory to write to (directory will be
-created if it does not exist)}
+\item{path}{string path, URI, or \code{SubTreeFileSystem} referencing a directory
+to write to (directory will be created if it does not exist)}
 
 \item{format}{file format to write the dataset to. Currently supported
 formats are "feather" (aka "ipc") and "parquet". Default is to write to the
@@ -42,9 +41,6 @@ will yield \verb{"part-0.feather", ...}.}
 \item{hive_style}{logical: write partition segments as Hive-style
 (\code{key1=value1/key2=value2/file.ext}) or as just bare values. Default is \code{TRUE}.}
 
-\item{filesystem}{A \link{FileSystem} where the dataset should be written if it is a
-string file path; default is the local file system}
-
 \item{...}{additional format-specific arguments. For available Parquet
 options, see \code{\link[=write_parquet]{write_parquet()}}.}
 }
diff --git a/r/man/write_feather.Rd b/r/man/write_feather.Rd
index 60b31cd8019..277c8197475 100644
--- a/r/man/write_feather.Rd
+++ b/r/man/write_feather.Rd
@@ -7,7 +7,6 @@
 write_feather(
   x,
   sink,
-  filesystem = NULL,
   version = 2,
   chunk_size = 65536L,
   compression = c("default", "lz4", "uncompressed", "zstd"),
@@ -17,10 +16,8 @@ write_feather(
 \arguments{
 \item{x}{\code{data.frame}, \link{RecordBatch}, or \link{Table}}
 
-\item{sink}{A string file path, URI, or \link{OutputStream}}
-
-\item{filesystem}{A \link{FileSystem} where \code{sink} should be written if it is a
-string file path; default is the local file system}
+\item{sink}{A string file path, URI, or \link{OutputStream}, or path in a file
+system (\code{SubTreeFileSystem})}
 
 \item{version}{integer Feather file version. Version 2 is the current.
 Version 1 is the more limited legacy format.}
diff --git a/r/man/write_ipc_stream.Rd b/r/man/write_ipc_stream.Rd
index 5b73a911d10..4f742ce9178 100644
--- a/r/man/write_ipc_stream.Rd
+++ b/r/man/write_ipc_stream.Rd
@@ -7,17 +7,15 @@
 \usage{
 write_arrow(x, sink, ...)
 
-write_ipc_stream(x, sink, filesystem = NULL, ...)
+write_ipc_stream(x, sink, ...)
 }
 \arguments{
 \item{x}{\code{data.frame}, \link{RecordBatch}, or \link{Table}}
 
-\item{sink}{A string file path, URI, or \link{OutputStream}}
+\item{sink}{A string file path, URI, or \link{OutputStream}, or path in a file
+system (\code{SubTreeFileSystem})}
 
 \item{...}{extra parameters passed to \code{write_feather()}.}
-
-\item{filesystem}{A \link{FileSystem} where \code{sink} should be written if it is a
-string file path; default is the local file system}
 }
 \value{
 \code{x}, invisibly.
diff --git a/r/man/write_parquet.Rd b/r/man/write_parquet.Rd
index bb70502c2ba..f0adf942396 100644
--- a/r/man/write_parquet.Rd
+++ b/r/man/write_parquet.Rd
@@ -7,7 +7,6 @@
 write_parquet(
   x,
   sink,
-  filesystem = NULL,
   chunk_size = NULL,
   version = NULL,
   compression = default_parquet_compression(),
@@ -23,10 +22,8 @@ write_parquet(
 \arguments{
 \item{x}{\code{data.frame}, \link{RecordBatch}, or \link{Table}}
 
-\item{sink}{A string file path, URI, or \link{OutputStream}}
-
-\item{filesystem}{A \link{FileSystem} where \code{sink} should be written if it is a
-string file path; default is the local file system}
+\item{sink}{A string file path, URI, or \link{OutputStream}, or path in a file
+system (\code{SubTreeFileSystem})}
 
 \item{chunk_size}{chunk size in number of rows. If NULL, the total number of
 rows is used.}
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index d2f44654c26..d0273a5a1ef 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -3714,6 +3714,36 @@ extern "C" SEXP _arrow_fs___SubTreeFileSystem__create(SEXP base_path_sexp, SEXP
 }
 #endif
 
+// filesystem.cpp
+#if defined(ARROW_R_WITH_ARROW)
+std::shared_ptr<fs::FileSystem> fs___SubTreeFileSystem__base_fs(const std::shared_ptr<fs::SubTreeFileSystem>& file_system);
+extern "C" SEXP _arrow_fs___SubTreeFileSystem__base_fs(SEXP file_system_sexp){
+BEGIN_CPP11
+	arrow::r::Input<const std::shared_ptr<fs::SubTreeFileSystem>&>::type file_system(file_system_sexp);
+	return cpp11::as_sexp(fs___SubTreeFileSystem__base_fs(file_system));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_fs___SubTreeFileSystem__base_fs(SEXP file_system_sexp){
+	Rf_error("Cannot call fs___SubTreeFileSystem__base_fs(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// filesystem.cpp
+#if defined(ARROW_R_WITH_ARROW)
+std::string fs___SubTreeFileSystem__base_path(const std::shared_ptr<fs::SubTreeFileSystem>& file_system);
+extern "C" SEXP _arrow_fs___SubTreeFileSystem__base_path(SEXP file_system_sexp){
+BEGIN_CPP11
+	arrow::r::Input<const std::shared_ptr<fs::SubTreeFileSystem>&>::type file_system(file_system_sexp);
+	return cpp11::as_sexp(fs___SubTreeFileSystem__base_path(file_system));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_fs___SubTreeFileSystem__base_path(SEXP file_system_sexp){
+	Rf_error("Cannot call fs___SubTreeFileSystem__base_path(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
 // filesystem.cpp
 #if defined(ARROW_R_WITH_ARROW)
 cpp11::writable::list fs___FileSystemFromUri(const std::string& path);
@@ -3776,6 +3806,21 @@ extern "C" SEXP _arrow_fs___S3FileSystem__create(SEXP anonymous_sexp, SEXP acces
 }
 #endif
 
+// filesystem.cpp
+#if defined(ARROW_R_WITH_S3)
+std::string fs___S3FileSystem__region(const std::shared_ptr<fs::S3FileSystem>& fs);
+extern "C" SEXP _arrow_fs___S3FileSystem__region(SEXP fs_sexp){
+BEGIN_CPP11
+	arrow::r::Input<const std::shared_ptr<fs::S3FileSystem>&>::type fs(fs_sexp);
+	return cpp11::as_sexp(fs___S3FileSystem__region(fs));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_fs___S3FileSystem__region(SEXP fs_sexp){
+	Rf_error("Cannot call fs___S3FileSystem__region(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
 // io.cpp
 #if defined(ARROW_R_WITH_ARROW)
 std::shared_ptr<arrow::Buffer> io___Readable__Read(const std::shared_ptr<arrow::io::Readable>& x, int64_t nbytes);
@@ -6485,9 +6530,12 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_fs___FileSystem__type_name", (DL_FUNC) &_arrow_fs___FileSystem__type_name, 1},
 		{ "_arrow_fs___LocalFileSystem__create", (DL_FUNC) &_arrow_fs___LocalFileSystem__create, 0},
 		{ "_arrow_fs___SubTreeFileSystem__create", (DL_FUNC) &_arrow_fs___SubTreeFileSystem__create, 2},
+		{ "_arrow_fs___SubTreeFileSystem__base_fs", (DL_FUNC) &_arrow_fs___SubTreeFileSystem__base_fs, 1},
+		{ "_arrow_fs___SubTreeFileSystem__base_path", (DL_FUNC) &_arrow_fs___SubTreeFileSystem__base_path, 1},
		{ "_arrow_fs___FileSystemFromUri", (DL_FUNC) &_arrow_fs___FileSystemFromUri, 1},
		{ "_arrow_fs___CopyFiles", (DL_FUNC) &_arrow_fs___CopyFiles, 6},
		{ "_arrow_fs___S3FileSystem__create", (DL_FUNC) &_arrow_fs___S3FileSystem__create, 12},
+		{ "_arrow_fs___S3FileSystem__region", (DL_FUNC) &_arrow_fs___S3FileSystem__region, 1},
		{ "_arrow_io___Readable__Read", (DL_FUNC) &_arrow_io___Readable__Read, 2},
		{ "_arrow_io___InputStream__Close", (DL_FUNC) &_arrow_io___InputStream__Close, 1},
		{ "_arrow_io___OutputStream__Close", (DL_FUNC) &_arrow_io___OutputStream__Close, 1},
diff --git a/r/src/filesystem.cpp b/r/src/filesystem.cpp
index de785084143..53959804fe8 100644
--- a/r/src/filesystem.cpp
+++ b/r/src/filesystem.cpp
@@ -221,6 +221,18 @@ std::shared_ptr<fs::SubTreeFileSystem> fs___SubTreeFileSystem__create(
   return std::make_shared<fs::SubTreeFileSystem>(base_path, base_fs);
 }
 
+// [[arrow::export]]
+std::shared_ptr<fs::FileSystem> fs___SubTreeFileSystem__base_fs(
+    const std::shared_ptr<fs::SubTreeFileSystem>& file_system) {
+  return file_system->base_fs();
+}
+
+// [[arrow::export]]
+std::string fs___SubTreeFileSystem__base_path(
+    const std::shared_ptr<fs::SubTreeFileSystem>& file_system) {
+  return file_system->base_path();
+}
+
 // [[arrow::export]]
 cpp11::writable::list fs___FileSystemFromUri(const std::string& path) {
   using cpp11::literals::operator"" _nm;
@@ -286,4 +298,9 @@ std::shared_ptr<fs::S3FileSystem> fs___S3FileSystem__create(
   return ValueOrStop(fs::S3FileSystem::Make(s3_opts));
 }
 
+// [[s3::export]]
+std::string fs___S3FileSystem__region(const std::shared_ptr<fs::S3FileSystem>& fs) {
+  return fs->region();
+}
+
 #endif
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index f33f8ad1def..074fcd35229 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -371,7 +371,7 @@ test_that("Creating UnionDataset", {
   )
 
   # Confirm c() method error handling
-  expect_error(c(ds1, 42), "'x' must be a string or a list of DatasetFactory")
+  expect_error(c(ds1, 42), "string")
 })
 
 test_that("InMemoryDataset", {
diff --git a/r/tests/testthat/test-filesystem.R b/r/tests/testthat/test-filesystem.R
index 255465d2b10..ff1f3feca3d 100644
--- a/r/tests/testthat/test-filesystem.R
+++ b/r/tests/testthat/test-filesystem.R
@@ -80,10 +80,14 @@ test_that("SubTreeFilesystem", {
   DESCRIPTION <- system.file("DESCRIPTION", package = "arrow")
   file.copy(DESCRIPTION, file.path(td, "DESCRIPTION"))
 
-  local_fs <- LocalFileSystem$create()
-  st_fs <- SubTreeFileSystem$create(td, local_fs)
+  st_fs <- SubTreeFileSystem$create(td)
   expect_is(st_fs, "SubTreeFileSystem")
   expect_is(st_fs, "FileSystem")
+  expect_is(st_fs$base_fs, "LocalFileSystem")
+
+  # FIXME windows has a trailing slash for one but not the other
+  # expect_identical(normalizePath(st_fs$base_path), normalizePath(td))
+
   st_fs$CreateDir("test")
   st_fs$CopyFile("DESCRIPTION", "DESC.txt")
   infos <- st_fs$GetFileInfo(c("DESCRIPTION", "test", "nope", "DESC.txt"))
@@ -93,6 +97,7 @@ test_that("SubTreeFilesystem", {
   expect_equal(infos[[4L]]$type, FileType$File)
   expect_equal(infos[[4L]]$extension(), "txt")
 
+  local_fs <- LocalFileSystem$create()
   local_fs$DeleteDirContents(td)
   infos <- st_fs$GetFileInfo(c("DESCRIPTION", "test", "nope", "DESC.txt"))
   expect_equal(infos[[1L]]$type, FileType$NotFound)
@@ -129,6 +134,7 @@ test_that("FileSystem$from_uri", {
   skip_if_not_available("s3")
   fs_and_path <- FileSystem$from_uri("s3://ursa-labs-taxi-data")
   expect_is(fs_and_path$fs, "S3FileSystem")
+  expect_identical(fs_and_path$fs$region, "us-east-2")
 })
 
 test_that("SubTreeFileSystem$create() with URI", {
@@ -144,3 +150,14 @@ test_that("S3FileSystem", {
   s3fs <- S3FileSystem$create()
   expect_is(s3fs, "S3FileSystem")
 })
+
+test_that("s3_bucket", {
+  skip_on_cran()
+  skip_if_not_available("s3")
+  bucket <- s3_bucket("ursa-labs-r-test")
+  expect_is(bucket, "SubTreeFileSystem")
+  expect_is(bucket$base_fs, "S3FileSystem")
+  expect_identical(bucket$region, "us-west-2")
+  skip_on_os("windows") # FIXME
+  expect_identical(bucket$base_path, "ursa-labs-r-test/")
+})
diff --git a/r/tests/testthat/test-s3-minio.R b/r/tests/testthat/test-s3-minio.R
index 5d9213f4f21..01d2d057e28 100644
--- a/r/tests/testthat/test-s3-minio.R
+++ b/r/tests/testthat/test-s3-minio.R
@@ -43,9 +43,9 @@ if (arrow_with_s3() && process_is_running("minio server")) {
     # If minio isn't running, this will hang for a few seconds and fail with a
    # curl timeout, causing `run_these` to be set to FALSE and skipping the tests
     fs$CreateDir(now)
-    # Clean up when we're all done
-    on.exit(fs$DeleteDir(now))
   })
+  # Clean up when we're all done
+  on.exit(fs$DeleteDir(now))
 
   test_that("read/write Feather on minio", {
     write_feather(example_data, minio_uri("test.feather"))
@@ -53,23 +53,23 @@ if (arrow_with_s3() && process_is_running("minio server")) {
   })
 
   test_that("read/write Feather by filesystem, not URI", {
-    write_feather(example_data, minio_path("test2.feather"), filesystem = fs)
+    write_feather(example_data, fs$path(minio_path("test2.feather")))
     expect_identical(
-      read_feather(minio_path("test2.feather"), filesystem = fs),
+      read_feather(fs$path(minio_path("test2.feather"))),
       example_data
     )
   })
 
   test_that("read/write stream", {
-    write_ipc_stream(example_data, minio_path("test3.ipc"), filesystem = fs)
+    write_ipc_stream(example_data, fs$path(minio_path("test3.ipc")))
     expect_identical(
-      read_ipc_stream(minio_path("test3.ipc"), filesystem = fs),
+      read_ipc_stream(fs$path(minio_path("test3.ipc"))),
       example_data
     )
   })
 
   test_that("read/write Parquet on minio", {
-    write_parquet(example_data, minio_uri("test.parquet"))
+    write_parquet(example_data, fs$path(minio_uri("test.parquet")))
     expect_identical(read_parquet(minio_uri("test.parquet")), example_data)
   })
@@ -99,17 +99,17 @@ if (arrow_with_s3() && process_is_running("minio server")) {
   test_that("write_parquet with filesystem arg", {
     fs$CreateDir(minio_path("hive_dir", "group=1", "other=xxx"))
     fs$CreateDir(minio_path("hive_dir", "group=2", "other=yyy"))
-    expect_length(fs$GetFileInfo(FileSelector$create(minio_path("hive_dir"))), 2)
-    write_parquet(df1, minio_path("hive_dir", "group=1", "other=xxx", "file1.parquet"), filesystem = fs)
-    write_parquet(df2, minio_path("hive_dir", "group=2", "other=yyy", "file2.parquet"), filesystem = fs)
+    expect_length(fs$ls(minio_path("hive_dir")), 2)
+    write_parquet(df1, fs$path(minio_path("hive_dir", "group=1", "other=xxx", "file1.parquet")))
+    write_parquet(df2, fs$path(minio_path("hive_dir", "group=2", "other=yyy", "file2.parquet")))
     expect_identical(
-      read_parquet(minio_path("hive_dir", "group=1", "other=xxx", "file1.parquet"), filesystem = fs),
+      read_parquet(fs$path(minio_path("hive_dir", "group=1", "other=xxx", "file1.parquet"))),
       df1
     )
   })
 
   test_that("open_dataset with fs", {
-    ds <- open_dataset(minio_path("hive_dir"), filesystem = fs)
+    ds <- open_dataset(fs$path(minio_path("hive_dir")))
     expect_identical(
       ds %>% select(dbl, lgl) %>% collect(),
       rbind(df1[, c("dbl", "lgl")], df2[, c("dbl", "lgl")])
@@ -117,11 +117,37 @@ if (arrow_with_s3() && process_is_running("minio server")) {
   })
 
   test_that("write_dataset with fs", {
-    ds <- open_dataset(minio_path("hive_dir"), filesystem = fs)
-    write_dataset(ds, minio_path("new_dataset_dir"), filesystem = fs)
-    expect_length(fs$GetFileInfo(FileSelector$create(minio_path("new_dataset_dir"))), 1)
+    ds <- open_dataset(fs$path(minio_path("hive_dir")))
+    write_dataset(ds, fs$path(minio_path("new_dataset_dir")))
+    expect_length(fs$ls(minio_path("new_dataset_dir")), 1)
   })
 
+  make_temp_dir <- function() {
+    path <- tempfile()
+    dir.create(path)
+    normalizePath(path, winslash = "/")
+  }
+
+  test_that("Let's test copy_files too", {
+    td <- make_temp_dir()
+    copy_files(minio_uri("hive_dir"), td)
+    expect_length(dir(td), 2)
+    ds <- open_dataset(td)
+    expect_identical(
+      ds %>% select(dbl, lgl) %>% collect(),
+      rbind(df1[, c("dbl", "lgl")], df2[, c("dbl", "lgl")])
+    )
+
+    # Let's copy the other way and use a SubTreeFileSystem rather than URI
+    copy_files(td, fs$path(minio_path("hive_dir2")))
+    ds2 <- open_dataset(fs$path(minio_path("hive_dir2")))
+    expect_identical(
+      ds2 %>% select(dbl, lgl) %>% collect(),
+      rbind(df1[, c("dbl", "lgl")], df2[, c("dbl", "lgl")])
+    )
+  })
+
   test_that("S3FileSystem input validation", {
     expect_error(
       S3FileSystem$create(access_key = "foo"),
diff --git a/r/tests/testthat/test-s3.R b/r/tests/testthat/test-s3.R
index 9dfadfdfb58..33c249547a6 100644
--- a/r/tests/testthat/test-s3.R
+++ b/r/tests/testthat/test-s3.R
@@ -23,8 +23,8 @@ run_these <- tryCatch({
       !identical(Sys.getenv("AWS_ACCESS_KEY_ID"), "") &&
       !identical(Sys.getenv("AWS_SECRET_ACCESS_KEY"), "")) {
     # See if we have access to the test bucket
-    bucket <- FileSystem$from_uri("s3://ursa-labs-r-test?region=us-west-2")
-    bucket$fs$GetFileInfo(bucket$path)
+    bucket <- s3_bucket("ursa-labs-r-test")
+    bucket$GetFileInfo("")
     TRUE
   } else {
     FALSE
@@ -38,7 +38,7 @@ bucket_uri <- function(..., bucket = "s3://ursa-labs-r-test/%s?region=us-west-2"
 
 if (run_these) {
   now <- as.numeric(Sys.time())
-  on.exit(bucket$fs$DeleteDir(paste0("ursa-labs-r-test/", now)))
+  on.exit(bucket$DeleteDir(now))
 
   test_that("read/write Feather on S3", {
     write_feather(example_data, bucket_uri(now, "test.feather"))
diff --git a/r/vignettes/dataset.Rmd b/r/vignettes/dataset.Rmd
index d9c90261f82..06653eef9d8 100644
--- a/r/vignettes/dataset.Rmd
+++ b/r/vignettes/dataset.Rmd
@@ -20,15 +20,35 @@ and what is on the immediate development roadmap.
 The [New York City taxi trip record data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
 is widely used in big data exercises and competitions.
 For demonstration purposes, we have hosted a Parquet-formatted version
-of about 10 years of the trip data in a public S3 bucket.
+of about 10 years of the trip data in a public AWS S3 bucket.
 The total file size is around 37 gigabytes, even in the efficient Parquet file
 format. That's bigger than memory on most people's computers, so we can't just
 read it all in and stack it into a single data frame.
 
-In a future release, you'll be able to point your R session at S3 and query
-the dataset from there. For now, datasets need to be on your local file system.
-To download the files,
+S3 support is included in the Windows and macOS binary packages.
+On Linux when installing from source, S3 support is not enabled by default,
+and it has additional system requirements.
+See `vignette("install", package = "arrow")` for details.
+To see if your `arrow` installation has S3 support, run:
+
+```{r}
+arrow::arrow_with_s3()
+```
+
+Even with S3 support enabled, network speed will be a bottleneck unless your
+machine is located in the same AWS region as the data. So, for this vignette,
+we assume that the NYC taxi dataset has been downloaded locally in a "nyc-taxi"
+directory.
+
+If your `arrow` build has S3 support, you can sync the data locally with:
+
+```{r, eval = FALSE}
+arrow::copy_files("s3://ursa-labs-taxi-data", "nyc-taxi")
+```
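+
+With S3 support, you can also query the files in place instead of downloading
+them first. The following is only a sketch: it assumes the year/month directory
+layout used in the download code below, and queries will be slow unless you are
+running in the data's home region (us-east-2):
+
+```{r, eval = FALSE}
+ds <- arrow::open_dataset(
+  arrow::s3_bucket("ursa-labs-taxi-data"),
+  partitioning = c("year", "month")
+)
+```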
+
+If your `arrow` build doesn't have S3 support, you can download the files
+with some additional code:
 
 ```{r, eval = FALSE}
 bucket <- "https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com"
@@ -55,7 +75,7 @@ for (year in 2009:2019) {
 }
 ```
 
-Note that the vignette will not execute that code chunk: if you want to run
+Note that these download steps in the vignette are not executed: if you want to run
 with live data, you'll have to do it yourself separately.
 Given the size, if you're running this locally and don't have a fast connection,
 feel free to grab only a year or two of data.
@@ -279,8 +299,7 @@ This would be useful, in our taxi dataset example, if you wanted to keep
 Another feature of Datasets is that they can be composed of multiple data sources.
 That is, you may have a directory of partitioned Parquet files in one location,
 and in another directory, files that haven't been partitioned.
-In the future, when there is support for cloud storage and other file formats,
-this would mean you could point to an S3 bucked of Parquet data and a directory
+Or, you could point to an S3 bucket of Parquet data and a directory
 of CSVs on the local file system and query them together as a single dataset.
 To create a multi-source dataset, provide a list of datasets to `open_dataset()`
 instead of a file path, or simply concatenate them like `big_dataset <- c(ds1, ds2)`.
@@ -345,13 +364,13 @@ partitions are because they can be read from the file paths.
 (To instead write bare values for partition segments, i.e. `1` rather than
 `payment_type=1`, call `write_dataset()` with `hive_style = FALSE`.)
 
-Perhaps, though, `payment_type == 1` is the only data we care about for our
-current work, and we just want to drop the rest and have a smaller working set.
+Perhaps, though, `payment_type == 3` is the only data we ever care about,
+and we just want to drop the rest and have a smaller working set.
 For this, we can `filter()` them out when writing:
 
 ```r
 ds %>%
-  filter(payment_type == 1) %>%
+  filter(payment_type == 3) %>%
   write_dataset("nyc-taxi/feather", format = "feather")
 ```
diff --git a/r/vignettes/fs.Rmd b/r/vignettes/fs.Rmd
index d211a35a6b4..5d699c49df0 100644
--- a/r/vignettes/fs.Rmd
+++ b/r/vignettes/fs.Rmd
@@ -44,39 +44,43 @@ the cost of reading the data over the network should be much lower.
 
 ## Creating a FileSystem object
 
-Another way to connect to S3 is to create an `S3FileSystem` object once and pass
-that to the read/write functions. This may be a convenience when dealing with
+Another way to connect to S3 is to create a `FileSystem` object once and pass
+that to the read/write functions.
+`S3FileSystem` objects can be created with the `s3_bucket()` function, which
+automatically detects the bucket's AWS region. Additionally, the resulting
+`FileSystem` will consider paths relative to the bucket's path (so for example
+you don't need to prefix the bucket path when listing a directory).
+This may be convenient when dealing with
 long URIs, and it's necessary for some options and authentication methods
 that aren't supported in the URI format.
+With a `FileSystem` object, we can point to specific files in it with the `$path()` method.
 In the previous example, this would look like:
 
 ```r
-fs <- S3FileSystem$create(region = "us-east-2")
-df <- read_parquet("ursa-labs-taxi-data/2019/06/data.parquet", filesystem = fs)
+bucket <- s3_bucket("ursa-labs-taxi-data")
+df <- read_parquet(bucket$path("2019/06/data.parquet"))
 ```
 
-See the help for `FileSystem` for a list of options that `S3FileSystem$create()`
+See the help for `FileSystem` for a list of options that `s3_bucket()` and `S3FileSystem$create()`
 can take. `region`, `scheme`, and `endpoint_override` can be encoded as query
-parameters in the URI (though `region` will be auto-detected the bucket URI if omitted),
-and `access_key` and `secret_key` can also be included,
+parameters in the URI (though `region` will be auto-detected in `s3_bucket()` or from the URI if omitted).
+`access_key` and `secret_key` can also be included,
 but other options are not supported in the URI.
 
-Using the `SubTreeFileSystem` class, you can represent an S3 bucket or
-subdirectory inside of one.
+The object that `s3_bucket()` returns is technically a `SubTreeFileSystem`, which holds a path and a file system to which it corresponds. `SubTreeFileSystem`s can be useful for holding a reference to a subdirectory somewhere, on S3 or elsewhere.
+
+One way to get a subtree is to call the `$cd()` method on a `FileSystem`:
 
 ```r
-bucket <- SubTreeFileSystem$create(
-  "ursa-labs-taxi-data",
-  S3FileSystem$create(region = "us-east-2")
-)
-df <- read_parquet("2019/06/data.parquet", filesystem = bucket)
+june2019 <- bucket$cd("2019/06")
+df <- read_parquet(june2019$path("data.parquet"))
 ```
 
 `SubTreeFileSystem` can also be made from a URI:
 
 ```r
-bucket <- SubTreeFileSystem$create("s3://ursa-labs-taxi-data")
+june2019 <- SubTreeFileSystem$create("s3://ursa-labs-taxi-data/2019/06")
 ```
 
 ## Authentication
 
@@ -88,14 +92,14 @@ There are a few options for passing these credentials:
 1. Include them in the URI, like `s3://access_key:secret_key@bucket-name/path/to/file`.
 Be sure to [URL-encode](https://en.wikipedia.org/wiki/Percent-encoding) your
 secrets if they contain special characters like "/".
-2. Pass them as `access_key` and `secret_key` to `S3FileSystem$create()`
+2. Pass them as `access_key` and `secret_key` to `S3FileSystem$create()` or `s3_bucket()`
 3. Set them as environment variables named `AWS_ACCESS_KEY_ID` and
 `AWS_SECRET_ACCESS_KEY`, respectively.
 4. Define them in a `~/.aws/credentials` file, according to the
 [AWS documentation](https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/credentials.html).
 
 You can also use an [AccessRole](https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html)
-for temporary access by passing the `role_arn` identifier to `S3FileSystem$create()`.
+for temporary access by passing the `role_arn` identifier to `S3FileSystem$create()` or `s3_bucket()`.
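+
+For example, a sketch of connecting with an assumed role (the bucket name and
+role ARN here are placeholders, not real resources):
+
+```r
+bucket <- s3_bucket(
+  "your-bucket-name",
+  role_arn = "arn:aws:iam::123456789012:role/your-role"
+)
+df <- read_parquet(bucket$path("path/to/file.parquet"))
+```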
 
 ## File systems that emulate S3