From 5bab9c2f5b428b4d00222c0b2d3d9dddb51b5c52 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Fri, 2 Jun 2017 16:03:45 +0200 Subject: [PATCH] ARROW-1073: C++: Adaptive integer builder Change-Id: I5309b506174c2fcafd6c168069fa81a5af4122fb --- cpp/src/arrow/array-test.cc | 220 +++++++++++++++++++ cpp/src/arrow/builder-benchmark.cc | 66 ++++++ cpp/src/arrow/builder.cc | 331 +++++++++++++++++++++++++++++ cpp/src/arrow/builder.h | 190 ++++++++++++++++- 4 files changed, 806 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc index 636d97f9d05..beffa1b11cb 100644 --- a/cpp/src/arrow/array-test.cc +++ b/cpp/src/arrow/array-test.cc @@ -1257,6 +1257,226 @@ TEST_F(TestFWBinaryArray, Slice) { ASSERT_TRUE(array->RangeEquals(1, 3, 0, slice)); } +// ---------------------------------------------------------------------- +// AdaptiveInt tests + +class TestAdaptiveIntBuilder : public TestBuilder { + public: + void SetUp() { + TestBuilder::SetUp(); + builder_ = std::make_shared(pool_); + } + + void Done() { EXPECT_OK(builder_->Finish(&result_)); } + + protected: + std::shared_ptr builder_; + + std::shared_ptr expected_; + std::shared_ptr result_; +}; + +TEST_F(TestAdaptiveIntBuilder, TestInt8) { + builder_->Append(0); + builder_->Append(127); + builder_->Append(-128); + + Done(); + + std::vector expected_values({0, 127, -128}); + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveIntBuilder, TestInt16) { + builder_->Append(0); + builder_->Append(128); + Done(); + + std::vector expected_values({0, 128}); + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(-129); + expected_values = {-129}; + Done(); + + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(std::numeric_limits::max()); + builder_->Append(std::numeric_limits::min()); + 
expected_values = { + std::numeric_limits::max(), std::numeric_limits::min()}; + Done(); + + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveIntBuilder, TestInt32) { + builder_->Append(0); + builder_->Append(static_cast(std::numeric_limits::max()) + 1); + Done(); + + std::vector expected_values( + {0, static_cast(std::numeric_limits::max()) + 1}); + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(static_cast(std::numeric_limits::min()) - 1); + expected_values = {static_cast(std::numeric_limits::min()) - 1}; + Done(); + + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(std::numeric_limits::max()); + builder_->Append(std::numeric_limits::min()); + expected_values = { + std::numeric_limits::max(), std::numeric_limits::min()}; + Done(); + + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveIntBuilder, TestInt64) { + builder_->Append(0); + builder_->Append(static_cast(std::numeric_limits::max()) + 1); + Done(); + + std::vector expected_values( + {0, static_cast(std::numeric_limits::max()) + 1}); + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(static_cast(std::numeric_limits::min()) - 1); + expected_values = {static_cast(std::numeric_limits::min()) - 1}; + Done(); + + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(std::numeric_limits::max()); + builder_->Append(std::numeric_limits::min()); + expected_values = { + std::numeric_limits::max(), std::numeric_limits::min()}; + Done(); + + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveIntBuilder, TestAppendVector) { + std::vector 
expected_values( + {0, static_cast(std::numeric_limits::max()) + 1}); + builder_->Append(expected_values.data(), expected_values.size()); + Done(); + + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +class TestAdaptiveUIntBuilder : public TestBuilder { + public: + void SetUp() { + TestBuilder::SetUp(); + builder_ = std::make_shared(pool_); + } + + void Done() { EXPECT_OK(builder_->Finish(&result_)); } + + protected: + std::shared_ptr builder_; + + std::shared_ptr expected_; + std::shared_ptr result_; +}; + +TEST_F(TestAdaptiveUIntBuilder, TestUInt8) { + builder_->Append(0); + builder_->Append(255); + + Done(); + + std::vector expected_values({0, 255}); + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveUIntBuilder, TestUInt16) { + builder_->Append(0); + builder_->Append(256); + Done(); + + std::vector expected_values({0, 256}); + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(std::numeric_limits::max()); + expected_values = {std::numeric_limits::max()}; + Done(); + + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveUIntBuilder, TestUInt32) { + builder_->Append(0); + builder_->Append(static_cast(std::numeric_limits::max()) + 1); + Done(); + + std::vector expected_values( + {0, static_cast(std::numeric_limits::max()) + 1}); + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(std::numeric_limits::max()); + expected_values = {std::numeric_limits::max()}; + Done(); + + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveUIntBuilder, TestUInt64) { + builder_->Append(0); + builder_->Append(static_cast(std::numeric_limits::max()) + 1); + Done(); + + std::vector expected_values( + {0, 
static_cast(std::numeric_limits::max()) + 1}); + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); + + SetUp(); + builder_->Append(std::numeric_limits::max()); + expected_values = {std::numeric_limits::max()}; + Done(); + + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + +TEST_F(TestAdaptiveUIntBuilder, TestAppendVector) { + std::vector expected_values( + {0, static_cast(std::numeric_limits::max()) + 1}); + builder_->Append(expected_values.data(), expected_values.size()); + Done(); + + ArrayFromVector(expected_values, &expected_); + ASSERT_TRUE(expected_->Equals(result_)); +} + // ---------------------------------------------------------------------- // List tests diff --git a/cpp/src/arrow/builder-benchmark.cc b/cpp/src/arrow/builder-benchmark.cc index b0c3cd19064..62f2fd620a9 100644 --- a/cpp/src/arrow/builder-benchmark.cc +++ b/cpp/src/arrow/builder-benchmark.cc @@ -61,4 +61,70 @@ static void BM_BuildVectorNoNulls( BENCHMARK(BM_BuildVectorNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond); +static void BM_BuildAdaptiveIntNoNulls( + benchmark::State& state) { // NOLINT non-const reference + int64_t size = static_cast(std::numeric_limits::max()) * 256; + int64_t chunk_size = size / 8; + std::vector data; + for (int64_t i = 0; i < size; i++) { + data.push_back(i); + } + while (state.KeepRunning()) { + AdaptiveIntBuilder builder(default_memory_pool()); + for (int64_t i = 0; i < size; i += chunk_size) { + // Build up an array of 512 MiB in size + builder.Append(data.data() + i, chunk_size, nullptr); + } + std::shared_ptr out; + builder.Finish(&out); + } + state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t)); +} + +BENCHMARK(BM_BuildAdaptiveIntNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond); + +static void BM_BuildAdaptiveIntNoNullsScalarAppend( + benchmark::State& state) { // NOLINT non-const reference + int64_t size = 
static_cast(std::numeric_limits::max()) * 256; + std::vector data; + for (int64_t i = 0; i < size; i++) { + data.push_back(i); + } + while (state.KeepRunning()) { + AdaptiveIntBuilder builder(default_memory_pool()); + for (int64_t i = 0; i < size; i++) { + builder.Append(data[i]); + } + std::shared_ptr out; + builder.Finish(&out); + } + state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t)); +} + +BENCHMARK(BM_BuildAdaptiveIntNoNullsScalarAppend) + ->Repetitions(3) + ->Unit(benchmark::kMillisecond); + +static void BM_BuildAdaptiveUIntNoNulls( + benchmark::State& state) { // NOLINT non-const reference + int64_t size = static_cast(std::numeric_limits::max()) * 256; + int64_t chunk_size = size / 8; + std::vector data; + for (uint64_t i = 0; i < static_cast(size); i++) { + data.push_back(i); + } + while (state.KeepRunning()) { + AdaptiveUIntBuilder builder(default_memory_pool()); + for (int64_t i = 0; i < size; i += chunk_size) { + // Build up an array of 512 MiB in size + builder.Append(data.data() + i, chunk_size, nullptr); + } + std::shared_ptr out; + builder.Finish(&out); + } + state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t)); +} + +BENCHMARK(BM_BuildAdaptiveUIntNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond); + } // namespace arrow diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc index ab43c2a51ba..6762e172676 100644 --- a/cpp/src/arrow/builder.cc +++ b/cpp/src/arrow/builder.cc @@ -17,6 +17,7 @@ #include "arrow/builder.h" +#include #include #include #include @@ -248,6 +249,336 @@ template class PrimitiveBuilder; template class PrimitiveBuilder; template class PrimitiveBuilder; +AdaptiveIntBuilderBase::AdaptiveIntBuilderBase(MemoryPool* pool) + : ArrayBuilder(pool, int64()), data_(nullptr), raw_data_(nullptr), int_size_(1) {} + +Status AdaptiveIntBuilderBase::Init(int64_t capacity) { + RETURN_NOT_OK(ArrayBuilder::Init(capacity)); + data_ = std::make_shared(pool_); + + int64_t nbytes = capacity 
* int_size_; + RETURN_NOT_OK(data_->Resize(nbytes)); + // TODO(emkornfield) valgrind complains without this + memset(data_->mutable_data(), 0, static_cast(nbytes)); + + raw_data_ = reinterpret_cast(data_->mutable_data()); + return Status::OK(); +} + +Status AdaptiveIntBuilderBase::Resize(int64_t capacity) { + // XXX: Set floor size for now + if (capacity < kMinBuilderCapacity) { capacity = kMinBuilderCapacity; } + + if (capacity_ == 0) { + RETURN_NOT_OK(Init(capacity)); + } else { + RETURN_NOT_OK(ArrayBuilder::Resize(capacity)); + const int64_t old_bytes = data_->size(); + const int64_t new_bytes = capacity * int_size_; + RETURN_NOT_OK(data_->Resize(new_bytes)); + raw_data_ = data_->mutable_data(); + // TODO(emkornfield) valgrind complains without this + memset( + data_->mutable_data() + old_bytes, 0, static_cast(new_bytes - old_bytes)); + } + return Status::OK(); +} + +AdaptiveIntBuilder::AdaptiveIntBuilder(MemoryPool* pool) : AdaptiveIntBuilderBase(pool) {} + +Status AdaptiveIntBuilder::Finish(std::shared_ptr* out) { + const int64_t bytes_required = length_ * int_size_; + if (bytes_required > 0 && bytes_required < data_->size()) { + // Trim buffers + RETURN_NOT_OK(data_->Resize(bytes_required)); + } + switch (int_size_) { + case 1: + *out = + std::make_shared(int8(), length_, data_, null_bitmap_, null_count_); + break; + case 2: + *out = std::make_shared( + int16(), length_, data_, null_bitmap_, null_count_); + break; + case 4: + *out = std::make_shared( + int32(), length_, data_, null_bitmap_, null_count_); + break; + case 8: + *out = std::make_shared( + int64(), length_, data_, null_bitmap_, null_count_); + break; + default: + DCHECK(false); + return Status::NotImplemented("Only ints of size 1,2,4,8 are supported"); + } + + data_ = null_bitmap_ = nullptr; + capacity_ = length_ = null_count_ = 0; + return Status::OK(); +} + +Status AdaptiveIntBuilder::Append( + const int64_t* values, int64_t length, const uint8_t* valid_bytes) { + RETURN_NOT_OK(Reserve(length)); 
+ + if (length > 0) { + if (int_size_ < 8) { + uint8_t new_int_size = int_size_; + for (int64_t i = 0; i < length; i++) { + if (valid_bytes == nullptr || valid_bytes[i]) { + new_int_size = expanded_int_size(values[i], new_int_size); + } + } + if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); } + } + } + + if (int_size_ == 8) { + std::memcpy(reinterpret_cast(raw_data_) + length_, values, + sizeof(int64_t) * length); + } else { + // int_size_ may have changed, so we need to recheck + switch (int_size_) { + case 1: { + int8_t* data_ptr = reinterpret_cast(raw_data_) + length_; + std::transform(values, values + length, data_ptr, + [](int64_t x) { return static_cast(x); }); + } break; + case 2: { + int16_t* data_ptr = reinterpret_cast(raw_data_) + length_; + std::transform(values, values + length, data_ptr, + [](int64_t x) { return static_cast(x); }); + } break; + case 4: { + int32_t* data_ptr = reinterpret_cast(raw_data_) + length_; + std::transform(values, values + length, data_ptr, + [](int64_t x) { return static_cast(x); }); + } break; + default: + DCHECK(false); + } + } + + // length_ is updated by this call + ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length); + + return Status::OK(); +} + +template +typename std::enable_if= sizeof(new_type), Status>::type +AdaptiveIntBuilder::ExpandIntSizeInternal() { + return Status::OK(); +} + +#define __LESS(a, b) (a) < (b) +template +typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type +AdaptiveIntBuilder::ExpandIntSizeInternal() { + int_size_ = sizeof(new_type); + RETURN_NOT_OK(Resize(data_->size() / sizeof(old_type))); + + old_type* src = reinterpret_cast(raw_data_); + new_type* dst = reinterpret_cast(raw_data_); + // By doing the backward copy, we ensure that no element is overwritten during + // the copy process and the copy stays in-place. 
+ std::copy_backward(src, src + length_, dst + length_); + + return Status::OK(); +} +#undef __LESS + +template +Status AdaptiveIntBuilder::ExpandIntSizeN() { + switch (int_size_) { + case 1: + RETURN_NOT_OK((ExpandIntSizeInternal())); + break; + case 2: + RETURN_NOT_OK((ExpandIntSizeInternal())); + break; + case 4: + RETURN_NOT_OK((ExpandIntSizeInternal())); + break; + case 8: + RETURN_NOT_OK((ExpandIntSizeInternal())); + break; + default: + DCHECK(false); + } + return Status::OK(); +} + +Status AdaptiveIntBuilder::ExpandIntSize(uint8_t new_int_size) { + switch (new_int_size) { + case 1: + RETURN_NOT_OK((ExpandIntSizeN())); + break; + case 2: + RETURN_NOT_OK((ExpandIntSizeN())); + break; + case 4: + RETURN_NOT_OK((ExpandIntSizeN())); + break; + case 8: + RETURN_NOT_OK((ExpandIntSizeN())); + break; + default: + DCHECK(false); + } + return Status::OK(); +} + +AdaptiveUIntBuilder::AdaptiveUIntBuilder(MemoryPool* pool) + : AdaptiveIntBuilderBase(pool) {} + +Status AdaptiveUIntBuilder::Finish(std::shared_ptr* out) { + const int64_t bytes_required = length_ * int_size_; + if (bytes_required > 0 && bytes_required < data_->size()) { + // Trim buffers + RETURN_NOT_OK(data_->Resize(bytes_required)); + } + switch (int_size_) { + case 1: + *out = std::make_shared( + uint8(), length_, data_, null_bitmap_, null_count_); + break; + case 2: + *out = std::make_shared( + uint16(), length_, data_, null_bitmap_, null_count_); + break; + case 4: + *out = std::make_shared( + uint32(), length_, data_, null_bitmap_, null_count_); + break; + case 8: + *out = std::make_shared( + uint64(), length_, data_, null_bitmap_, null_count_); + break; + default: + DCHECK(false); + return Status::NotImplemented("Only ints of size 1,2,4,8 are supported"); + } + + data_ = null_bitmap_ = nullptr; + capacity_ = length_ = null_count_ = 0; + return Status::OK(); +} + +Status AdaptiveUIntBuilder::Append( + const uint64_t* values, int64_t length, const uint8_t* valid_bytes) { + RETURN_NOT_OK(Reserve(length)); 
+ + if (length > 0) { + if (int_size_ < 8) { + uint8_t new_int_size = int_size_; + for (int64_t i = 0; i < length; i++) { + if (valid_bytes == nullptr || valid_bytes[i]) { + new_int_size = expanded_uint_size(values[i], new_int_size); + } + } + if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); } + } + } + + if (int_size_ == 8) { + std::memcpy(reinterpret_cast(raw_data_) + length_, values, + sizeof(uint64_t) * length); + } else { + // int_size_ may have changed, so we need to recheck + switch (int_size_) { + case 1: { + uint8_t* data_ptr = reinterpret_cast(raw_data_) + length_; + std::transform(values, values + length, data_ptr, + [](uint64_t x) { return static_cast(x); }); + } break; + case 2: { + uint16_t* data_ptr = reinterpret_cast(raw_data_) + length_; + std::transform(values, values + length, data_ptr, + [](uint64_t x) { return static_cast(x); }); + } break; + case 4: { + uint32_t* data_ptr = reinterpret_cast(raw_data_) + length_; + std::transform(values, values + length, data_ptr, + [](uint64_t x) { return static_cast(x); }); + } break; + default: + DCHECK(false); + } + } + + // length_ is updated by this call + ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length); + + return Status::OK(); +} + +template +typename std::enable_if= sizeof(new_type), Status>::type +AdaptiveUIntBuilder::ExpandIntSizeInternal() { + return Status::OK(); +} + +#define __LESS(a, b) (a) < (b) +template +typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type +AdaptiveUIntBuilder::ExpandIntSizeInternal() { + int_size_ = sizeof(new_type); + RETURN_NOT_OK(Resize(data_->size() / sizeof(old_type))); + + old_type* src = reinterpret_cast(raw_data_); + new_type* dst = reinterpret_cast(raw_data_); + // By doing the backward copy, we ensure that no element is overwritten during + // the copy process and the copy stays in-place. 
+ std::copy_backward(src, src + length_, dst + length_); + + return Status::OK(); +} +#undef __LESS + +template +Status AdaptiveUIntBuilder::ExpandIntSizeN() { + switch (int_size_) { + case 1: + RETURN_NOT_OK((ExpandIntSizeInternal())); + break; + case 2: + RETURN_NOT_OK((ExpandIntSizeInternal())); + break; + case 4: + RETURN_NOT_OK((ExpandIntSizeInternal())); + break; + case 8: + RETURN_NOT_OK((ExpandIntSizeInternal())); + break; + default: + DCHECK(false); + } + return Status::OK(); +} + +Status AdaptiveUIntBuilder::ExpandIntSize(uint8_t new_int_size) { + switch (new_int_size) { + case 1: + RETURN_NOT_OK((ExpandIntSizeN())); + break; + case 2: + RETURN_NOT_OK((ExpandIntSizeN())); + break; + case 4: + RETURN_NOT_OK((ExpandIntSizeN())); + break; + case 8: + RETURN_NOT_OK((ExpandIntSizeN())); + break; + default: + DCHECK(false); + } + return Status::OK(); +} + BooleanBuilder::BooleanBuilder(MemoryPool* pool) : ArrayBuilder(pool, boolean()), data_(nullptr), raw_data_(nullptr) {} diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h index 68769165b02..d77223e7883 100644 --- a/cpp/src/arrow/builder.h +++ b/cpp/src/arrow/builder.h @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -247,6 +248,193 @@ using HalfFloatBuilder = NumericBuilder; using FloatBuilder = NumericBuilder; using DoubleBuilder = NumericBuilder; +class ARROW_EXPORT AdaptiveIntBuilderBase : public ArrayBuilder { + public: + explicit AdaptiveIntBuilderBase(MemoryPool* pool); + + /// Write nulls as uint8_t* (0 value indicates null) into pre-allocated memory + Status AppendNulls(const uint8_t* valid_bytes, int64_t length) { + RETURN_NOT_OK(Reserve(length)); + UnsafeAppendToBitmap(valid_bytes, length); + return Status::OK(); + } + + Status AppendNull() { + RETURN_NOT_OK(Reserve(1)); + UnsafeAppendToBitmap(false); + return Status::OK(); + } + + std::shared_ptr data() const { return data_; } + + Status Init(int64_t capacity) override; + + /// Increase the capacity of the 
builder to accommodate at least the indicated + /// number of elements + Status Resize(int64_t capacity) override; + + protected: + std::shared_ptr data_; + uint8_t* raw_data_; + + uint8_t int_size_; +}; + +// Check if we would need to expand the underlying storage type +inline uint8_t expanded_uint_size(uint64_t val, uint8_t current_int_size) { + if (current_int_size == 8 || + (current_int_size < 8 && + (val > static_cast(std::numeric_limits::max())))) { + return 8; + } else if (current_int_size == 4 || + (current_int_size < 4 && + (val > static_cast(std::numeric_limits::max())))) { + return 4; + } else if (current_int_size == 2 || + (current_int_size == 1 && + (val > static_cast(std::numeric_limits::max())))) { + return 2; + } else { + return 1; + } +} + +class ARROW_EXPORT AdaptiveUIntBuilder : public AdaptiveIntBuilderBase { + public: + explicit AdaptiveUIntBuilder(MemoryPool* pool); + + using ArrayBuilder::Advance; + + /// Scalar append + Status Append(uint64_t val) { + RETURN_NOT_OK(Reserve(1)); + BitUtil::SetBit(null_bitmap_data_, length_); + + uint8_t new_int_size = expanded_uint_size(val, int_size_); + if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); } + + switch (int_size_) { + case 1: + reinterpret_cast(raw_data_)[length_++] = static_cast(val); + break; + case 2: + reinterpret_cast(raw_data_)[length_++] = static_cast(val); + break; + case 4: + reinterpret_cast(raw_data_)[length_++] = static_cast(val); + break; + case 8: + reinterpret_cast(raw_data_)[length_++] = val; + break; + default: + return Status::NotImplemented("This code shall never be reached"); + } + return Status::OK(); + } + + /// Vector append + /// + /// If passed, valid_bytes is of equal length to values, and any zero byte + /// will be considered as a null for that slot + Status Append( + const uint64_t* values, int64_t length, const uint8_t* valid_bytes = nullptr); + + Status ExpandIntSize(uint8_t new_int_size); + Status Finish(std::shared_ptr* out) override; 
+ + protected: + template + typename std::enable_if= sizeof(new_type), Status>::type + ExpandIntSizeInternal(); +#define __LESS(a, b) (a) < (b) + template + typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type + ExpandIntSizeInternal(); +#undef __LESS + + template + Status ExpandIntSizeN(); +}; + +// Check if we would need to expand the underlying storage type +inline uint8_t expanded_int_size(int64_t val, uint8_t current_int_size) { + if (current_int_size == 8 || + (current_int_size < 8 && + (val > static_cast(std::numeric_limits::max()) || + val < static_cast(std::numeric_limits::min())))) { + return 8; + } else if (current_int_size == 4 || + (current_int_size < 4 && + (val > static_cast(std::numeric_limits::max()) || + val < static_cast(std::numeric_limits::min())))) { + return 4; + } else if (current_int_size == 2 || + (current_int_size == 1 && + (val > static_cast(std::numeric_limits::max()) || + val < static_cast(std::numeric_limits::min())))) { + return 2; + } else { + return 1; + } +} + +class ARROW_EXPORT AdaptiveIntBuilder : public AdaptiveIntBuilderBase { + public: + explicit AdaptiveIntBuilder(MemoryPool* pool); + + using ArrayBuilder::Advance; + + /// Scalar append + Status Append(int64_t val) { + RETURN_NOT_OK(Reserve(1)); + BitUtil::SetBit(null_bitmap_data_, length_); + + uint8_t new_int_size = expanded_int_size(val, int_size_); + if (new_int_size != int_size_) { RETURN_NOT_OK(ExpandIntSize(new_int_size)); } + + switch (int_size_) { + case 1: + reinterpret_cast(raw_data_)[length_++] = static_cast(val); + break; + case 2: + reinterpret_cast(raw_data_)[length_++] = static_cast(val); + break; + case 4: + reinterpret_cast(raw_data_)[length_++] = static_cast(val); + break; + case 8: + reinterpret_cast(raw_data_)[length_++] = val; + break; + default: + return Status::NotImplemented("This code shall never be reached"); + } + return Status::OK(); + } + + /// Vector append + /// + /// If passed, valid_bytes is of equal length to 
values, and any zero byte + /// will be considered as a null for that slot + Status Append( + const int64_t* values, int64_t length, const uint8_t* valid_bytes = nullptr); + + Status ExpandIntSize(uint8_t new_int_size); + Status Finish(std::shared_ptr* out) override; + + protected: + template + typename std::enable_if= sizeof(new_type), Status>::type + ExpandIntSizeInternal(); +#define __LESS(a, b) (a) < (b) + template + typename std::enable_if<__LESS(sizeof(old_type), sizeof(new_type)), Status>::type + ExpandIntSizeInternal(); +#undef __LESS + + template + Status ExpandIntSizeN(); +}; + class ARROW_EXPORT BooleanBuilder : public ArrayBuilder { public: explicit BooleanBuilder(MemoryPool* pool); @@ -271,7 +459,7 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder { /// Scalar append Status Append(bool val) { - Reserve(1); + RETURN_NOT_OK(Reserve(1)); BitUtil::SetBit(null_bitmap_data_, length_); if (val) { BitUtil::SetBit(raw_data_, length_);