diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc index e74ad6a6665..8d1f98a8940 100644 --- a/cpp/src/arrow/acero/plan_test.cc +++ b/cpp/src/arrow/acero/plan_test.cc @@ -594,8 +594,10 @@ TEST(ExecPlanExecution, CustomFieldNames) { TEST(ExecPlanExecution, SourceOrderBy) { std::vector expected = { - ExecBatchFromJSON({int32(), boolean()}, - "[[4, false], [5, null], [6, false], [7, false], [null, true]]")}; + ExecBatchFromJSON({int32(), boolean()}, "[[4, false]]"), + ExecBatchFromJSON({int32(), boolean()}, "[[5, null], [6, false], [7, false]]"), + ExecBatchFromJSON({int32(), boolean()}, "[[null, true]]")}; + for (bool slow : {false, true}) { SCOPED_TRACE(slow ? "slowed" : "unslowed"); diff --git a/cpp/src/arrow/array/util.h b/cpp/src/arrow/array/util.h index 9f34af0525d..9585ba35039 100644 --- a/cpp/src/arrow/array/util.h +++ b/cpp/src/arrow/array/util.h @@ -65,6 +65,16 @@ ARROW_EXPORT Result> MakeEmptyArray(std::shared_ptr type, MemoryPool* pool = default_memory_pool()); +/// \brief Create multiple strongly-typed Array instances from ArrayData +/// +/// When building many arrays at once, this can provide a significant +/// performance improvement over calling MakeArray() repeatedly. +/// +/// \param[in] data contents of all arrays, all must be of the same type. +/// \return the resulting Array instances +ARROW_EXPORT +ArrayVector MakeArrays(const std::vector>& data); + namespace internal { /// \brief Swap endian of each element in a generic ArrayData diff --git a/cpp/src/arrow/chunked_array.cc b/cpp/src/arrow/chunked_array.cc index c36b736d5d5..07de394a2a7 100644 --- a/cpp/src/arrow/chunked_array.cc +++ b/cpp/src/arrow/chunked_array.cc @@ -169,7 +169,7 @@ bool ChunkedArray::ApproxEquals(const ChunkedArray& other, Result> ChunkedArray::GetScalar(int64_t index) const { const auto loc = chunk_resolver_.Resolve(index); - if (loc.chunk_index >= static_cast(chunks_.size())) { + if (loc.chunk_index >= num_chunks()) { return Status::IndexError("index with value of ", index, " is out-of-bounds for chunked array of length ", length_); } diff --git a/cpp/src/arrow/compute/kernels/select_k_test.cc b/cpp/src/arrow/compute/kernels/select_k_test.cc index c9dbe0bd4c0..8669a36dc4c 100644 --- a/cpp/src/arrow/compute/kernels/select_k_test.cc +++ b/cpp/src/arrow/compute/kernels/select_k_test.cc @@ -659,8 +659,8 @@ TEST_F(TestSelectKWithTable, TopKMultipleColumnKeys) { auto options = SelectKOptions::TopKDefault(3, {"a", "b"}); std::vector expected = {R"([{"a": 3, "b": null}, - {"a": 2, "b": 5}, - {"a": 1, "b": 5} + {"a": 2, "b": 5}])", + R"([{"a": 1, "b": 5} ])"}; Check(schema, input, options, expected); } @@ -705,8 +705,8 @@ TEST_F(TestSelectKWithTable, BottomKMultipleColumnKeys) { auto options = SelectKOptions::BottomKDefault(3, {"a", "b"}); std::vector expected = {R"([{"a": 1, "b": 3}, - {"a": 1, "b": 5}, - {"a": 2, "b": 5} + {"a": 1, "b": 5}])", + R"([{"a": 2, "b": 5} ])"}; Check(schema, input, options, expected); } diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc index 89b3f7d0d3c..19890462490 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc @@ -20,10 +20,12 @@ #include #include #include +#include #include "arrow/array/builder_primitive.h" #include "arrow/array/concatenate.h" #include "arrow/buffer_builder.h" +#include "arrow/chunk_resolver.h" #include "arrow/chunked_array.h" #include "arrow/compute/api_vector.h" #include "arrow/compute/kernels/codegen_internal.h" @@ -46,6 +48,8 @@ using internal::BinaryBitBlockCounter; using internal::BitBlockCount; using internal::BitBlockCounter; using internal::CheckIndexBounds; +using internal::ChunkLocation; +using internal::ChunkResolver; using internal::OptionalBitBlockCounter; namespace compute { @@ -693,32 +697,171 @@ Result> TakeCA(const ChunkedArray& values, const Array& indices, const TakeOptions& options, ExecContext* ctx) { + std::cerr << "TakeCA " << indices.ToString() << " from " << values.ToString() << std::endl; auto num_chunks = values.num_chunks(); - std::shared_ptr current_chunk; - - // Case 1: `values` has a single chunk, so just use it - if (num_chunks == 1) { - current_chunk = values.chunk(0); - } else { - // TODO Case 2: See if all `indices` fall in the same chunk and call Array Take on it - // See - // https://github.com/apache/arrow/blob/6f2c9041137001f7a9212f244b51bc004efc29af/r/src/compute.cpp#L123-L151 - // TODO Case 3: If indices are sorted, can slice them and call Array Take - - // Case 4: Else, concatenate chunks and call Array Take + auto num_indices = indices.length(); + + if (indices.length() == 0) { + // Case 0: No indices were provided, nothing to take so return an empty chunked array + return ChunkedArray::MakeEmpty(values.type()); + } else if (num_chunks < 2) { + std::shared_ptr current_chunk; + // Case 1: `values` is empty or has a single chunk, so just use it if (values.chunks().empty()) { ARROW_ASSIGN_OR_RAISE(current_chunk, MakeArrayOfNull(values.type(), /*length=*/0, ctx->memory_pool())); } else { - ARROW_ASSIGN_OR_RAISE(current_chunk, - Concatenate(values.chunks(), ctx->memory_pool())); + current_chunk = values.chunk(0); + } + // Call Array Take on our single chunk + ARROW_ASSIGN_OR_RAISE(std::shared_ptr new_chunk, + TakeAA(current_chunk->data(), indices.data(), options, ctx)); + std::vector> chunks = {MakeArray(new_chunk)}; + return std::make_shared(std::move(chunks)); + } else { + // For each index, lookup at which chunk it refers to. + // We have to do this because the indices are not necessarily sorted. + // So we can't simply iterate over chunks and pick the slices we need. + std::vector builders(num_chunks); + std::vector indices_chunks(num_indices); + const void* indices_raw_data; // Use Raw data to avoid invoking .Value() on indices + auto indices_type_id = indices.type()->id(); + switch (indices_type_id) { + case Type::UINT8: + case Type::INT8: + indices_raw_data = static_cast(indices).raw_values(); + break; + case Type::UINT16: + case Type::INT16: + indices_raw_data = static_cast(indices).raw_values(); + break; + case Type::UINT32: + case Type::INT32: + indices_raw_data = static_cast(indices).raw_values(); + break; + case Type::UINT64: + case Type::INT64: + indices_raw_data = static_cast(indices).raw_values(); + break; + default: + DCHECK(false) << "Invalid indices types " << indices.type()->ToString(); + break; + } + + ChunkLocation resolved_index = {0, 0}; + ChunkResolver index_resolver(values.chunks()); + for (int64_t requested_index = 0; requested_index < num_indices; ++requested_index) { + uint64_t index; + switch (indices_type_id) { + case Type::UINT8: + case Type::INT8: + index = static_cast(indices_raw_data)[requested_index]; + break; + case Type::UINT16: + case Type::INT16: + index = static_cast(indices_raw_data)[requested_index]; + break; + case Type::UINT32: + case Type::INT32: + index = static_cast(indices_raw_data)[requested_index]; + break; + case Type::UINT64: + case Type::INT64: + index = static_cast(indices_raw_data)[requested_index]; + break; + default: + DCHECK(false) << "Invalid indices types " << indices.type()->ToString(); + break; + } + + resolved_index = index_resolver.ResolveWithChunkIndexHint(index, resolved_index.chunk_index); + int64_t chunk_index = resolved_index.chunk_index; + if (chunk_index >= num_chunks) { + // ChunkResolver doesn't throw errors when the index is out of bounds + // it will just return a chunk index that doesn't exist. + return Status::IndexError("Index ", index, " is out of bounds"); + } + indices_chunks[requested_index] = chunk_index; + Int64Builder &builder = builders[chunk_index]; + if (builder.capacity() == builder.length()) { + // Preallocate to speed up appending + ARROW_RETURN_NOT_OK(builder.Reserve(1 << 13)); + } + builder.UnsafeAppend(resolved_index.index_in_chunk); + } + + // Take from the various chunks only the values we actually care about. + // We first gather all values using Take and then we slice the resulting + // arrays with the values to create the actual resulting chunks + // as that is orders of magnitude faster than calling Take multiple times. + std::vector> looked_up_values_data(num_chunks); + std::vector looked_up_values; + looked_up_values.reserve(num_chunks); + for (int i = 0; i < num_chunks; ++i) { + if (builders[i].length() == 0) { + // No indices refer to this chunk, so we can skip it + continue; + } + std::shared_ptr indices_array; + ARROW_RETURN_NOT_OK(builders[i].Finish(&indices_array)); + ARROW_ASSIGN_OR_RAISE( + looked_up_values_data[i], TakeAA(values.chunk(i)->data(), indices_array->data(), options, ctx)); + looked_up_values.emplace_back(*looked_up_values_data[i]); } + + // Slice the arrays with the values to create the new chunked array out of them + std::unique_ptr result_builder; + std::vector> result_chunks; + ARROW_RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), values.type(), &result_builder)); + ARROW_RETURN_NOT_OK(result_builder->Reserve(num_indices)); + std::vector consumed_chunk_offset(num_chunks, 0); + int64_t current_chunk = indices_chunks[0]; + int64_t current_length = 0; + for (int64_t requested_index = 0; requested_index < num_indices; ++requested_index) { + int64_t chunk_index = indices_chunks[requested_index]; + if (chunk_index != current_chunk) { + if (current_length == looked_up_values[current_chunk].length) { + // We have taken a whole consecutive chunk, so we can append it to the result. + std::cerr << "Full Chunk " << current_chunk << "," << current_length << " :" << looked_up_values[current_chunk].ToArray()->ToString() << std::endl; + if (result_builder->length() > 0) { + // First close the current chunk being built and add it to the chunked array, + // then we can add the full chunk to the array after it. + ARROW_ASSIGN_OR_RAISE(auto previous_chunk_array, result_builder->Finish()); + result_chunks.push_back(previous_chunk_array); + result_builder->Reset(); + ARROW_RETURN_NOT_OK(result_builder->Reserve(num_indices - requested_index)); + } + // Append the whole chunk itself + result_chunks.push_back(looked_up_values[current_chunk].ToArray()); + } + else { + std::cerr << "Append Slice" << std::endl; + // Values in previous chunk + ARROW_RETURN_NOT_OK(result_builder->AppendArraySlice( + looked_up_values[current_chunk], + consumed_chunk_offset[current_chunk], current_length)); + } + consumed_chunk_offset[current_chunk] += current_length; + current_chunk = chunk_index; + current_length = 0; + } + ++current_length; + } + if (current_length > 0) { + // Remaining values in last chunk + ARROW_RETURN_NOT_OK(result_builder->AppendArraySlice( + looked_up_values[current_chunk], consumed_chunk_offset[current_chunk], + current_length)); + std::cerr << "Appended Last Slice" << looked_up_values[current_chunk].ToArray()->ToString() << " @ " << consumed_chunk_offset[current_chunk] << "," << current_length << std::endl; + } + ARROW_ASSIGN_OR_RAISE(auto last_chunk_array, result_builder->Finish()); + std::cerr << "Last Chunk " << last_chunk_array->ToString() << std::endl; + result_chunks.push_back(last_chunk_array); + + auto result = std::make_shared(result_chunks); + std::cerr << "Result " << result->ToString() << std::endl; + return result; } - // Call Array Take on our single chunk - ARROW_ASSIGN_OR_RAISE(std::shared_ptr new_chunk, - TakeAA(current_chunk->data(), indices.data(), options, ctx)); - std::vector> chunks = {MakeArray(new_chunk)}; - return std::make_shared(std::move(chunks)); } Result> TakeCC(const ChunkedArray& values, diff --git a/cpp/src/arrow/compute/kernels/vector_selection_test.cc b/cpp/src/arrow/compute/kernels/vector_selection_test.cc index ec94b328ea3..1f4d25eb96c 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_test.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_test.cc @@ -1795,11 +1795,16 @@ TEST_F(TestTakeKernelWithChunkedArray, TakeChunkedArray) { this->AssertChunkedTake(int8(), {"[]"}, {"[]"}, {"[]"}); this->AssertChunkedTake(int8(), {"[]"}, {"[null]"}, {"[null]"}); - this->AssertTake(int8(), {"[7]", "[8, 9]"}, "[0, 1, 0, 2]", {"[7, 8, 7, 9]"}); + std::cerr << "\n\nCHUNKED take 1" << std::endl; + this->AssertTake(int8(), {"[7]", "[8, 9]"}, "[0, 1, 0, 2]", + {"[7, 8, 7, 9]"}); + std::cerr << "\n\nCHUNKED take 2" << std::endl; this->AssertChunkedTake(int8(), {"[7]", "[8, 9]"}, {"[0, 1, 0]", "[]", "[2]"}, {"[7, 8, 7]", "[]", "[9]"}); + std::cerr << "\n\nCHUNKED take 3" << std::endl; this->AssertTake(int8(), {"[7]", "[8, 9]"}, "[2, 1]", {"[9, 8]"}); + std::cerr << "CHUNKED index errrors" << std::endl; std::shared_ptr arr; ASSERT_RAISES(IndexError, this->TakeWithArray(int8(), {"[7]", "[8, 9]"}, "[0, 5]", &arr)); @@ -1863,7 +1868,8 @@ TEST_F(TestTakeKernelWithTable, TakeTable) { this->AssertTake(schm, table_json, "[]", {"[]"}); std::vector expected_310 = { - "[{\"a\": 4, \"b\": \"eh\"},{\"a\": 1, \"b\": \"\"},{\"a\": null, \"b\": \"yo\"}]"}; + "[{\"a\": 4, \"b\": \"eh\"}]", + "[{\"a\": 1, \"b\": \"\"},{\"a\": null, \"b\": \"yo\"}]"}; this->AssertTake(schm, table_json, "[3, 1, 0]", expected_310); this->AssertChunkedTake(schm, table_json, {"[0, 1]", "[2, 3]"}, table_json); }