Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
371a12d
Proof of concept
amol- Jan 10, 2024
224f7b3
All tests passing
amol- Jan 10, 2024
e713108
Remove debug prints
amol- Jan 11, 2024
59f356c
lint
amol- Jan 11, 2024
1460f6a
Switch to usage of ChunkResolver to lookup indices
amol- Jan 15, 2024
9247f04
Identify indices type by id
amol- Jan 15, 2024
fba913e
reorder includes
amol- Jan 15, 2024
fc8377b
preallocate indices vector, we already know the size
amol- Jan 15, 2024
2a84e1b
Do an explicit cast
amol- Jan 15, 2024
43ef048
Edit test to account for chunked result
amol- Jan 15, 2024
f15cd04
ChunkResolver more robust to out of bound indices
amol- Jan 18, 2024
967c697
Speed up test
amol- Jan 23, 2024
d4420b3
Major speedup when taking 1 element from a chunk
amol- Jan 23, 2024
901e986
Improve typing
amol- Jan 23, 2024
17de3aa
Further speed up by using pointers math
amol- Jan 23, 2024
10d44b4
Fix detection of out of bounds indices
amol- Jan 23, 2024
bc15cec
lint
amol- Jan 23, 2024
a5b0eef
Fast implementation of TakeCA
amol- Jan 24, 2024
2b78740
No need to concatenate anymore, it's fast enough
amol- Jan 24, 2024
0f7f395
Remove max_index check in ChunkResolver
amol- Jan 26, 2024
05dccec
remove unnecessary reindent
amol- Jan 31, 2024
b0b3026
Speed up by using a builder
amol- Jan 31, 2024
a6f9da3
Minor speed up by preallocating target array
amol- Jan 31, 2024
c6991b1
Do not create temporary Array and ArraySpan
amol- Jan 31, 2024
3be4d94
Preallocate indices builders
amol- Jan 31, 2024
d5d2885
hint optimization
amol- Feb 1, 2024
505e0f7
Speed up for incremental indices
amol- Feb 2, 2024
1a10e0a
Latest state of the code, not working atm
amol- Feb 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cpp/src/arrow/acero/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,10 @@ TEST(ExecPlanExecution, CustomFieldNames) {

TEST(ExecPlanExecution, SourceOrderBy) {
std::vector<ExecBatch> expected = {
ExecBatchFromJSON({int32(), boolean()},
"[[4, false], [5, null], [6, false], [7, false], [null, true]]")};
ExecBatchFromJSON({int32(), boolean()}, "[[4, false]]"),
ExecBatchFromJSON({int32(), boolean()}, "[[5, null], [6, false], [7, false]]"),
ExecBatchFromJSON({int32(), boolean()}, "[[null, true]]")};

for (bool slow : {false, true}) {
SCOPED_TRACE(slow ? "slowed" : "unslowed");

Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/array/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ ARROW_EXPORT
Result<std::shared_ptr<Array>> MakeEmptyArray(std::shared_ptr<DataType> type,
MemoryPool* pool = default_memory_pool());

/// \brief Create multiple strongly-typed Array instances from ArrayData
///
/// When building many arrays at once, this can provide a significant
/// performance improvement over calling MakeArray() repeatedly.
///
/// \param[in] data contents of all arrays, all must be of the same type.
/// \return the resulting Array instances
ARROW_EXPORT
ArrayVector MakeArrays(const std::vector<std::shared_ptr<ArrayData>>& data);

namespace internal {

/// \brief Swap endian of each element in a generic ArrayData
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/chunked_array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ bool ChunkedArray::ApproxEquals(const ChunkedArray& other,

Result<std::shared_ptr<Scalar>> ChunkedArray::GetScalar(int64_t index) const {
const auto loc = chunk_resolver_.Resolve(index);
if (loc.chunk_index >= static_cast<int64_t>(chunks_.size())) {
if (loc.chunk_index >= num_chunks()) {
return Status::IndexError("index with value of ", index,
" is out-of-bounds for chunked array of length ", length_);
}
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/compute/kernels/select_k_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,8 @@ TEST_F(TestSelectKWithTable, TopKMultipleColumnKeys) {
auto options = SelectKOptions::TopKDefault(3, {"a", "b"});

std::vector<std::string> expected = {R"([{"a": 3, "b": null},
{"a": 2, "b": 5},
{"a": 1, "b": 5}
{"a": 2, "b": 5}])",
R"([{"a": 1, "b": 5}
])"};
Check(schema, input, options, expected);
}
Expand Down Expand Up @@ -705,8 +705,8 @@ TEST_F(TestSelectKWithTable, BottomKMultipleColumnKeys) {
auto options = SelectKOptions::BottomKDefault(3, {"a", "b"});

std::vector<std::string> expected = {R"([{"a": 1, "b": 3},
{"a": 1, "b": 5},
{"a": 2, "b": 5}
{"a": 1, "b": 5}])",
R"([{"a": 2, "b": 5}
])"};
Check(schema, input, options, expected);
}
Expand Down
181 changes: 162 additions & 19 deletions cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
#include <limits>
#include <memory>
#include <vector>
#include <iostream>

#include "arrow/array/builder_primitive.h"
#include "arrow/array/concatenate.h"
#include "arrow/buffer_builder.h"
#include "arrow/chunk_resolver.h"
#include "arrow/chunked_array.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/kernels/codegen_internal.h"
Expand All @@ -46,6 +48,8 @@ using internal::BinaryBitBlockCounter;
using internal::BitBlockCount;
using internal::BitBlockCounter;
using internal::CheckIndexBounds;
using internal::ChunkLocation;
using internal::ChunkResolver;
using internal::OptionalBitBlockCounter;

namespace compute {
Expand Down Expand Up @@ -693,32 +697,171 @@ Result<std::shared_ptr<ChunkedArray>> TakeCA(const ChunkedArray& values,
const Array& indices,
const TakeOptions& options,
ExecContext* ctx) {
std::cerr << "TakeCA " << indices.ToString() << " from " << values.ToString() << std::endl;
auto num_chunks = values.num_chunks();
std::shared_ptr<Array> current_chunk;

// Case 1: `values` has a single chunk, so just use it
if (num_chunks == 1) {
current_chunk = values.chunk(0);
} else {
// TODO Case 2: See if all `indices` fall in the same chunk and call Array Take on it
// See
// https://github.com/apache/arrow/blob/6f2c9041137001f7a9212f244b51bc004efc29af/r/src/compute.cpp#L123-L151
// TODO Case 3: If indices are sorted, can slice them and call Array Take

// Case 4: Else, concatenate chunks and call Array Take
auto num_indices = indices.length();

if (indices.length() == 0) {
// Case 0: No indices were provided, nothing to take so return an empty chunked array
return ChunkedArray::MakeEmpty(values.type());
} else if (num_chunks < 2) {
std::shared_ptr<Array> current_chunk;
// Case 1: `values` is empty or has a single chunk, so just use it
if (values.chunks().empty()) {
ARROW_ASSIGN_OR_RAISE(current_chunk, MakeArrayOfNull(values.type(), /*length=*/0,
ctx->memory_pool()));
} else {
ARROW_ASSIGN_OR_RAISE(current_chunk,
Concatenate(values.chunks(), ctx->memory_pool()));
current_chunk = values.chunk(0);
}
// Call Array Take on our single chunk
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> new_chunk,
TakeAA(current_chunk->data(), indices.data(), options, ctx));
std::vector<std::shared_ptr<Array>> chunks = {MakeArray(new_chunk)};
return std::make_shared<ChunkedArray>(std::move(chunks));
} else {
// For each index, lookup at which chunk it refers to.
// We have to do this because the indices are not necessarily sorted.
// So we can't simply iterate over chunks and pick the slices we need.
std::vector<Int64Builder> builders(num_chunks);
std::vector<int64_t> indices_chunks(num_indices);
const void* indices_raw_data; // Use Raw data to avoid invoking .Value() on indices
auto indices_type_id = indices.type()->id();
switch (indices_type_id) {
case Type::UINT8:
case Type::INT8:
indices_raw_data = static_cast<UInt8Array const&>(indices).raw_values();
break;
case Type::UINT16:
case Type::INT16:
indices_raw_data = static_cast<UInt16Array const&>(indices).raw_values();
break;
case Type::UINT32:
case Type::INT32:
indices_raw_data = static_cast<UInt32Array const&>(indices).raw_values();
break;
case Type::UINT64:
case Type::INT64:
indices_raw_data = static_cast<UInt64Array const&>(indices).raw_values();
break;
default:
DCHECK(false) << "Invalid indices types " << indices.type()->ToString();
break;
}

ChunkLocation resolved_index = {0, 0};
ChunkResolver index_resolver(values.chunks());
for (int64_t requested_index = 0; requested_index < num_indices; ++requested_index) {
uint64_t index;
switch (indices_type_id) {
case Type::UINT8:
case Type::INT8:
index = static_cast<uint8_t const*>(indices_raw_data)[requested_index];
break;
case Type::UINT16:
case Type::INT16:
index = static_cast<uint16_t const*>(indices_raw_data)[requested_index];
break;
case Type::UINT32:
case Type::INT32:
index = static_cast<uint32_t const*>(indices_raw_data)[requested_index];
break;
case Type::UINT64:
case Type::INT64:
index = static_cast<uint64_t const*>(indices_raw_data)[requested_index];
break;
default:
DCHECK(false) << "Invalid indices types " << indices.type()->ToString();
break;
}

resolved_index = index_resolver.ResolveWithChunkIndexHint(index, resolved_index.chunk_index);
int64_t chunk_index = resolved_index.chunk_index;
if (chunk_index >= num_chunks) {
// ChunkResolver doesn't throw errors when the index is out of bounds
// it will just return a chunk index that doesn't exist.
return Status::IndexError("Index ", index, " is out of bounds");
}
indices_chunks[requested_index] = chunk_index;
Int64Builder &builder = builders[chunk_index];
if (builder.capacity() == builder.length()) {
// Preallocate to speed up appending
ARROW_RETURN_NOT_OK(builder.Reserve(1 << 13));
}
builder.UnsafeAppend(resolved_index.index_in_chunk);
}

// Take from the various chunks only the values we actually care about.
// We first gather all values using Take and then we slice the resulting
// arrays with the values to create the actual resulting chunks
// as that is orders of magnitude faster than calling Take multiple times.
std::vector<std::shared_ptr<arrow::ArrayData>> looked_up_values_data(num_chunks);
std::vector<arrow::ArraySpan> looked_up_values;
looked_up_values.reserve(num_chunks);
for (int i = 0; i < num_chunks; ++i) {
if (builders[i].length() == 0) {
// No indices refer to this chunk, so we can skip it
continue;
}
std::shared_ptr<Int64Array> indices_array;
ARROW_RETURN_NOT_OK(builders[i].Finish(&indices_array));
ARROW_ASSIGN_OR_RAISE(
looked_up_values_data[i], TakeAA(values.chunk(i)->data(), indices_array->data(), options, ctx));
looked_up_values.emplace_back(*looked_up_values_data[i]);
}

// Slice the arrays with the values to create the new chunked array out of them
std::unique_ptr<ArrayBuilder> result_builder;
std::vector<std::shared_ptr<Array>> result_chunks;
ARROW_RETURN_NOT_OK(MakeBuilder(ctx->memory_pool(), values.type(), &result_builder));
ARROW_RETURN_NOT_OK(result_builder->Reserve(num_indices));
std::vector<int64_t> consumed_chunk_offset(num_chunks, 0);
int64_t current_chunk = indices_chunks[0];
int64_t current_length = 0;
for (int64_t requested_index = 0; requested_index < num_indices; ++requested_index) {
int64_t chunk_index = indices_chunks[requested_index];
if (chunk_index != current_chunk) {
if (current_length == looked_up_values[current_chunk].length) {
// We have taken a whole consecutive chunk, so we can append it to the result.
std::cerr << "Full Chunk " << current_chunk << "," << current_length << " :" << looked_up_values[current_chunk].ToArray()->ToString() << std::endl;
if (result_builder->length() > 0) {
// First close the current chunk being built and add it to the chunked array,
// then we can add the full chunk to the array after it.
ARROW_ASSIGN_OR_RAISE(auto previous_chunk_array, result_builder->Finish());
result_chunks.push_back(previous_chunk_array);
result_builder->Reset();
ARROW_RETURN_NOT_OK(result_builder->Reserve(num_indices - requested_index));
}
// Append the whole chunk itself
result_chunks.push_back(looked_up_values[current_chunk].ToArray());
}
else {
std::cerr << "Append Slice" << std::endl;
// Values in previous chunk
ARROW_RETURN_NOT_OK(result_builder->AppendArraySlice(
looked_up_values[current_chunk],
consumed_chunk_offset[current_chunk], current_length));
}
consumed_chunk_offset[current_chunk] += current_length;
current_chunk = chunk_index;
current_length = 0;
}
++current_length;
}
if (current_length > 0) {
// Remaining values in last chunk
ARROW_RETURN_NOT_OK(result_builder->AppendArraySlice(
looked_up_values[current_chunk], consumed_chunk_offset[current_chunk],
current_length));
std::cerr << "Appended Last Slice" << looked_up_values[current_chunk].ToArray()->ToString() << " @ " << consumed_chunk_offset[current_chunk] << "," << current_length << std::endl;
}
ARROW_ASSIGN_OR_RAISE(auto last_chunk_array, result_builder->Finish());
std::cerr << "Last Chunk " << last_chunk_array->ToString() << std::endl;
result_chunks.push_back(last_chunk_array);

auto result = std::make_shared<ChunkedArray>(result_chunks);
std::cerr << "Result " << result->ToString() << std::endl;
return result;
}
// Call Array Take on our single chunk
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> new_chunk,
TakeAA(current_chunk->data(), indices.data(), options, ctx));
std::vector<std::shared_ptr<Array>> chunks = {MakeArray(new_chunk)};
return std::make_shared<ChunkedArray>(std::move(chunks));
}

Result<std::shared_ptr<ChunkedArray>> TakeCC(const ChunkedArray& values,
Expand Down
10 changes: 8 additions & 2 deletions cpp/src/arrow/compute/kernels/vector_selection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1795,11 +1795,16 @@ TEST_F(TestTakeKernelWithChunkedArray, TakeChunkedArray) {
this->AssertChunkedTake(int8(), {"[]"}, {"[]"}, {"[]"});
this->AssertChunkedTake(int8(), {"[]"}, {"[null]"}, {"[null]"});

this->AssertTake(int8(), {"[7]", "[8, 9]"}, "[0, 1, 0, 2]", {"[7, 8, 7, 9]"});
std::cerr << "\n\nCHUNKED take 1" << std::endl;
this->AssertTake(int8(), {"[7]", "[8, 9]"}, "[0, 1, 0, 2]",
{"[7, 8, 7, 9]"});
std::cerr << "\n\nCHUNKED take 2" << std::endl;
this->AssertChunkedTake(int8(), {"[7]", "[8, 9]"}, {"[0, 1, 0]", "[]", "[2]"},
{"[7, 8, 7]", "[]", "[9]"});
std::cerr << "\n\nCHUNKED take 3" << std::endl;
this->AssertTake(int8(), {"[7]", "[8, 9]"}, "[2, 1]", {"[9, 8]"});

std::cerr << "CHUNKED index errrors" << std::endl;
std::shared_ptr<ChunkedArray> arr;
ASSERT_RAISES(IndexError,
this->TakeWithArray(int8(), {"[7]", "[8, 9]"}, "[0, 5]", &arr));
Expand Down Expand Up @@ -1863,7 +1868,8 @@ TEST_F(TestTakeKernelWithTable, TakeTable) {

this->AssertTake(schm, table_json, "[]", {"[]"});
std::vector<std::string> expected_310 = {
"[{\"a\": 4, \"b\": \"eh\"},{\"a\": 1, \"b\": \"\"},{\"a\": null, \"b\": \"yo\"}]"};
"[{\"a\": 4, \"b\": \"eh\"}]",
"[{\"a\": 1, \"b\": \"\"},{\"a\": null, \"b\": \"yo\"}]"};
this->AssertTake(schm, table_json, "[3, 1, 0]", expected_310);
this->AssertChunkedTake(schm, table_json, {"[0, 1]", "[2, 3]"}, table_json);
}
Expand Down