diff --git a/r/NEWS.md b/r/NEWS.md index 75a1cc676dd..d5dbc60efa9 100644 --- a/r/NEWS.md +++ b/r/NEWS.md @@ -22,6 +22,7 @@ ## Breaking changes * The R6 classes that wrap the C++ classes are now documented and exported and have been renamed to be more R-friendly. Users of the high-level R interface in this package are not affected. Those who want to interact with the Arrow C++ API more directly should work with these objects and methods. As part of this change, many functions that instantiated these R6 objects have been removed in favor of `Class$create()` methods. Notably, `arrow::array()` and `arrow::table()` have been removed in favor of `Array$create()` and `Table$create()`, eliminating the package startup message about masking `base` functions. For more information, see the new `vignette("arrow")`. +* Due to a subtle change in the Arrow message format, data written by the 0.15 version libraries may not be readable by older versions. If you need to send data to a process that uses an older version of Arrow (for example, an Apache Spark server that hasn't yet updated to Arrow 0.15), you can set the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1`. * The `as_tibble` argument in the `read_*()` functions has been renamed to `as_data_frame` ([ARROW-6337](https://issues.apache.org/jira/browse/ARROW-6337), @jameslamb) * The `arrow::Column` class has been removed, as it was removed from the C++ library @@ -33,7 +34,7 @@ * `read_parquet()` and `read_feather()` can ingest data from a `raw` vector ([ARROW-6278](https://issues.apache.org/jira/browse/ARROW-6278)) * File readers now properly handle paths that need expanding, such as `~/file.parquet` ([ARROW-6323](https://issues.apache.org/jira/browse/ARROW-6323)) * Improved support for creating types in a schema: the types' printed names (e.g. "double") are guaranteed to be valid to use in instantiating a schema (e.g. `double()`), and time types can be created with human-friendly resolution strings ("ms", "s", etc.). 
([ARROW-6338](https://issues.apache.org/jira/browse/ARROW-6338), [ARROW-6364](https://issues.apache.org/jira/browse/ARROW-6364)) -* Compressed streams are now supported on Windows ([ARROW-6360](https://issues.apache.org/jira/browse/ARROW-6360)) +* Compressed streams are now supported on Windows ([ARROW-6360](https://issues.apache.org/jira/browse/ARROW-6360)), and you can also specify a compression level ([ARROW-6533](https://issues.apache.org/jira/browse/ARROW-6533)) # arrow 0.14.1 diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 5d34f924470..a3400500a07 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -996,12 +996,12 @@ ipc___RecordBatchWriter__Close <- function(batch_writer){ invisible(.Call(`_arrow_ipc___RecordBatchWriter__Close` , batch_writer)) } -ipc___RecordBatchFileWriter__Open <- function(stream, schema){ - .Call(`_arrow_ipc___RecordBatchFileWriter__Open` , stream, schema) +ipc___RecordBatchFileWriter__Open <- function(stream, schema, use_legacy_format){ + .Call(`_arrow_ipc___RecordBatchFileWriter__Open` , stream, schema, use_legacy_format) } -ipc___RecordBatchStreamWriter__Open <- function(stream, schema){ - .Call(`_arrow_ipc___RecordBatchStreamWriter__Open` , stream, schema) +ipc___RecordBatchStreamWriter__Open <- function(stream, schema, use_legacy_format){ + .Call(`_arrow_ipc___RecordBatchStreamWriter__Open` , stream, schema, use_legacy_format) } schema_ <- function(fields){ diff --git a/r/R/record-batch-writer.R b/r/R/record-batch-writer.R index 208ceb1c3c5..6f43c52d96a 100644 --- a/r/R/record-batch-writer.R +++ b/r/R/record-batch-writer.R @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. 
+ #' @title RecordBatchWriter classes #' @description `RecordBatchFileWriter` and `RecordBatchStreamWriter` are #' interfaces for writing record batches to either the binary file or streaming @@ -77,14 +78,15 @@ RecordBatchWriter <- R6Class("RecordBatchWriter", inherit = Object, #' @rdname RecordBatchWriter #' @export RecordBatchStreamWriter <- R6Class("RecordBatchStreamWriter", inherit = RecordBatchWriter) -RecordBatchStreamWriter$create <- function(sink, schema) { +RecordBatchStreamWriter$create <- function(sink, schema, use_legacy_format = NULL) { if (is.character(sink)) { sink <- FileOutputStream$create(sink) } + use_legacy_format <- use_legacy_format %||% identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1") assert_is(sink, "OutputStream") assert_is(schema, "Schema") - shared_ptr(RecordBatchStreamWriter, ipc___RecordBatchStreamWriter__Open(sink, schema)) + shared_ptr(RecordBatchStreamWriter, ipc___RecordBatchStreamWriter__Open(sink, schema, use_legacy_format)) } #' @usage NULL @@ -92,12 +94,13 @@ RecordBatchStreamWriter$create <- function(sink, schema) { #' @rdname RecordBatchWriter #' @export RecordBatchFileWriter <- R6Class("RecordBatchFileWriter", inherit = RecordBatchStreamWriter) -RecordBatchFileWriter$create <- function(sink, schema) { +RecordBatchFileWriter$create <- function(sink, schema, use_legacy_format = NULL) { if (is.character(sink)) { sink <- FileOutputStream$create(sink) } + use_legacy_format <- use_legacy_format %||% identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1") assert_is(sink, "OutputStream") assert_is(schema, "Schema") - shared_ptr(RecordBatchFileWriter, ipc___RecordBatchFileWriter__Open(sink, schema)) + shared_ptr(RecordBatchFileWriter, ipc___RecordBatchFileWriter__Open(sink, schema, use_legacy_format)) } diff --git a/r/R/write-arrow.R b/r/R/write-arrow.R index b74a8945013..dbab158204f 100644 --- a/r/R/write-arrow.R +++ b/r/R/write-arrow.R @@ -24,7 +24,7 @@ to_arrow.Table <- function(x) x # splice the data frame as arguments of 
Table$create() # see ?rlang::list2() -`to_arrow.data.frame` <- function(x) Table$create(!!!x) +to_arrow.data.frame <- function(x) Table$create(!!!x) #' Write Arrow formatted data #' @@ -60,7 +60,7 @@ write_arrow.RecordBatchWriter <- function(x, stream, ...){ } #' @export -`write_arrow.character` <- function(x, stream, ...) { +write_arrow.character <- function(x, stream, ...) { assert_that(length(stream) == 1L) x <- to_arrow(x) file_stream <- FileOutputStream$create(stream) @@ -77,7 +77,7 @@ write_arrow.RecordBatchWriter <- function(x, stream, ...){ } #' @export -`write_arrow.raw` <- function(x, stream, ...) { +write_arrow.raw <- function(x, stream, ...) { x <- to_arrow(x) schema <- x$schema diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 009d53b0be3..4c79d008b9a 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -3866,32 +3866,34 @@ RcppExport SEXP _arrow_ipc___RecordBatchWriter__Close(SEXP batch_writer_sexp){ // recordbatchwriter.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr ipc___RecordBatchFileWriter__Open(const std::shared_ptr& stream, const std::shared_ptr& schema); -RcppExport SEXP _arrow_ipc___RecordBatchFileWriter__Open(SEXP stream_sexp, SEXP schema_sexp){ +std::shared_ptr ipc___RecordBatchFileWriter__Open(const std::shared_ptr& stream, const std::shared_ptr& schema, bool use_legacy_format); +RcppExport SEXP _arrow_ipc___RecordBatchFileWriter__Open(SEXP stream_sexp, SEXP schema_sexp, SEXP use_legacy_format_sexp){ BEGIN_RCPP Rcpp::traits::input_parameter&>::type stream(stream_sexp); Rcpp::traits::input_parameter&>::type schema(schema_sexp); - return Rcpp::wrap(ipc___RecordBatchFileWriter__Open(stream, schema)); + Rcpp::traits::input_parameter::type use_legacy_format(use_legacy_format_sexp); + return Rcpp::wrap(ipc___RecordBatchFileWriter__Open(stream, schema, use_legacy_format)); END_RCPP } #else -RcppExport SEXP _arrow_ipc___RecordBatchFileWriter__Open(SEXP stream_sexp, SEXP schema_sexp){ +RcppExport SEXP 
_arrow_ipc___RecordBatchFileWriter__Open(SEXP stream_sexp, SEXP schema_sexp, SEXP use_legacy_format_sexp){ Rf_error("Cannot call ipc___RecordBatchFileWriter__Open(). Please use arrow::install_arrow() to install required runtime libraries. "); } #endif // recordbatchwriter.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr ipc___RecordBatchStreamWriter__Open(const std::shared_ptr& stream, const std::shared_ptr& schema); -RcppExport SEXP _arrow_ipc___RecordBatchStreamWriter__Open(SEXP stream_sexp, SEXP schema_sexp){ +std::shared_ptr ipc___RecordBatchStreamWriter__Open(const std::shared_ptr& stream, const std::shared_ptr& schema, bool use_legacy_format); +RcppExport SEXP _arrow_ipc___RecordBatchStreamWriter__Open(SEXP stream_sexp, SEXP schema_sexp, SEXP use_legacy_format_sexp){ BEGIN_RCPP Rcpp::traits::input_parameter&>::type stream(stream_sexp); Rcpp::traits::input_parameter&>::type schema(schema_sexp); - return Rcpp::wrap(ipc___RecordBatchStreamWriter__Open(stream, schema)); + Rcpp::traits::input_parameter::type use_legacy_format(use_legacy_format_sexp); + return Rcpp::wrap(ipc___RecordBatchStreamWriter__Open(stream, schema, use_legacy_format)); END_RCPP } #else -RcppExport SEXP _arrow_ipc___RecordBatchStreamWriter__Open(SEXP stream_sexp, SEXP schema_sexp){ +RcppExport SEXP _arrow_ipc___RecordBatchStreamWriter__Open(SEXP stream_sexp, SEXP schema_sexp, SEXP use_legacy_format_sexp){ Rf_error("Cannot call ipc___RecordBatchStreamWriter__Open(). Please use arrow::install_arrow() to install required runtime libraries. 
"); } #endif @@ -4450,8 +4452,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ipc___RecordBatchWriter__WriteRecordBatch", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__WriteRecordBatch, 2}, { "_arrow_ipc___RecordBatchWriter__WriteTable", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__WriteTable, 2}, { "_arrow_ipc___RecordBatchWriter__Close", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__Close, 1}, - { "_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 2}, - { "_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 2}, + { "_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 3}, + { "_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 3}, { "_arrow_schema_", (DL_FUNC) &_arrow_schema_, 1}, { "_arrow_Schema__ToString", (DL_FUNC) &_arrow_Schema__ToString, 1}, { "_arrow_Schema__num_fields", (DL_FUNC) &_arrow_Schema__num_fields, 1}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 38f3f83b22f..f8c52fa0716 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -19,6 +19,7 @@ #include #include +#include #include #include @@ -48,6 +49,17 @@ struct data { #define STOP_IF_NOT_OK(s) STOP_IF_NOT(s.ok(), s.ToString()) +#define ARROW_ASSIGN_OR_STOP_IMPL(status_name, lhs, rexpr) \ + auto status_name = (rexpr); \ + if (!status_name.status().ok()) { \ + Rcpp::stop(status_name.status().ToString()); \ + } \ + lhs = std::move(status_name).ValueOrDie(); + +#define ARROW_ASSIGN_OR_STOP(lhs, rexp) \ + ARROW_ASSIGN_OR_STOP_IMPL(ARROW_ASSIGN_OR_RAISE_NAME(_error_or_value, __COUNTER__), \ + lhs, rexp) + template struct NoDelete { inline void operator()(T* ptr) {} diff --git a/r/src/recordbatchwriter.cpp b/r/src/recordbatchwriter.cpp index b22f85885d9..a809e992ea5 100644 --- a/r/src/recordbatchwriter.cpp +++ b/r/src/recordbatchwriter.cpp @@ -42,20 +42,23 @@ void 
ipc___RecordBatchWriter__Close( // [[arrow::export]] std::shared_ptr ipc___RecordBatchFileWriter__Open( const std::shared_ptr& stream, - const std::shared_ptr& schema) { - std::shared_ptr file_writer; - STOP_IF_NOT_OK( - arrow::ipc::RecordBatchFileWriter::Open(stream.get(), schema, &file_writer)); + const std::shared_ptr& schema, bool use_legacy_format) { + auto options = arrow::ipc::IpcOptions::Defaults(); + options.write_legacy_ipc_format = use_legacy_format; + ARROW_ASSIGN_OR_STOP(auto file_writer, arrow::ipc::RecordBatchFileWriter::Open( + stream.get(), schema, options)); return file_writer; } // [[arrow::export]] std::shared_ptr ipc___RecordBatchStreamWriter__Open( const std::shared_ptr& stream, - const std::shared_ptr& schema) { - std::shared_ptr stream_writer; - STOP_IF_NOT_OK( - arrow::ipc::RecordBatchStreamWriter::Open(stream.get(), schema, &stream_writer)); + const std::shared_ptr& schema, bool use_legacy_format) { + auto options = arrow::ipc::IpcOptions::Defaults(); + options.write_legacy_ipc_format = use_legacy_format; + + ARROW_ASSIGN_OR_STOP(auto stream_writer, arrow::ipc::RecordBatchStreamWriter::Open( + stream.get(), schema, options)); return stream_writer; }