diff --git a/r/R/io.R b/r/R/io.R index 379dcf6f35a..8e72187b431 100644 --- a/r/R/io.R +++ b/r/R/io.R @@ -270,7 +270,7 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem file <- ReadableFile$create(file) } - if (!identical(compression, "uncompressed")) { + if (is_compressed(compression)) { file <- CompressedInputStream$create(file, compression) } } else if (inherits(file, c("raw", "Buffer"))) { @@ -292,7 +292,7 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem file } -make_output_stream <- function(x, filesystem = NULL) { +make_output_stream <- function(x, filesystem = NULL, compression = NULL) { if (inherits(x, "connection")) { if (!isOpen(x)) { open(x, "wb") @@ -309,11 +309,21 @@ make_output_stream <- function(x, filesystem = NULL) { filesystem <- fs_and_path$fs x <- fs_and_path$path } + + if (is.null(compression)) { + # Infer compression from sink + compression <- detect_compression(x) + } + assert_that(is.string(x)) - if (is.null(filesystem)) { - FileOutputStream$create(x) + if (is.null(filesystem) && is_compressed(compression)) { + CompressedOutputStream$create(x) ##compressed local + } else if (is.null(filesystem) && !is_compressed(compression)) { + FileOutputStream$create(x) ## uncompressed local + } else if (!is.null(filesystem) && is_compressed(compression)) { + CompressedOutputStream$create(filesystem$OpenOutputStream(x)) ## compressed remote } else { - filesystem$OpenOutputStream(x) + filesystem$OpenOutputStream(x) ## uncompressed remote } } @@ -322,6 +332,9 @@ detect_compression <- function(path) { return("uncompressed") } + # Remove any trailing slashes, which FileSystem$from_uri may add + path <- gsub("/$", "", path) + switch(tools::file_ext(path), bz2 = "bz2", gz = "gzip", diff --git a/r/R/util.R b/r/R/util.R index ff2bb070b88..4aff69e471a 100644 --- a/r/R/util.R +++ b/r/R/util.R @@ -211,3 +211,7 @@ handle_csv_read_error <- function(e, schema, call) { } abort(msg, call = call) } + +is_compressed <- function(compression) { + !identical(compression, "uncompressed") +} diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index 631e75fd74a..8e463d3abec 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -564,6 +564,23 @@ test_that("write_csv_arrow can write from RecordBatchReader objects", { expect_equal(nrow(tbl_in), 3) }) +test_that("read/write compressed file successfully", { + skip_if_not_available("gzip") + tfgz <- tempfile(fileext = ".csv.gz") + tf <- tempfile(fileext = ".csv") + on.exit(unlink(tf)) + on.exit(unlink(tfgz)) + + write_csv_arrow(tbl, tf) + write_csv_arrow(tbl, tfgz) + expect_lt(file.size(tfgz), file.size(tf)) + + expect_identical( + read_csv_arrow(tfgz), + tbl + ) +}) + test_that("read_csv_arrow() can read sub-second timestamps with col_types T setting (ARROW-15599)", { tbl <- tibble::tibble(time = c("2018-10-07 19:04:05.000", "2018-10-07 19:04:05.001")) tf <- tempfile() diff --git a/r/tests/testthat/test-s3-minio.R b/r/tests/testthat/test-s3-minio.R index 24f8495ae20..51db355783b 100644 --- a/r/tests/testthat/test-s3-minio.R +++ b/r/tests/testthat/test-s3-minio.R @@ -54,6 +54,26 @@ if (arrow_with_s3() && process_is_running("minio server")) { ) }) + test_that("read/write compressed csv by filesystem", { + skip_if_not_available("gzip") + dat <- tibble(x = seq(1, 10, by = 0.2)) + write_csv_arrow(dat, fs$path(minio_path("test.csv.gz"))) + expect_identical( + read_csv_arrow(fs$path(minio_path("test.csv.gz"))), + dat + ) + }) + + test_that("read/write csv by filesystem", { + skip_if_not_available("gzip") + dat <- tibble(x = seq(1, 10, by = 0.2)) + write_csv_arrow(dat, fs$path(minio_path("test.csv"))) + expect_identical( + read_csv_arrow(fs$path(minio_path("test.csv"))), + dat + ) + }) + test_that("read/write stream", { write_ipc_stream(example_data, fs$path(minio_path("test3.ipc"))) expect_identical(