From 55cf80843f77e01b31e36a636859302982f7ca8a Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Thu, 17 Oct 2019 13:56:54 -0400 Subject: [PATCH 01/18] Use new RecordBatch Filter method --- r/src/compute.cpp | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/r/src/compute.cpp b/r/src/compute.cpp index bfb5f6f01e2..e92023b034d 100644 --- a/r/src/compute.cpp +++ b/r/src/compute.cpp @@ -187,15 +187,10 @@ std::shared_ptr Array__Filter(const std::shared_ptr& std::shared_ptr RecordBatch__Filter( const std::shared_ptr& batch, const std::shared_ptr& filter) { - int ncols = batch->num_columns(); - - std::vector> columns(ncols); - - for (R_xlen_t j = 0; j < ncols; j++) { - columns[j] = Array__Filter(batch->column(j), filter); - } - - return arrow::RecordBatch::Make(batch->schema(), columns[0]->length(), columns); + std::shared_ptr out; + arrow::compute::FunctionContext context; + STOP_IF_NOT_OK(arrow::compute::Filter(&context, *batch, *filter, &out)); + return out; } // [[arrow::export]] From 6793e58a1ad9312531081ca490089cc151380382 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Thu, 17 Oct 2019 14:46:22 -0400 Subject: [PATCH 02/18] Move RecordBatch Take to cpp; fill in a test for RecordBatch Filter --- cpp/src/arrow/compute/kernels/filter_test.cc | 50 +++++++++++++++ cpp/src/arrow/compute/kernels/take.cc | 16 +++++ cpp/src/arrow/compute/kernels/take.h | 15 +++++ cpp/src/arrow/compute/kernels/take_test.cc | 65 ++++++++++++++++++++ r/src/compute.cpp | 15 ++--- 5 files changed, 151 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter_test.cc b/cpp/src/arrow/compute/kernels/filter_test.cc index bb685f474dc..e332d4d31ed 100644 --- a/cpp/src/arrow/compute/kernels/filter_test.cc +++ b/cpp/src/arrow/compute/kernels/filter_test.cc @@ -468,5 +468,55 @@ TEST_F(TestFilterKernelWithUnion, FilterUnion) { } } +class TestFilterKernelWithRecordBatch : public TestFilterKernel { + public: + void AssertFilter(const std::shared_ptr& schm, const std::string& batch_json, + const std::string& selection, const std::string& expected_batch) { + std::shared_ptr actual; + + ASSERT_OK(this->Filter(schm, batch_json, selection, &actual)); + ASSERT_OK(actual->Validate()); + ASSERT_BATCHES_EQUAL(*MakeBatch(schm, expected_batch), *actual); + } + + std::shared_ptr MakeBatch(const std::shared_ptr& schm, const std::string& batch_json) { + auto struct_array = ArrayFromJSON(struct_(schm->fields()), batch_json); + std::shared_ptr batch; + ARROW_EXPECT_OK(RecordBatch::FromStructArray(struct_array, &batch)); + return batch; + } + + Status Filter(const std::shared_ptr& schm, const std::string& batch_json, + const std::string& selection, + std::shared_ptr* out) { + auto batch = MakeBatch(schm, batch_json); + return arrow::compute::Filter(&this->ctx_, *batch, + *ArrayFromJSON(boolean(), selection), out); + } +}; + +TEST_F(TestFilterKernelWithRecordBatch, FilterRecordBatch) { + std::vector> fields = {field("a", int32()), field("b", utf8())}; + auto schm = schema(fields); + + auto struct_json = R"([ + {"a": null, "b": "yo"}, + {"a": 1, "b": ""}, + {"a": 2, "b": "hello"}, + {"a": 4, "b": "eh"} + ])"; + this->AssertFilter(schm, struct_json, "[0, 0, 0, 0]", "[]"); + this->AssertFilter(schm, struct_json, "[0, 1, 1, null]", R"([ + {"a": 1, "b": ""}, + {"a": 2, "b": "hello"}, + {"a": null, "b": null} + ])"); + this->AssertFilter(schm, struct_json, "[1, 1, 1, 1]", struct_json); + this->AssertFilter(schm, struct_json, "[1, 0, 1, 0]", R"([ + {"a": null, "b": "yo"}, + {"a": 2, "b": "hello"} 
+ ])"); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index a5210d79312..8618e99aeab 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -99,5 +99,21 @@ Status Take(FunctionContext* ctx, const Datum& values, const Datum& indices, return kernel->Call(ctx, values, indices, out); } +Status Take(FunctionContext* ctx, const RecordBatch& batch, const Array& indices, + const TakeOptions& options, std::shared_ptr* out) { + + + auto ncols = batch.num_columns(); + auto nrows = indices.length(); + + std::vector> columns(ncols); + + for (int j = 0; j < ncols; j++) { + RETURN_NOT_OK(Take(ctx, *batch.column(j), indices, options, &columns[j])); + } + *out = RecordBatch::Make(batch.schema(), nrows, columns); + return Status::OK(); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/take.h b/cpp/src/arrow/compute/kernels/take.h index f064b7265ed..5a297a63278 100644 --- a/cpp/src/arrow/compute/kernels/take.h +++ b/cpp/src/arrow/compute/kernels/take.h @@ -53,6 +53,21 @@ ARROW_EXPORT Status Take(FunctionContext* ctx, const Array& values, const Array& indices, const TakeOptions& options, std::shared_ptr* out); +/// \brief Take from a record batch at indices in another array +/// +/// The output batch will have the same schema as the input batch, +/// with rows taken from the values array at the given +/// indices. If an index is null then the taken element will be null. +/// +/// \param[in] ctx the FunctionContext +/// \param[in] batch array from which to take +/// \param[in] indices which values to take +/// \param[in] options options +/// \param[out] out resulting record batch +ARROW_EXPORT +Status Take(FunctionContext* ctx, const RecordBatch& batch, const Array& indices, + const TakeOptions& options, std::shared_ptr* out); + /// \brief Take from an array of values at indices in another array /// /// \param[in] ctx the FunctionContext diff --git a/cpp/src/arrow/compute/kernels/take_test.cc b/cpp/src/arrow/compute/kernels/take_test.cc index 9cd689f601d..e65e1ef875b 100644 --- a/cpp/src/arrow/compute/kernels/take_test.cc +++ b/cpp/src/arrow/compute/kernels/take_test.cc @@ -527,5 +527,70 @@ TEST_F(TestPermutationsWithTake, InvertPermutation) { } } +class TestTakeKernelWithRecordBatch : public TestTakeKernel { + public: + void AssertTake(const std::shared_ptr& schm, const std::string& batch_json, + const std::string& indices, const std::string& expected_batch) { + std::shared_ptr actual; + + for (auto index_type : {int8(), uint32()}) { + ASSERT_OK(this->Take(schm, batch_json, index_type, indices, &actual)); + ASSERT_OK(actual->Validate()); + ASSERT_BATCHES_EQUAL(*MakeBatch(schm, expected_batch), *actual); + } + } + + std::shared_ptr MakeBatch(const std::shared_ptr& schm, const std::string& batch_json) { + auto struct_array = ArrayFromJSON(struct_(schm->fields()), batch_json); + std::shared_ptr batch; + ARROW_EXPECT_OK(RecordBatch::FromStructArray(struct_array, &batch)); + return batch; + } + + Status Take(const std::shared_ptr& schm, const std::string& batch_json, + const std::shared_ptr& index_type, const std::string& indices, + std::shared_ptr* out) { + auto batch = MakeBatch(schm, batch_json); + TakeOptions options; + return arrow::compute::Take(&this->ctx_, *batch, + *ArrayFromJSON(index_type, indices), options, out); + } +}; + +TEST_F(TestTakeKernelWithRecordBatch, TakeRecordBatch) { + std::vector> fields = {field("a", int32()), 
field("b", utf8())}; + auto schm = schema(fields); + + auto struct_json = R"([ + {"a": null, "b": "yo"}, + {"a": 1, "b": ""}, + {"a": 2, "b": "hello"}, + {"a": 4, "b": "eh"} + ])"; + this->AssertTake(schm, struct_json, "[]", "[]"); + this->AssertTake(schm, struct_json, "[3, 1, 3, 1, 3]", R"([ + {"a": 4, "b": "eh"}, + {"a": 1, "b": ""}, + {"a": 4, "b": "eh"}, + {"a": 1, "b": ""}, + {"a": 4, "b": "eh"} + ])"); + this->AssertTake(schm, struct_json, "[3, 1, 0]", R"([ + {"a": 4, "b": "eh"}, + {"a": 1, "b": ""}, + {"a": null, "b": "yo"} + ])"); + this->AssertTake(schm, struct_json, "[0, 1, 2, 3]", struct_json); + this->AssertTake(schm, struct_json, "[0, 2, 2, 2, 2, 2, 2]", R"([ + {"a": null, "b": "yo"}, + {"a": 2, "b": "hello"}, + {"a": 2, "b": "hello"}, + {"a": 2, "b": "hello"}, + {"a": 2, "b": "hello"}, + {"a": 2, "b": "hello"}, + {"a": 2, "b": "hello"} + ])"); +} + } // namespace compute } // namespace arrow diff --git a/r/src/compute.cpp b/r/src/compute.cpp index e92023b034d..d4c46a7778f 100644 --- a/r/src/compute.cpp +++ b/r/src/compute.cpp @@ -96,16 +96,11 @@ std::shared_ptr Array__Take(const std::shared_ptr& v std::shared_ptr RecordBatch__Take( const std::shared_ptr& batch, const std::shared_ptr& indices) { - int ncols = batch->num_columns(); - auto nrows = indices->length(); - - std::vector> columns(ncols); - - for (R_xlen_t j = 0; j < ncols; j++) { - columns[j] = Array__Take(batch->column(j), indices); - } - - return arrow::RecordBatch::Make(batch->schema(), nrows, columns); + std::shared_ptr out; + arrow::compute::FunctionContext context; + arrow::compute::TakeOptions options; + STOP_IF_NOT_OK(arrow::compute::Take(&context, *batch, *indices, options, &out)); + return out; } // [[arrow::export]] From 1f5b57376439721774ff3eb8c1a73fd5ead4f62c Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Fri, 18 Oct 2019 11:32:13 -0400 Subject: [PATCH 03/18] Move ChunkedArray Filter (with Array) to cpp (test deferred) --- cpp/src/arrow/compute/kernels/filter.cc | 24 ++++++++++++++++++++ cpp/src/arrow/compute/kernels/filter.h | 18 +++++++++++++++ cpp/src/arrow/compute/kernels/filter_test.cc | 3 +++ r/src/compute.cpp | 17 ++++---------- 4 files changed, 49 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index 2a3a24f0648..10c47bf5887 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -163,5 +163,29 @@ Status Filter(FunctionContext* ctx, const RecordBatch& batch, const Array& filte return Status::OK(); } +Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& filter, + std::shared_ptr* out) { + // Datum out_datum; + // RETURN_NOT_OK(Filter(ctx, Datum(values.data()), Datum(filter.data()), &out_datum)); + // *out = out_datum.make_array(); + // return Status::OK(); + + auto num_chunks = values.num_chunks(); + std::vector> new_chunks(num_chunks); + std::shared_ptr current_chunk; + int offset = 0; + int len; + + for (int i = 0; i < num_chunks; i++) { + current_chunk = values.chunk(i); + len = current_chunk->length(); + RETURN_NOT_OK(Filter(ctx, *current_chunk, *filter.Slice(offset, len), &new_chunks[i])); + offset += len; + } + + *out = std::make_shared(std::move(new_chunks)); + return Status::OK(); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/filter.h b/cpp/src/arrow/compute/kernels/filter.h index f78c2e46df7..5ff246adada 100644 --- a/cpp/src/arrow/compute/kernels/filter.h +++ 
b/cpp/src/arrow/compute/kernels/filter.h @@ -50,6 +50,24 @@ ARROW_EXPORT Status Filter(FunctionContext* ctx, const Array& values, const Array& filter, std::shared_ptr* out); +/// \brief Filter a chunked array with a boolean selection filter +/// +/// The output chunked array will be populated with values from the input at positions +/// where the selection filter is not 0. Nulls in the filter will result in nulls +/// in the output. +/// +/// For example given values = ["a", "b", "c", null, "e", "f"] and +/// filter = [0, 1, 1, 0, null, 1], the output will be +/// = ["b", "c", null, "f"] +/// +/// \param[in] ctx the FunctionContext +/// \param[in] values chunked array to filter +/// \param[in] filter indicates which values should be filtered out +/// \param[out] out resulting chunked array +ARROW_EXPORT +Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& filter, + std::shared_ptr* out); + /// \brief Filter a record batch with a boolean selection filter /// /// The output record batch's columns will be populated with values from corresponding diff --git a/cpp/src/arrow/compute/kernels/filter_test.cc b/cpp/src/arrow/compute/kernels/filter_test.cc index e332d4d31ed..b05cd0d3b31 100644 --- a/cpp/src/arrow/compute/kernels/filter_test.cc +++ b/cpp/src/arrow/compute/kernels/filter_test.cc @@ -518,5 +518,8 @@ TEST_F(TestFilterKernelWithRecordBatch, FilterRecordBatch) { ])"); } +// TODO (npr): add test for Filter(ChunkedArray, Array) +// See cast_test.cc for test setup with chunked arrays + } // namespace compute } // namespace arrow diff --git a/r/src/compute.cpp b/r/src/compute.cpp index d4c46a7778f..709239c7ba8 100644 --- a/r/src/compute.cpp +++ b/r/src/compute.cpp @@ -192,20 +192,11 @@ std::shared_ptr RecordBatch__Filter( std::shared_ptr ChunkedArray__Filter( const std::shared_ptr& values, const std::shared_ptr& filter) { - int num_chunks = values->num_chunks(); - std::vector> new_chunks(num_chunks); - std::shared_ptr current_chunk; - int offset = 0; - int len; - for (R_xlen_t i = 0; i < num_chunks; i++) { - current_chunk = values->chunk(i); - len = current_chunk->length(); - new_chunks[i] = Array__Filter(current_chunk, filter->Slice(offset, len)); - offset += len; - } - - return std::make_shared(std::move(new_chunks)); + std::shared_ptr out; + arrow::compute::FunctionContext context; + STOP_IF_NOT_OK(arrow::compute::Filter(&context, *values, *filter, &out)); + return out; } // [[arrow::export]] From fc7c8dd56f7443a01a3b4b68b181028932a5c071 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Fri, 18 Oct 2019 11:41:47 -0400 Subject: [PATCH 04/18] Move ChunkedArray Filter (with ChunkedArray) to cpp (test deferred) --- cpp/src/arrow/compute/kernels/filter.cc | 35 +++++++++++++++++--- cpp/src/arrow/compute/kernels/filter.h | 18 ++++++++++ cpp/src/arrow/compute/kernels/filter_test.cc | 2 +- r/src/compute.cpp | 29 +++------------- 4 files changed, 53 insertions(+), 31 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index 10c47bf5887..797c1f22c85 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -22,6 +22,7 @@ #include #include +#include "arrow/array/concatenate.h" #include "arrow/builder.h" #include "arrow/compute/kernels/take_internal.h" #include "arrow/record_batch.h" @@ -165,11 +166,6 @@ Status Filter(FunctionContext* ctx, const RecordBatch& batch, const Array& filte Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& filter, 
std::shared_ptr* out) { - // Datum out_datum; - // RETURN_NOT_OK(Filter(ctx, Datum(values.data()), Datum(filter.data()), &out_datum)); - // *out = out_datum.make_array(); - // return Status::OK(); - auto num_chunks = values.num_chunks(); std::vector> new_chunks(num_chunks); std::shared_ptr current_chunk; @@ -187,5 +183,34 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& fil return Status::OK(); } +Status Filter(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray& filter, + std::shared_ptr* out) { + auto num_chunks = values.num_chunks(); + std::vector> new_chunks(num_chunks); + std::shared_ptr current_chunk; + std::shared_ptr current_chunked_filter; + std::shared_ptr current_filter; + int offset = 0; + int len; + + for (int i = 0; i < num_chunks; i++) { + current_chunk = values.chunk(i); + len = current_chunk->length(); + current_chunked_filter = filter.Slice(offset, len); + if (current_chunked_filter->num_chunks() == 1) { + current_filter = current_chunked_filter->chunk(0); + } else { + // Concatenate the chunks of the filter so we have an Array + RETURN_NOT_OK(Concatenate(current_chunked_filter->chunks(), + default_memory_pool(), ¤t_filter)); + } + RETURN_NOT_OK(Filter(ctx, *current_chunk, *current_filter, &new_chunks[i])); + offset += len; + } + + *out = std::make_shared(std::move(new_chunks)); + return Status::OK(); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/filter.h b/cpp/src/arrow/compute/kernels/filter.h index 5ff246adada..ff1eb63cee8 100644 --- a/cpp/src/arrow/compute/kernels/filter.h +++ b/cpp/src/arrow/compute/kernels/filter.h @@ -68,6 +68,24 @@ ARROW_EXPORT Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& filter, std::shared_ptr* out); +/// \brief Filter a chunked array with a boolean selection filter +/// +/// The output chunked array will be populated with values from the input at positions +/// where the selection filter is not 0. Nulls in the filter will result in nulls +/// in the output. 
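+/// The output has one chunk for each chunk of the values; the filter's chunking does not need to match that of the values.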
+/// +/// For example given values = ["a", "b", "c", null, "e", "f"] and +/// filter = [0, 1, 1, 0, null, 1], the output will be +/// = ["b", "c", null, "f"] +/// +/// \param[in] ctx the FunctionContext +/// \param[in] values chunked array to filter +/// \param[in] filter indicates which values should be filtered out +/// \param[out] out resulting chunked array +ARROW_EXPORT +Status Filter(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray& filter, + std::shared_ptr* out); + /// \brief Filter a record batch with a boolean selection filter /// /// The output record batch's columns will be populated with values from corresponding diff --git a/cpp/src/arrow/compute/kernels/filter_test.cc b/cpp/src/arrow/compute/kernels/filter_test.cc index b05cd0d3b31..959adc266fc 100644 --- a/cpp/src/arrow/compute/kernels/filter_test.cc +++ b/cpp/src/arrow/compute/kernels/filter_test.cc @@ -518,7 +518,7 @@ TEST_F(TestFilterKernelWithRecordBatch, FilterRecordBatch) { ])"); } -// TODO (npr): add test for Filter(ChunkedArray, Array) +// TODO (npr): add test for Filter(ChunkedArray, Array) and Filter(ChunkedArray, ChunkedArray) // See cast_test.cc for test setup with chunked arrays } // namespace compute diff --git a/r/src/compute.cpp b/r/src/compute.cpp index 709239c7ba8..a4867e6f45e 100644 --- a/r/src/compute.cpp +++ b/r/src/compute.cpp @@ -203,31 +203,10 @@ std::shared_ptr ChunkedArray__Filter( std::shared_ptr ChunkedArray__FilterChunked( const std::shared_ptr& values, const std::shared_ptr& filter) { - int num_chunks = values->num_chunks(); - std::vector> new_chunks(num_chunks); - std::shared_ptr current_chunk; - std::shared_ptr current_chunked_filter; - std::shared_ptr current_filter; - - int offset = 0; - int len; - - for (R_xlen_t i = 0; i < num_chunks; i++) { - current_chunk = values->chunk(i); - len = current_chunk->length(); - current_chunked_filter = filter->Slice(offset, len); - if (current_chunked_filter->num_chunks() == 1) { - current_filter = current_chunked_filter->chunk(0); - } else { - // Concatenate the chunks of the filter so we have an Array - STOP_IF_NOT_OK(arrow::Concatenate(current_chunked_filter->chunks(), - arrow::default_memory_pool(), ¤t_filter)); - } - new_chunks[i] = Array__Filter(current_chunk, current_filter); - offset += len; - } - - return std::make_shared(std::move(new_chunks)); + std::shared_ptr out; + arrow::compute::FunctionContext context; + STOP_IF_NOT_OK(arrow::compute::Filter(&context, *values, *filter, &out)); + return out; } // [[arrow::export]] From ebaa2a3bbf3c5a081a2392d5abc862c37a13be88 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Fri, 18 Oct 2019 13:09:45 -0400 Subject: [PATCH 05/18] Move Table Filter methods to cpp (also pending tests) --- cpp/src/arrow/compute/kernels/filter.cc | 26 ++++++++++++++++++ cpp/src/arrow/compute/kernels/filter.h | 28 ++++++++++++++++++++ cpp/src/arrow/compute/kernels/filter_test.cc | 6 ++++- r/src/compute.cpp | 24 ++++++----------- 4 files changed, 67 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index 797c1f22c85..5fbf8b895ca 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -212,5 +212,31 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArr return Status::OK(); } +Status Filter(FunctionContext* ctx, const Table& table, const Array& filter, + std::shared_ptr* out) { + auto ncols = table.num_columns(); + + std::vector> columns(ncols); + + for (int j 
= 0; j < ncols; j++) { + RETURN_NOT_OK(Filter(ctx, *table.column(j), filter, &columns[j])); + } + *out = Table::Make(table.schema(), columns); + return Status::OK(); +} + +Status Filter(FunctionContext* ctx, const Table& table, const ChunkedArray& filter, + std::shared_ptr
* out) { + auto ncols = table.num_columns(); + + std::vector> columns(ncols); + + for (int j = 0; j < ncols; j++) { + RETURN_NOT_OK(Filter(ctx, *table.column(j), filter, &columns[j])); + } + *out = Table::Make(table.schema(), columns); + return Status::OK(); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/filter.h b/cpp/src/arrow/compute/kernels/filter.h index ff1eb63cee8..2ada4931bf1 100644 --- a/cpp/src/arrow/compute/kernels/filter.h +++ b/cpp/src/arrow/compute/kernels/filter.h @@ -100,6 +100,34 @@ ARROW_EXPORT Status Filter(FunctionContext* ctx, const RecordBatch& batch, const Array& filter, std::shared_ptr* out); + +/// \brief Filter a table with a boolean selection filter +/// +/// The output table's columns will be populated with values from corresponding +/// columns of the input at positions where the selection filter is not 0. Nulls in the +/// filter will result in nulls in the output. +/// +/// \param[in] ctx the FunctionContext +/// \param[in] table table to filter +/// \param[in] filter indicates which values should be filtered out +/// \param[out] out resulting table +ARROW_EXPORT +Status Filter(FunctionContext* ctx, const Table& table, const Array& filter, + std::shared_ptr
* out); + +/// \brief Filter a table with a boolean selection filter +/// +/// The output table's columns will be populated with values from corresponding +/// columns of the input at positions where the selection filter is not 0. Nulls in the +/// filter will result in nulls in the output. +/// +/// \param[in] ctx the FunctionContext +/// \param[in] table table to filter +/// \param[in] filter indicates which values should be filtered out +/// \param[out] out resulting table +ARROW_EXPORT +Status Filter(FunctionContext* ctx, const Table& table, const ChunkedArray& filter, + std::shared_ptr
* out); + /// \brief Filter an array with a boolean selection filter /// /// \param[in] ctx the FunctionContext diff --git a/cpp/src/arrow/compute/kernels/filter_test.cc b/cpp/src/arrow/compute/kernels/filter_test.cc index 959adc266fc..6437fc69d67 100644 --- a/cpp/src/arrow/compute/kernels/filter_test.cc +++ b/cpp/src/arrow/compute/kernels/filter_test.cc @@ -518,7 +518,11 @@ TEST_F(TestFilterKernelWithRecordBatch, FilterRecordBatch) { ])"); } -// TODO (npr): add test for Filter(ChunkedArray, Array) and Filter(ChunkedArray, ChunkedArray) +// TODO (npr): add tests for: +// * Filter(ChunkedArray, Array) +// * Filter(ChunkedArray, ChunkedArray) +// * Filter(Table, Array) +// * Filter(Table, ChunkedArray) // See cast_test.cc for test setup with chunked arrays } // namespace compute diff --git a/r/src/compute.cpp b/r/src/compute.cpp index a4867e6f45e..2a0e4fd2574 100644 --- a/r/src/compute.cpp +++ b/r/src/compute.cpp @@ -212,27 +212,19 @@ std::shared_ptr ChunkedArray__FilterChunked( // [[arrow::export]] std::shared_ptr Table__Filter(const std::shared_ptr& table, const std::shared_ptr& filter) { - auto ncols = table->num_columns(); - std::vector> columns(ncols); - - for (R_xlen_t j = 0; j < ncols; j++) { - columns[j] = ChunkedArray__Filter(table->column(j), filter); - } - - return arrow::Table::Make(table->schema(), columns); + std::shared_ptr out; + arrow::compute::FunctionContext context; + STOP_IF_NOT_OK(arrow::compute::Filter(&context, *table, *filter, &out)); + return out; } // [[arrow::export]] std::shared_ptr Table__FilterChunked( const std::shared_ptr& table, const std::shared_ptr& filter) { - auto ncols = table->num_columns(); - std::vector> columns(ncols); - - for (R_xlen_t j = 0; j < ncols; j++) { - columns[j] = ChunkedArray__FilterChunked(table->column(j), filter); - } - - return arrow::Table::Make(table->schema(), columns); + std::shared_ptr out; + arrow::compute::FunctionContext context; + STOP_IF_NOT_OK(arrow::compute::Filter(&context, *table, *filter, &out)); + return out; } #endif From d632b8dd6c7e2a02094d6a25e0a91d05f9d5f260 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Fri, 18 Oct 2019 14:47:33 -0400 Subject: [PATCH 06/18] Take(ChunkedArray, Array); needs test and should handle more cases without concatenation --- cpp/src/arrow/compute/kernels/take.cc | 24 ++++++++++++ cpp/src/arrow/compute/kernels/take.h | 20 ++++++++++ r/R/chunked-array.R | 10 ++--- r/R/table.R | 10 ++--- r/src/arrowExports.cpp | 8 ++-- r/src/compute.cpp | 56 ++++----------------------- 6 files changed, 65 insertions(+), 63 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index 8618e99aeab..a787de538c0 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -19,6 +19,7 @@ #include #include +#include "arrow/array/concatenate.h" #include "arrow/compute/kernels/take.h" #include "arrow/compute/kernels/take_internal.h" #include "arrow/util/logging.h" @@ -99,6 +100,29 @@ Status Take(FunctionContext* ctx, const Datum& values, const Datum& indices, return kernel->Call(ctx, values, indices, out); } +Status Take(FunctionContext* ctx, const ChunkedArray& values, const Array& indices, + const TakeOptions& options, std::shared_ptr* out) { + auto num_chunks = values.num_chunks(); + std::vector> new_chunks(1); // Hard-coded 1 for now + std::shared_ptr current_chunk; + + // Case 1: `values` has a single chunk, so just use it + if (num_chunks == 1) { + current_chunk = values.chunk(0); + } else { + // TODO Case 2: See if all 
`indices` fall in the same chunk and call Array Take on it + // See https://github.com/apache/arrow/blob/6f2c9041137001f7a9212f244b51bc004efc29af/r/src/compute.cpp#L123-L151 + // TODO Case 3: If indices are sorted, can slice them and call Array Take + + // Case 4: Else, concatenate chunks and call Array Take + RETURN_NOT_OK(Concatenate(values.chunks(), default_memory_pool(), ¤t_chunk)); + } + // Call Array Take on our single chunk + RETURN_NOT_OK(Take(ctx, *current_chunk, indices, options, &new_chunks[0])); + *out = std::make_shared(std::move(new_chunks)); + return Status::OK(); +} + Status Take(FunctionContext* ctx, const RecordBatch& batch, const Array& indices, const TakeOptions& options, std::shared_ptr* out) { diff --git a/cpp/src/arrow/compute/kernels/take.h b/cpp/src/arrow/compute/kernels/take.h index 5a297a63278..f6bab6716b0 100644 --- a/cpp/src/arrow/compute/kernels/take.h +++ b/cpp/src/arrow/compute/kernels/take.h @@ -53,6 +53,26 @@ ARROW_EXPORT Status Take(FunctionContext* ctx, const Array& values, const Array& indices, const TakeOptions& options, std::shared_ptr* out); +/// \brief Take from a chunked array of values at indices in another array +/// +/// The output chunked array will be of the same type as the input values +/// array, with elements taken from the values array at the given +/// indices. If an index is null then the taken element will be null. +/// +/// For example given values = ["a", "b", "c", null, "e", "f"] and +/// indices = [2, 1, null, 3], the output will be +/// = [values[2], values[1], null, values[3]] +/// = ["c", "b", null, null] +/// +/// \param[in] ctx the FunctionContext +/// \param[in] values chunked array from which to take +/// \param[in] indices which values to take +/// \param[in] options options +/// \param[out] out resulting chunked array +ARROW_EXPORT +Status Take(FunctionContext* ctx, const ChunkedArray& values, const Array& indices, + const TakeOptions& options, std::shared_ptr* out); + /// \brief Take from a record batch at indices in another array /// /// The output batch will have the same schema as the input batch, diff --git a/r/R/chunked-array.R b/r/R/chunked-array.R index 6e977f2422a..03876481201 100644 --- a/r/R/chunked-array.R +++ b/r/R/chunked-array.R @@ -69,13 +69,13 @@ ChunkedArray <- R6Class("ChunkedArray", inherit = Object, } }, Take = function(i) { - if (inherits(i, c("Array", "ChunkedArray"))) { - # Hack because ChunkedArray__Take doesn't take Arrays - i <- as.vector(i) - } else if (is.numeric(i)) { + if (is.numeric(i)) { i <- as.integer(i) } - assert_is(i, "integer") + if (is.integer(i)) { + i <- Array$create(i) + } + assert_is(i, "Array") return(shared_ptr(ChunkedArray, ChunkedArray__Take(self, i))) }, Filter = function(i) { diff --git a/r/R/table.R b/r/R/table.R index 16a869abe95..7c0fc79c201 100644 --- a/r/R/table.R +++ b/r/R/table.R @@ -139,13 +139,13 @@ Table <- R6Class("Table", inherit = Object, } }, Take = function(i) { - if (inherits(i, c("Array", "ChunkedArray"))) { - # Hack because ChunkedArray__Take doesn't take Arrays - i <- as.vector(i) - } else if (is.numeric(i)) { + if (is.numeric(i)) { i <- as.integer(i) } - assert_is(i, "integer") + if (is.integer(i)) { + i <- Array$create(i) + } + assert_is(i, "Array") shared_ptr(Table, Table__Take(self, i)) }, Filter = function(i) { diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index ac7777f5af5..56f83b551c0 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1086,11 +1086,11 @@ RcppExport SEXP _arrow_RecordBatch__Take(SEXP batch_sexp, SEXP 
indices_sexp){ // compute.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr ChunkedArray__Take(const std::shared_ptr& values, Rcpp::IntegerVector& indices); +std::shared_ptr ChunkedArray__Take(const std::shared_ptr& values, const std::shared_ptr& indices); RcppExport SEXP _arrow_ChunkedArray__Take(SEXP values_sexp, SEXP indices_sexp){ BEGIN_RCPP Rcpp::traits::input_parameter&>::type values(values_sexp); - Rcpp::traits::input_parameter::type indices(indices_sexp); + Rcpp::traits::input_parameter&>::type indices(indices_sexp); return Rcpp::wrap(ChunkedArray__Take(values, indices)); END_RCPP } @@ -1102,11 +1102,11 @@ RcppExport SEXP _arrow_ChunkedArray__Take(SEXP values_sexp, SEXP indices_sexp){ // compute.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr Table__Take(const std::shared_ptr& table, Rcpp::IntegerVector& indices); +std::shared_ptr Table__Take(const std::shared_ptr& table, const std::shared_ptr& indices); RcppExport SEXP _arrow_Table__Take(SEXP table_sexp, SEXP indices_sexp){ BEGIN_RCPP Rcpp::traits::input_parameter&>::type table(table_sexp); - Rcpp::traits::input_parameter::type indices(indices_sexp); + Rcpp::traits::input_parameter&>::type indices(indices_sexp); return Rcpp::wrap(Table__Take(table, indices)); END_RCPP } diff --git a/r/src/compute.cpp b/r/src/compute.cpp index 2a0e4fd2574..0923c1ae576 100644 --- a/r/src/compute.cpp +++ b/r/src/compute.cpp @@ -105,60 +105,18 @@ std::shared_ptr RecordBatch__Take( // [[arrow::export]] std::shared_ptr ChunkedArray__Take( - const std::shared_ptr& values, Rcpp::IntegerVector& indices) { - int num_chunks = values->num_chunks(); - std::vector> new_chunks(1); // Hard-coded 1 for now - // 1) If there's only one chunk, just take from it - if (num_chunks == 1) { - new_chunks[0] = Array__Take( - values->chunk(0), arrow::r::Array__from_vector(indices, arrow::int32(), true)); - return std::make_shared(std::move(new_chunks)); - } - - std::shared_ptr current_chunk; - std::shared_ptr current_indices; - int offset = 0; - int len; - int min_i = indices[0]; - int max_i = indices[0]; - - // 2) See if all i are in the same chunk, call Array__Take on that - for (R_xlen_t i = 1; i < indices.size(); i++) { - if (indices[i] < min_i) { - min_i = indices[i]; - } else if (indices[i] > max_i) { - max_i = indices[i]; - } - } - for (R_xlen_t chk = 0; chk < num_chunks; chk++) { - current_chunk = values->chunk(chk); - len = current_chunk->length(); - if (min_i >= offset && max_i < offset + len) { - for (R_xlen_t i = 0; i < indices.size(); i++) { - // Subtract offset from all indices - indices[i] -= offset; - } - current_indices = arrow::r::Array__from_vector(indices, arrow::int32(), true); - new_chunks[0] = Array__Take(current_chunk, current_indices); - return std::make_shared(std::move(new_chunks)); - } - offset += len; - } - - // TODO 3) If they're not all in the same chunk but are sorted, we can slice - // the indices (offset appropriately) and take from each chunk + const std::shared_ptr& values, const std::shared_ptr& indices) { + std::shared_ptr out; + arrow::compute::FunctionContext context; + arrow::compute::TakeOptions options; - // 4) Last resort: concatenate the chunks - STOP_IF_NOT_OK( - arrow::Concatenate(values->chunks(), arrow::default_memory_pool(), ¤t_chunk)); - current_indices = arrow::r::Array__from_vector(indices, arrow::int32(), true); - new_chunks[0] = Array__Take(current_chunk, current_indices); - return std::make_shared(std::move(new_chunks)); + STOP_IF_NOT_OK(arrow::compute::Take(&context, *values, *indices, options, &out)); + return 
out; } // [[arrow::export]] std::shared_ptr Table__Take(const std::shared_ptr& table, - Rcpp::IntegerVector& indices) { + const std::shared_ptr& indices) { auto ncols = table->num_columns(); std::vector> columns(ncols); From 9ae88d8ea490f9d6f2a3687a33e707f9a5af0757 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Fri, 18 Oct 2019 15:07:51 -0400 Subject: [PATCH 07/18] Take(ChunkedArray, ChunkedArray), with R test --- cpp/src/arrow/compute/kernels/take.cc | 17 +++++++++++++++++ cpp/src/arrow/compute/kernels/take.h | 21 +++++++++++++++++++++ r/R/arrowExports.R | 4 ++++ r/R/chunked-array.R | 3 +++ r/src/arrowExports.cpp | 17 +++++++++++++++++ r/src/compute.cpp | 11 +++++++++++ r/tests/testthat/test-chunked-array.R | 6 ++++++ 7 files changed, 79 insertions(+) diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index a787de538c0..922bd1dc21d 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -123,6 +123,23 @@ Status Take(FunctionContext* ctx, const ChunkedArray& values, const Array& indic return Status::OK(); } +// TODO: There should be Take(Array, ChunkedArray) too +Status Take(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray& indices, + const TakeOptions& options, std::shared_ptr* out) { + auto num_chunks = indices.num_chunks(); + std::vector> new_chunks(num_chunks); + std::shared_ptr current_chunk; + + for (int i = 0; i < num_chunks; i++) { + // Take with that indices chunk + RETURN_NOT_OK(Take(ctx, values, *indices.chunk(i), options, ¤t_chunk)); + // Concatenate the result to make a single array for this chunk + RETURN_NOT_OK(Concatenate(current_chunk->chunks(), default_memory_pool(), &new_chunks[i])); + } + *out = std::make_shared(std::move(new_chunks)); + return Status::OK(); +} + Status Take(FunctionContext* ctx, const RecordBatch& batch, const Array& indices, const TakeOptions& options, std::shared_ptr* out) { diff --git a/cpp/src/arrow/compute/kernels/take.h b/cpp/src/arrow/compute/kernels/take.h index f6bab6716b0..1d22a23aae1 100644 --- a/cpp/src/arrow/compute/kernels/take.h +++ b/cpp/src/arrow/compute/kernels/take.h @@ -73,6 +73,27 @@ ARROW_EXPORT Status Take(FunctionContext* ctx, const ChunkedArray& values, const Array& indices, const TakeOptions& options, std::shared_ptr* out); +/// \brief Take from a chunked array of values at indices in another chunked array +/// +/// The output chunked array will be of the same type as the input values +/// array, with elements taken from the values array at the given +/// indices. If an index is null then the taken element will be null. +/// The chunks in the output array will align with the chunks in the indices. 
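+/// Each output chunk is produced by taking from the full chunked values with the corresponding chunk of indices.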
+/// +/// For example given values = ["a", "b", "c", null, "e", "f"] and +/// indices = [2, 1, null, 3], the output will be +/// = [values[2], values[1], null, values[3]] +/// = ["c", "b", null, null] +/// +/// \param[in] ctx the FunctionContext +/// \param[in] values chunked array from which to take +/// \param[in] indices which values to take +/// \param[in] options options +/// \param[out] out resulting chunked array +ARROW_EXPORT +Status Take(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray& indices, + const TakeOptions& options, std::shared_ptr* out); + /// \brief Take from a record batch at indices in another array /// /// The output batch will have the same schema as the input batch, diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index e68a577ab76..a2265acbe0f 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -280,6 +280,10 @@ ChunkedArray__Take <- function(values, indices){ .Call(`_arrow_ChunkedArray__Take` , values, indices) } +ChunkedArray__TakeChunked <- function(values, indices){ + .Call(`_arrow_ChunkedArray__TakeChunked` , values, indices) +} + Table__Take <- function(table, indices){ .Call(`_arrow_Table__Take` , table, indices) } diff --git a/r/R/chunked-array.R b/r/R/chunked-array.R index 03876481201..5711a4627e2 100644 --- a/r/R/chunked-array.R +++ b/r/R/chunked-array.R @@ -75,6 +75,9 @@ ChunkedArray <- R6Class("ChunkedArray", inherit = Object, if (is.integer(i)) { i <- Array$create(i) } + if (inherits(i, "ChunkedArray")) { + return(shared_ptr(ChunkedArray, ChunkedArray__TakeChunked(self, i))) + } assert_is(i, "Array") return(shared_ptr(ChunkedArray, ChunkedArray__Take(self, i))) }, diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 56f83b551c0..83c878b6be7 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1100,6 +1100,22 @@ RcppExport SEXP _arrow_ChunkedArray__Take(SEXP values_sexp, SEXP indices_sexp){ } #endif +// compute.cpp +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr ChunkedArray__TakeChunked(const std::shared_ptr& values, const std::shared_ptr& indices); +RcppExport SEXP _arrow_ChunkedArray__TakeChunked(SEXP values_sexp, SEXP indices_sexp){ +BEGIN_RCPP + Rcpp::traits::input_parameter&>::type values(values_sexp); + Rcpp::traits::input_parameter&>::type indices(indices_sexp); + return Rcpp::wrap(ChunkedArray__TakeChunked(values, indices)); +END_RCPP +} +#else +RcppExport SEXP _arrow_ChunkedArray__TakeChunked(SEXP values_sexp, SEXP indices_sexp){ + Rf_error("Cannot call ChunkedArray__TakeChunked(). Please use arrow::install_arrow() to install required runtime libraries. 
"); +} +#endif + // compute.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr Table__Take(const std::shared_ptr& table, const std::shared_ptr& indices); @@ -5035,6 +5051,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_Array__Take", (DL_FUNC) &_arrow_Array__Take, 2}, { "_arrow_RecordBatch__Take", (DL_FUNC) &_arrow_RecordBatch__Take, 2}, { "_arrow_ChunkedArray__Take", (DL_FUNC) &_arrow_ChunkedArray__Take, 2}, + { "_arrow_ChunkedArray__TakeChunked", (DL_FUNC) &_arrow_ChunkedArray__TakeChunked, 2}, { "_arrow_Table__Take", (DL_FUNC) &_arrow_Table__Take, 2}, { "_arrow_Array__Filter", (DL_FUNC) &_arrow_Array__Filter, 2}, { "_arrow_RecordBatch__Filter", (DL_FUNC) &_arrow_RecordBatch__Filter, 2}, diff --git a/r/src/compute.cpp b/r/src/compute.cpp index 0923c1ae576..73377441a81 100644 --- a/r/src/compute.cpp +++ b/r/src/compute.cpp @@ -114,6 +114,17 @@ std::shared_ptr ChunkedArray__Take( return out; } +// [[arrow::export]] +std::shared_ptr ChunkedArray__TakeChunked( + const std::shared_ptr& values, const std::shared_ptr& indices) { + std::shared_ptr out; + arrow::compute::FunctionContext context; + arrow::compute::TakeOptions options; + + STOP_IF_NOT_OK(arrow::compute::Take(&context, *values, *indices, options, &out)); + return out; +} + // [[arrow::export]] std::shared_ptr Table__Take(const std::shared_ptr& table, const std::shared_ptr& indices) { diff --git a/r/tests/testthat/test-chunked-array.R b/r/tests/testthat/test-chunked-array.R index 02a92612799..1fa399db936 100644 --- a/r/tests/testthat/test-chunked-array.R +++ b/r/tests/testthat/test-chunked-array.R @@ -361,6 +361,12 @@ test_that("[ ChunkedArray", { expect_vector(x[c(11, 15, 12)], c(31, 35, 32)) # Take from multiple chunks (calls Concatenate) expect_vector(x[c(2, 11, 15, 12, 3)], c(2, 31, 35, 32, 3)) + # Take with Array (note these are 0-based) + take1 <- Array$create(c(10L, 14L, 11L)) + expect_vector(x[take1], c(31, 35, 32)) + # Take with ChunkedArray + take2 <- ChunkedArray$create(c(10L, 14L), 11L) + expect_vector(x[take2], c(31, 35, 32)) # Filter (with recycling) expect_vector( From 3f01c9f7267d052f7f1853ff78ec004b949e4d93 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Fri, 18 Oct 2019 15:32:03 -0400 Subject: [PATCH 08/18] Take(Array, ChunkedArray), with R test --- cpp/src/arrow/compute/kernels/take.cc | 14 +++++++++++++- cpp/src/arrow/compute/kernels/take.h | 23 ++++++++++++++++++++++- r/R/array.R | 5 ++++- r/R/arrowExports.R | 4 ++++ r/src/arrowExports.cpp | 17 +++++++++++++++++ r/src/compute.cpp | 11 +++++++++++ r/tests/testthat/test-Array.R | 6 +----- 7 files changed, 72 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index 922bd1dc21d..f4feb215557 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -123,7 +123,6 @@ Status Take(FunctionContext* ctx, const ChunkedArray& values, const Array& indic return Status::OK(); } -// TODO: There should be Take(Array, ChunkedArray) too Status Take(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray& indices, const TakeOptions& options, std::shared_ptr* out) { auto num_chunks = indices.num_chunks(); @@ -140,6 +139,19 @@ Status Take(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray return Status::OK(); } +Status Take(FunctionContext* ctx, const Array& values, const ChunkedArray& indices, + const TakeOptions& options, std::shared_ptr* out) { + auto num_chunks = indices.num_chunks(); + std::vector> new_chunks(num_chunks); + 
+ for (int i = 0; i < num_chunks; i++) { + // Take with that indices chunk + RETURN_NOT_OK(Take(ctx, values, *indices.chunk(i), options, &new_chunks[i])); + } + *out = std::make_shared(std::move(new_chunks)); + return Status::OK(); +} + Status Take(FunctionContext* ctx, const RecordBatch& batch, const Array& indices, const TakeOptions& options, std::shared_ptr* out) { diff --git a/cpp/src/arrow/compute/kernels/take.h b/cpp/src/arrow/compute/kernels/take.h index 1d22a23aae1..5f92ff35e3c 100644 --- a/cpp/src/arrow/compute/kernels/take.h +++ b/cpp/src/arrow/compute/kernels/take.h @@ -73,7 +73,7 @@ ARROW_EXPORT Status Take(FunctionContext* ctx, const ChunkedArray& values, const Array& indices, const TakeOptions& options, std::shared_ptr* out); -/// \brief Take from a chunked array of values at indices in another chunked array +/// \brief Take from a chunked array of values at indices in a chunked array /// /// The output chunked array will be of the same type as the input values /// array, with elements taken from the values array at the given @@ -94,6 +94,27 @@ ARROW_EXPORT Status Take(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray& indices, const TakeOptions& options, std::shared_ptr* out); +/// \brief Take from an array of values at indices in a chunked array +/// +/// The output chunked array will be of the same type as the input values +/// array, with elements taken from the values array at the given +/// indices. If an index is null then the taken element will be null. +/// The chunks in the output array will align with the chunks in the indices. +/// +/// For example given values = ["a", "b", "c", null, "e", "f"] and +/// indices = [2, 1, null, 3], the output will be +/// = [values[2], values[1], null, values[3]] +/// = ["c", "b", null, null] +/// +/// \param[in] ctx the FunctionContext +/// \param[in] values array from which to take +/// \param[in] indices which values to take +/// \param[in] options options +/// \param[out] out resulting chunked array +ARROW_EXPORT +Status Take(FunctionContext* ctx, const Array& values, const ChunkedArray& indices, + const TakeOptions& options, std::shared_ptr* out); + /// \brief Take from a record batch at indices in another array /// /// The output batch will have the same schema as the input batch, diff --git a/r/R/array.R b/r/R/array.R index 031b38ad6d1..d05d91d35a5 100644 --- a/r/R/array.R +++ b/r/R/array.R @@ -100,7 +100,10 @@ Array <- R6Class("Array", if (is.integer(i)) { i <- Array$create(i) } - assert_is(i, "Array") # Support ChunkedArray too? 
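+      # A ChunkedArray of indices returns a ChunkedArray; otherwise indices must be an Array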
+ if (inherits(i, "ChunkedArray")) { + return(shared_ptr(ChunkedArray, Array__TakeChunked(self, i))) + } + assert_is(i, "Array") shared_ptr(Array, Array__Take(self, i)) }, Filter = function(i) { diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index a2265acbe0f..561f68b2186 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -272,6 +272,10 @@ Array__Take <- function(values, indices){ .Call(`_arrow_Array__Take` , values, indices) } +Array__TakeChunked <- function(values, indices){ + .Call(`_arrow_Array__TakeChunked` , values, indices) +} + RecordBatch__Take <- function(batch, indices){ .Call(`_arrow_RecordBatch__Take` , batch, indices) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 83c878b6be7..3c560eb407b 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1068,6 +1068,22 @@ RcppExport SEXP _arrow_Array__Take(SEXP values_sexp, SEXP indices_sexp){ } #endif +// compute.cpp +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr Array__TakeChunked(const std::shared_ptr& values, const std::shared_ptr& indices); +RcppExport SEXP _arrow_Array__TakeChunked(SEXP values_sexp, SEXP indices_sexp){ +BEGIN_RCPP + Rcpp::traits::input_parameter&>::type values(values_sexp); + Rcpp::traits::input_parameter&>::type indices(indices_sexp); + return Rcpp::wrap(Array__TakeChunked(values, indices)); +END_RCPP +} +#else +RcppExport SEXP _arrow_Array__TakeChunked(SEXP values_sexp, SEXP indices_sexp){ + Rf_error("Cannot call Array__TakeChunked(). Please use arrow::install_arrow() to install required runtime libraries. "); +} +#endif + // compute.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr RecordBatch__Take(const std::shared_ptr& batch, const std::shared_ptr& indices); @@ -5049,6 +5065,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_RecordBatch__cast", (DL_FUNC) &_arrow_RecordBatch__cast, 3}, { "_arrow_Table__cast", (DL_FUNC) &_arrow_Table__cast, 3}, { "_arrow_Array__Take", (DL_FUNC) &_arrow_Array__Take, 2}, + { "_arrow_Array__TakeChunked", (DL_FUNC) &_arrow_Array__TakeChunked, 2}, { "_arrow_RecordBatch__Take", (DL_FUNC) &_arrow_RecordBatch__Take, 2}, { "_arrow_ChunkedArray__Take", (DL_FUNC) &_arrow_ChunkedArray__Take, 2}, { "_arrow_ChunkedArray__TakeChunked", (DL_FUNC) &_arrow_ChunkedArray__TakeChunked, 2}, diff --git a/r/src/compute.cpp b/r/src/compute.cpp index 73377441a81..51eb51c3cf2 100644 --- a/r/src/compute.cpp +++ b/r/src/compute.cpp @@ -92,6 +92,17 @@ std::shared_ptr Array__Take(const std::shared_ptr& v return out; } +// [[arrow::export]] +std::shared_ptr Array__TakeChunked( + const std::shared_ptr& values, const std::shared_ptr& indices) { + std::shared_ptr out; + arrow::compute::FunctionContext context; + arrow::compute::TakeOptions options; + + STOP_IF_NOT_OK(arrow::compute::Take(&context, *values, *indices, options, &out)); + return out; +} + // [[arrow::export]] std::shared_ptr RecordBatch__Take( const std::shared_ptr& batch, diff --git a/r/tests/testthat/test-Array.R b/r/tests/testthat/test-Array.R index 73677705afd..d5b7e7f69e7 100644 --- a/r/tests/testthat/test-Array.R +++ b/r/tests/testthat/test-Array.R @@ -499,11 +499,7 @@ test_that("[ accepts Arrays and otherwise handles bad input", { ) expect_vector(a[Array$create(ind - 1, type = int8())], vec[ind]) expect_vector(a[Array$create(ind - 1, type = uint8())], vec[ind]) - expect_error( - # Not currently supported - a[ChunkedArray$create(8, 2, 4, type = uint8())], - 'i must be a "Array"' - ) + expect_vector(a[ChunkedArray$create(8, 2, 4, type = uint8())], vec[ind]) filt <- 
seq_along(vec) %in% ind expect_vector(a[Array$create(filt)], vec[filt]) From e43617ae0f4873dd05282f9076a757421a99b085 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Fri, 18 Oct 2019 15:49:25 -0400 Subject: [PATCH 09/18] Take(Table, Array) and Take(Table, ChunkedArray), with R tests --- cpp/src/arrow/compute/kernels/take.cc | 26 ++++++++++++++++++-- cpp/src/arrow/compute/kernels/take.h | 34 +++++++++++++++++++++++++-- r/R/arrowExports.R | 4 ++++ r/R/table.R | 3 +++ r/src/arrowExports.cpp | 17 ++++++++++++++ r/src/compute.cpp | 21 ++++++++++++----- r/tests/testthat/test-Table.R | 2 ++ 7 files changed, 97 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index f4feb215557..4ca20b4a4ef 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -154,8 +154,6 @@ Status Take(FunctionContext* ctx, const Array& values, const ChunkedArray& indic Status Take(FunctionContext* ctx, const RecordBatch& batch, const Array& indices, const TakeOptions& options, std::shared_ptr* out) { - - auto ncols = batch.num_columns(); auto nrows = indices.length(); @@ -168,5 +166,29 @@ Status Take(FunctionContext* ctx, const RecordBatch& batch, const Array& indices return Status::OK(); } +Status Take(FunctionContext* ctx, const Table& table, const Array& indices, + const TakeOptions& options, std::shared_ptr
* out) { + auto ncols = table.num_columns(); + std::vector> columns(ncols); + + for (int j = 0; j < ncols; j++) { + RETURN_NOT_OK(Take(ctx, *table.column(j), indices, options, &columns[j])); + } + *out = Table::Make(table.schema(), columns); + return Status::OK(); +} + +Status Take(FunctionContext* ctx, const Table& table, const ChunkedArray& indices, + const TakeOptions& options, std::shared_ptr
* out) { + auto ncols = table.num_columns(); + std::vector> columns(ncols); + + for (int j = 0; j < ncols; j++) { + RETURN_NOT_OK(Take(ctx, *table.column(j), indices, options, &columns[j])); + } + *out = Table::Make(table.schema(), columns); + return Status::OK(); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/take.h b/cpp/src/arrow/compute/kernels/take.h index 5f92ff35e3c..b9c11beed2d 100644 --- a/cpp/src/arrow/compute/kernels/take.h +++ b/cpp/src/arrow/compute/kernels/take.h @@ -118,11 +118,11 @@ Status Take(FunctionContext* ctx, const Array& values, const ChunkedArray& indic /// \brief Take from a record batch at indices in another array /// /// The output batch will have the same schema as the input batch, -/// with rows taken from the values array at the given +/// with rows taken from the columns in the batch at the given /// indices. If an index is null then the taken element will be null. /// /// \param[in] ctx the FunctionContext -/// \param[in] batch array from which to take +/// \param[in] batch record batch from which to take /// \param[in] indices which values to take /// \param[in] options options /// \param[out] out resulting record batch @@ -130,6 +130,36 @@ ARROW_EXPORT Status Take(FunctionContext* ctx, const RecordBatch& batch, const Array& indices, const TakeOptions& options, std::shared_ptr* out); +/// \brief Take from a table at indices in an array +/// +/// The output table will have the same schema as the input table, +/// with rows taken from the columns in the table at the given +/// indices. If an index is null then the taken element will be null. +/// +/// \param[in] ctx the FunctionContext +/// \param[in] table table from which to take +/// \param[in] indices which values to take +/// \param[in] options options +/// \param[out] out resulting table +ARROW_EXPORT +Status Take(FunctionContext* ctx, const Table& table, const Array& indices, + const TakeOptions& options, std::shared_ptr
* out); + +/// \brief Take from a table at indices in a chunked array +/// +/// The output table will have the same schema as the input table, +/// with rows taken from the columns in the table at the given +/// indices. If an index is null then the taken element will be null. +/// +/// \param[in] ctx the FunctionContext +/// \param[in] table table from which to take +/// \param[in] indices which values to take +/// \param[in] options options +/// \param[out] out resulting table +ARROW_EXPORT +Status Take(FunctionContext* ctx, const Table& table, const ChunkedArray& indices, + const TakeOptions& options, std::shared_ptr
* out); + /// \brief Take from an array of values at indices in another array /// /// \param[in] ctx the FunctionContext diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 561f68b2186..7bf434a2c3e 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -292,6 +292,10 @@ Table__Take <- function(table, indices){ .Call(`_arrow_Table__Take` , table, indices) } +Table__TakeChunked <- function(table, indices){ + .Call(`_arrow_Table__TakeChunked` , table, indices) +} + Array__Filter <- function(values, filter){ .Call(`_arrow_Array__Filter` , values, filter) } diff --git a/r/R/table.R b/r/R/table.R index 7c0fc79c201..3732e1447d1 100644 --- a/r/R/table.R +++ b/r/R/table.R @@ -145,6 +145,9 @@ Table <- R6Class("Table", inherit = Object, if (is.integer(i)) { i <- Array$create(i) } + if (inherits(i, "ChunkedArray")) { + return(shared_ptr(Table, Table__TakeChunked(self, i))) + } assert_is(i, "Array") shared_ptr(Table, Table__Take(self, i)) }, diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 3c560eb407b..3fd3d7a8b30 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1148,6 +1148,22 @@ RcppExport SEXP _arrow_Table__Take(SEXP table_sexp, SEXP indices_sexp){ } #endif +// compute.cpp +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr Table__TakeChunked(const std::shared_ptr& table, const std::shared_ptr& indices); +RcppExport SEXP _arrow_Table__TakeChunked(SEXP table_sexp, SEXP indices_sexp){ +BEGIN_RCPP + Rcpp::traits::input_parameter&>::type table(table_sexp); + Rcpp::traits::input_parameter&>::type indices(indices_sexp); + return Rcpp::wrap(Table__TakeChunked(table, indices)); +END_RCPP +} +#else +RcppExport SEXP _arrow_Table__TakeChunked(SEXP table_sexp, SEXP indices_sexp){ + Rf_error("Cannot call Table__TakeChunked(). Please use arrow::install_arrow() to install required runtime libraries. 
"); +} +#endif + // compute.cpp #if defined(ARROW_R_WITH_ARROW) std::shared_ptr Array__Filter(const std::shared_ptr& values, const std::shared_ptr& filter); @@ -5070,6 +5086,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ChunkedArray__Take", (DL_FUNC) &_arrow_ChunkedArray__Take, 2}, { "_arrow_ChunkedArray__TakeChunked", (DL_FUNC) &_arrow_ChunkedArray__TakeChunked, 2}, { "_arrow_Table__Take", (DL_FUNC) &_arrow_Table__Take, 2}, + { "_arrow_Table__TakeChunked", (DL_FUNC) &_arrow_Table__TakeChunked, 2}, { "_arrow_Array__Filter", (DL_FUNC) &_arrow_Array__Filter, 2}, { "_arrow_RecordBatch__Filter", (DL_FUNC) &_arrow_RecordBatch__Filter, 2}, { "_arrow_ChunkedArray__Filter", (DL_FUNC) &_arrow_ChunkedArray__Filter, 2}, diff --git a/r/src/compute.cpp b/r/src/compute.cpp index 51eb51c3cf2..31ba18d2ad2 100644 --- a/r/src/compute.cpp +++ b/r/src/compute.cpp @@ -139,14 +139,23 @@ std::shared_ptr ChunkedArray__TakeChunked( // [[arrow::export]] std::shared_ptr Table__Take(const std::shared_ptr& table, const std::shared_ptr& indices) { - auto ncols = table->num_columns(); - std::vector> columns(ncols); + std::shared_ptr out; + arrow::compute::FunctionContext context; + arrow::compute::TakeOptions options; - for (R_xlen_t j = 0; j < ncols; j++) { - columns[j] = ChunkedArray__Take(table->column(j), indices); - } + STOP_IF_NOT_OK(arrow::compute::Take(&context, *table, *indices, options, &out)); + return out; +} - return arrow::Table::Make(table->schema(), columns); +// [[arrow::export]] +std::shared_ptr Table__TakeChunked(const std::shared_ptr& table, + const std::shared_ptr& indices) { + std::shared_ptr out; + arrow::compute::FunctionContext context; + arrow::compute::TakeOptions options; + + STOP_IF_NOT_OK(arrow::compute::Take(&context, *table, *indices, options, &out)); + return out; } // [[arrow::export]] diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R index fbc6274cd03..ff3bff4f191 100644 --- a/r/tests/testthat/test-Table.R +++ b/r/tests/testthat/test-Table.R @@ -110,6 +110,8 @@ test_that("[, [[, $ for Table", { expect_data_frame(tab[ca,], tbl[c(1, 3, 4, 8, 9),]) # int Array expect_data_frame(tab[Array$create(5:6), 2:4], tbl[6:7, 2:4]) + # ChunkedArray + expect_data_frame(tab[ChunkedArray$create(5L, 6L), 2:4], tbl[6:7, 2:4]) # Expression expect_data_frame(tab[tab$int > 6,], tbl[tbl$int > 6,]) From e00776005a26c116931d00f85f52d96648ffdb2c Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Tue, 22 Oct 2019 11:14:17 -0700 Subject: [PATCH 10/18] Add ChunkedArray tests for Filter and Take --- cpp/src/arrow/compute/kernels/filter.cc | 27 +++++++--- cpp/src/arrow/compute/kernels/filter_test.cc | 53 ++++++++++++++++-- cpp/src/arrow/compute/kernels/take_test.cc | 56 ++++++++++++++++++++ cpp/src/arrow/testing/gtest_util.cc | 9 ++++ cpp/src/arrow/testing/gtest_util.h | 6 +++ 5 files changed, 140 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index 5fbf8b895ca..cbb885b16d8 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -166,6 +166,9 @@ Status Filter(FunctionContext* ctx, const RecordBatch& batch, const Array& filte Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& filter, std::shared_ptr* out) { + if (values.length() != filter.length()) { + return Status::Invalid("filter and value array must have identical lengths"); + } auto num_chunks = values.num_chunks(); std::vector> new_chunks(num_chunks); std::shared_ptr 
current_chunk; @@ -185,6 +188,9 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& fil Status Filter(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray& filter, std::shared_ptr* out) { + if (values.length() != filter.length()) { + return Status::Invalid("filter and value array must have identical lengths"); + } auto num_chunks = values.num_chunks(); std::vector> new_chunks(num_chunks); std::shared_ptr current_chunk; @@ -196,16 +202,21 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArr for (int i = 0; i < num_chunks; i++) { current_chunk = values.chunk(i); len = current_chunk->length(); - current_chunked_filter = filter.Slice(offset, len); - if (current_chunked_filter->num_chunks() == 1) { - current_filter = current_chunked_filter->chunk(0); + if (len > 0) { + current_chunked_filter = filter.Slice(offset, len); + if (current_chunked_filter->num_chunks() == 1) { + current_filter = current_chunked_filter->chunk(0); + } else { + // Concatenate the chunks of the filter so we have an Array + RETURN_NOT_OK(Concatenate(current_chunked_filter->chunks(), + default_memory_pool(), ¤t_filter)); + } + RETURN_NOT_OK(Filter(ctx, *current_chunk, *current_filter, &new_chunks[i])); + offset += len; } else { - // Concatenate the chunks of the filter so we have an Array - RETURN_NOT_OK(Concatenate(current_chunked_filter->chunks(), - default_memory_pool(), ¤t_filter)); + // Put a zero length array there, which we know our current chunk to be + new_chunks[i] = current_chunk; } - RETURN_NOT_OK(Filter(ctx, *current_chunk, *current_filter, &new_chunks[i])); - offset += len; } *out = std::make_shared(std::move(new_chunks)); diff --git a/cpp/src/arrow/compute/kernels/filter_test.cc b/cpp/src/arrow/compute/kernels/filter_test.cc index 6437fc69d67..12785109b00 100644 --- a/cpp/src/arrow/compute/kernels/filter_test.cc +++ b/cpp/src/arrow/compute/kernels/filter_test.cc @@ -518,12 +518,59 @@ TEST_F(TestFilterKernelWithRecordBatch, FilterRecordBatch) { ])"); } +class TestFilterKernelWithChunkedArray : public TestFilterKernel { + public: + void AssertFilter(const std::shared_ptr& type, + const std::vector& values, + const std::string& filter, const std::vector& expected) { + std::shared_ptr actual; + ASSERT_OK(this->FilterWithArray(type, values, filter, &actual)); + ASSERT_OK(actual->Validate()); + AssertChunkedEqual(*ChunkedArrayFromJSON(type, expected), *actual); + } + + void AssertChunkedFilter(const std::shared_ptr& type, + const std::vector& values, + const std::vector& filter, + const std::vector& expected) { + std::shared_ptr actual; + ASSERT_OK(this->FilterWithChunkedArray(type, values, filter, &actual)); + ASSERT_OK(actual->Validate()); + AssertChunkedEqual(*ChunkedArrayFromJSON(type, expected), *actual); + } + + Status FilterWithArray(const std::shared_ptr& type, + const std::vector& values, + const std::string& filter, std::shared_ptr* out) { + return arrow::compute::Filter(&this->ctx_, *ChunkedArrayFromJSON(type, values), + *ArrayFromJSON(boolean(), filter), out); + } + + Status FilterWithChunkedArray(const std::shared_ptr& type, + const std::vector& values, + const std::vector& filter, + std::shared_ptr* out) { + return arrow::compute::Filter(&this->ctx_, *ChunkedArrayFromJSON(type, values), + *ChunkedArrayFromJSON(boolean(), filter), out); + } +}; + +TEST_F(TestFilterKernelWithChunkedArray, FilterChunkedArray) { + this->AssertFilter(int8(), {"[]"}, "[]", {"[]"}); + this->AssertChunkedFilter(int8(), {"[]"}, {"[]"}, {"[]"}); + + 
this->AssertFilter(int8(), {"[7]", "[8, 9]"}, "[0, 1, 0]", {"[]", "[8]"}); + this->AssertChunkedFilter(int8(), {"[7]", "[8, 9]"}, {"[0]", "[1, 0]"}, {"[]", "[8]"}); + this->AssertChunkedFilter(int8(), {"[7]", "[8, 9]"}, {"[0, 1]", "[0]"}, {"[8]", "[]"}); + + std::shared_ptr arr; + ASSERT_RAISES(Invalid, this->FilterWithArray(int8(), {"[7]", "[8, 9]"}, "[0, 1, 0, 1, 1]", &arr)); + ASSERT_RAISES(Invalid, this->FilterWithChunkedArray(int8(), {"[7]", "[8, 9]"}, {"[0, 1, 0]", "[1, 1]"}, &arr)); +} + // TODO (npr): add tests for: -// * Filter(ChunkedArray, Array) -// * Filter(ChunkedArray, ChunkedArray) // * Filter(Table, Array) // * Filter(Table, ChunkedArray) -// See cast_test.cc for test setup with chunked arrays } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/take_test.cc b/cpp/src/arrow/compute/kernels/take_test.cc index e65e1ef875b..38cd3198c80 100644 --- a/cpp/src/arrow/compute/kernels/take_test.cc +++ b/cpp/src/arrow/compute/kernels/take_test.cc @@ -592,5 +592,61 @@ TEST_F(TestTakeKernelWithRecordBatch, TakeRecordBatch) { ])"); } +class TestTakeKernelWithChunkedArray : public TestTakeKernel { + public: + void AssertTake(const std::shared_ptr& type, + const std::vector& values, + const std::string& filter, const std::vector& expected) { + std::shared_ptr actual; + ASSERT_OK(this->TakeWithArray(type, values, filter, &actual)); + ASSERT_OK(actual->Validate()); + AssertChunkedEqual(*ChunkedArrayFromJSON(type, expected), *actual); + } + + void AssertChunkedTake(const std::shared_ptr& type, + const std::vector& values, + const std::vector& filter, + const std::vector& expected) { + std::shared_ptr actual; + ASSERT_OK(this->TakeWithChunkedArray(type, values, filter, &actual)); + ASSERT_OK(actual->Validate()); + AssertChunkedEqual(*ChunkedArrayFromJSON(type, expected), *actual); + } + + Status TakeWithArray(const std::shared_ptr& type, + const std::vector& values, + const std::string& filter, std::shared_ptr* out) { + TakeOptions options; + return arrow::compute::Take(&this->ctx_, *ChunkedArrayFromJSON(type, values), + *ArrayFromJSON(int8(), filter), options, out); + } + + Status TakeWithChunkedArray(const std::shared_ptr& type, + const std::vector& values, + const std::vector& filter, + std::shared_ptr* out) { + TakeOptions options; + return arrow::compute::Take(&this->ctx_, *ChunkedArrayFromJSON(type, values), + *ChunkedArrayFromJSON(int8(), filter), options, out); + } +}; + +TEST_F(TestTakeKernelWithChunkedArray, TakeChunkedArray) { + this->AssertTake(int8(), {"[]"}, "[]", {"[]"}); + this->AssertChunkedTake(int8(), {"[]"}, {"[]"}, {"[]"}); + + this->AssertTake(int8(), {"[7]", "[8, 9]"}, "[0, 1, 0, 2]", {"[7, 8, 7, 9]"}); + this->AssertChunkedTake(int8(), {"[7]", "[8, 9]"}, {"[0, 1, 0]", "[]", "[2]"}, {"[7, 8, 7]", "[]", "[9]"}); + this->AssertTake(int8(), {"[7]", "[8, 9]"}, "[2, 1]", {"[9, 8]"}); + + std::shared_ptr arr; + ASSERT_RAISES(IndexError, this->TakeWithArray(int8(), {"[7]", "[8, 9]"}, "[0, 5]", &arr)); + ASSERT_RAISES(IndexError, this->TakeWithChunkedArray(int8(), {"[7]", "[8, 9]"}, {"[0, 1, 0]", "[5, 1]"}, &arr)); +} + +// TODO (npr): add tests for: +// * Take(Table, Array) +// * Take(Table, ChunkedArray) + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index 393e7a977a3..95b5ffc5557 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -151,6 +151,15 @@ std::shared_ptr ArrayFromJSON(const std::shared_ptr& type, return 
out; } +std::shared_ptr ChunkedArrayFromJSON(const std::shared_ptr& type, + const std::vector& json) { + ArrayVector out_chunks; + for (const std::string& chunk_json : json) { + out_chunks.push_back(ArrayFromJSON(type, chunk_json)); + } + return std::make_shared(std::move(out_chunks)); +} + std::shared_ptr RecordBatchFromJSON(const std::shared_ptr& schema, util::string_view json) { // Parses as a StructArray diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index 8bae17f8e75..f6e2ec36dda 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -271,6 +271,12 @@ void ArrayFromVector(const std::vector& values, std::shared_ptr* ArrayFromVector(type, values, out); } +// ChunkedArrayFromJSON: construct an ChunkedArray from a simple JSON representation + +ARROW_EXPORT +std::shared_ptr ChunkedArrayFromJSON(const std::shared_ptr&, + const std::vector& json); + // ChunkedArrayFromVector: construct a ChunkedArray from vectors of C values template From 598fc22f5044a424931a62b34aac9e815e08daac Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Mon, 21 Oct 2019 12:40:54 -0700 Subject: [PATCH 11/18] Apply suggestions from code review Co-Authored-By: Benjamin Kietzman --- cpp/src/arrow/compute/kernels/filter.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter.h b/cpp/src/arrow/compute/kernels/filter.h index 2ada4931bf1..d0217b2c124 100644 --- a/cpp/src/arrow/compute/kernels/filter.h +++ b/cpp/src/arrow/compute/kernels/filter.h @@ -102,14 +102,14 @@ Status Filter(FunctionContext* ctx, const RecordBatch& batch, const Array& filte /// \brief Filter a table with a boolean selection filter /// -/// The output record batch's columns will be populated with values from corresponding +/// The output table's columns will be populated with values from corresponding /// columns of the input at positions where the selection filter is not 0. Nulls in the -/// filter will result in nulls in the output. +/// filter will result in nulls in each column of the output. /// /// \param[in] ctx the FunctionContext -/// \param[in] table record batch to filter +/// \param[in] table table to filter /// \param[in] filter indicates which values should be filtered out -/// \param[out] out resulting record batch +/// \param[out] out resulting table ARROW_EXPORT Status Filter(FunctionContext* ctx, const Table& table, const Array& filter, std::shared_ptr
* out); From 7dd52d6f7a9089b7285bec0735fb6e2b54c2c30a Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Mon, 28 Oct 2019 11:42:21 -0700 Subject: [PATCH 12/18] Add table tests --- cpp/src/arrow/compute/kernels/filter_test.cc | 72 ++++++++++++--- cpp/src/arrow/compute/kernels/take_test.cc | 96 ++++++++++++++------ cpp/src/arrow/testing/gtest_util.cc | 12 +++ cpp/src/arrow/testing/gtest_util.h | 15 +-- 4 files changed, 150 insertions(+), 45 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter_test.cc b/cpp/src/arrow/compute/kernels/filter_test.cc index 12785109b00..6f20c7f8578 100644 --- a/cpp/src/arrow/compute/kernels/filter_test.cc +++ b/cpp/src/arrow/compute/kernels/filter_test.cc @@ -471,25 +471,18 @@ TEST_F(TestFilterKernelWithUnion, FilterUnion) { class TestFilterKernelWithRecordBatch : public TestFilterKernel { public: void AssertFilter(const std::shared_ptr& schm, const std::string& batch_json, - const std::string& selection, const std::string& expected_batch) { + const std::string& selection, const std::string& expected_batch) { std::shared_ptr actual; ASSERT_OK(this->Filter(schm, batch_json, selection, &actual)); ASSERT_OK(actual->Validate()); - ASSERT_BATCHES_EQUAL(*MakeBatch(schm, expected_batch), *actual); - } - - std::shared_ptr MakeBatch(const std::shared_ptr& schm, const std::string& batch_json) { - auto struct_array = ArrayFromJSON(struct_(schm->fields()), batch_json); - std::shared_ptr batch; - ARROW_EXPECT_OK(RecordBatch::FromStructArray(struct_array, &batch)); - return batch; + ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(schm, expected_batch), *actual); } Status Filter(const std::shared_ptr& schm, const std::string& batch_json, const std::string& selection, std::shared_ptr* out) { - auto batch = MakeBatch(schm, batch_json); + auto batch = RecordBatchFromJSON(schm, batch_json); return arrow::compute::Filter(&this->ctx_, *batch, *ArrayFromJSON(boolean(), selection), out); } @@ -568,9 +561,62 @@ TEST_F(TestFilterKernelWithChunkedArray, FilterChunkedArray) { ASSERT_RAISES(Invalid, this->FilterWithChunkedArray(int8(), {"[7]", "[8, 9]"}, {"[0, 1, 0]", "[1, 1]"}, &arr)); } -// TODO (npr): add tests for: -// * Filter(Table, Array) -// * Filter(Table, ChunkedArray) +class TestFilterKernelWithTable : public TestFilterKernel
<Table> {
+ public:
+  void AssertFilter(const std::shared_ptr<Schema>& schm,
+                    const std::vector<std::string>& table_json,
+                    const std::string& filter,
+                    const std::vector<std::string>& expected_table) {
+    std::shared_ptr<Table> actual;
+
+    ASSERT_OK(this->FilterWithArray(schm, table_json, filter, &actual));
+    ASSERT_OK(actual->Validate());
+    ASSERT_TABLES_EQUAL(*TableFromJSON(schm, expected_table), *actual);
+  }
+
+  void AssertChunkedFilter(const std::shared_ptr<Schema>& schm,
+                           const std::vector<std::string>& table_json,
+                           const std::vector<std::string>& filter,
+                           const std::vector<std::string>& expected_table) {
+    std::shared_ptr<Table> actual;
+
+    ASSERT_OK(this->FilterWithChunkedArray(schm, table_json, filter, &actual));
+    ASSERT_OK(actual->Validate());
+    ASSERT_TABLES_EQUAL(*TableFromJSON(schm, expected_table), *actual);
+  }
+
+  Status FilterWithArray(const std::shared_ptr<Schema>& schm,
+                         const std::vector<std::string>& values,
+                         const std::string& filter, std::shared_ptr<Table>* out) {
+    return arrow::compute::Filter(&this->ctx_, *TableFromJSON(schm, values),
+                                  *ArrayFromJSON(boolean(), filter), out);
+  }
+
+  Status FilterWithChunkedArray(const std::shared_ptr<Schema>& schm,
+                                const std::vector<std::string>& values,
+                                const std::vector<std::string>& filter,
+                                std::shared_ptr<Table>
* out) { + return arrow::compute::Filter(&this->ctx_, *TableFromJSON(schm, values), + *ChunkedArrayFromJSON(boolean(), filter), out); + } +}; + +TEST_F(TestFilterKernelWithTable, FilterTable) { + std::vector> fields = {field("a", int32()), field("b", utf8())}; + auto schm = schema(fields); + + std::vector table_json = {"[{\"a\": null, \"b\": \"yo\"},{\"a\": 1, \"b\": \"\"}]", + "[{\"a\": 2, \"b\": \"hello\"},{\"a\": 4, \"b\": \"eh\"}]"}; + this->AssertFilter(schm, table_json, "[0, 0, 0, 0]", {"[]", "[]"}); + this->AssertChunkedFilter(schm, table_json, {"[0]", "[0, 0, 0]"}, {"[]", "[]"}); + + std::vector expected2 = {"[{\"a\": 1, \"b\": \"\"}]", + "[{\"a\": 2, \"b\": \"hello\"},{\"a\": null, \"b\": null}]"}; + this->AssertFilter(schm, table_json, "[0, 1, 1, null]", expected2); + this->AssertChunkedFilter(schm, table_json, {"[0, 1, 1]", "[null]"}, expected2); + this->AssertFilter(schm, table_json, "[1, 1, 1, 1]", table_json); + this->AssertChunkedFilter(schm, table_json, {"[1]", "[1, 1, 1]"}, table_json); +} } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/take_test.cc b/cpp/src/arrow/compute/kernels/take_test.cc index 38cd3198c80..223b7962ea4 100644 --- a/cpp/src/arrow/compute/kernels/take_test.cc +++ b/cpp/src/arrow/compute/kernels/take_test.cc @@ -536,21 +536,14 @@ class TestTakeKernelWithRecordBatch : public TestTakeKernel { for (auto index_type : {int8(), uint32()}) { ASSERT_OK(this->Take(schm, batch_json, index_type, indices, &actual)); ASSERT_OK(actual->Validate()); - ASSERT_BATCHES_EQUAL(*MakeBatch(schm, expected_batch), *actual); + ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(schm, expected_batch), *actual); } } - std::shared_ptr MakeBatch(const std::shared_ptr& schm, const std::string& batch_json) { - auto struct_array = ArrayFromJSON(struct_(schm->fields()), batch_json); - std::shared_ptr batch; - ARROW_EXPECT_OK(RecordBatch::FromStructArray(struct_array, &batch)); - return batch; - } - Status Take(const std::shared_ptr& schm, const std::string& batch_json, const std::shared_ptr& index_type, const std::string& indices, std::shared_ptr* out) { - auto batch = MakeBatch(schm, batch_json); + auto batch = RecordBatchFromJSON(schm, batch_json); TakeOptions options; return arrow::compute::Take(&this->ctx_, *batch, *ArrayFromJSON(index_type, indices), options, out); @@ -595,39 +588,39 @@ TEST_F(TestTakeKernelWithRecordBatch, TakeRecordBatch) { class TestTakeKernelWithChunkedArray : public TestTakeKernel { public: void AssertTake(const std::shared_ptr& type, - const std::vector& values, - const std::string& filter, const std::vector& expected) { + const std::vector& values, + const std::string& indices, const std::vector& expected) { std::shared_ptr actual; - ASSERT_OK(this->TakeWithArray(type, values, filter, &actual)); + ASSERT_OK(this->TakeWithArray(type, values, indices, &actual)); ASSERT_OK(actual->Validate()); AssertChunkedEqual(*ChunkedArrayFromJSON(type, expected), *actual); } void AssertChunkedTake(const std::shared_ptr& type, - const std::vector& values, - const std::vector& filter, - const std::vector& expected) { + const std::vector& values, + const std::vector& indices, + const std::vector& expected) { std::shared_ptr actual; - ASSERT_OK(this->TakeWithChunkedArray(type, values, filter, &actual)); + ASSERT_OK(this->TakeWithChunkedArray(type, values, indices, &actual)); ASSERT_OK(actual->Validate()); AssertChunkedEqual(*ChunkedArrayFromJSON(type, expected), *actual); } Status TakeWithArray(const std::shared_ptr& type, - const std::vector& 
values, - const std::string& filter, std::shared_ptr* out) { + const std::vector& values, + const std::string& indices, std::shared_ptr* out) { TakeOptions options; return arrow::compute::Take(&this->ctx_, *ChunkedArrayFromJSON(type, values), - *ArrayFromJSON(int8(), filter), options, out); + *ArrayFromJSON(int8(), indices), options, out); } Status TakeWithChunkedArray(const std::shared_ptr& type, - const std::vector& values, - const std::vector& filter, - std::shared_ptr* out) { + const std::vector& values, + const std::vector& indices, + std::shared_ptr* out) { TakeOptions options; return arrow::compute::Take(&this->ctx_, *ChunkedArrayFromJSON(type, values), - *ChunkedArrayFromJSON(int8(), filter), options, out); + *ChunkedArrayFromJSON(int8(), indices), options, out); } }; @@ -644,9 +637,60 @@ TEST_F(TestTakeKernelWithChunkedArray, TakeChunkedArray) { ASSERT_RAISES(IndexError, this->TakeWithChunkedArray(int8(), {"[7]", "[8, 9]"}, {"[0, 1, 0]", "[5, 1]"}, &arr)); } -// TODO (npr): add tests for: -// * Take(Table, Array) -// * Take(Table, ChunkedArray) +class TestTakeKernelWithTable : public TestTakeKernel
<Table> {
+ public:
+  void AssertTake(const std::shared_ptr<Schema>& schm,
+                  const std::vector<std::string>& table_json,
+                  const std::string& filter,
+                  const std::vector<std::string>& expected_table) {
+    std::shared_ptr<Table> actual;
+
+    ASSERT_OK(this->TakeWithArray(schm, table_json, filter, &actual));
+    ASSERT_OK(actual->Validate());
+    ASSERT_TABLES_EQUAL(*TableFromJSON(schm, expected_table), *actual);
+  }
+
+  void AssertChunkedTake(const std::shared_ptr<Schema>& schm,
+                         const std::vector<std::string>& table_json,
+                         const std::vector<std::string>& filter,
+                         const std::vector<std::string>& expected_table) {
+    std::shared_ptr<Table> actual;
+
+    ASSERT_OK(this->TakeWithChunkedArray(schm, table_json, filter, &actual));
+    ASSERT_OK(actual->Validate());
+    ASSERT_TABLES_EQUAL(*TableFromJSON(schm, expected_table), *actual);
+  }
+
+  Status TakeWithArray(const std::shared_ptr<Schema>& schm,
+                       const std::vector<std::string>& values,
+                       const std::string& indices, std::shared_ptr<Table>* out) {
+    TakeOptions options;
+    return arrow::compute::Take(&this->ctx_, *TableFromJSON(schm, values),
+                                *ArrayFromJSON(int8(), indices), options, out);
+  }
+
+  Status TakeWithChunkedArray(const std::shared_ptr<Schema>& schm,
+                              const std::vector<std::string>& values,
+                              const std::vector<std::string>& indices,
+                              std::shared_ptr<Table>
* out) { + TakeOptions options; + return arrow::compute::Take(&this->ctx_, *TableFromJSON(schm, values), + *ChunkedArrayFromJSON(int8(), indices), options, out); + } +}; + +TEST_F(TestTakeKernelWithTable, TakeTable) { + std::vector> fields = {field("a", int32()), field("b", utf8())}; + auto schm = schema(fields); + + std::vector table_json = {"[{\"a\": null, \"b\": \"yo\"},{\"a\": 1, \"b\": \"\"}]", + "[{\"a\": 2, \"b\": \"hello\"},{\"a\": 4, \"b\": \"eh\"}]"}; + + this->AssertTake(schm, table_json, "[]", {"[]"}); + std::vector expected_310 = {"[{\"a\": 4, \"b\": \"eh\"},{\"a\": 1, \"b\": \"\"},{\"a\": null, \"b\": \"yo\"}]"}; + this->AssertTake(schm, table_json, "[3, 1, 0]", expected_310); + this->AssertChunkedTake(schm, table_json, {"[0, 1]", "[2, 3]"}, table_json); +} } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index 95b5ffc5557..f3a401f6e8e 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -173,6 +173,18 @@ std::shared_ptr RecordBatchFromJSON(const std::shared_ptr& return record_batch; } +std::shared_ptr
<Table> TableFromJSON(const std::shared_ptr<Schema>& schema,
+                                     const std::vector<std::string>& json) {
+  std::vector<std::shared_ptr<RecordBatch>> batches;
+  for (const std::string& batch_json : json) {
+    batches.push_back(RecordBatchFromJSON(schema, batch_json));
+  }
+  std::shared_ptr<Table>
table; + ABORT_NOT_OK(Table::FromRecordBatches(schema, batches, &table)); + + return table; +} + void AssertTablesEqual(const Table& expected, const Table& actual, bool same_chunk_layout, bool combine_chunks) { ASSERT_EQ(expected.num_columns(), actual.num_columns()); diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h index f6e2ec36dda..209fa295bc3 100644 --- a/cpp/src/arrow/testing/gtest_util.h +++ b/cpp/src/arrow/testing/gtest_util.h @@ -152,6 +152,7 @@ using ArrayVector = std::vector>; #define ASSERT_ARRAYS_EQUAL(lhs, rhs) AssertArraysEqual((lhs), (rhs)) #define ASSERT_BATCHES_EQUAL(lhs, rhs) AssertBatchesEqual((lhs), (rhs)) +#define ASSERT_TABLES_EQUAL(lhs, rhs) AssertTablesEqual((lhs), (rhs)) // If verbose is true, then the arrays will be pretty printed ARROW_EXPORT void AssertArraysEqual(const Array& expected, const Array& actual, @@ -213,6 +214,14 @@ std::shared_ptr ArrayFromJSON(const std::shared_ptr&, ARROW_EXPORT std::shared_ptr RecordBatchFromJSON( const std::shared_ptr&, util::string_view); +ARROW_EXPORT +std::shared_ptr ChunkedArrayFromJSON(const std::shared_ptr&, + const std::vector& json); + +ARROW_EXPORT +std::shared_ptr
TableFromJSON(const std::shared_ptr&, + const std::vector& json); + // ArrayFromVector: construct an Array from vectors of C values template @@ -271,12 +280,6 @@ void ArrayFromVector(const std::vector& values, std::shared_ptr* ArrayFromVector(type, values, out); } -// ChunkedArrayFromJSON: construct an ChunkedArray from a simple JSON representation - -ARROW_EXPORT -std::shared_ptr ChunkedArrayFromJSON(const std::shared_ptr&, - const std::vector& json); - // ChunkedArrayFromVector: construct a ChunkedArray from vectors of C values template From 7f2c436d2a6a18fa7b8bea45e7c624fc8a51becf Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Mon, 28 Oct 2019 11:49:34 -0700 Subject: [PATCH 13/18] Remove a bunch of leftover arrow:: --- cpp/src/arrow/compute/kernels/filter.cc | 20 ++++++++++---------- cpp/src/arrow/compute/kernels/take.cc | 22 +++++++++++----------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index cbb885b16d8..7ddaefc4440 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -170,8 +170,8 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& fil return Status::Invalid("filter and value array must have identical lengths"); } auto num_chunks = values.num_chunks(); - std::vector> new_chunks(num_chunks); - std::shared_ptr current_chunk; + std::vector> new_chunks(num_chunks); + std::shared_ptr current_chunk; int offset = 0; int len; @@ -182,7 +182,7 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& fil offset += len; } - *out = std::make_shared(std::move(new_chunks)); + *out = std::make_shared(std::move(new_chunks)); return Status::OK(); } @@ -192,10 +192,10 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArr return Status::Invalid("filter and value array must have identical lengths"); } auto num_chunks = values.num_chunks(); - std::vector> new_chunks(num_chunks); - std::shared_ptr current_chunk; - std::shared_ptr current_chunked_filter; - std::shared_ptr current_filter; + std::vector> new_chunks(num_chunks); + std::shared_ptr current_chunk; + std::shared_ptr current_chunked_filter; + std::shared_ptr current_filter; int offset = 0; int len; @@ -219,7 +219,7 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArr } } - *out = std::make_shared(std::move(new_chunks)); + *out = std::make_shared(std::move(new_chunks)); return Status::OK(); } @@ -227,7 +227,7 @@ Status Filter(FunctionContext* ctx, const Table& table, const Array& filter, std::shared_ptr
* out) { auto ncols = table.num_columns(); - std::vector> columns(ncols); + std::vector> columns(ncols); for (int j = 0; j < ncols; j++) { RETURN_NOT_OK(Filter(ctx, *table.column(j), filter, &columns[j])); @@ -240,7 +240,7 @@ Status Filter(FunctionContext* ctx, const Table& table, const ChunkedArray& filt std::shared_ptr
* out) { auto ncols = table.num_columns(); - std::vector> columns(ncols); + std::vector> columns(ncols); for (int j = 0; j < ncols; j++) { RETURN_NOT_OK(Filter(ctx, *table.column(j), filter, &columns[j])); diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index 4ca20b4a4ef..7606b3341f5 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -103,8 +103,8 @@ Status Take(FunctionContext* ctx, const Datum& values, const Datum& indices, Status Take(FunctionContext* ctx, const ChunkedArray& values, const Array& indices, const TakeOptions& options, std::shared_ptr* out) { auto num_chunks = values.num_chunks(); - std::vector> new_chunks(1); // Hard-coded 1 for now - std::shared_ptr current_chunk; + std::vector> new_chunks(1); // Hard-coded 1 for now + std::shared_ptr current_chunk; // Case 1: `values` has a single chunk, so just use it if (num_chunks == 1) { @@ -119,15 +119,15 @@ Status Take(FunctionContext* ctx, const ChunkedArray& values, const Array& indic } // Call Array Take on our single chunk RETURN_NOT_OK(Take(ctx, *current_chunk, indices, options, &new_chunks[0])); - *out = std::make_shared(std::move(new_chunks)); + *out = std::make_shared(std::move(new_chunks)); return Status::OK(); } Status Take(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray& indices, const TakeOptions& options, std::shared_ptr* out) { auto num_chunks = indices.num_chunks(); - std::vector> new_chunks(num_chunks); - std::shared_ptr current_chunk; + std::vector> new_chunks(num_chunks); + std::shared_ptr current_chunk; for (int i = 0; i < num_chunks; i++) { // Take with that indices chunk @@ -135,20 +135,20 @@ Status Take(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray // Concatenate the result to make a single array for this chunk RETURN_NOT_OK(Concatenate(current_chunk->chunks(), default_memory_pool(), &new_chunks[i])); } - *out = std::make_shared(std::move(new_chunks)); + *out = std::make_shared(std::move(new_chunks)); return Status::OK(); } Status Take(FunctionContext* ctx, const Array& values, const ChunkedArray& indices, const TakeOptions& options, std::shared_ptr* out) { auto num_chunks = indices.num_chunks(); - std::vector> new_chunks(num_chunks); + std::vector> new_chunks(num_chunks); for (int i = 0; i < num_chunks; i++) { // Take with that indices chunk RETURN_NOT_OK(Take(ctx, values, *indices.chunk(i), options, &new_chunks[i])); } - *out = std::make_shared(std::move(new_chunks)); + *out = std::make_shared(std::move(new_chunks)); return Status::OK(); } @@ -157,7 +157,7 @@ Status Take(FunctionContext* ctx, const RecordBatch& batch, const Array& indices auto ncols = batch.num_columns(); auto nrows = indices.length(); - std::vector> columns(ncols); + std::vector> columns(ncols); for (int j = 0; j < ncols; j++) { RETURN_NOT_OK(Take(ctx, *batch.column(j), indices, options, &columns[j])); @@ -169,7 +169,7 @@ Status Take(FunctionContext* ctx, const RecordBatch& batch, const Array& indices Status Take(FunctionContext* ctx, const Table& table, const Array& indices, const TakeOptions& options, std::shared_ptr
* out) { auto ncols = table.num_columns(); - std::vector> columns(ncols); + std::vector> columns(ncols); for (int j = 0; j < ncols; j++) { RETURN_NOT_OK(Take(ctx, *table.column(j), indices, options, &columns[j])); @@ -181,7 +181,7 @@ Status Take(FunctionContext* ctx, const Table& table, const Array& indices, Status Take(FunctionContext* ctx, const Table& table, const ChunkedArray& indices, const TakeOptions& options, std::shared_ptr
* out) { auto ncols = table.num_columns(); - std::vector> columns(ncols); + std::vector> columns(ncols); for (int j = 0; j < ncols; j++) { RETURN_NOT_OK(Take(ctx, *table.column(j), indices, options, &columns[j])); From 4ef90eb7f149e316d91b9eba4f84ae02310f3af1 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Mon, 28 Oct 2019 12:12:23 -0700 Subject: [PATCH 14/18] lint --- cpp/src/arrow/compute/kernels/filter.cc | 11 +++--- cpp/src/arrow/compute/kernels/filter.h | 4 +-- cpp/src/arrow/compute/kernels/filter_test.cc | 28 ++++++++------- cpp/src/arrow/compute/kernels/take.cc | 15 ++++---- cpp/src/arrow/compute/kernels/take_test.cc | 36 +++++++++++--------- cpp/src/arrow/testing/gtest_util.cc | 12 +++---- 6 files changed, 58 insertions(+), 48 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index 7ddaefc4440..6359a3bf031 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -178,7 +178,8 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& fil for (int i = 0; i < num_chunks; i++) { current_chunk = values.chunk(i); len = current_chunk->length(); - RETURN_NOT_OK(Filter(ctx, *current_chunk, *filter.Slice(offset, len), &new_chunks[i])); + RETURN_NOT_OK( + Filter(ctx, *current_chunk, *filter.Slice(offset, len), &new_chunks[i])); offset += len; } @@ -186,8 +187,8 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& fil return Status::OK(); } -Status Filter(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray& filter, - std::shared_ptr* out) { +Status Filter(FunctionContext* ctx, const ChunkedArray& values, + const ChunkedArray& filter, std::shared_ptr* out) { if (values.length() != filter.length()) { return Status::Invalid("filter and value array must have identical lengths"); } @@ -208,8 +209,8 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArr current_filter = current_chunked_filter->chunk(0); } else { // Concatenate the chunks of the filter so we have an Array - RETURN_NOT_OK(Concatenate(current_chunked_filter->chunks(), - default_memory_pool(), ¤t_filter)); + RETURN_NOT_OK(Concatenate(current_chunked_filter->chunks(), default_memory_pool(), + ¤t_filter)); } RETURN_NOT_OK(Filter(ctx, *current_chunk, *current_filter, &new_chunks[i])); offset += len; diff --git a/cpp/src/arrow/compute/kernels/filter.h b/cpp/src/arrow/compute/kernels/filter.h index d0217b2c124..99b022dbd54 100644 --- a/cpp/src/arrow/compute/kernels/filter.h +++ b/cpp/src/arrow/compute/kernels/filter.h @@ -83,8 +83,8 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& fil /// \param[in] filter indicates which values should be filtered out /// \param[out] out resulting chunked array ARROW_EXPORT -Status Filter(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray& filter, - std::shared_ptr* out); +Status Filter(FunctionContext* ctx, const ChunkedArray& values, + const ChunkedArray& filter, std::shared_ptr* out); /// \brief Filter a record batch with a boolean selection filter /// diff --git a/cpp/src/arrow/compute/kernels/filter_test.cc b/cpp/src/arrow/compute/kernels/filter_test.cc index 6f20c7f8578..0c00ce4ca1e 100644 --- a/cpp/src/arrow/compute/kernels/filter_test.cc +++ b/cpp/src/arrow/compute/kernels/filter_test.cc @@ -480,11 +480,10 @@ class TestFilterKernelWithRecordBatch : public TestFilterKernel { } Status Filter(const std::shared_ptr& schm, const std::string& batch_json, - 
const std::string& selection, - std::shared_ptr* out) { + const std::string& selection, std::shared_ptr* out) { auto batch = RecordBatchFromJSON(schm, batch_json); return arrow::compute::Filter(&this->ctx_, *batch, - *ArrayFromJSON(boolean(), selection), out); + *ArrayFromJSON(boolean(), selection), out); } }; @@ -514,8 +513,8 @@ TEST_F(TestFilterKernelWithRecordBatch, FilterRecordBatch) { class TestFilterKernelWithChunkedArray : public TestFilterKernel { public: void AssertFilter(const std::shared_ptr& type, - const std::vector& values, - const std::string& filter, const std::vector& expected) { + const std::vector& values, const std::string& filter, + const std::vector& expected) { std::shared_ptr actual; ASSERT_OK(this->FilterWithArray(type, values, filter, &actual)); ASSERT_OK(actual->Validate()); @@ -557,15 +556,16 @@ TEST_F(TestFilterKernelWithChunkedArray, FilterChunkedArray) { this->AssertChunkedFilter(int8(), {"[7]", "[8, 9]"}, {"[0, 1]", "[0]"}, {"[8]", "[]"}); std::shared_ptr arr; - ASSERT_RAISES(Invalid, this->FilterWithArray(int8(), {"[7]", "[8, 9]"}, "[0, 1, 0, 1, 1]", &arr)); - ASSERT_RAISES(Invalid, this->FilterWithChunkedArray(int8(), {"[7]", "[8, 9]"}, {"[0, 1, 0]", "[1, 1]"}, &arr)); + ASSERT_RAISES( + Invalid, this->FilterWithArray(int8(), {"[7]", "[8, 9]"}, "[0, 1, 0, 1, 1]", &arr)); + ASSERT_RAISES(Invalid, this->FilterWithChunkedArray(int8(), {"[7]", "[8, 9]"}, + {"[0, 1, 0]", "[1, 1]"}, &arr)); } class TestFilterKernelWithTable : public TestFilterKernel
{ public: void AssertFilter(const std::shared_ptr& schm, - const std::vector& table_json, - const std::string& filter, + const std::vector& table_json, const std::string& filter, const std::vector& expected_table) { std::shared_ptr
actual; @@ -605,13 +605,15 @@ TEST_F(TestFilterKernelWithTable, FilterTable) { std::vector> fields = {field("a", int32()), field("b", utf8())}; auto schm = schema(fields); - std::vector table_json = {"[{\"a\": null, \"b\": \"yo\"},{\"a\": 1, \"b\": \"\"}]", - "[{\"a\": 2, \"b\": \"hello\"},{\"a\": 4, \"b\": \"eh\"}]"}; + std::vector table_json = { + "[{\"a\": null, \"b\": \"yo\"},{\"a\": 1, \"b\": \"\"}]", + "[{\"a\": 2, \"b\": \"hello\"},{\"a\": 4, \"b\": \"eh\"}]"}; this->AssertFilter(schm, table_json, "[0, 0, 0, 0]", {"[]", "[]"}); this->AssertChunkedFilter(schm, table_json, {"[0]", "[0, 0, 0]"}, {"[]", "[]"}); - std::vector expected2 = {"[{\"a\": 1, \"b\": \"\"}]", - "[{\"a\": 2, \"b\": \"hello\"},{\"a\": null, \"b\": null}]"}; + std::vector expected2 = { + "[{\"a\": 1, \"b\": \"\"}]", + "[{\"a\": 2, \"b\": \"hello\"},{\"a\": null, \"b\": null}]"}; this->AssertFilter(schm, table_json, "[0, 1, 1, null]", expected2); this->AssertChunkedFilter(schm, table_json, {"[0, 1, 1]", "[null]"}, expected2); this->AssertFilter(schm, table_json, "[1, 1, 1, 1]", table_json); diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index 7606b3341f5..84666a4a3f2 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -18,6 +18,7 @@ #include #include #include +#include #include "arrow/array/concatenate.h" #include "arrow/compute/kernels/take.h" @@ -103,7 +104,7 @@ Status Take(FunctionContext* ctx, const Datum& values, const Datum& indices, Status Take(FunctionContext* ctx, const ChunkedArray& values, const Array& indices, const TakeOptions& options, std::shared_ptr* out) { auto num_chunks = values.num_chunks(); - std::vector> new_chunks(1); // Hard-coded 1 for now + std::vector> new_chunks(1); // Hard-coded 1 for now std::shared_ptr current_chunk; // Case 1: `values` has a single chunk, so just use it @@ -111,7 +112,8 @@ Status Take(FunctionContext* ctx, const ChunkedArray& values, const Array& indic current_chunk = values.chunk(0); } else { // TODO Case 2: See if all `indices` fall in the same chunk and call Array Take on it - // See https://github.com/apache/arrow/blob/6f2c9041137001f7a9212f244b51bc004efc29af/r/src/compute.cpp#L123-L151 + // See + // https://github.com/apache/arrow/blob/6f2c9041137001f7a9212f244b51bc004efc29af/r/src/compute.cpp#L123-L151 // TODO Case 3: If indices are sorted, can slice them and call Array Take // Case 4: Else, concatenate chunks and call Array Take @@ -133,7 +135,8 @@ Status Take(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray // Take with that indices chunk RETURN_NOT_OK(Take(ctx, values, *indices.chunk(i), options, ¤t_chunk)); // Concatenate the result to make a single array for this chunk - RETURN_NOT_OK(Concatenate(current_chunk->chunks(), default_memory_pool(), &new_chunks[i])); + RETURN_NOT_OK( + Concatenate(current_chunk->chunks(), default_memory_pool(), &new_chunks[i])); } *out = std::make_shared(std::move(new_chunks)); return Status::OK(); @@ -153,7 +156,7 @@ Status Take(FunctionContext* ctx, const Array& values, const ChunkedArray& indic } Status Take(FunctionContext* ctx, const RecordBatch& batch, const Array& indices, - const TakeOptions& options, std::shared_ptr* out) { + const TakeOptions& options, std::shared_ptr* out) { auto ncols = batch.num_columns(); auto nrows = indices.length(); @@ -167,7 +170,7 @@ Status Take(FunctionContext* ctx, const RecordBatch& batch, const Array& indices } Status Take(FunctionContext* ctx, const Table& table, const Array& 
indices, - const TakeOptions& options, std::shared_ptr
* out) { + const TakeOptions& options, std::shared_ptr
* out) { auto ncols = table.num_columns(); std::vector> columns(ncols); @@ -179,7 +182,7 @@ Status Take(FunctionContext* ctx, const Table& table, const Array& indices, } Status Take(FunctionContext* ctx, const Table& table, const ChunkedArray& indices, - const TakeOptions& options, std::shared_ptr
* out) { + const TakeOptions& options, std::shared_ptr
* out) { auto ncols = table.num_columns(); std::vector> columns(ncols); diff --git a/cpp/src/arrow/compute/kernels/take_test.cc b/cpp/src/arrow/compute/kernels/take_test.cc index 223b7962ea4..c886a00ead9 100644 --- a/cpp/src/arrow/compute/kernels/take_test.cc +++ b/cpp/src/arrow/compute/kernels/take_test.cc @@ -545,8 +545,8 @@ class TestTakeKernelWithRecordBatch : public TestTakeKernel { std::shared_ptr* out) { auto batch = RecordBatchFromJSON(schm, batch_json); TakeOptions options; - return arrow::compute::Take(&this->ctx_, *batch, - *ArrayFromJSON(index_type, indices), options, out); + return arrow::compute::Take(&this->ctx_, *batch, *ArrayFromJSON(index_type, indices), + options, out); } }; @@ -588,8 +588,8 @@ TEST_F(TestTakeKernelWithRecordBatch, TakeRecordBatch) { class TestTakeKernelWithChunkedArray : public TestTakeKernel { public: void AssertTake(const std::shared_ptr& type, - const std::vector& values, - const std::string& indices, const std::vector& expected) { + const std::vector& values, const std::string& indices, + const std::vector& expected) { std::shared_ptr actual; ASSERT_OK(this->TakeWithArray(type, values, indices, &actual)); ASSERT_OK(actual->Validate()); @@ -607,8 +607,8 @@ class TestTakeKernelWithChunkedArray : public TestTakeKernel { } Status TakeWithArray(const std::shared_ptr& type, - const std::vector& values, - const std::string& indices, std::shared_ptr* out) { + const std::vector& values, const std::string& indices, + std::shared_ptr* out) { TakeOptions options; return arrow::compute::Take(&this->ctx_, *ChunkedArrayFromJSON(type, values), *ArrayFromJSON(int8(), indices), options, out); @@ -629,19 +629,21 @@ TEST_F(TestTakeKernelWithChunkedArray, TakeChunkedArray) { this->AssertChunkedTake(int8(), {"[]"}, {"[]"}, {"[]"}); this->AssertTake(int8(), {"[7]", "[8, 9]"}, "[0, 1, 0, 2]", {"[7, 8, 7, 9]"}); - this->AssertChunkedTake(int8(), {"[7]", "[8, 9]"}, {"[0, 1, 0]", "[]", "[2]"}, {"[7, 8, 7]", "[]", "[9]"}); + this->AssertChunkedTake(int8(), {"[7]", "[8, 9]"}, {"[0, 1, 0]", "[]", "[2]"}, + {"[7, 8, 7]", "[]", "[9]"}); this->AssertTake(int8(), {"[7]", "[8, 9]"}, "[2, 1]", {"[9, 8]"}); std::shared_ptr arr; - ASSERT_RAISES(IndexError, this->TakeWithArray(int8(), {"[7]", "[8, 9]"}, "[0, 5]", &arr)); - ASSERT_RAISES(IndexError, this->TakeWithChunkedArray(int8(), {"[7]", "[8, 9]"}, {"[0, 1, 0]", "[5, 1]"}, &arr)); + ASSERT_RAISES(IndexError, + this->TakeWithArray(int8(), {"[7]", "[8, 9]"}, "[0, 5]", &arr)); + ASSERT_RAISES(IndexError, this->TakeWithChunkedArray(int8(), {"[7]", "[8, 9]"}, + {"[0, 1, 0]", "[5, 1]"}, &arr)); } class TestTakeKernelWithTable : public TestTakeKernel
{ public: void AssertTake(const std::shared_ptr& schm, - const std::vector& table_json, - const std::string& filter, + const std::vector& table_json, const std::string& filter, const std::vector& expected_table) { std::shared_ptr
actual; @@ -662,8 +664,8 @@ class TestTakeKernelWithTable : public TestTakeKernel
{ } Status TakeWithArray(const std::shared_ptr& schm, - const std::vector& values, - const std::string& indices, std::shared_ptr
* out) { + const std::vector& values, const std::string& indices, + std::shared_ptr
* out) { TakeOptions options; return arrow::compute::Take(&this->ctx_, *TableFromJSON(schm, values), *ArrayFromJSON(int8(), indices), options, out); @@ -683,11 +685,13 @@ TEST_F(TestTakeKernelWithTable, TakeTable) { std::vector> fields = {field("a", int32()), field("b", utf8())}; auto schm = schema(fields); - std::vector table_json = {"[{\"a\": null, \"b\": \"yo\"},{\"a\": 1, \"b\": \"\"}]", - "[{\"a\": 2, \"b\": \"hello\"},{\"a\": 4, \"b\": \"eh\"}]"}; + std::vector table_json = { + "[{\"a\": null, \"b\": \"yo\"},{\"a\": 1, \"b\": \"\"}]", + "[{\"a\": 2, \"b\": \"hello\"},{\"a\": 4, \"b\": \"eh\"}]"}; this->AssertTake(schm, table_json, "[]", {"[]"}); - std::vector expected_310 = {"[{\"a\": 4, \"b\": \"eh\"},{\"a\": 1, \"b\": \"\"},{\"a\": null, \"b\": \"yo\"}]"}; + std::vector expected_310 = { + "[{\"a\": 4, \"b\": \"eh\"},{\"a\": 1, \"b\": \"\"},{\"a\": null, \"b\": \"yo\"}]"}; this->AssertTake(schm, table_json, "[3, 1, 0]", expected_310); this->AssertChunkedTake(schm, table_json, {"[0, 1]", "[2, 3]"}, table_json); } diff --git a/cpp/src/arrow/testing/gtest_util.cc b/cpp/src/arrow/testing/gtest_util.cc index f3a401f6e8e..b0a7dbca42e 100644 --- a/cpp/src/arrow/testing/gtest_util.cc +++ b/cpp/src/arrow/testing/gtest_util.cc @@ -153,11 +153,11 @@ std::shared_ptr ArrayFromJSON(const std::shared_ptr& type, std::shared_ptr ChunkedArrayFromJSON(const std::shared_ptr& type, const std::vector& json) { - ArrayVector out_chunks; - for (const std::string& chunk_json : json) { - out_chunks.push_back(ArrayFromJSON(type, chunk_json)); - } - return std::make_shared(std::move(out_chunks)); + ArrayVector out_chunks; + for (const std::string& chunk_json : json) { + out_chunks.push_back(ArrayFromJSON(type, chunk_json)); + } + return std::make_shared(std::move(out_chunks)); } std::shared_ptr RecordBatchFromJSON(const std::shared_ptr& schema, @@ -177,7 +177,7 @@ std::shared_ptr
TableFromJSON(const std::shared_ptr& schema, const std::vector& json) { std::vector> batches; for (const std::string& batch_json : json) { - batches.push_back(RecordBatchFromJSON(schema, batch_json)); + batches.push_back(RecordBatchFromJSON(schema, batch_json)); } std::shared_ptr
table; ABORT_NOT_OK(Table::FromRecordBatches(schema, batches, &table)); From f7075982d8d35dec1e3d522dff760d1ef51a7fb4 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Mon, 28 Oct 2019 12:35:31 -0700 Subject: [PATCH 15/18] moar lint --- r/src/compute.cpp | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/r/src/compute.cpp b/r/src/compute.cpp index 31ba18d2ad2..6f78bcb6316 100644 --- a/r/src/compute.cpp +++ b/r/src/compute.cpp @@ -94,7 +94,8 @@ std::shared_ptr Array__Take(const std::shared_ptr& v // [[arrow::export]] std::shared_ptr Array__TakeChunked( - const std::shared_ptr& values, const std::shared_ptr& indices) { + const std::shared_ptr& values, + const std::shared_ptr& indices) { std::shared_ptr out; arrow::compute::FunctionContext context; arrow::compute::TakeOptions options; @@ -116,7 +117,8 @@ std::shared_ptr RecordBatch__Take( // [[arrow::export]] std::shared_ptr ChunkedArray__Take( - const std::shared_ptr& values, const std::shared_ptr& indices) { + const std::shared_ptr& values, + const std::shared_ptr& indices) { std::shared_ptr out; arrow::compute::FunctionContext context; arrow::compute::TakeOptions options; @@ -127,7 +129,8 @@ std::shared_ptr ChunkedArray__Take( // [[arrow::export]] std::shared_ptr ChunkedArray__TakeChunked( - const std::shared_ptr& values, const std::shared_ptr& indices) { + const std::shared_ptr& values, + const std::shared_ptr& indices) { std::shared_ptr out; arrow::compute::FunctionContext context; arrow::compute::TakeOptions options; @@ -148,8 +151,9 @@ std::shared_ptr Table__Take(const std::shared_ptr& t } // [[arrow::export]] -std::shared_ptr Table__TakeChunked(const std::shared_ptr& table, - const std::shared_ptr& indices) { +std::shared_ptr Table__TakeChunked( + const std::shared_ptr& table, + const std::shared_ptr& indices) { std::shared_ptr out; arrow::compute::FunctionContext context; arrow::compute::TakeOptions options; @@ -181,7 +185,6 @@ std::shared_ptr RecordBatch__Filter( std::shared_ptr ChunkedArray__Filter( const std::shared_ptr& values, const std::shared_ptr& filter) { - std::shared_ptr out; arrow::compute::FunctionContext context; STOP_IF_NOT_OK(arrow::compute::Filter(&context, *values, *filter, &out)); From e39425baed1f3ba19c08c51fdf90d0fe3ed15652 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Tue, 29 Oct 2019 14:55:39 -0700 Subject: [PATCH 16/18] s/int/int64_t/g --- cpp/src/arrow/compute/kernels/filter.cc | 20 ++++++++++---------- cpp/src/arrow/compute/kernels/take.cc | 10 +++++----- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index 6359a3bf031..9f791761bf7 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -149,13 +149,13 @@ Status Filter(FunctionContext* ctx, const RecordBatch& batch, const Array& filte ARROW_ASSIGN_OR_RAISE(auto filter_array, GetFilterArray(Datum(filter.data()))); std::vector> kernels(batch.num_columns()); - for (int i = 0; i < batch.num_columns(); ++i) { + for (int64_t i = 0; i < batch.num_columns(); ++i) { RETURN_NOT_OK(FilterKernel::Make(batch.schema()->field(i)->type(), &kernels[i])); } std::vector> columns(batch.num_columns()); auto out_length = OutputSize(*filter_array); - for (int i = 0; i < batch.num_columns(); ++i) { + for (int64_t i = 0; i < batch.num_columns(); ++i) { RETURN_NOT_OK(kernels[i]->Filter(ctx, *batch.column(i), *filter_array, out_length, &columns[i])); } @@ -172,10 +172,10 @@ Status 
Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& fil auto num_chunks = values.num_chunks(); std::vector> new_chunks(num_chunks); std::shared_ptr current_chunk; - int offset = 0; - int len; + int64_t offset = 0; + int64_t len; - for (int i = 0; i < num_chunks; i++) { + for (int64_t i = 0; i < num_chunks; i++) { current_chunk = values.chunk(i); len = current_chunk->length(); RETURN_NOT_OK( @@ -197,10 +197,10 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, std::shared_ptr current_chunk; std::shared_ptr current_chunked_filter; std::shared_ptr current_filter; - int offset = 0; - int len; + int64_t offset = 0; + int64_t len; - for (int i = 0; i < num_chunks; i++) { + for (int64_t i = 0; i < num_chunks; i++) { current_chunk = values.chunk(i); len = current_chunk->length(); if (len > 0) { @@ -230,7 +230,7 @@ Status Filter(FunctionContext* ctx, const Table& table, const Array& filter, std::vector> columns(ncols); - for (int j = 0; j < ncols; j++) { + for (int64_t j = 0; j < ncols; j++) { RETURN_NOT_OK(Filter(ctx, *table.column(j), filter, &columns[j])); } *out = Table::Make(table.schema(), columns); @@ -243,7 +243,7 @@ Status Filter(FunctionContext* ctx, const Table& table, const ChunkedArray& filt std::vector> columns(ncols); - for (int j = 0; j < ncols; j++) { + for (int64_t j = 0; j < ncols; j++) { RETURN_NOT_OK(Filter(ctx, *table.column(j), filter, &columns[j])); } *out = Table::Make(table.schema(), columns); diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index 84666a4a3f2..5955ad9e0c4 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -131,7 +131,7 @@ Status Take(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray std::vector> new_chunks(num_chunks); std::shared_ptr current_chunk; - for (int i = 0; i < num_chunks; i++) { + for (int64_t i = 0; i < num_chunks; i++) { // Take with that indices chunk RETURN_NOT_OK(Take(ctx, values, *indices.chunk(i), options, ¤t_chunk)); // Concatenate the result to make a single array for this chunk @@ -147,7 +147,7 @@ Status Take(FunctionContext* ctx, const Array& values, const ChunkedArray& indic auto num_chunks = indices.num_chunks(); std::vector> new_chunks(num_chunks); - for (int i = 0; i < num_chunks; i++) { + for (int64_t i = 0; i < num_chunks; i++) { // Take with that indices chunk RETURN_NOT_OK(Take(ctx, values, *indices.chunk(i), options, &new_chunks[i])); } @@ -162,7 +162,7 @@ Status Take(FunctionContext* ctx, const RecordBatch& batch, const Array& indices std::vector> columns(ncols); - for (int j = 0; j < ncols; j++) { + for (int64_t j = 0; j < ncols; j++) { RETURN_NOT_OK(Take(ctx, *batch.column(j), indices, options, &columns[j])); } *out = RecordBatch::Make(batch.schema(), nrows, columns); @@ -174,7 +174,7 @@ Status Take(FunctionContext* ctx, const Table& table, const Array& indices, auto ncols = table.num_columns(); std::vector> columns(ncols); - for (int j = 0; j < ncols; j++) { + for (int64_t j = 0; j < ncols; j++) { RETURN_NOT_OK(Take(ctx, *table.column(j), indices, options, &columns[j])); } *out = Table::Make(table.schema(), columns); @@ -186,7 +186,7 @@ Status Take(FunctionContext* ctx, const Table& table, const ChunkedArray& indice auto ncols = table.num_columns(); std::vector> columns(ncols); - for (int j = 0; j < ncols; j++) { + for (int64_t j = 0; j < ncols; j++) { RETURN_NOT_OK(Take(ctx, *table.column(j), indices, options, &columns[j])); } *out = Table::Make(table.schema(), columns); From 
f29867fd1abe8027148c8829753b71bf61bb89a3 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Tue, 29 Oct 2019 15:12:12 -0700 Subject: [PATCH 17/18] Too much int64_t --- cpp/src/arrow/compute/kernels/filter.cc | 12 ++++++------ cpp/src/arrow/compute/kernels/take.cc | 10 +++++----- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/filter.cc b/cpp/src/arrow/compute/kernels/filter.cc index 9f791761bf7..1b2ba31613a 100644 --- a/cpp/src/arrow/compute/kernels/filter.cc +++ b/cpp/src/arrow/compute/kernels/filter.cc @@ -149,13 +149,13 @@ Status Filter(FunctionContext* ctx, const RecordBatch& batch, const Array& filte ARROW_ASSIGN_OR_RAISE(auto filter_array, GetFilterArray(Datum(filter.data()))); std::vector> kernels(batch.num_columns()); - for (int64_t i = 0; i < batch.num_columns(); ++i) { + for (int i = 0; i < batch.num_columns(); ++i) { RETURN_NOT_OK(FilterKernel::Make(batch.schema()->field(i)->type(), &kernels[i])); } std::vector> columns(batch.num_columns()); auto out_length = OutputSize(*filter_array); - for (int64_t i = 0; i < batch.num_columns(); ++i) { + for (int i = 0; i < batch.num_columns(); ++i) { RETURN_NOT_OK(kernels[i]->Filter(ctx, *batch.column(i), *filter_array, out_length, &columns[i])); } @@ -175,7 +175,7 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& fil int64_t offset = 0; int64_t len; - for (int64_t i = 0; i < num_chunks; i++) { + for (int i = 0; i < num_chunks; i++) { current_chunk = values.chunk(i); len = current_chunk->length(); RETURN_NOT_OK( @@ -200,7 +200,7 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, int64_t offset = 0; int64_t len; - for (int64_t i = 0; i < num_chunks; i++) { + for (int i = 0; i < num_chunks; i++) { current_chunk = values.chunk(i); len = current_chunk->length(); if (len > 0) { @@ -230,7 +230,7 @@ Status Filter(FunctionContext* ctx, const Table& table, const Array& filter, std::vector> columns(ncols); - for (int64_t j = 0; j < ncols; j++) { + for (int j = 0; j < ncols; j++) { RETURN_NOT_OK(Filter(ctx, *table.column(j), filter, &columns[j])); } *out = Table::Make(table.schema(), columns); @@ -243,7 +243,7 @@ Status Filter(FunctionContext* ctx, const Table& table, const ChunkedArray& filt std::vector> columns(ncols); - for (int64_t j = 0; j < ncols; j++) { + for (int j = 0; j < ncols; j++) { RETURN_NOT_OK(Filter(ctx, *table.column(j), filter, &columns[j])); } *out = Table::Make(table.schema(), columns); diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index 5955ad9e0c4..84666a4a3f2 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -131,7 +131,7 @@ Status Take(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray std::vector> new_chunks(num_chunks); std::shared_ptr current_chunk; - for (int64_t i = 0; i < num_chunks; i++) { + for (int i = 0; i < num_chunks; i++) { // Take with that indices chunk RETURN_NOT_OK(Take(ctx, values, *indices.chunk(i), options, ¤t_chunk)); // Concatenate the result to make a single array for this chunk @@ -147,7 +147,7 @@ Status Take(FunctionContext* ctx, const Array& values, const ChunkedArray& indic auto num_chunks = indices.num_chunks(); std::vector> new_chunks(num_chunks); - for (int64_t i = 0; i < num_chunks; i++) { + for (int i = 0; i < num_chunks; i++) { // Take with that indices chunk RETURN_NOT_OK(Take(ctx, values, *indices.chunk(i), options, &new_chunks[i])); } @@ -162,7 +162,7 @@ Status Take(FunctionContext* ctx, 
const RecordBatch& batch, const Array& indices std::vector> columns(ncols); - for (int64_t j = 0; j < ncols; j++) { + for (int j = 0; j < ncols; j++) { RETURN_NOT_OK(Take(ctx, *batch.column(j), indices, options, &columns[j])); } *out = RecordBatch::Make(batch.schema(), nrows, columns); @@ -174,7 +174,7 @@ Status Take(FunctionContext* ctx, const Table& table, const Array& indices, auto ncols = table.num_columns(); std::vector> columns(ncols); - for (int64_t j = 0; j < ncols; j++) { + for (int j = 0; j < ncols; j++) { RETURN_NOT_OK(Take(ctx, *table.column(j), indices, options, &columns[j])); } *out = Table::Make(table.schema(), columns); @@ -186,7 +186,7 @@ Status Take(FunctionContext* ctx, const Table& table, const ChunkedArray& indice auto ncols = table.num_columns(); std::vector> columns(ncols); - for (int64_t j = 0; j < ncols; j++) { + for (int j = 0; j < ncols; j++) { RETURN_NOT_OK(Take(ctx, *table.column(j), indices, options, &columns[j])); } *out = Table::Make(table.schema(), columns); From 21dbd26db2c48ac64140cf11bf5491cc9765e58b Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Thu, 31 Oct 2019 13:26:23 -0700 Subject: [PATCH 18/18] Add notes and warnings --- cpp/src/arrow/compute/kernels/filter.h | 5 +++++ cpp/src/arrow/compute/kernels/take.cc | 2 ++ cpp/src/arrow/compute/kernels/take.h | 6 ++++++ 3 files changed, 13 insertions(+) diff --git a/cpp/src/arrow/compute/kernels/filter.h b/cpp/src/arrow/compute/kernels/filter.h index 99b022dbd54..bc7f75db539 100644 --- a/cpp/src/arrow/compute/kernels/filter.h +++ b/cpp/src/arrow/compute/kernels/filter.h @@ -64,6 +64,7 @@ Status Filter(FunctionContext* ctx, const Array& values, const Array& filter, /// \param[in] values chunked array to filter /// \param[in] filter indicates which values should be filtered out /// \param[out] out resulting chunked array +/// NOTE: Experimental API ARROW_EXPORT Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& filter, std::shared_ptr* out); @@ -82,6 +83,7 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& fil /// \param[in] values chunked array to filter /// \param[in] filter indicates which values should be filtered out /// \param[out] out resulting chunked array +/// NOTE: Experimental API ARROW_EXPORT Status Filter(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray& filter, std::shared_ptr* out); @@ -96,6 +98,7 @@ Status Filter(FunctionContext* ctx, const ChunkedArray& values, /// \param[in] batch record batch to filter /// \param[in] filter indicates which values should be filtered out /// \param[out] out resulting record batch +/// NOTE: Experimental API ARROW_EXPORT Status Filter(FunctionContext* ctx, const RecordBatch& batch, const Array& filter, std::shared_ptr* out); @@ -110,6 +113,7 @@ Status Filter(FunctionContext* ctx, const RecordBatch& batch, const Array& filte /// \param[in] table table to filter /// \param[in] filter indicates which values should be filtered out /// \param[out] out resulting table +/// NOTE: Experimental API ARROW_EXPORT Status Filter(FunctionContext* ctx, const Table& table, const Array& filter, std::shared_ptr
* out); @@ -124,6 +128,7 @@ Status Filter(FunctionContext* ctx, const Table& table, const Array& filter, /// \param[in] table record batch to filter /// \param[in] filter indicates which values should be filtered out /// \param[out] out resulting record batch +/// NOTE: Experimental API ARROW_EXPORT Status Filter(FunctionContext* ctx, const Table& table, const ChunkedArray& filter, std::shared_ptr
* out); diff --git a/cpp/src/arrow/compute/kernels/take.cc b/cpp/src/arrow/compute/kernels/take.cc index 84666a4a3f2..2fa860c7682 100644 --- a/cpp/src/arrow/compute/kernels/take.cc +++ b/cpp/src/arrow/compute/kernels/take.cc @@ -133,6 +133,8 @@ Status Take(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray for (int i = 0; i < num_chunks; i++) { // Take with that indices chunk + // Note that as currently implemented, this is inefficient because `values` + // will get concatenated on every iteration of this loop RETURN_NOT_OK(Take(ctx, values, *indices.chunk(i), options, ¤t_chunk)); // Concatenate the result to make a single array for this chunk RETURN_NOT_OK( diff --git a/cpp/src/arrow/compute/kernels/take.h b/cpp/src/arrow/compute/kernels/take.h index b9c11beed2d..26302b3bc5c 100644 --- a/cpp/src/arrow/compute/kernels/take.h +++ b/cpp/src/arrow/compute/kernels/take.h @@ -69,6 +69,7 @@ Status Take(FunctionContext* ctx, const Array& values, const Array& indices, /// \param[in] indices which values to take /// \param[in] options options /// \param[out] out resulting chunked array +/// NOTE: Experimental API ARROW_EXPORT Status Take(FunctionContext* ctx, const ChunkedArray& values, const Array& indices, const TakeOptions& options, std::shared_ptr* out); @@ -90,6 +91,7 @@ Status Take(FunctionContext* ctx, const ChunkedArray& values, const Array& indic /// \param[in] indices which values to take /// \param[in] options options /// \param[out] out resulting chunked array +/// NOTE: Experimental API ARROW_EXPORT Status Take(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray& indices, const TakeOptions& options, std::shared_ptr* out); @@ -111,6 +113,7 @@ Status Take(FunctionContext* ctx, const ChunkedArray& values, const ChunkedArray /// \param[in] indices which values to take /// \param[in] options options /// \param[out] out resulting chunked array +/// NOTE: Experimental API ARROW_EXPORT Status Take(FunctionContext* ctx, const Array& values, const ChunkedArray& indices, const TakeOptions& options, std::shared_ptr* out); @@ -126,6 +129,7 @@ Status Take(FunctionContext* ctx, const Array& values, const ChunkedArray& indic /// \param[in] indices which values to take /// \param[in] options options /// \param[out] out resulting record batch +/// NOTE: Experimental API ARROW_EXPORT Status Take(FunctionContext* ctx, const RecordBatch& batch, const Array& indices, const TakeOptions& options, std::shared_ptr* out); @@ -141,6 +145,7 @@ Status Take(FunctionContext* ctx, const RecordBatch& batch, const Array& indices /// \param[in] indices which values to take /// \param[in] options options /// \param[out] out resulting table +/// NOTE: Experimental API ARROW_EXPORT Status Take(FunctionContext* ctx, const Table& table, const Array& indices, const TakeOptions& options, std::shared_ptr
* out); @@ -156,6 +161,7 @@ Status Take(FunctionContext* ctx, const Table& table, const Array& indices, /// \param[in] indices which values to take /// \param[in] options options /// \param[out] out resulting table +/// NOTE: Experimental API ARROW_EXPORT Status Take(FunctionContext* ctx, const Table& table, const ChunkedArray& indices, const TakeOptions& options, std::shared_ptr
* out);
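
For reference, a minimal C++ sketch of how the overloads added across these patches fit together (illustrative only, not part of the patch series; it assumes the pre-1.0 FunctionContext/out-parameter compute API and the TableFromJSON/ChunkedArrayFromJSON test helpers shown in the diffs above, and the function name TableFilterTakeExample is hypothetical):

```cpp
#include <memory>

#include "arrow/compute/context.h"
#include "arrow/compute/kernels/filter.h"
#include "arrow/compute/kernels/take.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"

namespace arrow {
namespace compute {

// Illustrative sketch exercising the Table- and ChunkedArray-level overloads.
Status TableFilterTakeExample() {
  auto schm = schema({field("a", int32()), field("b", utf8())});
  // TableFromJSON builds one chunk per JSON record batch: here, a 3-row,
  // two-chunk table.
  auto table = TableFromJSON(
      schm, {R"([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}])", R"([{"a": 3, "b": "z"}])"});

  FunctionContext ctx;

  // Filter(Table, Array): keeps rows where the boolean selection is 1.
  std::shared_ptr<Table> filtered;
  RETURN_NOT_OK(
      Filter(&ctx, *table, *ArrayFromJSON(boolean(), "[1, 0, 1]"), &filtered));

  // Take(Table, ChunkedArray): gathers rows 2, 0, 1 using chunked indices.
  TakeOptions options;
  std::shared_ptr<Table> taken;
  RETURN_NOT_OK(Take(&ctx, *table, *ChunkedArrayFromJSON(int8(), {"[2]", "[0, 1]"}),
                     options, &taken));

  // Take(ChunkedArray, ChunkedArray): per the TakeChunkedArray test above, the
  // result is chunked like the indices (one output chunk per indices chunk).
  std::shared_ptr<ChunkedArray> taken_chunks;
  RETURN_NOT_OK(Take(&ctx, *ChunkedArrayFromJSON(int8(), {"[7]", "[8, 9]"}),
                     *ChunkedArrayFromJSON(int8(), {"[0, 1, 0]", "[]", "[2]"}),
                     options, &taken_chunks));
  // taken_chunks is logically [7, 8, 7, 9], chunked as {"[7, 8, 7]", "[]", "[9]"}.

  return Status::OK();
}

}  // namespace compute
}  // namespace arrow
```

The Filter overloads, by contrast, produce one output chunk per chunk of the values (the filter is sliced or concatenated to line up with each values chunk), as the FilterChunkedArray assertions added in patch 10 illustrate.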