diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index b6f1e2481fa..29334572874 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -158,6 +158,7 @@ set(ARROW_SRCS builder.cc buffer.cc chunked_array.cc + chunk_resolver.cc compare.cc config.cc datum.cc diff --git a/cpp/src/arrow/array/array_base.cc b/cpp/src/arrow/array/array_base.cc index 11b6b1630c5..b36fb0fb94a 100644 --- a/cpp/src/arrow/array/array_base.cc +++ b/cpp/src/arrow/array/array_base.cc @@ -165,8 +165,9 @@ struct ScalarFromArraySlotImpl { Result> Finish() && { if (index_ >= array_.length()) { - return Status::IndexError("tried to refer to element ", index_, - " but array is only ", array_.length(), " long"); + return Status::IndexError("index with value of ", index_, + " is out-of-bounds for array of length ", + array_.length()); } if (array_.IsNull(index_)) { diff --git a/cpp/src/arrow/chunk_resolver.cc b/cpp/src/arrow/chunk_resolver.cc new file mode 100644 index 00000000000..4a1ba6d0a32 --- /dev/null +++ b/cpp/src/arrow/chunk_resolver.cc @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/chunk_resolver.h" + +#include +#include +#include +#include + +#include "arrow/array.h" +#include "arrow/record_batch.h" + +namespace arrow { +namespace internal { + +namespace { +template +int64_t GetLength(const T& array) { + // General case assumes argument is an Array pointer + return array->length(); +} + +template <> +int64_t GetLength>( + const std::shared_ptr& batch) { + return batch->num_rows(); +} + +template +inline std::vector MakeChunksOffsets(const std::vector& chunks) { + std::vector offsets(chunks.size() + 1); + int64_t offset = 0; + std::transform(chunks.begin(), chunks.end(), offsets.begin(), + [&offset](const T& chunk) { + auto curr_offset = offset; + offset += GetLength(chunk); + return curr_offset; + }); + offsets[chunks.size()] = offset; + return offsets; +} +} // namespace + +ChunkResolver::ChunkResolver(const ArrayVector& chunks) + : offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {} + +ChunkResolver::ChunkResolver(const std::vector& chunks) + : offsets_(MakeChunksOffsets(chunks)), cached_chunk_(0) {} + +ChunkResolver::ChunkResolver(const RecordBatchVector& batches) + : offsets_(MakeChunksOffsets(batches)), cached_chunk_(0) {} + +} // namespace internal +} // namespace arrow diff --git a/cpp/src/arrow/chunk_resolver.h b/cpp/src/arrow/chunk_resolver.h new file mode 100644 index 00000000000..f69253b1617 --- /dev/null +++ b/cpp/src/arrow/chunk_resolver.h @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "arrow/type_fwd.h" +#include "arrow/util/macros.h" + +namespace arrow { +namespace internal { + +struct ChunkLocation { + int64_t chunk_index, index_in_chunk; +}; + +// An object that resolves an array chunk depending on a logical index +struct ChunkResolver { + explicit ChunkResolver(const ArrayVector& chunks); + + explicit ChunkResolver(const std::vector& chunks); + + explicit ChunkResolver(const RecordBatchVector& batches); + + ChunkResolver(ChunkResolver&& other) + : offsets_(std::move(other.offsets_)), cached_chunk_(other.cached_chunk_.load()) {} + + ChunkResolver& operator=(ChunkResolver&& other) { + offsets_ = std::move(other.offsets_); + cached_chunk_.store(other.cached_chunk_.load()); + return *this; + } + + /// \brief Return a ChunkLocation containing the chunk index and in-chunk value index of + /// the chunked array at logical index + inline ChunkLocation Resolve(const int64_t index) const { + // It is common for the algorithms below to make consecutive accesses at + // a relatively small distance from each other, hence often falling in + // the same chunk. + // This is trivial when merging (assuming each side of the merge uses + // its own resolver), but also in the inner recursive invocations of + // partitioning. + if (offsets_.size() <= 1) { + return {0, index}; + } + const auto cached_chunk = cached_chunk_.load(); + const bool cache_hit = + (index >= offsets_[cached_chunk] && index < offsets_[cached_chunk + 1]); + if (ARROW_PREDICT_TRUE(cache_hit)) { + return {cached_chunk, index - offsets_[cached_chunk]}; + } + auto chunk_index = Bisect(index); + cached_chunk_.store(chunk_index); + return {chunk_index, index - offsets_[chunk_index]}; + } + + protected: + // Find the chunk index corresponding to a value index using binary search + inline int64_t Bisect(const int64_t index) const { + // Like std::upper_bound(), but hand-written as it can help the compiler. + // Search [lo, lo + n) + int64_t lo = 0; + auto n = static_cast(offsets_.size()); + while (n > 1) { + const int64_t m = n >> 1; + const int64_t mid = lo + m; + if (static_cast(index) >= offsets_[mid]) { + lo = mid; + n -= m; + } else { + n = m; + } + } + return lo; + } + + private: + // Collection of starting offsets used for binary search + std::vector offsets_; + + // Tracks the most recently used chunk index to allow fast + // access for consecutive indices corresponding to the same chunk + mutable std::atomic cached_chunk_; +}; + +} // namespace internal +} // namespace arrow diff --git a/cpp/src/arrow/chunked_array.cc b/cpp/src/arrow/chunked_array.cc index 30746902945..840dd04a5ad 100644 --- a/cpp/src/arrow/chunked_array.cc +++ b/cpp/src/arrow/chunked_array.cc @@ -43,15 +43,17 @@ class MemoryPool; // ChunkedArray methods ChunkedArray::ChunkedArray(ArrayVector chunks, std::shared_ptr type) - : chunks_(std::move(chunks)), type_(std::move(type)) { - length_ = 0; - null_count_ = 0; - + : chunks_(std::move(chunks)), + type_(std::move(type)), + length_(0), + null_count_(0), + chunk_resolver_{chunks_} { if (type_ == nullptr) { ARROW_CHECK_GT(chunks_.size(), 0) << "cannot construct ChunkedArray from empty vector and omitted type"; type_ = chunks_[0]->type(); } + for (const auto& chunk : chunks_) { length_ += chunk->length(); null_count_ += chunk->null_count(); @@ -147,13 +149,12 @@ bool ChunkedArray::ApproxEquals(const ChunkedArray& other, } Result> ChunkedArray::GetScalar(int64_t index) const { - for (const auto& chunk : chunks_) { - if (index < chunk->length()) { - return chunk->GetScalar(index); - } - index -= chunk->length(); + const auto loc = chunk_resolver_.Resolve(index); + if (loc.chunk_index >= static_cast(chunks_.size())) { + return Status::IndexError("index with value of ", index, + " is out-of-bounds for chunked array of length ", length_); } - return Status::Invalid("index out of bounds"); + return chunks_[loc.chunk_index]->GetScalar(loc.index_in_chunk); } std::shared_ptr ChunkedArray::Slice(int64_t offset, int64_t length) const { diff --git a/cpp/src/arrow/chunked_array.h b/cpp/src/arrow/chunked_array.h index 36b2f38ed43..e130a99bbe2 100644 --- a/cpp/src/arrow/chunked_array.h +++ b/cpp/src/arrow/chunked_array.h @@ -23,6 +23,7 @@ #include #include +#include "arrow/chunk_resolver.h" #include "arrow/compare.h" #include "arrow/result.h" #include "arrow/status.h" @@ -177,11 +178,12 @@ class ARROW_EXPORT ChunkedArray { protected: ArrayVector chunks_; + std::shared_ptr type_; int64_t length_; int64_t null_count_; - std::shared_ptr type_; private: + internal::ChunkResolver chunk_resolver_; ARROW_DISALLOW_COPY_AND_ASSIGN(ChunkedArray); }; diff --git a/cpp/src/arrow/chunked_array_test.cc b/cpp/src/arrow/chunked_array_test.cc index a623cd47065..d1dc69de274 100644 --- a/cpp/src/arrow/chunked_array_test.cc +++ b/cpp/src/arrow/chunked_array_test.cc @@ -278,7 +278,7 @@ TEST_F(TestChunkedArray, GetScalar) { check_scalar(carr, 4, **MakeScalar(ty, 3)); check_scalar(carr, 6, **MakeScalar(ty, 5)); - ASSERT_RAISES(Invalid, carr.GetScalar(7)); + ASSERT_RAISES(IndexError, carr.GetScalar(7)); } } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/chunked_internal.h b/cpp/src/arrow/compute/kernels/chunked_internal.h index b007d6cbfb8..69f439fccf0 100644 --- a/cpp/src/arrow/compute/kernels/chunked_internal.h +++ b/cpp/src/arrow/compute/kernels/chunked_internal.h @@ -19,12 +19,12 @@ #include #include +#include #include #include "arrow/array.h" +#include "arrow/chunk_resolver.h" #include "arrow/compute/kernels/codegen_internal.h" -#include "arrow/record_batch.h" -#include "arrow/type.h" namespace arrow { namespace compute { @@ -33,8 +33,8 @@ namespace internal { // The target chunk in a chunked array. template struct ResolvedChunk { - using V = GetViewType; - using LogicalValueType = typename V::T; + using ViewType = GetViewType; + using LogicalValueType = typename ViewType::T; // The target array in chunked array. const ArrayType* array; @@ -45,7 +45,7 @@ struct ResolvedChunk { bool IsNull() const { return array->IsNull(index); } - LogicalValueType Value() const { return V::LogicalValue(array->GetView(index)); } + LogicalValueType Value() const { return ViewType::LogicalValue(array->GetView(index)); } }; // ResolvedChunk specialization for untyped arrays when all is needed is null lookup @@ -61,97 +61,20 @@ struct ResolvedChunk { bool IsNull() const { return array->IsNull(index); } }; -struct ChunkLocation { - int64_t chunk_index, index_in_chunk; -}; - -// An object that resolves an array chunk depending on the index. -struct ChunkResolver { - explicit ChunkResolver(std::vector lengths) - : num_chunks_(static_cast(lengths.size())), - offsets_(MakeEndOffsets(std::move(lengths))), - cached_chunk_(0) {} - - ChunkLocation Resolve(int64_t index) const { - // It is common for the algorithms below to make consecutive accesses at - // a relatively small distance from each other, hence often falling in - // the same chunk. - // This is trivial when merging (assuming each side of the merge uses - // its own resolver), but also in the inner recursive invocations of - // partitioning. - const bool cache_hit = - (index >= offsets_[cached_chunk_] && index < offsets_[cached_chunk_ + 1]); - if (ARROW_PREDICT_TRUE(cache_hit)) { - return {cached_chunk_, index - offsets_[cached_chunk_]}; - } else { - return ResolveMissBisect(index); - } - } - - static ChunkResolver FromBatches(const RecordBatchVector& batches) { - std::vector lengths(batches.size()); - std::transform( - batches.begin(), batches.end(), lengths.begin(), - [](const std::shared_ptr& batch) { return batch->num_rows(); }); - return ChunkResolver(std::move(lengths)); - } - - protected: - ChunkLocation ResolveMissBisect(int64_t index) const { - // Like std::upper_bound(), but hand-written as it can help the compiler. - const int64_t* raw_offsets = offsets_.data(); - // Search [lo, lo + n) - int64_t lo = 0, n = num_chunks_; - while (n > 1) { - int64_t m = n >> 1; - int64_t mid = lo + m; - if (index >= raw_offsets[mid]) { - lo = mid; - n -= m; - } else { - n = m; - } - } - cached_chunk_ = lo; - return {lo, index - offsets_[lo]}; - } +struct ChunkedArrayResolver : protected ::arrow::internal::ChunkResolver { + ChunkedArrayResolver(const ChunkedArrayResolver& other) + : ::arrow::internal::ChunkResolver(other.chunks_), chunks_(other.chunks_) {} - static std::vector MakeEndOffsets(std::vector lengths) { - int64_t offset = 0; - for (auto& v : lengths) { - const auto this_length = v; - v = offset; - offset += this_length; - } - lengths.push_back(offset); - return lengths; - } - - int64_t num_chunks_; - std::vector offsets_; - - mutable int64_t cached_chunk_; -}; - -struct ChunkedArrayResolver : protected ChunkResolver { explicit ChunkedArrayResolver(const std::vector& chunks) - : ChunkResolver(MakeLengths(chunks)), chunks_(chunks) {} + : ::arrow::internal::ChunkResolver(chunks), chunks_(chunks) {} template ResolvedChunk Resolve(int64_t index) const { - const auto loc = ChunkResolver::Resolve(index); - return ResolvedChunk( - checked_cast(chunks_[loc.chunk_index]), loc.index_in_chunk); + const auto loc = ::arrow::internal::ChunkResolver::Resolve(index); + return {checked_cast(chunks_[loc.chunk_index]), loc.index_in_chunk}; } protected: - static std::vector MakeLengths(const std::vector& chunks) { - std::vector lengths(chunks.size()); - std::transform(chunks.begin(), chunks.end(), lengths.begin(), - [](const Array* arr) { return arr->length(); }); - return lengths; - } - const std::vector chunks_; }; diff --git a/cpp/src/arrow/compute/kernels/vector_sort.cc b/cpp/src/arrow/compute/kernels/vector_sort.cc index 1f62ce5b42a..ad3323199ad 100644 --- a/cpp/src/arrow/compute/kernels/vector_sort.cc +++ b/cpp/src/arrow/compute/kernels/vector_sort.cc @@ -27,7 +27,9 @@ #include "arrow/array/concatenate.h" #include "arrow/array/data.h" +#include "arrow/chunk_resolver.h" #include "arrow/compute/api_vector.h" +#include "arrow/compute/kernels/chunked_internal.h" #include "arrow/compute/kernels/common.h" #include "arrow/compute/kernels/util_internal.h" #include "arrow/compute/kernels/vector_sort_internal.h" @@ -849,10 +851,10 @@ class TableSorter { order(order), null_count(null_count) {} - using LocationType = ChunkLocation; + using LocationType = ::arrow::internal::ChunkLocation; template - ResolvedChunk GetChunk(ChunkLocation loc) const { + ResolvedChunk GetChunk(::arrow::internal::ChunkLocation loc) const { return {checked_cast(chunks[loc.chunk_index]), loc.index_in_chunk}; } @@ -895,8 +897,8 @@ class TableSorter { batches_(MakeBatches(table, &status_)), options_(options), null_placement_(options.null_placement), - left_resolver_(ChunkResolver::FromBatches(batches_)), - right_resolver_(ChunkResolver::FromBatches(batches_)), + left_resolver_(batches_), + right_resolver_(batches_), sort_keys_(ResolveSortKeys(table, batches_, options.sort_keys, &status_)), indices_begin_(indices_begin), indices_end_(indices_end), @@ -1137,7 +1139,7 @@ class TableSorter { const RecordBatchVector batches_; const SortOptions& options_; const NullPlacement null_placement_; - const ChunkResolver left_resolver_, right_resolver_; + const ::arrow::internal::ChunkResolver left_resolver_, right_resolver_; const std::vector sort_keys_; uint64_t* indices_begin_; uint64_t* indices_end_; @@ -1671,9 +1673,8 @@ class TableSelecter : public TypeVisitor { : order(order), type(GetPhysicalType(chunked_array->type())), chunks(GetPhysicalChunks(*chunked_array, type)), - chunk_pointers(GetArrayPointers(chunks)), null_count(chunked_array->null_count()), - resolver(chunk_pointers) {} + resolver(GetArrayPointers(chunks)) {} using LocationType = int64_t; @@ -1687,7 +1688,6 @@ class TableSelecter : public TypeVisitor { const SortOrder order; const std::shared_ptr type; const ArrayVector chunks; - const std::vector chunk_pointers; const int64_t null_count; const ChunkedArrayResolver resolver; }; diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 05aeff13251..9c5069c2d20 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1278,7 +1278,7 @@ cdef class Array(_PandasConvertible): ------- value : Scalar (index) or Array (slice) """ - if PySlice_Check(key): + if isinstance(key, slice): return _normalize_slice(self, key) return self.getitem(_normalize_index(key, self.length())) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 55935feba67..c8e945dc14a 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -754,6 +754,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: shared_ptr[CArray] chunk(int i) shared_ptr[CDataType] type() + CResult[shared_ptr[CScalar]] GetScalar(int64_t index) const shared_ptr[CChunkedArray] Slice(int64_t offset, int64_t length) const shared_ptr[CChunkedArray] Slice(int64_t offset) const diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index c6bf11b2dc4..c39fc78d2ad 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -1050,7 +1050,7 @@ cdef class Buffer(_Weakrefable): return pyarrow_wrap_buffer(parent_buf) def __getitem__(self, key): - if PySlice_Check(key): + if isinstance(key, slice): if (key.step or 1) != 1: raise IndexError('only slices with step 1 supported') return _normalize_slice(self, key) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index c4e83266529..9bd05536836 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -222,7 +222,7 @@ cdef class ChunkedArray(_PandasConvertible): If buffers are shared between arrays then the shared portion will only be counted multiple times. - The dictionary of dictionary arrays will always be counted in their + The dictionary of dictionary arrays will always be counted in their entirety even if the array only references a portion of the dictionary. Examples @@ -285,19 +285,14 @@ cdef class ChunkedArray(_PandasConvertible): ------- value : Scalar (index) or ChunkedArray (slice) """ + if isinstance(key, slice): return _normalize_slice(self, key) return self.getitem(_normalize_index(key, self.chunked_array.length())) - cdef getitem(self, int64_t index): - cdef int j - - for j in range(self.num_chunks): - if index < self.chunked_array.chunk(j).get().length(): - return self.chunk(j)[index] - else: - index -= self.chunked_array.chunk(j).get().length() + cdef getitem(self, int64_t i): + return Scalar.wrap(GetResultValue(self.chunked_array.GetScalar(i))) def is_null(self, *, nan_is_null=False): """ @@ -1991,8 +1986,8 @@ cdef class RecordBatch(_PandasConvertible): """ if isinstance(key, slice): return _normalize_slice(self, key) - else: - return self.column(key) + + return self.column(key) def serialize(self, memory_pool=None): """ @@ -2819,8 +2814,8 @@ cdef class Table(_PandasConvertible): """ if isinstance(key, slice): return _normalize_slice(self, key) - else: - return self.column(key) + + return self.column(key) def slice(self, offset=0, length=None): """