diff --git a/cpp/src/arrow/csv/type_fwd.h b/cpp/src/arrow/csv/type_fwd.h index 17fcdbdcc56..c0a53847a90 100644 --- a/cpp/src/arrow/csv/type_fwd.h +++ b/cpp/src/arrow/csv/type_fwd.h @@ -22,6 +22,7 @@ class TableReader; struct ConvertOptions; struct ReadOptions; struct ParseOptions; +struct WriteOptions; } // namespace csv } // namespace arrow diff --git a/r/NAMESPACE b/r/NAMESPACE index 117e3de5c22..b40ccaf2832 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -122,6 +122,7 @@ export(CsvFragmentScanOptions) export(CsvParseOptions) export(CsvReadOptions) export(CsvTableReader) +export(CsvWriteOptions) export(Dataset) export(DatasetFactory) export(DateUnit) @@ -277,6 +278,7 @@ export(unify_schemas) export(utf8) export(value_counts) export(write_arrow) +export(write_csv_arrow) export(write_dataset) export(write_feather) export(write_ipc_stream) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 51cdcf85df0..f78318cd555 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -292,6 +292,10 @@ compute__GetFunctionNames <- function(){ .Call(`_arrow_compute__GetFunctionNames`) } +csv___WriteOptions__initialize <- function(options){ + .Call(`_arrow_csv___WriteOptions__initialize`, options) +} + csv___ReadOptions__initialize <- function(options){ .Call(`_arrow_csv___ReadOptions__initialize`, options) } @@ -332,6 +336,14 @@ TimestampParser__MakeISO8601 <- function(){ .Call(`_arrow_TimestampParser__MakeISO8601`) } +csv___WriteCSV__Table <- function(table, write_options, stream){ + invisible(.Call(`_arrow_csv___WriteCSV__Table`, table, write_options, stream)) +} + +csv___WriteCSV__RecordBatch <- function(record_batch, write_options, stream){ + invisible(.Call(`_arrow_csv___WriteCSV__RecordBatch`, record_batch, write_options, stream)) +} + dataset___Dataset__NewScan <- function(ds){ .Call(`_arrow_dataset___Dataset__NewScan`, ds) } diff --git a/r/R/csv.R b/r/R/csv.R index 160c46e4753..3357df52132 100644 --- a/r/R/csv.R +++ b/r/R/csv.R @@ -381,6 +381,11 @@ CsvTableReader$create <- function(file, #' `TimestampParser$create()` takes an optional `format` string argument. #' See [`strptime()`][base::strptime()] for example syntax. #' The default is to use an ISO-8601 format parser. +#' +#' The `CsvWriteOptions$create()` factory method takes the following arguments: +#' - `include_header` Whether to write an initial header line with column names +#' - `batch_size` Maximum number of rows processed at a time. Default is 1024. +#' #' @section Active bindings: #' #' - `column_names`: from `CsvReadOptions` @@ -408,6 +413,19 @@ CsvReadOptions$create <- function(use_threads = option_use_threads(), ) } +#' @rdname CsvReadOptions +#' @export +CsvWriteOptions <- R6Class("CsvWriteOptions", inherit = ArrowObject) +CsvWriteOptions$create <- function(include_header = TRUE, batch_size = 1024L){ + assert_that(is_integerish(batch_size, n = 1, finite = TRUE), batch_size > 0) + csv___WriteOptions__initialize( + list( + include_header = include_header, + batch_size = as.integer(batch_size) + ) + ) +} + readr_to_csv_read_options <- function(skip, col_names, col_types) { if (isTRUE(col_names)) { # C++ default to parse is 0-length string array @@ -585,3 +603,49 @@ readr_to_csv_convert_options <- function(na, include_columns = include_columns ) } + +#' Write CSV file to disk +#' +#' @param x `data.frame`, [RecordBatch], or [Table] +#' @param sink A string file path, URI, or [OutputStream], or path in a file +#' system (`SubTreeFileSystem`) +#' @param include_header Whether to write an initial header line with column names +#' @param batch_size Maximum number of rows processed at a time. Default is 1024. +#' +#' @return The input `x`, invisibly. Note that if `sink` is an [OutputStream], +#' the stream will be left open. +#' @export +#' @examples +#' \donttest{ +#' tf <- tempfile() +#' on.exit(unlink(tf)) +#' write_csv_arrow(mtcars, tf) +#' } +#' @include arrow-package.R +write_csv_arrow <- function(x, + sink, + include_header = TRUE, + batch_size = 1024L) { + + write_options <- CsvWriteOptions$create(include_header, batch_size) + + x_out <- x + if (is.data.frame(x)) { + x <- Table$create(x) + } + + assert_is(x, "ArrowTabular") + + if (!inherits(sink, "OutputStream")) { + sink <- make_output_stream(sink) + on.exit(sink$close()) + } + + if(inherits(x, "RecordBatch")){ + csv___WriteCSV__RecordBatch(x, write_options, sink) + } else if(inherits(x, "Table")){ + csv___WriteCSV__Table(x, write_options, sink) + } + + invisible(x_out) +} diff --git a/r/_pkgdown.yml b/r/_pkgdown.yml index bb77b416aab..b2266cde758 100644 --- a/r/_pkgdown.yml +++ b/r/_pkgdown.yml @@ -98,6 +98,7 @@ reference: - write_ipc_stream - write_to_raw - write_parquet + - write_csv_arrow - title: C++ reader/writer interface contents: - ParquetFileReader @@ -109,6 +110,7 @@ reference: - RecordBatchReader - RecordBatchWriter - CsvReadOptions + - CsvWriteOptions - title: Arrow data containers contents: - array diff --git a/r/man/CsvWriteOptions.Rd b/r/man/CsvWriteOptions.Rd new file mode 100644 index 00000000000..e83126c9f9a --- /dev/null +++ b/r/man/CsvWriteOptions.Rd @@ -0,0 +1,22 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/csv.R +\docType{class} +\name{CsvWriteOptions} +\alias{CsvWriteOptions} +\title{File writer options} +\description{ +\code{CsvReadOptions}, \code{CsvParseOptions}, \code{CsvConvertOptions}, +\code{JsonReadOptions}, \code{JsonParseOptions}, and \code{TimestampParser} are containers for various +file reading options. See their usage in \code{\link[=read_csv_arrow]{read_csv_arrow()}} and +\code{\link[=read_json_arrow]{read_json_arrow()}}, respectively. +} +\section{Factory}{ + + +The \code{CsvWriteOptions$create()} factory method takes the following arguments: +\itemize{ +\item \code{include_header} Whether to write an initial header line with column names +\item \code{batch_size} Maximum number of rows processed at a time. Default is 1024 +} +} + diff --git a/r/man/write_csv_arrow.Rd b/r/man/write_csv_arrow.Rd new file mode 100644 index 00000000000..f583e487e1f --- /dev/null +++ b/r/man/write_csv_arrow.Rd @@ -0,0 +1,32 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/csv.R +\name{write_csv_arrow} +\alias{write_csv_arrow} +\title{Write CSV file to disk} +\usage{ +write_csv_arrow(x, sink, include_header = TRUE, batch_size = 1024L) +} +\arguments{ +\item{x}{\code{data.frame}, \link{RecordBatch}, or \link{Table}} + +\item{sink}{A string file path, URI, or \link{OutputStream}, or path in a file +system (\code{SubTreeFileSystem})} + +\item{include_header}{Whether to write an initial header line with column names} + +\item{batch_size}{Maximum number of rows processed at a time. Default is 1024} +} +\value{ +The input \code{x}, invisibly. Note that if \code{sink} is an \link{OutputStream}, +the stream will be left open. +} +\description{ +Write CSV file to disk +} +\examples{ +\donttest{ +tf <- tempfile() +on.exit(unlink(tf)) +write_csv_arrow(mtcars, tf) +} +} diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index c5ef6343ced..80fbf3eeaa3 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1143,6 +1143,21 @@ extern "C" SEXP _arrow_compute__GetFunctionNames(){ } #endif +// csv.cpp +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr csv___WriteOptions__initialize(cpp11::list options); +extern "C" SEXP _arrow_csv___WriteOptions__initialize(SEXP options_sexp){ +BEGIN_CPP11 + arrow::r::Input::type options(options_sexp); + return cpp11::as_sexp(csv___WriteOptions__initialize(options)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_csv___WriteOptions__initialize(SEXP options_sexp){ + Rf_error("Cannot call csv___WriteOptions__initialize(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + // csv.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr csv___ReadOptions__initialize(cpp11::list options); @@ -1295,6 +1310,42 @@ extern "C" SEXP _arrow_TimestampParser__MakeISO8601(){ } #endif +// csv.cpp +#if defined(ARROW_R_WITH_ARROW) +void csv___WriteCSV__Table(const std::shared_ptr& table, const std::shared_ptr& write_options, const std::shared_ptr& stream); +extern "C" SEXP _arrow_csv___WriteCSV__Table(SEXP table_sexp, SEXP write_options_sexp, SEXP stream_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type table(table_sexp); + arrow::r::Input&>::type write_options(write_options_sexp); + arrow::r::Input&>::type stream(stream_sexp); + csv___WriteCSV__Table(table, write_options, stream); + return R_NilValue; +END_CPP11 +} +#else +extern "C" SEXP _arrow_csv___WriteCSV__Table(SEXP table_sexp, SEXP write_options_sexp, SEXP stream_sexp){ + Rf_error("Cannot call csv___WriteCSV__Table(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + +// csv.cpp +#if defined(ARROW_R_WITH_ARROW) +void csv___WriteCSV__RecordBatch(const std::shared_ptr& record_batch, const std::shared_ptr& write_options, const std::shared_ptr& stream); +extern "C" SEXP _arrow_csv___WriteCSV__RecordBatch(SEXP record_batch_sexp, SEXP write_options_sexp, SEXP stream_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type record_batch(record_batch_sexp); + arrow::r::Input&>::type write_options(write_options_sexp); + arrow::r::Input&>::type stream(stream_sexp); + csv___WriteCSV__RecordBatch(record_batch, write_options, stream); + return R_NilValue; +END_CPP11 +} +#else +extern "C" SEXP _arrow_csv___WriteCSV__RecordBatch(SEXP record_batch_sexp, SEXP write_options_sexp, SEXP stream_sexp){ + Rf_error("Cannot call csv___WriteCSV__RecordBatch(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + // dataset.cpp #if defined(ARROW_R_WITH_DATASET) std::shared_ptr dataset___Dataset__NewScan(const std::shared_ptr& ds); @@ -6677,6 +6728,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_compute__CallFunction", (DL_FUNC) &_arrow_compute__CallFunction, 3}, { "_arrow_compute__GroupBy", (DL_FUNC) &_arrow_compute__GroupBy, 3}, { "_arrow_compute__GetFunctionNames", (DL_FUNC) &_arrow_compute__GetFunctionNames, 0}, + { "_arrow_csv___WriteOptions__initialize", (DL_FUNC) &_arrow_csv___WriteOptions__initialize, 1}, { "_arrow_csv___ReadOptions__initialize", (DL_FUNC) &_arrow_csv___ReadOptions__initialize, 1}, { "_arrow_csv___ParseOptions__initialize", (DL_FUNC) &_arrow_csv___ParseOptions__initialize, 1}, { "_arrow_csv___ReadOptions__column_names", (DL_FUNC) &_arrow_csv___ReadOptions__column_names, 1}, @@ -6687,6 +6739,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_TimestampParser__format", (DL_FUNC) &_arrow_TimestampParser__format, 1}, { "_arrow_TimestampParser__MakeStrptime", (DL_FUNC) &_arrow_TimestampParser__MakeStrptime, 1}, { "_arrow_TimestampParser__MakeISO8601", (DL_FUNC) &_arrow_TimestampParser__MakeISO8601, 0}, + { "_arrow_csv___WriteCSV__Table", (DL_FUNC) &_arrow_csv___WriteCSV__Table, 3}, + { "_arrow_csv___WriteCSV__RecordBatch", (DL_FUNC) &_arrow_csv___WriteCSV__RecordBatch, 3}, { "_arrow_dataset___Dataset__NewScan", (DL_FUNC) &_arrow_dataset___Dataset__NewScan, 1}, { "_arrow_dataset___Dataset__schema", (DL_FUNC) &_arrow_dataset___Dataset__schema, 1}, { "_arrow_dataset___Dataset__type_name", (DL_FUNC) &_arrow_dataset___Dataset__type_name, 1}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index b94ab764729..b146b7ecccc 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -178,6 +178,7 @@ R6_CLASS_NAME(arrow::csv::ReadOptions, "CsvReadOptions"); R6_CLASS_NAME(arrow::csv::ParseOptions, "CsvParseOptions"); R6_CLASS_NAME(arrow::csv::ConvertOptions, "CsvConvertOptions"); R6_CLASS_NAME(arrow::csv::TableReader, "CsvTableReader"); +R6_CLASS_NAME(arrow::csv::WriteOptions, "CsvWriteOptions"); #if defined(ARROW_R_WITH_PARQUET) R6_CLASS_NAME(parquet::ArrowReaderProperties, "ParquetArrowReaderProperties"); diff --git a/r/src/csv.cpp b/r/src/csv.cpp index 0ce4cd699f8..3df5db87efa 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -20,8 +20,21 @@ #if defined(ARROW_R_WITH_ARROW) #include +#include +#include + #include +// [[arrow::export]] +std::shared_ptr csv___WriteOptions__initialize( + cpp11::list options) { + auto res = + std::make_shared(arrow::csv::WriteOptions::Defaults()); + res->include_header = cpp11::as_cpp(options["include_header"]); + res->batch_size = cpp11::as_cpp(options["batch_size"]); + return res; +} + // [[arrow::export]] std::shared_ptr csv___ReadOptions__initialize( cpp11::list options) { @@ -174,4 +187,21 @@ std::shared_ptr TimestampParser__MakeISO8601() { return arrow::TimestampParser::MakeISO8601(); } +// [[arrow::export]] +void csv___WriteCSV__Table(const std::shared_ptr& table, + const std::shared_ptr& write_options, + const std::shared_ptr& stream) { + StopIfNotOk( + arrow::csv::WriteCSV(*table, *write_options, gc_memory_pool(), stream.get())); +} + +// [[arrow::export]] +void csv___WriteCSV__RecordBatch( + const std::shared_ptr& record_batch, + const std::shared_ptr& write_options, + const std::shared_ptr& stream) { + StopIfNotOk(arrow::csv::WriteCSV(*record_batch, *write_options, gc_memory_pool(), + stream.get())); +} + #endif diff --git a/r/tests/testthat/test-csv.R b/r/tests/testthat/test-csv.R index d27706f060d..a61480fb33a 100644 --- a/r/tests/testthat/test-csv.R +++ b/r/tests/testthat/test-csv.R @@ -15,13 +15,14 @@ # specific language governing permissions and limitations # under the License. -context("CsvTableReader") - # Not all types round trip via CSV 100% identical by default tbl <- example_data[, c("dbl", "lgl", "false", "chr")] +tbl_no_dates <- tbl # Add a date to test its parsing tbl$date <- Sys.Date() + 1:10 +csv_file <- tempfile() + test_that("Can read csv file", { tf <- tempfile() on.exit(unlink(tf)) @@ -256,3 +257,73 @@ test_that("Mix of guessing and declaring types", { df <- read_csv_arrow(tf, col_types = "d-?c", col_names = cols, skip = 1) expect_identical(df, tbl[, c("dbl", "false", "chr")]) }) + + +test_that("Write a CSV file with header", { + tbl_out <- write_csv_arrow(tbl_no_dates, csv_file) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out, tbl_no_dates) + + tbl_in <- read_csv_arrow(csv_file) + expect_identical(tbl_in, tbl_no_dates) + + skip("Doesn't yet work with date columns due to ARROW-12540") + + tbl_out <- write_csv_arrow(tbl, csv_file) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out, tbl) + + tbl_in <- read_csv_arrow(csv_file) + expect_identical(tbl_in, tbl) +}) + + +test_that("Write a CSV file with no header", { + + tbl_out <- write_csv_arrow(tbl_no_dates, csv_file, include_header = FALSE) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out, tbl_no_dates) + tbl_in <- read_csv_arrow(csv_file, col_names = FALSE) + + tbl_expected <- tbl_no_dates + names(tbl_expected) <- c("f0", "f1", "f2", "f3") + + expect_identical(tbl_in, tbl_expected) + +}) + +test_that("Write a CSV file with different batch sizes", { + + tbl_out1 <- write_csv_arrow(tbl_no_dates, csv_file, batch_size = 1) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out1, tbl_no_dates) + tbl_in1 <- read_csv_arrow(csv_file) + expect_identical(tbl_in1, tbl_no_dates) + + tbl_out2 <- write_csv_arrow(tbl_no_dates, csv_file, batch_size = 2) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out2, tbl_no_dates) + tbl_in2 <- read_csv_arrow(csv_file) + expect_identical(tbl_in2, tbl_no_dates) + + tbl_out3 <- write_csv_arrow(tbl_no_dates, csv_file, batch_size = 12) + expect_true(file.exists(csv_file)) + expect_identical(tbl_out3, tbl_no_dates) + tbl_in3 <- read_csv_arrow(csv_file) + expect_identical(tbl_in3, tbl_no_dates) + +}) + +test_that("Write a CSV file with invalid input type", { + expect_error( + write_csv_arrow(Array$create(1:5), csv_file), + regexp = 'x must be a "ArrowTabular"' + ) +}) + +test_that("Write a CSV file with invalid batch size", { + expect_error( + write_csv_arrow(tbl_no_dates, csv_file, batch_size = -1), + regexp = 'batch_size not greater than 0' + ) +})