diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 4ef60c9f168..f1cabfb569d 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -118,6 +118,7 @@ set(ARROW_SRCS
   testing/util.cc
   util/basic_decimal.cc
   util/bit-util.cc
+  util/concatenate.cc
   util/compression.cc
   util/cpu-info.cc
   util/decimal.cc
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index bb3b47d32a1..5cb7bf46300 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -77,6 +77,18 @@ std::shared_ptr<ArrayData> ArrayData::Make(const std::shared_ptr<DataType>& type
   return std::make_shared<ArrayData>(type, length, null_count, offset);
 }
 
+ArrayData ArrayData::Slice(int64_t off, int64_t len) const {
+  DCHECK_LE(off, length);
+  len = std::min(length - off, len);
+  off += offset;
+
+  auto copy = *this;
+  copy.length = len;
+  copy.offset = off;
+  copy.null_count = null_count != 0 ? kUnknownNullCount : 0;
+  return copy;
+}
+
 int64_t ArrayData::GetNullCount() const {
   if (ARROW_PREDICT_FALSE(this->null_count == kUnknownNullCount)) {
     if (this->buffers[0]) {
@@ -125,21 +137,8 @@ bool Array::RangeEquals(const Array& other, int64_t start_idx, int64_t end_idx,
   return ArrayRangeEquals(*this, other, start_idx, end_idx, other_start_idx);
 }
 
-static inline std::shared_ptr<ArrayData> SliceData(const ArrayData& data, int64_t offset,
-                                                   int64_t length) {
-  DCHECK_LE(offset, data.length);
-  length = std::min(data.length - offset, length);
-  offset += data.offset;
-
-  auto new_data = data.Copy();
-  new_data->length = length;
-  new_data->offset = offset;
-  new_data->null_count = data.null_count != 0 ? kUnknownNullCount : 0;
-  return new_data;
-}
-
 std::shared_ptr<Array> Array::Slice(int64_t offset, int64_t length) const {
-  return MakeArray(SliceData(*data_, offset, length));
+  return MakeArray(std::make_shared<ArrayData>(data_->Slice(offset, length)));
 }
 
 std::shared_ptr<Array> Array::Slice(int64_t offset) const {
@@ -385,7 +384,8 @@ std::shared_ptr<Array> StructArray::field(int i) const {
   if (!boxed_fields_[i]) {
     std::shared_ptr<ArrayData> field_data;
     if (data_->offset != 0 || data_->child_data[i]->length != data_->length) {
-      field_data = SliceData(*data_->child_data[i].get(), data_->offset, data_->length);
+      field_data = std::make_shared<ArrayData>(
+          data_->child_data[i]->Slice(data_->offset, data_->length));
     } else {
       field_data = data_->child_data[i];
     }
@@ -410,7 +410,7 @@ Status StructArray::Flatten(MemoryPool* pool, ArrayVector* out) const {
 
     // Need to adjust for parent offset
     if (data_->offset != 0 || data_->length != child_data->length) {
-      child_data = SliceData(*child_data, data_->offset, data_->length);
+      *child_data = child_data->Slice(data_->offset, data_->length);
     }
     std::shared_ptr<Buffer> child_null_bitmap = child_data->buffers[0];
     const int64_t child_offset = child_data->offset;
@@ -540,13 +540,13 @@ Status UnionArray::MakeSparse(const Array& type_ids,
 
 std::shared_ptr<Array> UnionArray::child(int i) const {
   if (!boxed_fields_[i]) {
-    std::shared_ptr<ArrayData> child_data = data_->child_data[i];
+    std::shared_ptr<ArrayData> child_data = data_->child_data[i]->Copy();
     if (mode() == UnionMode::SPARSE) {
       // Sparse union: need to adjust child if union is sliced
       // (for dense unions, the need to lookup through the offsets
       // makes this unnecessary)
       if (data_->offset != 0 || child_data->length > data_->length) {
-        child_data = SliceData(*child_data.get(), data_->offset, data_->length);
+        *child_data = child_data->Slice(data_->offset, data_->length);
       }
     }
     boxed_fields_[i] = MakeArray(child_data);
@@ -994,5 +994,4 @@ std::vector<ArrayVector> RechunkArraysConsistently(
 }
 
 }  // namespace internal
-
 }  // namespace arrow
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index bee133c017e..f6815426950 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -40,8 +40,6 @@ namespace arrow {
 class Array;
 class ArrayVisitor;
 
-using BufferVector = std::vector<std::shared_ptr<Buffer>>;
-
 // When slicing, we do not know the null count of the sliced range without
 // doing some computation. To avoid doing this eagerly, we set the null count
 // to -1 (any negative number will do). When Array::null_count is called the
@@ -67,7 +65,7 @@ class Status;
 /// could cast from int64 to float64 like so:
 ///
 /// Int64Array arr = GetMyData();
-/// auto new_data = arr.data()->ShallowCopy();
+/// auto new_data = arr.data()->Copy();
 /// new_data->type = arrow::float64();
 /// DoubleArray double_arr(new_data);
 ///
@@ -75,7 +73,7 @@ class Status;
 /// reused. For example, if we had a group of operations all returning doubles,
 /// say:
 ///
-/// Log(Sqrt(Expr(arr))
+/// Log(Sqrt(Expr(arr)))
 ///
 /// Then the low-level implementations of each of these functions could have
 /// the signatures
@@ -146,6 +144,7 @@ struct ARROW_EXPORT ArrayData {
         buffers(std::move(other.buffers)),
         child_data(std::move(other.child_data)) {}
 
+  // Copy constructor
   ArrayData(const ArrayData& other) noexcept
       : type(other.type),
         length(other.length),
@@ -155,15 +154,10 @@ struct ARROW_EXPORT ArrayData {
         child_data(other.child_data) {}
 
   // Move assignment
-  ArrayData& operator=(ArrayData&& other) {
-    type = std::move(other.type);
-    length = other.length;
-    null_count = other.null_count;
-    offset = other.offset;
-    buffers = std::move(other.buffers);
-    child_data = std::move(other.child_data);
-    return *this;
-  }
+  ArrayData& operator=(ArrayData&& other) = default;
+
+  // Copy assignment
+  ArrayData& operator=(const ArrayData& other) = default;
 
   std::shared_ptr<ArrayData> Copy() const { return std::make_shared<ArrayData>(*this); }
 
@@ -197,6 +191,9 @@ struct ARROW_EXPORT ArrayData {
     return GetMutableValues<T>(i, offset);
   }
 
+  // Construct a zero-copy slice of the data with the indicated offset and length
+  ArrayData Slice(int64_t offset, int64_t length) const;
+
   /// \brief Return null count, or compute and set it if it's not known
   int64_t GetNullCount() const;
 
diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc
index 8f05912b804..9e9bd2e33bc 100644
--- a/cpp/src/arrow/buffer.cc
+++ b/cpp/src/arrow/buffer.cc
@@ -227,4 +227,19 @@ Status AllocateEmptyBitmap(int64_t length, std::shared_ptr<Buffer>* out) {
   return AllocateEmptyBitmap(default_memory_pool(), length, out);
 }
 
+Status ConcatenateBuffers(const std::vector<std::shared_ptr<Buffer>>& buffers,
+                          MemoryPool* pool, std::shared_ptr<Buffer>* out) {
+  int64_t out_length = 0;
+  for (const auto& buffer : buffers) {
+    out_length += buffer->size();
+  }
+  RETURN_NOT_OK(AllocateBuffer(pool, out_length, out));
+  auto out_data = (*out)->mutable_data();
+  for (const auto& buffer : buffers) {
+    std::memcpy(out_data, buffer->data(), buffer->size());
+    out_data += buffer->size();
+  }
+  return Status::OK();
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index 306e677619f..1f1cd4bdbb1 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -198,6 +198,8 @@ class ARROW_EXPORT Buffer {
   ARROW_DISALLOW_COPY_AND_ASSIGN(Buffer);
 };
 
+using BufferVector = std::vector<std::shared_ptr<Buffer>>;
+
 /// \defgroup buffer-slicing-functions Functions for slicing buffers
 ///
 /// @{
@@ -402,6 +404,17 @@ Status AllocateEmptyBitmap(MemoryPool* pool, int64_t length,
 ARROW_EXPORT
 Status AllocateEmptyBitmap(int64_t length, std::shared_ptr<Buffer>* out);
 
+/// \brief Concatenate multiple buffers into a single buffer
+///
+/// \param[in] buffers to be concatenated
+/// \param[in] pool memory pool to allocate the new buffer from
+/// \param[out] out the concatenated buffer
+///
+/// \return Status
+ARROW_EXPORT
+Status ConcatenateBuffers(const BufferVector& buffers, MemoryPool* pool,
+                          std::shared_ptr<Buffer>* out);
+
 /// @}
 
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt
index ba24f88e28b..feef3b555d9 100644
--- a/cpp/src/arrow/util/CMakeLists.txt
+++ b/cpp/src/arrow/util/CMakeLists.txt
@@ -29,6 +29,7 @@ arrow_install_all_headers("arrow/util")
 add_arrow_test(bit-util-test)
 add_arrow_test(checked-cast-test)
 add_arrow_test(compression-test)
+add_arrow_test(concatenate-test)
 add_arrow_test(decimal-test)
 add_arrow_test(hashing-test)
 add_arrow_test(int-util-test)
diff --git a/cpp/src/arrow/util/bit-util.cc b/cpp/src/arrow/util/bit-util.cc
index 2e7bf2f03d5..033267beb45 100644
--- a/cpp/src/arrow/util/bit-util.cc
+++ b/cpp/src/arrow/util/bit-util.cc
@@ -210,8 +210,12 @@ Status TransferBitmap(MemoryPool* pool, const uint8_t* data, int64_t offset,
 }
 
 void CopyBitmap(const uint8_t* data, int64_t offset, int64_t length, uint8_t* dest,
-                int64_t dest_offset) {
-  TransferBitmap<false, true>(data, offset, length, dest_offset, dest);
+                int64_t dest_offset, bool restore_trailing_bits) {
+  if (restore_trailing_bits) {
+    TransferBitmap<false, true>(data, offset, length, dest_offset, dest);
+  } else {
+    TransferBitmap<false, false>(data, offset, length, dest_offset, dest);
+  }
 }
 
 void InvertBitmap(const uint8_t* data, int64_t offset, int64_t length, uint8_t* dest,
diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h
index 22bf8fc858b..b7de112b85c 100644
--- a/cpp/src/arrow/util/bit-util.h
+++ b/cpp/src/arrow/util/bit-util.h
@@ -708,11 +708,12 @@ Status CopyBitmap(MemoryPool* pool, const uint8_t* bitmap, int64_t offset, int64
 /// \param[in] offset bit offset into the source data
 /// \param[in] length number of bits to copy
 /// \param[in] dest_offset bit offset into the destination
+/// \param[in] restore_trailing_bits don't clobber bits outside the destination range
 /// \param[out] dest the destination buffer, must have at least space for
 /// (offset + length) bits
 ARROW_EXPORT
 void CopyBitmap(const uint8_t* bitmap, int64_t offset, int64_t length, uint8_t* dest,
-                int64_t dest_offset);
+                int64_t dest_offset, bool restore_trailing_bits = true);
 
 /// Invert a bit range of an existing bitmap into an existing bitmap
 ///
diff --git a/cpp/src/arrow/util/concatenate-test.cc b/cpp/src/arrow/util/concatenate-test.cc
new file mode 100644
index 00000000000..8d9e9d6d62d
--- /dev/null
+++ b/cpp/src/arrow/util/concatenate-test.cc
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <iterator>
+#include <limits>
+#include <memory>
+#include <ostream>
+#include <random>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/status.h"
+#include "arrow/testing/gtest_common.h"
+#include "arrow/testing/random.h"
+#include "arrow/testing/util.h"
+#include "arrow/type.h"
+#include "arrow/util/concatenate.h"
+
+namespace arrow {
+
+class ConcatenateTest : public ::testing::Test {
+ protected:
+  ConcatenateTest()
+      : rng_(seed_),
+        sizes_({0, 1, 2, 4, 16, 31, 1234}),
+        null_probabilities_({0.0, 0.1, 0.5, 0.9, 1.0}) {}
+
+  std::vector<int32_t> Offsets(int32_t length, int32_t slice_count) {
+    std::vector<int32_t> offsets(static_cast<std::size_t>(slice_count + 1));
+    std::default_random_engine gen(seed_);
+    std::uniform_int_distribution<int32_t> dist(0, length);
+    std::generate(offsets.begin(), offsets.end(), [&] { return dist(gen); });
+    std::sort(offsets.begin(), offsets.end());
+    return offsets;
+  }
+
+  ArrayVector Slices(const std::shared_ptr<Array>& array,
+                     const std::vector<int32_t>& offsets) {
+    ArrayVector slices(offsets.size() - 1);
+    for (size_t i = 0; i != slices.size(); ++i) {
+      slices[i] = array->Slice(offsets[i], offsets[i + 1] - offsets[i]);
+    }
+    return slices;
+  }
+
+  template <typename PrimitiveType>
+  std::shared_ptr<Array> GeneratePrimitive(int64_t size, double null_probability) {
+    if (std::is_same<PrimitiveType, BooleanType>::value) {
+      return rng_.Boolean(size, 0.5, null_probability);
+    }
+    return rng_.Numeric<PrimitiveType>(size, 0, 127, null_probability);
+  }
+
+  void CheckTrailingBitsAreZeroed(const std::shared_ptr<Buffer>& bitmap, int64_t length) {
+    if (auto preceding_bits = BitUtil::kPrecedingBitmask[length % 8]) {
+      auto last_byte = bitmap->data()[length / 8];
+      ASSERT_EQ(static_cast<uint8_t>(last_byte & preceding_bits), last_byte)
+          << length << " " << int(preceding_bits);
+    }
+  }
+
+  template <typename ArrayFactory>
+  void Check(ArrayFactory&& factory) {
+    for (auto size : this->sizes_) {
+      auto offsets = this->Offsets(size, 3);
+      for (auto null_probability : this->null_probabilities_) {
+        std::shared_ptr<Array> array;
+        factory(size, null_probability, &array);
+        auto expected = array->Slice(offsets.front(), offsets.back() - offsets.front());
+        auto slices = this->Slices(array, offsets);
+        std::shared_ptr<Array> actual;
+        ASSERT_OK(Concatenate(slices, default_memory_pool(), &actual));
+        AssertArraysEqual(*expected, *actual);
+        if (actual->data()->buffers[0]) {
+          CheckTrailingBitsAreZeroed(actual->data()->buffers[0], actual->length());
+        }
+        if (actual->type_id() == Type::BOOL) {
+          CheckTrailingBitsAreZeroed(actual->data()->buffers[1], actual->length());
+        }
+      }
+    }
+  }
+
+  random::SeedType seed_ = 0xdeadbeef;
+  random::RandomArrayGenerator rng_;
+  std::vector<int32_t> sizes_;
+  std::vector<double> null_probabilities_;
+};
+
+template <typename PrimitiveType>
+class PrimitiveConcatenateTest : public ConcatenateTest {
+ public:
+};
+
+using PrimitiveTypes =
+    ::testing::Types<BooleanType, Int8Type, UInt8Type, Int16Type, UInt16Type, Int32Type,
+                     UInt32Type, Int64Type, UInt64Type, FloatType, DoubleType>;
+TYPED_TEST_CASE(PrimitiveConcatenateTest, PrimitiveTypes);
+
+TYPED_TEST(PrimitiveConcatenateTest, Primitives) {
+  this->Check([this](int64_t size, double null_probability, std::shared_ptr<Array>* out) {
+    *out = this->template GeneratePrimitive<TypeParam>(size, null_probability);
+  });
+}
+
+TEST_F(ConcatenateTest, StringType) {
+  Check([this](int32_t size, double null_probability, std::shared_ptr<Array>* out) {
+    auto values_size = size * 4;
+    auto char_array = this->GeneratePrimitive<UInt8Type>(values_size, null_probability);
+    std::shared_ptr<Buffer> offsets;
+    auto offsets_vector = this->Offsets(values_size, size);
+    // ensure the first offset is 0, which is expected for StringType
+    offsets_vector[0] = 0;
+    ASSERT_OK(CopyBufferFromVector(offsets_vector, default_memory_pool(), &offsets));
+    *out = MakeArray(ArrayData::Make(
+        utf8(), size,
+        {char_array->data()->buffers[0], offsets, char_array->data()->buffers[1]}));
+  });
+}
+
+TEST_F(ConcatenateTest, ListType) {
+  Check([this](int32_t size, double null_probability, std::shared_ptr<Array>* out) {
+    auto values_size = size * 4;
+    auto values = this->GeneratePrimitive<Int8Type>(values_size, null_probability);
+    auto offsets_vector = this->Offsets(values_size, size);
+    // ensure the first offset is 0, which is expected for ListType
+    offsets_vector[0] = 0;
+    std::shared_ptr<Array> offsets;
+    ArrayFromVector<Int32Type>(offsets_vector, &offsets);
+    ASSERT_OK(ListArray::FromArrays(*offsets, *values, default_memory_pool(), out));
+  });
+}
+
+TEST_F(ConcatenateTest, StructType) {
+  Check([this](int32_t size, double null_probability, std::shared_ptr<Array>* out) {
+    auto foo = this->GeneratePrimitive<Int8Type>(size, null_probability);
+    auto bar = this->GeneratePrimitive<DoubleType>(size, null_probability);
+    auto baz = this->GeneratePrimitive<BooleanType>(size, null_probability);
+    *out = std::make_shared<StructArray>(
+        struct_({field("foo", int8()), field("bar", float64()), field("baz", boolean())}),
+        size, ArrayVector{foo, bar, baz});
+  });
+}
+
+TEST_F(ConcatenateTest, DictionaryType) {
+  Check([this](int32_t size, double null_probability, std::shared_ptr<Array>* out) {
+    auto indices = this->GeneratePrimitive<Int32Type>(size, null_probability);
+    auto type = dictionary(int32(), this->GeneratePrimitive<DoubleType>(128, 0));
+    *out = std::make_shared<DictionaryArray>(type, indices);
+  });
+}
+
+TEST_F(ConcatenateTest, DISABLED_UnionType) {
+  // sparse mode
+  Check([this](int32_t size, double null_probability, std::shared_ptr<Array>* out) {
+    auto foo = this->GeneratePrimitive<Int8Type>(size, null_probability);
+    auto bar = this->GeneratePrimitive<DoubleType>(size, null_probability);
+    auto baz = this->GeneratePrimitive<BooleanType>(size, null_probability);
+    auto type_ids = rng_.Numeric<Int8Type>(size, 0, 2, null_probability);
+    ASSERT_OK(UnionArray::MakeSparse(*type_ids, {foo, bar, baz}, out));
+  });
+  // dense mode
+  Check([this](int32_t size, double null_probability, std::shared_ptr<Array>* out) {
+    auto foo = this->GeneratePrimitive<Int8Type>(size, null_probability);
+    auto bar = this->GeneratePrimitive<DoubleType>(size, null_probability);
+    auto baz = this->GeneratePrimitive<BooleanType>(size, null_probability);
+    auto type_ids = rng_.Numeric<Int8Type>(size, 0, 2, null_probability);
+    auto value_offsets = rng_.Numeric<Int32Type>(size, 0, size, 0);
+    ASSERT_OK(UnionArray::MakeDense(*type_ids, *value_offsets, {foo, bar, baz}, out));
+  });
+}
+
+TEST_F(ConcatenateTest, OffsetOverflow) {
+  auto fake_long = ArrayFromJSON(utf8(), "[\"\"]");
+  fake_long->data()->GetMutableValues<int32_t>(1)[1] =
+      std::numeric_limits<int32_t>::max();
+  std::shared_ptr<Array> concatenated;
+  // XX since the data fake_long claims to own isn't there, this will segfault if
+  // Concatenate doesn't detect overflow and raise an error.
+  ASSERT_RAISES(
+      Invalid, Concatenate({fake_long, fake_long}, default_memory_pool(), &concatenated));
+}
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/concatenate.cc b/cpp/src/arrow/util/concatenate.cc
new file mode 100644
index 00000000000..b982d27153a
--- /dev/null
+++ b/cpp/src/arrow/util/concatenate.cc
@@ -0,0 +1,321 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/util/concatenate.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/memory_pool.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/visibility.h"
+#include "arrow/visitor_inline.h"
+
+namespace arrow {
+
+/// offset, length pair for representing a Range of a buffer or array
+struct Range {
+  int64_t offset, length;
+
+  Range() : offset(-1), length(0) {}
+  Range(int64_t o, int64_t l) : offset(o), length(l) {}
+};
+
+/// non-owning view into a range of bits
+struct Bitmap {
+  Bitmap() = default;
+  Bitmap(const uint8_t* d, Range r) : data(d), range(r) {}
+  explicit Bitmap(const std::shared_ptr<Buffer>& buffer, Range r)
+      : Bitmap(buffer ? buffer->data() : nullptr, r) {}
+
+  const uint8_t* data;
+  Range range;
+
+  bool AllSet() const { return data == nullptr; }
+};
+
+// Allocate a buffer and concatenate bitmaps into it.
+static Status ConcatenateBitmaps(const std::vector<Bitmap>& bitmaps, MemoryPool* pool,
+                                 std::shared_ptr<Buffer>* out) {
+  int64_t out_length = 0;
+  for (size_t i = 0; i < bitmaps.size(); ++i) {
+    out_length += bitmaps[i].range.length;
+  }
+  RETURN_NOT_OK(AllocateBitmap(pool, out_length, out));
+  uint8_t* dst = (*out)->mutable_data();
+
+  int64_t bitmap_offset = 0;
+  for (size_t i = 0; i < bitmaps.size(); ++i) {
+    auto bitmap = bitmaps[i];
+    if (bitmap.AllSet()) {
+      BitUtil::SetBitsTo(dst, bitmap_offset, bitmap.range.length, true);
+    } else {
+      internal::CopyBitmap(bitmap.data, bitmap.range.offset, bitmap.range.length, dst,
+                           bitmap_offset, false);
+    }
+    bitmap_offset += bitmap.range.length;
+  }
+
+  // finally (if applicable) zero out any trailing bits
+  if (auto preceding_bits = BitUtil::kPrecedingBitmask[out_length % 8]) {
+    dst[out_length / 8] &= preceding_bits;
+  }
+  return Status::OK();
+}
+
+// Write offsets in src into dst, adjusting them such that first_offset
+// will be the first offset written.
+template <typename Offset>
+static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset,
+                         Offset* dst, Range* values_range);
+
+// Concatenate buffers holding offsets into a single buffer of offsets,
+// also computing the ranges of values spanned by each buffer of offsets.
+template <typename Offset>
+static Status ConcatenateOffsets(const BufferVector& buffers, MemoryPool* pool,
+                                 std::shared_ptr<Buffer>* out,
+                                 std::vector<Range>* values_ranges) {
+  values_ranges->resize(buffers.size());
+
+  // allocate output buffer
+  int64_t out_length = 0;
+  for (size_t i = 0; i < buffers.size(); ++i) {
+    out_length += buffers[i]->size() / sizeof(Offset);
+  }
+  RETURN_NOT_OK(AllocateBuffer(pool, (out_length + 1) * sizeof(Offset), out));
+  auto dst = reinterpret_cast<Offset*>((*out)->mutable_data());
+
+  int64_t elements_length = 0;
+  Offset values_length = 0;
+  for (size_t i = 0; i < buffers.size(); ++i) {
+    // the first offset from buffers[i] will be adjusted to values_length
+    // (the cumulative length of values spanned by offsets in previous buffers)
+    RETURN_NOT_OK(PutOffsets(buffers[i], values_length, &dst[elements_length],
+                             &values_ranges->at(i)));
+    elements_length += buffers[i]->size() / sizeof(Offset);
+    values_length += static_cast<Offset>(values_ranges->at(i).length);
+  }
+
+  // the final element in dst is the length of all values spanned by the offsets
+  dst[out_length] = values_length;
+  return Status::OK();
+}
+
+template <typename Offset>
+static Status PutOffsets(const std::shared_ptr<Buffer>& src, Offset first_offset,
+                         Offset* dst, Range* values_range) {
+  // Get the range of offsets to transfer from src
+  auto src_begin = reinterpret_cast<const Offset*>(src->data());
+  auto src_end = reinterpret_cast<const Offset*>(src->data() + src->size());
+
+  // Compute the range of values which is spanned by this range of offsets
+  values_range->offset = src_begin[0];
+  values_range->length = *src_end - values_range->offset;
+  if (first_offset > std::numeric_limits<Offset>::max() - values_range->length) {
+    return Status::Invalid("offset overflow while concatenating arrays");
+  }
+
+  // Write offsets into dst, ensuring that the first offset written is
+  // first_offset
+  auto adjustment = first_offset - src_begin[0];
+  std::transform(src_begin, src_end, dst,
+                 [adjustment](Offset offset) { return offset + adjustment; });
+  return Status::OK();
+}
+
+class ConcatenateImpl {
+ public:
+  ConcatenateImpl(const std::vector<ArrayData>& in, MemoryPool* pool)
+      : in_(in), pool_(pool) {
+    out_.type = in[0].type;
+    for (size_t i = 0; i < in_.size(); ++i) {
+      out_.length += in[i].length;
+      if (out_.null_count == kUnknownNullCount || in[i].null_count == kUnknownNullCount) {
+        out_.null_count = kUnknownNullCount;
+        continue;
+      }
+      out_.null_count += in[i].null_count;
+    }
+    out_.buffers.resize(in[0].buffers.size());
+    out_.child_data.resize(in[0].child_data.size());
+    for (auto& data : out_.child_data) {
+      data = std::make_shared<ArrayData>();
+    }
+  }
+
+  Status Concatenate(ArrayData* out) && {
+    if (out_.null_count != 0) {
+      RETURN_NOT_OK(ConcatenateBitmaps(Bitmaps(0), pool_, &out_.buffers[0]));
+    }
+    RETURN_NOT_OK(VisitTypeInline(*out_.type, this));
+    *out = std::move(out_);
+    return Status::OK();
+  }
+
+  Status Visit(const NullType&) { return Status::OK(); }
+
+  Status Visit(const BooleanType&) {
+    return ConcatenateBitmaps(Bitmaps(1), pool_, &out_.buffers[1]);
+  }
+
+  Status Visit(const FixedWidthType& fixed) {
+    // handles numbers, decimal128, fixed_size_binary
+    return ConcatenateBuffers(Buffers(1, fixed), pool_, &out_.buffers[1]);
+  }
+
+  Status Visit(const BinaryType&) {
+    std::vector<Range> value_ranges;
+    RETURN_NOT_OK(ConcatenateOffsets<int32_t>(Buffers(1, *offset_type), pool_,
+                                              &out_.buffers[1], &value_ranges));
+    return ConcatenateBuffers(Buffers(2, value_ranges), pool_, &out_.buffers[2]);
+  }
+
+  Status Visit(const ListType&) {
+    std::vector<Range> value_ranges;
+    RETURN_NOT_OK(ConcatenateOffsets<int32_t>(Buffers(1, *offset_type), pool_,
+                                              &out_.buffers[1], &value_ranges));
+    return ConcatenateImpl(ChildData(0, value_ranges), pool_)
+        .Concatenate(out_.child_data[0].get());
+  }
+
+  Status Visit(const StructType& s) {
+    for (int i = 0; i < s.num_children(); ++i) {
+      RETURN_NOT_OK(
+          ConcatenateImpl(ChildData(i), pool_).Concatenate(out_.child_data[i].get()));
+    }
+    return Status::OK();
+  }
+
+  Status Visit(const DictionaryType& d) {
+    auto fixed = internal::checked_cast<const FixedWidthType*>(d.index_type().get());
+    return ConcatenateBuffers(Buffers(1, *fixed), pool_, &out_.buffers[1]);
+  }
+
+  Status Visit(const UnionType& u) {
+    return Status::NotImplemented("concatenation of ", u);
+  }
+
+  Status Visit(const ExtensionType& e) {
+    // XXX can we just concatenate their storage?
+    return Status::NotImplemented("concatenation of ", e);
+  }
+
+ private:
+  // Gather the index-th buffer of each input into a vector.
+  // Bytes are sliced with that input's offset and length.
+  BufferVector Buffers(size_t index) {
+    BufferVector buffers(in_.size());
+    for (size_t i = 0; i < in_.size(); ++i) {
+      buffers[i] = SliceBuffer(in_[i].buffers[index], in_[i].offset, in_[i].length);
+    }
+    return buffers;
+  }
+
+  // Gather the index-th buffer of each input into a vector.
+  // Bytes are sliced with the explicitly passed ranges.
+  BufferVector Buffers(size_t index, const std::vector<Range>& ranges) {
+    DCHECK_EQ(in_.size(), ranges.size());
+    BufferVector buffers(in_.size());
+    for (size_t i = 0; i < in_.size(); ++i) {
+      buffers[i] = SliceBuffer(in_[i].buffers[index], ranges[i].offset, ranges[i].length);
+    }
+    return buffers;
+  }
+
+  // Gather the index-th buffer of each input into a vector.
+  // Buffers are assumed to contain elements of fixed.bit_width(),
+  // those elements are sliced with that input's offset and length.
+  BufferVector Buffers(size_t index, const FixedWidthType& fixed) {
+    DCHECK_EQ(fixed.bit_width() % 8, 0);
+    auto byte_width = fixed.bit_width() / 8;
+    BufferVector buffers(in_.size());
+    for (size_t i = 0; i < in_.size(); ++i) {
+      buffers[i] = SliceBuffer(in_[i].buffers[index], in_[i].offset * byte_width,
+                               in_[i].length * byte_width);
+    }
+    return buffers;
+  }
+
+  // Gather the index-th buffer of each input as a Bitmap
+  // into a vector of Bitmaps.
+  std::vector<Bitmap> Bitmaps(size_t index) {
+    std::vector<Bitmap> bitmaps(in_.size());
+    for (size_t i = 0; i < in_.size(); ++i) {
+      Range range(in_[i].offset, in_[i].length);
+      bitmaps[i] = Bitmap(in_[i].buffers[index], range);
+    }
+    return bitmaps;
+  }
+
+  // Gather the index-th child_data of each input into a vector.
+  // Elements are sliced with that input's offset and length.
+  std::vector<ArrayData> ChildData(size_t index) {
+    std::vector<ArrayData> child_data(in_.size());
+    for (size_t i = 0; i < in_.size(); ++i) {
+      child_data[i] = in_[i].child_data[index]->Slice(in_[i].offset, in_[i].length);
+    }
+    return child_data;
+  }
+
+  // Gather the index-th child_data of each input into a vector.
+  // Elements are sliced with the explicitly passed ranges.
+  std::vector<ArrayData> ChildData(size_t index, const std::vector<Range>& ranges) {
+    DCHECK_EQ(in_.size(), ranges.size());
+    std::vector<ArrayData> child_data(in_.size());
+    for (size_t i = 0; i < in_.size(); ++i) {
+      child_data[i] = in_[i].child_data[index]->Slice(ranges[i].offset, ranges[i].length);
+    }
+    return child_data;
+  }
+
+  static const std::shared_ptr<FixedWidthType> offset_type;
+  const std::vector<ArrayData>& in_;
+  MemoryPool* pool_;
+  ArrayData out_;
+};
+
+const std::shared_ptr<FixedWidthType> ConcatenateImpl::offset_type =
+    std::static_pointer_cast<FixedWidthType>(int32());
+
+Status Concatenate(const ArrayVector& arrays, MemoryPool* pool,
+                   std::shared_ptr<Array>* out) {
+  if (arrays.size() == 0) {
+    return Status::Invalid("Must pass at least one array");
+  }
+
+  // gather ArrayData of input arrays
+  std::vector<ArrayData> data(arrays.size());
+  for (size_t i = 0; i < arrays.size(); ++i) {
+    if (!arrays[i]->type()->Equals(*arrays[0]->type())) {
+      return Status::Invalid("arrays to be concatenated must be identically typed, but ",
+                             *arrays[0]->type(), " and ", *arrays[i]->type(),
+                             " were encountered.");
+    }
+    data[i] = ArrayData(*arrays[i]->data());
+  }
+
+  ArrayData out_data;
+  RETURN_NOT_OK(ConcatenateImpl(data, pool).Concatenate(&out_data));
+  *out = MakeArray(std::make_shared<ArrayData>(std::move(out_data)));
+  return Status::OK();
+}
+
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/concatenate.h b/cpp/src/arrow/util/concatenate.h
new file mode 100644
index 00000000000..67738d547f4
--- /dev/null
+++ b/cpp/src/arrow/util/concatenate.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/memory_pool.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+/// \brief Concatenate arrays
+///
+/// \param[in] arrays a vector of arrays to be concatenated
+/// \param[in] pool memory pool from which the resulting array's storage will be allocated
+/// \param[out] out the resulting concatenated array
+/// \return Status
+ARROW_EXPORT
+Status Concatenate(const ArrayVector& arrays, MemoryPool* pool,
+                   std::shared_ptr<Array>* out);
+
+}  // namespace arrow
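For reviewers who want to try the new entry point, here is a minimal usage sketch of `arrow::Concatenate` as declared in `arrow/util/concatenate.h` above. The `ConcatenateExample` function name and the builder-based setup are illustrative only and not part of this patch; the sketch just shows that the inputs must share a type and that the result is allocated from the supplied memory pool.

```cpp
// Minimal usage sketch of the Concatenate() API added by this patch.
// The helper name and the builder-based setup are illustrative only.
#include <memory>

#include "arrow/array.h"
#include "arrow/builder.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/util/concatenate.h"

arrow::Status ConcatenateExample(std::shared_ptr<arrow::Array>* combined) {
  // Build two small int32 arrays; any identically typed arrays work.
  arrow::Int32Builder builder;
  std::shared_ptr<arrow::Array> first, second;
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 3}));
  ARROW_RETURN_NOT_OK(builder.Finish(&first));
  ARROW_RETURN_NOT_OK(builder.AppendValues({4, 5}));
  ARROW_RETURN_NOT_OK(builder.Finish(&second));

  // Concatenate them; *combined becomes [1, 2, 3, 4, 5], with its buffers
  // allocated from the given pool. Mismatched input types yield Status::Invalid.
  return arrow::Concatenate({first, second}, arrow::default_memory_pool(), combined);
}
```

Concatenating slices of a single array (as the tests above do) exercises the same path: each input contributes its own offset/length range, and the implementation copies bitmaps, fixed-width values, offsets, and child data accordingly.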