From ccc4c54493749eb77fcad3995835c0ac61c92f24 Mon Sep 17 00:00:00 2001 From: Sam Albers Date: Tue, 17 May 2022 12:56:24 -0700 Subject: [PATCH 01/15] compress write_csv_arrow output by file extension --- r/R/io.R | 13 +++++++++++-- r/tests/testthat/test-csv.R | 12 ++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/r/R/io.R b/r/R/io.R index 379dcf6f35a..8997b7cdeb6 100644 --- a/r/R/io.R +++ b/r/R/io.R @@ -292,7 +292,7 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem file } -make_output_stream <- function(x, filesystem = NULL) { +make_output_stream <- function(x, filesystem = NULL, compression = NULL) { if (inherits(x, "connection")) { if (!isOpen(x)) { open(x, "wb") @@ -301,6 +301,11 @@ make_output_stream <- function(x, filesystem = NULL) { return(MakeRConnectionOutputStream(x)) } + if (is.null(compression)) { + # Infer compression from sink + compression <- detect_compression(x) + } + if (inherits(x, "SubTreeFileSystem")) { filesystem <- x$base_fs x <- x$base_path @@ -311,7 +316,11 @@ make_output_stream <- function(x, filesystem = NULL) { } assert_that(is.string(x)) if (is.null(filesystem)) { - FileOutputStream$create(x) + if (!identical(compression, "uncompressed")) { + CompressedOutputStream$create(x) + } else { + FileOutputStream$create(x) + } } else { filesystem$OpenOutputStream(x) } diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index 631e75fd74a..fbd087a0dbf 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -564,6 +564,18 @@ test_that("write_csv_arrow can write from RecordBatchReader objects", { expect_equal(nrow(tbl_in), 3) }) +test_that("write_csv_arrow() compresses by file extension", { + skip_if_not_available("gzip") + tfgz <- tempfile(fileext = ".csv.gz") + tf <- tempfile(fileext = ".csv") + on.exit(unlink(tf)) + on.exit(unlink(tfgz)) + + write_csv_arrow(tbl, tf) + write_csv_arrow(tbl, tfgz) + expect_lt(file.size(tfgz), file.size(tf)) +}) + 
test_that("read_csv_arrow() can read sub-second timestamps with col_types T setting (ARROW-15599)", { tbl <- tibble::tibble(time = c("2018-10-07 19:04:05.000", "2018-10-07 19:04:05.001")) tf <- tempfile() From 873c9008e8fc562f48a0a8f3542bfba776ed252a Mon Sep 17 00:00:00 2001 From: Sam Albers Date: Tue, 17 May 2022 16:17:50 -0700 Subject: [PATCH 02/15] detect compression for stream and local --- r/R/io.R | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/r/R/io.R b/r/R/io.R index 8997b7cdeb6..72ac32b5c2f 100644 --- a/r/R/io.R +++ b/r/R/io.R @@ -301,10 +301,6 @@ make_output_stream <- function(x, filesystem = NULL, compression = NULL) { return(MakeRConnectionOutputStream(x)) } - if (is.null(compression)) { - # Infer compression from sink - compression <- detect_compression(x) - } if (inherits(x, "SubTreeFileSystem")) { filesystem <- x$base_fs @@ -314,15 +310,21 @@ make_output_stream <- function(x, filesystem = NULL, compression = NULL) { filesystem <- fs_and_path$fs x <- fs_and_path$path } + + if (is.null(compression)) { + # Infer compression from sink + compression <- detect_compression(x) + } + assert_that(is.string(x)) - if (is.null(filesystem)) { - if (!identical(compression, "uncompressed")) { - CompressedOutputStream$create(x) - } else { - FileOutputStream$create(x) - } + if (is.null(filesystem) && is_compressed(compression)) { + CompressedOutputStream$create(x) ##compressed local + } else if (is.null(filesystem) && !is_compressed(compression)) { + FileOutputStream$create(x) ## uncompressed local + } else if (!is.null(filesystem) && is_compressed(compression)) { + CompressedOutputStream$create(filesystem$OpenOutputStream(x)) ## compressed s3 } else { - filesystem$OpenOutputStream(x) + filesystem$OpenOutputStream(x) ## uncompressed s3 } } @@ -331,6 +333,10 @@ detect_compression <- function(path) { return("uncompressed") } + ## Remove any trailing slashes + path <- gsub("/$", "", path) + + switch(tools::file_ext(path), 
bz2 = "bz2", gz = "gzip", From 0b712b27ddd1107d2584e25b64b08906da97f4fa Mon Sep 17 00:00:00 2001 From: Sam Albers Date: Tue, 17 May 2022 16:18:09 -0700 Subject: [PATCH 03/15] add helper for compression --- r/R/util.R | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/r/R/util.R b/r/R/util.R index ff2bb070b88..4aff69e471a 100644 --- a/r/R/util.R +++ b/r/R/util.R @@ -211,3 +211,7 @@ handle_csv_read_error <- function(e, schema, call) { } abort(msg, call = call) } + +is_compressed <- function(compression) { + !identical(compression, "uncompressed") +} From 82cea820d11b59b4c212350063b64a37375002a2 Mon Sep 17 00:00:00 2001 From: Sam Albers Date: Tue, 17 May 2022 16:19:20 -0700 Subject: [PATCH 04/15] add into read function as well --- r/R/io.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/R/io.R b/r/R/io.R index 72ac32b5c2f..3b02f4ef559 100644 --- a/r/R/io.R +++ b/r/R/io.R @@ -270,7 +270,7 @@ make_readable_file <- function(file, mmap = TRUE, compression = NULL, filesystem file <- ReadableFile$create(file) } - if (!identical(compression, "uncompressed")) { + if (is_compressed(compression)) { file <- CompressedInputStream$create(file, compression) } } else if (inherits(file, c("raw", "Buffer"))) { From 44c0b32d22984fc48d6dc9d3a6975cfece5a6ffe Mon Sep 17 00:00:00 2001 From: Sam Albers Date: Tue, 17 May 2022 16:27:55 -0700 Subject: [PATCH 05/15] add filesystem tests --- r/tests/testthat/test-s3-minio.R | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/r/tests/testthat/test-s3-minio.R b/r/tests/testthat/test-s3-minio.R index 24f8495ae20..fef87be6ce8 100644 --- a/r/tests/testthat/test-s3-minio.R +++ b/r/tests/testthat/test-s3-minio.R @@ -54,6 +54,22 @@ if (arrow_with_s3() && process_is_running("minio server")) { ) }) + test_that("read/write compressed csv by filesystem", { + write_csv_arrow(example_data, fs$path(minio_path("test.csv.gz"))) + expect_identical( + read_csv_arrow(fs$path(minio_path("test.csv.gz"))), + example_data + ) + }) 
+ + test_that("read/write csv by filesystem", { + write_csv_arrow(example_data, fs$path(minio_path("test.csv"))) + expect_identical( + read_csv_arrow(fs$path(minio_path("test.csv"))), + example_data + ) + }) + test_that("read/write stream", { write_ipc_stream(example_data, fs$path(minio_path("test3.ipc"))) expect_identical( From f27e9b7c56b75fb95ac5d914ba35dd514329f96b Mon Sep 17 00:00:00 2001 From: Sam Albers Date: Tue, 17 May 2022 19:53:47 -0700 Subject: [PATCH 06/15] add roundtrip test for local --- r/tests/testthat/test-csv.R | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index fbd087a0dbf..5a48969bc2b 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -576,6 +576,17 @@ test_that("write_csv_arrow() compresses by file extension", { expect_lt(file.size(tfgz), file.size(tf)) }) +test_that("read/write compressed file", { + skip_if_not_available("gzip") + tfgz <- tempfile(fileext = ".csv.gz") + on.exit(unlink(tfgz)) + write_csv_arrow(tbl, tfgz) + expect_identical( + read_csv_arrow(tfgz), + tbl + ) +}) + test_that("read_csv_arrow() can read sub-second timestamps with col_types T setting (ARROW-15599)", { tbl <- tibble::tibble(time = c("2018-10-07 19:04:05.000", "2018-10-07 19:04:05.001")) tf <- tempfile() From c4dfed293d44398c0a072415ff36a02af76c8916 Mon Sep 17 00:00:00 2001 From: Sam Albers Date: Tue, 17 May 2022 20:04:45 -0700 Subject: [PATCH 07/15] use simpler data to avoid col_type changes --- r/tests/testthat/test-s3-minio.R | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/r/tests/testthat/test-s3-minio.R b/r/tests/testthat/test-s3-minio.R index fef87be6ce8..2f104e4c132 100644 --- a/r/tests/testthat/test-s3-minio.R +++ b/r/tests/testthat/test-s3-minio.R @@ -63,10 +63,11 @@ if (arrow_with_s3() && process_is_running("minio server")) { }) test_that("read/write csv by filesystem", { - write_csv_arrow(example_data, fs$path(minio_path("test.csv"))) 
+ dat <- data.frame(x = seq(1,10, by = 0.2)) + write_csv_arrow(dat, fs$path(minio_path("test.csv"))) expect_identical( read_csv_arrow(fs$path(minio_path("test.csv"))), - example_data + dat ) }) From e26b858609065768474c0ef20fd856fcfc4bdc84 Mon Sep 17 00:00:00 2001 From: Sam Albers Date: Tue, 17 May 2022 20:49:07 -0700 Subject: [PATCH 08/15] use a tibble and make both simpler --- r/tests/testthat/test-s3-minio.R | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/r/tests/testthat/test-s3-minio.R b/r/tests/testthat/test-s3-minio.R index 2f104e4c132..efd86b4f89a 100644 --- a/r/tests/testthat/test-s3-minio.R +++ b/r/tests/testthat/test-s3-minio.R @@ -55,15 +55,16 @@ if (arrow_with_s3() && process_is_running("minio server")) { }) test_that("read/write compressed csv by filesystem", { - write_csv_arrow(example_data, fs$path(minio_path("test.csv.gz"))) + dat <- tibble(x = seq(1,10, by = 0.2)) + write_csv_arrow(dat, fs$path(minio_path("test.csv.gz"))) expect_identical( read_csv_arrow(fs$path(minio_path("test.csv.gz"))), - example_data + dat ) }) test_that("read/write csv by filesystem", { - dat <- data.frame(x = seq(1,10, by = 0.2)) + dat <- tibble(x = seq(1,10, by = 0.2)) write_csv_arrow(dat, fs$path(minio_path("test.csv"))) expect_identical( read_csv_arrow(fs$path(minio_path("test.csv"))), From da2a3a3ec3b89f0c1fabd40aec62a74057264d31 Mon Sep 17 00:00:00 2001 From: Sam Albers Date: Tue, 17 May 2022 21:57:10 -0700 Subject: [PATCH 09/15] fix lint error --- r/tests/testthat/test-s3-minio.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/r/tests/testthat/test-s3-minio.R b/r/tests/testthat/test-s3-minio.R index efd86b4f89a..1769e95ffb2 100644 --- a/r/tests/testthat/test-s3-minio.R +++ b/r/tests/testthat/test-s3-minio.R @@ -55,7 +55,7 @@ if (arrow_with_s3() && process_is_running("minio server")) { }) test_that("read/write compressed csv by filesystem", { - dat <- tibble(x = seq(1,10, by = 0.2)) + dat <- tibble(x = seq(1, 10, by = 
0.2)) write_csv_arrow(dat, fs$path(minio_path("test.csv.gz"))) expect_identical( read_csv_arrow(fs$path(minio_path("test.csv.gz"))), @@ -64,7 +64,7 @@ if (arrow_with_s3() && process_is_running("minio server")) { }) test_that("read/write csv by filesystem", { - dat <- tibble(x = seq(1,10, by = 0.2)) + dat <- tibble(x = seq(1, 10, by = 0.2)) write_csv_arrow(dat, fs$path(minio_path("test.csv"))) expect_identical( read_csv_arrow(fs$path(minio_path("test.csv"))), From 29e2f6af13c89c151f5b86dcc8b0917afead5a00 Mon Sep 17 00:00:00 2001 From: Sam Albers Date: Wed, 18 May 2022 10:18:38 -0700 Subject: [PATCH 10/15] Update r/R/io.R Co-authored-by: Neal Richardson --- r/R/io.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/R/io.R b/r/R/io.R index 3b02f4ef559..2b7cbe2801d 100644 --- a/r/R/io.R +++ b/r/R/io.R @@ -333,7 +333,7 @@ detect_compression <- function(path) { return("uncompressed") } - ## Remove any trailing slashes + # Remove any trailing slashes, which FileSystem$from_uri may add path <- gsub("/$", "", path) From afd1910d9895a732d1671777e4e6e08d9c03a686 Mon Sep 17 00:00:00 2001 From: Sam Albers Date: Wed, 18 May 2022 10:19:14 -0700 Subject: [PATCH 11/15] Update r/R/io.R Co-authored-by: Neal Richardson --- r/R/io.R | 1 - 1 file changed, 1 deletion(-) diff --git a/r/R/io.R b/r/R/io.R index 2b7cbe2801d..a0086b35301 100644 --- a/r/R/io.R +++ b/r/R/io.R @@ -301,7 +301,6 @@ make_output_stream <- function(x, filesystem = NULL, compression = NULL) { return(MakeRConnectionOutputStream(x)) } - if (inherits(x, "SubTreeFileSystem")) { filesystem <- x$base_fs x <- x$base_path From 26d4a2fbc7ff051ca3847e43786b211e1f997187 Mon Sep 17 00:00:00 2001 From: Sam Albers Date: Wed, 18 May 2022 10:19:22 -0700 Subject: [PATCH 12/15] Update r/R/io.R Co-authored-by: Neal Richardson --- r/R/io.R | 1 - 1 file changed, 1 deletion(-) diff --git a/r/R/io.R b/r/R/io.R index a0086b35301..766745ac8f3 100644 --- a/r/R/io.R +++ b/r/R/io.R @@ -335,7 +335,6 @@ detect_compression <- 
function(path) { # Remove any trailing slashes, which FileSystem$from_uri may add path <- gsub("/$", "", path) - switch(tools::file_ext(path), bz2 = "bz2", gz = "gzip", From ba432a572dd77fe7226a7837a9f43d51487d5c5b Mon Sep 17 00:00:00 2001 From: Sam Albers Date: Wed, 18 May 2022 10:22:42 -0700 Subject: [PATCH 13/15] clarify remote comment --- r/R/io.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/r/R/io.R b/r/R/io.R index 766745ac8f3..8e72187b431 100644 --- a/r/R/io.R +++ b/r/R/io.R @@ -321,9 +321,9 @@ make_output_stream <- function(x, filesystem = NULL, compression = NULL) { } else if (is.null(filesystem) && !is_compressed(compression)) { FileOutputStream$create(x) ## uncompressed local } else if (!is.null(filesystem) && is_compressed(compression)) { - CompressedOutputStream$create(filesystem$OpenOutputStream(x)) ## compressed s3 + CompressedOutputStream$create(filesystem$OpenOutputStream(x)) ## compressed remote } else { - filesystem$OpenOutputStream(x) ## uncompressed s3 + filesystem$OpenOutputStream(x) ## uncompressed remote } } From 0ff68a6cbf57d60340bd672dfab4e9b9777e2f65 Mon Sep 17 00:00:00 2001 From: Sam Albers Date: Wed, 18 May 2022 10:24:06 -0700 Subject: [PATCH 14/15] consolidate test --- r/tests/testthat/test-csv.R | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index 5a48969bc2b..8e463d3abec 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -564,7 +564,7 @@ test_that("write_csv_arrow can write from RecordBatchReader objects", { expect_equal(nrow(tbl_in), 3) }) -test_that("write_csv_arrow() compresses by file extension", { +test_that("read/write compressed file successfully", { skip_if_not_available("gzip") tfgz <- tempfile(fileext = ".csv.gz") tf <- tempfile(fileext = ".csv") @@ -574,13 +574,7 @@ test_that("write_csv_arrow() compresses by file extension", { write_csv_arrow(tbl, tf) write_csv_arrow(tbl, tfgz) 
expect_lt(file.size(tfgz), file.size(tf)) -}) -test_that("read/write compressed file", { - skip_if_not_available("gzip") - tfgz <- tempfile(fileext = ".csv.gz") - on.exit(unlink(tfgz)) - write_csv_arrow(tbl, tfgz) expect_identical( read_csv_arrow(tfgz), tbl From 1ad3e4e7cabcb3de61896991e976b5186cb54f61 Mon Sep 17 00:00:00 2001 From: Sam Albers Date: Wed, 18 May 2022 10:26:32 -0700 Subject: [PATCH 15/15] skip gzip for remote tests --- r/tests/testthat/test-s3-minio.R | 2 ++ 1 file changed, 2 insertions(+) diff --git a/r/tests/testthat/test-s3-minio.R b/r/tests/testthat/test-s3-minio.R index 1769e95ffb2..51db355783b 100644 --- a/r/tests/testthat/test-s3-minio.R +++ b/r/tests/testthat/test-s3-minio.R @@ -55,6 +55,7 @@ if (arrow_with_s3() && process_is_running("minio server")) { }) test_that("read/write compressed csv by filesystem", { + skip_if_not_available("gzip") dat <- tibble(x = seq(1, 10, by = 0.2)) write_csv_arrow(dat, fs$path(minio_path("test.csv.gz"))) expect_identical( @@ -64,6 +65,7 @@ if (arrow_with_s3() && process_is_running("minio server")) { }) test_that("read/write csv by filesystem", { + skip_if_not_available("gzip") dat <- tibble(x = seq(1, 10, by = 0.2)) write_csv_arrow(dat, fs$path(minio_path("test.csv"))) expect_identical(