Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions r/R/RecordBatch.R
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@
#' @export
record_batch <- function(..., schema = NULL){
arrays <- list2(...)
# making sure there are always names
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right but why? IIUC this has to do with how the cpp code distinguishes things to autosplice, switching behavior on nchar(name). That's a subtlety I'll probably forget by Monday so it seems worth explaining.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

names(list(a = 1, b = 2))
#> [1] "a" "b"
names(list(a = 1, 2))
#> [1] "a" ""
names(list(1, 2))
#> NULL

otherwise we'd have to treat the NULL case differently internally as we did before, maybe I could do that instead.

if (is.null(names(arrays))) {
names(arrays) <- rep_len("", length(arrays))
}
stopifnot(length(arrays) > 0)
shared_ptr(`arrow::RecordBatch`, RecordBatch__from_arrays(schema, arrays))
}
4 changes: 4 additions & 0 deletions r/R/Table.R
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@
#' @export
table <- function(..., schema = NULL){
dots <- list2(...)
# making sure there are always names
if (is.null(names(dots))) {
names(dots) <- rep_len("", length(dots))
}
stopifnot(length(dots) > 0)
shared_ptr(`arrow::Table`, Table__from_dots(dots, schema))
}
Expand Down
2 changes: 2 additions & 0 deletions r/src/arrow_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ std::shared_ptr<arrow::RecordBatch> RecordBatch__from_dataframe(Rcpp::DataFrame
namespace arrow {
namespace r {

Status count_fields(SEXP lst, int* out);

std::shared_ptr<arrow::Array> Array__from_vector(
SEXP x, const std::shared_ptr<arrow::DataType>& type, bool type_infered);

Expand Down
113 changes: 81 additions & 32 deletions r/src/recordbatch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ std::shared_ptr<arrow::RecordBatch> ipc___ReadRecordBatch__InputStream__Schema(
return batch;
}

namespace arrow {
namespace r {

arrow::Status check_consistent_array_size(
const std::vector<std::shared_ptr<arrow::Array>>& arrays, int64_t* num_rows) {
if (arrays.size()) {
Expand All @@ -163,30 +166,69 @@ arrow::Status check_consistent_array_size(
return arrow::Status::OK();
}

Status count_fields(SEXP lst, int* out) {
int res = 0;
R_xlen_t n = XLENGTH(lst);
SEXP names = Rf_getAttrib(lst, R_NamesSymbol);
for (R_xlen_t i = 0; i < n; i++) {
if (LENGTH(STRING_ELT(names, i)) > 0) {
++res;
} else {
SEXP x = VECTOR_ELT(lst, i);
if (Rf_inherits(x, "data.frame")) {
res += XLENGTH(x);
} else {
return Status::RError(
"only data frames are allowed as unnamed arguments to be auto spliced");
}
}
}
*out = res;
return Status::OK();
}

} // namespace r
} // namespace arrow

std::shared_ptr<arrow::RecordBatch> RecordBatch__from_arrays__known_schema(
const std::shared_ptr<arrow::Schema>& schema, SEXP lst) {
R_xlen_t n_arrays = XLENGTH(lst);
if (schema->num_fields() != n_arrays) {
int num_fields;
STOP_IF_NOT_OK(arrow::r::count_fields(lst, &num_fields));

if (schema->num_fields() != num_fields) {
Rcpp::stop("incompatible. schema has %d fields, and %d arrays are supplied",
schema->num_fields(), n_arrays);
schema->num_fields(), num_fields);
}

// convert lst to a vector of arrow::Array
std::vector<std::shared_ptr<arrow::Array>> arrays(n_arrays);
std::vector<std::shared_ptr<arrow::Array>> arrays(num_fields);
SEXP names = Rf_getAttrib(lst, R_NamesSymbol);
bool has_names = !Rf_isNull(names);

for (R_xlen_t i = 0; i < n_arrays; i++) {
if (has_names && schema->field(i)->name() != CHAR(STRING_ELT(names, i))) {
Rcpp::stop("field at index %d has name '%s' != '%s'", i + 1,
schema->field(i)->name(), CHAR(STRING_ELT(names, i)));
auto fill_array = [&arrays, &schema](int j, SEXP x, SEXP name) {
if (schema->field(j)->name() != CHAR(name)) {
Rcpp::stop("field at index %d has name '%s' != '%s'", j + 1,
schema->field(j)->name(), CHAR(name));
}
arrays[j] = arrow::r::Array__from_vector(x, schema->field(j)->type(), false);
};

for (R_xlen_t i = 0, j = 0; j < num_fields; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you simplify the loop conditions? Make it only iterate over i and XLENGTH(lst) and use an external variable,e.g.

int out_idx = 0;
for (R_xlen_t i = 0; i < XLENGHT(lst); i++) {
  ...
  if (..) {
    for (...)
      fill_array(out_idx++, VECTOR_ELT(x_i, k), STRING_ELT(names_x_i, k));
  } else {
    fill_array(out_idx++, x_i, name_i);
  }
}
assert(out_idx == num_fields);

The loop is now invariant on j (out_idx in example) Easier to follow and review/refactor.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bonus point, get rid of all of this and make a C++ iterator over lst that will behave as-if "flattened".

SEXP name_i = STRING_ELT(names, i);
SEXP x_i = VECTOR_ELT(lst, i);

if (LENGTH(name_i) == 0) {
SEXP names_x_i = Rf_getAttrib(x_i, R_NamesSymbol);
for (R_xlen_t k = 0; k < XLENGTH(x_i); k++, j++) {
fill_array(j, VECTOR_ELT(x_i, k), STRING_ELT(names_x_i, k));
}
} else {
fill_array(j, x_i, name_i);
j++;
}
arrays[i] =
arrow::r::Array__from_vector(VECTOR_ELT(lst, i), schema->field(i)->type(), false);
}

int64_t num_rows = 0;
STOP_IF_NOT_OK(check_consistent_array_size(arrays, &num_rows));
STOP_IF_NOT_OK(arrow::r::check_consistent_array_size(arrays, &num_rows));
return arrow::RecordBatch::Make(schema, num_rows, arrays);
}

Expand All @@ -197,38 +239,45 @@ std::shared_ptr<arrow::RecordBatch> RecordBatch__from_arrays(SEXP schema_sxp, SE
arrow::r::extract<arrow::Schema>(schema_sxp), lst);
}

R_xlen_t n_arrays = XLENGTH(lst);
int num_fields;
STOP_IF_NOT_OK(arrow::r::count_fields(lst, &num_fields));

// convert lst to a vector of arrow::Array
std::vector<std::shared_ptr<arrow::Array>> arrays(n_arrays);
for (R_xlen_t i = 0; i < n_arrays; i++) {
arrays[i] = Array__from_vector(VECTOR_ELT(lst, i), R_NilValue);
std::vector<std::shared_ptr<arrow::Array>> arrays(num_fields);
std::vector<std::string> arrays_names(num_fields);
SEXP names = Rf_getAttrib(lst, R_NamesSymbol);

auto fill_array = [&arrays, &arrays_names](int j, SEXP x, SEXP name) {
arrays[j] = Array__from_vector(x, R_NilValue);
arrays_names[j] = CHAR(name);
};

for (R_xlen_t i = 0, j = 0; j < num_fields; i++) {
SEXP name_i = STRING_ELT(names, i);
SEXP x_i = VECTOR_ELT(lst, i);
if (LENGTH(name_i) == 0) {
SEXP names_x_i = Rf_getAttrib(x_i, R_NamesSymbol);
for (R_xlen_t k = 0; k < XLENGTH(x_i); k++, j++) {
fill_array(j, VECTOR_ELT(x_i, k), STRING_ELT(names_x_i, k));
}
} else {
fill_array(j, x_i, name_i);
j++;
}
}

// generate schema from the types that have been infered
std::shared_ptr<arrow::Schema> schema;
if (Rf_inherits(schema_sxp, "arrow::Schema")) {
schema = arrow::r::extract<arrow::Schema>(schema_sxp);
} else {
Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol));
std::vector<std::shared_ptr<arrow::Field>> fields(n_arrays);
for (R_xlen_t i = 0; i < n_arrays; i++) {
fields[i] =
std::make_shared<arrow::Field>(std::string(names[i]), arrays[i]->type());
}
schema = std::make_shared<arrow::Schema>(std::move(fields));
}

Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol));
std::vector<std::shared_ptr<arrow::Field>> fields(n_arrays);
for (R_xlen_t i = 0; i < n_arrays; i++) {
fields[i] = std::make_shared<arrow::Field>(std::string(names[i]), arrays[i]->type());
std::vector<std::shared_ptr<arrow::Field>> fields(num_fields);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like Schema::Make(const std::vector<Array>, const std::vector<string>) could be a convenient method to have globally.

for (R_xlen_t i = 0; i < num_fields; i++) {
fields[i] = std::make_shared<arrow::Field>(arrays_names[i], arrays[i]->type());
}
schema = std::make_shared<arrow::Schema>(std::move(fields));

// check all sizes are the same
int64_t num_rows = 0;
STOP_IF_NOT_OK(check_consistent_array_size(arrays, &num_rows));
STOP_IF_NOT_OK(arrow::r::check_consistent_array_size(arrays, &num_rows));

return arrow::RecordBatch::Make(schema, num_rows, arrays);
}
Expand Down
74 changes: 52 additions & 22 deletions r/src/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,54 +117,84 @@ std::shared_ptr<arrow::Table> Table__from_dots(SEXP lst, SEXP schema_sxp) {
return tab;
}

R_xlen_t n = XLENGTH(lst);
std::vector<std::shared_ptr<arrow::Column>> columns(n);
int num_fields;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious why the record_batch and table code isn't more shared

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be. record batch only have to handle arrays, where tables have to handle arrays, chunked arrays, columns. but I agree that the structure of the code is similar.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should refactor the flatten-for loop into a small function if they're all the same.

STOP_IF_NOT_OK(arrow::r::count_fields(lst, &num_fields));

std::vector<std::shared_ptr<arrow::Column>> columns(num_fields);
std::shared_ptr<arrow::Schema> schema;

if (Rf_isNull(schema_sxp)) {
// infer the schema from the ...
std::vector<std::shared_ptr<arrow::Field>> fields(n);
Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol));
std::vector<std::shared_ptr<arrow::Field>> fields(num_fields);
SEXP names = Rf_getAttrib(lst, R_NamesSymbol);

for (R_xlen_t i = 0; i < n; i++) {
SEXP x = VECTOR_ELT(lst, i);
auto fill_one_column = [&columns, &fields](int j, SEXP x, SEXP name) {
if (Rf_inherits(x, "arrow::Column")) {
columns[i] = arrow::r::extract<arrow::Column>(x);
fields[i] = columns[i]->field();
columns[j] = arrow::r::extract<arrow::Column>(x);
fields[j] = columns[j]->field();
} else if (Rf_inherits(x, "arrow::ChunkedArray")) {
auto chunked_array = arrow::r::extract<arrow::ChunkedArray>(x);
fields[i] =
std::make_shared<arrow::Field>(std::string(names[i]), chunked_array->type());
columns[i] = std::make_shared<arrow::Column>(fields[i], chunked_array);
fields[j] = std::make_shared<arrow::Field>(CHAR(name), chunked_array->type());
columns[j] = std::make_shared<arrow::Column>(fields[j], chunked_array);
} else if (Rf_inherits(x, "arrow::Array")) {
auto array = arrow::r::extract<arrow::Array>(x);
fields[i] = std::make_shared<arrow::Field>(std::string(names[i]), array->type());
columns[i] = std::make_shared<arrow::Column>(fields[i], array);
fields[j] = std::make_shared<arrow::Field>(CHAR(name), array->type());
columns[j] = std::make_shared<arrow::Column>(fields[j], array);
} else {
auto array = Array__from_vector(x, R_NilValue);
fields[i] = std::make_shared<arrow::Field>(std::string(names[i]), array->type());
columns[i] = std::make_shared<arrow::Column>(fields[i], array);
fields[j] = std::make_shared<arrow::Field>(CHAR(name), array->type());
columns[j] = std::make_shared<arrow::Column>(fields[j], array);
}
};

for (R_xlen_t i = 0, j = 0; j < num_fields; i++) {
SEXP name_i = STRING_ELT(names, i);
SEXP x_i = VECTOR_ELT(lst, i);

if (LENGTH(name_i) == 0) {
SEXP names_x_i = Rf_getAttrib(x_i, R_NamesSymbol);
for (R_xlen_t k = 0; k < XLENGTH(x_i); k++, j++) {
fill_one_column(j, VECTOR_ELT(x_i, k), STRING_ELT(names_x_i, k));
}
} else {
fill_one_column(j, x_i, name_i);
j++;
}
}

schema = std::make_shared<arrow::Schema>(std::move(fields));
} else {
// use the schema that is given
schema = arrow::r::extract<arrow::Schema>(schema_sxp);

for (R_xlen_t i = 0; i < n; i++) {
SEXP x = VECTOR_ELT(lst, i);
auto fill_one_column = [&columns, &schema](int j, SEXP x) {
if (Rf_inherits(x, "arrow::Column")) {
columns[i] = arrow::r::extract<arrow::Column>(x);
columns[j] = arrow::r::extract<arrow::Column>(x);
} else if (Rf_inherits(x, "arrow::ChunkedArray")) {
auto chunked_array = arrow::r::extract<arrow::ChunkedArray>(x);
columns[i] = std::make_shared<arrow::Column>(schema->field(i), chunked_array);
columns[j] = std::make_shared<arrow::Column>(schema->field(j), chunked_array);
} else if (Rf_inherits(x, "arrow::Array")) {
auto array = arrow::r::extract<arrow::Array>(x);
columns[i] = std::make_shared<arrow::Column>(schema->field(i), array);
columns[j] = std::make_shared<arrow::Column>(schema->field(j), array);
} else {
auto type = schema->field(i)->type();
auto type = schema->field(j)->type();
auto array = arrow::r::Array__from_vector(x, type, false);
columns[i] = std::make_shared<arrow::Column>(schema->field(i), array);
columns[j] = std::make_shared<arrow::Column>(schema->field(j), array);
}
};

SEXP names = Rf_getAttrib(lst, R_NamesSymbol);
for (R_xlen_t i = 0, j = 0; j < num_fields; i++) {
SEXP name_i = STRING_ELT(names, i);
SEXP x_i = VECTOR_ELT(lst, i);

if (LENGTH(name_i) == 0) {
for (R_xlen_t k = 0; k < XLENGTH(x_i); k++, j++) {
fill_one_column(j, VECTOR_ELT(x_i, k));
}
} else {
fill_one_column(j, x_i);
j++;
}
}
}
Expand Down
53 changes: 52 additions & 1 deletion r/tests/testthat/test-RecordBatch.R
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,24 @@ test_that("record_batch() handles arrow::Array", {

test_that("record_batch() handles data frame columns", {
tib <- tibble::tibble(x = 1:10, y = 1:10)
# because tib is named here, this becomes a struct array
batch <- record_batch(a = 1:10, b = tib)
expect_equal(batch$schema, schema(a = int32(), struct(x = int32(), y = int32())))
expect_equal(batch$schema,
schema(
a = int32(),
struct(x = int32(), y = int32())
)
)
out <- as.data.frame(batch)
expect_equivalent(out, tibble::tibble(a = 1:10, b = tib))

# if not named, columns from tib are auto spliced
batch2 <- record_batch(a = 1:10, tib)
expect_equal(batch$schema,
schema(a = int32(), x = int32(), y = int32())
)
out <- as.data.frame(batch2)
expect_equivalent(out, tibble::tibble(a = 1:10, !!!tib))
})

test_that("record_batch() handles data frame columns with schema spec", {
Expand All @@ -170,3 +184,40 @@ test_that("record_batch() handles data frame columns with schema spec", {
schema <- schema(a = int32(), b = struct(x = int16(), y = utf8()))
expect_error(record_batch(a = 1:10, b = tib, schema = schema))
})

test_that("record_batch() auto splices (ARROW-5718)", {
df <- tibble::tibble(x = 1:10, y = letters[1:10])
batch1 <- record_batch(df)
batch2 <- record_batch(!!!df)
expect_equal(batch1, batch2)
expect_equal(batch1$schema, schema(x = int32(), y = utf8()))
expect_equivalent(as.data.frame(batch1), df)

batch3 <- record_batch(df, z = 1:10)
batch4 <- record_batch(!!!df, z = 1:10)
expect_equal(batch3, batch4)
expect_equal(batch3$schema, schema(x = int32(), y = utf8(), z = int32()))
expect_equivalent(as.data.frame(batch3), cbind(df, data.frame(z = 1:10)))

s <- schema(x = float64(), y = utf8())
batch5 <- record_batch(df, schema = s)
batch6 <- record_batch(!!!df, schema = s)
expect_equal(batch5, batch6)
expect_equal(batch5$schema, s)
expect_equivalent(as.data.frame(batch5), df)

s2 <- schema(x = float64(), y = utf8(), z = int16())
batch7 <- record_batch(df, z = 1:10, schema = s2)
batch8 <- record_batch(!!!df, z = 1:10, schema = s2)
expect_equal(batch7, batch8)
expect_equal(batch7$schema, s2)
expect_equivalent(as.data.frame(batch7), cbind(df, data.frame(z = 1:10)))
})

test_that("record_batch() only auto splice data frames", {
expect_error(
record_batch(1:10),
regexp = "only data frames are allowed as unnamed arguments to be auto spliced"
)
})

18 changes: 18 additions & 0 deletions r/tests/testthat/test-Table.R
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,21 @@ test_that("table() handles ... of arrays, chunked arrays, vectors", {
tibble::tibble(a = 1:10, b = 1:10, c = v, x = 1:10, y = letters[1:10])
)
})

test_that("table() auto splices (ARROW-5718)", {
df <- tibble::tibble(x = 1:10, y = letters[1:10])

tab1 <- table(df)
tab2 <- table(!!!df)
expect_equal(tab1, tab2)
expect_equal(tab1$schema, schema(x = int32(), y = utf8()))
expect_equivalent(as.data.frame(tab1), df)

s <- schema(x = float64(), y = utf8())
tab3 <- table(df, schema = s)
tab4 <- table(!!!df, schema = s)
expect_equal(tab3, tab4)
expect_equal(tab3$schema, s)
expect_equivalent(as.data.frame(tab3), df)
})