From 34834f711c18c4a9e6910b6322d23ffcc28f3651 Mon Sep 17 00:00:00 2001 From: Alessandro Andrioni Date: Tue, 1 May 2018 03:21:54 +0200 Subject: [PATCH 01/20] First try at implementing a CountValues kernel --- cpp/src/arrow/compute/kernels/hash-test.cc | 93 +++++++++++++++++++- cpp/src/arrow/compute/kernels/hash.cc | 99 ++++++++++++++++++++++ cpp/src/arrow/compute/kernels/hash.h | 17 ++++ 3 files changed, 208 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/kernels/hash-test.cc b/cpp/src/arrow/compute/kernels/hash-test.cc index 84eec8ba1d7..640aeb3475d 100644 --- a/cpp/src/arrow/compute/kernels/hash-test.cc +++ b/cpp/src/arrow/compute/kernels/hash-test.cc @@ -64,6 +64,23 @@ void CheckUnique(FunctionContext* ctx, const shared_ptr& type, ASSERT_ARRAYS_EQUAL(*expected, *result); } +template +void CheckCountValues(FunctionContext* ctx, const shared_ptr& type, + const vector& in_values, const vector& in_is_valid, + const vector& out_values, const vector& out_is_valid, + const vector& out_counts) { + shared_ptr input = _MakeArray(type, in_values, in_is_valid); + shared_ptr ex_values = _MakeArray(type, out_values, out_is_valid); + shared_ptr ex_counts = + _MakeArray(int64(), out_counts, out_is_valid); + + shared_ptr result_values; + shared_ptr result_counts; + ASSERT_OK(CountValues(ctx, Datum(input), &result_values, &result_counts)); + ASSERT_ARRAYS_EQUAL(*ex_values, *result_values); + ASSERT_ARRAYS_EQUAL(*ex_counts, *result_counts); +} + template void CheckDictEncode(FunctionContext* ctx, const shared_ptr& type, const vector& in_values, const vector& in_is_valid, @@ -104,6 +121,14 @@ TYPED_TEST(TestHashKernelPrimitive, Unique) { {3, 1}, {}); } +TYPED_TEST(TestHashKernelPrimitive, CountValues) { + using T = typename TypeParam::c_type; + auto type = TypeTraits::type_singleton(); + CheckCountValues(&this->ctx_, type, {2, 1, 2, 1, 2, 3, 4}, + {true, false, true, true, true, true, false}, {2, 1, 3}, + {}, {3, 1, 1}); +} + TYPED_TEST(TestHashKernelPrimitive, DictEncode) { using T = typename TypeParam::c_type; auto type = TypeTraits::type_singleton(); @@ -121,19 +146,21 @@ TYPED_TEST(TestHashKernelPrimitive, PrimitiveResizeTable) { vector values; vector uniques; vector indices; + vector counts; for (int64_t i = 0; i < kTotalValues * kRepeats; i++) { const auto val = static_cast(i % kTotalValues); values.push_back(val); if (i < kTotalValues) { uniques.push_back(val); + counts.push_back(kRepeats); } indices.push_back(static_cast(i % kTotalValues)); } auto type = TypeTraits::type_singleton(); CheckUnique(&this->ctx_, type, values, {}, uniques, {}); - + CheckCountValues(&this->ctx_, type, values, {}, uniques, {}, counts); CheckDictEncode(&this->ctx_, type, values, {}, uniques, {}, indices); } @@ -149,6 +176,19 @@ TEST_F(TestHashKernel, UniqueTimeTimestamp) { {}); } +TEST_F(TestHashKernel, CountValuesTimeTimestamp) { + CheckCountValues(&this->ctx_, time32(TimeUnit::SECOND), + {2, 1, 2, 1}, {true, false, true, true}, {2, 1}, + {}, {2, 1}); + + CheckCountValues(&this->ctx_, time64(TimeUnit::NANO), {2, 1, 2, 1}, + {true, false, true, true}, {2, 1}, {}, {2, 1}); + + CheckCountValues(&this->ctx_, timestamp(TimeUnit::NANO), + {2, 1, 2, 1}, {true, false, true, true}, + {2, 1}, {}, {2, 1}); +} + TEST_F(TestHashKernel, UniqueBoolean) { CheckUnique(&this->ctx_, boolean(), {true, true, false, true}, {true, false, true, true}, {true, false}, {}); @@ -164,6 +204,23 @@ TEST_F(TestHashKernel, UniqueBoolean) { {false, true}, {}); } +TEST_F(TestHashKernel, CountValuesBoolean) { + CheckCountValues(&this->ctx_, boolean(), {true, true, false, true}, + {true, false, true, true}, {true, false}, {}, + {2, 1}); + + CheckCountValues(&this->ctx_, boolean(), {false, true, false, true}, + {true, false, true, true}, {false, true}, {}, + {2, 1}); + + // No nulls + CheckCountValues(&this->ctx_, boolean(), {true, true, false, true}, + {}, {true, false}, {}, {3, 1}); + + CheckCountValues(&this->ctx_, boolean(), {false, true, false, true}, + {}, {false, true}, {}, {2, 2}); +} + TEST_F(TestHashKernel, DictEncodeBoolean) { CheckDictEncode( &this->ctx_, boolean(), {true, true, false, true, false}, @@ -192,6 +249,16 @@ TEST_F(TestHashKernel, UniqueBinary) { {true, false, true, true}, {"test", "test2"}, {}); } +TEST_F(TestHashKernel, CountValuesBinary) { + CheckCountValues( + &this->ctx_, binary(), {"test", "", "test2", "test"}, {true, false, true, true}, + {"test", "test2"}, {}, {2, 1}); + + CheckCountValues( + &this->ctx_, utf8(), {"test", "", "test2", "test"}, {true, false, true, true}, + {"test", "test2"}, {}, {2, 1}); +} + TEST_F(TestHashKernel, DictEncodeBinary) { CheckDictEncode( &this->ctx_, binary(), {"test", "", "test2", "test", "baz"}, @@ -214,6 +281,7 @@ TEST_F(TestHashKernel, BinaryResizeTable) { vector values; vector uniques; vector indices; + vector counts; char buf[20] = "test"; for (int32_t i = 0; i < kTotalValues * kRepeats; i++) { @@ -224,6 +292,7 @@ TEST_F(TestHashKernel, BinaryResizeTable) { if (i < kTotalValues) { uniques.push_back(values.back()); + counts.push_back(kRepeats); } indices.push_back(index); } @@ -233,6 +302,8 @@ TEST_F(TestHashKernel, BinaryResizeTable) { indices); CheckUnique(&this->ctx_, utf8(), values, {}, uniques, {}); + CheckCountValues(&this->ctx_, type, values, {}, + uniques, {}, counts); CheckDictEncode(&this->ctx_, utf8(), values, {}, uniques, {}, indices); } @@ -291,6 +362,15 @@ TEST_F(TestHashKernel, UniqueDecimal) { {true, false, true, true}, expected, {}); } +TEST_F(TestHashKernel, CountValuesDecimal) { + vector values{12, 12, 11, 12}; + vector expected{12, 11}; + + CheckCountValues(&this->ctx_, decimal(2, 0), values, + {true, false, true, true}, expected, {}, + {2, 1}); +} + TEST_F(TestHashKernel, DictEncodeDecimal) { vector values{12, 12, 11, 12, 13}; vector expected{12, 11, 13}; @@ -311,6 +391,9 @@ TEST_F(TestHashKernel, ChunkedArrayInvoke) { vector dict_values = {"foo", "bar", "baz", "quuux"}; auto ex_dict = _MakeArray(type, dict_values, {}); + vector counts = {3, 2, 1, 1}; + auto ex_counts = _MakeArray(int64(), counts, {}); + ArrayVector arrays = {a1, a2}; auto carr = std::make_shared(arrays); @@ -329,6 +412,14 @@ TEST_F(TestHashKernel, ChunkedArrayInvoke) { std::make_shared(dict_type, i2)}; auto dict_carr = std::make_shared(dict_arrays); + // Unique counts + shared_ptr cv_uniques; + shared_ptr cv_counts; + ASSERT_OK(CountValues(&this->ctx_, Datum(carr), &cv_uniques, &cv_counts)); + ASSERT_ARRAYS_EQUAL(*ex_dict, *cv_uniques); + ASSERT_ARRAYS_EQUAL(*ex_counts, *cv_counts); + + // Dictionary encode Datum encoded_out; ASSERT_OK(DictionaryEncode(&this->ctx_, carr, &encoded_out)); ASSERT_EQ(Datum::CHUNKED_ARRAY, encoded_out.kind()); diff --git a/cpp/src/arrow/compute/kernels/hash.cc b/cpp/src/arrow/compute/kernels/hash.cc index f443282b1b8..6f2ba1ca4d4 100644 --- a/cpp/src/arrow/compute/kernels/hash.cc +++ b/cpp/src/arrow/compute/kernels/hash.cc @@ -95,6 +95,50 @@ class UniqueAction : public ActionBase { std::shared_ptr out_type() const { return type_; } }; +// ---------------------------------------------------------------------- +// Count values implementation + +template +class CountValuesImpl : public HashTableKernel> { + public: + static constexpr bool allow_expand = true; + using Base = HashTableKernel; + + CountValuesImpl(const std::shared_ptr& type, MemoryPool* pool) + : Base(type, pool) {} + + Status Reserve(const int64_t length) { + counts_.reserve(length); + return Status::OK(); + } + + void ObserveNull() {} + + void ObserveFound(const hash_slot_t slot) { counts_[slot]++; } + + void ObserveNotFound(const hash_slot_t slot) { counts_.emplace_back(1); } + + Status DoubleSize() { return Base::DoubleTableSize(); } + + Status Flush(Datum* out) override { + Int64Builder builder(Base::pool_); + std::shared_ptr result; + + for (const int64_t value : counts_) { + RETURN_NOT_OK(builder.Append(value)); + } + + RETURN_NOT_OK(builder.FinishInternal(&result)); + out->value = std::move(result); + return Status::OK(); + } + + using Base::Append; + + private: + std::vector counts_; +}; + // ---------------------------------------------------------------------- // Dictionary encode implementation @@ -368,6 +412,48 @@ Status GetDictionaryEncodeKernel(FunctionContext* ctx, return Status::OK(); } +Status GetCountValuesKernel(FunctionContext* ctx, const std::shared_ptr& type, + std::unique_ptr* out) { + std::unique_ptr hasher; + +#define COUNT_VALUES_CASE(InType) \ + case InType::type_id: \ + hasher.reset(new CountValuesImpl(type, ctx->memory_pool())); \ + break + + switch (type->id()) { + COUNT_VALUES_CASE(NullType); + COUNT_VALUES_CASE(BooleanType); + COUNT_VALUES_CASE(UInt8Type); + COUNT_VALUES_CASE(Int8Type); + COUNT_VALUES_CASE(UInt16Type); + COUNT_VALUES_CASE(Int16Type); + COUNT_VALUES_CASE(UInt32Type); + COUNT_VALUES_CASE(Int32Type); + COUNT_VALUES_CASE(UInt64Type); + COUNT_VALUES_CASE(Int64Type); + COUNT_VALUES_CASE(FloatType); + COUNT_VALUES_CASE(DoubleType); + COUNT_VALUES_CASE(Date32Type); + COUNT_VALUES_CASE(Date64Type); + COUNT_VALUES_CASE(Time32Type); + COUNT_VALUES_CASE(Time64Type); + COUNT_VALUES_CASE(TimestampType); + COUNT_VALUES_CASE(BinaryType); + COUNT_VALUES_CASE(StringType); + COUNT_VALUES_CASE(FixedSizeBinaryType); + COUNT_VALUES_CASE(Decimal128Type); + default: + break; + } + +#undef COUNT_VALUES_CASE + + CHECK_IMPLEMENTED(hasher, "count-values", type); + out->reset(new HashKernelImpl(std::move(hasher))); + return Status::OK(); +} + namespace { Status InvokeHash(FunctionContext* ctx, HashKernel* func, const Datum& value, @@ -415,5 +501,18 @@ Status DictionaryEncode(FunctionContext* ctx, const Datum& value, Datum* out) { return Status::OK(); } +Status CountValues(FunctionContext* ctx, const Datum& value, + std::shared_ptr* out_uniques, + std::shared_ptr* out_counts) { + std::unique_ptr func; + RETURN_NOT_OK(GetCountValuesKernel(ctx, value.type(), &func)); + + std::vector counts_datum; + RETURN_NOT_OK(InvokeHash(ctx, func.get(), value, &counts_datum, out_uniques)); + + *out_counts = MakeArray(counts_datum.back().array()); + return Status::OK(); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/hash.h b/cpp/src/arrow/compute/kernels/hash.h index 6bbe3cfb447..e10ab6db8b5 100644 --- a/cpp/src/arrow/compute/kernels/hash.h +++ b/cpp/src/arrow/compute/kernels/hash.h @@ -56,6 +56,10 @@ Status GetDictionaryEncodeKernel(FunctionContext* ctx, const std::shared_ptr& type, std::unique_ptr* kernel); +ARROW_EXPORT +Status GetCountValuesKernel(FunctionContext* ctx, const std::shared_ptr& type, + std::unique_ptr* kernel); + /// \brief Compute unique elements from an array-like object /// \param[in] context the FunctionContext /// \param[in] datum array-like input @@ -76,6 +80,19 @@ Status Unique(FunctionContext* context, const Datum& datum, std::shared_ptr* out_uniques, + std::shared_ptr* out_counts); + // TODO(wesm): Define API for incremental dictionary encoding // TODO(wesm): Define API for regularizing DictionaryArray objects with From 57349f7ead52052a6f4f9ba49132ef9fbbc9bd2b Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 6 Feb 2019 23:10:39 -0800 Subject: [PATCH 02/20] unit tests passing --- cpp/src/arrow/array/builder_primitive.h | 4 + cpp/src/arrow/compute/kernels/hash-test.cc | 21 +++- cpp/src/arrow/compute/kernels/hash.cc | 106 ++++++++++++++------- cpp/src/arrow/compute/kernels/hash.h | 63 ++++-------- 4 files changed, 113 insertions(+), 81 deletions(-) diff --git a/cpp/src/arrow/array/builder_primitive.h b/cpp/src/arrow/array/builder_primitive.h index 5a9b69483af..18e7d27c0ff 100644 --- a/cpp/src/arrow/array/builder_primitive.h +++ b/cpp/src/arrow/array/builder_primitive.h @@ -92,6 +92,10 @@ class NumericBuilder : public ArrayBuilder { return ArrayBuilder::Resize(capacity); } + value_type& operator[](int64_t index) { + return reinterpret_cast(data_->mutable_data())[index]; + } + /// \brief Append a sequence of elements in one shot /// \param[in] values a contiguous C array of values /// \param[in] length the number of values to append diff --git a/cpp/src/arrow/compute/kernels/hash-test.cc b/cpp/src/arrow/compute/kernels/hash-test.cc index 640aeb3475d..098a06cfa65 100644 --- a/cpp/src/arrow/compute/kernels/hash-test.cc +++ b/cpp/src/arrow/compute/kernels/hash-test.cc @@ -298,12 +298,15 @@ TEST_F(TestHashKernel, BinaryResizeTable) { } CheckUnique(&this->ctx_, binary(), values, {}, uniques, {}); + CheckCountValues(&this->ctx_, binary(), values, {}, uniques, + {}, counts); + CheckDictEncode(&this->ctx_, binary(), values, {}, uniques, {}, indices); CheckUnique(&this->ctx_, utf8(), values, {}, uniques, {}); - CheckCountValues(&this->ctx_, type, values, {}, - uniques, {}, counts); + CheckCountValues(&this->ctx_, utf8(), values, {}, uniques, {}, + counts); CheckDictEncode(&this->ctx_, utf8(), values, {}, uniques, {}, indices); } @@ -380,6 +383,20 @@ TEST_F(TestHashKernel, DictEncodeDecimal) { {}, {0, 0, 1, 0, 2}); } +/* TODO(ARROW-XXXX): Determine if we wan to do something that is reproducable with floats. +TEST_F(TestHashKernel, CountValuesFloat) { + + // No nulls + CheckCountValues(&this->ctx_, float32(), {1.0f, 0.0f, -0.0f, +std::nan("1"), std::nan("2") }, + {}, {0.0f, 1.0f, std::nan("1")}, {}, {}); + + CheckCountValues(&this->ctx_, float64(), {1.0f, 0.0f, -0.0f, +std::nan("1"), std::nan("2") }, + {}, {0.0f, 1.0f, std::nan("1")}, {}, {}); +} +*/ + TEST_F(TestHashKernel, ChunkedArrayInvoke) { vector values1 = {"foo", "bar", "foo"}; vector values2 = {"bar", "baz", "quuux", "foo"}; diff --git a/cpp/src/arrow/compute/kernels/hash.cc b/cpp/src/arrow/compute/kernels/hash.cc index 6f2ba1ca4d4..6cd3e1dcea6 100644 --- a/cpp/src/arrow/compute/kernels/hash.cc +++ b/cpp/src/arrow/compute/kernels/hash.cc @@ -93,54 +93,56 @@ class UniqueAction : public ActionBase { Status Flush(Datum* out) { return Status::OK(); } std::shared_ptr out_type() const { return type_; } + + Status FlushFinal(Datum* out) { return Status::OK(); } }; // ---------------------------------------------------------------------- -// Count values implementation +// Count values implementation (see HashKernel for description of methods) -template -class CountValuesImpl : public HashTableKernel> { +class CountValuesAction { public: - static constexpr bool allow_expand = true; - using Base = HashTableKernel; - - CountValuesImpl(const std::shared_ptr& type, MemoryPool* pool) - : Base(type, pool) {} + CountValuesAction(const std::shared_ptr& type, MemoryPool* pool) + : count_builder_(pool) {} Status Reserve(const int64_t length) { - counts_.reserve(length); + // builder size is independent of input array size. + return Status::OK(); + } + Status Reset() { + count_builder_.Reset(); return Status::OK(); } - void ObserveNull() {} - - void ObserveFound(const hash_slot_t slot) { counts_[slot]++; } - - void ObserveNotFound(const hash_slot_t slot) { counts_.emplace_back(1); } - - Status DoubleSize() { return Base::DoubleTableSize(); } + // Don't do anything on flush beceause we don't want to finalize the builder + // or incur the cost of memory copies. + Status Flush(Datum* out) { return Status::OK(); } - Status Flush(Datum* out) override { - Int64Builder builder(Base::pool_); + Status FlushFinal(Datum* out) { std::shared_ptr result; - - for (const int64_t value : counts_) { - RETURN_NOT_OK(builder.Append(value)); - } - - RETURN_NOT_OK(builder.FinishInternal(&result)); + RETURN_NOT_OK(count_builder_.FinishInternal(&result)); out->value = std::move(result); return Status::OK(); } - using Base::Append; + void ObserveNull() {} + + template + void ObserveFound(Index slot) { + count_builder_[slot]++; + } + + template + void ObserveNotFound(Index slot) { + count_builder_.Append(1); + } private: - std::vector counts_; + Int64Builder count_builder_; }; // ---------------------------------------------------------------------- -// Dictionary encode implementation +// Dictionary encode implementation (see HashKernel for description of methods) class DictEncodeAction : public ActionBase { public: @@ -174,11 +176,33 @@ class DictEncodeAction : public ActionBase { } std::shared_ptr out_type() const { return int32(); } + Status FlushFinal(Datum* out) { return Status::OK(); } private: Int32Builder indices_builder_; }; +/// \brief Invoke hash table kernel on input array, returning any output +/// values. Implementations should be thread-safe +/// +/// This interface is implemented below using visitor pattern on "Action" +/// implementations. It is not consolidate to keep the contract clearer. +class ARROW_EXPORT HashKernel : public UnaryKernel { + public: + // Reset for another run. + virtual Status Reset() = 0; + // Prepare the Action for the given input (e.g. reserve appropriately sized + // data structures) and visit the given input with Action. + virtual Status Append(FunctionContext* ctx, const ArrayData& input) = 0; + // Flush out accumulated results from the last invocation of Call. + virtual Status Flush(Datum* out) = 0; + // Flush out accumulated results across all invocations of Call. The kernel + // should not be used until after Reset() is called. + virtual Status FlushFinal(Datum* out) = 0; + // Get the values (keys) acummulated in the dictionary so far. + virtual Status GetDictionary(std::shared_ptr* out) = 0; +}; + // ---------------------------------------------------------------------- // Base class for all hash kernel implementations @@ -223,6 +247,8 @@ class RegularHashKernelImpl : public HashKernelImpl { Status Flush(Datum* out) override { return action_.Flush(out); } + Status FlushFinal(Datum* out) override { return action_.FlushFinal(out); } + Status GetDictionary(std::shared_ptr* out) override { return DictionaryTraits::GetDictionaryArrayData(pool_, type_, *memo_table_, 0 /* start_offset */, out); @@ -273,6 +299,7 @@ class NullHashKernelImpl : public HashKernelImpl { } Status Flush(Datum* out) override { return action_.Flush(out); } + Status FlushFinal(Datum* out) override { return action_.FlushFinal(out); } Status GetDictionary(std::shared_ptr* out) override { // TODO(wesm): handle null being a valid dictionary value @@ -414,11 +441,12 @@ Status GetDictionaryEncodeKernel(FunctionContext* ctx, Status GetCountValuesKernel(FunctionContext* ctx, const std::shared_ptr& type, std::unique_ptr* out) { - std::unique_ptr hasher; - -#define COUNT_VALUES_CASE(InType) \ - case InType::type_id: \ - hasher.reset(new CountValuesImpl(type, ctx->memory_pool())); \ + std::unique_ptr kernel; +#define COUNT_VALUES_CASE(InType) \ + case InType::type_id: \ + kernel.reset(new \ + typename HashKernelTraits::HashKernelImpl( \ + type, ctx->memory_pool())); \ break switch (type->id()) { @@ -449,8 +477,9 @@ Status GetCountValuesKernel(FunctionContext* ctx, const std::shared_ptrreset(new HashKernelImpl(std::move(hasher))); + CHECK_IMPLEMENTED(kernel, "count-values", type); + RETURN_NOT_OK(kernel->Reset()); + *out = std::move(kernel); return Status::OK(); } @@ -507,10 +536,13 @@ Status CountValues(FunctionContext* ctx, const Datum& value, std::unique_ptr func; RETURN_NOT_OK(GetCountValuesKernel(ctx, value.type(), &func)); - std::vector counts_datum; - RETURN_NOT_OK(InvokeHash(ctx, func.get(), value, &counts_datum, out_uniques)); + // Calls return nothing for counts. + std::vector unused_output; + RETURN_NOT_OK(InvokeHash(ctx, func.get(), value, &unused_output, out_uniques)); - *out_counts = MakeArray(counts_datum.back().array()); + Datum counts; + RETURN_NOT_OK(func->FlushFinal(&counts)); + *out_counts = MakeArray(counts.array()); return Status::OK(); } diff --git a/cpp/src/arrow/compute/kernels/hash.h b/cpp/src/arrow/compute/kernels/hash.h index e10ab6db8b5..08a84d50599 100644 --- a/cpp/src/arrow/compute/kernels/hash.h +++ b/cpp/src/arrow/compute/kernels/hash.h @@ -34,33 +34,10 @@ namespace compute { class FunctionContext; -/// \brief Invoke hash table kernel on input array, returning any output -/// values. Implementations should be thread-safe -class ARROW_EXPORT HashKernel : public UnaryKernel { - public: - // XXX why are those methods exposed? - virtual Status Reset() = 0; - virtual Status Append(FunctionContext* ctx, const ArrayData& input) = 0; - virtual Status Flush(Datum* out) = 0; - virtual Status GetDictionary(std::shared_ptr* out) = 0; -}; - -/// \since 0.8.0 -/// \note API not yet finalized -ARROW_EXPORT -Status GetUniqueKernel(FunctionContext* ctx, const std::shared_ptr& type, - std::unique_ptr* kernel); - -ARROW_EXPORT -Status GetDictionaryEncodeKernel(FunctionContext* ctx, - const std::shared_ptr& type, - std::unique_ptr* kernel); - -ARROW_EXPORT -Status GetCountValuesKernel(FunctionContext* ctx, const std::shared_ptr& type, - std::unique_ptr* kernel); - /// \brief Compute unique elements from an array-like object +/// +/// Note if a null occurs in the input it will NOT be included in the output. +/// /// \param[in] context the FunctionContext /// \param[in] datum array-like input /// \param[out] out result as Array @@ -70,29 +47,36 @@ Status GetCountValuesKernel(FunctionContext* ctx, const std::shared_ptr* out); -/// \brief Dictionary-encode values in an array-like object -/// \param[in] context the FunctionContext -/// \param[in] data array-like input -/// \param[out] out result with same shape and type as input +/// \brief Return counts of unique elements from an array-like object. +/// +/// Note that the counts do not include counts for nulls in the array. These can be +/// obtained separately from metadata. +/// +/// For floating point arrays there is no attempt to normalize -0.0, 0.0 and NaN values +/// which can lead to unexpected results if the input Array has these values. /// -/// \since 0.8.0 -/// \note API not yet finalized -ARROW_EXPORT -Status DictionaryEncode(FunctionContext* context, const Datum& data, Datum* out); - -/// \brief Return counts of unique elements from an array-like object /// \param[in] context the FunctionContext /// \param[in] value array-like input /// \param[out] out_uniques unique elements as Array /// \param[out] out_counts counts per element as Array, same shape as out_uniques /// -/// \since 0.10.0 +/// \since 0.13.0 /// \note API not yet finalized ARROW_EXPORT Status CountValues(FunctionContext* context, const Datum& value, std::shared_ptr* out_uniques, std::shared_ptr* out_counts); +/// \brief Dictionary-encode values in an array-like object +/// \param[in] context the FunctionContext +/// \param[in] data array-like input +/// \param[out] out result with same shape and type as input +/// +/// \since 0.8.0 +/// \note API not yet finalized +ARROW_EXPORT +Status DictionaryEncode(FunctionContext* context, const Datum& data, Datum* out); + // TODO(wesm): Define API for incremental dictionary encoding // TODO(wesm): Define API for regularizing DictionaryArray objects with @@ -117,11 +101,6 @@ Status CountValues(FunctionContext* context, const Datum& value, // Status IsIn(FunctionContext* context, const Datum& values, const Datum& member_set, // Datum* out); -// ARROW_EXPORT -// Status CountValues(FunctionContext* context, const Datum& values, -// std::shared_ptr* out_uniques, -// std::shared_ptr* out_counts); - } // namespace compute } // namespace arrow From 0dd007718b40e64ba67607248358db9847b819a1 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Thu, 7 Feb 2019 00:50:38 -0800 Subject: [PATCH 03/20] minimal test --- cpp/src/arrow/array-test.cc | 16 ++++++++++++++++ cpp/src/arrow/array/builder_primitive.h | 4 ++++ 2 files changed, 20 insertions(+) diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc index f6f66d27cfe..7a8b504253b 100644 --- a/cpp/src/arrow/array-test.cc +++ b/cpp/src/arrow/array-test.cc @@ -494,6 +494,22 @@ void TestPrimitiveBuilder::Check(const std::unique_ptr ASSERT_EQ(0, builder->null_count()); } +TEST(NumericBuilderAccessors, TestSettersGetters) { + int64_t datum = 42; + int64_t new_datum = 43; + NumericBuilder builder(int64(), default_memory_pool()); + + builder.Reset(); + ASSERT_OK(builder.Append(datum)); + ASSERT_EQ(builder.GetValue(0), datum); + + // Now update the value. + builder[0] = new_datum; + + ASSERT_EQ(builder.GetValue(0), new_datum); + ASSERT_EQ(((const NumericBuilder&)builder)[0], new_datum); +} + typedef ::testing::Types Primitives; diff --git a/cpp/src/arrow/array/builder_primitive.h b/cpp/src/arrow/array/builder_primitive.h index 18e7d27c0ff..a8f7531e08a 100644 --- a/cpp/src/arrow/array/builder_primitive.h +++ b/cpp/src/arrow/array/builder_primitive.h @@ -92,6 +92,10 @@ class NumericBuilder : public ArrayBuilder { return ArrayBuilder::Resize(capacity); } + value_type operator[](int64_t index) const { + return GetValue(index); + } + value_type& operator[](int64_t index) { return reinterpret_cast(data_->mutable_data())[index]; } From 96858bd520743574894237b4b8ab85ced6c1d451 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Thu, 7 Feb 2019 01:20:13 -0800 Subject: [PATCH 04/20] Use macro inversion to reduce boiler plate --- cpp/src/arrow/array-test.cc | 2 +- cpp/src/arrow/array/builder_primitive.h | 4 +- cpp/src/arrow/compute/kernels/hash.cc | 118 ++++++++---------------- 3 files changed, 42 insertions(+), 82 deletions(-) diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc index 7a8b504253b..9451e00b010 100644 --- a/cpp/src/arrow/array-test.cc +++ b/cpp/src/arrow/array-test.cc @@ -495,7 +495,7 @@ void TestPrimitiveBuilder::Check(const std::unique_ptr } TEST(NumericBuilderAccessors, TestSettersGetters) { - int64_t datum = 42; + int64_t datum = 42; int64_t new_datum = 43; NumericBuilder builder(int64(), default_memory_pool()); diff --git a/cpp/src/arrow/array/builder_primitive.h b/cpp/src/arrow/array/builder_primitive.h index a8f7531e08a..9e90367fa46 100644 --- a/cpp/src/arrow/array/builder_primitive.h +++ b/cpp/src/arrow/array/builder_primitive.h @@ -92,9 +92,7 @@ class NumericBuilder : public ArrayBuilder { return ArrayBuilder::Resize(capacity); } - value_type operator[](int64_t index) const { - return GetValue(index); - } + value_type operator[](int64_t index) const { return GetValue(index); } value_type& operator[](int64_t index) { return reinterpret_cast(data_->mutable_data())[index]; diff --git a/cpp/src/arrow/compute/kernels/hash.cc b/cpp/src/arrow/compute/kernels/hash.cc index 6cd3e1dcea6..d4b87b870db 100644 --- a/cpp/src/arrow/compute/kernels/hash.cc +++ b/cpp/src/arrow/compute/kernels/hash.cc @@ -349,44 +349,45 @@ struct HashKernelTraits> { } // namespace +#define PROCESS_SUPPORTED_HASH_TYPES \ + PROCESS(NullType) \ + PROCESS(BooleanType) \ + PROCESS(UInt8Type) \ + PROCESS(Int8Type) \ + PROCESS(UInt16Type) \ + PROCESS(Int16Type) \ + PROCESS(UInt32Type) \ + PROCESS(Int32Type) \ + PROCESS(UInt64Type) \ + PROCESS(Int64Type) \ + PROCESS(FloatType) \ + PROCESS(DoubleType) \ + PROCESS(Date32Type) \ + PROCESS(Date64Type) \ + PROCESS(Time32Type) \ + PROCESS(Time64Type) \ + PROCESS(TimestampType) \ + PROCESS(BinaryType) \ + PROCESS(StringType) \ + PROCESS(FixedSizeBinaryType) \ + PROCESS(Decimal128Type) + Status GetUniqueKernel(FunctionContext* ctx, const std::shared_ptr& type, std::unique_ptr* out) { std::unique_ptr kernel; - -#define UNIQUE_CASE(InType) \ + switch (type->id()) { +#define PROCESS(InType) \ case InType::type_id: \ kernel.reset(new typename HashKernelTraits::HashKernelImpl( \ type, ctx->memory_pool())); \ - break + break; - switch (type->id()) { - UNIQUE_CASE(NullType); - UNIQUE_CASE(BooleanType); - UNIQUE_CASE(UInt8Type); - UNIQUE_CASE(Int8Type); - UNIQUE_CASE(UInt16Type); - UNIQUE_CASE(Int16Type); - UNIQUE_CASE(UInt32Type); - UNIQUE_CASE(Int32Type); - UNIQUE_CASE(UInt64Type); - UNIQUE_CASE(Int64Type); - UNIQUE_CASE(FloatType); - UNIQUE_CASE(DoubleType); - UNIQUE_CASE(Date32Type); - UNIQUE_CASE(Date64Type); - UNIQUE_CASE(Time32Type); - UNIQUE_CASE(Time64Type); - UNIQUE_CASE(TimestampType); - UNIQUE_CASE(BinaryType); - UNIQUE_CASE(StringType); - UNIQUE_CASE(FixedSizeBinaryType); - UNIQUE_CASE(Decimal128Type); + PROCESS_SUPPORTED_HASH_TYPES +#undef PROCESS default: break; } -#undef UNIQUE_CASE - CHECK_IMPLEMENTED(kernel, "unique", type); RETURN_NOT_OK(kernel->Reset()); *out = std::move(kernel); @@ -398,35 +399,16 @@ Status GetDictionaryEncodeKernel(FunctionContext* ctx, std::unique_ptr* out) { std::unique_ptr kernel; -#define DICTIONARY_ENCODE_CASE(InType) \ + switch (type->id()) { +#define PROCESS(InType) \ case InType::type_id: \ kernel.reset(new \ typename HashKernelTraits::HashKernelImpl( \ type, ctx->memory_pool())); \ - break + break; - switch (type->id()) { - DICTIONARY_ENCODE_CASE(NullType); - DICTIONARY_ENCODE_CASE(BooleanType); - DICTIONARY_ENCODE_CASE(UInt8Type); - DICTIONARY_ENCODE_CASE(Int8Type); - DICTIONARY_ENCODE_CASE(UInt16Type); - DICTIONARY_ENCODE_CASE(Int16Type); - DICTIONARY_ENCODE_CASE(UInt32Type); - DICTIONARY_ENCODE_CASE(Int32Type); - DICTIONARY_ENCODE_CASE(UInt64Type); - DICTIONARY_ENCODE_CASE(Int64Type); - DICTIONARY_ENCODE_CASE(FloatType); - DICTIONARY_ENCODE_CASE(DoubleType); - DICTIONARY_ENCODE_CASE(Date32Type); - DICTIONARY_ENCODE_CASE(Date64Type); - DICTIONARY_ENCODE_CASE(Time32Type); - DICTIONARY_ENCODE_CASE(Time64Type); - DICTIONARY_ENCODE_CASE(TimestampType); - DICTIONARY_ENCODE_CASE(BinaryType); - DICTIONARY_ENCODE_CASE(StringType); - DICTIONARY_ENCODE_CASE(FixedSizeBinaryType); - DICTIONARY_ENCODE_CASE(Decimal128Type); + PROCESS_SUPPORTED_HASH_TYPES +#undef PROCESS default: break; } @@ -442,41 +424,21 @@ Status GetDictionaryEncodeKernel(FunctionContext* ctx, Status GetCountValuesKernel(FunctionContext* ctx, const std::shared_ptr& type, std::unique_ptr* out) { std::unique_ptr kernel; -#define COUNT_VALUES_CASE(InType) \ + + switch (type->id()) { +#define PROCESS(InType) \ case InType::type_id: \ kernel.reset(new \ typename HashKernelTraits::HashKernelImpl( \ type, ctx->memory_pool())); \ - break + break; - switch (type->id()) { - COUNT_VALUES_CASE(NullType); - COUNT_VALUES_CASE(BooleanType); - COUNT_VALUES_CASE(UInt8Type); - COUNT_VALUES_CASE(Int8Type); - COUNT_VALUES_CASE(UInt16Type); - COUNT_VALUES_CASE(Int16Type); - COUNT_VALUES_CASE(UInt32Type); - COUNT_VALUES_CASE(Int32Type); - COUNT_VALUES_CASE(UInt64Type); - COUNT_VALUES_CASE(Int64Type); - COUNT_VALUES_CASE(FloatType); - COUNT_VALUES_CASE(DoubleType); - COUNT_VALUES_CASE(Date32Type); - COUNT_VALUES_CASE(Date64Type); - COUNT_VALUES_CASE(Time32Type); - COUNT_VALUES_CASE(Time64Type); - COUNT_VALUES_CASE(TimestampType); - COUNT_VALUES_CASE(BinaryType); - COUNT_VALUES_CASE(StringType); - COUNT_VALUES_CASE(FixedSizeBinaryType); - COUNT_VALUES_CASE(Decimal128Type); + PROCESS_SUPPORTED_HASH_TYPES +#undef PROCESS default: break; } -#undef COUNT_VALUES_CASE - CHECK_IMPLEMENTED(kernel, "count-values", type); RETURN_NOT_OK(kernel->Reset()); *out = std::move(kernel); @@ -545,6 +507,6 @@ Status CountValues(FunctionContext* ctx, const Datum& value, *out_counts = MakeArray(counts.array()); return Status::OK(); } - +#undef PROCESS_SUPPORTED_HASH_TYPES } // namespace compute } // namespace arrow From afeb1ad042fdee3103f0fc8f85be2e9232224e44 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Thu, 7 Feb 2019 01:32:56 -0800 Subject: [PATCH 05/20] add real jira --- cpp/src/arrow/compute/kernels/hash-test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/kernels/hash-test.cc b/cpp/src/arrow/compute/kernels/hash-test.cc index 098a06cfa65..1cfeb6e4394 100644 --- a/cpp/src/arrow/compute/kernels/hash-test.cc +++ b/cpp/src/arrow/compute/kernels/hash-test.cc @@ -383,7 +383,7 @@ TEST_F(TestHashKernel, DictEncodeDecimal) { {}, {0, 0, 1, 0, 2}); } -/* TODO(ARROW-XXXX): Determine if we wan to do something that is reproducable with floats. +/* TODO(ARROW-4124): Determine if we wan to do something that is reproducable with floats. TEST_F(TestHashKernel, CountValuesFloat) { // No nulls From 0152f2fa5c80e1dbf3833761448302803b63e335 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Thu, 7 Feb 2019 02:14:36 -0800 Subject: [PATCH 06/20] plumb through status on hash visitors --- cpp/src/arrow/compute/kernels/hash.cc | 47 ++++++++++++++++----------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash.cc b/cpp/src/arrow/compute/kernels/hash.cc index d4b87b870db..56746b8d3bb 100644 --- a/cpp/src/arrow/compute/kernels/hash.cc +++ b/cpp/src/arrow/compute/kernels/hash.cc @@ -82,13 +82,17 @@ class UniqueAction : public ActionBase { Status Reserve(const int64_t length) { return Status::OK(); } - void ObserveNull() {} + Status ObserveNull() { return Status::OK(); } template - void ObserveFound(Index index) {} + Status ObserveFound(Index index) { + return Status::OK(); + } template - void ObserveNotFound(Index index) {} + Status ObserveNotFound(Index index) { + return Status::OK(); + } Status Flush(Datum* out) { return Status::OK(); } @@ -125,16 +129,17 @@ class CountValuesAction { return Status::OK(); } - void ObserveNull() {} + Status ObserveNull() { return Status::OK(); } template - void ObserveFound(Index slot) { + Status ObserveFound(Index slot) { count_builder_[slot]++; + return Status::OK(); } template - void ObserveNotFound(Index slot) { - count_builder_.Append(1); + Status ObserveNotFound(Index slot) { + return count_builder_.Append(1); } private: @@ -156,15 +161,19 @@ class DictEncodeAction : public ActionBase { Status Reserve(const int64_t length) { return indices_builder_.Reserve(length); } - void ObserveNull() { indices_builder_.UnsafeAppendNull(); } + Status ObserveNull() { + indices_builder_.UnsafeAppendNull(); + return Status::OK(); + } template - void ObserveFound(Index index) { + Status ObserveFound(Index index) { indices_builder_.UnsafeAppend(index); + return Status::OK(); } template - void ObserveNotFound(Index index) { + Status ObserveNotFound(Index index) { return ObserveFound(index); } @@ -254,18 +263,18 @@ class RegularHashKernelImpl : public HashKernelImpl { 0 /* start_offset */, out); } - Status VisitNull() { - action_.ObserveNull(); - return Status::OK(); - } + Status VisitNull() { return action_.ObserveNull(); } Status VisitValue(const Scalar& value) { - auto on_found = [this](int32_t memo_index) { action_.ObserveFound(memo_index); }; - auto on_not_found = [this](int32_t memo_index) { - action_.ObserveNotFound(memo_index); + Status status; + auto on_found = [this, &status](int32_t memo_index) { + status = action_.ObserveFound(memo_index); + }; + auto on_not_found = [this, &status](int32_t memo_index) { + status = action_.ObserveNotFound(memo_index); }; memo_table_->GetOrInsert(value, on_found, on_not_found); - return Status::OK(); + return status; } std::shared_ptr out_type() const override { return action_.out_type(); } @@ -293,7 +302,7 @@ class NullHashKernelImpl : public HashKernelImpl { Status Append(const ArrayData& arr) override { RETURN_NOT_OK(action_.Reserve(arr.length)); for (int64_t i = 0; i < arr.length; ++i) { - action_.ObserveNull(); + RETURN_NOT_OK(action_.ObserveNull()); } return Status::OK(); } From defb4f1a15dea35ef632a7b7da22155c8f5f7d87 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Thu, 7 Feb 2019 02:16:23 -0800 Subject: [PATCH 07/20] remove export from .cc --- cpp/src/arrow/compute/kernels/hash.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/kernels/hash.cc b/cpp/src/arrow/compute/kernels/hash.cc index 56746b8d3bb..b9fb4de3084 100644 --- a/cpp/src/arrow/compute/kernels/hash.cc +++ b/cpp/src/arrow/compute/kernels/hash.cc @@ -196,7 +196,7 @@ class DictEncodeAction : public ActionBase { /// /// This interface is implemented below using visitor pattern on "Action" /// implementations. It is not consolidate to keep the contract clearer. -class ARROW_EXPORT HashKernel : public UnaryKernel { +class HashKernel : public UnaryKernel { public: // Reset for another run. virtual Status Reset() = 0; From e8e58a5b9a0404ea05e3e140a7a9391e50a2436b Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Mon, 11 Feb 2019 22:55:03 -0800 Subject: [PATCH 08/20] Address output type code review feedback --- cpp/src/arrow/compute/kernels/hash-test.cc | 67 +++++++++++----------- cpp/src/arrow/compute/kernels/hash.cc | 33 ++++++----- cpp/src/arrow/compute/kernels/hash.h | 13 +---- 3 files changed, 57 insertions(+), 56 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash-test.cc b/cpp/src/arrow/compute/kernels/hash-test.cc index 1cfeb6e4394..3f87c32b101 100644 --- a/cpp/src/arrow/compute/kernels/hash-test.cc +++ b/cpp/src/arrow/compute/kernels/hash-test.cc @@ -65,7 +65,7 @@ void CheckUnique(FunctionContext* ctx, const shared_ptr& type, } template -void CheckCountValues(FunctionContext* ctx, const shared_ptr& type, +void CheckValueCounts(FunctionContext* ctx, const shared_ptr& type, const vector& in_values, const vector& in_is_valid, const vector& out_values, const vector& out_is_valid, const vector& out_counts) { @@ -74,11 +74,11 @@ void CheckCountValues(FunctionContext* ctx, const shared_ptr& type, shared_ptr ex_counts = _MakeArray(int64(), out_counts, out_is_valid); - shared_ptr result_values; - shared_ptr result_counts; - ASSERT_OK(CountValues(ctx, Datum(input), &result_values, &result_counts)); - ASSERT_ARRAYS_EQUAL(*ex_values, *result_values); - ASSERT_ARRAYS_EQUAL(*ex_counts, *result_counts); + shared_ptr result; + ASSERT_OK(ValueCounts(ctx, Datum(input), &result)); + auto result_struct = std::dynamic_pointer_cast(result); + ASSERT_ARRAYS_EQUAL(*ex_values, *result_struct->GetFieldByName("Values")); + ASSERT_ARRAYS_EQUAL(*ex_counts, *result_struct->GetFieldByName("Counts")); } template @@ -121,12 +121,13 @@ TYPED_TEST(TestHashKernelPrimitive, Unique) { {3, 1}, {}); } -TYPED_TEST(TestHashKernelPrimitive, CountValues) { +TYPED_TEST(TestHashKernelPrimitive, ValueCounts) { using T = typename TypeParam::c_type; auto type = TypeTraits::type_singleton(); - CheckCountValues(&this->ctx_, type, {2, 1, 2, 1, 2, 3, 4}, + CheckValueCounts(&this->ctx_, type, {2, 1, 2, 1, 2, 3, 4}, {true, false, true, true, true, true, false}, {2, 1, 3}, {}, {3, 1, 1}); + CheckValueCounts(&this->ctx_, type, {}, {}, {}, {}, {}); } TYPED_TEST(TestHashKernelPrimitive, DictEncode) { @@ -160,7 +161,7 @@ TYPED_TEST(TestHashKernelPrimitive, PrimitiveResizeTable) { auto type = TypeTraits::type_singleton(); CheckUnique(&this->ctx_, type, values, {}, uniques, {}); - CheckCountValues(&this->ctx_, type, values, {}, uniques, {}, counts); + CheckValueCounts(&this->ctx_, type, values, {}, uniques, {}, counts); CheckDictEncode(&this->ctx_, type, values, {}, uniques, {}, indices); } @@ -176,15 +177,15 @@ TEST_F(TestHashKernel, UniqueTimeTimestamp) { {}); } -TEST_F(TestHashKernel, CountValuesTimeTimestamp) { - CheckCountValues(&this->ctx_, time32(TimeUnit::SECOND), +TEST_F(TestHashKernel, ValueCountsTimeTimestamp) { + CheckValueCounts(&this->ctx_, time32(TimeUnit::SECOND), {2, 1, 2, 1}, {true, false, true, true}, {2, 1}, {}, {2, 1}); - CheckCountValues(&this->ctx_, time64(TimeUnit::NANO), {2, 1, 2, 1}, + CheckValueCounts(&this->ctx_, time64(TimeUnit::NANO), {2, 1, 2, 1}, {true, false, true, true}, {2, 1}, {}, {2, 1}); - CheckCountValues(&this->ctx_, timestamp(TimeUnit::NANO), + CheckValueCounts(&this->ctx_, timestamp(TimeUnit::NANO), {2, 1, 2, 1}, {true, false, true, true}, {2, 1}, {}, {2, 1}); } @@ -204,20 +205,20 @@ TEST_F(TestHashKernel, UniqueBoolean) { {false, true}, {}); } -TEST_F(TestHashKernel, CountValuesBoolean) { - CheckCountValues(&this->ctx_, boolean(), {true, true, false, true}, +TEST_F(TestHashKernel, ValueCountsBoolean) { + CheckValueCounts(&this->ctx_, boolean(), {true, true, false, true}, {true, false, true, true}, {true, false}, {}, {2, 1}); - CheckCountValues(&this->ctx_, boolean(), {false, true, false, true}, + CheckValueCounts(&this->ctx_, boolean(), {false, true, false, true}, {true, false, true, true}, {false, true}, {}, {2, 1}); // No nulls - CheckCountValues(&this->ctx_, boolean(), {true, true, false, true}, + CheckValueCounts(&this->ctx_, boolean(), {true, true, false, true}, {}, {true, false}, {}, {3, 1}); - CheckCountValues(&this->ctx_, boolean(), {false, true, false, true}, + CheckValueCounts(&this->ctx_, boolean(), {false, true, false, true}, {}, {false, true}, {}, {2, 2}); } @@ -249,12 +250,12 @@ TEST_F(TestHashKernel, UniqueBinary) { {true, false, true, true}, {"test", "test2"}, {}); } -TEST_F(TestHashKernel, CountValuesBinary) { - CheckCountValues( +TEST_F(TestHashKernel, ValueCountsBinary) { + CheckValueCounts( &this->ctx_, binary(), {"test", "", "test2", "test"}, {true, false, true, true}, {"test", "test2"}, {}, {2, 1}); - CheckCountValues( + CheckValueCounts( &this->ctx_, utf8(), {"test", "", "test2", "test"}, {true, false, true, true}, {"test", "test2"}, {}, {2, 1}); } @@ -298,14 +299,14 @@ TEST_F(TestHashKernel, BinaryResizeTable) { } CheckUnique(&this->ctx_, binary(), values, {}, uniques, {}); - CheckCountValues(&this->ctx_, binary(), values, {}, uniques, + CheckValueCounts(&this->ctx_, binary(), values, {}, uniques, {}, counts); CheckDictEncode(&this->ctx_, binary(), values, {}, uniques, {}, indices); CheckUnique(&this->ctx_, utf8(), values, {}, uniques, {}); - CheckCountValues(&this->ctx_, utf8(), values, {}, uniques, {}, + CheckValueCounts(&this->ctx_, utf8(), values, {}, uniques, {}, counts); CheckDictEncode(&this->ctx_, utf8(), values, {}, uniques, {}, indices); @@ -365,11 +366,11 @@ TEST_F(TestHashKernel, UniqueDecimal) { {true, false, true, true}, expected, {}); } -TEST_F(TestHashKernel, CountValuesDecimal) { +TEST_F(TestHashKernel, ValueCountsDecimal) { vector values{12, 12, 11, 12}; vector expected{12, 11}; - CheckCountValues(&this->ctx_, decimal(2, 0), values, + CheckValueCounts(&this->ctx_, decimal(2, 0), values, {true, false, true, true}, expected, {}, {2, 1}); } @@ -384,14 +385,14 @@ TEST_F(TestHashKernel, DictEncodeDecimal) { } /* TODO(ARROW-4124): Determine if we wan to do something that is reproducable with floats. -TEST_F(TestHashKernel, CountValuesFloat) { +TEST_F(TestHashKernel, ValueCountsFloat) { // No nulls - CheckCountValues(&this->ctx_, float32(), {1.0f, 0.0f, -0.0f, + CheckValueCounts(&this->ctx_, float32(), {1.0f, 0.0f, -0.0f, std::nan("1"), std::nan("2") }, {}, {0.0f, 1.0f, std::nan("1")}, {}, {}); - CheckCountValues(&this->ctx_, float64(), {1.0f, 0.0f, -0.0f, + CheckValueCounts(&this->ctx_, float64(), {1.0f, 0.0f, -0.0f, std::nan("1"), std::nan("2") }, {}, {0.0f, 1.0f, std::nan("1")}, {}, {}); } @@ -430,11 +431,11 @@ TEST_F(TestHashKernel, ChunkedArrayInvoke) { auto dict_carr = std::make_shared(dict_arrays); // Unique counts - shared_ptr cv_uniques; - shared_ptr cv_counts; - ASSERT_OK(CountValues(&this->ctx_, Datum(carr), &cv_uniques, &cv_counts)); - ASSERT_ARRAYS_EQUAL(*ex_dict, *cv_uniques); - ASSERT_ARRAYS_EQUAL(*ex_counts, *cv_counts); + shared_ptr counts_array; + ASSERT_OK(ValueCounts(&this->ctx_, Datum(carr), &counts_array)); + auto counts_struct = std::dynamic_pointer_cast(counts_array); + ASSERT_ARRAYS_EQUAL(*ex_dict, *counts_struct->field(0)); + ASSERT_ARRAYS_EQUAL(*ex_counts, *counts_struct->field(1)); // Dictionary encode Datum encoded_out; diff --git a/cpp/src/arrow/compute/kernels/hash.cc b/cpp/src/arrow/compute/kernels/hash.cc index b9fb4de3084..5ce42ff836f 100644 --- a/cpp/src/arrow/compute/kernels/hash.cc +++ b/cpp/src/arrow/compute/kernels/hash.cc @@ -104,9 +104,9 @@ class UniqueAction : public ActionBase { // ---------------------------------------------------------------------- // Count values implementation (see HashKernel for description of methods) -class CountValuesAction { +class ValueCountsAction { public: - CountValuesAction(const std::shared_ptr& type, MemoryPool* pool) + ValueCountsAction(const std::shared_ptr& type, MemoryPool* pool) : count_builder_(pool) {} Status Reserve(const int64_t length) { @@ -122,6 +122,7 @@ class CountValuesAction { // or incur the cost of memory copies. Status Flush(Datum* out) { return Status::OK(); } + // Return the counts corresponding the MemoTable keys. Status FlushFinal(Datum* out) { std::shared_ptr result; RETURN_NOT_OK(count_builder_.FinishInternal(&result)); @@ -430,7 +431,7 @@ Status GetDictionaryEncodeKernel(FunctionContext* ctx, return Status::OK(); } -Status GetCountValuesKernel(FunctionContext* ctx, const std::shared_ptr& type, +Status GetValueCountsKernel(FunctionContext* ctx, const std::shared_ptr& type, std::unique_ptr* out) { std::unique_ptr kernel; @@ -438,7 +439,7 @@ Status GetCountValuesKernel(FunctionContext* ctx, const std::shared_ptr::HashKernelImpl( \ + typename HashKernelTraits::HashKernelImpl( \ type, ctx->memory_pool())); \ break; @@ -501,19 +502,25 @@ Status DictionaryEncode(FunctionContext* ctx, const Datum& value, Datum* out) { return Status::OK(); } -Status CountValues(FunctionContext* ctx, const Datum& value, - std::shared_ptr* out_uniques, - std::shared_ptr* out_counts) { +Status ValueCounts(FunctionContext* ctx, const Datum& value, + std::shared_ptr* counts) { std::unique_ptr func; - RETURN_NOT_OK(GetCountValuesKernel(ctx, value.type(), &func)); + RETURN_NOT_OK(GetValueCountsKernel(ctx, value.type(), &func)); // Calls return nothing for counts. std::vector unused_output; - RETURN_NOT_OK(InvokeHash(ctx, func.get(), value, &unused_output, out_uniques)); - - Datum counts; - RETURN_NOT_OK(func->FlushFinal(&counts)); - *out_counts = MakeArray(counts.array()); + std::shared_ptr uniques; + RETURN_NOT_OK(InvokeHash(ctx, func.get(), value, &unused_output, &uniques)); + + Datum value_counts; + RETURN_NOT_OK(func->FlushFinal(&value_counts)); + + auto data_type = std::make_shared(std::vector>{ + std::make_shared("Values", uniques->type()), + std::make_shared("Counts", int64())}); + *counts = std::make_shared( + data_type, uniques->length(), + std::vector>{uniques, MakeArray(value_counts.array())}); return Status::OK(); } #undef PROCESS_SUPPORTED_HASH_TYPES diff --git a/cpp/src/arrow/compute/kernels/hash.h b/cpp/src/arrow/compute/kernels/hash.h index 08a84d50599..6a6f6e7f353 100644 --- a/cpp/src/arrow/compute/kernels/hash.h +++ b/cpp/src/arrow/compute/kernels/hash.h @@ -57,15 +57,13 @@ Status Unique(FunctionContext* context, const Datum& datum, std::shared_ptr structs. /// /// \since 0.13.0 /// \note API not yet finalized ARROW_EXPORT -Status CountValues(FunctionContext* context, const Datum& value, - std::shared_ptr* out_uniques, - std::shared_ptr* out_counts); +Status ValueCounts(FunctionContext* context, const Datum& value, + std::shared_ptr* counts); /// \brief Dictionary-encode values in an array-like object /// \param[in] context the FunctionContext @@ -82,11 +80,6 @@ Status DictionaryEncode(FunctionContext* context, const Datum& data, Datum* out) // TODO(wesm): Define API for regularizing DictionaryArray objects with // different dictionaries -// class DictionaryEncoder { -// public: -// virtual Encode(const Datum& data, Datum* out) = 0; -// }; - // // ARROW_EXPORT // Status DictionaryEncode(FunctionContext* context, const Datum& data, From f964bd6da18319bae5567e1f2f377e5bd7d0659a Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Tue, 19 Feb 2019 22:35:39 -0800 Subject: [PATCH 09/20] Rebase --- cpp/src/arrow/array/builder_primitive.h | 2 +- cpp/src/arrow/compute/kernels/hash.cc | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/array/builder_primitive.h b/cpp/src/arrow/array/builder_primitive.h index 9e90367fa46..95cfaa793d6 100644 --- a/cpp/src/arrow/array/builder_primitive.h +++ b/cpp/src/arrow/array/builder_primitive.h @@ -95,7 +95,7 @@ class NumericBuilder : public ArrayBuilder { value_type operator[](int64_t index) const { return GetValue(index); } value_type& operator[](int64_t index) { - return reinterpret_cast(data_->mutable_data())[index]; + return reinterpret_cast(data_builder_.mutable_data())[index]; } /// \brief Append a sequence of elements in one shot diff --git a/cpp/src/arrow/compute/kernels/hash.cc b/cpp/src/arrow/compute/kernels/hash.cc index 5ce42ff836f..73a54aa7c20 100644 --- a/cpp/src/arrow/compute/kernels/hash.cc +++ b/cpp/src/arrow/compute/kernels/hash.cc @@ -104,10 +104,12 @@ class UniqueAction : public ActionBase { // ---------------------------------------------------------------------- // Count values implementation (see HashKernel for description of methods) -class ValueCountsAction { +class ValueCountsAction : ActionBase { public: + using ActionBase::ActionBase; + ValueCountsAction(const std::shared_ptr& type, MemoryPool* pool) - : count_builder_(pool) {} + : ActionBase(type, pool), count_builder_(pool) {} Status Reserve(const int64_t length) { // builder size is independent of input array size. @@ -122,6 +124,8 @@ class ValueCountsAction { // or incur the cost of memory copies. Status Flush(Datum* out) { return Status::OK(); } + std::shared_ptr out_type() const { return type_; } + // Return the counts corresponding the MemoTable keys. Status FlushFinal(Datum* out) { std::shared_ptr result; From b7d54929a4b9554f90005f180e2374763131c905 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Tue, 19 Feb 2019 23:47:41 -0800 Subject: [PATCH 10/20] add null test --- cpp/src/arrow/compute/kernels/boolean-test.cc | 1 - cpp/src/arrow/compute/kernels/hash-test.cc | 24 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/kernels/boolean-test.cc b/cpp/src/arrow/compute/kernels/boolean-test.cc index 439e0db9b6f..5e1da1be657 100644 --- a/cpp/src/arrow/compute/kernels/boolean-test.cc +++ b/cpp/src/arrow/compute/kernels/boolean-test.cc @@ -129,7 +129,6 @@ TEST_F(TestBooleanKernel, Invert) { } TEST_F(TestBooleanKernel, InvertEmptyArray) { - auto type = boolean(); std::vector> data_buffers(2); Datum input; input.value = ArrayData::Make(boolean(), 0 /* length */, std::move(data_buffers), diff --git a/cpp/src/arrow/compute/kernels/hash-test.cc b/cpp/src/arrow/compute/kernels/hash-test.cc index 3f87c32b101..123480be3e4 100644 --- a/cpp/src/arrow/compute/kernels/hash-test.cc +++ b/cpp/src/arrow/compute/kernels/hash-test.cc @@ -61,9 +61,29 @@ void CheckUnique(FunctionContext* ctx, const shared_ptr& type, shared_ptr result; ASSERT_OK(Unique(ctx, input, &result)); + // TODO: We probably shouldn't rely on array ordering. ASSERT_ARRAYS_EQUAL(*expected, *result); } +template +void CheckValueCountsNull(FunctionContext* ctx, const shared_ptr& type) { + std::vector> data_buffers(2); + Datum input; + input.value = ArrayData::Make(type, 0 /* length */, std::move(data_buffers), + 0 /* null_count */); + + shared_ptr ex_values = _MakeArray(type, {}, {}); + shared_ptr ex_counts = _MakeArray(int64(), {}, {}); + + shared_ptr result; + ASSERT_OK(ValueCounts(ctx, input, &result)); + auto result_struct = std::dynamic_pointer_cast(result); + // TODO: We probably shouldn't rely on value ordering. + ASSERT_ARRAYS_EQUAL(*ex_values, *result_struct->GetFieldByName("Values")); + ASSERT_ARRAYS_EQUAL(*ex_counts, *result_struct->GetFieldByName("Counts")); +} + + template void CheckValueCounts(FunctionContext* ctx, const shared_ptr& type, const vector& in_values, const vector& in_is_valid, @@ -77,6 +97,7 @@ void CheckValueCounts(FunctionContext* ctx, const shared_ptr& type, shared_ptr result; ASSERT_OK(ValueCounts(ctx, Datum(input), &result)); auto result_struct = std::dynamic_pointer_cast(result); + // TODO: We probably shouldn't rely on value ordering. ASSERT_ARRAYS_EQUAL(*ex_values, *result_struct->GetFieldByName("Values")); ASSERT_ARRAYS_EQUAL(*ex_counts, *result_struct->GetFieldByName("Counts")); } @@ -128,8 +149,11 @@ TYPED_TEST(TestHashKernelPrimitive, ValueCounts) { {true, false, true, true, true, true, false}, {2, 1, 3}, {}, {3, 1, 1}); CheckValueCounts(&this->ctx_, type, {}, {}, {}, {}, {}); + CheckValueCountsNull(&this->ctx_, type); } + + TYPED_TEST(TestHashKernelPrimitive, DictEncode) { using T = typename TypeParam::c_type; auto type = TypeTraits::type_singleton(); From 8c26b01549c6fd4555d3772ef1b2a8c5c2912766 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 20 Feb 2019 01:53:12 -0800 Subject: [PATCH 11/20] fix format --- cpp/src/arrow/compute/kernels/hash-test.cc | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash-test.cc b/cpp/src/arrow/compute/kernels/hash-test.cc index 123480be3e4..fef5079a537 100644 --- a/cpp/src/arrow/compute/kernels/hash-test.cc +++ b/cpp/src/arrow/compute/kernels/hash-test.cc @@ -69,8 +69,8 @@ template void CheckValueCountsNull(FunctionContext* ctx, const shared_ptr& type) { std::vector> data_buffers(2); Datum input; - input.value = ArrayData::Make(type, 0 /* length */, std::move(data_buffers), - 0 /* null_count */); + input.value = + ArrayData::Make(type, 0 /* length */, std::move(data_buffers), 0 /* null_count */); shared_ptr ex_values = _MakeArray(type, {}, {}); shared_ptr ex_counts = _MakeArray(int64(), {}, {}); @@ -83,7 +83,6 @@ void CheckValueCountsNull(FunctionContext* ctx, const shared_ptr& type ASSERT_ARRAYS_EQUAL(*ex_counts, *result_struct->GetFieldByName("Counts")); } - template void CheckValueCounts(FunctionContext* ctx, const shared_ptr& type, const vector& in_values, const vector& in_is_valid, @@ -152,8 +151,6 @@ TYPED_TEST(TestHashKernelPrimitive, ValueCounts) { CheckValueCountsNull(&this->ctx_, type); } - - TYPED_TEST(TestHashKernelPrimitive, DictEncode) { using T = typename TypeParam::c_type; auto type = TypeTraits::type_singleton(); From d99e52fb7f25d8285b1920cf1670cb7ff5a5b96b Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Mon, 25 Feb 2019 23:12:16 -0800 Subject: [PATCH 12/20] add guard to CopyValue in cases where vector is empty --- cpp/src/arrow/util/hashing.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/hashing.h b/cpp/src/arrow/util/hashing.h index 3dde0beeb19..0cff7593740 100644 --- a/cpp/src/arrow/util/hashing.h +++ b/cpp/src/arrow/util/hashing.h @@ -473,7 +473,9 @@ class SmallScalarMemoTable { // Copy values starting from index `start` into `out_data` void CopyValues(int32_t start, Scalar* out_data) const { - memcpy(out_data, &index_to_value_[start], size() - start); + if (index_to_value_.size() > 0) { + memcpy(out_data, &index_to_value_[start], size() - start); + } } void CopyValues(Scalar* out_data) const { CopyValues(0, out_data); } From c6f6ad72f5ab4befbb56e60034e6f015ddbfabb9 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 27 Feb 2019 22:12:49 -0800 Subject: [PATCH 13/20] address feedback --- cpp/src/arrow/util/hashing.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/util/hashing.h b/cpp/src/arrow/util/hashing.h index 0cff7593740..eee09f6f043 100644 --- a/cpp/src/arrow/util/hashing.h +++ b/cpp/src/arrow/util/hashing.h @@ -473,9 +473,9 @@ class SmallScalarMemoTable { // Copy values starting from index `start` into `out_data` void CopyValues(int32_t start, Scalar* out_data) const { - if (index_to_value_.size() > 0) { - memcpy(out_data, &index_to_value_[start], size() - start); - } + DCHECK_GE(start, 0); + DCHECK_LE(start, index_to_value_.size()); + std::copy(index_to_value_.begin() + start, index_to_value_.end(), out_data); } void CopyValues(Scalar* out_data) const { CopyValues(0, out_data); } From 4770d9924a6b710fa9cad90ff423f9769ef4305b Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 27 Feb 2019 23:31:38 -0800 Subject: [PATCH 14/20] fix warning --- cpp/src/arrow/util/hashing.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/hashing.h b/cpp/src/arrow/util/hashing.h index eee09f6f043..c2f541bf64f 100644 --- a/cpp/src/arrow/util/hashing.h +++ b/cpp/src/arrow/util/hashing.h @@ -474,7 +474,7 @@ class SmallScalarMemoTable { // Copy values starting from index `start` into `out_data` void CopyValues(int32_t start, Scalar* out_data) const { DCHECK_GE(start, 0); - DCHECK_LE(start, index_to_value_.size()); + DCHECK_LE(static_cast(start), index_to_value_.size()); std::copy(index_to_value_.begin() + start, index_to_value_.end(), out_data); } From e7e624b1fa1d757d295f4588c0b14a6e5cd8f55c Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Fri, 1 Mar 2019 21:11:25 -0800 Subject: [PATCH 15/20] Add constants, per code review --- cpp/src/arrow/compute/kernels/hash-test.cc | 8 ++++---- cpp/src/arrow/compute/kernels/hash.cc | 4 ++++ cpp/src/arrow/compute/kernels/hash.h | 5 +++++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash-test.cc b/cpp/src/arrow/compute/kernels/hash-test.cc index fef5079a537..6d241ff31a4 100644 --- a/cpp/src/arrow/compute/kernels/hash-test.cc +++ b/cpp/src/arrow/compute/kernels/hash-test.cc @@ -79,8 +79,8 @@ void CheckValueCountsNull(FunctionContext* ctx, const shared_ptr& type ASSERT_OK(ValueCounts(ctx, input, &result)); auto result_struct = std::dynamic_pointer_cast(result); // TODO: We probably shouldn't rely on value ordering. - ASSERT_ARRAYS_EQUAL(*ex_values, *result_struct->GetFieldByName("Values")); - ASSERT_ARRAYS_EQUAL(*ex_counts, *result_struct->GetFieldByName("Counts")); + ASSERT_ARRAYS_EQUAL(*ex_values, *result_struct->GetFieldByName(kValuesFieldName)); + ASSERT_ARRAYS_EQUAL(*ex_counts, *result_struct->GetFieldByName(kCountsFieldName)); } template @@ -97,8 +97,8 @@ void CheckValueCounts(FunctionContext* ctx, const shared_ptr& type, ASSERT_OK(ValueCounts(ctx, Datum(input), &result)); auto result_struct = std::dynamic_pointer_cast(result); // TODO: We probably shouldn't rely on value ordering. - ASSERT_ARRAYS_EQUAL(*ex_values, *result_struct->GetFieldByName("Values")); - ASSERT_ARRAYS_EQUAL(*ex_counts, *result_struct->GetFieldByName("Counts")); + ASSERT_ARRAYS_EQUAL(*ex_values, *result_struct->field(kValuesFieldIndex)); + ASSERT_ARRAYS_EQUAL(*ex_counts, *result_struct->field(kCountsFieldIndex)); } template diff --git a/cpp/src/arrow/compute/kernels/hash.cc b/cpp/src/arrow/compute/kernels/hash.cc index 73a54aa7c20..4463a390a3d 100644 --- a/cpp/src/arrow/compute/kernels/hash.cc +++ b/cpp/src/arrow/compute/kernels/hash.cc @@ -506,6 +506,10 @@ Status DictionaryEncode(FunctionContext* ctx, const Datum& value, Datum* out) { return Status::OK(); } +const char kValuesFieldName[] = "Values"; +const char kCountsFieldName[] = "Counts"; +const int32_t kValuesFieldIndex = 0; +const int32_t kCountsFieldIndex = 1; Status ValueCounts(FunctionContext* ctx, const Datum& value, std::shared_ptr* counts) { std::unique_ptr func; diff --git a/cpp/src/arrow/compute/kernels/hash.h b/cpp/src/arrow/compute/kernels/hash.h index 6a6f6e7f353..edc7c493e46 100644 --- a/cpp/src/arrow/compute/kernels/hash.h +++ b/cpp/src/arrow/compute/kernels/hash.h @@ -47,6 +47,11 @@ class FunctionContext; ARROW_EXPORT Status Unique(FunctionContext* context, const Datum& datum, std::shared_ptr* out); +// Constants for accessing the output of ValueCounts +ARROW_EXPORT extern const char kValuesFieldName[]; +ARROW_EXPORT extern const char kCountsFieldName[]; +ARROW_EXPORT extern const int32_t kValuesFieldIndex; +ARROW_EXPORT extern const int32_t kCountsFieldIndex; /// \brief Return counts of unique elements from an array-like object. /// /// Note that the counts do not include counts for nulls in the array. These can be From 2973fccbe4a1ffbb4056bff0c19d132151bc3f03 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Sun, 10 Mar 2019 23:06:56 -0700 Subject: [PATCH 16/20] Address code review comments --- cpp/src/arrow/compute/kernels/hash-test.cc | 11 ++- cpp/src/arrow/compute/kernels/hash.cc | 107 ++++++++++----------- 2 files changed, 59 insertions(+), 59 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash-test.cc b/cpp/src/arrow/compute/kernels/hash-test.cc index 6d241ff31a4..d4ffa55cb0f 100644 --- a/cpp/src/arrow/compute/kernels/hash-test.cc +++ b/cpp/src/arrow/compute/kernels/hash-test.cc @@ -43,6 +43,8 @@ #include "arrow/compute/kernels/util-internal.h" #include "arrow/compute/test-util.h" +#include "arrow/ipc/json-simple.h" + using std::shared_ptr; using std::vector; @@ -72,12 +74,13 @@ void CheckValueCountsNull(FunctionContext* ctx, const shared_ptr& type input.value = ArrayData::Make(type, 0 /* length */, std::move(data_buffers), 0 /* null_count */); - shared_ptr ex_values = _MakeArray(type, {}, {}); - shared_ptr ex_counts = _MakeArray(int64(), {}, {}); + shared_ptr ex_values = ArrayFromJSON(type, "[]"); + shared_ptr ex_counts = ArrayFromJSON(int64(), "[]"); shared_ptr result; ASSERT_OK(ValueCounts(ctx, input, &result)); auto result_struct = std::dynamic_pointer_cast(result); + ASSERT_NE(result_struct->GetFieldByName(kValuesFieldName), nullptr); // TODO: We probably shouldn't rely on value ordering. ASSERT_ARRAYS_EQUAL(*ex_values, *result_struct->GetFieldByName(kValuesFieldName)); ASSERT_ARRAYS_EQUAL(*ex_counts, *result_struct->GetFieldByName(kCountsFieldName)); @@ -94,7 +97,7 @@ void CheckValueCounts(FunctionContext* ctx, const shared_ptr& type, _MakeArray(int64(), out_counts, out_is_valid); shared_ptr result; - ASSERT_OK(ValueCounts(ctx, Datum(input), &result)); + ASSERT_OK(ValueCounts(ctx, input, &result)); auto result_struct = std::dynamic_pointer_cast(result); // TODO: We probably shouldn't rely on value ordering. ASSERT_ARRAYS_EQUAL(*ex_values, *result_struct->field(kValuesFieldIndex)); @@ -453,7 +456,7 @@ TEST_F(TestHashKernel, ChunkedArrayInvoke) { // Unique counts shared_ptr counts_array; - ASSERT_OK(ValueCounts(&this->ctx_, Datum(carr), &counts_array)); + ASSERT_OK(ValueCounts(&this->ctx_, carr, &counts_array)); auto counts_struct = std::dynamic_pointer_cast(counts_array); ASSERT_ARRAYS_EQUAL(*ex_dict, *counts_struct->field(0)); ASSERT_ARRAYS_EQUAL(*ex_counts, *counts_struct->field(1)); diff --git a/cpp/src/arrow/compute/kernels/hash.cc b/cpp/src/arrow/compute/kernels/hash.cc index 4463a390a3d..17818541221 100644 --- a/cpp/src/arrow/compute/kernels/hash.cc +++ b/cpp/src/arrow/compute/kernels/hash.cc @@ -82,17 +82,13 @@ class UniqueAction : public ActionBase { Status Reserve(const int64_t length) { return Status::OK(); } - Status ObserveNull() { return Status::OK(); } + void ObserveNull() {} template - Status ObserveFound(Index index) { - return Status::OK(); - } + void ObserveFound(Index index) {} template - Status ObserveNotFound(Index index) { - return Status::OK(); - } + void ObserveNotFound(Index index, Status* err_status) {} Status Flush(Datum* out) { return Status::OK(); } @@ -115,12 +111,13 @@ class ValueCountsAction : ActionBase { // builder size is independent of input array size. return Status::OK(); } + Status Reset() { count_builder_.Reset(); return Status::OK(); } - // Don't do anything on flush beceause we don't want to finalize the builder + // Don't do anything on flush because we don't want to finalize the builder // or incur the cost of memory copies. Status Flush(Datum* out) { return Status::OK(); } @@ -134,17 +131,19 @@ class ValueCountsAction : ActionBase { return Status::OK(); } - Status ObserveNull() { return Status::OK(); } + void ObserveNull() {} template - Status ObserveFound(Index slot) { + void ObserveFound(Index slot) { count_builder_[slot]++; - return Status::OK(); } template - Status ObserveNotFound(Index slot) { - return count_builder_.Append(1); + void ObserveNotFound(Index slot, Status* status) { + Status s = count_builder_.Append(1); + if (ARROW_PREDICT_FALSE(!s.ok())) { + *status = s; + } } private: @@ -166,20 +165,16 @@ class DictEncodeAction : public ActionBase { Status Reserve(const int64_t length) { return indices_builder_.Reserve(length); } - Status ObserveNull() { - indices_builder_.UnsafeAppendNull(); - return Status::OK(); - } + void ObserveNull() { indices_builder_.UnsafeAppendNull(); } template - Status ObserveFound(Index index) { + void ObserveFound(Index index) { indices_builder_.UnsafeAppend(index); - return Status::OK(); } template - Status ObserveNotFound(Index index) { - return ObserveFound(index); + void ObserveNotFound(Index index, Status* status) { + ObserveFound(index); } Status Flush(Datum* out) { @@ -268,15 +263,17 @@ class RegularHashKernelImpl : public HashKernelImpl { 0 /* start_offset */, out); } - Status VisitNull() { return action_.ObserveNull(); } + Status VisitNull() { + action_.ObserveNull(); + return Status::Status::OK(); + } Status VisitValue(const Scalar& value) { + auto on_found = [this](int32_t memo_index) { action_.ObserveFound(memo_index); }; + Status status; - auto on_found = [this, &status](int32_t memo_index) { - status = action_.ObserveFound(memo_index); - }; auto on_not_found = [this, &status](int32_t memo_index) { - status = action_.ObserveNotFound(memo_index); + action_.ObserveNotFound(memo_index, &status); }; memo_table_->GetOrInsert(value, on_found, on_not_found); return status; @@ -307,7 +304,7 @@ class NullHashKernelImpl : public HashKernelImpl { Status Append(const ArrayData& arr) override { RETURN_NOT_OK(action_.Reserve(arr.length)); for (int64_t i = 0; i < arr.length; ++i) { - RETURN_NOT_OK(action_.ObserveNull()); + action_.ObserveNull(); } return Status::OK(); } @@ -363,27 +360,27 @@ struct HashKernelTraits> { } // namespace -#define PROCESS_SUPPORTED_HASH_TYPES \ - PROCESS(NullType) \ - PROCESS(BooleanType) \ - PROCESS(UInt8Type) \ - PROCESS(Int8Type) \ - PROCESS(UInt16Type) \ - PROCESS(Int16Type) \ - PROCESS(UInt32Type) \ - PROCESS(Int32Type) \ - PROCESS(UInt64Type) \ - PROCESS(Int64Type) \ - PROCESS(FloatType) \ - PROCESS(DoubleType) \ - PROCESS(Date32Type) \ - PROCESS(Date64Type) \ - PROCESS(Time32Type) \ - PROCESS(Time64Type) \ - PROCESS(TimestampType) \ - PROCESS(BinaryType) \ - PROCESS(StringType) \ - PROCESS(FixedSizeBinaryType) \ +#define PROCESS_SUPPORTED_HASH_TYPES(PROCESS) \ + PROCESS(NullType) \ + PROCESS(BooleanType) \ + PROCESS(UInt8Type) \ + PROCESS(Int8Type) \ + PROCESS(UInt16Type) \ + PROCESS(Int16Type) \ + PROCESS(UInt32Type) \ + PROCESS(Int32Type) \ + PROCESS(UInt64Type) \ + PROCESS(Int64Type) \ + PROCESS(FloatType) \ + PROCESS(DoubleType) \ + PROCESS(Date32Type) \ + PROCESS(Date64Type) \ + PROCESS(Time32Type) \ + PROCESS(Time64Type) \ + PROCESS(TimestampType) \ + PROCESS(BinaryType) \ + PROCESS(StringType) \ + PROCESS(FixedSizeBinaryType) \ PROCESS(Decimal128Type) Status GetUniqueKernel(FunctionContext* ctx, const std::shared_ptr& type, @@ -396,7 +393,7 @@ Status GetUniqueKernel(FunctionContext* ctx, const std::shared_ptr& ty type, ctx->memory_pool())); \ break; - PROCESS_SUPPORTED_HASH_TYPES + PROCESS_SUPPORTED_HASH_TYPES(PROCESS) #undef PROCESS default: break; @@ -421,7 +418,7 @@ Status GetDictionaryEncodeKernel(FunctionContext* ctx, type, ctx->memory_pool())); \ break; - PROCESS_SUPPORTED_HASH_TYPES + PROCESS_SUPPORTED_HASH_TYPES(PROCESS) #undef PROCESS default: break; @@ -447,7 +444,7 @@ Status GetValueCountsKernel(FunctionContext* ctx, const std::shared_ptrmemory_pool())); \ break; - PROCESS_SUPPORTED_HASH_TYPES + PROCESS_SUPPORTED_HASH_TYPES(PROCESS) #undef PROCESS default: break; @@ -506,8 +503,8 @@ Status DictionaryEncode(FunctionContext* ctx, const Datum& value, Datum* out) { return Status::OK(); } -const char kValuesFieldName[] = "Values"; -const char kCountsFieldName[] = "Counts"; +const char kValuesFieldName[] = "values"; +const char kCountsFieldName[] = "counts"; const int32_t kValuesFieldIndex = 0; const int32_t kCountsFieldIndex = 1; Status ValueCounts(FunctionContext* ctx, const Datum& value, @@ -524,8 +521,8 @@ Status ValueCounts(FunctionContext* ctx, const Datum& value, RETURN_NOT_OK(func->FlushFinal(&value_counts)); auto data_type = std::make_shared(std::vector>{ - std::make_shared("Values", uniques->type()), - std::make_shared("Counts", int64())}); + std::make_shared(kValuesFieldName, uniques->type()), + std::make_shared(kCountsFieldName, int64())}); *counts = std::make_shared( data_type, uniques->length(), std::vector>{uniques, MakeArray(value_counts.array())}); From 54afb2bac389b38e6346c16c285a491f94bb40c1 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Sun, 10 Mar 2019 23:36:54 -0700 Subject: [PATCH 17/20] change from std::copy to memcopy --- cpp/src/arrow/util/hashing.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/hashing.h b/cpp/src/arrow/util/hashing.h index c2f541bf64f..3e2a3365552 100644 --- a/cpp/src/arrow/util/hashing.h +++ b/cpp/src/arrow/util/hashing.h @@ -475,7 +475,8 @@ class SmallScalarMemoTable { void CopyValues(int32_t start, Scalar* out_data) const { DCHECK_GE(start, 0); DCHECK_LE(static_cast(start), index_to_value_.size()); - std::copy(index_to_value_.begin() + start, index_to_value_.end(), out_data); + int offset = start * sizeof(Scalar); + memcpy(out_data, index_to_value_.data() + offset, (size() - start) * sizeof(Scalar)); } void CopyValues(Scalar* out_data) const { CopyValues(0, out_data); } From 72095ebc4c49c195ab55b59cb7d2e86d95143184 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Mon, 11 Mar 2019 20:20:19 -0700 Subject: [PATCH 18/20] Templatize whether to use return status --- cpp/src/arrow/compute/kernels/hash.cc | 101 +++++++++++++++++--------- 1 file changed, 65 insertions(+), 36 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash.cc b/cpp/src/arrow/compute/kernels/hash.cc index 17818541221..9994f86fdd0 100644 --- a/cpp/src/arrow/compute/kernels/hash.cc +++ b/cpp/src/arrow/compute/kernels/hash.cc @@ -88,7 +88,12 @@ class UniqueAction : public ActionBase { void ObserveFound(Index index) {} template - void ObserveNotFound(Index index, Status* err_status) {} + void ObserveNotFound(Index index, Status* err_status) { + ARROW_LOG(FATAL) << "ObserveNotFound with err_status should not be called"; + } + + template + void ObserveNotFound(Index index) {} Status Flush(Datum* out) { return Status::OK(); } @@ -138,6 +143,11 @@ class ValueCountsAction : ActionBase { count_builder_[slot]++; } + template + void ObserveNotFound(Index slot) { + ARROW_LOG(FATAL) << "ObserveNotFound without err_status should not be called"; + } + template void ObserveNotFound(Index slot, Status* status) { Status s = count_builder_.Append(1); @@ -173,10 +183,15 @@ class DictEncodeAction : public ActionBase { } template - void ObserveNotFound(Index index, Status* status) { + void ObserveNotFound(Index index) { ObserveFound(index); } + template + void ObserveNotFound(Index index, Status* err_status) { + ARROW_LOG(FATAL) << "ObserveNotFound with err_status should not be called"; + } + Status Flush(Datum* out) { std::shared_ptr result; RETURN_NOT_OK(indices_builder_.FinishInternal(&result)); @@ -238,7 +253,7 @@ class HashKernelImpl : public HashKernel { // Base class for all "regular" hash kernel implementations // (NullType has a separate implementation) -template +template class RegularHashKernelImpl : public HashKernelImpl { public: RegularHashKernelImpl(const std::shared_ptr& type, MemoryPool* pool) @@ -271,12 +286,22 @@ class RegularHashKernelImpl : public HashKernelImpl { Status VisitValue(const Scalar& value) { auto on_found = [this](int32_t memo_index) { action_.ObserveFound(memo_index); }; + if (with_error_status) { + Status status; - auto on_not_found = [this, &status](int32_t memo_index) { - action_.ObserveNotFound(memo_index, &status); - }; - memo_table_->GetOrInsert(value, on_found, on_not_found); + auto on_not_found = [this, &status](int32_t memo_index) { + action_.ObserveNotFound(memo_index, &status); + }; + memo_table_->GetOrInsert(value, on_found, on_not_found); return status; + } else { + auto on_not_found = [this](int32_t memo_index) { + action_.ObserveNotFound(memo_index); + }; + + memo_table_->GetOrInsert(value, on_found, on_not_found); + } + return Status::OK(); } std::shared_ptr out_type() const override { return action_.out_type(); } @@ -330,32 +355,35 @@ class NullHashKernelImpl : public HashKernelImpl { // ---------------------------------------------------------------------- // Kernel wrapper for generic hash table kernels -template +template struct HashKernelTraits {}; -template -struct HashKernelTraits> { +template +struct HashKernelTraits> { using HashKernelImpl = NullHashKernelImpl; }; -template -struct HashKernelTraits> { - using HashKernelImpl = RegularHashKernelImpl; +template +struct HashKernelTraits> { + using HashKernelImpl = + RegularHashKernelImpl; }; -template -struct HashKernelTraits> { - using HashKernelImpl = RegularHashKernelImpl; +template +struct HashKernelTraits> { + using HashKernelImpl = RegularHashKernelImpl; }; -template -struct HashKernelTraits> { - using HashKernelImpl = RegularHashKernelImpl; +template +struct HashKernelTraits> { + using HashKernelImpl = + RegularHashKernelImpl; }; -template -struct HashKernelTraits> { - using HashKernelImpl = RegularHashKernelImpl; +template +struct HashKernelTraits> { + using HashKernelImpl = + RegularHashKernelImpl; }; } // namespace @@ -387,10 +415,11 @@ Status GetUniqueKernel(FunctionContext* ctx, const std::shared_ptr& ty std::unique_ptr* out) { std::unique_ptr kernel; switch (type->id()) { -#define PROCESS(InType) \ - case InType::type_id: \ - kernel.reset(new typename HashKernelTraits::HashKernelImpl( \ - type, ctx->memory_pool())); \ +#define PROCESS(InType) \ + case InType::type_id: \ + kernel.reset(new \ + typename HashKernelTraits::HashKernelImpl( \ + type, ctx->memory_pool())); \ break; PROCESS_SUPPORTED_HASH_TYPES(PROCESS) @@ -411,11 +440,11 @@ Status GetDictionaryEncodeKernel(FunctionContext* ctx, std::unique_ptr kernel; switch (type->id()) { -#define PROCESS(InType) \ - case InType::type_id: \ - kernel.reset(new \ - typename HashKernelTraits::HashKernelImpl( \ - type, ctx->memory_pool())); \ +#define PROCESS(InType) \ + case InType::type_id: \ + kernel.reset( \ + new typename HashKernelTraits::HashKernelImpl( \ + type, ctx->memory_pool())); \ break; PROCESS_SUPPORTED_HASH_TYPES(PROCESS) @@ -437,11 +466,11 @@ Status GetValueCountsKernel(FunctionContext* ctx, const std::shared_ptr kernel; switch (type->id()) { -#define PROCESS(InType) \ - case InType::type_id: \ - kernel.reset(new \ - typename HashKernelTraits::HashKernelImpl( \ - type, ctx->memory_pool())); \ +#define PROCESS(InType) \ + case InType::type_id: \ + kernel.reset( \ + new typename HashKernelTraits::HashKernelImpl( \ + type, ctx->memory_pool())); \ break; PROCESS_SUPPORTED_HASH_TYPES(PROCESS) From dd0d8a1552280f7f073522df537b5a64983b6e09 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Mon, 11 Mar 2019 21:08:39 -0700 Subject: [PATCH 19/20] fix link and warning --- cpp/src/arrow/compute/kernels/hash.cc | 8 ++++---- cpp/src/arrow/util/hashing.h | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/hash.cc b/cpp/src/arrow/compute/kernels/hash.cc index 9994f86fdd0..2a3031fc7bc 100644 --- a/cpp/src/arrow/compute/kernels/hash.cc +++ b/cpp/src/arrow/compute/kernels/hash.cc @@ -287,13 +287,12 @@ class RegularHashKernelImpl : public HashKernelImpl { auto on_found = [this](int32_t memo_index) { action_.ObserveFound(memo_index); }; if (with_error_status) { - - Status status; + Status status; auto on_not_found = [this, &status](int32_t memo_index) { action_.ObserveNotFound(memo_index, &status); }; memo_table_->GetOrInsert(value, on_found, on_not_found); - return status; + return status; } else { auto on_not_found = [this](int32_t memo_index) { action_.ObserveNotFound(memo_index); @@ -381,7 +380,8 @@ struct HashKernelTraits> }; template -struct HashKernelTraits> { +struct HashKernelTraits> { using HashKernelImpl = RegularHashKernelImpl; }; diff --git a/cpp/src/arrow/util/hashing.h b/cpp/src/arrow/util/hashing.h index 3e2a3365552..de836eff740 100644 --- a/cpp/src/arrow/util/hashing.h +++ b/cpp/src/arrow/util/hashing.h @@ -475,7 +475,7 @@ class SmallScalarMemoTable { void CopyValues(int32_t start, Scalar* out_data) const { DCHECK_GE(start, 0); DCHECK_LE(static_cast(start), index_to_value_.size()); - int offset = start * sizeof(Scalar); + int32_t offset = start * static_cast(sizeof(Scalar)); memcpy(out_data, index_to_value_.data() + offset, (size() - start) * sizeof(Scalar)); } From 9c55f7ba69f4067c41485076d161a563e3b5afbf Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Mon, 11 Mar 2019 21:10:23 -0700 Subject: [PATCH 20/20] make 64 bit --- cpp/src/arrow/util/hashing.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/hashing.h b/cpp/src/arrow/util/hashing.h index de836eff740..044d4e96624 100644 --- a/cpp/src/arrow/util/hashing.h +++ b/cpp/src/arrow/util/hashing.h @@ -475,7 +475,7 @@ class SmallScalarMemoTable { void CopyValues(int32_t start, Scalar* out_data) const { DCHECK_GE(start, 0); DCHECK_LE(static_cast(start), index_to_value_.size()); - int32_t offset = start * static_cast(sizeof(Scalar)); + int64_t offset = start * static_cast(sizeof(Scalar)); memcpy(out_data, index_to_value_.data() + offset, (size() - start) * sizeof(Scalar)); }