Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ arrow-src.tar
arrow-src.tar.gz

# Compiled source
cpp/debug
*.a
*.dll
*.o
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/array/builder_primitive.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,9 @@ using Int64Builder = NumericBuilder<Int64Type>;
using HalfFloatBuilder = NumericBuilder<HalfFloatType>;
using FloatBuilder = NumericBuilder<FloatType>;
using DoubleBuilder = NumericBuilder<DoubleType>;
using ComplexFloatBuilder = NumericBuilder<ComplexFloatType>;
using ComplexDoubleBuilder = NumericBuilder<ComplexDoubleType>;


class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
public:
Expand Down
14 changes: 13 additions & 1 deletion cpp/src/arrow/array/diff.cc
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,21 @@ class MakeFormatterImpl {
return Status::OK();
}

template <typename T>
enable_if_complex<T, Status> Visit(const T&) {
impl_ = [](const Array& array, int64_t index, std::ostream* os) {
const auto& numeric = checked_cast<const NumericArray<T>&>(array);
*os << numeric.Value(index);
};
return Status::OK();
}


// format Numerics with std::ostream defaults
template <typename T>
enable_if_number<T, Status> Visit(const T&) {
enable_if_t<is_number_type<T>::value &&
!is_complex_type<T>::value, Status>
Visit(const T&) {
impl_ = [](const Array& array, int64_t index, std::ostream* os) {
const auto& numeric = checked_cast<const NumericArray<T>&>(array);
if (sizeof(decltype(numeric.Value(index))) == sizeof(char)) {
Expand Down
62 changes: 62 additions & 0 deletions cpp/src/arrow/compute/kernels/aggregate_basic_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,35 @@ struct MeanImpl : public SumImpl<ArrowType, SimdLevel> {
ScalarAggregateOptions options;
};


template <SimdLevel::type SimdLevel>
struct MeanImpl<ComplexFloatType, SimdLevel> : public SumImpl<ComplexFloatType, SimdLevel> {
Status Finalize(KernelContext*, Datum* out) override {
if (this->count < options.min_count) {
out->value = std::make_shared<ComplexDoubleScalar>();
} else {
const std::complex<double> mean = this->sum / double(this->count);
out->value = std::make_shared<ComplexDoubleScalar>(mean);
}
return Status::OK();
}
ScalarAggregateOptions options;
};

template <SimdLevel::type SimdLevel>
struct MeanImpl<ComplexDoubleType, SimdLevel> : public SumImpl<ComplexDoubleType, SimdLevel> {
Status Finalize(KernelContext*, Datum* out) override {
if (this->count < options.min_count) {
out->value = std::make_shared<ComplexDoubleScalar>();
} else {
const std::complex<double> mean = this->sum / double(this->count);
out->value = std::make_shared<ComplexDoubleScalar>(mean);
}
return Status::OK();
}
ScalarAggregateOptions options;
};

template <template <typename> class KernelClass>
struct SumLikeInit {
std::unique_ptr<KernelState> state;
Expand Down Expand Up @@ -217,6 +246,39 @@ struct MinMaxState<ArrowType, SimdLevel, enable_if_floating_point<ArrowType>> {
bool has_values = false;
};

template <typename ArrowType, SimdLevel::type SimdLevel>
struct MinMaxState<ArrowType, SimdLevel, enable_if_complex<ArrowType>> {
using ThisType = MinMaxState<ArrowType, SimdLevel>;
using T = typename ArrowType::c_type;
using R = typename T::value_type;

ThisType& operator+=(const ThisType& rhs) {
this->has_nulls |= rhs.has_nulls;
this->has_values |= rhs.has_values;
this->min = T(std::fmin(this->min.real(), rhs.min.real()),
std::fmin(this->min.imag(), rhs.min.imag()));
this->max = T(std::fmax(this->max.real(), rhs.max.real()),
std::fmax(this->max.imag(), rhs.max.imag()));
return *this;
}

void MergeOne(T value) {
this->min = T(std::fmin(this->min.real(), value.real()),
std::fmin(this->min.imag(), value.imag()));
this->max = T(std::fmax(this->max.real(), value.real()),
std::fmax(this->max.imag(), value.imag()));
}

T min = T(std::numeric_limits<R>::infinity(),
std::numeric_limits<R>::infinity());
T max = T(-std::numeric_limits<R>::infinity(),
-std::numeric_limits<R>::infinity());

bool has_nulls = false;
bool has_values = false;
};


template <typename ArrowType, SimdLevel::type SimdLevel>
struct MinMaxImpl : public ScalarAggregator {
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
Expand Down
9 changes: 8 additions & 1 deletion cpp/src/arrow/compute/kernels/aggregate_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ struct FindAccumulatorType<I, enable_if_floating_point<I>> {
using Type = DoubleType;
};

template <typename I>
struct FindAccumulatorType<I, enable_if_complex<I>> {
using Type = ComplexDoubleType;
};


struct ScalarAggregator : public KernelState {
virtual Status Consume(KernelContext* ctx, const ExecBatch& batch) = 0;
virtual Status MergeFrom(KernelContext* ctx, KernelState&& src) = 0;
Expand All @@ -66,7 +72,8 @@ using arrow::internal::VisitSetBitRunsVoid;
// non-recursive pairwise summation for floating points
// https://en.wikipedia.org/wiki/Pairwise_summation
template <typename ValueType, typename SumType, typename ValueFunc>
enable_if_t<std::is_floating_point<SumType>::value, SumType> SumArray(
enable_if_t<std::is_floating_point<SumType>::value ||
is_complex_type<SumType>::value, SumType> SumArray(
const ArrayData& data, ValueFunc&& func) {
const int64_t data_size = data.length - data.GetNullCount();
if (data_size == 0) {
Expand Down
76 changes: 69 additions & 7 deletions cpp/src/arrow/compute/kernels/aggregate_var_std.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <cmath>
#include <complex>

#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/kernels/aggregate_internal.h"
Expand All @@ -32,16 +33,50 @@ namespace {
using arrow::internal::int128_t;
using arrow::internal::VisitSetBitRunsVoid;

template <typename ArrowType> struct VarStdTraits {
using MeanType = double;
using VarType = double;
using StdType = double;

static inline VarType mean_squared(MeanType mean) {
return mean*mean;
}
};

template <> struct VarStdTraits<ComplexFloatType> {
using MeanType = std::complex<double>;
using VarType = double;
using StdType = double;

static inline VarType mean_squared(MeanType mean) {
return mean.real()*mean.real() + mean.imag() + mean.imag();
}
};

template <> struct VarStdTraits<ComplexDoubleType> {
using MeanType = std::complex<double>;
using VarType = double;
using StdType = double;

static inline VarType mean_squared(MeanType mean) {
return mean.real()*mean.real() + mean.imag() + mean.imag();
}
};


template <typename ArrowType>
struct VarStdState {
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
using CType = typename ArrowType::c_type;
using ThisType = VarStdState<ArrowType>;
using Traits = VarStdTraits<ArrowType>;
using MeanType = typename Traits::MeanType;

// float/double/int64: calculate `m2` (sum((X-mean)^2)) with `two pass algorithm`
// https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Two-pass_algorithm
template <typename T = ArrowType>
enable_if_t<is_floating_type<T>::value || (sizeof(CType) > 4)> Consume(
enable_if_t<is_floating_type<T>::value ||
(is_integer_type<T>::value && sizeof(CType) > 4)> Consume(
const ArrayType& array) {
int64_t count = array.length() - array.null_count();
if (count == 0) {
Expand All @@ -52,7 +87,7 @@ struct VarStdState {
typename std::conditional<is_floating_type<T>::value, double, int128_t>::type;
SumType sum = arrow::compute::detail::SumArray<CType, SumType>(*array.data());

const double mean = static_cast<double>(sum) / count;
const MeanType mean = static_cast<double>(sum) / count;
const double m2 = arrow::compute::detail::SumArray<CType, double>(
*array.data(), [mean](CType value) {
const double v = static_cast<double>(value);
Expand All @@ -64,6 +99,30 @@ struct VarStdState {
this->m2 = m2;
}

// complex numbers
template <typename T = ArrowType>
enable_if_t<is_complex_type<T>::value> Consume(
const ArrayType& array) {
int64_t count = array.length() - array.null_count();
if (count == 0) {
return;
}

MeanType sum = arrow::compute::detail::SumArray<CType, MeanType>(*array.data());

const MeanType mean = static_cast<MeanType>(sum) / double(count);
const double m2 = arrow::compute::detail::SumArray<CType, double>(
*array.data(), [mean](CType value) {
const MeanType v = static_cast<MeanType>(value);
return Traits::mean_squared(v - mean);
});

this->count = count;
this->mean = mean;
this->m2 = m2;
}


// int32/16/8: textbook one pass algorithm with integer arithmetic
template <typename T = ArrowType>
enable_if_t<is_integer_type<T>::value && (sizeof(CType) <= 4)> Consume(
Expand Down Expand Up @@ -127,16 +186,19 @@ struct VarStdState {
this->m2 = state.m2;
return;
}
double mean = (this->mean * this->count + state.mean * state.count) /
(this->count + state.count);
this->m2 += state.m2 + this->count * (this->mean - mean) * (this->mean - mean) +
state.count * (state.mean - mean) * (state.mean - mean);
double this_count = double(this->count);
double state_count = double(state.count);

MeanType mean = (this->mean * this_count + state.mean * state_count) /
(this_count + state_count);
this->m2 += state.m2 + this_count * Traits::mean_squared(this->mean - mean) +
state_count * Traits::mean_squared(state.mean - mean);
this->count += state.count;
this->mean = mean;
}

int64_t count = 0;
double mean = 0;
MeanType mean = 0;
double m2 = 0; // m2 = count*s2 = sum((X-mean)^2)
};

Expand Down
54 changes: 53 additions & 1 deletion cpp/src/arrow/compute/kernels/hash_aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include <complex>
#include <functional>
#include <memory>
#include <string>
Expand Down Expand Up @@ -942,6 +943,30 @@ struct Extrema<double> {
static constexpr double max() { return std::numeric_limits<double>::infinity(); }
};

template <>
struct Extrema<std::complex<float>> {
static constexpr std::complex<float> min() {
return std::complex<float>(-std::numeric_limits<float>::infinity(),
-std::numeric_limits<float>::infinity());
}
static constexpr std::complex<float> max() {
return std::complex<float>(std::numeric_limits<float>::infinity(),
std::numeric_limits<float>::infinity());
}
};

template <>
struct Extrema<std::complex<double>> {
static constexpr std::complex<double> min() {
return std::complex<double>(-std::numeric_limits<double>::infinity(),
-std::numeric_limits<double>::infinity());
}
static constexpr std::complex<double> max() {
return std::complex<double>(std::numeric_limits<double>::infinity(),
std::numeric_limits<double>::infinity());
}
};

struct GroupedMinMaxImpl : public GroupedAggregator {
using ConsumeImpl =
std::function<void(const std::shared_ptr<ArrayData>&, const uint32_t*, void*, void*,
Expand All @@ -962,7 +987,8 @@ struct GroupedMinMaxImpl : public GroupedAggregator {

struct GetImpl {
template <typename T, typename CType = typename TypeTraits<T>::CType>
enable_if_number<T, Status> Visit(const T&) {
enable_if_t<is_number_type<T>::value && ! is_complex_type<T>::value, Status>
Visit(const T&) {
consume_impl = [](const std::shared_ptr<ArrayData>& input, const uint32_t* group,
void* mins, void* maxes, uint8_t* has_values,
uint8_t* has_nulls) {
Expand All @@ -984,6 +1010,32 @@ struct GroupedMinMaxImpl : public GroupedAggregator {
return Status::OK();
}

template <typename T, typename CType = typename TypeTraits<T>::CType>
enable_if_t<is_number_type<T>::value && is_complex_type<T>::value, Status>
Visit(const T&) {
consume_impl = [](const std::shared_ptr<ArrayData>& input, const uint32_t* group,
void* mins, void* maxes, uint8_t* has_values,
uint8_t* has_nulls) {
auto raw_mins = reinterpret_cast<CType*>(mins);
auto raw_maxes = reinterpret_cast<CType*>(maxes);

VisitArrayDataInline<T>(
*input,
[&](const CType & val) {
raw_maxes[*group] = CType(std::max(raw_maxes[*group].real(), val.real()),
std::max(raw_maxes[*group].imag(), val.imag()));
raw_maxes[*group] = CType(std::min(raw_mins[*group].real(), val.real()),
std::min(raw_mins[*group].imag(), val.imag()));
},
[&] { BitUtil::SetBit(has_nulls, *group++); });
};

resize_min_impl = MakeResizeImpl(Extrema<CType>::max());
resize_max_impl = MakeResizeImpl(Extrema<CType>::min());
return Status::OK();
}


Status Visit(const BooleanType& type) {
return Status::NotImplemented("Grouped MinMax data of type ", type);
}
Expand Down
27 changes: 24 additions & 3 deletions cpp/src/arrow/compute/kernels/scalar_set_lookup.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,14 @@ struct InitStateVisitor {
}

template <typename Type>
enable_if_t<has_c_type<Type>::value && !is_boolean_type<Type>::value, Status> Visit(
enable_if_complex<Type, Status> Visit(const Type &) {
return Init<Type>();
}

template <typename Type>
enable_if_t<has_c_type<Type>::value &&
!is_boolean_type<Type>::value &&
!is_complex_type<Type>::value, Status> Visit(
const Type&) {
return Init<typename UnsignedIntType<sizeof(typename Type::c_type)>::Type>();
}
Expand Down Expand Up @@ -262,7 +269,14 @@ struct IndexInVisitor {
}

template <typename Type>
enable_if_t<has_c_type<Type>::value && !is_boolean_type<Type>::value, Status> Visit(
enable_if_complex<Type, Status> Visit(const Type&) {
return ProcessIndexIn<Type>();
}

template <typename Type>
enable_if_t<has_c_type<Type>::value &&
!is_boolean_type<Type>::value &&
!is_complex_type<Type>::value, Status> Visit(
const Type&) {
return ProcessIndexIn<
typename UnsignedIntType<sizeof(typename Type::c_type)>::Type>();
Expand Down Expand Up @@ -352,7 +366,14 @@ struct IsInVisitor {
}

template <typename Type>
enable_if_t<has_c_type<Type>::value && !is_boolean_type<Type>::value, Status> Visit(
enable_if_complex<Type, Status> Visit(const Type&) {
return ProcessIsIn<Type>();
}

template <typename Type>
enable_if_t<has_c_type<Type>::value &&
!is_boolean_type<Type>::value &&
!is_complex_type<Type>::value, Status> Visit(
const Type&) {
return ProcessIsIn<typename UnsignedIntType<sizeof(typename Type::c_type)>::Type>();
}
Expand Down
Loading