Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bc05614
add binary search for finding a chunk
edponce Dec 30, 2021
8434d14
(unstable) use ChunkResolver
edponce Feb 27, 2022
c208208
split ChunkResolver/ChunkedArrayResolver files
edponce Feb 28, 2022
210b4fa
add source/header for ChunkResolver
edponce Feb 28, 2022
8e82ce4
fix lint errors
edponce Mar 1, 2022
4695225
use arrow::internal, inline ChunkResolver fast path
edponce Mar 12, 2022
98108f1
follow convention for getitem methods and PySlice_Checks
edponce Mar 12, 2022
43f67b4
fix lint error
edponce Mar 12, 2022
2fd223b
add constructors
edponce Mar 12, 2022
8630406
add bisect algorithm to utils and group common code
edponce Mar 13, 2022
aaef1ad
refactor ChunkResolver
edponce Mar 14, 2022
bc29cc9
remove static keyword for anon namespaces, remove internal index check
edponce Mar 14, 2022
7126456
fully allocate offsets during construction
edponce Mar 14, 2022
bb4a252
simplify ChunkResolver construction
edponce Mar 15, 2022
458ab21
add atomic and shared_ptr
edponce Mar 15, 2022
37c2892
Update error message and slice checks
edponce Mar 31, 2022
0b43f29
remove constexpr
edponce Mar 31, 2022
b37332a
Initialize ChunkResolver during construction
edponce Apr 1, 2022
ced0b1a
use list initialization
edponce Apr 13, 2022
6515bdc
use brace list initialization, add comment on hacky copy constructor
edponce Apr 13, 2022
5575290
add move/assigment constructor to ChunkResolver
edponce Apr 13, 2022
a0ab9ff
moved copy constructor to ChunkedArrayResolver
edponce Apr 13, 2022
69093c1
document member variables
edponce Apr 13, 2022
76b25ff
IWYU
edponce Apr 13, 2022
5d2922c
Fix move assignment operator argument constness, make inline
pitrou Apr 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ set(ARROW_SRCS
builder.cc
buffer.cc
chunked_array.cc
chunk_resolver.cc
compare.cc
config.cc
datum.cc
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/array/array_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ struct ScalarFromArraySlotImpl {

Result<std::shared_ptr<Scalar>> Finish() && {
if (index_ >= array_.length()) {
return Status::IndexError("tried to refer to element ", index_,
" but array is only ", array_.length(), " long");
return Status::IndexError("index with value of ", index_,
" is out-of-bounds for array of length ",
array_.length());
}

if (array_.IsNull(index_)) {
Expand Down
69 changes: 69 additions & 0 deletions cpp/src/arrow/chunk_resolver.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/chunk_resolver.h"

#include <algorithm>
#include <cstdint>
#include <memory>
#include <vector>

#include "arrow/array.h"
#include "arrow/record_batch.h"

namespace arrow {
namespace internal {

namespace {
template <typename T>
int64_t GetLength(const T& array) {
// General case assumes argument is an Array pointer
return array->length();
}

template <>
int64_t GetLength<std::shared_ptr<RecordBatch>>(
const std::shared_ptr<RecordBatch>& batch) {
return batch->num_rows();
}

template <typename T>
inline std::vector<int64_t> MakeChunksOffsets(const std::vector<T>& chunks) {
std::vector<int64_t> offsets(chunks.size() + 1);
int64_t offset = 0;
std::transform(chunks.begin(), chunks.end(), offsets.begin(),
[&offset](const T& chunk) {
auto curr_offset = offset;
offset += GetLength(chunk);
return curr_offset;
});
offsets[chunks.size()] = offset;
return offsets;
}
} // namespace

ChunkResolver::ChunkResolver(const ArrayVector& chunks)
: offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {}

ChunkResolver::ChunkResolver(const std::vector<const Array*>& chunks)
: offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {}

ChunkResolver::ChunkResolver(const RecordBatchVector& batches)
: offsets_(MakeChunksOffsets(batches)), cached_chunk_(0) {}

} // namespace internal
} // namespace arrow
104 changes: 104 additions & 0 deletions cpp/src/arrow/chunk_resolver.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <atomic>
#include <cstdint>
#include <vector>

#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"

namespace arrow {
namespace internal {

struct ChunkLocation {
int64_t chunk_index, index_in_chunk;
};

// An object that resolves an array chunk depending on a logical index
struct ChunkResolver {
explicit ChunkResolver(const ArrayVector& chunks);

explicit ChunkResolver(const std::vector<const Array*>& chunks);

explicit ChunkResolver(const RecordBatchVector& batches);

ChunkResolver(ChunkResolver&& other)
: offsets_(std::move(other.offsets_)), cached_chunk_(other.cached_chunk_.load()) {}

ChunkResolver& operator=(ChunkResolver&& other) {
offsets_ = std::move(other.offsets_);
cached_chunk_.store(other.cached_chunk_.load());
return *this;
}

/// \brief Return a ChunkLocation containing the chunk index and in-chunk value index of
/// the chunked array at logical index
inline ChunkLocation Resolve(const int64_t index) const {
// It is common for the algorithms below to make consecutive accesses at
// a relatively small distance from each other, hence often falling in
// the same chunk.
// This is trivial when merging (assuming each side of the merge uses
// its own resolver), but also in the inner recursive invocations of
// partitioning.
if (offsets_.size() <= 1) {
return {0, index};
}
const auto cached_chunk = cached_chunk_.load();
const bool cache_hit =
(index >= offsets_[cached_chunk] && index < offsets_[cached_chunk + 1]);
if (ARROW_PREDICT_TRUE(cache_hit)) {
return {cached_chunk, index - offsets_[cached_chunk]};
}
auto chunk_index = Bisect(index);
cached_chunk_.store(chunk_index);
return {chunk_index, index - offsets_[chunk_index]};
}

protected:
// Find the chunk index corresponding to a value index using binary search
inline int64_t Bisect(const int64_t index) const {
// Like std::upper_bound(), but hand-written as it can help the compiler.
// Search [lo, lo + n)
int64_t lo = 0;
auto n = static_cast<int64_t>(offsets_.size());
while (n > 1) {
const int64_t m = n >> 1;
const int64_t mid = lo + m;
if (static_cast<int64_t>(index) >= offsets_[mid]) {
lo = mid;
n -= m;
} else {
n = m;
}
}
return lo;
}

private:
// Collection of starting offsets used for binary search
std::vector<int64_t> offsets_;

// Tracks the most recently used chunk index to allow fast
// access for consecutive indices corresponding to the same chunk
mutable std::atomic<int64_t> cached_chunk_;
};

} // namespace internal
} // namespace arrow
21 changes: 11 additions & 10 deletions cpp/src/arrow/chunked_array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,17 @@ class MemoryPool;
// ChunkedArray methods

ChunkedArray::ChunkedArray(ArrayVector chunks, std::shared_ptr<DataType> type)
: chunks_(std::move(chunks)), type_(std::move(type)) {
length_ = 0;
null_count_ = 0;

: chunks_(std::move(chunks)),
type_(std::move(type)),
length_(0),
null_count_(0),
chunk_resolver_{chunks_} {
if (type_ == nullptr) {
ARROW_CHECK_GT(chunks_.size(), 0)
<< "cannot construct ChunkedArray from empty vector and omitted type";
type_ = chunks_[0]->type();
}

for (const auto& chunk : chunks_) {
length_ += chunk->length();
null_count_ += chunk->null_count();
Expand Down Expand Up @@ -147,13 +149,12 @@ bool ChunkedArray::ApproxEquals(const ChunkedArray& other,
}

Result<std::shared_ptr<Scalar>> ChunkedArray::GetScalar(int64_t index) const {
for (const auto& chunk : chunks_) {
if (index < chunk->length()) {
return chunk->GetScalar(index);
}
index -= chunk->length();
const auto loc = chunk_resolver_.Resolve(index);
if (loc.chunk_index >= static_cast<int64_t>(chunks_.size())) {
return Status::IndexError("index with value of ", index,
" is out-of-bounds for chunked array of length ", length_);
}
return Status::Invalid("index out of bounds");
return chunks_[loc.chunk_index]->GetScalar(loc.index_in_chunk);
}

std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t offset, int64_t length) const {
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/chunked_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <utility>
#include <vector>

#include "arrow/chunk_resolver.h"
#include "arrow/compare.h"
#include "arrow/result.h"
#include "arrow/status.h"
Expand Down Expand Up @@ -177,11 +178,12 @@ class ARROW_EXPORT ChunkedArray {

protected:
ArrayVector chunks_;
std::shared_ptr<DataType> type_;
int64_t length_;
int64_t null_count_;
std::shared_ptr<DataType> type_;

private:
internal::ChunkResolver chunk_resolver_;
ARROW_DISALLOW_COPY_AND_ASSIGN(ChunkedArray);
};

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/chunked_array_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ TEST_F(TestChunkedArray, GetScalar) {
check_scalar(carr, 4, **MakeScalar(ty, 3));
check_scalar(carr, 6, **MakeScalar(ty, 5));

ASSERT_RAISES(Invalid, carr.GetScalar(7));
ASSERT_RAISES(IndexError, carr.GetScalar(7));
}

} // namespace arrow
Loading