From da10e94ff21d23e96a1b94e94e57b3b3007223f5 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 14 Nov 2018 16:39:30 +0100 Subject: [PATCH 1/8] + Compressed streams --- r/DESCRIPTION | 3 +- r/NAMESPACE | 29 +++++++++++ r/R/RcppExports.R | 12 +++++ r/R/compression.R | 86 +++++++++++++++++++++++++++++++ r/R/enums.R | 6 +++ r/man/DataType.Rd | 3 ++ r/man/compressed_input_stream.Rd | 16 ++++++ r/man/compressed_output_stream.Rd | 16 ++++++ r/man/compression_codec.Rd | 14 +++++ r/src/RcppExports.cpp | 38 ++++++++++++++ r/src/arrow_types.h | 3 ++ r/src/compression.cpp | 43 ++++++++++++++++ 12 files changed, 268 insertions(+), 1 deletion(-) create mode 100644 r/R/compression.R create mode 100644 r/man/compressed_input_stream.Rd create mode 100644 r/man/compressed_output_stream.Rd create mode 100644 r/man/compression_codec.Rd create mode 100644 r/src/compression.cpp diff --git a/r/DESCRIPTION b/r/DESCRIPTION index 5303a877f9e..620cd548ee4 100644 --- a/r/DESCRIPTION +++ b/r/DESCRIPTION @@ -55,11 +55,12 @@ Collate: 'Table.R' 'array.R' 'buffer.R' + 'io.R' + 'compression.R' 'compute.R' 'csv.R' 'dictionary.R' 'feather.R' - 'io.R' 'memory_pool.R' 'message.R' 'on_exit.R' diff --git a/r/NAMESPACE b/r/NAMESPACE index f8f6384dce1..8e8649e7915 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -44,6 +44,21 @@ S3method(csv_table_reader,"arrow::io::InputStream") S3method(csv_table_reader,character) S3method(csv_table_reader,default) S3method(csv_table_reader,fs_path) +S3method(buffer_reader,"arrow::Buffer") +S3method(buffer_reader,default) +S3method(compressed_input_stream,"arrow::io::InputStream") +S3method(compressed_input_stream,character) +S3method(compressed_input_stream,fs_path) +S3method(compressed_output_stream,"arrow::io::OutputStream") +S3method(compressed_output_stream,character) +S3method(compressed_output_stream,fs_path) +S3method(feather_table_reader,"arrow::io::RandomAccessFile") +S3method(feather_table_reader,"arrow::ipc::feather::TableReader") +S3method(feather_table_reader,character) +S3method(feather_table_reader,default) +S3method(feather_table_reader,fs_path) +S3method(feather_table_writer,"arrow::io::OutputStream") +>>>>>>> + Compressed streams S3method(length,"arrow::Array") S3method(names,"arrow::RecordBatch") S3method(print,"arrow-enum") @@ -74,6 +89,17 @@ S3method(write_feather_RecordBatch,default) S3method(write_feather_RecordBatch,fs_path) export(BufferOutputStream) export(BufferReader) +S3method(write_record_batch,"arrow::io::OutputStream") +S3method(write_record_batch,"arrow::ipc::RecordBatchWriter") +S3method(write_record_batch,character) +S3method(write_record_batch,fs_path) +S3method(write_record_batch,raw) +S3method(write_table,"arrow::io::OutputStream") +S3method(write_table,"arrow::ipc::RecordBatchWriter") +S3method(write_table,character) +S3method(write_table,fs_path) +S3method(write_table,raw) +export(CompressionType) export(DateUnit) export(FeatherTableReader) export(FeatherTableWriter) @@ -101,6 +127,9 @@ export(csv_convert_options) export(csv_parse_options) export(csv_read_options) export(csv_table_reader) +export(compressed_input_stream) +export(compressed_output_stream) +export(compression_codec) export(date32) export(date64) export(decimal) diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R index c6fe8719f4e..b0178d8515a 100644 --- a/r/R/RcppExports.R +++ b/r/R/RcppExports.R @@ -173,6 +173,18 @@ Column__data <- function(column) { .Call(`_arrow_Column__data`, column) } +util___Codec__Create <- function(codec) { + .Call(`_arrow_util___Codec__Create`, codec) +} + +io___CompressedOutputStream__Make <- function(codec, raw) { + .Call(`_arrow_io___CompressedOutputStream__Make`, codec, raw) +} + +io___CompressedInputStream__Make <- function(codec, raw) { + .Call(`_arrow_io___CompressedInputStream__Make`, codec, raw) +} + compute___CastOptions__initialize <- function(allow_int_overflow, allow_time_truncate, allow_float_truncate) { .Call(`_arrow_compute___CastOptions__initialize`, allow_int_overflow, allow_time_truncate, allow_float_truncate) } diff --git a/r/R/compression.R b/r/R/compression.R new file mode 100644 index 00000000000..64dacbf314f --- /dev/null +++ b/r/R/compression.R @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +#' @include enums.R +#' @include R6.R +#' @include io.R + +`arrow::util::Codec` <- R6Class("arrow::util::Codec", inherit = `arrow::Object`) + +`arrow::io::CompressedOutputStream` <- R6Class("arrow::io::CompressedOutputStream", inherit = `arrow::io::OutputStream`) +`arrow::io::CompressedInputStream` <- R6Class("arrow::io::CompressedInputStream", inherit = `arrow::io::InputStream`) + +#' codec +#' +#' @param type type of codec +#' +#' @export +compression_codec <- function(type = "GZIP") { + type <- CompressionType[[match.arg(type, names(CompressionType))]] + unique_ptr(`arrow::util::Codec`, util___Codec__Create(type)) +} + + +#' Compressed output stream +#' +#' @param stream Underlying raw output stream +#' @param codec a codec +#' @export +compressed_output_stream <- function(stream, codec = compression_codec("GZIP")){ + UseMethod("compressed_output_stream") +} + +#' @export +compressed_output_stream.character <- function(stream, codec = compression_codec("GZIP")){ + compressed_output_stream(fs::path_abs(stream), codec = codec) +} + +#' @export +compressed_output_stream.fs_path <- function(stream, codec = compression_codec("GZIP")){ + compressed_output_stream(file_output_stream(stream), codec = codec) +} + +#' @export +`compressed_output_stream.arrow::io::OutputStream` <- function(stream, codec) { + assert_that(inherits(codec, "arrow::util::Codec")) + shared_ptr(`arrow::io::CompressedOutputStream`, io___CompressedOutputStream__Make(codec, stream)) +} + +#' Compressed input stream +#' +#' @param stream Underlying raw input stream +#' @param codec a codec +#' @export +compressed_input_stream <- function(stream, codec = codec("GZIP")){ + UseMethod("compressed_input_stream") +} + +#' @export +compressed_input_stream.character <- function(stream, codec = compression_codec("GZIP")){ + compressed_input_stream(fs::path_abs(stream), codec = codec) +} + +#' @export +compressed_input_stream.fs_path <- function(stream, codec = compression_codec("GZIP")){ + compressed_input_stream(file_open(stream), codec = codec) +} + +#' @export +`compressed_input_stream.arrow::io::InputStream` <- function(stream, codec) { + assert_that(inherits(codec, "arrow::util::Codec")) + shared_ptr(`arrow::io::CompressedInputStream`, io___CompressedInputStream__Make(codec, stream)) +} diff --git a/r/R/enums.R b/r/R/enums.R index 35e6aaa440e..3a6ac5c23bf 100644 --- a/r/R/enums.R +++ b/r/R/enums.R @@ -70,3 +70,9 @@ FileMode <- enum("arrow::io::FileMode", MessageType <- enum("arrow::ipc::Message::Type", NONE = 0L, SCHEMA = 1L, DICTIONARY_BATCH = 2L, RECORD_BATCH = 3L, TENSOR = 4L ) + +#' @rdname DataType +#' @export +CompressionType <- enum("arrow::Compression::type", + UNCOMPRESSED = 0L, SNAPPY = 1L, GZIP = 2L, BROTLI = 3L, ZSTD = 4L, LZ4 = 5L, LZO = 6L, BZ2 = 7L +) diff --git a/r/man/DataType.Rd b/r/man/DataType.Rd index b10414020b5..bf5f1d40456 100644 --- a/r/man/DataType.Rd +++ b/r/man/DataType.Rd @@ -8,6 +8,7 @@ \alias{StatusCode} \alias{FileMode} \alias{MessageType} +\alias{CompressionType} \alias{int8} \alias{int16} \alias{int32} @@ -45,6 +46,8 @@ FileMode MessageType +CompressionType + int8() int16() diff --git a/r/man/compressed_input_stream.Rd b/r/man/compressed_input_stream.Rd new file mode 100644 index 00000000000..3dd0766bc15 --- /dev/null +++ b/r/man/compressed_input_stream.Rd @@ -0,0 +1,16 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/compression.R +\name{compressed_input_stream} +\alias{compressed_input_stream} +\title{Compressed input stream} +\usage{ +compressed_input_stream(stream, codec = codec("GZIP")) +} +\arguments{ +\item{stream}{Underlying raw input stream} + +\item{codec}{a codec} +} +\description{ +Compressed input stream +} diff --git a/r/man/compressed_output_stream.Rd b/r/man/compressed_output_stream.Rd new file mode 100644 index 00000000000..f3310b2ac00 --- /dev/null +++ b/r/man/compressed_output_stream.Rd @@ -0,0 +1,16 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/compression.R +\name{compressed_output_stream} +\alias{compressed_output_stream} +\title{Compressed output stream} +\usage{ +compressed_output_stream(stream, codec = compression_codec("GZIP")) +} +\arguments{ +\item{stream}{Underlying raw output stream} + +\item{codec}{a codec} +} +\description{ +Compressed output stream +} diff --git a/r/man/compression_codec.Rd b/r/man/compression_codec.Rd new file mode 100644 index 00000000000..a7db8ab0deb --- /dev/null +++ b/r/man/compression_codec.Rd @@ -0,0 +1,14 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/compression.R +\name{compression_codec} +\alias{compression_codec} +\title{codec} +\usage{ +compression_codec(type = "GZIP") +} +\arguments{ +\item{type}{type of codec} +} +\description{ +codec +} diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp index 1e8fed18676..10497d1859b 100644 --- a/r/src/RcppExports.cpp +++ b/r/src/RcppExports.cpp @@ -493,6 +493,41 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } +// util___Codec__Create +std::unique_ptr util___Codec__Create(arrow::Compression::type codec); +RcppExport SEXP _arrow_util___Codec__Create(SEXP codecSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< arrow::Compression::type >::type codec(codecSEXP); + rcpp_result_gen = Rcpp::wrap(util___Codec__Create(codec)); + return rcpp_result_gen; +END_RCPP +} +// io___CompressedOutputStream__Make +std::shared_ptr io___CompressedOutputStream__Make(const std::unique_ptr& codec, const std::shared_ptr& raw); +RcppExport SEXP _arrow_io___CompressedOutputStream__Make(SEXP codecSEXP, SEXP rawSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< const std::unique_ptr& >::type codec(codecSEXP); + Rcpp::traits::input_parameter< const std::shared_ptr& >::type raw(rawSEXP); + rcpp_result_gen = Rcpp::wrap(io___CompressedOutputStream__Make(codec, raw)); + return rcpp_result_gen; +END_RCPP +} +// io___CompressedInputStream__Make +std::shared_ptr io___CompressedInputStream__Make(const std::unique_ptr& codec, const std::shared_ptr& raw); +RcppExport SEXP _arrow_io___CompressedInputStream__Make(SEXP codecSEXP, SEXP rawSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< const std::unique_ptr& >::type codec(codecSEXP); + Rcpp::traits::input_parameter< const std::shared_ptr& >::type raw(rawSEXP); + rcpp_result_gen = Rcpp::wrap(io___CompressedInputStream__Make(codec, raw)); + return rcpp_result_gen; +END_RCPP +} // compute___CastOptions__initialize std::shared_ptr compute___CastOptions__initialize(bool allow_int_overflow, bool allow_time_truncate, bool allow_float_truncate); RcppExport SEXP _arrow_compute___CastOptions__initialize(SEXP allow_int_overflowSEXP, SEXP allow_time_truncateSEXP, SEXP allow_float_truncateSEXP) { @@ -2264,6 +2299,9 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_Column__null_count", (DL_FUNC) &_arrow_Column__null_count, 1}, {"_arrow_Column__type", (DL_FUNC) &_arrow_Column__type, 1}, {"_arrow_Column__data", (DL_FUNC) &_arrow_Column__data, 1}, + {"_arrow_util___Codec__Create", (DL_FUNC) &_arrow_util___Codec__Create, 1}, + {"_arrow_io___CompressedOutputStream__Make", (DL_FUNC) &_arrow_io___CompressedOutputStream__Make, 2}, + {"_arrow_io___CompressedInputStream__Make", (DL_FUNC) &_arrow_io___CompressedInputStream__Make, 2}, {"_arrow_compute___CastOptions__initialize", (DL_FUNC) &_arrow_compute___CastOptions__initialize, 3}, {"_arrow_Array__cast", (DL_FUNC) &_arrow_Array__cast, 3}, {"_arrow_ChunkedArray__cast", (DL_FUNC) &_arrow_ChunkedArray__cast, 3}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 6fef7997dbf..e8b06e10db7 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -23,12 +23,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #define STOP_IF_NOT(TEST, MSG) \ do { \ @@ -127,6 +129,7 @@ RCPP_EXPOSED_ENUM_NODECL(arrow::TimeUnit::type) RCPP_EXPOSED_ENUM_NODECL(arrow::StatusCode) RCPP_EXPOSED_ENUM_NODECL(arrow::io::FileMode::type) RCPP_EXPOSED_ENUM_NODECL(arrow::ipc::Message::Type) +RCPP_EXPOSED_ENUM_NODECL(arrow::Compression::type) namespace Rcpp { namespace internal { diff --git a/r/src/compression.cpp b/r/src/compression.cpp new file mode 100644 index 00000000000..4c522d85f5d --- /dev/null +++ b/r/src/compression.cpp @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow_types.h" + +// [[Rcpp::export]] +std::unique_ptr util___Codec__Create(arrow::Compression::type codec) { + std::unique_ptr out; + STOP_IF_NOT_OK(arrow::util::Codec::Create(codec, &out)); + return out; +} + +// [[Rcpp::export]] +std::shared_ptr io___CompressedOutputStream__Make( + const std::unique_ptr& codec, + const std::shared_ptr& raw) { + std::shared_ptr stream; + STOP_IF_NOT_OK(arrow::io::CompressedOutputStream::Make(codec.get(), raw, &stream)); + return stream; +} + +// [[Rcpp::export]] +std::shared_ptr io___CompressedInputStream__Make( + const std::unique_ptr& codec, + const std::shared_ptr& raw) { + std::shared_ptr stream; + STOP_IF_NOT_OK(arrow::io::CompressedInputStream::Make(codec.get(), raw, &stream)); + return stream; +} From a3c5af4b30e6ed9fe166f54c666777ac9c256f7f Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Mon, 3 Dec 2018 10:15:18 +0100 Subject: [PATCH 2/8] align with changes from #3043 --- r/NAMESPACE | 14 ++++++---- r/R/compression.R | 28 +++++++++---------- ...put_stream.Rd => CompressedInputStream.Rd} | 6 ++-- ...ut_stream.Rd => CompressedOutputStream.Rd} | 6 ++-- 4 files changed, 28 insertions(+), 26 deletions(-) rename r/man/{compressed_input_stream.Rd => CompressedInputStream.Rd} (68%) rename r/man/{compressed_output_stream.Rd => CompressedOutputStream.Rd} (65%) diff --git a/r/NAMESPACE b/r/NAMESPACE index 8e8649e7915..06a5fdd1c28 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -32,6 +32,12 @@ S3method(RecordBatchStreamReader,raw) S3method(RecordBatchStreamWriter,"arrow::io::OutputStream") S3method(RecordBatchStreamWriter,character) S3method(RecordBatchStreamWriter,fs_path) +S3method(CompressedInputStream,"arrow::io::InputStream") +S3method(CompressedInputStream,character) +S3method(CompressedInputStream,fs_path) +S3method(CompressedOutputStream,"arrow::io::OutputStream") +S3method(CompressedOutputStream,character) +S3method(CompressedOutputStream,fs_path) S3method(as_tibble,"arrow::RecordBatch") S3method(as_tibble,"arrow::Table") S3method(buffer,complex) @@ -46,12 +52,6 @@ S3method(csv_table_reader,default) S3method(csv_table_reader,fs_path) S3method(buffer_reader,"arrow::Buffer") S3method(buffer_reader,default) -S3method(compressed_input_stream,"arrow::io::InputStream") -S3method(compressed_input_stream,character) -S3method(compressed_input_stream,fs_path) -S3method(compressed_output_stream,"arrow::io::OutputStream") -S3method(compressed_output_stream,character) -S3method(compressed_output_stream,fs_path) S3method(feather_table_reader,"arrow::io::RandomAccessFile") S3method(feather_table_reader,"arrow::ipc::feather::TableReader") S3method(feather_table_reader,character) @@ -99,6 +99,8 @@ S3method(write_table,"arrow::ipc::RecordBatchWriter") S3method(write_table,character) S3method(write_table,fs_path) S3method(write_table,raw) +export(CompressedInputStream) +export(CompressedOutputStream) export(CompressionType) export(DateUnit) export(FeatherTableReader) diff --git a/r/R/compression.R b/r/R/compression.R index 64dacbf314f..55b5c6b4117 100644 --- a/r/R/compression.R +++ b/r/R/compression.R @@ -40,22 +40,22 @@ compression_codec <- function(type = "GZIP") { #' @param stream Underlying raw output stream #' @param codec a codec #' @export -compressed_output_stream <- function(stream, codec = compression_codec("GZIP")){ - UseMethod("compressed_output_stream") +CompressedOutputStream <- function(stream, codec = compression_codec("GZIP")){ + UseMethod("CompressedOutputStream") } #' @export -compressed_output_stream.character <- function(stream, codec = compression_codec("GZIP")){ - compressed_output_stream(fs::path_abs(stream), codec = codec) +CompressedOutputStream.character <- function(stream, codec = compression_codec("GZIP")){ + CompressedOutputStream(fs::path_abs(stream), codec = codec) } #' @export -compressed_output_stream.fs_path <- function(stream, codec = compression_codec("GZIP")){ - compressed_output_stream(file_output_stream(stream), codec = codec) +CompressedOutputStream.fs_path <- function(stream, codec = compression_codec("GZIP")){ + CompressedOutputStream(file_output_stream(stream), codec = codec) } #' @export -`compressed_output_stream.arrow::io::OutputStream` <- function(stream, codec) { +`CompressedOutputStream.arrow::io::OutputStream` <- function(stream, codec) { assert_that(inherits(codec, "arrow::util::Codec")) shared_ptr(`arrow::io::CompressedOutputStream`, io___CompressedOutputStream__Make(codec, stream)) } @@ -65,22 +65,22 @@ compressed_output_stream.fs_path <- function(stream, codec = compression_codec(" #' @param stream Underlying raw input stream #' @param codec a codec #' @export -compressed_input_stream <- function(stream, codec = codec("GZIP")){ - UseMethod("compressed_input_stream") +CompressedInputStream <- function(stream, codec = codec("GZIP")){ + UseMethod("CompressedInputStream") } #' @export -compressed_input_stream.character <- function(stream, codec = compression_codec("GZIP")){ - compressed_input_stream(fs::path_abs(stream), codec = codec) +CompressedInputStream.character <- function(stream, codec = compression_codec("GZIP")){ + CompressedInputStream(fs::path_abs(stream), codec = codec) } #' @export -compressed_input_stream.fs_path <- function(stream, codec = compression_codec("GZIP")){ - compressed_input_stream(file_open(stream), codec = codec) +CompressedInputStream.fs_path <- function(stream, codec = compression_codec("GZIP")){ + CompressedInputStream(file_open(stream), codec = codec) } #' @export -`compressed_input_stream.arrow::io::InputStream` <- function(stream, codec) { +`CompressedInputStream.arrow::io::InputStream` <- function(stream, codec) { assert_that(inherits(codec, "arrow::util::Codec")) shared_ptr(`arrow::io::CompressedInputStream`, io___CompressedInputStream__Make(codec, stream)) } diff --git a/r/man/compressed_input_stream.Rd b/r/man/CompressedInputStream.Rd similarity index 68% rename from r/man/compressed_input_stream.Rd rename to r/man/CompressedInputStream.Rd index 3dd0766bc15..cfff053083d 100644 --- a/r/man/compressed_input_stream.Rd +++ b/r/man/CompressedInputStream.Rd @@ -1,10 +1,10 @@ % Generated by roxygen2: do not edit by hand % Please edit documentation in R/compression.R -\name{compressed_input_stream} -\alias{compressed_input_stream} +\name{CompressedInputStream} +\alias{CompressedInputStream} \title{Compressed input stream} \usage{ -compressed_input_stream(stream, codec = codec("GZIP")) +CompressedInputStream(stream, codec = codec("GZIP")) } \arguments{ \item{stream}{Underlying raw input stream} diff --git a/r/man/compressed_output_stream.Rd b/r/man/CompressedOutputStream.Rd similarity index 65% rename from r/man/compressed_output_stream.Rd rename to r/man/CompressedOutputStream.Rd index f3310b2ac00..85c4d9209ac 100644 --- a/r/man/compressed_output_stream.Rd +++ b/r/man/CompressedOutputStream.Rd @@ -1,10 +1,10 @@ % Generated by roxygen2: do not edit by hand % Please edit documentation in R/compression.R -\name{compressed_output_stream} -\alias{compressed_output_stream} +\name{CompressedOutputStream} +\alias{CompressedOutputStream} \title{Compressed output stream} \usage{ -compressed_output_stream(stream, codec = compression_codec("GZIP")) +CompressedOutputStream(stream, codec = compression_codec("GZIP")) } \arguments{ \item{stream}{Underlying raw output stream} From 08124a7e505f875d4f69a1bd25db3ebd16171562 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Mon, 3 Dec 2018 10:57:34 +0100 Subject: [PATCH 3/8] test reading and writing random bytes to Compressed streams --- r/NAMESPACE | 1 + r/R/RcppExports.R | 8 +++++ r/R/buffer.R | 5 ++++ r/R/compression.R | 4 +-- r/R/io.R | 9 ++++-- r/src/RcppExports.cpp | 24 +++++++++++++++ r/src/io.cpp | 16 ++++++++++ r/tests/testthat/test-compressed.R | 48 ++++++++++++++++++++++++++++++ 8 files changed, 111 insertions(+), 4 deletions(-) create mode 100644 r/tests/testthat/test-compressed.R diff --git a/r/NAMESPACE b/r/NAMESPACE index 06a5fdd1c28..324f58810d1 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -40,6 +40,7 @@ S3method(CompressedOutputStream,character) S3method(CompressedOutputStream,fs_path) S3method(as_tibble,"arrow::RecordBatch") S3method(as_tibble,"arrow::Table") +S3method(buffer,"arrow::Buffer") S3method(buffer,complex) S3method(buffer,default) S3method(buffer,integer) diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R index b0178d8515a..3068582ec47 100644 --- a/r/R/RcppExports.R +++ b/r/R/RcppExports.R @@ -557,6 +557,14 @@ io___BufferReader__initialize <- function(buffer) { .Call(`_arrow_io___BufferReader__initialize`, buffer) } +io___Writable__write <- function(stream, buf) { + invisible(.Call(`_arrow_io___Writable__write`, stream, buf)) +} + +io___OutputStream__Tell <- function(stream) { + .Call(`_arrow_io___OutputStream__Tell`, stream) +} + io___FileOutputStream__Open <- function(path) { .Call(`_arrow_io___FileOutputStream__Open`, path) } diff --git a/r/R/buffer.R b/r/R/buffer.R index 2fecd0e4fc6..ca9b2eeff21 100644 --- a/r/R/buffer.R +++ b/r/R/buffer.R @@ -81,3 +81,8 @@ buffer.complex <- function(x) { shared_ptr(`arrow::Buffer`, r___RBuffer__initialize(x)) } +#' @export +`buffer.arrow::Buffer` <- function(x) { + x +} + diff --git a/r/R/compression.R b/r/R/compression.R index 55b5c6b4117..30eb67fa076 100644 --- a/r/R/compression.R +++ b/r/R/compression.R @@ -55,7 +55,7 @@ CompressedOutputStream.fs_path <- function(stream, codec = compression_codec("GZ } #' @export -`CompressedOutputStream.arrow::io::OutputStream` <- function(stream, codec) { +`CompressedOutputStream.arrow::io::OutputStream` <- function(stream, codec = compression_codec("GZIP")) { assert_that(inherits(codec, "arrow::util::Codec")) shared_ptr(`arrow::io::CompressedOutputStream`, io___CompressedOutputStream__Make(codec, stream)) } @@ -80,7 +80,7 @@ CompressedInputStream.fs_path <- function(stream, codec = compression_codec("GZI } #' @export -`CompressedInputStream.arrow::io::InputStream` <- function(stream, codec) { +`CompressedInputStream.arrow::io::InputStream` <- function(stream, codec = compression_codec("GZIP")) { assert_that(inherits(codec, "arrow::util::Codec")) shared_ptr(`arrow::io::CompressedInputStream`, io___CompressedInputStream__Make(codec, stream)) } diff --git a/r/R/io.R b/r/R/io.R index b772be30acf..ad350f3d179 100644 --- a/r/R/io.R +++ b/r/R/io.R @@ -21,7 +21,11 @@ # OutputStream ------------------------------------------------------------ -`arrow::io::Writable` <- R6Class("arrow::io::Writable", inherit = `arrow::Object`) +`arrow::io::Writable` <- R6Class("arrow::io::Writable", inherit = `arrow::Object`, + public = list( + write = function(x) io___Writable__write(self, buffer(x)) + ) +) #' @title OutputStream #' @@ -38,7 +42,8 @@ #' @name arrow__io__OutputStream `arrow::io::OutputStream` <- R6Class("arrow::io::OutputStream", inherit = `arrow::io::Writable`, public = list( - close = function() io___OutputStream__Close(self) + close = function() io___OutputStream__Close(self), + tell = function() io___OutputStream__Tell(self) ) ) diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp index 10497d1859b..68f6b417960 100644 --- a/r/src/RcppExports.cpp +++ b/r/src/RcppExports.cpp @@ -1561,6 +1561,28 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } +// io___Writable__write +void io___Writable__write(const std::shared_ptr& stream, const std::shared_ptr& buf); +RcppExport SEXP _arrow_io___Writable__write(SEXP streamSEXP, SEXP bufSEXP) { +BEGIN_RCPP + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< const std::shared_ptr& >::type stream(streamSEXP); + Rcpp::traits::input_parameter< const std::shared_ptr& >::type buf(bufSEXP); + io___Writable__write(stream, buf); + return R_NilValue; +END_RCPP +} +// io___OutputStream__Tell +int64_t io___OutputStream__Tell(const std::shared_ptr& stream); +RcppExport SEXP _arrow_io___OutputStream__Tell(SEXP streamSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< const std::shared_ptr& >::type stream(streamSEXP); + rcpp_result_gen = Rcpp::wrap(io___OutputStream__Tell(stream)); + return rcpp_result_gen; +END_RCPP +} // io___FileOutputStream__Open std::shared_ptr io___FileOutputStream__Open(const std::string& path); RcppExport SEXP _arrow_io___FileOutputStream__Open(SEXP pathSEXP) { @@ -2395,6 +2417,8 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_io___MemoryMappedFile__Resize", (DL_FUNC) &_arrow_io___MemoryMappedFile__Resize, 2}, {"_arrow_io___ReadableFile__Open", (DL_FUNC) &_arrow_io___ReadableFile__Open, 1}, {"_arrow_io___BufferReader__initialize", (DL_FUNC) &_arrow_io___BufferReader__initialize, 1}, + {"_arrow_io___Writable__write", (DL_FUNC) &_arrow_io___Writable__write, 2}, + {"_arrow_io___OutputStream__Tell", (DL_FUNC) &_arrow_io___OutputStream__Tell, 1}, {"_arrow_io___FileOutputStream__Open", (DL_FUNC) &_arrow_io___FileOutputStream__Open, 1}, {"_arrow_io___BufferOutputStream__Create", (DL_FUNC) &_arrow_io___BufferOutputStream__Create, 1}, {"_arrow_io___BufferOutputStream__capacity", (DL_FUNC) &_arrow_io___BufferOutputStream__capacity, 1}, diff --git a/r/src/io.cpp b/r/src/io.cpp index b8d2d53ad21..1f5605f827b 100644 --- a/r/src/io.cpp +++ b/r/src/io.cpp @@ -115,6 +115,22 @@ std::shared_ptr io___BufferReader__initialize( return std::make_shared(buffer); } +// ------- arrow::io::Writable + +// [[Rcpp::export]] +void io___Writable__write(const std::shared_ptr& stream, const std::shared_ptr& buf) { + STOP_IF_NOT_OK(stream->Write(buf->data(), buf->size())); +} + +// ------- arrow::io::OutputStream + +// [[Rcpp::export]] +int64_t io___OutputStream__Tell(const std::shared_ptr& stream) { + int64_t position; + STOP_IF_NOT_OK(stream->Tell(&position)); + return position; +} + // ------ arrow::io::FileOutputStream // [[Rcpp::export]] diff --git a/r/tests/testthat/test-compressed.R b/r/tests/testthat/test-compressed.R new file mode 100644 index 00000000000..dc865675f1e --- /dev/null +++ b/r/tests/testthat/test-compressed.R @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +context("arrow::io::Compressed.*Stream") + +test_that("can write Buffer to CompressedOutputStream and read back in CompressedInputStream", { + buf <- buffer(as.raw(sample(0:255, size = 1024, replace = TRUE))) + + tf1 <- local_tempfile() + stream1 <- CompressedOutputStream(tf1) + stream1$write(buf) + expect_error(stream1$tell()) + stream1$Close() + + tf2 <- local_tempfile() + sink2 <- file_output_stream(tf2) + stream2 <- CompressedOutputStream(sink2) + stream2$write(buf) + expect_error(stream2$tell()) + stream2$Close() + sink2$Close() + + + input1 <- CompressedInputStream(tf1) + buf1 <- input1$Read(1024L) + + file2 <- file_open(tf2) + input2 <- CompressedInputStream(file2) + buf2 <- input2$Read(1024L) + + expect_equal(buf, buf1) + expect_equal(buf, buf2) +}) + From c696ff313c9fe3ebe41c06cb0e0a43865df1542c Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Mon, 3 Dec 2018 11:15:15 +0100 Subject: [PATCH 4/8] lint --- r/src/io.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/r/src/io.cpp b/r/src/io.cpp index 1f5605f827b..2f9fe304d8f 100644 --- a/r/src/io.cpp +++ b/r/src/io.cpp @@ -118,7 +118,8 @@ std::shared_ptr io___BufferReader__initialize( // ------- arrow::io::Writable // [[Rcpp::export]] -void io___Writable__write(const std::shared_ptr& stream, const std::shared_ptr& buf) { +void io___Writable__write(const std::shared_ptr& stream, + const std::shared_ptr& buf) { STOP_IF_NOT_OK(stream->Write(buf->data(), buf->size())); } From af3714b0a9d996bf4b3599f7b0870e4d68156d48 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Mon, 3 Dec 2018 11:41:40 +0100 Subject: [PATCH 5/8] using size() until #3043 is merged --- r/tests/testthat/test-buffer.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/tests/testthat/test-buffer.R b/r/tests/testthat/test-buffer.R index 26ec8dfde0a..a071c35304c 100644 --- a/r/tests/testthat/test-buffer.R +++ b/r/tests/testthat/test-buffer.R @@ -42,5 +42,5 @@ test_that("arrow::Buffer can be created from complex vector", { vec <- complex(3) buf <- buffer(vec) expect_is(buf, "arrow::Buffer") - expect_equal(buf$size, 3 * 16) + expect_equal(buf$size(), 3 * 16) }) From 45c692f8271c0b849aa01858d86949332832bbd0 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 4 Dec 2018 21:33:16 +0100 Subject: [PATCH 6/8] adapt to changes from #3043 --- r/NAMESPACE | 23 ++++++----------------- r/R/compression.R | 4 ++-- r/tests/testthat/test-buffer.R | 2 +- r/tests/testthat/test-compressed.R | 10 +++++----- 4 files changed, 14 insertions(+), 25 deletions(-) diff --git a/r/NAMESPACE b/r/NAMESPACE index 324f58810d1..518cfc35819 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -14,6 +14,12 @@ S3method(FeatherTableReader,character) S3method(FeatherTableReader,default) S3method(FeatherTableReader,fs_path) S3method(FeatherTableWriter,"arrow::io::OutputStream") +S3method(CompressedInputStream,"arrow::io::InputStream") +S3method(CompressedInputStream,character) +S3method(CompressedInputStream,fs_path) +S3method(CompressedOutputStream,"arrow::io::OutputStream") +S3method(CompressedOutputStream,character) +S3method(CompressedOutputStream,fs_path) S3method(FixedSizeBufferWriter,"arrow::Buffer") S3method(FixedSizeBufferWriter,default) S3method(MessageReader,"arrow::io::InputStream") @@ -32,12 +38,6 @@ S3method(RecordBatchStreamReader,raw) S3method(RecordBatchStreamWriter,"arrow::io::OutputStream") S3method(RecordBatchStreamWriter,character) S3method(RecordBatchStreamWriter,fs_path) -S3method(CompressedInputStream,"arrow::io::InputStream") -S3method(CompressedInputStream,character) -S3method(CompressedInputStream,fs_path) -S3method(CompressedOutputStream,"arrow::io::OutputStream") -S3method(CompressedOutputStream,character) -S3method(CompressedOutputStream,fs_path) S3method(as_tibble,"arrow::RecordBatch") S3method(as_tibble,"arrow::Table") S3method(buffer,"arrow::Buffer") @@ -59,7 +59,6 @@ S3method(feather_table_reader,character) S3method(feather_table_reader,default) S3method(feather_table_reader,fs_path) S3method(feather_table_writer,"arrow::io::OutputStream") ->>>>>>> + Compressed streams S3method(length,"arrow::Array") S3method(names,"arrow::RecordBatch") S3method(print,"arrow-enum") @@ -90,16 +89,6 @@ S3method(write_feather_RecordBatch,default) S3method(write_feather_RecordBatch,fs_path) export(BufferOutputStream) export(BufferReader) -S3method(write_record_batch,"arrow::io::OutputStream") -S3method(write_record_batch,"arrow::ipc::RecordBatchWriter") -S3method(write_record_batch,character) -S3method(write_record_batch,fs_path) -S3method(write_record_batch,raw) -S3method(write_table,"arrow::io::OutputStream") -S3method(write_table,"arrow::ipc::RecordBatchWriter") -S3method(write_table,character) -S3method(write_table,fs_path) -S3method(write_table,raw) export(CompressedInputStream) export(CompressedOutputStream) export(CompressionType) diff --git a/r/R/compression.R b/r/R/compression.R index 30eb67fa076..083774ae0fe 100644 --- a/r/R/compression.R +++ b/r/R/compression.R @@ -51,7 +51,7 @@ CompressedOutputStream.character <- function(stream, codec = compression_codec(" #' @export CompressedOutputStream.fs_path <- function(stream, codec = compression_codec("GZIP")){ - CompressedOutputStream(file_output_stream(stream), codec = codec) + CompressedOutputStream(FileOutputStream(stream), codec = codec) } #' @export @@ -76,7 +76,7 @@ CompressedInputStream.character <- function(stream, codec = compression_codec("G #' @export CompressedInputStream.fs_path <- function(stream, codec = compression_codec("GZIP")){ - CompressedInputStream(file_open(stream), codec = codec) + CompressedInputStream(ReadableFile(stream), codec = codec) } #' @export diff --git a/r/tests/testthat/test-buffer.R b/r/tests/testthat/test-buffer.R index a071c35304c..26ec8dfde0a 100644 --- a/r/tests/testthat/test-buffer.R +++ b/r/tests/testthat/test-buffer.R @@ -42,5 +42,5 @@ test_that("arrow::Buffer can be created from complex vector", { vec <- complex(3) buf <- buffer(vec) expect_is(buf, "arrow::Buffer") - expect_equal(buf$size(), 3 * 16) + expect_equal(buf$size, 3 * 16) }) diff --git a/r/tests/testthat/test-compressed.R b/r/tests/testthat/test-compressed.R index dc865675f1e..5ed0df8768f 100644 --- a/r/tests/testthat/test-compressed.R +++ b/r/tests/testthat/test-compressed.R @@ -24,21 +24,21 @@ test_that("can write Buffer to CompressedOutputStream and read back in Compresse stream1 <- CompressedOutputStream(tf1) stream1$write(buf) expect_error(stream1$tell()) - stream1$Close() + stream1$close() tf2 <- local_tempfile() - sink2 <- file_output_stream(tf2) + sink2 <- FileOutputStream(tf2) stream2 <- CompressedOutputStream(sink2) stream2$write(buf) expect_error(stream2$tell()) - stream2$Close() - sink2$Close() + stream2$close() + sink2$close() input1 <- CompressedInputStream(tf1) buf1 <- input1$Read(1024L) - file2 <- file_open(tf2) + file2 <- ReadableFile(tf2) input2 <- CompressedInputStream(file2) buf2 <- input2$Read(1024L) From 795cc71d7d62b14dabf935d0672b6915e0329842 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Mon, 10 Dec 2018 19:15:43 +0100 Subject: [PATCH 7/8] rebase --- r/NAMESPACE | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/r/NAMESPACE b/r/NAMESPACE index 518cfc35819..61e7be4ef22 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -8,18 +8,18 @@ S3method("==","arrow::RecordBatch") S3method("==","arrow::ipc::Message") S3method(BufferReader,"arrow::Buffer") S3method(BufferReader,default) -S3method(FeatherTableReader,"arrow::io::RandomAccessFile") -S3method(FeatherTableReader,"arrow::ipc::feather::TableReader") -S3method(FeatherTableReader,character) -S3method(FeatherTableReader,default) -S3method(FeatherTableReader,fs_path) -S3method(FeatherTableWriter,"arrow::io::OutputStream") S3method(CompressedInputStream,"arrow::io::InputStream") S3method(CompressedInputStream,character) S3method(CompressedInputStream,fs_path) S3method(CompressedOutputStream,"arrow::io::OutputStream") S3method(CompressedOutputStream,character) S3method(CompressedOutputStream,fs_path) +S3method(FeatherTableReader,"arrow::io::RandomAccessFile") +S3method(FeatherTableReader,"arrow::ipc::feather::TableReader") +S3method(FeatherTableReader,character) +S3method(FeatherTableReader,default) +S3method(FeatherTableReader,fs_path) +S3method(FeatherTableWriter,"arrow::io::OutputStream") S3method(FixedSizeBufferWriter,"arrow::Buffer") S3method(FixedSizeBufferWriter,default) S3method(MessageReader,"arrow::io::InputStream") From 397baca5d44acf500723880c7c3609c40ea3d481 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Mon, 7 Jan 2019 16:19:10 +0100 Subject: [PATCH 8/8] rebasing --- r/NAMESPACE | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/r/NAMESPACE b/r/NAMESPACE index 61e7be4ef22..a6156db2b18 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -51,14 +51,6 @@ S3method(csv_table_reader,"arrow::io::InputStream") S3method(csv_table_reader,character) S3method(csv_table_reader,default) S3method(csv_table_reader,fs_path) -S3method(buffer_reader,"arrow::Buffer") -S3method(buffer_reader,default) -S3method(feather_table_reader,"arrow::io::RandomAccessFile") -S3method(feather_table_reader,"arrow::ipc::feather::TableReader") -S3method(feather_table_reader,character) -S3method(feather_table_reader,default) -S3method(feather_table_reader,fs_path) -S3method(feather_table_writer,"arrow::io::OutputStream") S3method(length,"arrow::Array") S3method(names,"arrow::RecordBatch") S3method(print,"arrow-enum") @@ -115,13 +107,11 @@ export(boolean) export(buffer) export(cast_options) export(chunked_array) +export(compression_codec) export(csv_convert_options) export(csv_parse_options) export(csv_read_options) export(csv_table_reader) -export(compressed_input_stream) -export(compressed_output_stream) -export(compression_codec) export(date32) export(date64) export(decimal)