From 31756122cbaf8841164a918db1ea0c78f881ac76 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 26 Jan 2024 11:35:06 -0300 Subject: [PATCH 1/9] chunk_resolver.h: Document all invariants and required pre-conditions --- cpp/src/arrow/chunk_resolver.h | 78 +++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 24 deletions(-) diff --git a/cpp/src/arrow/chunk_resolver.h b/cpp/src/arrow/chunk_resolver.h index 818070ffe35..a5760948d13 100644 --- a/cpp/src/arrow/chunk_resolver.h +++ b/cpp/src/arrow/chunk_resolver.h @@ -18,25 +18,46 @@ #pragma once #include +#include #include #include #include "arrow/type_fwd.h" #include "arrow/util/macros.h" -namespace arrow { -namespace internal { +namespace arrow::internal { struct ChunkLocation { - int64_t chunk_index, index_in_chunk; + /// \brief Index of the chunk in the array of chunks + /// + /// The value is always in the range `[0, chunks.size()]`. `chunks.size()` is used + /// to represent out-of-bounds locations. + int64_t chunk_index; + + /// \brief Index of the value in the chunk + /// + /// The value is undefined if chunk_index >= chunks.size() + int64_t index_in_chunk; }; -// An object that resolves an array chunk depending on a logical index +/// \brief An utility that incrementally resolves logical indices into +/// physical indices in a chunked array. struct ARROW_EXPORT ChunkResolver { - explicit ChunkResolver(const ArrayVector& chunks); + private: + /// \brief Array containing `chunks.size() + 1` offsets. + /// + /// `offsets_[i]` is the starting logical index of chunk `i`. `offsets_[0]` is always 0 + /// and `offsets_[chunks.size()]` is the logical length of the chunked array. + std::vector offsets_; - explicit ChunkResolver(const std::vector& chunks); + /// \brief Cache of the index of the last resolved chunk. 
+ /// + /// \invariant `cached_chunk_ in [0, chunks.size()]` + mutable std::atomic cached_chunk_; + public: + explicit ChunkResolver(const ArrayVector& chunks); + explicit ChunkResolver(const std::vector& chunks); explicit ChunkResolver(const RecordBatchVector& batches); ChunkResolver(ChunkResolver&& other) noexcept @@ -48,31 +69,49 @@ struct ARROW_EXPORT ChunkResolver { return *this; } - /// \brief Return a ChunkLocation containing the chunk index and in-chunk value index of - /// the chunked array at logical index + /// \brief Resolve a logical index to a ChunkLocation. + /// + /// The returned ChunkLocation contains the chunk index and the within-chunk index + /// equivalent to the logical index. + /// + /// \pre index >= 0 + /// \post location.chunk_index in [0, chunks.size()] + /// \param index The logical index to resolve + /// \return ChunkLocation with a valid chunk_index if index is within + /// bounds, or with chunk_index == chunks.size() if logical index is + /// `>= chunked_array.length()`. inline ChunkLocation Resolve(const int64_t index) const { - // It is common for the algorithms below to make consecutive accesses at - // a relatively small distance from each other, hence often falling in - // the same chunk. - // This is trivial when merging (assuming each side of the merge uses - // its own resolver), but also in the inner recursive invocations of + // It is common for algorithms sequentially processing arrays to make consecutive + // accesses at a relatively small distance from each other, hence often falling in the + // same chunk. + // + // This is guaranteed when merging (assuming each side of the merge uses its + // own resolver), and is the most common case in recursive invocations of // partitioning. 
if (offsets_.size() <= 1) { return {0, index}; } const auto cached_chunk = cached_chunk_.load(); + // XXX: the access below is unsafe because cached_chunk+1 can be out-of-bounds const bool cache_hit = (index >= offsets_[cached_chunk] && index < offsets_[cached_chunk + 1]); if (ARROW_PREDICT_TRUE(cache_hit)) { return {cached_chunk, index - offsets_[cached_chunk]}; } auto chunk_index = Bisect(index); + assert(chunk_index < static_cast(offsets_.size())); cached_chunk_.store(chunk_index); return {chunk_index, index - offsets_[chunk_index]}; } protected: - // Find the chunk index corresponding to a value index using binary search + /// \brief Find the index of the chunk that contains the logical index. + /// + /// Any non-negative index is accepted. + /// + /// \pre index >= 0 + /// \return Chunk index in `[0, chunks.size())` or `chunks.size()` if the logical index + /// is out-of-bounds. inline int64_t Bisect(const int64_t index) const { // Like std::upper_bound(), but hand-written as it can help the compiler. 
// Search [lo, lo + n) @@ -90,15 +129,6 @@ struct ARROW_EXPORT ChunkResolver { } return lo; } - - private: - // Collection of starting offsets used for binary search - std::vector offsets_; - - // Tracks the most recently used chunk index to allow fast - // access for consecutive indices corresponding to the same chunk - mutable std::atomic cached_chunk_; }; -} // namespace internal -} // namespace arrow +} // namespace arrow::internal From df62c75b07869cbc1f6b4edf27b85ea1cf9eb44d Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Wed, 24 Jan 2024 16:15:03 -0300 Subject: [PATCH 2/9] Cached binary search result operations only need atomicity --- cpp/src/arrow/chunk_resolver.h | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/chunk_resolver.h b/cpp/src/arrow/chunk_resolver.h index a5760948d13..3222a6bc81e 100644 --- a/cpp/src/arrow/chunk_resolver.h +++ b/cpp/src/arrow/chunk_resolver.h @@ -61,11 +61,12 @@ struct ARROW_EXPORT ChunkResolver { explicit ChunkResolver(const RecordBatchVector& batches); ChunkResolver(ChunkResolver&& other) noexcept - : offsets_(std::move(other.offsets_)), cached_chunk_(other.cached_chunk_.load()) {} + : offsets_(std::move(other.offsets_)), + cached_chunk_(other.cached_chunk_.load(std::memory_order_relaxed)) {} ChunkResolver& operator=(ChunkResolver&& other) { offsets_ = std::move(other.offsets_); - cached_chunk_.store(other.cached_chunk_.load()); + cached_chunk_.store(other.cached_chunk_.load(std::memory_order_relaxed)); return *this; } @@ -91,7 +92,7 @@ struct ARROW_EXPORT ChunkResolver { if (offsets_.size() <= 1) { return {0, index}; } - const auto cached_chunk = cached_chunk_.load(); + const auto cached_chunk = cached_chunk_.load(std::memory_order_relaxed); // XXX: the access below is unsafe because cached_chunk+1 can be out-of-bounds const bool cache_hit = (index >= offsets_[cached_chunk] && index < offsets_[cached_chunk + 1]); @@ -100,7 +101,7 @@ struct ARROW_EXPORT ChunkResolver { } 
auto chunk_index = Bisect(index); assert(chunk_index < static_cast(offsets_.size())); - cached_chunk_.store(chunk_index); + cached_chunk_.store(chunk_index, std::memory_order_relaxed); return {chunk_index, index - offsets_[chunk_index]}; } From 2e17fed911615a382a9a76d9f2316f1e3c6f3fa3 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 26 Jan 2024 01:13:14 -0300 Subject: [PATCH 3/9] Prepare Bisect to take sub-range parameters --- cpp/src/arrow/chunk_resolver.h | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/cpp/src/arrow/chunk_resolver.h b/cpp/src/arrow/chunk_resolver.h index 3222a6bc81e..abdb66638cb 100644 --- a/cpp/src/arrow/chunk_resolver.h +++ b/cpp/src/arrow/chunk_resolver.h @@ -89,39 +89,42 @@ struct ARROW_EXPORT ChunkResolver { // This is guaranteed when merging (assuming each side of the merge uses its // own resolver), and is the most common case in recursive invocations of // partitioning. - if (offsets_.size() <= 1) { + const auto num_offsets = static_cast(offsets_.size()); + if (num_offsets <= 1) { return {0, index}; } + const int64_t* offsets = offsets_.data(); const auto cached_chunk = cached_chunk_.load(std::memory_order_relaxed); // XXX: the access below is unsafe because cached_chunk+1 can be out-of-bounds const bool cache_hit = - (index >= offsets_[cached_chunk] && index < offsets_[cached_chunk + 1]); + (index >= offsets[cached_chunk] && index < offsets[cached_chunk + 1]); if (ARROW_PREDICT_TRUE(cache_hit)) { - return {cached_chunk, index - offsets_[cached_chunk]}; + return {cached_chunk, index - offsets[cached_chunk]}; } - auto chunk_index = Bisect(index); + const auto chunk_index = Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets); assert(chunk_index < static_cast(offsets_.size())); cached_chunk_.store(chunk_index, std::memory_order_relaxed); - return {chunk_index, index - offsets_[chunk_index]}; + return {chunk_index, index - offsets[chunk_index]}; } - protected: + private: 
/// \brief Find the index of the chunk that contains the logical index. /// - /// Any non-negative index is accepted. + /// Any non-negative index is accepted. When `hi=num_offsets`, the largest + /// possible return value is `num_offsets-1` which is equal to + /// `chunks.size()`. The is returned when the logical index is out-of-bounds. /// /// \pre index >= 0 - /// \return Chunk index in `[0, chunks.size())` or `chunks.size()` if the logical index - /// is out-of-bounds. - inline int64_t Bisect(const int64_t index) const { + /// \pre lo >= 0 && hi <= offsets_.size() + /// \return `lo` if `lo == hi` or otherwise a chunk index in `[lo, hi)`. + static inline int64_t Bisect(int64_t index, const int64_t* offsets, int64_t lo, + int64_t hi) { // Like std::upper_bound(), but hand-written as it can help the compiler. - // Search [lo, lo + n) - int64_t lo = 0; - auto n = static_cast(offsets_.size()); + auto n = hi - lo; while (n > 1) { const int64_t m = n >> 1; const int64_t mid = lo + m; - if (static_cast(index) >= offsets_[mid]) { + if (index >= offsets[mid]) { lo = mid; n -= m; } else { From 655f8d59e38438db785f45cf98cf0647869bee29 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 26 Jan 2024 01:16:02 -0300 Subject: [PATCH 4/9] Dispatch load ASAP --- cpp/src/arrow/chunk_resolver.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/chunk_resolver.h b/cpp/src/arrow/chunk_resolver.h index abdb66638cb..9cd58afcf98 100644 --- a/cpp/src/arrow/chunk_resolver.h +++ b/cpp/src/arrow/chunk_resolver.h @@ -89,12 +89,12 @@ struct ARROW_EXPORT ChunkResolver { // This is guaranteed when merging (assuming each side of the merge uses its // own resolver), and is the most common case in recursive invocations of // partitioning. 
+ const auto cached_chunk = cached_chunk_.load(std::memory_order_relaxed); const auto num_offsets = static_cast(offsets_.size()); if (num_offsets <= 1) { return {0, index}; } const int64_t* offsets = offsets_.data(); - const auto cached_chunk = cached_chunk_.load(std::memory_order_relaxed); // XXX: the access below is unsafe because cached_chunk+1 can be out-of-bounds const bool cache_hit = (index >= offsets[cached_chunk] && index < offsets[cached_chunk + 1]); From b899626fec397756a4a0c786277da3ea372ce080 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 26 Jan 2024 19:33:01 -0300 Subject: [PATCH 5/9] Fix out-of-bounds read bug present on main Reproduction: // on the first out-of-bounds query, chunks.size() is cached as // cached_chunk_. Resolve(chunked_array->length()); // on the second out-of-bounds query, chunks.size() is loaded from // cached_chunk_ and... Resolve(chunked_array->length()); ...even though offsets[cached_chunk] is a valid access because offsets.size() == chunks.size()+1, offsets[cached_chunk + 1] is not because that is equivalent to chunks.size() + 2 which is out of bounds for offsets. 
--- cpp/src/arrow/chunk_resolver.h | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/chunk_resolver.h b/cpp/src/arrow/chunk_resolver.h index 9cd58afcf98..56a8a4bf3d6 100644 --- a/cpp/src/arrow/chunk_resolver.h +++ b/cpp/src/arrow/chunk_resolver.h @@ -95,10 +95,8 @@ struct ARROW_EXPORT ChunkResolver { return {0, index}; } const int64_t* offsets = offsets_.data(); - // XXX: the access below is unsafe because cached_chunk+1 can be out-of-bounds - const bool cache_hit = - (index >= offsets[cached_chunk] && index < offsets[cached_chunk + 1]); - if (ARROW_PREDICT_TRUE(cache_hit)) { + if (ARROW_PREDICT_TRUE(index >= offsets[cached_chunk]) && + (cached_chunk + 1 == num_offsets || index < offsets[cached_chunk + 1])) { return {cached_chunk, index - offsets[cached_chunk]}; } const auto chunk_index = Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets); From 6a13c388274ad31c7f960b61eece5c34f11cb5e4 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 26 Jan 2024 21:56:27 -0300 Subject: [PATCH 6/9] Remove the now unnecessary num_offsets<=1 check --- cpp/src/arrow/chunk_resolver.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/cpp/src/arrow/chunk_resolver.h b/cpp/src/arrow/chunk_resolver.h index 56a8a4bf3d6..ecb80ddc06a 100644 --- a/cpp/src/arrow/chunk_resolver.h +++ b/cpp/src/arrow/chunk_resolver.h @@ -91,9 +91,6 @@ struct ARROW_EXPORT ChunkResolver { // partitioning. 
const auto cached_chunk = cached_chunk_.load(std::memory_order_relaxed); const auto num_offsets = static_cast(offsets_.size()); - if (num_offsets <= 1) { - return {0, index}; - } const int64_t* offsets = offsets_.data(); if (ARROW_PREDICT_TRUE(index >= offsets[cached_chunk]) && (cached_chunk + 1 == num_offsets || index < offsets[cached_chunk + 1])) { From 17e8b9493b1b3c915d4889c873e7159bd36816b2 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 26 Jan 2024 22:13:11 -0300 Subject: [PATCH 7/9] Remove first binary search branch --- cpp/src/arrow/chunk_resolver.h | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/chunk_resolver.h b/cpp/src/arrow/chunk_resolver.h index ecb80ddc06a..80c6a6ff709 100644 --- a/cpp/src/arrow/chunk_resolver.h +++ b/cpp/src/arrow/chunk_resolver.h @@ -96,6 +96,7 @@ struct ARROW_EXPORT ChunkResolver { (cached_chunk + 1 == num_offsets || index < offsets[cached_chunk + 1])) { return {cached_chunk, index - offsets[cached_chunk]}; } + // lo < hi is guaranteed by `num_offsets = chunks.size() + 1` const auto chunk_index = Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets); assert(chunk_index < static_cast(offsets_.size())); cached_chunk_.store(chunk_index, std::memory_order_relaxed); @@ -110,13 +111,17 @@ struct ARROW_EXPORT ChunkResolver { /// `chunks.size()`. The is returned when the logical index is out-of-bounds. /// /// \pre index >= 0 + /// \pre lo < hi /// \pre lo >= 0 && hi <= offsets_.size() - /// \return `lo` if `lo == hi` or otherwise a chunk index in `[lo, hi)`. static inline int64_t Bisect(int64_t index, const int64_t* offsets, int64_t lo, int64_t hi) { - // Like std::upper_bound(), but hand-written as it can help the compiler. + // Similar to std::upper_bound(), but slightly different as our offsets + // array always starts with 0. auto n = hi - lo; - while (n > 1) { + // First iteration does not need to check for n > 1 + // (lo < hi is guaranteed by the precondition). 
+ assert(n > 1 && "lo < hi is a precondition of Bisect"); + do { const int64_t m = n >> 1; const int64_t mid = lo + m; if (index >= offsets[mid]) { @@ -125,7 +130,7 @@ struct ARROW_EXPORT ChunkResolver { } else { n = m; } - } + } while (n > 1); return lo; } }; From e5b2c1bf041844beeb5c268ee5e0a1918359b864 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 26 Jan 2024 23:13:19 -0300 Subject: [PATCH 8/9] Add the ChunkResolver::ResolveWithChunkIndexHint() function This allows callers to keep the cached chunk index hint in a local variable (register) instead of relying on the in-memory cached_chunk_ member variable of ChunkResolver. --- cpp/src/arrow/chunk_resolver.h | 44 ++++++++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/chunk_resolver.h b/cpp/src/arrow/chunk_resolver.h index 80c6a6ff709..d3ae315568d 100644 --- a/cpp/src/arrow/chunk_resolver.h +++ b/cpp/src/arrow/chunk_resolver.h @@ -81,7 +81,37 @@ struct ARROW_EXPORT ChunkResolver { /// \return ChunkLocation with a valid chunk_index if index is within /// bounds, or with chunk_index == chunks.size() if logical index is /// `>= chunked_array.length()`. - inline ChunkLocation Resolve(const int64_t index) const { + inline ChunkLocation Resolve(int64_t index) const { + const auto cached_chunk = cached_chunk_.load(std::memory_order_relaxed); + const auto chunk_index = + ResolveChunkIndex(index, cached_chunk); + return {chunk_index, index - offsets_[chunk_index]}; + } + + /// \brief Resolve a logical index to a ChunkLocation. + /// + /// The returned ChunkLocation contains the chunk index and the within-chunk index + /// equivalent to the logical index. + /// + /// \pre index >= 0 + /// \post location.chunk_index in [0, chunks.size()] + /// \param index The logical index to resolve + /// \param cached_chunk_index 0 or the chunk_index of the last ChunkLocation + /// returned by this ChunkResolver. 
+ /// \return ChunkLocation with a valid chunk_index if index is within + /// bounds, or with chunk_index == chunks.size() if logical index is + /// `>= chunked_array.length()`. + inline ChunkLocation ResolveWithChunkIndexHint(int64_t index, + int64_t cached_chunk_index) const { + assert(cached_chunk_index < static_cast(offsets_.size())); + const auto chunk_index = + ResolveChunkIndex(index, cached_chunk_index); + return {chunk_index, index - offsets_[chunk_index]}; + } + + private: + template + inline int64_t ResolveChunkIndex(int64_t index, int64_t cached_chunk) const { // It is common for algorithms sequentially processing arrays to make consecutive // accesses at a relatively small distance from each other, hence often falling in the // same chunk. @@ -89,21 +119,21 @@ struct ARROW_EXPORT ChunkResolver { // This is guaranteed when merging (assuming each side of the merge uses its // own resolver), and is the most common case in recursive invocations of // partitioning. - const auto cached_chunk = cached_chunk_.load(std::memory_order_relaxed); const auto num_offsets = static_cast(offsets_.size()); const int64_t* offsets = offsets_.data(); if (ARROW_PREDICT_TRUE(index >= offsets[cached_chunk]) && (cached_chunk + 1 == num_offsets || index < offsets[cached_chunk + 1])) { - return {cached_chunk, index - offsets[cached_chunk]}; + return cached_chunk; } // lo < hi is guaranteed by `num_offsets = chunks.size() + 1` const auto chunk_index = Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets); - assert(chunk_index < static_cast(offsets_.size())); - cached_chunk_.store(chunk_index, std::memory_order_relaxed); - return {chunk_index, index - offsets[chunk_index]}; + if constexpr (StoreCachedChunk) { + assert(chunk_index < static_cast(offsets_.size())); + cached_chunk_.store(chunk_index, std::memory_order_relaxed); + } + return chunk_index; } - private: /// \brief Find the index of the chunk that contains the logical index. /// /// Any non-negative index is accepted. 
When `hi=num_offsets`, the largest From a3923536046581ce9b1413b0a889ed50787db55d Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 26 Jan 2024 23:15:57 -0300 Subject: [PATCH 9/9] sort: Use ResolveWithChunkIndexHint() --- cpp/src/arrow/compute/kernels/vector_sort.cc | 25 +++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_sort.cc b/cpp/src/arrow/compute/kernels/vector_sort.cc index e08a2bc1037..d3914173b65 100644 --- a/cpp/src/arrow/compute/kernels/vector_sort.cc +++ b/cpp/src/arrow/compute/kernels/vector_sort.cc @@ -24,6 +24,7 @@ namespace arrow { using internal::checked_cast; +using internal::ChunkLocation; namespace compute { namespace internal { @@ -748,11 +749,15 @@ class TableSorter { auto& comparator = comparator_; const auto& first_sort_key = sort_keys_[0]; + ChunkLocation left_loc{0, 0}; + ChunkLocation right_loc{0, 0}; std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end, temp_indices, [&](uint64_t left, uint64_t right) { // First column is either null or nan - const auto left_loc = left_resolver_.Resolve(left); - const auto right_loc = right_resolver_.Resolve(right); + left_loc = + left_resolver_.ResolveWithChunkIndexHint(left, left_loc.chunk_index); + right_loc = right_resolver_.ResolveWithChunkIndexHint( + right, right_loc.chunk_index); auto chunk_left = first_sort_key.GetChunk(left_loc); auto chunk_right = first_sort_key.GetChunk(right_loc); const auto left_is_null = chunk_left.IsNull(); @@ -783,11 +788,15 @@ class TableSorter { // Untyped implementation auto& comparator = comparator_; + ChunkLocation left_loc{0, 0}; + ChunkLocation right_loc{0, 0}; std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end, temp_indices, [&](uint64_t left, uint64_t right) { // First column is always null - const auto left_loc = left_resolver_.Resolve(left); - const auto right_loc = right_resolver_.Resolve(right); + left_loc = + 
left_resolver_.ResolveWithChunkIndexHint(left, left_loc.chunk_index); + right_loc = right_resolver_.ResolveWithChunkIndexHint( + right, right_loc.chunk_index); return comparator.Compare(left_loc, right_loc, 1); }); // Copy back temp area into main buffer @@ -807,11 +816,15 @@ class TableSorter { auto& comparator = comparator_; const auto& first_sort_key = sort_keys_[0]; + ChunkLocation left_loc{0, 0}; + ChunkLocation right_loc{0, 0}; std::merge(range_begin, range_middle, range_middle, range_end, temp_indices, [&](uint64_t left, uint64_t right) { // Both values are never null nor NaN. - const auto left_loc = left_resolver_.Resolve(left); - const auto right_loc = right_resolver_.Resolve(right); + left_loc = + left_resolver_.ResolveWithChunkIndexHint(left, left_loc.chunk_index); + right_loc = right_resolver_.ResolveWithChunkIndexHint( + right, right_loc.chunk_index); auto chunk_left = first_sort_key.GetChunk(left_loc); auto chunk_right = first_sort_key.GetChunk(right_loc); DCHECK(!chunk_left.IsNull());