diff --git a/r/DESCRIPTION b/r/DESCRIPTION index 5303a877f9e..620cd548ee4 100644 --- a/r/DESCRIPTION +++ b/r/DESCRIPTION @@ -55,11 +55,12 @@ Collate: 'Table.R' 'array.R' 'buffer.R' + 'io.R' + 'compression.R' 'compute.R' 'csv.R' 'dictionary.R' 'feather.R' - 'io.R' 'memory_pool.R' 'message.R' 'on_exit.R' diff --git a/r/NAMESPACE b/r/NAMESPACE index f8f6384dce1..a6156db2b18 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -8,6 +8,12 @@ S3method("==","arrow::RecordBatch") S3method("==","arrow::ipc::Message") S3method(BufferReader,"arrow::Buffer") S3method(BufferReader,default) +S3method(CompressedInputStream,"arrow::io::InputStream") +S3method(CompressedInputStream,character) +S3method(CompressedInputStream,fs_path) +S3method(CompressedOutputStream,"arrow::io::OutputStream") +S3method(CompressedOutputStream,character) +S3method(CompressedOutputStream,fs_path) S3method(FeatherTableReader,"arrow::io::RandomAccessFile") S3method(FeatherTableReader,"arrow::ipc::feather::TableReader") S3method(FeatherTableReader,character) @@ -34,6 +40,7 @@ S3method(RecordBatchStreamWriter,character) S3method(RecordBatchStreamWriter,fs_path) S3method(as_tibble,"arrow::RecordBatch") S3method(as_tibble,"arrow::Table") +S3method(buffer,"arrow::Buffer") S3method(buffer,complex) S3method(buffer,default) S3method(buffer,integer) @@ -74,6 +81,9 @@ S3method(write_feather_RecordBatch,default) S3method(write_feather_RecordBatch,fs_path) export(BufferOutputStream) export(BufferReader) +export(CompressedInputStream) +export(CompressedOutputStream) +export(CompressionType) export(DateUnit) export(FeatherTableReader) export(FeatherTableWriter) @@ -97,6 +107,7 @@ export(boolean) export(buffer) export(cast_options) export(chunked_array) +export(compression_codec) export(csv_convert_options) export(csv_parse_options) export(csv_read_options) diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R index c6fe8719f4e..3068582ec47 100644 --- a/r/R/RcppExports.R +++ b/r/R/RcppExports.R @@ -173,6 +173,18 @@ Column__data <- function(column) { .Call(`_arrow_Column__data`, column) } +util___Codec__Create <- function(codec) { + .Call(`_arrow_util___Codec__Create`, codec) +} + +io___CompressedOutputStream__Make <- function(codec, raw) { + .Call(`_arrow_io___CompressedOutputStream__Make`, codec, raw) +} + +io___CompressedInputStream__Make <- function(codec, raw) { + .Call(`_arrow_io___CompressedInputStream__Make`, codec, raw) +} + compute___CastOptions__initialize <- function(allow_int_overflow, allow_time_truncate, allow_float_truncate) { .Call(`_arrow_compute___CastOptions__initialize`, allow_int_overflow, allow_time_truncate, allow_float_truncate) } @@ -545,6 +557,14 @@ io___BufferReader__initialize <- function(buffer) { .Call(`_arrow_io___BufferReader__initialize`, buffer) } +io___Writable__write <- function(stream, buf) { + invisible(.Call(`_arrow_io___Writable__write`, stream, buf)) +} + +io___OutputStream__Tell <- function(stream) { + .Call(`_arrow_io___OutputStream__Tell`, stream) +} + io___FileOutputStream__Open <- function(path) { .Call(`_arrow_io___FileOutputStream__Open`, path) } diff --git a/r/R/buffer.R b/r/R/buffer.R index 2fecd0e4fc6..ca9b2eeff21 100644 --- a/r/R/buffer.R +++ b/r/R/buffer.R @@ -81,3 +81,8 @@ buffer.complex <- function(x) { shared_ptr(`arrow::Buffer`, r___RBuffer__initialize(x)) } +#' @export +`buffer.arrow::Buffer` <- function(x) { + x +} + diff --git a/r/R/compression.R b/r/R/compression.R new file mode 100644 index 00000000000..083774ae0fe --- /dev/null +++ b/r/R/compression.R @@ -0,0 +1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +#' @include enums.R +#' @include R6.R +#' @include io.R + +`arrow::util::Codec` <- R6Class("arrow::util::Codec", inherit = `arrow::Object`) + +`arrow::io::CompressedOutputStream` <- R6Class("arrow::io::CompressedOutputStream", inherit = `arrow::io::OutputStream`) +`arrow::io::CompressedInputStream` <- R6Class("arrow::io::CompressedInputStream", inherit = `arrow::io::InputStream`) + +#' codec +#' +#' @param type type of codec +#' +#' @export +compression_codec <- function(type = "GZIP") { + type <- CompressionType[[match.arg(type, names(CompressionType))]] + unique_ptr(`arrow::util::Codec`, util___Codec__Create(type)) +} + + +#' Compressed output stream +#' +#' @param stream Underlying raw output stream +#' @param codec a codec +#' @export +CompressedOutputStream <- function(stream, codec = compression_codec("GZIP")){ + UseMethod("CompressedOutputStream") +} + +#' @export +CompressedOutputStream.character <- function(stream, codec = compression_codec("GZIP")){ + CompressedOutputStream(fs::path_abs(stream), codec = codec) +} + +#' @export +CompressedOutputStream.fs_path <- function(stream, codec = compression_codec("GZIP")){ + CompressedOutputStream(FileOutputStream(stream), codec = codec) +} + +#' @export +`CompressedOutputStream.arrow::io::OutputStream` <- function(stream, codec = compression_codec("GZIP")) { + assert_that(inherits(codec, "arrow::util::Codec")) + shared_ptr(`arrow::io::CompressedOutputStream`, io___CompressedOutputStream__Make(codec, stream)) +} + +#' Compressed input stream +#' +#' @param stream Underlying raw input stream +#' @param codec a codec +#' @export +CompressedInputStream <- function(stream, codec = codec("GZIP")){ + UseMethod("CompressedInputStream") +} + +#' @export +CompressedInputStream.character <- function(stream, codec = compression_codec("GZIP")){ + CompressedInputStream(fs::path_abs(stream), codec = codec) +} + +#' @export +CompressedInputStream.fs_path <- function(stream, codec = compression_codec("GZIP")){ + CompressedInputStream(ReadableFile(stream), codec = codec) +} + +#' @export +`CompressedInputStream.arrow::io::InputStream` <- function(stream, codec = compression_codec("GZIP")) { + assert_that(inherits(codec, "arrow::util::Codec")) + shared_ptr(`arrow::io::CompressedInputStream`, io___CompressedInputStream__Make(codec, stream)) +} diff --git a/r/R/enums.R b/r/R/enums.R index 35e6aaa440e..3a6ac5c23bf 100644 --- a/r/R/enums.R +++ b/r/R/enums.R @@ -70,3 +70,9 @@ FileMode <- enum("arrow::io::FileMode", MessageType <- enum("arrow::ipc::Message::Type", NONE = 0L, SCHEMA = 1L, DICTIONARY_BATCH = 2L, RECORD_BATCH = 3L, TENSOR = 4L ) + +#' @rdname DataType +#' @export +CompressionType <- enum("arrow::Compression::type", + UNCOMPRESSED = 0L, SNAPPY = 1L, GZIP = 2L, BROTLI = 3L, ZSTD = 4L, LZ4 = 5L, LZO = 6L, BZ2 = 7L +) diff --git a/r/R/io.R b/r/R/io.R index b772be30acf..ad350f3d179 100644 --- a/r/R/io.R +++ b/r/R/io.R @@ -21,7 +21,11 @@ # OutputStream ------------------------------------------------------------ -`arrow::io::Writable` <- R6Class("arrow::io::Writable", inherit = `arrow::Object`) +`arrow::io::Writable` <- R6Class("arrow::io::Writable", inherit = `arrow::Object`, + public = list( + write = function(x) io___Writable__write(self, buffer(x)) + ) +) #' @title OutputStream #' @@ -38,7 +42,8 @@ #' @name arrow__io__OutputStream `arrow::io::OutputStream` <- R6Class("arrow::io::OutputStream", inherit = `arrow::io::Writable`, public = list( - close = function() io___OutputStream__Close(self) + close = function() io___OutputStream__Close(self), + tell = function() io___OutputStream__Tell(self) ) ) diff --git a/r/man/CompressedInputStream.Rd b/r/man/CompressedInputStream.Rd new file mode 100644 index 00000000000..cfff053083d --- /dev/null +++ b/r/man/CompressedInputStream.Rd @@ -0,0 +1,16 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/compression.R +\name{CompressedInputStream} +\alias{CompressedInputStream} +\title{Compressed input stream} +\usage{ +CompressedInputStream(stream, codec = codec("GZIP")) +} +\arguments{ +\item{stream}{Underlying raw input stream} + +\item{codec}{a codec} +} +\description{ +Compressed input stream +} diff --git a/r/man/CompressedOutputStream.Rd b/r/man/CompressedOutputStream.Rd new file mode 100644 index 00000000000..85c4d9209ac --- /dev/null +++ b/r/man/CompressedOutputStream.Rd @@ -0,0 +1,16 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/compression.R +\name{CompressedOutputStream} +\alias{CompressedOutputStream} +\title{Compressed output stream} +\usage{ +CompressedOutputStream(stream, codec = compression_codec("GZIP")) +} +\arguments{ +\item{stream}{Underlying raw output stream} + +\item{codec}{a codec} +} +\description{ +Compressed output stream +} diff --git a/r/man/DataType.Rd b/r/man/DataType.Rd index b10414020b5..bf5f1d40456 100644 --- a/r/man/DataType.Rd +++ b/r/man/DataType.Rd @@ -8,6 +8,7 @@ \alias{StatusCode} \alias{FileMode} \alias{MessageType} +\alias{CompressionType} \alias{int8} \alias{int16} \alias{int32} @@ -45,6 +46,8 @@ FileMode MessageType +CompressionType + int8() int16() diff --git a/r/man/compression_codec.Rd b/r/man/compression_codec.Rd new file mode 100644 index 00000000000..a7db8ab0deb --- /dev/null +++ b/r/man/compression_codec.Rd @@ -0,0 +1,14 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/compression.R +\name{compression_codec} +\alias{compression_codec} +\title{codec} +\usage{ +compression_codec(type = "GZIP") +} +\arguments{ +\item{type}{type of codec} +} +\description{ +codec +} diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp index 1e8fed18676..68f6b417960 100644 --- a/r/src/RcppExports.cpp +++ b/r/src/RcppExports.cpp @@ -493,6 +493,41 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } +// util___Codec__Create +std::unique_ptr util___Codec__Create(arrow::Compression::type codec); +RcppExport SEXP _arrow_util___Codec__Create(SEXP codecSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< arrow::Compression::type >::type codec(codecSEXP); + rcpp_result_gen = Rcpp::wrap(util___Codec__Create(codec)); + return rcpp_result_gen; +END_RCPP +} +// io___CompressedOutputStream__Make +std::shared_ptr io___CompressedOutputStream__Make(const std::unique_ptr& codec, const std::shared_ptr& raw); +RcppExport SEXP _arrow_io___CompressedOutputStream__Make(SEXP codecSEXP, SEXP rawSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< const std::unique_ptr& >::type codec(codecSEXP); + Rcpp::traits::input_parameter< const std::shared_ptr& >::type raw(rawSEXP); + rcpp_result_gen = Rcpp::wrap(io___CompressedOutputStream__Make(codec, raw)); + return rcpp_result_gen; +END_RCPP +} +// io___CompressedInputStream__Make +std::shared_ptr io___CompressedInputStream__Make(const std::unique_ptr& codec, const std::shared_ptr& raw); +RcppExport SEXP _arrow_io___CompressedInputStream__Make(SEXP codecSEXP, SEXP rawSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< const std::unique_ptr& >::type codec(codecSEXP); + Rcpp::traits::input_parameter< const std::shared_ptr& >::type raw(rawSEXP); + rcpp_result_gen = Rcpp::wrap(io___CompressedInputStream__Make(codec, raw)); + return rcpp_result_gen; +END_RCPP +} // compute___CastOptions__initialize std::shared_ptr compute___CastOptions__initialize(bool allow_int_overflow, bool allow_time_truncate, bool allow_float_truncate); RcppExport SEXP _arrow_compute___CastOptions__initialize(SEXP allow_int_overflowSEXP, SEXP allow_time_truncateSEXP, SEXP allow_float_truncateSEXP) { @@ -1526,6 +1561,28 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } +// io___Writable__write +void io___Writable__write(const std::shared_ptr& stream, const std::shared_ptr& buf); +RcppExport SEXP _arrow_io___Writable__write(SEXP streamSEXP, SEXP bufSEXP) { +BEGIN_RCPP + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< const std::shared_ptr& >::type stream(streamSEXP); + Rcpp::traits::input_parameter< const std::shared_ptr& >::type buf(bufSEXP); + io___Writable__write(stream, buf); + return R_NilValue; +END_RCPP +} +// io___OutputStream__Tell +int64_t io___OutputStream__Tell(const std::shared_ptr& stream); +RcppExport SEXP _arrow_io___OutputStream__Tell(SEXP streamSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< const std::shared_ptr& >::type stream(streamSEXP); + rcpp_result_gen = Rcpp::wrap(io___OutputStream__Tell(stream)); + return rcpp_result_gen; +END_RCPP +} // io___FileOutputStream__Open std::shared_ptr io___FileOutputStream__Open(const std::string& path); RcppExport SEXP _arrow_io___FileOutputStream__Open(SEXP pathSEXP) { @@ -2264,6 +2321,9 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_Column__null_count", (DL_FUNC) &_arrow_Column__null_count, 1}, {"_arrow_Column__type", (DL_FUNC) &_arrow_Column__type, 1}, {"_arrow_Column__data", (DL_FUNC) &_arrow_Column__data, 1}, + {"_arrow_util___Codec__Create", (DL_FUNC) &_arrow_util___Codec__Create, 1}, + {"_arrow_io___CompressedOutputStream__Make", (DL_FUNC) &_arrow_io___CompressedOutputStream__Make, 2}, + {"_arrow_io___CompressedInputStream__Make", (DL_FUNC) &_arrow_io___CompressedInputStream__Make, 2}, {"_arrow_compute___CastOptions__initialize", (DL_FUNC) &_arrow_compute___CastOptions__initialize, 3}, {"_arrow_Array__cast", (DL_FUNC) &_arrow_Array__cast, 3}, {"_arrow_ChunkedArray__cast", (DL_FUNC) &_arrow_ChunkedArray__cast, 3}, @@ -2357,6 +2417,8 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_io___MemoryMappedFile__Resize", (DL_FUNC) &_arrow_io___MemoryMappedFile__Resize, 2}, {"_arrow_io___ReadableFile__Open", (DL_FUNC) &_arrow_io___ReadableFile__Open, 1}, {"_arrow_io___BufferReader__initialize", (DL_FUNC) &_arrow_io___BufferReader__initialize, 1}, + {"_arrow_io___Writable__write", (DL_FUNC) &_arrow_io___Writable__write, 2}, + {"_arrow_io___OutputStream__Tell", (DL_FUNC) &_arrow_io___OutputStream__Tell, 1}, {"_arrow_io___FileOutputStream__Open", (DL_FUNC) &_arrow_io___FileOutputStream__Open, 1}, {"_arrow_io___BufferOutputStream__Create", (DL_FUNC) &_arrow_io___BufferOutputStream__Create, 1}, {"_arrow_io___BufferOutputStream__capacity", (DL_FUNC) &_arrow_io___BufferOutputStream__capacity, 1}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 6fef7997dbf..e8b06e10db7 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -23,12 +23,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include #define STOP_IF_NOT(TEST, MSG) \ do { \ @@ -127,6 +129,7 @@ RCPP_EXPOSED_ENUM_NODECL(arrow::TimeUnit::type) RCPP_EXPOSED_ENUM_NODECL(arrow::StatusCode) RCPP_EXPOSED_ENUM_NODECL(arrow::io::FileMode::type) RCPP_EXPOSED_ENUM_NODECL(arrow::ipc::Message::Type) +RCPP_EXPOSED_ENUM_NODECL(arrow::Compression::type) namespace Rcpp { namespace internal { diff --git a/r/src/compression.cpp b/r/src/compression.cpp new file mode 100644 index 00000000000..4c522d85f5d --- /dev/null +++ b/r/src/compression.cpp @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow_types.h" + +// [[Rcpp::export]] +std::unique_ptr util___Codec__Create(arrow::Compression::type codec) { + std::unique_ptr out; + STOP_IF_NOT_OK(arrow::util::Codec::Create(codec, &out)); + return out; +} + +// [[Rcpp::export]] +std::shared_ptr io___CompressedOutputStream__Make( + const std::unique_ptr& codec, + const std::shared_ptr& raw) { + std::shared_ptr stream; + STOP_IF_NOT_OK(arrow::io::CompressedOutputStream::Make(codec.get(), raw, &stream)); + return stream; +} + +// [[Rcpp::export]] +std::shared_ptr io___CompressedInputStream__Make( + const std::unique_ptr& codec, + const std::shared_ptr& raw) { + std::shared_ptr stream; + STOP_IF_NOT_OK(arrow::io::CompressedInputStream::Make(codec.get(), raw, &stream)); + return stream; +} diff --git a/r/src/io.cpp b/r/src/io.cpp index b8d2d53ad21..2f9fe304d8f 100644 --- a/r/src/io.cpp +++ b/r/src/io.cpp @@ -115,6 +115,23 @@ std::shared_ptr io___BufferReader__initialize( return std::make_shared(buffer); } +// ------- arrow::io::Writable + +// [[Rcpp::export]] +void io___Writable__write(const std::shared_ptr& stream, + const std::shared_ptr& buf) { + STOP_IF_NOT_OK(stream->Write(buf->data(), buf->size())); +} + +// ------- arrow::io::OutputStream + +// [[Rcpp::export]] +int64_t io___OutputStream__Tell(const std::shared_ptr& stream) { + int64_t position; + STOP_IF_NOT_OK(stream->Tell(&position)); + return position; +} + // ------ arrow::io::FileOutputStream // [[Rcpp::export]] diff --git a/r/tests/testthat/test-compressed.R b/r/tests/testthat/test-compressed.R new file mode 100644 index 00000000000..5ed0df8768f --- /dev/null +++ b/r/tests/testthat/test-compressed.R @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +context("arrow::io::Compressed.*Stream") + +test_that("can write Buffer to CompressedOutputStream and read back in CompressedInputStream", { + buf <- buffer(as.raw(sample(0:255, size = 1024, replace = TRUE))) + + tf1 <- local_tempfile() + stream1 <- CompressedOutputStream(tf1) + stream1$write(buf) + expect_error(stream1$tell()) + stream1$close() + + tf2 <- local_tempfile() + sink2 <- FileOutputStream(tf2) + stream2 <- CompressedOutputStream(sink2) + stream2$write(buf) + expect_error(stream2$tell()) + stream2$close() + sink2$close() + + + input1 <- CompressedInputStream(tf1) + buf1 <- input1$Read(1024L) + + file2 <- ReadableFile(tf2) + input2 <- CompressedInputStream(file2) + buf2 <- input2$Read(1024L) + + expect_equal(buf, buf1) + expect_equal(buf, buf2) +}) +