From 7766040810820ee9701e439cd82659c2eac7f957 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 30 Apr 2021 11:14:13 -0400 Subject: [PATCH 1/4] ARROW-12614: [C++][Compute] Support Tables in ExecuteScalarExpression --- cpp/src/arrow/compute/exec/expression.cc | 17 ++++--- cpp/src/arrow/compute/exec/expression_test.cc | 46 ++++++++++++++++++- cpp/src/arrow/dataset/dataset_internal.h | 5 -- cpp/src/arrow/dataset/test_util.h | 2 +- cpp/src/arrow/util/iterator.h | 6 +++ 5 files changed, 62 insertions(+), 14 deletions(-) diff --git a/cpp/src/arrow/compute/exec/expression.cc b/cpp/src/arrow/compute/exec/expression.cc index 1f819cf3d04..0fddf2d0108 100644 --- a/cpp/src/arrow/compute/exec/expression.cc +++ b/cpp/src/arrow/compute/exec/expression.cc @@ -28,6 +28,7 @@ #include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" #include "arrow/util/atomic_shared_ptr.h" +#include "arrow/util/iterator.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" #include "arrow/util/optional.h" @@ -527,20 +528,22 @@ Result ExecuteScalarExpression(const Expression& expr, const Datum& input } if (input.kind() == Datum::TABLE) { - TableBatchReader reader(*input.table()); - std::shared_ptr batch; + ArrayVector chunks; + + for (auto maybe_batch : + MakeIteratorFromReader(std::make_shared(*input.table()))) { + ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); - while (true) { - RETURN_NOT_OK(reader.ReadNext(&batch)); - if (batch != nullptr) { - break; - } ARROW_ASSIGN_OR_RAISE(Datum res, ExecuteScalarExpression(expr, batch)); if (res.is_scalar()) { ARROW_ASSIGN_OR_RAISE(res, MakeArrayFromScalar(*res.scalar(), batch->num_rows(), exec_context->memory_pool())); } + + chunks.push_back(res.make_array()); } + + return ChunkedArray::Make(std::move(chunks), expr.type()); } if (auto lit = expr.literal()) return *lit; diff --git a/cpp/src/arrow/compute/exec/expression_test.cc b/cpp/src/arrow/compute/exec/expression_test.cc index ab3fbb4d196..e52bed6b50a 100644 --- a/cpp/src/arrow/compute/exec/expression_test.cc +++ b/cpp/src/arrow/compute/exec/expression_test.cc @@ -29,6 +29,7 @@ #include "arrow/compute/exec/expression_internal.h" #include "arrow/compute/registry.h" #include "arrow/testing/gtest_util.h" +#include "arrow/util/iterator.h" using testing::HasSubstr; using testing::UnorderedElementsAreArray; @@ -172,6 +173,7 @@ TEST(Expression, ToString) { EXPECT_EQ(literal("a").ToString(), "\"a\""); EXPECT_EQ(literal("a\nb").ToString(), "\"a\\nb\""); EXPECT_EQ(literal(std::make_shared()).ToString(), "null"); + EXPECT_EQ(literal(std::make_shared()).ToString(), "null"); EXPECT_EQ(literal(std::make_shared(Buffer::FromString("az"))).ToString(), "\"617A\""); @@ -520,6 +522,24 @@ Result NaiveExecuteScalarExpression(const Expression& expr, const Datum& return ExecuteScalarExpression(expr, input); } + if (input.kind() == Datum::TABLE) { + ArrayVector chunks; + + for (auto maybe_batch : + MakeIteratorFromReader(std::make_shared(*input.table()))) { + ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); + + ARROW_ASSIGN_OR_RAISE(Datum res, NaiveExecuteScalarExpression(expr, batch)); + if (res.is_scalar()) { + ARROW_ASSIGN_OR_RAISE(res, MakeArrayFromScalar(*res.scalar(), batch->num_rows())); + } + + chunks.push_back(res.make_array()); + } + + return ChunkedArray::Make(std::move(chunks), expr.type()); + } + std::vector arguments(call->arguments.size()); for (size_t i = 0; i < arguments.size(); ++i) { ARROW_ASSIGN_OR_RAISE(arguments[i], @@ -540,7 +560,7 @@ void ExpectExecute(Expression expr, Datum in, Datum* actual_out = NULLPTR) { if (in.is_value()) { ASSERT_OK_AND_ASSIGN(expr, expr.Bind(in.descr())); } else { - ASSERT_OK_AND_ASSIGN(expr, expr.Bind(*in.record_batch()->schema())); + ASSERT_OK_AND_ASSIGN(expr, expr.Bind(*in.schema())); } ASSERT_OK_AND_ASSIGN(Datum actual, ExecuteScalarExpression(expr, in)); @@ -586,6 +606,30 @@ TEST(Expression, ExecuteCall) { ])")); } +TEST(Expression, ExecuteAgainstTable) { + ExpectExecute(field_ref("i32"), TableFromJSON(kBoringSchema, {})); + + ExpectExecute(field_ref("i32"), TableFromJSON(kBoringSchema, {R"([ + {"i32": 32}, + {"i32": 52} + ])", + R"([ + {"i32": 71} + ])"})); + + ExpectExecute(call("add", {field_ref("f32"), literal(3.5)}), + TableFromJSON(kBoringSchema, {})); + + ExpectExecute(call("add", {field_ref("f32"), literal(3.5)}), + TableFromJSON(kBoringSchema, {R"([ + {"f32": -1.5}, + {"f32": 2.25} + ])", + R"([ + {"f32": 3.75} + ])"})); +} + TEST(Expression, ExecuteDictionaryTransparent) { ExpectExecute( equal(field_ref("a"), field_ref("b")), diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h index 6527eac07dd..4336f9c157e 100644 --- a/cpp/src/arrow/dataset/dataset_internal.h +++ b/cpp/src/arrow/dataset/dataset_internal.h @@ -54,11 +54,6 @@ inline Result GetFragmentsFromDatasets(const DatasetVector& da return MakeFlattenIterator(std::move(fragments_it)); } -inline RecordBatchIterator IteratorFromReader( - const std::shared_ptr& reader) { - return MakeFunctionIterator([reader] { return reader->Next(); }); -} - inline std::shared_ptr SchemaFromColumnNames( const std::shared_ptr& input, const std::vector& column_names) { std::vector> columns; diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 83ae4bbf1e8..42c544dd93e 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -1161,7 +1161,7 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { std::shared_ptr actual_struct; for (auto maybe_batch : - IteratorFromReader(std::make_shared(*actual_table))) { + MakeIteratorFromReader(std::make_shared(*actual_table))) { ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); ASSERT_OK_AND_ASSIGN(actual_struct, batch->ToStructArray()); } diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index 4d9e7b18290..b866c8425b7 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -560,4 +560,10 @@ Iterator MakeFlattenIterator(Iterator> it) { return Iterator(FlattenIterator(std::move(it))); } +template +auto MakeIteratorFromReader(const std::shared_ptr& reader) + -> IteratorNext())::ValueType> { + return MakeFunctionIterator([reader] { return reader->Next(); }); +} + } // namespace arrow From 6b0a5c787d9e09fd0a5a509b3dfa297fd196cad3 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 30 Apr 2021 15:21:57 -0400 Subject: [PATCH 2/4] remove table iteration from ExecuteScalarExpression --- cpp/src/arrow/compute/exec/expression.cc | 20 --------- cpp/src/arrow/compute/exec/expression_test.cc | 43 ------------------- 2 files changed, 63 deletions(-) diff --git a/cpp/src/arrow/compute/exec/expression.cc b/cpp/src/arrow/compute/exec/expression.cc index 0fddf2d0108..59def380db5 100644 --- a/cpp/src/arrow/compute/exec/expression.cc +++ b/cpp/src/arrow/compute/exec/expression.cc @@ -28,7 +28,6 @@ #include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" #include "arrow/util/atomic_shared_ptr.h" -#include "arrow/util/iterator.h" #include "arrow/util/key_value_metadata.h" #include "arrow/util/logging.h" #include "arrow/util/optional.h" @@ -527,25 +526,6 @@ Result ExecuteScalarExpression(const Expression& expr, const Datum& input "ExecuteScalarExpression cannot Execute non-scalar expression ", expr.ToString()); } - if (input.kind() == Datum::TABLE) { - ArrayVector chunks; - - for (auto maybe_batch : - MakeIteratorFromReader(std::make_shared(*input.table()))) { - ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); - - ARROW_ASSIGN_OR_RAISE(Datum res, ExecuteScalarExpression(expr, batch)); - if (res.is_scalar()) { - ARROW_ASSIGN_OR_RAISE(res, MakeArrayFromScalar(*res.scalar(), batch->num_rows(), - exec_context->memory_pool())); - } - - chunks.push_back(res.make_array()); - } - - return ChunkedArray::Make(std::move(chunks), expr.type()); - } - if (auto lit = expr.literal()) return *lit; if (auto ref = expr.field_ref()) { diff --git a/cpp/src/arrow/compute/exec/expression_test.cc b/cpp/src/arrow/compute/exec/expression_test.cc index e52bed6b50a..e8b8fb31cd8 100644 --- a/cpp/src/arrow/compute/exec/expression_test.cc +++ b/cpp/src/arrow/compute/exec/expression_test.cc @@ -29,7 +29,6 @@ #include "arrow/compute/exec/expression_internal.h" #include "arrow/compute/registry.h" #include "arrow/testing/gtest_util.h" -#include "arrow/util/iterator.h" using testing::HasSubstr; using testing::UnorderedElementsAreArray; @@ -522,24 +521,6 @@ Result NaiveExecuteScalarExpression(const Expression& expr, const Datum& return ExecuteScalarExpression(expr, input); } - if (input.kind() == Datum::TABLE) { - ArrayVector chunks; - - for (auto maybe_batch : - MakeIteratorFromReader(std::make_shared(*input.table()))) { - ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); - - ARROW_ASSIGN_OR_RAISE(Datum res, NaiveExecuteScalarExpression(expr, batch)); - if (res.is_scalar()) { - ARROW_ASSIGN_OR_RAISE(res, MakeArrayFromScalar(*res.scalar(), batch->num_rows())); - } - - chunks.push_back(res.make_array()); - } - - return ChunkedArray::Make(std::move(chunks), expr.type()); - } - std::vector arguments(call->arguments.size()); for (size_t i = 0; i < arguments.size(); ++i) { ARROW_ASSIGN_OR_RAISE(arguments[i], @@ -606,30 +587,6 @@ TEST(Expression, ExecuteCall) { ])")); } -TEST(Expression, ExecuteAgainstTable) { - ExpectExecute(field_ref("i32"), TableFromJSON(kBoringSchema, {})); - - ExpectExecute(field_ref("i32"), TableFromJSON(kBoringSchema, {R"([ - {"i32": 32}, - {"i32": 52} - ])", - R"([ - {"i32": 71} - ])"})); - - ExpectExecute(call("add", {field_ref("f32"), literal(3.5)}), - TableFromJSON(kBoringSchema, {})); - - ExpectExecute(call("add", {field_ref("f32"), literal(3.5)}), - TableFromJSON(kBoringSchema, {R"([ - {"f32": -1.5}, - {"f32": 2.25} - ])", - R"([ - {"f32": 3.75} - ])"})); -} - TEST(Expression, ExecuteDictionaryTransparent) { ExpectExecute( equal(field_ref("a"), field_ref("b")), From b4a5e08afb110b366eb81c6ea4d0df59501fe852 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 3 May 2021 11:05:37 -0400 Subject: [PATCH 3/4] msvc: syntax fix --- cpp/src/arrow/util/iterator.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index b866c8425b7..13b118eccf3 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -562,7 +562,7 @@ Iterator MakeFlattenIterator(Iterator> it) { template auto MakeIteratorFromReader(const std::shared_ptr& reader) - -> IteratorNext())::ValueType> { + -> Iterator().Next())::ValueType> { return MakeFunctionIterator([reader] { return reader->Next(); }); } From 6756eff7ffef0c898d9932da8dcd49dcc1ffed9e Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 3 May 2021 12:31:30 -0400 Subject: [PATCH 4/4] msvc: syntax fix? --- cpp/src/arrow/record_batch.h | 2 ++ cpp/src/arrow/util/iterator.h | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index 59c6d5568e9..a75dd043e5d 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -199,6 +199,8 @@ class ARROW_EXPORT RecordBatch { /// \brief Abstract interface for reading stream of record batches class ARROW_EXPORT RecordBatchReader { public: + using ValueType = std::shared_ptr; + virtual ~RecordBatchReader() = default; /// \return the shared schema of the record batches in the stream diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index 13b118eccf3..b82021e4b21 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -561,8 +561,8 @@ Iterator MakeFlattenIterator(Iterator> it) { } template -auto MakeIteratorFromReader(const std::shared_ptr& reader) - -> Iterator().Next())::ValueType> { +Iterator MakeIteratorFromReader( + const std::shared_ptr& reader) { return MakeFunctionIterator([reader] { return reader->Next(); }); }