Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions r/R/arrow-datum.R
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ eval_array_expression <- function(FUN,
}

#' @export
na.omit.ArrowDatum <- function(object, ...){
na.omit.ArrowDatum <- function(object, ...) {
object$Filter(!is.na(object))
}

#' @export
na.exclude.ArrowDatum <- na.omit.ArrowDatum

#' @export
na.fail.ArrowDatum <- function(object, ...){
na.fail.ArrowDatum <- function(object, ...) {
if (object$null_count > 0) {
stop("missing values in object", call. = FALSE)
}
Expand Down
2 changes: 1 addition & 1 deletion r/R/arrow-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ ArrowObject <- R6Class("ArrowObject",
class_title <- class(self)[[1]]
}
cat(class_title, "\n", sep = "")
if (!is.null(self$ToString)){
if (!is.null(self$ToString)) {
cat(self$ToString(), "\n", sep = "")
}
invisible(self)
Expand Down
4 changes: 2 additions & 2 deletions r/R/arrow-tabular.R
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ head.ArrowTabular <- head.ArrowDatum
tail.ArrowTabular <- tail.ArrowDatum

#' @export
na.fail.ArrowTabular <- function(object, ...){
na.fail.ArrowTabular <- function(object, ...) {
for (col in seq_len(object$num_columns)) {
if (object$column(col - 1L)$null_count > 0) {
stop("missing values in object", call. = FALSE)
Expand All @@ -222,7 +222,7 @@ na.fail.ArrowTabular <- function(object, ...){
}

#' @export
na.omit.ArrowTabular <- function(object, ...){
na.omit.ArrowTabular <- function(object, ...) {
not_na <- map(object$columns, ~call_function("is_valid", .x))
not_na_agg <- Reduce("&", not_na)
object$Filter(not_na_agg)
Expand Down
4 changes: 2 additions & 2 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion r/R/chunked-array.R
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ ChunkedArray <- R6Class("ChunkedArray", inherit = ArrowDatum,
type_id = function() ChunkedArray__type(self)$id,
chunk = function(i) Array$create(ChunkedArray__chunk(self, i)),
as_vector = function() ChunkedArray__as_vector(self),
Slice = function(offset, length = NULL){
Slice = function(offset, length = NULL) {
if (is.null(length)) {
ChunkedArray__Slice1(self, offset)
} else {
Expand Down
4 changes: 2 additions & 2 deletions r/R/compression.R
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ compression_from_name <- function(name) {
#' @export
#' @include arrow-package.R
CompressedOutputStream <- R6Class("CompressedOutputStream", inherit = OutputStream)
CompressedOutputStream$create <- function(stream, codec = "gzip", compression_level = NA){
CompressedOutputStream$create <- function(stream, codec = "gzip", compression_level = NA) {
codec <- Codec$create(codec, compression_level = compression_level)
if (is.string(stream)) {
stream <- FileOutputStream$create(stream)
Expand All @@ -113,7 +113,7 @@ CompressedOutputStream$create <- function(stream, codec = "gzip", compression_le
#' @format NULL
#' @export
CompressedInputStream <- R6Class("CompressedInputStream", inherit = InputStream)
CompressedInputStream$create <- function(stream, codec = "gzip", compression_level = NA){
CompressedInputStream$create <- function(stream, codec = "gzip", compression_level = NA) {
codec <- Codec$create(codec, compression_level = compression_level)
if (is.string(stream)) {
stream <- ReadableFile$create(stream)
Expand Down
4 changes: 2 additions & 2 deletions r/R/compute.R
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ unique.ArrowDatum <- function(x, incomparables = FALSE, ...) {
}

#' @export
any.ArrowDatum <- function(..., na.rm = FALSE){
any.ArrowDatum <- function(..., na.rm = FALSE) {

a <- collect_arrays_from_dots(list(...))
result <- call_function("any", a)
Expand All @@ -217,7 +217,7 @@ any.ArrowDatum <- function(..., na.rm = FALSE){
}

#' @export
all.ArrowDatum <- function(..., na.rm = FALSE){
all.ArrowDatum <- function(..., na.rm = FALSE) {

a <- collect_arrays_from_dots(list(...))
result <- call_function("all", a)
Expand Down
6 changes: 3 additions & 3 deletions r/R/csv.R
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ CsvReadOptions$create <- function(use_threads = option_use_threads(),
#' @rdname CsvReadOptions
#' @export
CsvWriteOptions <- R6Class("CsvWriteOptions", inherit = ArrowObject)
CsvWriteOptions$create <- function(include_header = TRUE, batch_size = 1024L){
CsvWriteOptions$create <- function(include_header = TRUE, batch_size = 1024L) {
assert_that(is_integerish(batch_size, n = 1, finite = TRUE), batch_size > 0)
csv___WriteOptions__initialize(
list(
Expand Down Expand Up @@ -637,9 +637,9 @@ write_csv_arrow <- function(x,
on.exit(sink$close())
}

if(inherits(x, "RecordBatch")){
if (inherits(x, "RecordBatch")) {
csv___WriteCSV__RecordBatch(x, write_options, sink)
} else if(inherits(x, "Table")){
} else if (inherits(x, "Table")) {
csv___WriteCSV__Table(x, write_options, sink)
}

Expand Down
4 changes: 2 additions & 2 deletions r/R/enums.R
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
# under the License.

#' @export
`print.arrow-enum` <- function(x, ...){
`print.arrow-enum` <- function(x, ...) {
NextMethod()
}

enum <- function(class, ..., .list = list(...)){
enum <- function(class, ..., .list = list(...)) {
structure(
.list,
class = c(class, "arrow-enum")
Expand Down
2 changes: 1 addition & 1 deletion r/R/filesystem.R
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ FileSystem <- R6Class("FileSystem", inherit = ArrowObject,
GetFileInfo = function(x) {
if (inherits(x, "FileSelector")) {
fs___FileSystem__GetTargetInfos_FileSelector(self, x)
} else if (is.character(x)){
} else if (is.character(x)) {
fs___FileSystem__GetTargetInfos_Paths(self, clean_path_rel(x))
} else {
abort("incompatible type for FileSystem$GetFileInfo()")
Expand Down
2 changes: 1 addition & 1 deletion r/R/metadata.R
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ apply_arrow_r_metadata <- function(x, r_metadata) {
x[[name]] <- apply_arrow_r_metadata(x[[name]], columns_metadata[[name]])
}
}
} else if(is.list(x) && !inherits(x, "POSIXlt") && !is.null(columns_metadata)) {
} else if (is.list(x) && !inherits(x, "POSIXlt") && !is.null(columns_metadata)) {
x <- map2(x, columns_metadata, function(.x, .y) {
apply_arrow_r_metadata(.x, .y)
})
Expand Down
4 changes: 2 additions & 2 deletions r/R/parquet.R
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder", inhe
parquet___ArrowWriterProperties___Builder__set_compressions
)
},
set_compression_level = function(table, compression_level){
set_compression_level = function(table, compression_level) {
# cast to integer but keep names
compression_level <- set_names(as.integer(compression_level), names(compression_level))
private$.set(table, compression_level,
Expand Down Expand Up @@ -558,7 +558,7 @@ ParquetArrowReaderProperties <- R6Class("ParquetArrowReaderProperties",
),
active = list(
use_threads = function(use_threads) {
if(missing(use_threads)) {
if (missing(use_threads)) {
parquet___arrow___ArrowReaderProperties__get_use_threads(self)
} else {
parquet___arrow___ArrowReaderProperties__set_use_threads(self, use_threads)
Expand Down
3 changes: 3 additions & 0 deletions r/R/record-batch.R
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ RecordBatch$create <- function(..., schema = NULL) {
return(dplyr::group_by(out, !!!dplyr::groups(arrays[[1]])))
}

# If any arrays are length 1, recycle them
arrays <- recycle_scalars(arrays)

# TODO: should this also assert that they're all Arrays?
RecordBatch__from_arrays(schema, arrays)
}
Expand Down
2 changes: 1 addition & 1 deletion r/R/scalar.R
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Scalar <- R6Class("Scalar",
ToString = function() Scalar__ToString(self),
type_id = function() Scalar__type(self)$id,
as_vector = function() Scalar__as_vector(self),
as_array = function() MakeArrayFromScalar(self),
as_array = function(length = 1L) MakeArrayFromScalar(self, as.integer(length)),
Equals = function(other, ...) {
inherits(other, "Scalar") && Scalar__Equals(self, other)
},
Expand Down
19 changes: 11 additions & 8 deletions r/R/table.R
Original file line number Diff line number Diff line change
Expand Up @@ -166,18 +166,21 @@ Table$create <- function(..., schema = NULL) {
names(dots) <- rep_len("", length(dots))
}
stopifnot(length(dots) > 0)

if (all_record_batches(dots)) {
return(Table__from_record_batches(dots, schema))
}

# If any arrays are length 1, recycle them
dots <- recycle_scalars(dots)

out <- Table__from_dots(dots, schema, option_use_threads())

# Preserve any grouping
if (length(dots) == 1 && inherits(dots[[1]], "grouped_df")) {
out <- Table__from_dots(dots, schema, option_use_threads())
return(dplyr::group_by(out, !!!dplyr::groups(dots[[1]])))
}

if (all_record_batches(dots)) {
Table__from_record_batches(dots, schema)
} else {
Table__from_dots(dots, schema, option_use_threads())
out <- dplyr::group_by(out, !!!dplyr::groups(dots[[1]]))
}
out
}

#' @export
Expand Down
45 changes: 45 additions & 0 deletions r/R/util.R
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,48 @@ attr(is_writable_table, "fail") <- function(call, env){
)
}

#' Recycle scalar values in a list of arrays
#'
#' @param arrays List of arrays
#' @return List of arrays with any vector/Scalar/Array/ChunkedArray values of length 1 recycled
#' @keywords internal
recycle_scalars <- function(arrays){
# Get lengths of items in arrays
arr_lens <- map_int(arrays, NROW)

is_scalar <- arr_lens == 1

if (length(arrays) > 1 && any(is_scalar) && !all(is_scalar)) {

# Recycling not supported for tibbles and data.frames
if (all(map_lgl(arrays, ~inherits(.x, "data.frame")))) {

abort(c(
"All input tibbles or data.frames must have the same number of rows",
x = paste(
"Number of rows in longest and shortest inputs:",
oxford_paste(c(max(arr_lens), min(arr_lens)))
)
))
}

max_array_len <- max(arr_lens)
arrays[is_scalar] <- lapply(arrays[is_scalar], repeat_value_as_array, max_array_len)
}
arrays
}

#' Take an object of length 1 and repeat it.
#'
#' @param object Object of length 1 to be repeated - vector, `Scalar`, `Array`, or `ChunkedArray`
#' @param n Number of repetitions
#'
#' @return `Array` of length `n`
#'
#' @keywords internal
repeat_value_as_array <- function(object, n) {
if (inherits(object, "ChunkedArray")) {
return(Scalar$create(object$chunks[[1]])$as_array(n))
}
return(Scalar$create(object)$as_array(n))
}
8 changes: 4 additions & 4 deletions r/data-raw/codegen.R
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ get_exported_functions <- function(decorations, export_tag) {

glue_collapse_data <- function(data, ..., sep = ", ", last = "") {
res <- glue_collapse(glue_data(data, ...), sep = sep, last = last)
if(length(res) == 0) res <- ""
if (length(res) == 0) res <- ""
res
}

wrap_call <- function(name, return_type, args) {
call <- glue::glue('{name}({list_params})', list_params = glue_collapse_data(args, "{name}"))
if(return_type == "void") {
if (return_type == "void") {
glue::glue("\t{call};\n\treturn R_NilValue;", .trim = FALSE)
} else {
glue::glue("\treturn cpp11::as_sexp({call});")
Expand Down Expand Up @@ -149,7 +149,7 @@ cpp_functions_definitions <- arrow_exports %>%
sep = "\n",
real_params = glue_collapse_data(args, "{type} {name}"),
input_params = glue_collapse_data(args, "\tarrow::r::Input<{type}>::type {name}({name}_sexp);", sep = "\n"),
return_line = if(nrow(args)) "\n" else "")
return_line = if (nrow(args)) "\n" else "")

glue::glue('
// {basename(file)}
Expand All @@ -162,7 +162,7 @@ cpp_functions_definitions <- arrow_exports %>%

cpp_functions_registration <- arrow_exports %>%
select(name, return_type, args) %>%
pmap_chr(function(name, return_type, args){
pmap_chr(function(name, return_type, args) {
glue('\t\t{{ "_arrow_{name}", (DL_FUNC) &_arrow_{name}, {nrow(args)}}}, ')
}) %>%
glue_collapse(sep = "\n")
Expand Down
4 changes: 2 additions & 2 deletions r/extra-tests/helpers.R
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ if_version_less_than <- function(version) {
}

skip_if_version_less_than <- function(version, msg) {
if(if_version(version, `<`)) {
if (if_version(version, `<`)) {
skip(msg)
}
}

skip_if_version_equals <- function(version, msg) {
if(if_version(version, `==`)) {
if (if_version(version, `==`)) {
skip(msg)
}
}
2 changes: 1 addition & 1 deletion r/extra-tests/write-files.R
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ source("tests/testthat/helper-data.R")
write_parquet(example_with_metadata, "extra-tests/files/ex_data.parquet")

for (comp in c("lz4", "uncompressed", "zstd")) {
if(!codec_is_available(comp)) break
if (!codec_is_available(comp)) break

name <- paste0("extra-tests/files/ex_data_", comp, ".feather")
write_feather(example_with_metadata, name, compression = comp)
Expand Down
18 changes: 18 additions & 0 deletions r/man/recycle_scalars.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions r/man/repeat_value_as_array.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading