From 936bef14e369b4cad7ff5bccc82f0aa27d890356 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 22 Apr 2024 17:13:58 +0800 Subject: [PATCH 01/20] WIP --- cpp/src/arrow/acero/CMakeLists.txt | 1 + cpp/src/arrow/acero/query_context.cc | 27 +++++++++++- cpp/src/arrow/acero/query_context.h | 9 ++++ cpp/src/arrow/acero/query_context_test.cc | 51 +++++++++++++++++++++++ 4 files changed, 86 insertions(+), 2 deletions(-) create mode 100644 cpp/src/arrow/acero/query_context_test.cc diff --git a/cpp/src/arrow/acero/CMakeLists.txt b/cpp/src/arrow/acero/CMakeLists.txt index 31ed4a6a69b..f0ae9668cd3 100644 --- a/cpp/src/arrow/acero/CMakeLists.txt +++ b/cpp/src/arrow/acero/CMakeLists.txt @@ -186,6 +186,7 @@ add_arrow_acero_test(union_node_test SOURCES union_node_test.cc) add_arrow_acero_test(aggregate_node_test SOURCES aggregate_node_test.cc) add_arrow_acero_test(util_test SOURCES util_test.cc task_util_test.cc) add_arrow_acero_test(hash_aggregate_test SOURCES hash_aggregate_test.cc) +add_arrow_acero_test(internal_test SOURCES query_context_test.cc) if(ARROW_BUILD_BENCHMARKS) function(add_arrow_acero_benchmark REL_BENCHMARK_NAME) diff --git a/cpp/src/arrow/acero/query_context.cc b/cpp/src/arrow/acero/query_context.cc index a27397d1207..4006a40a77a 100644 --- a/cpp/src/arrow/acero/query_context.cc +++ b/cpp/src/arrow/acero/query_context.cc @@ -23,6 +23,29 @@ namespace arrow { using arrow::internal::CpuInfo; namespace acero { +namespace internal { + +int64_t GetTempStackSizeFromEnvVar() { + auto maybe_env_value = arrow::internal::GetEnvVar(kTempStackSizeEnvVar); + if (!maybe_env_value.ok()) { + return kDefaultTempStackSize; + } + auto env_value = *std::move(maybe_env_value); + if (env_value.empty()) { + return kDefaultTempStackSize; + } + + int64_t temp_stack_size = std::atoll(env_value.c_str()); + if (temp_stack_size <= 0) { + ARROW_LOG(WARNING) << "Invalid temp stack size provided in " << kTempStackSizeEnvVar + << ". Using default temp stack size: " << kDefaultTempStackSize; + return kDefaultTempStackSize; + } + return temp_stack_size; +} + +} // namespace internal + namespace { io::IOContext GetIoContext(const QueryOptions& opts, const ExecContext& exec_context) { if (opts.custom_io_executor == nullptr) { @@ -51,9 +74,9 @@ size_t QueryContext::GetThreadIndex() { return thread_indexer_(); } size_t QueryContext::max_concurrency() const { return thread_indexer_.Capacity(); } Result QueryContext::GetTempStack(size_t thread_index) { + static const int64_t temp_stack_size = internal::GetTempStackSizeFromEnvVar(); if (!tld_[thread_index].is_init) { - RETURN_NOT_OK(tld_[thread_index].stack.Init( - memory_pool(), 32 * util::MiniBatch::kMiniBatchLength * sizeof(uint64_t))); + RETURN_NOT_OK(tld_[thread_index].stack.Init(memory_pool(), temp_stack_size)); tld_[thread_index].is_init = true; } return &tld_[thread_index].stack; diff --git a/cpp/src/arrow/acero/query_context.h b/cpp/src/arrow/acero/query_context.h index 9ea11679cba..2c8d4533469 100644 --- a/cpp/src/arrow/acero/query_context.h +++ b/cpp/src/arrow/acero/query_context.h @@ -33,6 +33,15 @@ using io::IOContext; namespace acero { +namespace internal { + +constexpr char kTempStackSizeEnvVar[] = "ACERO_TEMP_STACK_SIZE"; +constexpr int64_t kDefaultTempStackSize = + 32 * arrow::util::MiniBatch::kMiniBatchLength * sizeof(uint64_t); +int64_t GetTempStackSizeFromEnvVar(); + +} // namespace internal + class ARROW_ACERO_EXPORT QueryContext { public: QueryContext(QueryOptions opts = {}, diff --git a/cpp/src/arrow/acero/query_context_test.cc b/cpp/src/arrow/acero/query_context_test.cc new file mode 100644 index 00000000000..2266e9778cb --- /dev/null +++ b/cpp/src/arrow/acero/query_context_test.cc @@ -0,0 +1,51 @@ +#include + +#include "arrow/acero/query_context.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/io_util.h" + +namespace arrow { +namespace acero { + +TEST(TestTempStack, TestGetTempStackSizeFromEnvVar) { + // Not set. + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); + + // Empty. + ASSERT_OK(::arrow::internal::SetEnvVar(internal::kTempStackSizeEnvVar, "")); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); + + // Non-number. + ASSERT_OK(::arrow::internal::SetEnvVar(internal::kTempStackSizeEnvVar, "invalid")); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); + + // Valid positive number. + ASSERT_OK(::arrow::internal::SetEnvVar(internal::kTempStackSizeEnvVar, "42")); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), 42); + + // Int64 max. + { + auto str = std::to_string(std::numeric_limits::max()); + ASSERT_OK(::arrow::internal::SetEnvVar(internal::kTempStackSizeEnvVar, str)); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), + std::numeric_limits::max()); + } + + // Over int64 max. + { + auto str = std::to_string(std::numeric_limits::max()) + "0"; + ASSERT_OK(::arrow::internal::SetEnvVar(internal::kTempStackSizeEnvVar, str)); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); + } + + // Zero. + ASSERT_OK(::arrow::internal::SetEnvVar(internal::kTempStackSizeEnvVar, "0")); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); + + // Negative number. + ASSERT_OK(::arrow::internal::SetEnvVar(internal::kTempStackSizeEnvVar, "-1")); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); +} + +} // namespace acero +} // namespace arrow \ No newline at end of file From 7d13d51dffdf27c51e3efcede6f6b14d9a86e749 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 22 Apr 2024 17:49:20 +0800 Subject: [PATCH 02/20] Fix tests --- cpp/src/arrow/acero/query_context_test.cc | 47 +++++++++++++++-------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/acero/query_context_test.cc b/cpp/src/arrow/acero/query_context_test.cc index 2266e9778cb..b4ae84e4460 100644 --- a/cpp/src/arrow/acero/query_context_test.cc +++ b/cpp/src/arrow/acero/query_context_test.cc @@ -8,43 +8,58 @@ namespace arrow { namespace acero { TEST(TestTempStack, TestGetTempStackSizeFromEnvVar) { + // Uncleared env var may have side-effect to subsequent tests. Use a structure to help + // clearing the env var when leaving the scope. + struct ScopedEnvVar { + ScopedEnvVar(const char* name, const char* value) : name_(std::move(name)) { + ARROW_CHECK_OK(::arrow::internal::SetEnvVar(name_, value)); + } + ~ScopedEnvVar() { ARROW_CHECK_OK(::arrow::internal::DelEnvVar(name_)); } + + private: + const char* name_; + }; + // Not set. ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); // Empty. - ASSERT_OK(::arrow::internal::SetEnvVar(internal::kTempStackSizeEnvVar, "")); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); + { + ScopedEnvVar env(internal::kTempStackSizeEnvVar, ""); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); + } // Non-number. - ASSERT_OK(::arrow::internal::SetEnvVar(internal::kTempStackSizeEnvVar, "invalid")); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); + { + ScopedEnvVar env(internal::kTempStackSizeEnvVar, "invalid"); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); + } // Valid positive number. - ASSERT_OK(::arrow::internal::SetEnvVar(internal::kTempStackSizeEnvVar, "42")); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), 42); + { + ScopedEnvVar env(internal::kTempStackSizeEnvVar, "42"); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), 42); + } // Int64 max. { auto str = std::to_string(std::numeric_limits::max()); - ASSERT_OK(::arrow::internal::SetEnvVar(internal::kTempStackSizeEnvVar, str)); + ScopedEnvVar env(internal::kTempStackSizeEnvVar, str.c_str()); ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), std::numeric_limits::max()); } - // Over int64 max. + // Zero. { - auto str = std::to_string(std::numeric_limits::max()) + "0"; - ASSERT_OK(::arrow::internal::SetEnvVar(internal::kTempStackSizeEnvVar, str)); + ScopedEnvVar env(internal::kTempStackSizeEnvVar, "0"); ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); } - // Zero. - ASSERT_OK(::arrow::internal::SetEnvVar(internal::kTempStackSizeEnvVar, "0")); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); - // Negative number. - ASSERT_OK(::arrow::internal::SetEnvVar(internal::kTempStackSizeEnvVar, "-1")); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); + { + ScopedEnvVar env(internal::kTempStackSizeEnvVar, "-1"); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); + } } } // namespace acero From 500c81eea7bb2037d19d668e2ee08f6ccb9735c8 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 22 Apr 2024 18:29:41 +0800 Subject: [PATCH 03/20] Fix --- cpp/src/arrow/acero/asof_join_node_test.cc | 4 ++-- cpp/src/arrow/acero/query_context.cc | 11 +++++++++-- cpp/src/arrow/acero/query_context_test.cc | 15 ++++++++++++++- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index d95d2aaad36..e50d4bfdf40 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1503,8 +1503,8 @@ void TestBackpressure(BatchesMaker maker, int batch_size, int num_l_batches, Declaration asofjoin = {"asofjoin", bp_decls, opts}; - ASSERT_OK_AND_ASSIGN(std::shared_ptr tpool, - internal::ThreadPool::Make(1)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<::arrow::internal::ThreadPool> tpool, + ::arrow::internal::ThreadPool::Make(1)); ExecContext exec_ctx(default_memory_pool(), tpool.get()); Future batches_fut = DeclarationToExecBatchesAsync(asofjoin, exec_ctx); diff --git a/cpp/src/arrow/acero/query_context.cc b/cpp/src/arrow/acero/query_context.cc index 4006a40a77a..4dae09f190b 100644 --- a/cpp/src/arrow/acero/query_context.cc +++ b/cpp/src/arrow/acero/query_context.cc @@ -35,8 +35,15 @@ int64_t GetTempStackSizeFromEnvVar() { return kDefaultTempStackSize; } - int64_t temp_stack_size = std::atoll(env_value.c_str()); - if (temp_stack_size <= 0) { + int64_t temp_stack_size = 0; + size_t length = 0; + bool exception = false; + try { + temp_stack_size = std::stoll(env_value.c_str(), &length); + } catch (const std::exception&) { + exception = true; + } + if (length != env_value.length() || exception || temp_stack_size <= 0) { ARROW_LOG(WARNING) << "Invalid temp stack size provided in " << kTempStackSizeEnvVar << ". Using default temp stack size: " << kDefaultTempStackSize; return kDefaultTempStackSize; diff --git a/cpp/src/arrow/acero/query_context_test.cc b/cpp/src/arrow/acero/query_context_test.cc index b4ae84e4460..18dd2ea2828 100644 --- a/cpp/src/arrow/acero/query_context_test.cc +++ b/cpp/src/arrow/acero/query_context_test.cc @@ -7,7 +7,7 @@ namespace arrow { namespace acero { -TEST(TestTempStack, TestGetTempStackSizeFromEnvVar) { +TEST(TestTempStack, GetTempStackSizeFromEnvVar) { // Uncleared env var may have side-effect to subsequent tests. Use a structure to help // clearing the env var when leaving the scope. struct ScopedEnvVar { @@ -35,6 +35,12 @@ TEST(TestTempStack, TestGetTempStackSizeFromEnvVar) { ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); } + // Number with invalid suffix. + { + ScopedEnvVar env(internal::kTempStackSizeEnvVar, "42MB"); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); + } + // Valid positive number. { ScopedEnvVar env(internal::kTempStackSizeEnvVar, "42"); @@ -60,6 +66,13 @@ TEST(TestTempStack, TestGetTempStackSizeFromEnvVar) { ScopedEnvVar env(internal::kTempStackSizeEnvVar, "-1"); ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); } + + // Over int64 max. + { + auto str = std::to_string(std::numeric_limits::max()) + "0"; + ScopedEnvVar env(internal::kTempStackSizeEnvVar, str.c_str()); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); + } } } // namespace acero From 19d0246e1bac9b0c9aa6fe77160f057885e61b5b Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 22 Apr 2024 19:09:27 +0800 Subject: [PATCH 04/20] Add doc --- docs/source/cpp/env_vars.rst | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/docs/source/cpp/env_vars.rst b/docs/source/cpp/env_vars.rst index 0a082b0a5d8..5296a583591 100644 --- a/docs/source/cpp/env_vars.rst +++ b/docs/source/cpp/env_vars.rst @@ -49,6 +49,20 @@ that changing their value later will have an effect. option but this will have a significant performance impact as the buffer will need to be copied. +.. envvar:: ACERO_TEMP_STACK_SIZE + + Arrow C++'s Acero module performs computation on streams of data. This + computation uses a thread-local temporary stack to store intermediate results to + reduce the overhead of memory allocation. As the complexity of the computation + increases, such as when joining multiple tables, a larger temporary stack is required. + If the stack is undersized, computations may fail with error: "TempVectorStack::alloc overflow". + +This environment variable allows users to adjust the stack size. The value of the variable specifies the stack size in bytes. + + The default stack size is 256 KB (262144 bytes). The value of this + environment variable should be a positive integer and should not exceed the maximum value of int64. + Otherwise the default is used. + .. envvar:: ARROW_DEBUG_MEMORY_POOL Enable rudimentary memory checks to guard against buffer overflows. From f8fc7f34f9deb34556f32802313038a18a0bc09f Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 22 Apr 2024 20:04:46 +0800 Subject: [PATCH 05/20] Update doc --- docs/source/cpp/env_vars.rst | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/docs/source/cpp/env_vars.rst b/docs/source/cpp/env_vars.rst index 5296a583591..65709e70256 100644 --- a/docs/source/cpp/env_vars.rst +++ b/docs/source/cpp/env_vars.rst @@ -52,16 +52,18 @@ that changing their value later will have an effect. .. envvar:: ACERO_TEMP_STACK_SIZE Arrow C++'s Acero module performs computation on streams of data. This - computation uses a thread-local temporary stack to store intermediate results to - reduce the overhead of memory allocation. As the complexity of the computation - increases, such as when joining multiple tables, a larger temporary stack is required. - If the stack is undersized, computations may fail with error: "TempVectorStack::alloc overflow". - -This environment variable allows users to adjust the stack size. The value of the variable specifies the stack size in bytes. + computation uses a thread-local temporary stack to store intermediate + results to reduce the overhead of memory allocation. As the complexity + of the computation increases, such as when joining multiple tables, a + larger temporary stack is required. If the stack is undersized, + computations may fail with error: "TempVectorStack::alloc overflow". + + This environment variable allows users to adjust the stack size. The + value of the variable specifies the stack size in bytes. - The default stack size is 256 KB (262144 bytes). The value of this - environment variable should be a positive integer and should not exceed the maximum value of int64. - Otherwise the default is used. + The default stack size is 262144 bytes (256 KB). The value of this + environment variable should be a positive integer and should not exceed + the maximum value of int64. Otherwise the default value is used. .. envvar:: ARROW_DEBUG_MEMORY_POOL From 0e756d8721628769e7e77d57ef2c895694a5e67b Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 22 Apr 2024 20:49:54 +0800 Subject: [PATCH 06/20] Fix link on windows --- cpp/src/arrow/acero/query_context.h | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/acero/query_context.h b/cpp/src/arrow/acero/query_context.h index 2c8d4533469..637f1170b81 100644 --- a/cpp/src/arrow/acero/query_context.h +++ b/cpp/src/arrow/acero/query_context.h @@ -38,6 +38,7 @@ namespace internal { constexpr char kTempStackSizeEnvVar[] = "ACERO_TEMP_STACK_SIZE"; constexpr int64_t kDefaultTempStackSize = 32 * arrow::util::MiniBatch::kMiniBatchLength * sizeof(uint64_t); +ARROW_ACERO_EXPORT int64_t GetTempStackSizeFromEnvVar(); } // namespace internal From a48ed90ff5c8a00a07466b8079e599621c460631 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Mon, 22 Apr 2024 21:00:49 +0800 Subject: [PATCH 07/20] Fix lint --- cpp/src/arrow/acero/query_context_test.cc | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/query_context_test.cc b/cpp/src/arrow/acero/query_context_test.cc index 18dd2ea2828..6ad744140c6 100644 --- a/cpp/src/arrow/acero/query_context_test.cc +++ b/cpp/src/arrow/acero/query_context_test.cc @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + #include #include "arrow/acero/query_context.h" @@ -76,4 +93,4 @@ TEST(TestTempStack, GetTempStackSizeFromEnvVar) { } } // namespace acero -} // namespace arrow \ No newline at end of file +} // namespace arrow From 5ff1b2949b8d786a22306b6b3c0bfdebd99d0ed5 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Wed, 24 Apr 2024 23:42:26 +0800 Subject: [PATCH 08/20] Address comment --- cpp/src/arrow/acero/query_context.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/acero/query_context.cc b/cpp/src/arrow/acero/query_context.cc index 4dae09f190b..2065d2f4b91 100644 --- a/cpp/src/arrow/acero/query_context.cc +++ b/cpp/src/arrow/acero/query_context.cc @@ -30,7 +30,7 @@ int64_t GetTempStackSizeFromEnvVar() { if (!maybe_env_value.ok()) { return kDefaultTempStackSize; } - auto env_value = *std::move(maybe_env_value); + std::string env_value = std::move(maybe_env_value).ValueUnsafe(); if (env_value.empty()) { return kDefaultTempStackSize; } @@ -81,8 +81,8 @@ size_t QueryContext::GetThreadIndex() { return thread_indexer_(); } size_t QueryContext::max_concurrency() const { return thread_indexer_.Capacity(); } Result QueryContext::GetTempStack(size_t thread_index) { - static const int64_t temp_stack_size = internal::GetTempStackSizeFromEnvVar(); - if (!tld_[thread_index].is_init) { + if (ARROW_PREDICT_FALSE(!tld_[thread_index].is_init)) { + static const int64_t temp_stack_size = internal::GetTempStackSizeFromEnvVar(); RETURN_NOT_OK(tld_[thread_index].stack.Init(memory_pool(), temp_stack_size)); tld_[thread_index].is_init = true; } From 8b1d9cd139bfd20c92681b19d72f5f027d9f6e90 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Fri, 26 Apr 2024 02:43:06 +0800 Subject: [PATCH 09/20] Update lower and upper bound of temp stack size --- cpp/src/arrow/acero/query_context.cc | 10 ++++++++++ cpp/src/arrow/acero/query_context.h | 10 ++++++++-- cpp/src/arrow/acero/query_context_test.cc | 20 ++++++++++++-------- docs/source/cpp/env_vars.rst | 8 ++++++-- 4 files changed, 36 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/acero/query_context.cc b/cpp/src/arrow/acero/query_context.cc index 2065d2f4b91..f9a184ffdf5 100644 --- a/cpp/src/arrow/acero/query_context.cc +++ b/cpp/src/arrow/acero/query_context.cc @@ -47,6 +47,16 @@ int64_t GetTempStackSizeFromEnvVar() { ARROW_LOG(WARNING) << "Invalid temp stack size provided in " << kTempStackSizeEnvVar << ". Using default temp stack size: " << kDefaultTempStackSize; return kDefaultTempStackSize; + } else if (temp_stack_size < kMinTempStackSize) { + ARROW_LOG(WARNING) << "Temp stack size provided in " << kTempStackSizeEnvVar + << " is too small. Using minimal temp stack size: " + << kMinTempStackSize; + return kMinTempStackSize; + } else if (temp_stack_size > kMaxTempStackSize) { + ARROW_LOG(WARNING) << "Temp stack size provided in " << kTempStackSizeEnvVar + << " is too big. Using maximal temp stack size: " + << kMaxTempStackSize; + return kMaxTempStackSize; } return temp_stack_size; } diff --git a/cpp/src/arrow/acero/query_context.h b/cpp/src/arrow/acero/query_context.h index 637f1170b81..c1c4555f562 100644 --- a/cpp/src/arrow/acero/query_context.h +++ b/cpp/src/arrow/acero/query_context.h @@ -36,8 +36,14 @@ namespace acero { namespace internal { constexpr char kTempStackSizeEnvVar[] = "ACERO_TEMP_STACK_SIZE"; -constexpr int64_t kDefaultTempStackSize = - 32 * arrow::util::MiniBatch::kMiniBatchLength * sizeof(uint64_t); +/// Default 256KB temp stack size to be fairly sufficient for most common cases. +constexpr int64_t kDefaultTempStackSize = 256 * arrow::util::MiniBatch::kMiniBatchLength; +/// Minimal 64KB temp stack size to be barely useful. +constexpr int64_t kMinTempStackSize = 64 * arrow::util::MiniBatch::kMiniBatchLength; +/// Maximal 64MB temp stack size, which should be sufficient for any case that is not +/// insane, and the stack init overhead is acceptable. +constexpr int64_t kMaxTempStackSize = + 64 * 1024 * arrow::util::MiniBatch::kMiniBatchLength; ARROW_ACERO_EXPORT int64_t GetTempStackSizeFromEnvVar(); diff --git a/cpp/src/arrow/acero/query_context_test.cc b/cpp/src/arrow/acero/query_context_test.cc index 6ad744140c6..33717701141 100644 --- a/cpp/src/arrow/acero/query_context_test.cc +++ b/cpp/src/arrow/acero/query_context_test.cc @@ -58,18 +58,22 @@ TEST(TestTempStack, GetTempStackSizeFromEnvVar) { ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); } - // Valid positive number. + // Smaller than minimal temp stack size. { - ScopedEnvVar env(internal::kTempStackSizeEnvVar, "42"); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), 42); + ScopedEnvVar env(internal::kTempStackSizeEnvVar, "65535"); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kMinTempStackSize); } - // Int64 max. + // Between minimal and maximal temp stack size. { - auto str = std::to_string(std::numeric_limits::max()); - ScopedEnvVar env(internal::kTempStackSizeEnvVar, str.c_str()); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), - std::numeric_limits::max()); + ScopedEnvVar env(internal::kTempStackSizeEnvVar, "65537"); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), 65537); + } + + // Bigger than maximal temp stack size. + { + ScopedEnvVar env(internal::kTempStackSizeEnvVar, "67108865"); + ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kMaxTempStackSize); } // Zero. diff --git a/docs/source/cpp/env_vars.rst b/docs/source/cpp/env_vars.rst index 65709e70256..f5c58a998fa 100644 --- a/docs/source/cpp/env_vars.rst +++ b/docs/source/cpp/env_vars.rst @@ -62,8 +62,12 @@ that changing their value later will have an effect. value of the variable specifies the stack size in bytes. The default stack size is 262144 bytes (256 KB). The value of this - environment variable should be a positive integer and should not exceed - the maximum value of int64. Otherwise the default value is used. + environment variable should be a positive integer within the inclusive + range between 65536 (64 KB) and 67108864 (64 MB). If the value is a + valid positive integer within the int64 range but exceeds the specified + minimum or maximum stack size limits, the nearest boundary value is used. + If the input is a non-positive integer, an invalid number, or exceeds the + int64 capacity, the default stack size is used. .. envvar:: ARROW_DEBUG_MEMORY_POOL From ccc33662565f55139600bb2d281c4d2dcd1d956b Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Fri, 26 Apr 2024 22:58:37 +0800 Subject: [PATCH 10/20] Use arrow internal int parser instead of std --- cpp/src/arrow/acero/query_context.cc | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/acero/query_context.cc b/cpp/src/arrow/acero/query_context.cc index f9a184ffdf5..ab35c23b379 100644 --- a/cpp/src/arrow/acero/query_context.cc +++ b/cpp/src/arrow/acero/query_context.cc @@ -18,6 +18,7 @@ #include "arrow/acero/query_context.h" #include "arrow/util/cpu_info.h" #include "arrow/util/io_util.h" +#include "arrow/util/value_parsing.h" namespace arrow { using arrow::internal::CpuInfo; @@ -36,14 +37,9 @@ int64_t GetTempStackSizeFromEnvVar() { } int64_t temp_stack_size = 0; - size_t length = 0; - bool exception = false; - try { - temp_stack_size = std::stoll(env_value.c_str(), &length); - } catch (const std::exception&) { - exception = true; - } - if (length != env_value.length() || exception || temp_stack_size <= 0) { + bool ok = ::arrow::internal::ParseValue(env_value.c_str(), env_value.size(), + &temp_stack_size); + if (!ok || temp_stack_size <= 0) { ARROW_LOG(WARNING) << "Invalid temp stack size provided in " << kTempStackSizeEnvVar << ". Using default temp stack size: " << kDefaultTempStackSize; return kDefaultTempStackSize; From a15b6de9d8d7ff4eb7a98f57e210959f7ec5d993 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Wed, 8 May 2024 00:57:33 +0800 Subject: [PATCH 11/20] Use per-node stack wip --- cpp/src/arrow/acero/CMakeLists.txt | 1 - cpp/src/arrow/acero/asof_join_node_test.cc | 4 +- cpp/src/arrow/acero/exec_plan.cc | 2 +- cpp/src/arrow/acero/hash_join_node.cc | 37 ++++--- cpp/src/arrow/acero/hash_join_node_test.cc | 6 ++ cpp/src/arrow/acero/query_context.cc | 48 +-------- cpp/src/arrow/acero/query_context.h | 24 +---- cpp/src/arrow/acero/query_context_test.cc | 100 ------------------- cpp/src/arrow/acero/swiss_join.cc | 15 ++- cpp/src/arrow/compute/key_hash_internal.h | 9 ++ cpp/src/arrow/compute/key_hash_test.cc | 6 ++ cpp/src/arrow/compute/row/compare_internal.h | 6 ++ cpp/src/arrow/compute/row/compare_test.cc | 6 ++ cpp/src/arrow/compute/util.cc | 8 +- cpp/src/arrow/compute/util.h | 4 + docs/source/cpp/env_vars.rst | 20 ---- 16 files changed, 80 insertions(+), 216 deletions(-) delete mode 100644 cpp/src/arrow/acero/query_context_test.cc diff --git a/cpp/src/arrow/acero/CMakeLists.txt b/cpp/src/arrow/acero/CMakeLists.txt index f0ae9668cd3..31ed4a6a69b 100644 --- a/cpp/src/arrow/acero/CMakeLists.txt +++ b/cpp/src/arrow/acero/CMakeLists.txt @@ -186,7 +186,6 @@ add_arrow_acero_test(union_node_test SOURCES union_node_test.cc) add_arrow_acero_test(aggregate_node_test SOURCES aggregate_node_test.cc) add_arrow_acero_test(util_test SOURCES util_test.cc task_util_test.cc) add_arrow_acero_test(hash_aggregate_test SOURCES hash_aggregate_test.cc) -add_arrow_acero_test(internal_test SOURCES query_context_test.cc) if(ARROW_BUILD_BENCHMARKS) function(add_arrow_acero_benchmark REL_BENCHMARK_NAME) diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index e50d4bfdf40..d95d2aaad36 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -1503,8 +1503,8 @@ void TestBackpressure(BatchesMaker maker, int batch_size, int num_l_batches, Declaration asofjoin = {"asofjoin", bp_decls, opts}; - ASSERT_OK_AND_ASSIGN(std::shared_ptr<::arrow::internal::ThreadPool> tpool, - ::arrow::internal::ThreadPool::Make(1)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr tpool, + internal::ThreadPool::Make(1)); ExecContext exec_ctx(default_memory_pool(), tpool.get()); Future batches_fut = DeclarationToExecBatchesAsync(asofjoin, exec_ctx); diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc index 97119726d4b..d9fb1942fcc 100644 --- a/cpp/src/arrow/acero/exec_plan.cc +++ b/cpp/src/arrow/acero/exec_plan.cc @@ -128,7 +128,7 @@ struct ExecPlanImpl : public ExecPlan { Future<> scheduler_finished = arrow::util::AsyncTaskScheduler::Make( [this](arrow::util::AsyncTaskScheduler* async_scheduler) { QueryContext* ctx = query_context(); - RETURN_NOT_OK(ctx->Init(ctx->max_concurrency(), async_scheduler)); + RETURN_NOT_OK(ctx->Init(async_scheduler)); #ifdef ARROW_WITH_OPENTELEMETRY if (HasMetadata()) { diff --git a/cpp/src/arrow/acero/hash_join_node.cc b/cpp/src/arrow/acero/hash_join_node.cc index b49364300da..078af16046a 100644 --- a/cpp/src/arrow/acero/hash_join_node.cc +++ b/cpp/src/arrow/acero/hash_join_node.cc @@ -497,11 +497,11 @@ struct BloomFilterPushdownContext { using BuildFinishedCallback = std::function; using FiltersReceivedCallback = std::function; using FilterFinishedCallback = std::function; - void Init(HashJoinNode* owner, size_t num_threads, - RegisterTaskGroupCallback register_task_group_callback, - StartTaskGroupCallback start_task_group_callback, - FiltersReceivedCallback on_bloom_filters_received, bool disable_bloom_filter, - bool use_sync_execution); + Status Init(HashJoinNode* owner, size_t num_threads, + RegisterTaskGroupCallback register_task_group_callback, + StartTaskGroupCallback start_task_group_callback, + FiltersReceivedCallback on_bloom_filters_received, + bool disable_bloom_filter, bool use_sync_execution); Status StartProducing(size_t thread_index); @@ -559,8 +559,7 @@ struct BloomFilterPushdownContext { std::vector hashes(batch.length); std::vector bv(bit_vector_bytes); - ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * stack, - ctx_->GetTempStack(thread_index)); + arrow::util::TempVectorStack* stack = &tld_[thread_index].stack; // Start with full selection for the current batch memset(selected.data(), 0xff, bit_vector_bytes); @@ -654,7 +653,16 @@ struct BloomFilterPushdownContext { FiltersReceivedCallback all_received_callback_; FilterFinishedCallback on_finished_; } eval_; + + static constexpr auto kTempStackUsage = + Hashing32::kTempStackUsage + + (sizeof(uint32_t) + /*extra=*/1) * arrow::util::MiniBatch::kMiniBatchLength; + struct ThreadLocalData { + arrow::util::TempVectorStack stack; + }; + std::vector tld_; }; + bool HashJoinSchema::HasDictionaries() const { for (int side = 0; side <= 1; ++side) { for (int icol = 0; icol < proj_maps[side].num_cols(HashJoinProjection::INPUT); @@ -930,7 +938,7 @@ class HashJoinNode : public ExecNode, public TracedNode { // we will change it back to just the CPU's thread pool capacity. size_t num_threads = (GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1); - pushdown_context_.Init( + RETURN_NOT_OK(pushdown_context_.Init( this, num_threads, [ctx](std::function fn, std::function on_finished) { @@ -940,7 +948,7 @@ class HashJoinNode : public ExecNode, public TracedNode { return ctx->StartTaskGroup(task_group_id, num_tasks); }, [this](size_t thread_index) { return OnFiltersReceived(thread_index); }, - disable_bloom_filter_, use_sync_execution); + disable_bloom_filter_, use_sync_execution)); RETURN_NOT_OK(impl_->Init( ctx, join_type_, num_threads, &(schema_mgr_->proj_maps[0]), @@ -1037,7 +1045,7 @@ class HashJoinNode : public ExecNode, public TracedNode { BloomFilterPushdownContext pushdown_context_; }; -void BloomFilterPushdownContext::Init( +Status BloomFilterPushdownContext::Init( HashJoinNode* owner, size_t num_threads, RegisterTaskGroupCallback register_task_group_callback, StartTaskGroupCallback start_task_group_callback, @@ -1074,6 +1082,12 @@ void BloomFilterPushdownContext::Init( return eval_.on_finished_(thread_index, std::move(eval_.batches_)); }); start_task_group_callback_ = std::move(start_task_group_callback); + tld_.resize(num_threads); + for (auto& local_data : tld_) { + RETURN_NOT_OK(local_data.stack.Init(ctx_->memory_pool(), kTempStackUsage)); + } + + return Status::OK(); } Status BloomFilterPushdownContext::StartProducing(size_t thread_index) { @@ -1124,8 +1138,7 @@ Status BloomFilterPushdownContext::BuildBloomFilter_exec_task(size_t thread_inde } ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(key_columns))); - ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * stack, - ctx_->GetTempStack(thread_index)); + arrow::util::TempVectorStack* stack = &tld_[thread_index].stack; arrow::util::TempVectorHolder hash_holder( stack, arrow::util::MiniBatch::kMiniBatchLength); uint32_t* hashes = hash_holder.mutable_data(); diff --git a/cpp/src/arrow/acero/hash_join_node_test.cc b/cpp/src/arrow/acero/hash_join_node_test.cc index 9c3dbc176ff..34c20a3c9ee 100644 --- a/cpp/src/arrow/acero/hash_join_node_test.cc +++ b/cpp/src/arrow/acero/hash_join_node_test.cc @@ -3201,5 +3201,11 @@ TEST(HashJoin, ChainedIntegerHashJoins) { } } +// Test that a large number of joins don't overflow the temp vector stack, like GH-39582 +// and GH-39951. +TEST(HashJoin, ManyJoins) { + // TODO. +} + } // namespace acero } // namespace arrow diff --git a/cpp/src/arrow/acero/query_context.cc b/cpp/src/arrow/acero/query_context.cc index ab35c23b379..18beb19ab7f 100644 --- a/cpp/src/arrow/acero/query_context.cc +++ b/cpp/src/arrow/acero/query_context.cc @@ -18,47 +18,11 @@ #include "arrow/acero/query_context.h" #include "arrow/util/cpu_info.h" #include "arrow/util/io_util.h" -#include "arrow/util/value_parsing.h" namespace arrow { using arrow::internal::CpuInfo; namespace acero { -namespace internal { - -int64_t GetTempStackSizeFromEnvVar() { - auto maybe_env_value = arrow::internal::GetEnvVar(kTempStackSizeEnvVar); - if (!maybe_env_value.ok()) { - return kDefaultTempStackSize; - } - std::string env_value = std::move(maybe_env_value).ValueUnsafe(); - if (env_value.empty()) { - return kDefaultTempStackSize; - } - - int64_t temp_stack_size = 0; - bool ok = ::arrow::internal::ParseValue(env_value.c_str(), env_value.size(), - &temp_stack_size); - if (!ok || temp_stack_size <= 0) { - ARROW_LOG(WARNING) << "Invalid temp stack size provided in " << kTempStackSizeEnvVar - << ". Using default temp stack size: " << kDefaultTempStackSize; - return kDefaultTempStackSize; - } else if (temp_stack_size < kMinTempStackSize) { - ARROW_LOG(WARNING) << "Temp stack size provided in " << kTempStackSizeEnvVar - << " is too small. Using minimal temp stack size: " - << kMinTempStackSize; - return kMinTempStackSize; - } else if (temp_stack_size > kMaxTempStackSize) { - ARROW_LOG(WARNING) << "Temp stack size provided in " << kTempStackSizeEnvVar - << " is too big. Using maximal temp stack size: " - << kMaxTempStackSize; - return kMaxTempStackSize; - } - return temp_stack_size; -} - -} // namespace internal - namespace { io::IOContext GetIoContext(const QueryOptions& opts, const ExecContext& exec_context) { if (opts.custom_io_executor == nullptr) { @@ -76,8 +40,7 @@ QueryContext::QueryContext(QueryOptions opts, ExecContext exec_context) const CpuInfo* QueryContext::cpu_info() const { return CpuInfo::GetInstance(); } int64_t QueryContext::hardware_flags() const { return cpu_info()->hardware_flags(); } -Status QueryContext::Init(size_t max_num_threads, util::AsyncTaskScheduler* scheduler) { - tld_.resize(max_num_threads); +Status QueryContext::Init(util::AsyncTaskScheduler* scheduler) { async_scheduler_ = scheduler; return Status::OK(); } @@ -86,15 +49,6 @@ size_t QueryContext::GetThreadIndex() { return thread_indexer_(); } size_t QueryContext::max_concurrency() const { return thread_indexer_.Capacity(); } -Result QueryContext::GetTempStack(size_t thread_index) { - if (ARROW_PREDICT_FALSE(!tld_[thread_index].is_init)) { - static const int64_t temp_stack_size = internal::GetTempStackSizeFromEnvVar(); - RETURN_NOT_OK(tld_[thread_index].stack.Init(memory_pool(), temp_stack_size)); - tld_[thread_index].is_init = true; - } - return &tld_[thread_index].stack; -} - Result> QueryContext::BeginExternalTask(std::string_view name) { Future<> completion_future = Future<>::Make(); if (async_scheduler_->AddSimpleTask([completion_future] { return completion_future; }, diff --git a/cpp/src/arrow/acero/query_context.h b/cpp/src/arrow/acero/query_context.h index c1c4555f562..3eff2994398 100644 --- a/cpp/src/arrow/acero/query_context.h +++ b/cpp/src/arrow/acero/query_context.h @@ -33,28 +33,12 @@ using io::IOContext; namespace acero { -namespace internal { - -constexpr char kTempStackSizeEnvVar[] = "ACERO_TEMP_STACK_SIZE"; -/// Default 256KB temp stack size to be fairly sufficient for most common cases. -constexpr int64_t kDefaultTempStackSize = 256 * arrow::util::MiniBatch::kMiniBatchLength; -/// Minimal 64KB temp stack size to be barely useful. -constexpr int64_t kMinTempStackSize = 64 * arrow::util::MiniBatch::kMiniBatchLength; -/// Maximal 64MB temp stack size, which should be sufficient for any case that is not -/// insane, and the stack init overhead is acceptable. -constexpr int64_t kMaxTempStackSize = - 64 * 1024 * arrow::util::MiniBatch::kMiniBatchLength; -ARROW_ACERO_EXPORT -int64_t GetTempStackSizeFromEnvVar(); - -} // namespace internal - class ARROW_ACERO_EXPORT QueryContext { public: QueryContext(QueryOptions opts = {}, ExecContext exec_context = *default_exec_context()); - Status Init(size_t max_num_threads, arrow::util::AsyncTaskScheduler* scheduler); + Status Init(arrow::util::AsyncTaskScheduler* scheduler); const ::arrow::internal::CpuInfo* cpu_info() const; int64_t hardware_flags() const; @@ -68,7 +52,6 @@ class ARROW_ACERO_EXPORT QueryContext { size_t GetThreadIndex(); size_t max_concurrency() const; - Result GetTempStack(size_t thread_index); /// \brief Start an external task /// @@ -161,11 +144,6 @@ class ARROW_ACERO_EXPORT QueryContext { std::unique_ptr task_scheduler_ = TaskScheduler::Make(); ThreadIndexer thread_indexer_; - struct ThreadLocalData { - bool is_init = false; - arrow::util::TempVectorStack stack; - }; - std::vector tld_; std::atomic in_flight_bytes_to_disk_{0}; }; diff --git a/cpp/src/arrow/acero/query_context_test.cc b/cpp/src/arrow/acero/query_context_test.cc deleted file mode 100644 index 33717701141..00000000000 --- a/cpp/src/arrow/acero/query_context_test.cc +++ /dev/null @@ -1,100 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include - -#include "arrow/acero/query_context.h" -#include "arrow/testing/gtest_util.h" -#include "arrow/util/io_util.h" - -namespace arrow { -namespace acero { - -TEST(TestTempStack, GetTempStackSizeFromEnvVar) { - // Uncleared env var may have side-effect to subsequent tests. Use a structure to help - // clearing the env var when leaving the scope. - struct ScopedEnvVar { - ScopedEnvVar(const char* name, const char* value) : name_(std::move(name)) { - ARROW_CHECK_OK(::arrow::internal::SetEnvVar(name_, value)); - } - ~ScopedEnvVar() { ARROW_CHECK_OK(::arrow::internal::DelEnvVar(name_)); } - - private: - const char* name_; - }; - - // Not set. - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); - - // Empty. - { - ScopedEnvVar env(internal::kTempStackSizeEnvVar, ""); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); - } - - // Non-number. - { - ScopedEnvVar env(internal::kTempStackSizeEnvVar, "invalid"); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); - } - - // Number with invalid suffix. - { - ScopedEnvVar env(internal::kTempStackSizeEnvVar, "42MB"); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); - } - - // Smaller than minimal temp stack size. - { - ScopedEnvVar env(internal::kTempStackSizeEnvVar, "65535"); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kMinTempStackSize); - } - - // Between minimal and maximal temp stack size. - { - ScopedEnvVar env(internal::kTempStackSizeEnvVar, "65537"); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), 65537); - } - - // Bigger than maximal temp stack size. - { - ScopedEnvVar env(internal::kTempStackSizeEnvVar, "67108865"); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kMaxTempStackSize); - } - - // Zero. - { - ScopedEnvVar env(internal::kTempStackSizeEnvVar, "0"); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); - } - - // Negative number. - { - ScopedEnvVar env(internal::kTempStackSizeEnvVar, "-1"); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); - } - - // Over int64 max. - { - auto str = std::to_string(std::numeric_limits::max()) + "0"; - ScopedEnvVar env(internal::kTempStackSizeEnvVar, str.c_str()); - ASSERT_EQ(internal::GetTempStackSizeFromEnvVar(), internal::kDefaultTempStackSize); - } -} - -} // namespace acero -} // namespace arrow diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index 542e943c4a8..6b9a5d58cc1 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -2470,6 +2470,7 @@ Status JoinProbeProcessor::OnFinished() { class SwissJoin : public HashJoinImpl { public: + static constexpr auto kTempStackUsage = 64 * arrow::util::MiniBatch::kMiniBatchLength; Status Init(QueryContext* ctx, JoinType join_type, size_t num_threads, const HashJoinProjectionMaps* proj_map_left, const HashJoinProjectionMaps* proj_map_right, @@ -2513,6 +2514,7 @@ class SwissJoin : public HashJoinImpl { local_states_.resize(num_threads_); for (int i = 0; i < num_threads_; ++i) { + RETURN_NOT_OK(local_states_[i].stack.Init(pool_, kTempStackUsage)); local_states_[i].hash_table_ready = false; local_states_[i].num_output_batches = 0; local_states_[i].materialize.Init(pool_, proj_map_left, proj_map_right); @@ -2566,8 +2568,7 @@ class SwissJoin : public HashJoinImpl { ExecBatch keypayload_batch; ARROW_ASSIGN_OR_RAISE(keypayload_batch, KeyPayloadFromInput(/*side=*/0, &batch)); - ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * temp_stack, - ctx_->GetTempStack(thread_index)); + arrow::util::TempVectorStack* temp_stack = &local_states_[thread_index].stack; return CancelIfNotOK( probe_processor_.OnNextBatch(thread_index, keypayload_batch, temp_stack, @@ -2679,8 +2680,7 @@ class SwissJoin : public HashJoinImpl { input_batch.values[schema->num_cols(HashJoinProjection::KEY) + icol]; } } - ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * temp_stack, - ctx_->GetTempStack(thread_id)); + arrow::util::TempVectorStack* temp_stack = &local_states_[thread_id].stack; RETURN_NOT_OK(CancelIfNotOK(hash_table_build_.PushNextBatch( static_cast(thread_id), key_batch, no_payload ? nullptr : &payload_batch, temp_stack))); @@ -2715,8 +2715,7 @@ class SwissJoin : public HashJoinImpl { Status MergeFinished(size_t thread_id) { RETURN_NOT_OK(status()); - ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * temp_stack, - ctx_->GetTempStack(thread_id)); + arrow::util::TempVectorStack* temp_stack = &local_states_[thread_id].stack; hash_table_build_.FinishPrtnMerge(temp_stack); return CancelIfNotOK(OnBuildHashTableFinished(static_cast(thread_id))); } @@ -2771,8 +2770,7 @@ class SwissJoin : public HashJoinImpl { std::min((task_id + 1) * kNumRowsPerScanTask, hash_table_.num_rows()); // Get thread index and related temp vector stack // - ARROW_ASSIGN_OR_RAISE(arrow::util::TempVectorStack * temp_stack, - ctx_->GetTempStack(thread_id)); + arrow::util::TempVectorStack* temp_stack = &local_states_[thread_id].stack; // Split into mini-batches // @@ -2949,6 +2947,7 @@ class SwissJoin : public HashJoinImpl { FinishedCallback finished_callback_; struct ThreadLocalState { + arrow::util::TempVectorStack stack; JoinResultMaterialize materialize; std::vector temp_column_arrays; int64_t num_output_batches; diff --git a/cpp/src/arrow/compute/key_hash_internal.h b/cpp/src/arrow/compute/key_hash_internal.h index 7d226f52086..cde4c54433e 100644 --- a/cpp/src/arrow/compute/key_hash_internal.h +++ b/cpp/src/arrow/compute/key_hash_internal.h @@ -48,6 +48,11 @@ class ARROW_EXPORT Hashing32 { static void HashMultiColumn(const std::vector& cols, LightContext* ctx, uint32_t* out_hash); + // Clarify the max temp stack usage for HashBatch so the caller could reserve enough + // size in advance. + static constexpr auto kTempStackUsage = + (sizeof(uint32_t) + sizeof(uint16_t) + sizeof(uint32_t) + /*extra=*/1) * + util::MiniBatch::kMiniBatchLength; static Status HashBatch(const ExecBatch& key_batch, uint32_t* hashes, std::vector& column_arrays, int64_t hardware_flags, util::TempVectorStack* temp_stack, @@ -161,6 +166,10 @@ class ARROW_EXPORT Hashing64 { static void HashMultiColumn(const std::vector& cols, LightContext* ctx, uint64_t* hashes); + // Clarify the max temp stack usage for HashBatch so the caller could reserve enough + // size in advance. + static constexpr auto kTempStackUsage = + (sizeof(uint16_t) + sizeof(uint64_t)) * util::MiniBatch::kMiniBatchLength; static Status HashBatch(const ExecBatch& key_batch, uint64_t* hashes, std::vector& column_arrays, int64_t hardware_flags, util::TempVectorStack* temp_stack, diff --git a/cpp/src/arrow/compute/key_hash_test.cc b/cpp/src/arrow/compute/key_hash_test.cc index 4e5d869cb7d..99e773c42bd 100644 --- a/cpp/src/arrow/compute/key_hash_test.cc +++ b/cpp/src/arrow/compute/key_hash_test.cc @@ -311,5 +311,11 @@ TEST(VectorHash, FixedLengthTailByteSafety) { HashFixedLengthFrom(/*key_length=*/19, /*num_rows=*/64, /*start_row=*/63); } +// Make sure that Hashing32/64::HashBatch uses no more stack space than declared in +// Hashing32/64::kTempStackUsage. +TEST(VectorHash, TempStackUsage) { + // TODO +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/row/compare_internal.h b/cpp/src/arrow/compute/row/compare_internal.h index 16002ee5184..e110824fb57 100644 --- a/cpp/src/arrow/compute/row/compare_internal.h +++ b/cpp/src/arrow/compute/row/compare_internal.h @@ -32,6 +32,12 @@ namespace compute { class ARROW_EXPORT KeyCompare { public: + // Clarify the max temp stack usage for CompareColumnsToRows so the caller could reserve + // enough size in advance. + static constexpr auto kTempStackUsage = + (sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t) + /*extra=*/1) * + util::MiniBatch::kMiniBatchLength; + // Returns a single 16-bit selection vector of rows that failed comparison. // If there is input selection on the left, the resulting selection is a filtered image // of input selection. diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index 1d8562cd56d..fe7bbd97a28 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -106,5 +106,11 @@ TEST(KeyCompare, CompareColumnsToRowsCuriousFSB) { } } +// Make sure that KeyCompare::CompareColumnsToRows uses no more stack space than declared +// in KeyCompare::kTempStackUsage. +TEST(KeyCompare, TempStackUsage) { + // TODO +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/util.cc b/cpp/src/arrow/compute/util.cc index b0c863b26a0..c68bc75527d 100644 --- a/cpp/src/arrow/compute/util.cc +++ b/cpp/src/arrow/compute/util.cc @@ -32,10 +32,13 @@ using internal::CpuInfo; namespace util { void TempVectorStack::alloc(uint32_t num_bytes, uint8_t** data, int* id) { - int64_t new_top = top_ + EstimatedAllocationSize(num_bytes); + int64_t estimated_alloc_size = EstimatedAllocationSize(num_bytes); + int64_t new_top = top_ + estimated_alloc_size; // Stack overflow check (see GH-39582). // XXX cannot return a regular Status because most consumers do not either. - ARROW_CHECK_LE(new_top, buffer_size_) << "TempVectorStack::alloc overflow"; + ARROW_CHECK_LE(new_top, buffer_size_) + << "TempVectorStack::alloc overflow: attempt " << estimated_alloc_size + << ", current " << top_ << ", capacity " << buffer_size_; *data = buffer_->mutable_data() + top_ + sizeof(uint64_t); // We set 8 bytes before the beginning of the allocated range and // 8 bytes after the end to check for stack overflow (which would @@ -44,6 +47,7 @@ void TempVectorStack::alloc(uint32_t num_bytes, uint8_t** data, int* id) { reinterpret_cast(buffer_->mutable_data() + new_top)[-1] = kGuard2; *id = num_vectors_++; top_ = new_top; + peak_usage = std::max(peak_usage, top_); } void TempVectorStack::release(int id, uint32_t num_bytes) { diff --git a/cpp/src/arrow/compute/util.h b/cpp/src/arrow/compute/util.h index 88dce160ce9..ce51fd4d27b 100644 --- a/cpp/src/arrow/compute/util.h +++ b/cpp/src/arrow/compute/util.h @@ -89,6 +89,7 @@ class ARROW_EXPORT TempVectorStack { Status Init(MemoryPool* pool, int64_t size) { num_vectors_ = 0; top_ = 0; + peak_usage = 0; buffer_size_ = EstimatedAllocationSize(size); ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool)); // Ensure later operations don't accidentally read uninitialized memory. @@ -97,6 +98,8 @@ class ARROW_EXPORT TempVectorStack { return Status::OK(); } + int64_t PeakUsage() const { return peak_usage; } + private: static int64_t EstimatedAllocationSize(int64_t size) { return PaddedAllocationSize(size) + 2 * sizeof(uint64_t); @@ -119,6 +122,7 @@ class ARROW_EXPORT TempVectorStack { static constexpr int64_t kPadding = 64; int num_vectors_; int64_t top_; + int64_t peak_usage; std::unique_ptr buffer_; int64_t buffer_size_; }; diff --git a/docs/source/cpp/env_vars.rst b/docs/source/cpp/env_vars.rst index f5c58a998fa..0a082b0a5d8 100644 --- a/docs/source/cpp/env_vars.rst +++ b/docs/source/cpp/env_vars.rst @@ -49,26 +49,6 @@ that changing their value later will have an effect. option but this will have a significant performance impact as the buffer will need to be copied. -.. envvar:: ACERO_TEMP_STACK_SIZE - - Arrow C++'s Acero module performs computation on streams of data. This - computation uses a thread-local temporary stack to store intermediate - results to reduce the overhead of memory allocation. As the complexity - of the computation increases, such as when joining multiple tables, a - larger temporary stack is required. If the stack is undersized, - computations may fail with error: "TempVectorStack::alloc overflow". - - This environment variable allows users to adjust the stack size. The - value of the variable specifies the stack size in bytes. - - The default stack size is 262144 bytes (256 KB). The value of this - environment variable should be a positive integer within the inclusive - range between 65536 (64 KB) and 67108864 (64 MB). If the value is a - valid positive integer within the int64 range but exceeds the specified - minimum or maximum stack size limits, the nearest boundary value is used. - If the input is a non-positive integer, an invalid number, or exceeds the - int64 capacity, the default stack size is used. - .. envvar:: ARROW_DEBUG_MEMORY_POOL Enable rudimentary memory checks to guard against buffer overflows. From 7fe76d5e828bd8a5b052c15b87b5ab594f63cd57 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Wed, 8 May 2024 20:33:50 +0800 Subject: [PATCH 12/20] WIP --- cpp/src/arrow/acero/hash_join_node_test.cc | 25 +++++++- cpp/src/arrow/compute/key_hash_internal.h | 5 +- cpp/src/arrow/compute/key_hash_test.cc | 57 ++++++++++++++++-- cpp/src/arrow/compute/row/compare_internal.h | 7 ++- cpp/src/arrow/compute/row/compare_test.cc | 62 ++++++++++++++++++-- cpp/src/arrow/compute/util.cc | 5 +- cpp/src/arrow/compute/util.h | 4 -- 7 files changed, 142 insertions(+), 23 deletions(-) diff --git a/cpp/src/arrow/acero/hash_join_node_test.cc b/cpp/src/arrow/acero/hash_join_node_test.cc index 34c20a3c9ee..d240dee2678 100644 --- a/cpp/src/arrow/acero/hash_join_node_test.cc +++ b/cpp/src/arrow/acero/hash_join_node_test.cc @@ -3204,7 +3204,30 @@ TEST(HashJoin, ChainedIntegerHashJoins) { // Test that a large number of joins don't overflow the temp vector stack, like GH-39582 // and GH-39951. TEST(HashJoin, ManyJoins) { - // TODO. + const int num_joins = 64; + + std::vector left_types = {int8()}; + auto left_batch = ExecBatchFromJSON(left_types, R"([[1]])"); + std::vector right_types = {int8()}; + auto right_batch = ExecBatchFromJSON(right_types, R"([[1]])"); + + HashJoinNodeOptions join_opts(JoinType::LEFT_OUTER, /*left_keys=*/{"l_key"}, + /*right_keys*/ {"r_key"}); + + Declaration root{ + "exec_batch_source", + ExecBatchSourceNodeOptions(schema({field("l_key", int8())}), {left_batch})}; + + for (int i = 0; i < num_joins; ++i) { + Declaration table{ + "exec_batch_source", + ExecBatchSourceNodeOptions(schema({field("r_key", int8())}), {right_batch})}; + Declaration new_root{"hashjoin", {std::move(root), std::move(table)}, join_opts}; + + root = std::move(new_root); + } + + ASSERT_OK_AND_ASSIGN(auto result, DeclarationToTable(std::move(root))); } } // namespace acero diff --git a/cpp/src/arrow/compute/key_hash_internal.h b/cpp/src/arrow/compute/key_hash_internal.h index cde4c54433e..5ef6f37bca0 100644 --- a/cpp/src/arrow/compute/key_hash_internal.h +++ b/cpp/src/arrow/compute/key_hash_internal.h @@ -53,6 +53,7 @@ class ARROW_EXPORT Hashing32 { static constexpr auto kTempStackUsage = (sizeof(uint32_t) + sizeof(uint16_t) + sizeof(uint32_t) + /*extra=*/1) * util::MiniBatch::kMiniBatchLength; + static Status HashBatch(const ExecBatch& key_batch, uint32_t* hashes, std::vector& column_arrays, int64_t hardware_flags, util::TempVectorStack* temp_stack, @@ -169,7 +170,9 @@ class ARROW_EXPORT Hashing64 { // Clarify the max temp stack usage for HashBatch so the caller could reserve enough // size in advance. static constexpr auto kTempStackUsage = - (sizeof(uint16_t) + sizeof(uint64_t)) * util::MiniBatch::kMiniBatchLength; + (sizeof(uint16_t) + sizeof(uint64_t) + /*extra=*/1) * + util::MiniBatch::kMiniBatchLength; + static Status HashBatch(const ExecBatch& key_batch, uint64_t* hashes, std::vector& column_arrays, int64_t hardware_flags, util::TempVectorStack* temp_stack, diff --git a/cpp/src/arrow/compute/key_hash_test.cc b/cpp/src/arrow/compute/key_hash_test.cc index 99e773c42bd..5f4545b1b8c 100644 --- a/cpp/src/arrow/compute/key_hash_test.cc +++ b/cpp/src/arrow/compute/key_hash_test.cc @@ -25,12 +25,16 @@ #include "arrow/array/builder_binary.h" #include "arrow/compute/key_hash_internal.h" #include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" #include "arrow/testing/util.h" #include "arrow/util/cpu_info.h" #include "arrow/util/pcg_random.h" namespace arrow { +using arrow::random::RandomArrayGenerator; +using arrow::util::MiniBatch; +using arrow::util::TempVectorStack; using internal::checked_pointer_cast; using internal::CpuInfo; @@ -156,7 +160,7 @@ class TestVectorHash { std::vector temp_buffer; temp_buffer.resize(mini_batch_size * 4); - for (int i = 0; i < static_cast(hardware_flags_for_testing.size()); ++i) { + for (size_t i = 0; i < hardware_flags_for_testing.size(); ++i) { const auto hardware_flags = hardware_flags_for_testing[i]; if (use_32bit_hash) { if (!use_varlen_input) { @@ -192,7 +196,7 @@ class TestVectorHash { // Verify that all implementations (scalar, SIMD) give the same hashes // const auto& hashes_scalar64 = hashes64[0]; - for (int i = 0; i < static_cast(hardware_flags_for_testing.size()); ++i) { + for (size_t i = 0; i < hardware_flags_for_testing.size(); ++i) { for (int j = 0; j < num_rows; ++j) { ASSERT_EQ(hashes64[i][j], hashes_scalar64[j]) << "scalar and simd approaches yielded different hashes"; @@ -280,7 +284,7 @@ void HashFixedLengthFrom(int key_length, int num_rows, int start_row) { std::vector temp_buffer; temp_buffer.resize(mini_batch_size * 4); - for (int i = 0; i < static_cast(hardware_flags_for_testing.size()); ++i) { + for (size_t i = 0; i < hardware_flags_for_testing.size(); ++i) { const auto hardware_flags = hardware_flags_for_testing[i]; Hashing32::HashFixed(hardware_flags, /*combine_hashes=*/false, num_rows_to_hash, key_length, @@ -292,7 +296,7 @@ void HashFixedLengthFrom(int key_length, int num_rows, int start_row) { } // Verify that all implementations (scalar, SIMD) give the same hashes. - for (int i = 1; i < static_cast(hardware_flags_for_testing.size()); ++i) { + for (size_t i = 1; i < hardware_flags_for_testing.size(); ++i) { for (int j = 0; j < num_rows_to_hash; ++j) { ASSERT_EQ(hashes32[i][j], hashes32[0][j]) << "scalar and simd approaches yielded different 32-bit hashes"; @@ -313,8 +317,49 @@ TEST(VectorHash, FixedLengthTailByteSafety) { // Make sure that Hashing32/64::HashBatch uses no more stack space than declared in // Hashing32/64::kTempStackUsage. -TEST(VectorHash, TempStackUsage) { - // TODO +TEST(VectorHash, HashBatchTempStackUsage) { + for (auto num_rows : + {0, 1, MiniBatch::kMiniBatchLength, MiniBatch::kMiniBatchLength * 64}) { + SCOPED_TRACE("num_rows = " + std::to_string(num_rows)); + + MemoryPool* pool = default_memory_pool(); + RandomArrayGenerator gen(42); + + auto column = gen.Int8(num_rows, 0, 127); + ExecBatch batch({column}, num_rows); + + std::vector column_arrays; + ASSERT_OK(ColumnArraysFromExecBatch(batch, &column_arrays)); + + const auto hardware_flags_for_testing = HardwareFlagsForTesting(); + ASSERT_GT(hardware_flags_for_testing.size(), 0); + + { + std::vector hashes(num_rows); + TempVectorStack stack; + ASSERT_OK(stack.Init(pool, Hashing32::kTempStackUsage)); + for (size_t i = 0; i < hardware_flags_for_testing.size(); ++i) { + SCOPED_TRACE("hashing32 for hardware flags = " + + std::to_string(hardware_flags_for_testing[i])); + ASSERT_OK(Hashing32::HashBatch(batch, hashes.data(), column_arrays, + hardware_flags_for_testing[i], &stack, + /*start_rows=*/0, num_rows)); + } + } + + { + std::vector hashes(num_rows); + TempVectorStack stack; + ASSERT_OK(stack.Init(pool, Hashing64::kTempStackUsage)); + for (size_t i = 0; i < hardware_flags_for_testing.size(); ++i) { + SCOPED_TRACE("hashing64 for hardware flags = " + + std::to_string(hardware_flags_for_testing[i])); + ASSERT_OK(Hashing64::HashBatch(batch, hashes.data(), column_arrays, + hardware_flags_for_testing[i], &stack, + /*start_rows=*/0, num_rows)); + } + } + } } } // namespace compute diff --git a/cpp/src/arrow/compute/row/compare_internal.h b/cpp/src/arrow/compute/row/compare_internal.h index e110824fb57..a9a6246affe 100644 --- a/cpp/src/arrow/compute/row/compare_internal.h +++ b/cpp/src/arrow/compute/row/compare_internal.h @@ -34,9 +34,10 @@ class ARROW_EXPORT KeyCompare { public: // Clarify the max temp stack usage for CompareColumnsToRows so the caller could reserve // enough size in advance. - static constexpr auto kTempStackUsage = - (sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t) + /*extra=*/1) * - util::MiniBatch::kMiniBatchLength; + constexpr static int64_t TempStackUsage(int64_t num_rows) { + return (sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t)) * num_rows + + /*extra=*/util::MiniBatch::kMiniBatchLength; + } // Returns a single 16-bit selection vector of rows that failed comparison. // If there is input selection on the left, the resulting selection is a filtered image diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index fe7bbd97a28..825369080e2 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -19,23 +19,26 @@ #include "arrow/compute/row/compare_internal.h" #include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" namespace arrow { namespace compute { using arrow::bit_util::BytesForBits; using arrow::internal::CpuInfo; +using arrow::random::RandomArrayGenerator; using arrow::util::MiniBatch; using arrow::util::TempVectorStack; // Specialized case for GH-39577. TEST(KeyCompare, CompareColumnsToRowsCuriousFSB) { int fsb_length = 9; + int num_rows = 7; + MemoryPool* pool = default_memory_pool(); TempVectorStack stack; - ASSERT_OK(stack.Init(pool, 8 * MiniBatch::kMiniBatchLength * sizeof(uint64_t))); + ASSERT_OK(stack.Init(pool, KeyCompare::TempStackUsage(num_rows))); - int num_rows = 7; auto column_right = ArrayFromJSON(fixed_size_binary(fsb_length), R"([ "000000000", "111111111", @@ -107,9 +110,58 @@ TEST(KeyCompare, CompareColumnsToRowsCuriousFSB) { } // Make sure that KeyCompare::CompareColumnsToRows uses no more stack space than declared -// in KeyCompare::kTempStackUsage. -TEST(KeyCompare, TempStackUsage) { - // TODO +// in KeyCompare::TempStackUsage(). +TEST(KeyCompare, CompareColumnsToRowsTempStackUsage) { + for (auto num_rows : + {0, 1, MiniBatch::kMiniBatchLength, MiniBatch::kMiniBatchLength * 64}) { + SCOPED_TRACE("num_rows = " + std::to_string(num_rows)); + + MemoryPool* pool = default_memory_pool(); + TempVectorStack stack; + ASSERT_OK(stack.Init(pool, KeyCompare::TempStackUsage(num_rows))); + + RandomArrayGenerator gen(42); + + auto column_right = gen.Int8(num_rows, 0, 127); + ExecBatch batch_right({column_right}, num_rows); + + std::vector column_metadatas_right; + ASSERT_OK(ColumnMetadatasFromExecBatch(batch_right, &column_metadatas_right)); + + RowTableMetadata table_metadata_right; + table_metadata_right.FromColumnMetadataVector(column_metadatas_right, + sizeof(uint64_t), sizeof(uint64_t)); + + std::vector column_arrays_right; + ASSERT_OK(ColumnArraysFromExecBatch(batch_right, &column_arrays_right)); + + RowTableImpl row_table; + ASSERT_OK(row_table.Init(pool, table_metadata_right)); + + RowTableEncoder row_encoder; + row_encoder.Init(column_metadatas_right, sizeof(uint64_t), sizeof(uint64_t)); + row_encoder.PrepareEncodeSelected(0, num_rows, column_arrays_right); + + std::vector row_ids_right(num_rows); + std::iota(row_ids_right.begin(), row_ids_right.end(), 0); + ASSERT_OK(row_encoder.EncodeSelected(&row_table, num_rows, row_ids_right.data())); + + auto column_left = gen.Int8(num_rows, 0, 127); + ExecBatch batch_left({column_left}, num_rows); + std::vector column_arrays_left; + ASSERT_OK(ColumnArraysFromExecBatch(batch_left, &column_arrays_left)); + + std::vector row_ids_left(num_rows); + std::iota(row_ids_left.begin(), row_ids_left.end(), 0); + + LightContext ctx{CpuInfo::GetInstance()->hardware_flags(), &stack}; + + uint32_t num_rows_no_match; + std::vector row_ids_out(num_rows); + KeyCompare::CompareColumnsToRows(num_rows, NULLPTR, row_ids_left.data(), &ctx, + &num_rows_no_match, row_ids_out.data(), + column_arrays_left, row_table, true, NULLPTR); + } } } // namespace compute diff --git a/cpp/src/arrow/compute/util.cc b/cpp/src/arrow/compute/util.cc index c68bc75527d..cd20fe486e8 100644 --- a/cpp/src/arrow/compute/util.cc +++ b/cpp/src/arrow/compute/util.cc @@ -37,8 +37,8 @@ void TempVectorStack::alloc(uint32_t num_bytes, uint8_t** data, int* id) { // Stack overflow check (see GH-39582). // XXX cannot return a regular Status because most consumers do not either. ARROW_CHECK_LE(new_top, buffer_size_) - << "TempVectorStack::alloc overflow: attempt " << estimated_alloc_size - << ", current " << top_ << ", capacity " << buffer_size_; + << "TempVectorStack::alloc overflow: allocating " << estimated_alloc_size + << " on top of " << top_ << " in stack of size " << buffer_size_; *data = buffer_->mutable_data() + top_ + sizeof(uint64_t); // We set 8 bytes before the beginning of the allocated range and // 8 bytes after the end to check for stack overflow (which would @@ -47,7 +47,6 @@ void TempVectorStack::alloc(uint32_t num_bytes, uint8_t** data, int* id) { reinterpret_cast(buffer_->mutable_data() + new_top)[-1] = kGuard2; *id = num_vectors_++; top_ = new_top; - peak_usage = std::max(peak_usage, top_); } void TempVectorStack::release(int id, uint32_t num_bytes) { diff --git a/cpp/src/arrow/compute/util.h b/cpp/src/arrow/compute/util.h index ce51fd4d27b..88dce160ce9 100644 --- a/cpp/src/arrow/compute/util.h +++ b/cpp/src/arrow/compute/util.h @@ -89,7 +89,6 @@ class ARROW_EXPORT TempVectorStack { Status Init(MemoryPool* pool, int64_t size) { num_vectors_ = 0; top_ = 0; - peak_usage = 0; buffer_size_ = EstimatedAllocationSize(size); ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool)); // Ensure later operations don't accidentally read uninitialized memory. @@ -98,8 +97,6 @@ class ARROW_EXPORT TempVectorStack { return Status::OK(); } - int64_t PeakUsage() const { return peak_usage; } - private: static int64_t EstimatedAllocationSize(int64_t size) { return PaddedAllocationSize(size) + 2 * sizeof(uint64_t); @@ -122,7 +119,6 @@ class ARROW_EXPORT TempVectorStack { static constexpr int64_t kPadding = 64; int num_vectors_; int64_t top_; - int64_t peak_usage; std::unique_ptr buffer_; int64_t buffer_size_; }; From cf45ef93bccb00e57aad6219d7f823901dae341b Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Wed, 8 May 2024 23:57:38 +0800 Subject: [PATCH 13/20] Add many join case to overflow the stack --- cpp/src/arrow/acero/hash_join_node_test.cc | 31 +++++++++++++--------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/acero/hash_join_node_test.cc b/cpp/src/arrow/acero/hash_join_node_test.cc index d240dee2678..ecadbc664aa 100644 --- a/cpp/src/arrow/acero/hash_join_node_test.cc +++ b/cpp/src/arrow/acero/hash_join_node_test.cc @@ -3206,28 +3206,33 @@ TEST(HashJoin, ChainedIntegerHashJoins) { TEST(HashJoin, ManyJoins) { const int num_joins = 64; - std::vector left_types = {int8()}; - auto left_batch = ExecBatchFromJSON(left_types, R"([[1]])"); - std::vector right_types = {int8()}; - auto right_batch = ExecBatchFromJSON(right_types, R"([[1]])"); + const int num_left_rows = ExecBatchBuilder::num_rows_max(); + ASSERT_OK_AND_ASSIGN( + auto left_batches, + MakeIntegerBatches({[](int row_id) -> int64_t { return row_id; }}, + schema({field("l_key", int8())}), + /*num_batches=*/1, /*batch_size=*/num_left_rows)); + Declaration root{"exec_batch_source", + ExecBatchSourceNodeOptions(std::move(left_batches.schema), + std::move(left_batches.batches))}; HashJoinNodeOptions join_opts(JoinType::LEFT_OUTER, /*left_keys=*/{"l_key"}, /*right_keys*/ {"r_key"}); - Declaration root{ - "exec_batch_source", - ExecBatchSourceNodeOptions(schema({field("l_key", int8())}), {left_batch})}; - for (int i = 0; i < num_joins; ++i) { - Declaration table{ - "exec_batch_source", - ExecBatchSourceNodeOptions(schema({field("r_key", int8())}), {right_batch})}; - Declaration new_root{"hashjoin", {std::move(root), std::move(table)}, join_opts}; + ASSERT_OK_AND_ASSIGN(auto right_batches, + MakeIntegerBatches({[i](int) -> int64_t { return i; }}, + schema({field("r_key", int8())}), + /*num_batches=*/1, /*batch_size=*/2)); + Declaration table{"exec_batch_source", + ExecBatchSourceNodeOptions(std::move(right_batches.schema), + std::move(right_batches.batches))}; + Declaration new_root{"hashjoin", {std::move(root), std::move(table)}, join_opts}; root = std::move(new_root); } - ASSERT_OK_AND_ASSIGN(auto result, DeclarationToTable(std::move(root))); + ASSERT_OK_AND_ASSIGN(std::ignore, DeclarationToTable(std::move(root))); } } // namespace acero From 7b397fb8c2d27cfe6e01645e37889e2205c450ed Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Thu, 9 May 2024 00:55:40 +0800 Subject: [PATCH 14/20] Explain the many join case --- cpp/src/arrow/acero/hash_join_node_test.cc | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/cpp/src/arrow/acero/hash_join_node_test.cc b/cpp/src/arrow/acero/hash_join_node_test.cc index ecadbc664aa..1d9d3681a34 100644 --- a/cpp/src/arrow/acero/hash_join_node_test.cc +++ b/cpp/src/arrow/acero/hash_join_node_test.cc @@ -28,6 +28,7 @@ #include "arrow/api.h" #include "arrow/compute/kernels/row_encoder_internal.h" #include "arrow/compute/kernels/test_util.h" +#include "arrow/compute/light_array_internal.h" #include "arrow/testing/extension_type.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/matchers.h" @@ -41,6 +42,7 @@ namespace arrow { using compute::call; using compute::default_exec_context; +using compute::ExecBatchBuilder; using compute::ExecSpan; using compute::field_ref; using compute::SortIndices; @@ -3204,8 +3206,24 @@ TEST(HashJoin, ChainedIntegerHashJoins) { // Test that a large number of joins don't overflow the temp vector stack, like GH-39582 // and GH-39951. TEST(HashJoin, ManyJoins) { + // The idea of this case is to create many nested join nodes that may possibly cause + // recursive usage of temp vector stack. To make sure that the recursion happens: + // 1. A left-deep join tree is created so that the left-most (the final probe side) + // table will go through all the hash tables from the right side. + // 2. Left-outer join is used so that every join will increase the cardinality. + // 3. The left-most table contains rows of unique integers from 0 to N. + // 4. Each right table at level i contains two rows of integer i, so that the probing of + // each level will increase the result by one row. + // 5. The left-most table is a single batch of enough rows, so that at each level, the + // probing will accumulate enough result rows to have to output to the subsequent level + // before finishing the current batch (releasing the buffer allocated on the temp vector + // stack), which is essentially the recursive usage of the temp vector stack. + + // A fair number of joins to guarantee temp vector stack overflow before GH-41335. const int num_joins = 64; + // `ExecBatchBuilder::num_rows_max()` is the number of rows for swiss join to accumulate + // before outputting. const int num_left_rows = ExecBatchBuilder::num_rows_max(); ASSERT_OK_AND_ASSIGN( auto left_batches, From 2d6f053d29a1256159717abc0abf4556a5592a17 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Thu, 9 May 2024 01:08:41 +0800 Subject: [PATCH 15/20] Format --- cpp/src/arrow/acero/hash_join_node.cc | 1 + cpp/src/arrow/acero/swiss_join.cc | 1 + 2 files changed, 2 insertions(+) diff --git a/cpp/src/arrow/acero/hash_join_node.cc b/cpp/src/arrow/acero/hash_join_node.cc index 078af16046a..044da51bc9e 100644 --- a/cpp/src/arrow/acero/hash_join_node.cc +++ b/cpp/src/arrow/acero/hash_join_node.cc @@ -657,6 +657,7 @@ struct BloomFilterPushdownContext { static constexpr auto kTempStackUsage = Hashing32::kTempStackUsage + (sizeof(uint32_t) + /*extra=*/1) * arrow::util::MiniBatch::kMiniBatchLength; + struct ThreadLocalData { arrow::util::TempVectorStack stack; }; diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index 6b9a5d58cc1..17c52126973 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -2471,6 +2471,7 @@ Status JoinProbeProcessor::OnFinished() { class SwissJoin : public HashJoinImpl { public: static constexpr auto kTempStackUsage = 64 * arrow::util::MiniBatch::kMiniBatchLength; + Status Init(QueryContext* ctx, JoinType join_type, size_t num_threads, const HashJoinProjectionMaps* proj_map_left, const HashJoinProjectionMaps* proj_map_right, From a3622d9e7231a6a8ba2c29cb2e695b2ae46f64cd Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Thu, 9 May 2024 01:42:16 +0800 Subject: [PATCH 16/20] Refine naming --- cpp/src/arrow/acero/hash_join_node.cc | 2 +- cpp/src/arrow/acero/hash_join_node_test.cc | 2 +- cpp/src/arrow/compute/key_hash_internal.h | 4 ++-- cpp/src/arrow/compute/key_hash_test.cc | 6 +++--- cpp/src/arrow/compute/row/compare_internal.h | 2 +- cpp/src/arrow/compute/row/compare_test.cc | 6 +++--- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/acero/hash_join_node.cc b/cpp/src/arrow/acero/hash_join_node.cc index 044da51bc9e..06405f16c8d 100644 --- a/cpp/src/arrow/acero/hash_join_node.cc +++ b/cpp/src/arrow/acero/hash_join_node.cc @@ -655,7 +655,7 @@ struct BloomFilterPushdownContext { } eval_; static constexpr auto kTempStackUsage = - Hashing32::kTempStackUsage + + Hashing32::kHashBatchTempStackUsage + (sizeof(uint32_t) + /*extra=*/1) * arrow::util::MiniBatch::kMiniBatchLength; struct ThreadLocalData { diff --git a/cpp/src/arrow/acero/hash_join_node_test.cc b/cpp/src/arrow/acero/hash_join_node_test.cc index 1d9d3681a34..e1968aecf63 100644 --- a/cpp/src/arrow/acero/hash_join_node_test.cc +++ b/cpp/src/arrow/acero/hash_join_node_test.cc @@ -3235,7 +3235,7 @@ TEST(HashJoin, ManyJoins) { std::move(left_batches.batches))}; HashJoinNodeOptions join_opts(JoinType::LEFT_OUTER, /*left_keys=*/{"l_key"}, - /*right_keys*/ {"r_key"}); + /*right_keys=*/{"r_key"}); for (int i = 0; i < num_joins; ++i) { ASSERT_OK_AND_ASSIGN(auto right_batches, diff --git a/cpp/src/arrow/compute/key_hash_internal.h b/cpp/src/arrow/compute/key_hash_internal.h index 5ef6f37bca0..2b9c9455651 100644 --- a/cpp/src/arrow/compute/key_hash_internal.h +++ b/cpp/src/arrow/compute/key_hash_internal.h @@ -50,7 +50,7 @@ class ARROW_EXPORT Hashing32 { // Clarify the max temp stack usage for HashBatch so the caller could reserve enough // size in advance. - static constexpr auto kTempStackUsage = + static constexpr auto kHashBatchTempStackUsage = (sizeof(uint32_t) + sizeof(uint16_t) + sizeof(uint32_t) + /*extra=*/1) * util::MiniBatch::kMiniBatchLength; @@ -169,7 +169,7 @@ class ARROW_EXPORT Hashing64 { // Clarify the max temp stack usage for HashBatch so the caller could reserve enough // size in advance. - static constexpr auto kTempStackUsage = + static constexpr auto kHashBatchTempStackUsage = (sizeof(uint16_t) + sizeof(uint64_t) + /*extra=*/1) * util::MiniBatch::kMiniBatchLength; diff --git a/cpp/src/arrow/compute/key_hash_test.cc b/cpp/src/arrow/compute/key_hash_test.cc index 5f4545b1b8c..fdf6d212585 100644 --- a/cpp/src/arrow/compute/key_hash_test.cc +++ b/cpp/src/arrow/compute/key_hash_test.cc @@ -316,7 +316,7 @@ TEST(VectorHash, FixedLengthTailByteSafety) { } // Make sure that Hashing32/64::HashBatch uses no more stack space than declared in -// Hashing32/64::kTempStackUsage. +// Hashing32/64::kHashBatchTempStackUsage. TEST(VectorHash, HashBatchTempStackUsage) { for (auto num_rows : {0, 1, MiniBatch::kMiniBatchLength, MiniBatch::kMiniBatchLength * 64}) { @@ -337,7 +337,7 @@ TEST(VectorHash, HashBatchTempStackUsage) { { std::vector hashes(num_rows); TempVectorStack stack; - ASSERT_OK(stack.Init(pool, Hashing32::kTempStackUsage)); + ASSERT_OK(stack.Init(pool, Hashing32::kHashBatchTempStackUsage)); for (size_t i = 0; i < hardware_flags_for_testing.size(); ++i) { SCOPED_TRACE("hashing32 for hardware flags = " + std::to_string(hardware_flags_for_testing[i])); @@ -350,7 +350,7 @@ TEST(VectorHash, HashBatchTempStackUsage) { { std::vector hashes(num_rows); TempVectorStack stack; - ASSERT_OK(stack.Init(pool, Hashing64::kTempStackUsage)); + ASSERT_OK(stack.Init(pool, Hashing64::kHashBatchTempStackUsage)); for (size_t i = 0; i < hardware_flags_for_testing.size(); ++i) { SCOPED_TRACE("hashing64 for hardware flags = " + std::to_string(hardware_flags_for_testing[i])); diff --git a/cpp/src/arrow/compute/row/compare_internal.h b/cpp/src/arrow/compute/row/compare_internal.h index a9a6246affe..e3e3f3c3b60 100644 --- a/cpp/src/arrow/compute/row/compare_internal.h +++ b/cpp/src/arrow/compute/row/compare_internal.h @@ -34,7 +34,7 @@ class ARROW_EXPORT KeyCompare { public: // Clarify the max temp stack usage for CompareColumnsToRows so the caller could reserve // enough size in advance. - constexpr static int64_t TempStackUsage(int64_t num_rows) { + constexpr static int64_t CompareColumnsToRowsTempStackUsage(int64_t num_rows) { return (sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t)) * num_rows + /*extra=*/util::MiniBatch::kMiniBatchLength; } diff --git a/cpp/src/arrow/compute/row/compare_test.cc b/cpp/src/arrow/compute/row/compare_test.cc index 825369080e2..4044049b108 100644 --- a/cpp/src/arrow/compute/row/compare_test.cc +++ b/cpp/src/arrow/compute/row/compare_test.cc @@ -37,7 +37,7 @@ TEST(KeyCompare, CompareColumnsToRowsCuriousFSB) { MemoryPool* pool = default_memory_pool(); TempVectorStack stack; - ASSERT_OK(stack.Init(pool, KeyCompare::TempStackUsage(num_rows))); + ASSERT_OK(stack.Init(pool, KeyCompare::CompareColumnsToRowsTempStackUsage(num_rows))); auto column_right = ArrayFromJSON(fixed_size_binary(fsb_length), R"([ "000000000", @@ -110,7 +110,7 @@ TEST(KeyCompare, CompareColumnsToRowsCuriousFSB) { } // Make sure that KeyCompare::CompareColumnsToRows uses no more stack space than declared -// in KeyCompare::TempStackUsage(). +// in KeyCompare::CompareColumnsToRowsTempStackUsage(). TEST(KeyCompare, CompareColumnsToRowsTempStackUsage) { for (auto num_rows : {0, 1, MiniBatch::kMiniBatchLength, MiniBatch::kMiniBatchLength * 64}) { @@ -118,7 +118,7 @@ TEST(KeyCompare, CompareColumnsToRowsTempStackUsage) { MemoryPool* pool = default_memory_pool(); TempVectorStack stack; - ASSERT_OK(stack.Init(pool, KeyCompare::TempStackUsage(num_rows))); + ASSERT_OK(stack.Init(pool, KeyCompare::CompareColumnsToRowsTempStackUsage(num_rows))); RandomArrayGenerator gen(42); From 94be92d04dbfb65d0c11bd44c063ea0886beafb6 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 14 May 2024 02:10:26 +0800 Subject: [PATCH 17/20] Move temp stack vector into internal header and reduce including --- cpp/src/arrow/CMakeLists.txt | 3 +- cpp/src/arrow/compute/key_map_internal.h | 1 + cpp/src/arrow/compute/light_array_internal.h | 1 + cpp/src/arrow/compute/light_array_test.cc | 1 + cpp/src/arrow/compute/util.cc | 34 --------- cpp/src/arrow/compute/util.h | 73 ------------------ cpp/src/arrow/compute/util_internal.cc | 79 ++++++++++++++++++++ cpp/src/arrow/compute/util_internal.h | 51 +++++++++++++ 8 files changed, 135 insertions(+), 108 deletions(-) create mode 100644 cpp/src/arrow/compute/util_internal.cc diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 5d61112518f..0f4824ec99d 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -716,7 +716,8 @@ set(ARROW_COMPUTE_SRCS compute/row/compare_internal.cc compute/row/grouper.cc compute/row/row_internal.cc - compute/util.cc) + compute/util.cc + compute/util_internal.cc) append_runtime_avx2_src(ARROW_COMPUTE_SRCS compute/key_hash_internal_avx2.cc) append_runtime_avx2_bmi2_src(ARROW_COMPUTE_SRCS compute/key_map_internal_avx2.cc) diff --git a/cpp/src/arrow/compute/key_map_internal.h b/cpp/src/arrow/compute/key_map_internal.h index 8e06dc83483..a5e784a9e44 100644 --- a/cpp/src/arrow/compute/key_map_internal.h +++ b/cpp/src/arrow/compute/key_map_internal.h @@ -21,6 +21,7 @@ #include #include "arrow/compute/util.h" +#include "arrow/compute/util_internal.h" #include "arrow/result.h" #include "arrow/status.h" #include "arrow/type_fwd.h" diff --git a/cpp/src/arrow/compute/light_array_internal.h b/cpp/src/arrow/compute/light_array_internal.h index 67de71bf56c..995c4211998 100644 --- a/cpp/src/arrow/compute/light_array_internal.h +++ b/cpp/src/arrow/compute/light_array_internal.h @@ -22,6 +22,7 @@ #include "arrow/array.h" #include "arrow/compute/exec.h" #include "arrow/compute/util.h" +#include "arrow/compute/util_internal.h" #include "arrow/type.h" #include "arrow/util/cpu_info.h" #include "arrow/util/logging.h" diff --git a/cpp/src/arrow/compute/light_array_test.cc b/cpp/src/arrow/compute/light_array_test.cc index 08f36ee6060..cc02d489d13 100644 --- a/cpp/src/arrow/compute/light_array_test.cc +++ b/cpp/src/arrow/compute/light_array_test.cc @@ -20,6 +20,7 @@ #include #include +#include "arrow/memory_pool.h" #include "arrow/testing/generator.h" #include "arrow/testing/gtest_util.h" #include "arrow/type.h" diff --git a/cpp/src/arrow/compute/util.cc b/cpp/src/arrow/compute/util.cc index cd20fe486e8..b90b3a64056 100644 --- a/cpp/src/arrow/compute/util.cc +++ b/cpp/src/arrow/compute/util.cc @@ -17,11 +17,7 @@ #include "arrow/compute/util.h" -#include "arrow/table.h" -#include "arrow/util/bit_util.h" -#include "arrow/util/bitmap_ops.h" #include "arrow/util/logging.h" -#include "arrow/util/tracing_internal.h" #include "arrow/util/ubsan.h" namespace arrow { @@ -31,36 +27,6 @@ using internal::CpuInfo; namespace util { -void TempVectorStack::alloc(uint32_t num_bytes, uint8_t** data, int* id) { - int64_t estimated_alloc_size = EstimatedAllocationSize(num_bytes); - int64_t new_top = top_ + estimated_alloc_size; - // Stack overflow check (see GH-39582). - // XXX cannot return a regular Status because most consumers do not either. - ARROW_CHECK_LE(new_top, buffer_size_) - << "TempVectorStack::alloc overflow: allocating " << estimated_alloc_size - << " on top of " << top_ << " in stack of size " << buffer_size_; - *data = buffer_->mutable_data() + top_ + sizeof(uint64_t); - // We set 8 bytes before the beginning of the allocated range and - // 8 bytes after the end to check for stack overflow (which would - // result in those known bytes being corrupted). - reinterpret_cast(buffer_->mutable_data() + top_)[0] = kGuard1; - reinterpret_cast(buffer_->mutable_data() + new_top)[-1] = kGuard2; - *id = num_vectors_++; - top_ = new_top; -} - -void TempVectorStack::release(int id, uint32_t num_bytes) { - ARROW_DCHECK(num_vectors_ == id + 1); - int64_t size = EstimatedAllocationSize(num_bytes); - ARROW_DCHECK(reinterpret_cast(buffer_->mutable_data() + top_)[-1] == - kGuard2); - ARROW_DCHECK(top_ >= size); - top_ -= size; - ARROW_DCHECK(reinterpret_cast(buffer_->mutable_data() + top_)[0] == - kGuard1); - --num_vectors_; -} - namespace bit_util { inline uint64_t SafeLoadUpTo8Bytes(const uint8_t* bytes, int num_bytes) { diff --git a/cpp/src/arrow/compute/util.h b/cpp/src/arrow/compute/util.h index 88dce160ce9..d56e398667f 100644 --- a/cpp/src/arrow/compute/util.h +++ b/cpp/src/arrow/compute/util.h @@ -24,17 +24,10 @@ #include #include -#include "arrow/buffer.h" #include "arrow/compute/expression.h" #include "arrow/compute/type_fwd.h" -#include "arrow/memory_pool.h" #include "arrow/result.h" -#include "arrow/status.h" -#include "arrow/util/bit_util.h" #include "arrow/util/cpu_info.h" -#include "arrow/util/mutex.h" -#include "arrow/util/thread_pool.h" -#include "arrow/util/type_fwd.h" #if defined(__clang__) || defined(__GNUC__) #define BYTESWAP(x) __builtin_bswap64(x) @@ -77,72 +70,6 @@ class MiniBatch { static constexpr int kMiniBatchLength = 1 << kLogMiniBatchLength; }; -/// Storage used to allocate temporary vectors of a batch size. -/// Temporary vectors should resemble allocating temporary variables on the stack -/// but in the context of vectorized processing where we need to store a vector of -/// temporaries instead of a single value. -class ARROW_EXPORT TempVectorStack { - template - friend class TempVectorHolder; - - public: - Status Init(MemoryPool* pool, int64_t size) { - num_vectors_ = 0; - top_ = 0; - buffer_size_ = EstimatedAllocationSize(size); - ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool)); - // Ensure later operations don't accidentally read uninitialized memory. - std::memset(buffer->mutable_data(), 0xFF, size); - buffer_ = std::move(buffer); - return Status::OK(); - } - - private: - static int64_t EstimatedAllocationSize(int64_t size) { - return PaddedAllocationSize(size) + 2 * sizeof(uint64_t); - } - - static int64_t PaddedAllocationSize(int64_t num_bytes) { - // Round up allocation size to multiple of 8 bytes - // to avoid returning temp vectors with unaligned address. - // - // Also add padding at the end to facilitate loads and stores - // using SIMD when number of vector elements is not divisible - // by the number of SIMD lanes. - // - return ::arrow::bit_util::RoundUp(num_bytes, sizeof(int64_t)) + kPadding; - } - void alloc(uint32_t num_bytes, uint8_t** data, int* id); - void release(int id, uint32_t num_bytes); - static constexpr uint64_t kGuard1 = 0x3141592653589793ULL; - static constexpr uint64_t kGuard2 = 0x0577215664901532ULL; - static constexpr int64_t kPadding = 64; - int num_vectors_; - int64_t top_; - std::unique_ptr buffer_; - int64_t buffer_size_; -}; - -template -class TempVectorHolder { - friend class TempVectorStack; - - public: - ~TempVectorHolder() { stack_->release(id_, num_elements_ * sizeof(T)); } - T* mutable_data() { return reinterpret_cast(data_); } - TempVectorHolder(TempVectorStack* stack, uint32_t num_elements) { - stack_ = stack; - num_elements_ = num_elements; - stack_->alloc(num_elements * sizeof(T), &data_, &id_); - } - - private: - TempVectorStack* stack_; - uint8_t* data_; - int id_; - uint32_t num_elements_; -}; - namespace bit_util { ARROW_EXPORT void bits_to_indexes(int bit_to_search, int64_t hardware_flags, diff --git a/cpp/src/arrow/compute/util_internal.cc b/cpp/src/arrow/compute/util_internal.cc new file mode 100644 index 00000000000..cc26982fef1 --- /dev/null +++ b/cpp/src/arrow/compute/util_internal.cc @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/compute/util_internal.h" + +#include "arrow/compute/util.h" +#include "arrow/memory_pool.h" + +namespace arrow { +namespace util { + +Status TempVectorStack::Init(MemoryPool* pool, int64_t size) { + num_vectors_ = 0; + top_ = 0; + buffer_size_ = EstimatedAllocationSize(size); + ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool)); + // Ensure later operations don't accidentally read uninitialized memory. + std::memset(buffer->mutable_data(), 0xFF, size); + buffer_ = std::move(buffer); + return Status::OK(); +} + +int64_t TempVectorStack::PaddedAllocationSize(int64_t num_bytes) { + // Round up allocation size to multiple of 8 bytes + // to avoid returning temp vectors with unaligned address. + // + // Also add padding at the end to facilitate loads and stores + // using SIMD when number of vector elements is not divisible + // by the number of SIMD lanes. + // + return ::arrow::bit_util::RoundUp(num_bytes, sizeof(int64_t)) + kPadding; +} + +void TempVectorStack::alloc(uint32_t num_bytes, uint8_t** data, int* id) { + int64_t estimated_alloc_size = EstimatedAllocationSize(num_bytes); + int64_t new_top = top_ + estimated_alloc_size; + // Stack overflow check (see GH-39582). + // XXX cannot return a regular Status because most consumers do not either. + ARROW_CHECK_LE(new_top, buffer_size_) + << "TempVectorStack::alloc overflow: allocating " << estimated_alloc_size + << " on top of " << top_ << " in stack of size " << buffer_size_; + *data = buffer_->mutable_data() + top_ + sizeof(uint64_t); + // We set 8 bytes before the beginning of the allocated range and + // 8 bytes after the end to check for stack overflow (which would + // result in those known bytes being corrupted). + reinterpret_cast(buffer_->mutable_data() + top_)[0] = kGuard1; + reinterpret_cast(buffer_->mutable_data() + new_top)[-1] = kGuard2; + *id = num_vectors_++; + top_ = new_top; +} + +void TempVectorStack::release(int id, uint32_t num_bytes) { + ARROW_DCHECK(num_vectors_ == id + 1); + int64_t size = EstimatedAllocationSize(num_bytes); + ARROW_DCHECK(reinterpret_cast(buffer_->mutable_data() + top_)[-1] == + kGuard2); + ARROW_DCHECK(top_ >= size); + top_ -= size; + ARROW_DCHECK(reinterpret_cast(buffer_->mutable_data() + top_)[0] == + kGuard1); + --num_vectors_; +} + +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/compute/util_internal.h b/cpp/src/arrow/compute/util_internal.h index 87e89a33507..b560cddbf3b 100644 --- a/cpp/src/arrow/compute/util_internal.h +++ b/cpp/src/arrow/compute/util_internal.h @@ -17,6 +17,8 @@ #pragma once +#include "arrow/status.h" +#include "arrow/type_fwd.h" #include "arrow/util/logging.h" namespace arrow { @@ -27,5 +29,54 @@ void CheckAlignment(const void* ptr) { ARROW_DCHECK(reinterpret_cast(ptr) % sizeof(T) == 0); } +/// Storage used to allocate temporary vectors of a batch size. +/// Temporary vectors should resemble allocating temporary variables on the stack +/// but in the context of vectorized processing where we need to store a vector of +/// temporaries instead of a single value. +class ARROW_EXPORT TempVectorStack { + template + friend class TempVectorHolder; + + public: + Status Init(MemoryPool* pool, int64_t size); + + private: + static int64_t EstimatedAllocationSize(int64_t size) { + return PaddedAllocationSize(size) + 2 * sizeof(uint64_t); + } + + static int64_t PaddedAllocationSize(int64_t num_bytes); + + void alloc(uint32_t num_bytes, uint8_t** data, int* id); + void release(int id, uint32_t num_bytes); + static constexpr uint64_t kGuard1 = 0x3141592653589793ULL; + static constexpr uint64_t kGuard2 = 0x0577215664901532ULL; + static constexpr int64_t kPadding = 64; + int num_vectors_; + int64_t top_; + std::unique_ptr buffer_; + int64_t buffer_size_; +}; + +template +class TempVectorHolder { + friend class TempVectorStack; + + public: + ~TempVectorHolder() { stack_->release(id_, num_elements_ * sizeof(T)); } + T* mutable_data() { return reinterpret_cast(data_); } + TempVectorHolder(TempVectorStack* stack, uint32_t num_elements) { + stack_ = stack; + num_elements_ = num_elements; + stack_->alloc(num_elements * sizeof(T), &data_, &id_); + } + + private: + TempVectorStack* stack_; + uint8_t* data_; + int id_; + uint32_t num_elements_; +}; + } // namespace util } // namespace arrow From 3b3c771ad1e8bdd4ed75f5ad68813334ac4eb351 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 14 May 2024 02:38:06 +0800 Subject: [PATCH 18/20] Enhance comments about stack size constants --- cpp/src/arrow/compute/key_hash_internal.h | 15 +++++++++++---- cpp/src/arrow/compute/row/compare_internal.h | 7 +++++-- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/compute/key_hash_internal.h b/cpp/src/arrow/compute/key_hash_internal.h index 2b9c9455651..1f25beb0e16 100644 --- a/cpp/src/arrow/compute/key_hash_internal.h +++ b/cpp/src/arrow/compute/key_hash_internal.h @@ -48,8 +48,12 @@ class ARROW_EXPORT Hashing32 { static void HashMultiColumn(const std::vector& cols, LightContext* ctx, uint32_t* out_hash); - // Clarify the max temp stack usage for HashBatch so the caller could reserve enough - // size in advance. + // Clarify the max temp stack usage for HashBatch, which might be necessary for the + // caller to be aware of at compile time to reserve enough stack size in advance. The + // HashBatch implementation uses one uint32 temp vector as a buffer for hash, one uint16 + // temp vector as a buffer for null indices and one uint32 temp vector as a buffer for + // null hash, all are of size kMiniBatchLength. Plus extra kMiniBatchLength to cope with + // stack padding and aligning. static constexpr auto kHashBatchTempStackUsage = (sizeof(uint32_t) + sizeof(uint16_t) + sizeof(uint32_t) + /*extra=*/1) * util::MiniBatch::kMiniBatchLength; @@ -167,8 +171,11 @@ class ARROW_EXPORT Hashing64 { static void HashMultiColumn(const std::vector& cols, LightContext* ctx, uint64_t* hashes); - // Clarify the max temp stack usage for HashBatch so the caller could reserve enough - // size in advance. + // Clarify the max temp stack usage for HashBatch, which might be necessary for the + // caller to be aware of at compile time to reserve enough stack size in advance. The + // HashBatch implementation uses one uint16 temp vector as a buffer for null indices and + // one uint64 temp vector as a buffer for null hash, all are of size kMiniBatchLength. + // Plus extra kMiniBatchLength to cope with stack padding and aligning. static constexpr auto kHashBatchTempStackUsage = (sizeof(uint16_t) + sizeof(uint64_t) + /*extra=*/1) * util::MiniBatch::kMiniBatchLength; diff --git a/cpp/src/arrow/compute/row/compare_internal.h b/cpp/src/arrow/compute/row/compare_internal.h index e3e3f3c3b60..a5a109b0b51 100644 --- a/cpp/src/arrow/compute/row/compare_internal.h +++ b/cpp/src/arrow/compute/row/compare_internal.h @@ -32,8 +32,11 @@ namespace compute { class ARROW_EXPORT KeyCompare { public: - // Clarify the max temp stack usage for CompareColumnsToRows so the caller could reserve - // enough size in advance. + // Clarify the max temp stack usage for CompareColumnsToRows, which might be necessary + // for the caller to be aware of (possibly at compile time) to reserve enough stack size + // in advance. The CompareColumnsToRows implementation uses three uint8 temp vectors as + // buffers for match vectors, all are of size num_rows. Plus extra kMiniBatchLength to + // cope with stack padding and aligning. constexpr static int64_t CompareColumnsToRowsTempStackUsage(int64_t num_rows) { return (sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t)) * num_rows + /*extra=*/util::MiniBatch::kMiniBatchLength; From dcc3b7ab67428e6b8416107526a389e91798f19f Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Tue, 14 May 2024 22:31:10 +0800 Subject: [PATCH 19/20] Change many join test case to use int32 instead of int8 --- cpp/src/arrow/acero/hash_join_node_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/acero/hash_join_node_test.cc b/cpp/src/arrow/acero/hash_join_node_test.cc index e1968aecf63..215b1e4d211 100644 --- a/cpp/src/arrow/acero/hash_join_node_test.cc +++ b/cpp/src/arrow/acero/hash_join_node_test.cc @@ -3228,7 +3228,7 @@ TEST(HashJoin, ManyJoins) { ASSERT_OK_AND_ASSIGN( auto left_batches, MakeIntegerBatches({[](int row_id) -> int64_t { return row_id; }}, - schema({field("l_key", int8())}), + schema({field("l_key", int32())}), /*num_batches=*/1, /*batch_size=*/num_left_rows)); Declaration root{"exec_batch_source", ExecBatchSourceNodeOptions(std::move(left_batches.schema), @@ -3240,7 +3240,7 @@ TEST(HashJoin, ManyJoins) { for (int i = 0; i < num_joins; ++i) { ASSERT_OK_AND_ASSIGN(auto right_batches, MakeIntegerBatches({[i](int) -> int64_t { return i; }}, - schema({field("r_key", int8())}), + schema({field("r_key", int32())}), /*num_batches=*/1, /*batch_size=*/2)); Declaration table{"exec_batch_source", ExecBatchSourceNodeOptions(std::move(right_batches.schema), From 4821398a6c22051880862d0eb7d9d74fce50aa69 Mon Sep 17 00:00:00 2001 From: Ruoxi Sun Date: Wed, 15 May 2024 00:33:44 +0800 Subject: [PATCH 20/20] Address a remaining comment in #41352 --- cpp/src/arrow/compute/row/grouper.cc | 1 + cpp/src/arrow/compute/util_internal.h | 2 ++ 2 files changed, 3 insertions(+) diff --git a/cpp/src/arrow/compute/row/grouper.cc b/cpp/src/arrow/compute/row/grouper.cc index 50ca20bd14f..3ed5411d0ba 100644 --- a/cpp/src/arrow/compute/row/grouper.cc +++ b/cpp/src/arrow/compute/row/grouper.cc @@ -600,6 +600,7 @@ struct GrouperFastImpl : public Grouper { } Status Reset() override { + ARROW_DCHECK_EQ(temp_stack_.AllocatedSize(), 0); rows_.Clean(); rows_minibatch_.Clean(); map_.cleanup(); diff --git a/cpp/src/arrow/compute/util_internal.h b/cpp/src/arrow/compute/util_internal.h index b560cddbf3b..043ff118062 100644 --- a/cpp/src/arrow/compute/util_internal.h +++ b/cpp/src/arrow/compute/util_internal.h @@ -40,6 +40,8 @@ class ARROW_EXPORT TempVectorStack { public: Status Init(MemoryPool* pool, int64_t size); + int64_t AllocatedSize() const { return top_; } + private: static int64_t EstimatedAllocationSize(int64_t size) { return PaddedAllocationSize(size) + 2 * sizeof(uint64_t);