diff --git a/cpp/src/arrow/chunk_resolver.h b/cpp/src/arrow/chunk_resolver.h
index 818070ffe35..d3ae315568d 100644
--- a/cpp/src/arrow/chunk_resolver.h
+++ b/cpp/src/arrow/chunk_resolver.h
@@ -18,87 +18,151 @@
 #pragma once
 
 #include <atomic>
+#include <cassert>
 #include <cstdint>
 #include <vector>
 
 #include "arrow/type_fwd.h"
 #include "arrow/util/macros.h"
 
-namespace arrow {
-namespace internal {
+namespace arrow::internal {
 
 struct ChunkLocation {
-  int64_t chunk_index, index_in_chunk;
+  /// \brief Index of the chunk in the array of chunks
+  ///
+  /// The value is always in the range `[0, chunks.size()]`. `chunks.size()` is used
+  /// to represent out-of-bounds locations.
+  int64_t chunk_index;
+
+  /// \brief Index of the value in the chunk
+  ///
+  /// The value is undefined if chunk_index >= chunks.size()
+  int64_t index_in_chunk;
 };
 
-// An object that resolves an array chunk depending on a logical index
+/// \brief A utility that incrementally resolves logical indices into
+/// physical indices in a chunked array.
 struct ARROW_EXPORT ChunkResolver {
-  explicit ChunkResolver(const ArrayVector& chunks);
+ private:
+  /// \brief Array containing `chunks.size() + 1` offsets.
+  ///
+  /// `offsets_[i]` is the starting logical index of chunk `i`. `offsets_[0]` is always 0
+  /// and `offsets_[chunks.size()]` is the logical length of the chunked array.
+  std::vector<int64_t> offsets_;
 
-  explicit ChunkResolver(const std::vector<const Array*>& chunks);
+  /// \brief Cache of the index of the last resolved chunk.
+  ///
+  /// \invariant `cached_chunk_ in [0, chunks.size()]`
+  mutable std::atomic<int64_t> cached_chunk_;
 
+ public:
+  explicit ChunkResolver(const ArrayVector& chunks);
+  explicit ChunkResolver(const std::vector<const Array*>& chunks);
   explicit ChunkResolver(const RecordBatchVector& batches);
 
   ChunkResolver(ChunkResolver&& other) noexcept
-      : offsets_(std::move(other.offsets_)), cached_chunk_(other.cached_chunk_.load()) {}
+      : offsets_(std::move(other.offsets_)),
+        cached_chunk_(other.cached_chunk_.load(std::memory_order_relaxed)) {}
 
   ChunkResolver& operator=(ChunkResolver&& other) {
     offsets_ = std::move(other.offsets_);
-    cached_chunk_.store(other.cached_chunk_.load());
+    cached_chunk_.store(other.cached_chunk_.load(std::memory_order_relaxed));
     return *this;
   }
 
-  /// \brief Return a ChunkLocation containing the chunk index and in-chunk value index of
-  /// the chunked array at logical index
-  inline ChunkLocation Resolve(const int64_t index) const {
-    // It is common for the algorithms below to make consecutive accesses at
-    // a relatively small distance from each other, hence often falling in
-    // the same chunk.
-    // This is trivial when merging (assuming each side of the merge uses
-    // its own resolver), but also in the inner recursive invocations of
+  /// \brief Resolve a logical index to a ChunkLocation.
+  ///
+  /// The returned ChunkLocation contains the chunk index and the within-chunk index
+  /// equivalent to the logical index.
+  ///
+  /// \pre index >= 0
+  /// \post location.chunk_index in [0, chunks.size()]
+  /// \param index The logical index to resolve
+  /// \return ChunkLocation with a valid chunk_index if index is within
+  ///         bounds, or with chunk_index == chunks.size() if the logical index is
+  ///         `>= chunked_array.length()`.
+  inline ChunkLocation Resolve(int64_t index) const {
+    const auto cached_chunk = cached_chunk_.load(std::memory_order_relaxed);
+    const auto chunk_index =
+        ResolveChunkIndex</*StoreCachedChunk=*/true>(index, cached_chunk);
+    return {chunk_index, index - offsets_[chunk_index]};
+  }
+
+  /// \brief Resolve a logical index to a ChunkLocation.
+  ///
+  /// The returned ChunkLocation contains the chunk index and the within-chunk index
+  /// equivalent to the logical index.
+  ///
+  /// \pre index >= 0
+  /// \post location.chunk_index in [0, chunks.size()]
+  /// \param index The logical index to resolve
+  /// \param cached_chunk_index 0 or the chunk_index of the last ChunkLocation
+  ///        returned by this ChunkResolver.
+  /// \return ChunkLocation with a valid chunk_index if index is within
+  ///         bounds, or with chunk_index == chunks.size() if the logical index is
+  ///         `>= chunked_array.length()`.
+  inline ChunkLocation ResolveWithChunkIndexHint(int64_t index,
+                                                 int64_t cached_chunk_index) const {
+    assert(cached_chunk_index < static_cast<int64_t>(offsets_.size()));
+    const auto chunk_index =
+        ResolveChunkIndex</*StoreCachedChunk=*/false>(index, cached_chunk_index);
+    return {chunk_index, index - offsets_[chunk_index]};
+  }
+
+ private:
+  template <bool StoreCachedChunk>
+  inline int64_t ResolveChunkIndex(int64_t index, int64_t cached_chunk) const {
+    // It is common for algorithms sequentially processing arrays to make consecutive
+    // accesses at a relatively small distance from each other, hence often falling in the
+    // same chunk.
+    //
+    // This is guaranteed when merging (assuming each side of the merge uses its
+    // own resolver), and is the most common case in recursive invocations of
     // partitioning.
-    if (offsets_.size() <= 1) {
-      return {0, index};
+    const auto num_offsets = static_cast<int64_t>(offsets_.size());
+    const int64_t* offsets = offsets_.data();
+    if (ARROW_PREDICT_TRUE(index >= offsets[cached_chunk]) &&
+        (cached_chunk + 1 == num_offsets || index < offsets[cached_chunk + 1])) {
+      return cached_chunk;
     }
-    const auto cached_chunk = cached_chunk_.load();
-    const bool cache_hit =
-        (index >= offsets_[cached_chunk] && index < offsets_[cached_chunk + 1]);
-    if (ARROW_PREDICT_TRUE(cache_hit)) {
-      return {cached_chunk, index - offsets_[cached_chunk]};
+    // lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
+    const auto chunk_index = Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets);
+    if constexpr (StoreCachedChunk) {
+      assert(chunk_index < static_cast<int64_t>(offsets_.size()));
+      cached_chunk_.store(chunk_index, std::memory_order_relaxed);
     }
-    auto chunk_index = Bisect(index);
-    cached_chunk_.store(chunk_index);
-    return {chunk_index, index - offsets_[chunk_index]};
+    return chunk_index;
   }
 
- protected:
-  // Find the chunk index corresponding to a value index using binary search
-  inline int64_t Bisect(const int64_t index) const {
-    // Like std::upper_bound(), but hand-written as it can help the compiler.
-    // Search [lo, lo + n)
-    int64_t lo = 0;
-    auto n = static_cast<int64_t>(offsets_.size());
-    while (n > 1) {
+  /// \brief Find the index of the chunk that contains the logical index.
+  ///
+  /// Any non-negative index is accepted. When `hi=num_offsets`, the largest
+  /// possible return value is `num_offsets-1` which is equal to
+  /// `chunks.size()`. This is returned when the logical index is out-of-bounds.
+  ///
+  /// \pre index >= 0
+  /// \pre lo < hi
+  /// \pre lo >= 0 && hi <= offsets_.size()
+  static inline int64_t Bisect(int64_t index, const int64_t* offsets, int64_t lo,
+                               int64_t hi) {
+    // Similar to std::upper_bound(), but slightly different as our offsets
+    // array always starts with 0.
+    auto n = hi - lo;
+    // First iteration does not need to check for n > 1
+    // (lo < hi is guaranteed by the precondition).
+    assert(n > 1 && "lo < hi is a precondition of Bisect");
+    do {
       const int64_t m = n >> 1;
       const int64_t mid = lo + m;
-      if (static_cast<int64_t>(index) >= offsets_[mid]) {
+      if (index >= offsets[mid]) {
         lo = mid;
         n -= m;
       } else {
         n = m;
       }
-    }
+    } while (n > 1);
     return lo;
   }
-
- private:
-  // Collection of starting offsets used for binary search
-  std::vector<int64_t> offsets_;
-
-  // Tracks the most recently used chunk index to allow fast
-  // access for consecutive indices corresponding to the same chunk
-  mutable std::atomic<int64_t> cached_chunk_;
 };
 
-}  // namespace internal
-}  // namespace arrow
+}  // namespace arrow::internal
diff --git a/cpp/src/arrow/compute/kernels/vector_sort.cc b/cpp/src/arrow/compute/kernels/vector_sort.cc
index e08a2bc1037..d3914173b65 100644
--- a/cpp/src/arrow/compute/kernels/vector_sort.cc
+++ b/cpp/src/arrow/compute/kernels/vector_sort.cc
@@ -24,6 +24,7 @@
 namespace arrow {
 
 using internal::checked_cast;
+using internal::ChunkLocation;
 
 namespace compute {
 namespace internal {
@@ -748,11 +749,15 @@ class TableSorter {
     auto& comparator = comparator_;
     const auto& first_sort_key = sort_keys_[0];
 
+    ChunkLocation left_loc{0, 0};
+    ChunkLocation right_loc{0, 0};
     std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end, temp_indices,
                [&](uint64_t left, uint64_t right) {
                  // First column is either null or nan
-                 const auto left_loc = left_resolver_.Resolve(left);
-                 const auto right_loc = right_resolver_.Resolve(right);
+                 left_loc =
+                     left_resolver_.ResolveWithChunkIndexHint(left, left_loc.chunk_index);
+                 right_loc = right_resolver_.ResolveWithChunkIndexHint(
+                     right, right_loc.chunk_index);
                  auto chunk_left = first_sort_key.GetChunk(left_loc);
                  auto chunk_right = first_sort_key.GetChunk(right_loc);
                  const auto left_is_null = chunk_left.IsNull();
@@ -783,11 +788,15 @@
     // Untyped implementation
     auto& comparator = comparator_;
 
+    ChunkLocation left_loc{0, 0};
+    ChunkLocation right_loc{0, 0};
     std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end, temp_indices,
                [&](uint64_t left, uint64_t right) {
                  // First column is always null
-                 const auto left_loc = left_resolver_.Resolve(left);
-                 const auto right_loc = right_resolver_.Resolve(right);
+                 left_loc =
+                     left_resolver_.ResolveWithChunkIndexHint(left, left_loc.chunk_index);
+                 right_loc = right_resolver_.ResolveWithChunkIndexHint(
+                     right, right_loc.chunk_index);
                  return comparator.Compare(left_loc, right_loc, 1);
                });
     // Copy back temp area into main buffer
@@ -807,11 +816,15 @@
     auto& comparator = comparator_;
     const auto& first_sort_key = sort_keys_[0];
 
+    ChunkLocation left_loc{0, 0};
+    ChunkLocation right_loc{0, 0};
     std::merge(range_begin, range_middle, range_middle, range_end, temp_indices,
                [&](uint64_t left, uint64_t right) {
                  // Both values are never null nor NaN.
-                 const auto left_loc = left_resolver_.Resolve(left);
-                 const auto right_loc = right_resolver_.Resolve(right);
+                 left_loc =
+                     left_resolver_.ResolveWithChunkIndexHint(left, left_loc.chunk_index);
+                 right_loc = right_resolver_.ResolveWithChunkIndexHint(
+                     right, right_loc.chunk_index);
                  auto chunk_left = first_sort_key.GetChunk(left_loc);
                  auto chunk_right = first_sort_key.GetChunk(right_loc);
                  DCHECK(!chunk_left.IsNull());
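
Editor's note, not part of the patch: ResolveWithChunkIndexHint lets a caller carry its own chunk-index hint instead of going through the resolver's shared atomic cache, which is why the TableSorter hunks above thread left_loc.chunk_index and right_loc.chunk_index back into each call. Below is a minimal standalone sketch of that fast-path-then-bisect structure; Location and ResolveWithHint are hypothetical stand-ins written for illustration, not Arrow APIs.

#include <cassert>
#include <cstdint>
#include <vector>

// Toy stand-in for arrow::internal::ChunkLocation.
struct Location {
  int64_t chunk_index;
  int64_t index_in_chunk;
};

// Same shape as ResolveChunkIndex in the patch, minus the atomic cache:
// the caller threads the previously returned chunk_index through as `hint`.
Location ResolveWithHint(const std::vector<int64_t>& offsets, int64_t index,
                         int64_t hint) {
  const auto num_offsets = static_cast<int64_t>(offsets.size());
  // Fast path: `index` still falls inside the hinted chunk.
  if (index >= offsets[hint] &&
      (hint + 1 == num_offsets || index < offsets[hint + 1])) {
    return {hint, index - offsets[hint]};
  }
  // Slow path: the same upper_bound-style bisection as Bisect().
  int64_t lo = 0;
  int64_t n = num_offsets;
  while (n > 1) {
    const int64_t m = n >> 1;
    if (index >= offsets[lo + m]) {
      lo += m;
      n -= m;
    } else {
      n = m;
    }
  }
  return {lo, index - offsets[lo]};
}

int main() {
  // Offsets for 3 chunks of lengths 3, 4 and 3 (total logical length 10).
  const std::vector<int64_t> offsets = {0, 3, 7, 10};
  Location loc{0, 0};
  for (int64_t i = 0; i < 10; ++i) {
    // Consecutive logical indices mostly hit the fast path.
    loc = ResolveWithHint(offsets, i, loc.chunk_index);
    assert(offsets[loc.chunk_index] + loc.index_in_chunk == i);
  }
  return 0;
}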
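
A second illustrative note: because offsets_[0] is always 0 and the offsets are non-decreasing, the hand-written Bisect computes the same chunk index as std::upper_bound followed by a one-step rollback, including the chunks.size() sentinel for out-of-bounds indices. The helper BisectViaUpperBound below is a hypothetical equivalent used only to check that claim, not code from the patch.

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

int64_t BisectViaUpperBound(const std::vector<int64_t>& offsets, int64_t index) {
  // offsets[0] == 0 can never compare greater than a non-negative index,
  // so skipping it mirrors Bisect's search starting at lo = 0.
  auto it = std::upper_bound(offsets.begin() + 1, offsets.end(), index);
  return static_cast<int64_t>(it - offsets.begin()) - 1;
}

int main() {
  const std::vector<int64_t> offsets = {0, 3, 7, 10};  // 3 chunks, length 10
  assert(BisectViaUpperBound(offsets, 0) == 0);
  assert(BisectViaUpperBound(offsets, 5) == 1);
  assert(BisectViaUpperBound(offsets, 9) == 2);
  assert(BisectViaUpperBound(offsets, 10) == 3);  // out-of-bounds -> chunks.size()
  return 0;
}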