From b4df3e72cd98fc22984eb12e9b2a2fba342f621d Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Thu, 22 Apr 2021 13:34:21 +0100 Subject: [PATCH 01/27] Add unit tests for write_csv_arrow --- r/tests/testthat/test-csv.R | 48 +++++++++++++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index d27706f060d..4fcb2077e70 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -15,13 +15,57 @@ # specific language governing permissions and limitations # under the License. -context("CsvTableReader") - # Not all types round trip via CSV 100% identical by default tbl <- example_data[, c("dbl", "lgl", "false", "chr")] # Add a date to test its parsing tbl$date <- Sys.Date() + 1:10 +csv_file <- tempfile() + +test_that("Write a CSV file with header", { + tbl_out <- write_csv_arrow(tbl, csv_file) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out, tbl) + + tbl_in <- read_csv_arrow(csv_file) + expect_identical(tbl_in, tbl) +}) + + +test_that("Write a CSV file with no header", { + tbl_out <- write_csv_arrow(tbl, csv_file, include_header = FALSE) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out, tbl) + tbl_in <- read_csv_arrow(csv_file, col_names = FALSE) + + tbl_expected <- tbl + names(tbl_expected) <- c("f0", "f1", "f2", "f3", "f4") + + expect_identical(tbl_in, tbl_expected) +}) + +test_that("Write a CSV file with different batch sizes", { + tbl_out1 <- write_csv_arrow(tbl, csv_file, batch_size = 1) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out1, tbl) + tbl_in1 <- read_csv_arrow(csv_file) + expect_identical(tbl_in1, tbl) + + tbl_out2 <- write_csv_arrow(tbl, csv_file, batch_size = 2) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out2, tbl) + tbl_in2 <- read_csv_arrow(csv_file) + expect_identical(tbl_in2, tbl) + + tbl_out3 <- write_csv_arrow(tbl, csv_file, batch_size = 12) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out3, tbl) + tbl_in3 <- read_csv_arrow(csv_file) + expect_identical(tbl_in3, tbl) + +}) + + test_that("Can read csv file", { tf <- tempfile() on.exit(unlink(tf)) From 1077e8d42d3684103f7f34be61d0337ef92f912d Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Thu, 22 Apr 2021 15:56:56 +0100 Subject: [PATCH 02/27] Add CsvWriteOptions and write_csv_arrow functions --- r/R/csv.R | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/r/R/csv.R b/r/R/csv.R index 160c46e4753..c93ed45feb3 100644 --- a/r/R/csv.R +++ b/r/R/csv.R @@ -408,6 +408,17 @@ CsvReadOptions$create <- function(use_threads = option_use_threads(), ) } +#' @export +CsvWriteOptions <- R6Class("CsvWriteOptions", inherit = ArrowObject) +CsvWriteOptions$create <- function(include_header = TRUE, batch_size = 1024L){ + csv___WriteOptions__initialize( + list( + include_header = include_header, + batch_size = batch_size + ) + ) +} + readr_to_csv_read_options <- function(skip, col_names, col_types) { if (isTRUE(col_names)) { # C++ default to parse is 0-length string array @@ -585,3 +596,55 @@ readr_to_csv_convert_options <- function(na, include_columns = include_columns ) } + +#' Write CSV file to disk +#' +#' @param x `data.frame`, [RecordBatch], or [Table] +#' @param sink A string file path, URI, or [OutputStream], or path in a file +#' system (`SubTreeFileSystem`) +#' @param batch_size Maximum number of rows processed at a time. Default is 1024 +#' @param include_header Whether to write an initial header line with column names +#' +#' @return The input `x`, invisibly. Note that if `sink` is an [OutputStream], +#' the stream will be left open. +#' @export +#' @examples +#' \donttest{ +#' tf <- tempfile() +#' on.exit(unlink(tf)) +#' write_csv_arrow(mtcars, tf) +#' } +#' @include arrow-package.R +write_csv_arrow <- function(x, + sink, + include_header = TRUE, + batch_size = 1024L, + memory_pool = default_memory_pool(), + ) { + # Handle and validate options before touching data + batch_size <- as.integer(batch_size) + assert_that(batch_size > 0) + assert_that(length(include_header) == 1) + assert_that(is.logical(include_header)) + + write_options = CsvWriteOptions$create(include_header, batch_size) + + x_out <- x + if (is.data.frame(x)) { + x <- Table$create(x) + } + assert_is(x, c("Table", "RecordBatch")) + + if (!inherits(sink, "OutputStream")) { + sink <- make_output_stream(sink) + on.exit(sink$close()) + } + + if(inherits(x, "RecordBatch")){ + csv__WriteCSV___RecordBatch(x, write_options, memory_pool, sink) + } else if(inherits(x, "Table")){ + csv__WriteCSV___Table(x, write_options, memory_pool, sink) + } + + invisible(x_out) +} From 9d12fe699c2c1cd3350de8d9b8eb4917fd32a9b1 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Thu, 22 Apr 2021 15:58:33 +0100 Subject: [PATCH 03/27] Add C++ functions for intialising csv writeoptions and bindings to call WriteCSV functions --- r/src/csv.cpp | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/r/src/csv.cpp b/r/src/csv.cpp index 0ce4cd699f8..5ceef5e1227 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -22,6 +22,15 @@ #include #include +// [[arrow::export]] +std::shared_ptr csv___WriteOptions__initialize( + cpp11::list options) { + auto res = std::make_shared(arrow::csv::WriteOptions::Defaults()); + res->include_header = cpp11::as_cpp(options["include_header"]); + res->batch_size = cpp11::as_cpp(options["batch_size"]); + return res; +} + // [[arrow::export]] std::shared_ptr csv___ReadOptions__initialize( cpp11::list options) { @@ -174,4 +183,21 @@ std::shared_ptr TimestampParser__MakeISO8601() { return arrow::TimestampParser::MakeISO8601(); } +// [[arrow::export]] +void csv__WriteCSV___Table(const std::shared_ptr& table, + const std::shared_ptr write_options, + std::shared_ptr& pool, + std::shared_ptr& stream) { + StopIfNotOk(arrow::csv::writer::WriteCSV(*table, write_options, *pool, stream.get())); +} + +// [[arrow::export]] +void csv__WriteCSV___RecordBatch(const std::shared_ptr& record_batch, + const std::shared_ptr write_options, + std::shared_ptr& pool, + std::shared_ptr& stream) { + StopIfNotOk(arrow::csv::writer::WriteCSV(*record_batch, write_options, *pool, stream.get())); +} + + #endif From 5791f634b4db0d2515b1399a50630cff20ba3ab1 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Thu, 22 Apr 2021 15:58:54 +0100 Subject: [PATCH 04/27] Add pkgdown reference to CsvWriteOptions --- r/_pkgdown.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/r/_pkgdown.yml b/r/_pkgdown.yml index bb77b416aab..e744f1e578d 100644 --- a/r/_pkgdown.yml +++ b/r/_pkgdown.yml @@ -109,6 +109,7 @@ reference: - RecordBatchReader - RecordBatchWriter - CsvReadOptions + - CsvWriteOptions - title: Arrow data containers contents: - array From deff496bf56fdec4addbbf81d958bb4bff1a6762 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Fri, 23 Apr 2021 08:26:33 +0100 Subject: [PATCH 05/27] Add WriteOptions to arrow::csv namespace --- cpp/src/arrow/csv/type_fwd.h | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/csv/type_fwd.h b/cpp/src/arrow/csv/type_fwd.h index 17fcdbdcc56..c0a53847a90 100644 --- a/cpp/src/arrow/csv/type_fwd.h +++ b/cpp/src/arrow/csv/type_fwd.h @@ -22,6 +22,7 @@ class TableReader; struct ConvertOptions; struct ReadOptions; struct ParseOptions; +struct WriteOptions; } // namespace csv } // namespace arrow From e21aaa5b3e6f6203586558ed18a0e292cf6810c2 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Fri, 23 Apr 2021 13:19:49 +0100 Subject: [PATCH 06/27] Update types --- r/src/csv.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/r/src/csv.cpp b/r/src/csv.cpp index 5ceef5e1227..44d857c6010 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -185,18 +185,18 @@ std::shared_ptr TimestampParser__MakeISO8601() { // [[arrow::export]] void csv__WriteCSV___Table(const std::shared_ptr& table, - const std::shared_ptr write_options, - std::shared_ptr& pool, - std::shared_ptr& stream) { - StopIfNotOk(arrow::csv::writer::WriteCSV(*table, write_options, *pool, stream.get())); + const std::shared_ptr& write_options, + const arrow::MemoryPool* pool, + const std::shared_ptr* stream) { + StopIfNotOk(arrow::csv::writer::WriteCSV(*table, *write_options, *pool, stream.get())); } // [[arrow::export]] void csv__WriteCSV___RecordBatch(const std::shared_ptr& record_batch, - const std::shared_ptr write_options, - std::shared_ptr& pool, - std::shared_ptr& stream) { - StopIfNotOk(arrow::csv::writer::WriteCSV(*record_batch, write_options, *pool, stream.get())); + const std::shared_ptr& write_options, + const arrow::MemoryPool* pool, + const std::shared_ptr* stream) { + StopIfNotOk(arrow::csv::writer::WriteCSV(*record_batch, *write_options, *pool, stream.get())); } From 04d810609ec8437056b67be17006fe4fa7a87af7 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Fri, 23 Apr 2021 13:20:19 +0100 Subject: [PATCH 07/27] Update arrowExports --- r/R/arrowExports.R | 12 +++++++++ r/src/arrowExports.cpp | 56 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 51cdcf85df0..a6fa410dcdf 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -292,6 +292,10 @@ compute__GetFunctionNames <- function(){ .Call(`_arrow_compute__GetFunctionNames`) } +csv___WriteOptions__initialize <- function(options){ + .Call(`_arrow_csv___WriteOptions__initialize`, options) +} + csv___ReadOptions__initialize <- function(options){ .Call(`_arrow_csv___ReadOptions__initialize`, options) } @@ -332,6 +336,14 @@ TimestampParser__MakeISO8601 <- function(){ .Call(`_arrow_TimestampParser__MakeISO8601`) } +csv__WriteCSV___Table <- function(table, write_options, pool, stream){ + invisible(.Call(`_arrow_csv__WriteCSV___Table`, table, write_options, pool, stream)) +} + +csv__WriteCSV___RecordBatch <- function(record_batch, write_options, pool, stream){ + invisible(.Call(`_arrow_csv__WriteCSV___RecordBatch`, record_batch, write_options, pool, stream)) +} + dataset___Dataset__NewScan <- function(ds){ .Call(`_arrow_dataset___Dataset__NewScan`, ds) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index c5ef6343ced..eceae4a1ab1 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1143,6 +1143,21 @@ extern "C" SEXP _arrow_compute__GetFunctionNames(){ } #endif +// csv.cpp +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr csv___WriteOptions__initialize(cpp11::list options); +extern "C" SEXP _arrow_csv___WriteOptions__initialize(SEXP options_sexp){ +BEGIN_CPP11 + arrow::r::Input::type options(options_sexp); + return cpp11::as_sexp(csv___WriteOptions__initialize(options)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_csv___WriteOptions__initialize(SEXP options_sexp){ + Rf_error("Cannot call csv___WriteOptions__initialize(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + // csv.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr csv___ReadOptions__initialize(cpp11::list options); @@ -1295,6 +1310,44 @@ extern "C" SEXP _arrow_TimestampParser__MakeISO8601(){ } #endif +// csv.cpp +#if defined(ARROW_R_WITH_ARROW) +void csv__WriteCSV___Table(const std::shared_ptr& table, const std::shared_ptr write_options, std::shared_ptr& pool, std::shared_ptr& stream); +extern "C" SEXP _arrow_csv__WriteCSV___Table(SEXP table_sexp, SEXP write_options_sexp, SEXP pool_sexp, SEXP stream_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type table(table_sexp); + arrow::r::Input>::type write_options(write_options_sexp); + arrow::r::Input&>::type pool(pool_sexp); + arrow::r::Input&>::type stream(stream_sexp); + csv__WriteCSV___Table(table, write_options, pool, stream); + return R_NilValue; +END_CPP11 +} +#else +extern "C" SEXP _arrow_csv__WriteCSV___Table(SEXP table_sexp, SEXP write_options_sexp, SEXP pool_sexp, SEXP stream_sexp){ + Rf_error("Cannot call csv__WriteCSV___Table(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + +// csv.cpp +#if defined(ARROW_R_WITH_ARROW) +void csv__WriteCSV___RecordBatch(const std::shared_ptr& record_batch, const std::shared_ptr write_options, std::shared_ptr& pool, std::shared_ptr& stream); +extern "C" SEXP _arrow_csv__WriteCSV___RecordBatch(SEXP record_batch_sexp, SEXP write_options_sexp, SEXP pool_sexp, SEXP stream_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type record_batch(record_batch_sexp); + arrow::r::Input>::type write_options(write_options_sexp); + arrow::r::Input&>::type pool(pool_sexp); + arrow::r::Input&>::type stream(stream_sexp); + csv__WriteCSV___RecordBatch(record_batch, write_options, pool, stream); + return R_NilValue; +END_CPP11 +} +#else +extern "C" SEXP _arrow_csv__WriteCSV___RecordBatch(SEXP record_batch_sexp, SEXP write_options_sexp, SEXP pool_sexp, SEXP stream_sexp){ + Rf_error("Cannot call csv__WriteCSV___RecordBatch(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + // dataset.cpp #if defined(ARROW_R_WITH_DATASET) std::shared_ptr dataset___Dataset__NewScan(const std::shared_ptr& ds); @@ -6677,6 +6730,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_compute__CallFunction", (DL_FUNC) &_arrow_compute__CallFunction, 3}, { "_arrow_compute__GroupBy", (DL_FUNC) &_arrow_compute__GroupBy, 3}, { "_arrow_compute__GetFunctionNames", (DL_FUNC) &_arrow_compute__GetFunctionNames, 0}, + { "_arrow_csv___WriteOptions__initialize", (DL_FUNC) &_arrow_csv___WriteOptions__initialize, 1}, { "_arrow_csv___ReadOptions__initialize", (DL_FUNC) &_arrow_csv___ReadOptions__initialize, 1}, { "_arrow_csv___ParseOptions__initialize", (DL_FUNC) &_arrow_csv___ParseOptions__initialize, 1}, { "_arrow_csv___ReadOptions__column_names", (DL_FUNC) &_arrow_csv___ReadOptions__column_names, 1}, @@ -6687,6 +6741,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_TimestampParser__format", (DL_FUNC) &_arrow_TimestampParser__format, 1}, { "_arrow_TimestampParser__MakeStrptime", (DL_FUNC) &_arrow_TimestampParser__MakeStrptime, 1}, { "_arrow_TimestampParser__MakeISO8601", (DL_FUNC) &_arrow_TimestampParser__MakeISO8601, 0}, + { "_arrow_csv__WriteCSV___Table", (DL_FUNC) &_arrow_csv__WriteCSV___Table, 4}, + { "_arrow_csv__WriteCSV___RecordBatch", (DL_FUNC) &_arrow_csv__WriteCSV___RecordBatch, 4}, { "_arrow_dataset___Dataset__NewScan", (DL_FUNC) &_arrow_dataset___Dataset__NewScan, 1}, { "_arrow_dataset___Dataset__schema", (DL_FUNC) &_arrow_dataset___Dataset__schema, 1}, { "_arrow_dataset___Dataset__type_name", (DL_FUNC) &_arrow_dataset___Dataset__type_name, 1}, From 59112f6485f12e4cd1c54d17b425143fa5b5276b Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Fri, 23 Apr 2021 13:20:46 +0100 Subject: [PATCH 08/27] Re-order params and add assertion --- r/R/csv.R | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/r/R/csv.R b/r/R/csv.R index c93ed45feb3..e2a08806f9d 100644 --- a/r/R/csv.R +++ b/r/R/csv.R @@ -602,8 +602,8 @@ readr_to_csv_convert_options <- function(na, #' @param x `data.frame`, [RecordBatch], or [Table] #' @param sink A string file path, URI, or [OutputStream], or path in a file #' system (`SubTreeFileSystem`) -#' @param batch_size Maximum number of rows processed at a time. Default is 1024 #' @param include_header Whether to write an initial header line with column names +#' @param batch_size Maximum number of rows processed at a time. Default is 1024 #' #' @return The input `x`, invisibly. Note that if `sink` is an [OutputStream], #' the stream will be left open. @@ -635,6 +635,8 @@ write_csv_arrow <- function(x, } assert_is(x, c("Table", "RecordBatch")) + assert_is(memory_pool, "MemoryPool") + if (!inherits(sink, "OutputStream")) { sink <- make_output_stream(sink) on.exit(sink$close()) From c172db85a0269f4ae7689b4347fbd73b5e0ced73 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Fri, 23 Apr 2021 14:07:08 +0100 Subject: [PATCH 09/27] try changing --- r/src/csv.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/r/src/csv.cpp b/r/src/csv.cpp index 44d857c6010..aa6f1a9b891 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -188,7 +188,7 @@ void csv__WriteCSV___Table(const std::shared_ptr& table, const std::shared_ptr& write_options, const arrow::MemoryPool* pool, const std::shared_ptr* stream) { - StopIfNotOk(arrow::csv::writer::WriteCSV(*table, *write_options, *pool, stream.get())); + StopIfNotOk(arrow::csv::writer::WriteCSV(*table, *write_options, pool, stream.get())); } // [[arrow::export]] @@ -196,7 +196,7 @@ void csv__WriteCSV___RecordBatch(const std::shared_ptr& reco const std::shared_ptr& write_options, const arrow::MemoryPool* pool, const std::shared_ptr* stream) { - StopIfNotOk(arrow::csv::writer::WriteCSV(*record_batch, *write_options, *pool, stream.get())); + StopIfNotOk(arrow::csv::writer::WriteCSV(*record_batch, *write_options, pool, stream.get())); } From 2b962ad7fae58d840dca7fa8e10014290841f687 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Fri, 23 Apr 2021 14:32:34 +0100 Subject: [PATCH 10/27] Remove const keyword --- r/src/csv.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/r/src/csv.cpp b/r/src/csv.cpp index aa6f1a9b891..423ae61fd6f 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -186,16 +186,16 @@ std::shared_ptr TimestampParser__MakeISO8601() { // [[arrow::export]] void csv__WriteCSV___Table(const std::shared_ptr& table, const std::shared_ptr& write_options, - const arrow::MemoryPool* pool, - const std::shared_ptr* stream) { + std::shared_ptr pool, + std::shared_ptr* stream) { StopIfNotOk(arrow::csv::writer::WriteCSV(*table, *write_options, pool, stream.get())); } // [[arrow::export]] void csv__WriteCSV___RecordBatch(const std::shared_ptr& record_batch, const std::shared_ptr& write_options, - const arrow::MemoryPool* pool, - const std::shared_ptr* stream) { + std::shared_ptr pool, + std::shared_ptr* stream) { StopIfNotOk(arrow::csv::writer::WriteCSV(*record_batch, *write_options, pool, stream.get())); } From 1d70f0b70d81fbaba8a0a2cac3c25db1ec1135c0 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Fri, 23 Apr 2021 15:53:27 +0100 Subject: [PATCH 11/27] Remove unnecessary comma, include relevant header files, refactor cpp --- r/R/csv.R | 2 +- r/src/arrowExports.cpp | 16 ++++++++-------- r/src/csv.cpp | 15 +++++++++------ 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/r/R/csv.R b/r/R/csv.R index e2a08806f9d..9af2ccccff4 100644 --- a/r/R/csv.R +++ b/r/R/csv.R @@ -619,7 +619,7 @@ write_csv_arrow <- function(x, sink, include_header = TRUE, batch_size = 1024L, - memory_pool = default_memory_pool(), + memory_pool = default_memory_pool() ) { # Handle and validate options before touching data batch_size <- as.integer(batch_size) diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index eceae4a1ab1..5192611d780 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1312,13 +1312,13 @@ extern "C" SEXP _arrow_TimestampParser__MakeISO8601(){ // csv.cpp #if defined(ARROW_R_WITH_ARROW) -void csv__WriteCSV___Table(const std::shared_ptr& table, const std::shared_ptr write_options, std::shared_ptr& pool, std::shared_ptr& stream); +void csv__WriteCSV___Table(const std::shared_ptr& table, const std::shared_ptr& write_options, arrow::MemoryPool* pool, std::shared_ptr stream); extern "C" SEXP _arrow_csv__WriteCSV___Table(SEXP table_sexp, SEXP write_options_sexp, SEXP pool_sexp, SEXP stream_sexp){ BEGIN_CPP11 arrow::r::Input&>::type table(table_sexp); - arrow::r::Input>::type write_options(write_options_sexp); - arrow::r::Input&>::type pool(pool_sexp); - arrow::r::Input&>::type stream(stream_sexp); + arrow::r::Input&>::type write_options(write_options_sexp); + arrow::r::Input::type pool(pool_sexp); + arrow::r::Input>::type stream(stream_sexp); csv__WriteCSV___Table(table, write_options, pool, stream); return R_NilValue; END_CPP11 @@ -1331,13 +1331,13 @@ extern "C" SEXP _arrow_csv__WriteCSV___Table(SEXP table_sexp, SEXP write_options // csv.cpp #if defined(ARROW_R_WITH_ARROW) -void csv__WriteCSV___RecordBatch(const std::shared_ptr& record_batch, const std::shared_ptr write_options, std::shared_ptr& pool, std::shared_ptr& stream); +void csv__WriteCSV___RecordBatch(const std::shared_ptr& record_batch, const std::shared_ptr& write_options, arrow::MemoryPool* pool, std::shared_ptr stream); extern "C" SEXP _arrow_csv__WriteCSV___RecordBatch(SEXP record_batch_sexp, SEXP write_options_sexp, SEXP pool_sexp, SEXP stream_sexp){ BEGIN_CPP11 arrow::r::Input&>::type record_batch(record_batch_sexp); - arrow::r::Input>::type write_options(write_options_sexp); - arrow::r::Input&>::type pool(pool_sexp); - arrow::r::Input&>::type stream(stream_sexp); + arrow::r::Input&>::type write_options(write_options_sexp); + arrow::r::Input::type pool(pool_sexp); + arrow::r::Input>::type stream(stream_sexp); csv__WriteCSV___RecordBatch(record_batch, write_options, pool, stream); return R_NilValue; END_CPP11 diff --git a/r/src/csv.cpp b/r/src/csv.cpp index 423ae61fd6f..9eab418f3db 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -20,6 +20,9 @@ #if defined(ARROW_R_WITH_ARROW) #include +#include +#include + #include // [[arrow::export]] @@ -186,17 +189,17 @@ std::shared_ptr TimestampParser__MakeISO8601() { // [[arrow::export]] void csv__WriteCSV___Table(const std::shared_ptr& table, const std::shared_ptr& write_options, - std::shared_ptr pool, - std::shared_ptr* stream) { - StopIfNotOk(arrow::csv::writer::WriteCSV(*table, *write_options, pool, stream.get())); + arrow::MemoryPool* pool, + std::shared_ptr stream) { + StopIfNotOk(arrow::csv::WriteCSV(*table, *write_options, pool, stream.get())); } // [[arrow::export]] void csv__WriteCSV___RecordBatch(const std::shared_ptr& record_batch, const std::shared_ptr& write_options, - std::shared_ptr pool, - std::shared_ptr* stream) { - StopIfNotOk(arrow::csv::writer::WriteCSV(*record_batch, *write_options, pool, stream.get())); + arrow::MemoryPool* pool, + std::shared_ptr stream) { + StopIfNotOk(arrow::csv::WriteCSV(*record_batch, *write_options, pool, stream.get())); } From 713075f075a171d857317e4ab489e4d3e50ce2d0 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Mon, 26 Apr 2021 13:07:50 +0100 Subject: [PATCH 12/27] Remove exposed memory pool argument, update NAMESPACE and docs --- r/NAMESPACE | 2 ++ r/R/arrowExports.R | 8 ++++---- r/R/csv.R | 9 +++------ r/man/write_csv_arrow.Rd | 32 ++++++++++++++++++++++++++++++++ r/src/arrowExports.cpp | 26 ++++++++++++-------------- r/src/arrow_types.h | 1 + r/src/csv.cpp | 11 +++++------ 7 files changed, 59 insertions(+), 30 deletions(-) create mode 100644 r/man/write_csv_arrow.Rd diff --git a/r/NAMESPACE b/r/NAMESPACE index 117e3de5c22..b40ccaf2832 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -122,6 +122,7 @@ export(CsvFragmentScanOptions) export(CsvParseOptions) export(CsvReadOptions) export(CsvTableReader) +export(CsvWriteOptions) export(Dataset) export(DatasetFactory) export(DateUnit) @@ -277,6 +278,7 @@ export(unify_schemas) export(utf8) export(value_counts) export(write_arrow) +export(write_csv_arrow) export(write_dataset) export(write_feather) export(write_ipc_stream) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index a6fa410dcdf..d714b2c4267 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -336,12 +336,12 @@ TimestampParser__MakeISO8601 <- function(){ .Call(`_arrow_TimestampParser__MakeISO8601`) } -csv__WriteCSV___Table <- function(table, write_options, pool, stream){ - invisible(.Call(`_arrow_csv__WriteCSV___Table`, table, write_options, pool, stream)) +csv__WriteCSV___Table <- function(table, write_options, stream){ + invisible(.Call(`_arrow_csv__WriteCSV___Table`, table, write_options, stream)) } -csv__WriteCSV___RecordBatch <- function(record_batch, write_options, pool, stream){ - invisible(.Call(`_arrow_csv__WriteCSV___RecordBatch`, record_batch, write_options, pool, stream)) +csv__WriteCSV___RecordBatch <- function(record_batch, write_options, stream){ + invisible(.Call(`_arrow_csv__WriteCSV___RecordBatch`, record_batch, write_options, stream)) } dataset___Dataset__NewScan <- function(ds){ diff --git a/r/R/csv.R b/r/R/csv.R index 9af2ccccff4..93f738f6e56 100644 --- a/r/R/csv.R +++ b/r/R/csv.R @@ -618,8 +618,7 @@ readr_to_csv_convert_options <- function(na, write_csv_arrow <- function(x, sink, include_header = TRUE, - batch_size = 1024L, - memory_pool = default_memory_pool() + batch_size = 1024L ) { # Handle and validate options before touching data batch_size <- as.integer(batch_size) @@ -635,17 +634,15 @@ write_csv_arrow <- function(x, } assert_is(x, c("Table", "RecordBatch")) - assert_is(memory_pool, "MemoryPool") - if (!inherits(sink, "OutputStream")) { sink <- make_output_stream(sink) on.exit(sink$close()) } if(inherits(x, "RecordBatch")){ - csv__WriteCSV___RecordBatch(x, write_options, memory_pool, sink) + csv__WriteCSV___RecordBatch(x, write_options, sink) } else if(inherits(x, "Table")){ - csv__WriteCSV___Table(x, write_options, memory_pool, sink) + csv__WriteCSV___Table(x, write_options, sink) } invisible(x_out) diff --git a/r/man/write_csv_arrow.Rd b/r/man/write_csv_arrow.Rd new file mode 100644 index 00000000000..f583e487e1f --- /dev/null +++ b/r/man/write_csv_arrow.Rd @@ -0,0 +1,32 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/csv.R +\name{write_csv_arrow} +\alias{write_csv_arrow} +\title{Write CSV file to disk} +\usage{ +write_csv_arrow(x, sink, include_header = TRUE, batch_size = 1024L) +} +\arguments{ +\item{x}{\code{data.frame}, \link{RecordBatch}, or \link{Table}} + +\item{sink}{A string file path, URI, or \link{OutputStream}, or path in a file +system (\code{SubTreeFileSystem})} + +\item{include_header}{Whether to write an initial header line with column names} + +\item{batch_size}{Maximum number of rows processed at a time. Default is 1024} +} +\value{ +The input \code{x}, invisibly. Note that if \code{sink} is an \link{OutputStream}, +the stream will be left open. +} +\description{ +Write CSV file to disk +} +\examples{ +\donttest{ +tf <- tempfile() +on.exit(unlink(tf)) +write_csv_arrow(mtcars, tf) +} +} diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 5192611d780..2889f1709ef 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1312,38 +1312,36 @@ extern "C" SEXP _arrow_TimestampParser__MakeISO8601(){ // csv.cpp #if defined(ARROW_R_WITH_ARROW) -void csv__WriteCSV___Table(const std::shared_ptr& table, const std::shared_ptr& write_options, arrow::MemoryPool* pool, std::shared_ptr stream); -extern "C" SEXP _arrow_csv__WriteCSV___Table(SEXP table_sexp, SEXP write_options_sexp, SEXP pool_sexp, SEXP stream_sexp){ +void csv__WriteCSV___Table(const std::shared_ptr& table, const std::shared_ptr& write_options, const std::shared_ptr& stream); +extern "C" SEXP _arrow_csv__WriteCSV___Table(SEXP table_sexp, SEXP write_options_sexp, SEXP stream_sexp){ BEGIN_CPP11 arrow::r::Input&>::type table(table_sexp); arrow::r::Input&>::type write_options(write_options_sexp); - arrow::r::Input::type pool(pool_sexp); - arrow::r::Input>::type stream(stream_sexp); - csv__WriteCSV___Table(table, write_options, pool, stream); + arrow::r::Input&>::type stream(stream_sexp); + csv__WriteCSV___Table(table, write_options, stream); return R_NilValue; END_CPP11 } #else -extern "C" SEXP _arrow_csv__WriteCSV___Table(SEXP table_sexp, SEXP write_options_sexp, SEXP pool_sexp, SEXP stream_sexp){ +extern "C" SEXP _arrow_csv__WriteCSV___Table(SEXP table_sexp, SEXP write_options_sexp, SEXP stream_sexp){ Rf_error("Cannot call csv__WriteCSV___Table(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif // csv.cpp #if defined(ARROW_R_WITH_ARROW) -void csv__WriteCSV___RecordBatch(const std::shared_ptr& record_batch, const std::shared_ptr& write_options, arrow::MemoryPool* pool, std::shared_ptr stream); -extern "C" SEXP _arrow_csv__WriteCSV___RecordBatch(SEXP record_batch_sexp, SEXP write_options_sexp, SEXP pool_sexp, SEXP stream_sexp){ +void csv__WriteCSV___RecordBatch(const std::shared_ptr& record_batch, const std::shared_ptr& write_options, const std::shared_ptr& stream); +extern "C" SEXP _arrow_csv__WriteCSV___RecordBatch(SEXP record_batch_sexp, SEXP write_options_sexp, SEXP stream_sexp){ BEGIN_CPP11 arrow::r::Input&>::type record_batch(record_batch_sexp); arrow::r::Input&>::type write_options(write_options_sexp); - arrow::r::Input::type pool(pool_sexp); - arrow::r::Input>::type stream(stream_sexp); - csv__WriteCSV___RecordBatch(record_batch, write_options, pool, stream); + arrow::r::Input&>::type stream(stream_sexp); + csv__WriteCSV___RecordBatch(record_batch, write_options, stream); return R_NilValue; END_CPP11 } #else -extern "C" SEXP _arrow_csv__WriteCSV___RecordBatch(SEXP record_batch_sexp, SEXP write_options_sexp, SEXP pool_sexp, SEXP stream_sexp){ +extern "C" SEXP _arrow_csv__WriteCSV___RecordBatch(SEXP record_batch_sexp, SEXP write_options_sexp, SEXP stream_sexp){ Rf_error("Cannot call csv__WriteCSV___RecordBatch(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif @@ -6741,8 +6739,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_TimestampParser__format", (DL_FUNC) &_arrow_TimestampParser__format, 1}, { "_arrow_TimestampParser__MakeStrptime", (DL_FUNC) &_arrow_TimestampParser__MakeStrptime, 1}, { "_arrow_TimestampParser__MakeISO8601", (DL_FUNC) &_arrow_TimestampParser__MakeISO8601, 0}, - { "_arrow_csv__WriteCSV___Table", (DL_FUNC) &_arrow_csv__WriteCSV___Table, 4}, - { "_arrow_csv__WriteCSV___RecordBatch", (DL_FUNC) &_arrow_csv__WriteCSV___RecordBatch, 4}, + { "_arrow_csv__WriteCSV___Table", (DL_FUNC) &_arrow_csv__WriteCSV___Table, 3}, + { "_arrow_csv__WriteCSV___RecordBatch", (DL_FUNC) &_arrow_csv__WriteCSV___RecordBatch, 3}, { "_arrow_dataset___Dataset__NewScan", (DL_FUNC) &_arrow_dataset___Dataset__NewScan, 1}, { "_arrow_dataset___Dataset__schema", (DL_FUNC) &_arrow_dataset___Dataset__schema, 1}, { "_arrow_dataset___Dataset__type_name", (DL_FUNC) &_arrow_dataset___Dataset__type_name, 1}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index b94ab764729..b146b7ecccc 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -178,6 +178,7 @@ R6_CLASS_NAME(arrow::csv::ReadOptions, "CsvReadOptions"); R6_CLASS_NAME(arrow::csv::ParseOptions, "CsvParseOptions"); R6_CLASS_NAME(arrow::csv::ConvertOptions, "CsvConvertOptions"); R6_CLASS_NAME(arrow::csv::TableReader, "CsvTableReader"); +R6_CLASS_NAME(arrow::csv::WriteOptions, "CsvWriteOptions"); #if defined(ARROW_R_WITH_PARQUET) R6_CLASS_NAME(parquet::ArrowReaderProperties, "ParquetArrowReaderProperties"); diff --git a/r/src/csv.cpp b/r/src/csv.cpp index 9eab418f3db..a3794b77c84 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -189,17 +189,16 @@ std::shared_ptr TimestampParser__MakeISO8601() { // [[arrow::export]] void csv__WriteCSV___Table(const std::shared_ptr& table, const std::shared_ptr& write_options, - arrow::MemoryPool* pool, - std::shared_ptr stream) { - StopIfNotOk(arrow::csv::WriteCSV(*table, *write_options, pool, stream.get())); + const std::shared_ptr& stream) { + + StopIfNotOk(arrow::csv::WriteCSV(*table, *write_options, arrow::default_memory_pool(), stream.get())); } // [[arrow::export]] void csv__WriteCSV___RecordBatch(const std::shared_ptr& record_batch, const std::shared_ptr& write_options, - arrow::MemoryPool* pool, - std::shared_ptr stream) { - StopIfNotOk(arrow::csv::WriteCSV(*record_batch, *write_options, pool, stream.get())); + const std::shared_ptr& stream) { + StopIfNotOk(arrow::csv::WriteCSV(*record_batch, *write_options, arrow::default_memory_pool(), stream.get())); } From 86d83d88385167a45e679a72c82595eaddc04a35 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Mon, 26 Apr 2021 14:25:21 +0100 Subject: [PATCH 13/27] Use gc_memory_pool() instead of arrow::default_memory_pool() to prevent memory issues --- r/src/csv.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/r/src/csv.cpp b/r/src/csv.cpp index a3794b77c84..673da882aca 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -191,14 +191,14 @@ void csv__WriteCSV___Table(const std::shared_ptr& table, const std::shared_ptr& write_options, const std::shared_ptr& stream) { - StopIfNotOk(arrow::csv::WriteCSV(*table, *write_options, arrow::default_memory_pool(), stream.get())); + StopIfNotOk(arrow::csv::WriteCSV(*table, *write_options, gc_memory_pool()(), stream.get())); } // [[arrow::export]] void csv__WriteCSV___RecordBatch(const std::shared_ptr& record_batch, const std::shared_ptr& write_options, const std::shared_ptr& stream) { - StopIfNotOk(arrow::csv::WriteCSV(*record_batch, *write_options, arrow::default_memory_pool(), stream.get())); + StopIfNotOk(arrow::csv::WriteCSV(*record_batch, *write_options, gc_memory_pool()(), stream.get())); } From 8de7c3bd0abe06b8c6362de39c5af00470aa6e37 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Mon, 26 Apr 2021 14:28:28 +0100 Subject: [PATCH 14/27] Typo fix --- r/src/csv.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/r/src/csv.cpp b/r/src/csv.cpp index 673da882aca..b1a2047ce76 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -191,14 +191,14 @@ void csv__WriteCSV___Table(const std::shared_ptr& table, const std::shared_ptr& write_options, const std::shared_ptr& stream) { - StopIfNotOk(arrow::csv::WriteCSV(*table, *write_options, gc_memory_pool()(), stream.get())); + StopIfNotOk(arrow::csv::WriteCSV(*table, *write_options, gc_memory_pool(), stream.get())); } // [[arrow::export]] void csv__WriteCSV___RecordBatch(const std::shared_ptr& record_batch, const std::shared_ptr& write_options, const std::shared_ptr& stream) { - StopIfNotOk(arrow::csv::WriteCSV(*record_batch, *write_options, gc_memory_pool()(), stream.get())); + StopIfNotOk(arrow::csv::WriteCSV(*record_batch, *write_options, gc_memory_pool(), stream.get())); } From 3239575b959a4603e909a73ed47ca5e7544ca9e6 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Tue, 27 Apr 2021 15:12:06 +0100 Subject: [PATCH 15/27] Skip tests that include writing date columns --- r/tests/testthat/test-csv.R | 44 +++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index 4fcb2077e70..85860f804c1 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -17,12 +17,22 @@ # Not all types round trip via CSV 100% identical by default tbl <- example_data[, c("dbl", "lgl", "false", "chr")] +tbl_no_dates <- tbl # Add a date to test its parsing tbl$date <- Sys.Date() + 1:10 csv_file <- tempfile() test_that("Write a CSV file with header", { + tbl_out <- write_csv_arrow(tbl_no_dates, csv_file) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out, tbl_no_dates) + + tbl_in <- read_csv_arrow(csv_file) + expect_identical(tbl_in, tbl_no_dates) + + skip("Doesn't yet work with date columns due to ARROW-12540") + tbl_out <- write_csv_arrow(tbl, csv_file) expect_true(file.exists(csv_file)) expect_identical(tbl_out, tbl) @@ -33,6 +43,19 @@ test_that("Write a CSV file with header", { test_that("Write a CSV file with no header", { + + tbl_out <- write_csv_arrow(tbl_no_dates, csv_file, include_header = FALSE) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out, tbl_no_dates) + tbl_in <- read_csv_arrow(csv_file, col_names = FALSE) + + tbl_expected <- tbl_no_dates + names(tbl_expected) <- c("f0", "f1", "f2", "f3") + + expect_identical(tbl_in, tbl_expected) + + skip("Doesn't yet work with date columns due to ARROW-12540") + tbl_out <- write_csv_arrow(tbl, csv_file, include_header = FALSE) expect_true(file.exists(csv_file)) expect_identical(tbl_out, tbl) @@ -45,6 +68,27 @@ test_that("Write a CSV file with no header", { }) test_that("Write a CSV file with different batch sizes", { + + tbl_out1 <- write_csv_arrow(tbl_no_dates, csv_file, batch_size = 1) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out1, tbl_no_dates) + tbl_in1 <- read_csv_arrow(csv_file) + expect_identical(tbl_in1, tbl_no_dates) + + tbl_out2 <- write_csv_arrow(tbl_no_dates, csv_file, batch_size = 2) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out2, tbl_no_dates) + tbl_in2 <- read_csv_arrow(csv_file) + expect_identical(tbl_in2, tbl_no_dates) + + tbl_out3 <- write_csv_arrow(tbl_no_dates, csv_file, batch_size = 12) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out3, tbl_no_dates) + tbl_in3 <- read_csv_arrow(csv_file) + expect_identical(tbl_in3, tbl_no_dates) + + skip("Doesn't yet work with date columns due to ARROW-12540") + tbl_out1 <- write_csv_arrow(tbl, csv_file, batch_size = 1) expect_true(file.exists(csv_file)) expect_identical(tbl_out1, tbl) From 63a2f98474961fde5f87fdfdd1fcd47262b4f69d Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Tue, 27 Apr 2021 15:17:15 +0100 Subject: [PATCH 16/27] Change whitespace to force CI --- r/R/csv.R | 1 + 1 file changed, 1 insertion(+) diff --git a/r/R/csv.R b/r/R/csv.R index 93f738f6e56..a60c4559ca1 100644 --- a/r/R/csv.R +++ b/r/R/csv.R @@ -632,6 +632,7 @@ write_csv_arrow <- function(x, if (is.data.frame(x)) { x <- Table$create(x) } + assert_is(x, c("Table", "RecordBatch")) if (!inherits(sink, "OutputStream")) { From 96e9e2e8c0cb3118cb5b864eee4a099f0145a253 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Tue, 27 Apr 2021 15:21:04 +0100 Subject: [PATCH 17/27] Change R C++ function format --- r/R/arrowExports.R | 8 ++++---- r/R/csv.R | 4 ++-- r/src/arrowExports.cpp | 24 ++++++++++++------------ r/src/csv.cpp | 4 ++-- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index d714b2c4267..f78318cd555 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -336,12 +336,12 @@ TimestampParser__MakeISO8601 <- function(){ .Call(`_arrow_TimestampParser__MakeISO8601`) } -csv__WriteCSV___Table <- function(table, write_options, stream){ - invisible(.Call(`_arrow_csv__WriteCSV___Table`, table, write_options, stream)) +csv___WriteCSV__Table <- function(table, write_options, stream){ + invisible(.Call(`_arrow_csv___WriteCSV__Table`, table, write_options, stream)) } -csv__WriteCSV___RecordBatch <- function(record_batch, write_options, stream){ - invisible(.Call(`_arrow_csv__WriteCSV___RecordBatch`, record_batch, write_options, stream)) +csv___WriteCSV__RecordBatch <- function(record_batch, write_options, stream){ + invisible(.Call(`_arrow_csv___WriteCSV__RecordBatch`, record_batch, write_options, stream)) } dataset___Dataset__NewScan <- function(ds){ diff --git a/r/R/csv.R b/r/R/csv.R index a60c4559ca1..bfe0c2f2787 100644 --- a/r/R/csv.R +++ b/r/R/csv.R @@ -641,9 +641,9 @@ write_csv_arrow <- function(x, } if(inherits(x, "RecordBatch")){ - csv__WriteCSV___RecordBatch(x, write_options, sink) + csv___WriteCSV__RecordBatch(x, write_options, sink) } else if(inherits(x, "Table")){ - csv__WriteCSV___Table(x, write_options, sink) + csv___WriteCSV__Table(x, write_options, sink) } invisible(x_out) diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 2889f1709ef..80fbf3eeaa3 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1312,37 +1312,37 @@ extern "C" SEXP _arrow_TimestampParser__MakeISO8601(){ // csv.cpp #if defined(ARROW_R_WITH_ARROW) -void csv__WriteCSV___Table(const std::shared_ptr& table, const std::shared_ptr& write_options, const std::shared_ptr& stream); -extern "C" SEXP _arrow_csv__WriteCSV___Table(SEXP table_sexp, SEXP write_options_sexp, SEXP stream_sexp){ +void csv___WriteCSV__Table(const std::shared_ptr& table, const std::shared_ptr& write_options, const std::shared_ptr& stream); +extern "C" SEXP _arrow_csv___WriteCSV__Table(SEXP table_sexp, SEXP write_options_sexp, SEXP stream_sexp){ BEGIN_CPP11 arrow::r::Input&>::type table(table_sexp); arrow::r::Input&>::type write_options(write_options_sexp); arrow::r::Input&>::type stream(stream_sexp); - csv__WriteCSV___Table(table, write_options, stream); + csv___WriteCSV__Table(table, write_options, stream); return R_NilValue; END_CPP11 } #else -extern "C" SEXP _arrow_csv__WriteCSV___Table(SEXP table_sexp, SEXP write_options_sexp, SEXP stream_sexp){ - Rf_error("Cannot call csv__WriteCSV___Table(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +extern "C" SEXP _arrow_csv___WriteCSV__Table(SEXP table_sexp, SEXP write_options_sexp, SEXP stream_sexp){ + Rf_error("Cannot call csv___WriteCSV__Table(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif // csv.cpp #if defined(ARROW_R_WITH_ARROW) -void csv__WriteCSV___RecordBatch(const std::shared_ptr& record_batch, const std::shared_ptr& write_options, const std::shared_ptr& stream); -extern "C" SEXP _arrow_csv__WriteCSV___RecordBatch(SEXP record_batch_sexp, SEXP write_options_sexp, SEXP stream_sexp){ +void csv___WriteCSV__RecordBatch(const std::shared_ptr& record_batch, const std::shared_ptr& write_options, const std::shared_ptr& stream); +extern "C" SEXP _arrow_csv___WriteCSV__RecordBatch(SEXP record_batch_sexp, SEXP write_options_sexp, SEXP stream_sexp){ BEGIN_CPP11 arrow::r::Input&>::type record_batch(record_batch_sexp); arrow::r::Input&>::type write_options(write_options_sexp); arrow::r::Input&>::type stream(stream_sexp); - csv__WriteCSV___RecordBatch(record_batch, write_options, stream); + csv___WriteCSV__RecordBatch(record_batch, write_options, stream); return R_NilValue; END_CPP11 } #else -extern "C" SEXP _arrow_csv__WriteCSV___RecordBatch(SEXP record_batch_sexp, SEXP write_options_sexp, SEXP stream_sexp){ - Rf_error("Cannot call csv__WriteCSV___RecordBatch(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +extern "C" SEXP _arrow_csv___WriteCSV__RecordBatch(SEXP record_batch_sexp, SEXP write_options_sexp, SEXP stream_sexp){ + Rf_error("Cannot call csv___WriteCSV__RecordBatch(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif @@ -6739,8 +6739,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_TimestampParser__format", (DL_FUNC) &_arrow_TimestampParser__format, 1}, { "_arrow_TimestampParser__MakeStrptime", (DL_FUNC) &_arrow_TimestampParser__MakeStrptime, 1}, { "_arrow_TimestampParser__MakeISO8601", (DL_FUNC) &_arrow_TimestampParser__MakeISO8601, 0}, - { "_arrow_csv__WriteCSV___Table", (DL_FUNC) &_arrow_csv__WriteCSV___Table, 3}, - { "_arrow_csv__WriteCSV___RecordBatch", (DL_FUNC) &_arrow_csv__WriteCSV___RecordBatch, 3}, + { "_arrow_csv___WriteCSV__Table", (DL_FUNC) &_arrow_csv___WriteCSV__Table, 3}, + { "_arrow_csv___WriteCSV__RecordBatch", (DL_FUNC) &_arrow_csv___WriteCSV__RecordBatch, 3}, { "_arrow_dataset___Dataset__NewScan", (DL_FUNC) &_arrow_dataset___Dataset__NewScan, 1}, { "_arrow_dataset___Dataset__schema", (DL_FUNC) &_arrow_dataset___Dataset__schema, 1}, { "_arrow_dataset___Dataset__type_name", (DL_FUNC) &_arrow_dataset___Dataset__type_name, 1}, diff --git a/r/src/csv.cpp b/r/src/csv.cpp index b1a2047ce76..5157f60312b 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -187,7 +187,7 @@ std::shared_ptr TimestampParser__MakeISO8601() { } // [[arrow::export]] -void csv__WriteCSV___Table(const std::shared_ptr& table, +void csv___WriteCSV__Table(const std::shared_ptr& table, const std::shared_ptr& write_options, const std::shared_ptr& stream) { @@ -195,7 +195,7 @@ void csv__WriteCSV___Table(const std::shared_ptr& table, } // [[arrow::export]] -void csv__WriteCSV___RecordBatch(const std::shared_ptr& record_batch, +void csv___WriteCSV__RecordBatch(const std::shared_ptr& record_batch, const std::shared_ptr& write_options, const std::shared_ptr& stream) { StopIfNotOk(arrow::csv::WriteCSV(*record_batch, *write_options, gc_memory_pool(), stream.get())); From fcc6f5f073e266b86887bad769d5574ba5cfafae Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Tue, 27 Apr 2021 15:22:55 +0100 Subject: [PATCH 18/27] Run linter on R C++ --- r/src/csv.cpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/r/src/csv.cpp b/r/src/csv.cpp index 5157f60312b..3df5db87efa 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -28,7 +28,8 @@ // [[arrow::export]] std::shared_ptr csv___WriteOptions__initialize( cpp11::list options) { - auto res = std::make_shared(arrow::csv::WriteOptions::Defaults()); + auto res = + std::make_shared(arrow::csv::WriteOptions::Defaults()); res->include_header = cpp11::as_cpp(options["include_header"]); res->batch_size = cpp11::as_cpp(options["batch_size"]); return res; @@ -190,16 +191,17 @@ std::shared_ptr TimestampParser__MakeISO8601() { void csv___WriteCSV__Table(const std::shared_ptr& table, const std::shared_ptr& write_options, const std::shared_ptr& stream) { - - StopIfNotOk(arrow::csv::WriteCSV(*table, *write_options, gc_memory_pool(), stream.get())); + StopIfNotOk( + arrow::csv::WriteCSV(*table, *write_options, gc_memory_pool(), stream.get())); } // [[arrow::export]] -void csv___WriteCSV__RecordBatch(const std::shared_ptr& record_batch, - const std::shared_ptr& write_options, - const std::shared_ptr& stream) { - StopIfNotOk(arrow::csv::WriteCSV(*record_batch, *write_options, gc_memory_pool(), stream.get())); +void csv___WriteCSV__RecordBatch( + const std::shared_ptr& record_batch, + const std::shared_ptr& write_options, + const std::shared_ptr& stream) { + StopIfNotOk(arrow::csv::WriteCSV(*record_batch, *write_options, gc_memory_pool(), + stream.get())); } - #endif From ee384644077792fd05e91e0dc32ade8b848a4a26 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Tue, 27 Apr 2021 16:09:02 +0100 Subject: [PATCH 19/27] Update docs --- r/R/csv.R | 17 +++++++++++++++++ r/man/CsvWriteOptions.Rd | 22 ++++++++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 r/man/CsvWriteOptions.Rd diff --git a/r/R/csv.R b/r/R/csv.R index bfe0c2f2787..6a14aaa3dbd 100644 --- a/r/R/csv.R +++ b/r/R/csv.R @@ -408,6 +408,23 @@ CsvReadOptions$create <- function(use_threads = option_use_threads(), ) } +#' @title File writer options +#' @rdname CsvWriteOptions +#' @name CsvWriteOptions +#' @docType class +#' @usage NULL +#' @format NULL +#' @description `CsvReadOptions`, `CsvParseOptions`, `CsvConvertOptions`, +#' `JsonReadOptions`, `JsonParseOptions`, and `TimestampParser` are containers for various +#' file reading options. See their usage in [read_csv_arrow()] and +#' [read_json_arrow()], respectively. +#' +#' @section Factory: +#' +#' The `CsvWriteOptions$create()` factory method takes the following arguments: +#' - `include_header` Whether to write an initial header line with column names +#' - `batch_size` Maximum number of rows processed at a time. Default is 1024 +#' @docType class #' @export CsvWriteOptions <- R6Class("CsvWriteOptions", inherit = ArrowObject) CsvWriteOptions$create <- function(include_header = TRUE, batch_size = 1024L){ diff --git a/r/man/CsvWriteOptions.Rd b/r/man/CsvWriteOptions.Rd new file mode 100644 index 00000000000..e83126c9f9a --- /dev/null +++ b/r/man/CsvWriteOptions.Rd @@ -0,0 +1,22 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/csv.R +\docType{class} +\name{CsvWriteOptions} +\alias{CsvWriteOptions} +\title{File writer options} +\description{ +\code{CsvReadOptions}, \code{CsvParseOptions}, \code{CsvConvertOptions}, +\code{JsonReadOptions}, \code{JsonParseOptions}, and \code{TimestampParser} are containers for various +file reading options. See their usage in \code{\link[=read_csv_arrow]{read_csv_arrow()}} and +\code{\link[=read_json_arrow]{read_json_arrow()}}, respectively. +} +\section{Factory}{ + + +The \code{CsvWriteOptions$create()} factory method takes the following arguments: +\itemize{ +\item \code{include_header} Whether to write an initial header line with column names +\item \code{batch_size} Maximum number of rows processed at a time. Default is 1024 +} +} + From 6b6914c2d3d81a34aea52fc9e88282f3cc790d34 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Wed, 28 Apr 2021 08:47:02 +0100 Subject: [PATCH 20/27] Add write_csv_arrrow to _pkgdown.yml --- r/_pkgdown.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/r/_pkgdown.yml b/r/_pkgdown.yml index e744f1e578d..b2266cde758 100644 --- a/r/_pkgdown.yml +++ b/r/_pkgdown.yml @@ -98,6 +98,7 @@ reference: - write_ipc_stream - write_to_raw - write_parquet + - write_csv_arrow - title: C++ reader/writer interface contents: - ParquetFileReader From 93338cc93862722dca1be6a18cbf10c77d854b6d Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Wed, 28 Apr 2021 19:57:02 +0100 Subject: [PATCH 21/27] Inconsequential grammar change to trigger CI --- r/R/csv.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/R/csv.R b/r/R/csv.R index 6a14aaa3dbd..f87a5e567a1 100644 --- a/r/R/csv.R +++ b/r/R/csv.R @@ -620,7 +620,7 @@ readr_to_csv_convert_options <- function(na, #' @param sink A string file path, URI, or [OutputStream], or path in a file #' system (`SubTreeFileSystem`) #' @param include_header Whether to write an initial header line with column names -#' @param batch_size Maximum number of rows processed at a time. Default is 1024 +#' @param batch_size Maximum number of rows processed at a time. Default is 1024. #' #' @return The input `x`, invisibly. Note that if `sink` is an [OutputStream], #' the stream will be left open. From 77222066a0c3db4eeb3e68f41fce9eece2e7d6e8 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Fri, 30 Apr 2021 18:39:18 +0100 Subject: [PATCH 22/27] Remove extra no-dates tests --- r/tests/testthat/test-csv.R | 33 +-------------------------------- 1 file changed, 1 insertion(+), 32 deletions(-) diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index 85860f804c1..39e276d0582 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -53,18 +53,7 @@ test_that("Write a CSV file with no header", { names(tbl_expected) <- c("f0", "f1", "f2", "f3") expect_identical(tbl_in, tbl_expected) - - skip("Doesn't yet work with date columns due to ARROW-12540") - - tbl_out <- write_csv_arrow(tbl, csv_file, include_header = FALSE) - expect_true(file.exists(csv_file)) - expect_identical(tbl_out, tbl) - tbl_in <- read_csv_arrow(csv_file, col_names = FALSE) - - tbl_expected <- tbl - names(tbl_expected) <- c("f0", "f1", "f2", "f3", "f4") - - expect_identical(tbl_in, tbl_expected) + }) test_that("Write a CSV file with different batch sizes", { @@ -87,26 +76,6 @@ test_that("Write a CSV file with different batch sizes", { tbl_in3 <- read_csv_arrow(csv_file) expect_identical(tbl_in3, tbl_no_dates) - skip("Doesn't yet work with date columns due to ARROW-12540") - - tbl_out1 <- write_csv_arrow(tbl, csv_file, batch_size = 1) - expect_true(file.exists(csv_file)) - expect_identical(tbl_out1, tbl) - tbl_in1 <- read_csv_arrow(csv_file) - expect_identical(tbl_in1, tbl) - - tbl_out2 <- write_csv_arrow(tbl, csv_file, batch_size = 2) - expect_true(file.exists(csv_file)) - expect_identical(tbl_out2, tbl) - tbl_in2 <- read_csv_arrow(csv_file) - expect_identical(tbl_in2, tbl) - - tbl_out3 <- write_csv_arrow(tbl, csv_file, batch_size = 12) - expect_true(file.exists(csv_file)) - expect_identical(tbl_out3, tbl) - tbl_in3 <- read_csv_arrow(csv_file) - expect_identical(tbl_in3, tbl) - }) From 0d62fa1405be69437dd530f3cbec033ce35aed68 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Fri, 30 Apr 2021 18:39:40 +0100 Subject: [PATCH 23/27] Move docs for CsvWriteOptions --- r/R/csv.R | 38 +++++++++++--------------------------- 1 file changed, 11 insertions(+), 27 deletions(-) diff --git a/r/R/csv.R b/r/R/csv.R index f87a5e567a1..5962793927f 100644 --- a/r/R/csv.R +++ b/r/R/csv.R @@ -381,6 +381,11 @@ CsvTableReader$create <- function(file, #' `TimestampParser$create()` takes an optional `format` string argument. #' See [`strptime()`][base::strptime()] for example syntax. #' The default is to use an ISO-8601 format parser. +#' +#' The `CsvWriteOptions$create()` factory method takes the following arguments: +#' - `include_header` Whether to write an initial header line with column names +#' - `batch_size` Maximum number of rows processed at a time. Default is 1024. +#' #' @section Active bindings: #' #' - `column_names`: from `CsvReadOptions` @@ -408,23 +413,7 @@ CsvReadOptions$create <- function(use_threads = option_use_threads(), ) } -#' @title File writer options -#' @rdname CsvWriteOptions -#' @name CsvWriteOptions -#' @docType class -#' @usage NULL -#' @format NULL -#' @description `CsvReadOptions`, `CsvParseOptions`, `CsvConvertOptions`, -#' `JsonReadOptions`, `JsonParseOptions`, and `TimestampParser` are containers for various -#' file reading options. See their usage in [read_csv_arrow()] and -#' [read_json_arrow()], respectively. -#' -#' @section Factory: -#' -#' The `CsvWriteOptions$create()` factory method takes the following arguments: -#' - `include_header` Whether to write an initial header line with column names -#' - `batch_size` Maximum number of rows processed at a time. Default is 1024 -#' @docType class +#' @rdname CsvReadOptions #' @export CsvWriteOptions <- R6Class("CsvWriteOptions", inherit = ArrowObject) CsvWriteOptions$create <- function(include_header = TRUE, batch_size = 1024L){ @@ -633,24 +622,19 @@ readr_to_csv_convert_options <- function(na, #' } #' @include arrow-package.R write_csv_arrow <- function(x, - sink, - include_header = TRUE, - batch_size = 1024L - ) { + sink, + include_header = TRUE, + batch_size = 1024L) { # Handle and validate options before touching data batch_size <- as.integer(batch_size) - assert_that(batch_size > 0) - assert_that(length(include_header) == 1) - assert_that(is.logical(include_header)) - - write_options = CsvWriteOptions$create(include_header, batch_size) + write_options <- CsvWriteOptions$create(include_header, batch_size) x_out <- x if (is.data.frame(x)) { x <- Table$create(x) } - assert_is(x, c("Table", "RecordBatch")) + assert_is(x, "ArrowTabular") if (!inherits(sink, "OutputStream")) { sink <- make_output_stream(sink) From faab7ffffb58346b312a378a33dcf437183da6e9 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Mon, 3 May 2021 13:33:24 +0100 Subject: [PATCH 24/27] Move tests to bottom of file --- r/tests/testthat/test-csv.R | 113 ++++++++++++++++++------------------ 1 file changed, 57 insertions(+), 56 deletions(-) diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index 39e276d0582..e0ccd91f1f9 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -23,62 +23,6 @@ tbl$date <- Sys.Date() + 1:10 csv_file <- tempfile() -test_that("Write a CSV file with header", { - tbl_out <- write_csv_arrow(tbl_no_dates, csv_file) - expect_true(file.exists(csv_file)) - expect_identical(tbl_out, tbl_no_dates) - - tbl_in <- read_csv_arrow(csv_file) - expect_identical(tbl_in, tbl_no_dates) - - skip("Doesn't yet work with date columns due to ARROW-12540") - - tbl_out <- write_csv_arrow(tbl, csv_file) - expect_true(file.exists(csv_file)) - expect_identical(tbl_out, tbl) - - tbl_in <- read_csv_arrow(csv_file) - expect_identical(tbl_in, tbl) -}) - - -test_that("Write a CSV file with no header", { - - tbl_out <- write_csv_arrow(tbl_no_dates, csv_file, include_header = FALSE) - expect_true(file.exists(csv_file)) - expect_identical(tbl_out, tbl_no_dates) - tbl_in <- read_csv_arrow(csv_file, col_names = FALSE) - - tbl_expected <- tbl_no_dates - names(tbl_expected) <- c("f0", "f1", "f2", "f3") - - expect_identical(tbl_in, tbl_expected) - -}) - -test_that("Write a CSV file with different batch sizes", { - - tbl_out1 <- write_csv_arrow(tbl_no_dates, csv_file, batch_size = 1) - expect_true(file.exists(csv_file)) - expect_identical(tbl_out1, tbl_no_dates) - tbl_in1 <- read_csv_arrow(csv_file) - expect_identical(tbl_in1, tbl_no_dates) - - tbl_out2 <- write_csv_arrow(tbl_no_dates, csv_file, batch_size = 2) - expect_true(file.exists(csv_file)) - expect_identical(tbl_out2, tbl_no_dates) - tbl_in2 <- read_csv_arrow(csv_file) - expect_identical(tbl_in2, tbl_no_dates) - - tbl_out3 <- write_csv_arrow(tbl_no_dates, csv_file, batch_size = 12) - expect_true(file.exists(csv_file)) - expect_identical(tbl_out3, tbl_no_dates) - tbl_in3 <- read_csv_arrow(csv_file) - expect_identical(tbl_in3, tbl_no_dates) - -}) - - test_that("Can read csv file", { tf <- tempfile() on.exit(unlink(tf)) @@ -313,3 +257,60 @@ test_that("Mix of guessing and declaring types", { df <- read_csv_arrow(tf, col_types = "d-?c", col_names = cols, skip = 1) expect_identical(df, tbl[, c("dbl", "false", "chr")]) }) + + +test_that("Write a CSV file with header", { + tbl_out <- write_csv_arrow(tbl_no_dates, csv_file) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out, tbl_no_dates) + + tbl_in <- read_csv_arrow(csv_file) + expect_identical(tbl_in, tbl_no_dates) + + skip("Doesn't yet work with date columns due to ARROW-12540") + + tbl_out <- write_csv_arrow(tbl, csv_file) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out, tbl) + + tbl_in <- read_csv_arrow(csv_file) + expect_identical(tbl_in, tbl) +}) + + +test_that("Write a CSV file with no header", { + + tbl_out <- write_csv_arrow(tbl_no_dates, csv_file, include_header = FALSE) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out, tbl_no_dates) + tbl_in <- read_csv_arrow(csv_file, col_names = FALSE) + + tbl_expected <- tbl_no_dates + names(tbl_expected) <- c("f0", "f1", "f2", "f3") + + expect_identical(tbl_in, tbl_expected) + +}) + +test_that("Write a CSV file with different batch sizes", { + + tbl_out1 <- write_csv_arrow(tbl_no_dates, csv_file, batch_size = 1) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out1, tbl_no_dates) + tbl_in1 <- read_csv_arrow(csv_file) + expect_identical(tbl_in1, tbl_no_dates) + + tbl_out2 <- write_csv_arrow(tbl_no_dates, csv_file, batch_size = 2) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out2, tbl_no_dates) + tbl_in2 <- read_csv_arrow(csv_file) + expect_identical(tbl_in2, tbl_no_dates) + + tbl_out3 <- write_csv_arrow(tbl_no_dates, csv_file, batch_size = 12) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out3, tbl_no_dates) + tbl_in3 <- read_csv_arrow(csv_file) + expect_identical(tbl_in3, tbl_no_dates) + +}) + From 8ff781fd6b88f979252c951bea2aa157fe04d2c2 Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Mon, 3 May 2021 14:17:53 +0100 Subject: [PATCH 25/27] Add tests for invalid inputs --- r/R/csv.R | 2 ++ r/tests/testthat/test-csv.R | 15 +++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/r/R/csv.R b/r/R/csv.R index 5962793927f..deab4b09169 100644 --- a/r/R/csv.R +++ b/r/R/csv.R @@ -627,6 +627,8 @@ write_csv_arrow <- function(x, batch_size = 1024L) { # Handle and validate options before touching data batch_size <- as.integer(batch_size) + assert_that(batch_size > 0) + write_options <- CsvWriteOptions$create(include_header, batch_size) x_out <- x diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index e0ccd91f1f9..09dba4462d5 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -314,3 +314,18 @@ test_that("Write a CSV file with different batch sizes", { }) +test_that("Write a CSV file with invalid input type", { + expect_error( + write_csv_arrow(Array$create(1:5), csv_file), + regexp = 'x must be a "ArrowTabular"' + ) +}) + + +test_that("Write a CSV file with invalid batch size", { + expect_error( + write_csv_arrow(tbl_no_dates, csv_file, batch_size = -1), + regexp = 'batch_size not greater than 0' + ) +}) + From a421243165a5ca15c0fe92b240d137f3874eb3cc Mon Sep 17 00:00:00 2001 From: Nic Crane Date: Mon, 3 May 2021 15:03:16 +0100 Subject: [PATCH 26/27] Remove extra whitespace --- r/tests/testthat/test-csv.R | 2 -- 1 file changed, 2 deletions(-) diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index 09dba4462d5..a61480fb33a 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -321,11 +321,9 @@ test_that("Write a CSV file with invalid input type", { ) }) - test_that("Write a CSV file with invalid batch size", { expect_error( write_csv_arrow(tbl_no_dates, csv_file, batch_size = -1), regexp = 'batch_size not greater than 0' ) }) - From 6252dcd36ab60ef66f800c975d119a802507358a Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Mon, 3 May 2021 19:19:02 -0700 Subject: [PATCH 27/27] Move batch_size validation --- r/R/csv.R | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/r/R/csv.R b/r/R/csv.R index deab4b09169..3357df52132 100644 --- a/r/R/csv.R +++ b/r/R/csv.R @@ -417,10 +417,11 @@ CsvReadOptions$create <- function(use_threads = option_use_threads(), #' @export CsvWriteOptions <- R6Class("CsvWriteOptions", inherit = ArrowObject) CsvWriteOptions$create <- function(include_header = TRUE, batch_size = 1024L){ + assert_that(is_integerish(batch_size, n = 1, finite = TRUE), batch_size > 0) csv___WriteOptions__initialize( list( include_header = include_header, - batch_size = batch_size + batch_size = as.integer(batch_size) ) ) } @@ -625,9 +626,6 @@ write_csv_arrow <- function(x, sink, include_header = TRUE, batch_size = 1024L) { - # Handle and validate options before touching data - batch_size <- as.integer(batch_size) - assert_that(batch_size > 0) write_options <- CsvWriteOptions$create(include_header, batch_size)