156 changes: 110 additions & 46 deletions cpp/src/arrow/chunk_resolver.h
@@ -18,87 +18,151 @@
#pragma once

#include <atomic>
#include <cassert>
#include <cstdint>
#include <vector>

#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"

namespace arrow {
namespace internal {
namespace arrow::internal {

struct ChunkLocation {
int64_t chunk_index, index_in_chunk;
/// \brief Index of the chunk in the array of chunks
///
/// The value is always in the range `[0, chunks.size()]`. `chunks.size()` is used
/// to represent out-of-bounds locations.
int64_t chunk_index;

/// \brief Index of the value in the chunk
///
/// The value is undefined if `chunk_index >= chunks.size()`.
int64_t index_in_chunk;
};
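To make these semantics concrete, a small made-up example (chunk lengths invented for illustration, not part of the header):

// Hypothetical example: two chunks of lengths {3, 2}, logical length 5.
//   logical index       -> {chunk_index, index_in_chunk}
//   0, 1, 2             -> {0, 0}, {0, 1}, {0, 2}
//   3, 4                -> {1, 0}, {1, 1}
//   5 (out of bounds)   -> {2, undefined}   // 2 == chunks.size()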

// An object that resolves an array chunk depending on a logical index
/// \brief A utility that incrementally resolves logical indices into
/// physical indices in a chunked array.
struct ARROW_EXPORT ChunkResolver {
explicit ChunkResolver(const ArrayVector& chunks);
private:
/// \brief Array containing `chunks.size() + 1` offsets.
///
/// `offsets_[i]` is the starting logical index of chunk `i`. `offsets_[0]` is always 0
/// and `offsets_[chunks.size()]` is the logical length of the chunked array.
std::vector<int64_t> offsets_;
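Concretely, a made-up example of the layout:

// Hypothetical example: for chunk lengths {3, 2, 4} the constructor
// builds offsets_ = {0, 3, 5, 9}; chunk i spans the logical range
// [offsets_[i], offsets_[i + 1]) and offsets_.back() == 9 is the
// logical length of the chunked array.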

explicit ChunkResolver(const std::vector<const Array*>& chunks);
/// \brief Cache of the index of the last resolved chunk.
///
/// \invariant `cached_chunk_ in [0, chunks.size()]`
mutable std::atomic<int64_t> cached_chunk_;

public:
explicit ChunkResolver(const ArrayVector& chunks);
explicit ChunkResolver(const std::vector<const Array*>& chunks);
explicit ChunkResolver(const RecordBatchVector& batches);

ChunkResolver(ChunkResolver&& other) noexcept
: offsets_(std::move(other.offsets_)), cached_chunk_(other.cached_chunk_.load()) {}
: offsets_(std::move(other.offsets_)),
cached_chunk_(other.cached_chunk_.load(std::memory_order_relaxed)) {}

ChunkResolver& operator=(ChunkResolver&& other) {
offsets_ = std::move(other.offsets_);
cached_chunk_.store(other.cached_chunk_.load());
cached_chunk_.store(other.cached_chunk_.load(std::memory_order_relaxed));
return *this;
}

/// \brief Return a ChunkLocation containing the chunk index and in-chunk value index of
/// the chunked array at logical index
inline ChunkLocation Resolve(const int64_t index) const {
// It is common for the algorithms below to make consecutive accesses at
// a relatively small distance from each other, hence often falling in
// the same chunk.
// This is trivial when merging (assuming each side of the merge uses
// its own resolver), but also in the inner recursive invocations of
/// \brief Resolve a logical index to a ChunkLocation.
///
/// The returned ChunkLocation contains the chunk index and the within-chunk index
/// equivalent to the logical index.
///
/// \pre index >= 0
/// \post location.chunk_index in [0, chunks.size()]
/// \param index The logical index to resolve
/// \return ChunkLocation with a valid chunk_index if index is within
/// bounds, or with chunk_index == chunks.size() if the logical index is
/// `>= chunked_array.length()`.
inline ChunkLocation Resolve(int64_t index) const {
const auto cached_chunk = cached_chunk_.load(std::memory_order_relaxed);
const auto chunk_index =
ResolveChunkIndex</*StoreCachedChunk=*/true>(index, cached_chunk);
return {chunk_index, index - offsets_[chunk_index]};
}
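A minimal usage sketch; the chunked array and the loop are assumptions for illustration, not part of this header:

// Sketch: sequential resolution mostly hits the cached chunk index.
arrow::internal::ChunkResolver resolver(chunked_array.chunks());
for (int64_t i = 0; i < chunked_array.length(); ++i) {
  const auto loc = resolver.Resolve(i);
  // i is in bounds, so loc.chunk_index < chunks.size() and
  // loc.index_in_chunk is a valid index into that chunk.
}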

/// \brief Resolve a logical index to a ChunkLocation.
///
/// The returned ChunkLocation contains the chunk index and the within-chunk index
/// equivalent to the logical index.
///
/// \pre index >= 0
/// \post location.chunk_index in [0, chunks.size()]
/// \param index The logical index to resolve
/// \param cached_chunk_index 0 or the chunk_index of the last ChunkLocation
/// returned by this ChunkResolver.
/// \return ChunkLocation with a valid chunk_index if index is within
/// bounds, or with chunk_index == chunks.size() if the logical index is
/// `>= chunked_array.length()`.
inline ChunkLocation ResolveWithChunkIndexHint(int64_t index,
int64_t cached_chunk_index) const {
assert(cached_chunk_index < static_cast<int64_t>(offsets_.size()));
const auto chunk_index =
ResolveChunkIndex</*StoreCachedChunk=*/false>(index, cached_chunk_index);
return {chunk_index, index - offsets_[chunk_index]};
}
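A sketch of the intended calling pattern in a hot loop (hypothetical loop; the same pattern appears in the vector_sort.cc changes below):

// Sketch: keeping the hint in a local variable avoids the atomic
// load/store on the shared cache; each call seeds the next one.
ChunkLocation loc{0, 0};
for (int64_t i = 0; i < chunked_array.length(); ++i) {
  loc = resolver.ResolveWithChunkIndexHint(i, loc.chunk_index);
  // ... use loc.chunk_index and loc.index_in_chunk ...
}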

private:
template <bool StoreCachedChunk>
inline int64_t ResolveChunkIndex(int64_t index, int64_t cached_chunk) const {
// It is common for algorithms sequentially processing arrays to make consecutive
// accesses at a relatively small distance from each other, hence often falling in the
// same chunk.
//
// This is guaranteed when merging (assuming each side of the merge uses its
// own resolver), and is the most common case in recursive invocations of
// partitioning.
if (offsets_.size() <= 1) {
return {0, index};
const auto num_offsets = static_cast<int64_t>(offsets_.size());
const int64_t* offsets = offsets_.data();
if (ARROW_PREDICT_TRUE(index >= offsets[cached_chunk]) &&
(cached_chunk + 1 == num_offsets || index < offsets[cached_chunk + 1])) {
return cached_chunk;
}
const auto cached_chunk = cached_chunk_.load();
const bool cache_hit =
(index >= offsets_[cached_chunk] && index < offsets_[cached_chunk + 1]);
if (ARROW_PREDICT_TRUE(cache_hit)) {
return {cached_chunk, index - offsets_[cached_chunk]};
// lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
const auto chunk_index = Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets);
if constexpr (StoreCachedChunk) {
assert(chunk_index < static_cast<int64_t>(offsets_.size()));
cached_chunk_.store(chunk_index, std::memory_order_relaxed);
}
auto chunk_index = Bisect(index);
cached_chunk_.store(chunk_index);
return {chunk_index, index - offsets_[chunk_index]};
return chunk_index;
}

protected:
// Find the chunk index corresponding to a value index using binary search
inline int64_t Bisect(const int64_t index) const {
// Like std::upper_bound(), but hand-written as it can help the compiler.
// Search [lo, lo + n)
int64_t lo = 0;
auto n = static_cast<int64_t>(offsets_.size());
while (n > 1) {
/// \brief Find the index of the chunk that contains the logical index.
///
/// Any non-negative index is accepted. When `hi=num_offsets`, the largest
/// possible return value is `num_offsets-1` which is equal to
/// `chunks.size()`. This is returned when the logical index is out-of-bounds.
///
/// \pre index >= 0
/// \pre lo < hi
/// \pre lo >= 0 && hi <= offsets_.size()
static inline int64_t Bisect(int64_t index, const int64_t* offsets, int64_t lo,
int64_t hi) {
// Similar to std::upper_bound(), but slightly different as our offsets
// array always starts with 0.
auto n = hi - lo;
// First iteration does not need to check for n > 1
// (lo < hi is guaranteed by the precondition).
assert(n > 1 && "lo < hi is a precondition of Bisect");
do {
const int64_t m = n >> 1;
const int64_t mid = lo + m;
if (static_cast<int64_t>(index) >= offsets_[mid]) {
if (index >= offsets[mid]) {
lo = mid;
n -= m;
} else {
n = m;
}
}
} while (n > 1);
return lo;
}
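A worked trace, with offsets invented for illustration:

// Hypothetical trace: offsets = {0, 3, 5, 9}, index = 4, lo = 0, hi = 4,
// so n = 4.
//   iteration 1: m = 2, mid = 2, 4 <  offsets[2] (= 5) -> n = 2
//   iteration 2: m = 1, mid = 1, 4 >= offsets[1] (= 3) -> lo = 1, n = 1
// The loop exits with n == 1 and returns lo == 1: logical index 4 falls
// in chunk 1, which spans [3, 5).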

private:
// Collection of starting offsets used for binary search
std::vector<int64_t> offsets_;

// Tracks the most recently used chunk index to allow fast
// access for consecutive indices corresponding to the same chunk
mutable std::atomic<int64_t> cached_chunk_;
};

} // namespace internal
} // namespace arrow
} // namespace arrow::internal
25 changes: 19 additions & 6 deletions cpp/src/arrow/compute/kernels/vector_sort.cc
@@ -24,6 +24,7 @@
namespace arrow {

using internal::checked_cast;
using internal::ChunkLocation;

namespace compute {
namespace internal {
@@ -748,11 +749,15 @@ class TableSorter {
auto& comparator = comparator_;
const auto& first_sort_key = sort_keys_[0];

ChunkLocation left_loc{0, 0};
ChunkLocation right_loc{0, 0};
std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end, temp_indices,
[&](uint64_t left, uint64_t right) {
// First column is either null or nan
const auto left_loc = left_resolver_.Resolve(left);
const auto right_loc = right_resolver_.Resolve(right);
left_loc =
left_resolver_.ResolveWithChunkIndexHint(left, left_loc.chunk_index);
Member:

The point of this is to avoid all atomic accesses, right?

Contributor Author:

Memory access really: keeping the ChunkLocation::chunk_index in registers throughout the loop -- of course it depends on the register allocation and inlining of ResolveWithChunkIndexHint. It improved the sort benchmarks slightly.

right_loc = right_resolver_.ResolveWithChunkIndexHint(
right, right_loc.chunk_index);
auto chunk_left = first_sort_key.GetChunk<ArrayType>(left_loc);
auto chunk_right = first_sort_key.GetChunk<ArrayType>(right_loc);
const auto left_is_null = chunk_left.IsNull();
@@ -783,11 +788,15 @@
// Untyped implementation
auto& comparator = comparator_;

ChunkLocation left_loc{0, 0};
ChunkLocation right_loc{0, 0};
std::merge(nulls_begin, nulls_middle, nulls_middle, nulls_end, temp_indices,
[&](uint64_t left, uint64_t right) {
// First column is always null
const auto left_loc = left_resolver_.Resolve(left);
const auto right_loc = right_resolver_.Resolve(right);
left_loc =
left_resolver_.ResolveWithChunkIndexHint(left, left_loc.chunk_index);
right_loc = right_resolver_.ResolveWithChunkIndexHint(
right, right_loc.chunk_index);
return comparator.Compare(left_loc, right_loc, 1);
});
// Copy back temp area into main buffer
@@ -807,11 +816,15 @@
auto& comparator = comparator_;
const auto& first_sort_key = sort_keys_[0];

ChunkLocation left_loc{0, 0};
ChunkLocation right_loc{0, 0};
std::merge(range_begin, range_middle, range_middle, range_end, temp_indices,
[&](uint64_t left, uint64_t right) {
// Both values are never null nor NaN.
const auto left_loc = left_resolver_.Resolve(left);
const auto right_loc = right_resolver_.Resolve(right);
left_loc =
left_resolver_.ResolveWithChunkIndexHint(left, left_loc.chunk_index);
right_loc = right_resolver_.ResolveWithChunkIndexHint(
right, right_loc.chunk_index);
auto chunk_left = first_sort_key.GetChunk<ArrayType>(left_loc);
auto chunk_right = first_sort_key.GetChunk<ArrayType>(right_loc);
DCHECK(!chunk_left.IsNull());