Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b4df3e7
Add unit tests for write_csv_arrow
thisisnic Apr 22, 2021
1077e8d
Add CsvWriteOptions and write_csv_arrow functions
thisisnic Apr 22, 2021
9d12fe6
Add C++ functions for intialising csv writeoptions and bindings to ca…
thisisnic Apr 22, 2021
5791f63
Add pkgdown reference to CsvWriteOptions
thisisnic Apr 22, 2021
deff496
Add WriteOptions to arrow::csv namespace
thisisnic Apr 23, 2021
e21aaa5
Update types
thisisnic Apr 23, 2021
04d8106
Update arrowExports
thisisnic Apr 23, 2021
59112f6
Re-order params and add assertion
thisisnic Apr 23, 2021
c172db8
try changing
thisisnic Apr 23, 2021
2b962ad
Remove const keyword
thisisnic Apr 23, 2021
1d70f0b
Remove unnecessary comma, include relevant header files, refactor cpp
thisisnic Apr 23, 2021
713075f
Remove exposed memory pool argument, update NAMESPACE and docs
thisisnic Apr 26, 2021
86d83d8
Use gc_memory_pool() instead of arrow::default_memory_pool() to preve…
thisisnic Apr 26, 2021
8de7c3b
Typo fix
thisisnic Apr 26, 2021
3239575
Skip tests that include writing date columns
thisisnic Apr 27, 2021
63a2f98
Change whitespace to force CI
thisisnic Apr 27, 2021
96e9e2e
Change R C++ function format
thisisnic Apr 27, 2021
fcc6f5f
Run linter on R C++
thisisnic Apr 27, 2021
ee38464
Update docs
thisisnic Apr 27, 2021
6b6914c
Add write_csv_arrrow to _pkgdown.yml
thisisnic Apr 28, 2021
93338cc
Inconsequential grammar change to trigger CI
thisisnic Apr 28, 2021
7722206
Remove extra no-dates tests
thisisnic Apr 30, 2021
0d62fa1
Move docs for CsvWriteOptions
thisisnic Apr 30, 2021
faab7ff
Move tests to bottom of file
thisisnic May 3, 2021
8ff781f
Add tests for invalid inputs
thisisnic May 3, 2021
a421243
Remove extra whitespace
thisisnic May 3, 2021
6252dcd
Move batch_size validation
nealrichardson May 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/csv/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class TableReader;
struct ConvertOptions;
struct ReadOptions;
struct ParseOptions;
struct WriteOptions;

} // namespace csv
} // namespace arrow
2 changes: 2 additions & 0 deletions r/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ export(CsvFragmentScanOptions)
export(CsvParseOptions)
export(CsvReadOptions)
export(CsvTableReader)
export(CsvWriteOptions)
export(Dataset)
export(DatasetFactory)
export(DateUnit)
Expand Down Expand Up @@ -277,6 +278,7 @@ export(unify_schemas)
export(utf8)
export(value_counts)
export(write_arrow)
export(write_csv_arrow)
export(write_dataset)
export(write_feather)
export(write_ipc_stream)
Expand Down
12 changes: 12 additions & 0 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 64 additions & 0 deletions r/R/csv.R
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,11 @@ CsvTableReader$create <- function(file,
#' `TimestampParser$create()` takes an optional `format` string argument.
#' See [`strptime()`][base::strptime()] for example syntax.
#' The default is to use an ISO-8601 format parser.
#'
#' The `CsvWriteOptions$create()` factory method takes the following arguments:
#' - `include_header` Whether to write an initial header line with column names.
#'    Default is `TRUE`.
#' - `batch_size` Maximum number of rows processed at a time. Must be a single
#'    positive whole number. Default is 1024.
#'
#' @section Active bindings:
#'
#' - `column_names`: from `CsvReadOptions`
Expand Down Expand Up @@ -408,6 +413,19 @@ CsvReadOptions$create <- function(use_threads = option_use_threads(),
)
}

#' @rdname CsvReadOptions
#' @export
CsvWriteOptions <- R6Class("CsvWriteOptions", inherit = ArrowObject)

# Factory for CsvWriteOptions.
#
# include_header: single TRUE/FALSE; whether to write an initial header line
#   with the column names.
# batch_size: single positive whole number; maximum number of rows processed
#   at a time. Coerced to integer before being handed to C++.
#
# Returns the object produced by csv___WriteOptions__initialize().
CsvWriteOptions$create <- function(include_header = TRUE, batch_size = 1024L) {
  assert_that(is_integerish(batch_size, n = 1, finite = TRUE), batch_size > 0)
  # Validate include_header here rather than relying on the C++ conversion
  # (cpp11::as_cpp<bool>): that conversion reports only a generic error for
  # non-logical input and, per cpp11's scalar bool conversion, maps NA to FALSE,
  # which would silently drop the header.
  assert_that(
    is.logical(include_header),
    length(include_header) == 1L,
    !is.na(include_header),
    msg = "include_header must be a single TRUE or FALSE"
  )
  csv___WriteOptions__initialize(
    list(
      include_header = include_header,
      batch_size = as.integer(batch_size)
    )
  )
}

readr_to_csv_read_options <- function(skip, col_names, col_types) {
if (isTRUE(col_names)) {
# C++ default to parse is 0-length string array
Expand Down Expand Up @@ -585,3 +603,49 @@ readr_to_csv_convert_options <- function(na,
include_columns = include_columns
)
}

#' Write CSV file to disk
#'
#' @param x A `data.frame`, [RecordBatch], or [Table] to write. A `data.frame`
#' is converted to a [Table] before writing.
#' @param sink Where to write: a string file path, URI, or [OutputStream], or
#' path in a file system (`SubTreeFileSystem`)
#' @param include_header Whether to write an initial header line with column names
#' @param batch_size Maximum number of rows processed at a time. Default is 1024.
#'
#' @return The input `x`, invisibly. Note that if `sink` is an [OutputStream],
#' the stream will be left open; a stream opened by this function from any
#' other kind of `sink` is closed when the function exits.
#' @export
#' @examples
#' \donttest{
#' tf <- tempfile()
#' on.exit(unlink(tf))
#' write_csv_arrow(mtcars, tf)
#' }
#' @include arrow-package.R
write_csv_arrow <- function(x,
                            sink,
                            include_header = TRUE,
                            batch_size = 1024L) {
  # Build (and thereby validate) the writer options before touching the data.
  csv_options <- CsvWriteOptions$create(include_header, batch_size)

  # Remember what the caller passed so it can be returned unchanged.
  original_input <- x
  if (is.data.frame(x)) {
    x <- Table$create(x)
  }
  assert_is(x, "ArrowTabular")

  # Only a stream opened here is closed here; a caller-supplied OutputStream
  # is left open.
  opened_here <- !inherits(sink, "OutputStream")
  if (opened_here) {
    sink <- make_output_stream(sink)
    on.exit(sink$close())
  }

  # Dispatch to the C++ binding matching the concrete tabular type.
  if (inherits(x, "RecordBatch")) {
    csv___WriteCSV__RecordBatch(x, csv_options, sink)
  } else if (inherits(x, "Table")) {
    csv___WriteCSV__Table(x, csv_options, sink)
  }

  invisible(original_input)
}
2 changes: 2 additions & 0 deletions r/_pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ reference:
- write_ipc_stream
- write_to_raw
- write_parquet
- write_csv_arrow
- title: C++ reader/writer interface
contents:
- ParquetFileReader
Expand All @@ -109,6 +110,7 @@ reference:
- RecordBatchReader
- RecordBatchWriter
- CsvReadOptions
- CsvWriteOptions
- title: Arrow data containers
contents:
- array
Expand Down
22 changes: 22 additions & 0 deletions r/man/CsvWriteOptions.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions r/man/write_csv_arrow.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 54 additions & 0 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions r/src/arrow_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ R6_CLASS_NAME(arrow::csv::ReadOptions, "CsvReadOptions");
R6_CLASS_NAME(arrow::csv::ParseOptions, "CsvParseOptions");
R6_CLASS_NAME(arrow::csv::ConvertOptions, "CsvConvertOptions");
R6_CLASS_NAME(arrow::csv::TableReader, "CsvTableReader");
R6_CLASS_NAME(arrow::csv::WriteOptions, "CsvWriteOptions");

#if defined(ARROW_R_WITH_PARQUET)
R6_CLASS_NAME(parquet::ArrowReaderProperties, "ParquetArrowReaderProperties");
Expand Down
30 changes: 30 additions & 0 deletions r/src/csv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,21 @@
#if defined(ARROW_R_WITH_ARROW)

#include <arrow/csv/reader.h>
#include <arrow/csv/writer.h>
#include <arrow/memory_pool.h>

#include <arrow/util/value_parsing.h>

// Builds CSV write options from an R list: starts from
// WriteOptions::Defaults() and overrides `include_header` and `batch_size`
// with the list elements of the same names.
// [[arrow::export]]
std::shared_ptr<arrow::csv::WriteOptions> csv___WriteOptions__initialize(
    cpp11::list options) {
  auto write_options = arrow::csv::WriteOptions::Defaults();
  write_options.include_header = cpp11::as_cpp<bool>(options["include_header"]);
  write_options.batch_size = cpp11::as_cpp<int>(options["batch_size"]);
  return std::make_shared<arrow::csv::WriteOptions>(write_options);
}

// [[arrow::export]]
std::shared_ptr<arrow::csv::ReadOptions> csv___ReadOptions__initialize(
cpp11::list options) {
Expand Down Expand Up @@ -174,4 +187,21 @@ std::shared_ptr<arrow::TimestampParser> TimestampParser__MakeISO8601() {
return arrow::TimestampParser::MakeISO8601();
}

// Writes `table` as CSV to `stream` using `write_options`, with memory taken
// from gc_memory_pool(). A non-OK status is passed to StopIfNotOk().
// [[arrow::export]]
void csv___WriteCSV__Table(const std::shared_ptr<arrow::Table>& table,
                           const std::shared_ptr<arrow::csv::WriteOptions>& write_options,
                           const std::shared_ptr<arrow::io::OutputStream>& stream) {
  const auto write_status =
      arrow::csv::WriteCSV(*table, *write_options, gc_memory_pool(), stream.get());
  StopIfNotOk(write_status);
}

// Writes `record_batch` as CSV to `stream` using `write_options`, with memory
// taken from gc_memory_pool(). A non-OK status is passed to StopIfNotOk().
// [[arrow::export]]
void csv___WriteCSV__RecordBatch(
    const std::shared_ptr<arrow::RecordBatch>& record_batch,
    const std::shared_ptr<arrow::csv::WriteOptions>& write_options,
    const std::shared_ptr<arrow::io::OutputStream>& stream) {
  const auto write_status = arrow::csv::WriteCSV(*record_batch, *write_options,
                                                 gc_memory_pool(), stream.get());
  StopIfNotOk(write_status);
}

#endif
Loading