Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/kernels/codegen_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,7 @@ ArrayKernelExec GeneratePhysicalInteger(detail::GetTypeId get_id) {
case Type::DATE64:
case Type::TIMESTAMP:
case Type::TIME64:
case Type::DURATION:
return Generator<Type0, Int64Type, Args...>::Exec;
case Type::UINT8:
return Generator<Type0, UInt8Type, Args...>::Exec;
Expand Down
72 changes: 47 additions & 25 deletions cpp/src/arrow/compute/kernels/vector_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@ using enable_if_can_hash =

template <typename Type, typename Action>
struct HashInitFunctor {
using ArrayType = typename TypeTraits<Type>::ArrayType;
using HashKernelType = typename HashKernelTraits<Type, Action>::HashKernelImpl;

static std::unique_ptr<KernelState> Init(KernelContext* ctx,
Expand All @@ -423,19 +422,48 @@ struct HashInitFunctor {
};

template <typename Action>
struct HashInitVisitor {
VectorKernel* out;

Status Visit(const DataType& type) {
return Status::NotImplemented("Hashing not available for ", type.ToString());
KernelInit GetHashInit(Type::type type_id) {
// ARROW-8933: Generate only a single hash kernel per physical data
// representation
switch (type_id) {
case Type::NA:
return HashInitFunctor<NullType, Action>::Init;
case Type::BOOL:
return HashInitFunctor<BooleanType, Action>::Init;
case Type::INT8:
case Type::UINT8:
return HashInitFunctor<UInt8Type, Action>::Init;
case Type::INT16:
case Type::UINT16:
return HashInitFunctor<UInt16Type, Action>::Init;
case Type::INT32:
case Type::UINT32:
case Type::FLOAT:
case Type::DATE32:
case Type::TIME32:
return HashInitFunctor<UInt32Type, Action>::Init;
case Type::INT64:
case Type::UINT64:
case Type::DOUBLE:
case Type::DATE64:
case Type::TIME64:
case Type::TIMESTAMP:
case Type::DURATION:
return HashInitFunctor<UInt64Type, Action>::Init;
case Type::BINARY:
case Type::STRING:
return HashInitFunctor<BinaryType, Action>::Init;
case Type::LARGE_BINARY:
case Type::LARGE_STRING:
return HashInitFunctor<LargeBinaryType, Action>::Init;
case Type::FIXED_SIZE_BINARY:
case Type::DECIMAL:
return HashInitFunctor<FixedSizeBinaryType, Action>::Init;
default:
DCHECK(false);
return nullptr;
}

template <typename Type>
enable_if_can_hash<Type, Status> Visit(const Type&) {
out->init = HashInitFunctor<Type, Action>::Init;
return Status::OK();
}
};
}

void HashExec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
auto hash_impl = checked_cast<HashKernel*>(ctx->state());
Expand Down Expand Up @@ -485,34 +513,28 @@ ValueDescr ValueCountsOutput(KernelContext*, const std::vector<ValueDescr>& desc
{field(kValuesFieldName, descrs[0].type), field(kCountsFieldName, int64())}));
}

template <typename Action>
void AddKernel(VectorFunction* func, VectorKernel kernel,
const std::shared_ptr<DataType>& type) {
HashInitVisitor<Action> visitor{&kernel};
DCHECK_OK(VisitTypeInline(*type, &visitor));
DCHECK_OK(func->AddKernel(std::move(kernel)));
}

template <typename Action>
void AddHashKernels(VectorFunction* func, VectorKernel base,
OutputType::Resolver out_resolver) {
OutputType out_ty(out_resolver);
for (const auto& ty : PrimitiveTypes()) {
base.init = GetHashInit<Action>(ty->id());
base.signature = KernelSignature::Make({InputType::Array(ty)}, out_ty);
AddKernel<Action>(func, base, ty);
DCHECK_OK(func->AddKernel(base));
}

// Example parametric types that we want to match only on Type::type
auto parametric_types = {time32(TimeUnit::SECOND), time64(TimeUnit::MICRO),
timestamp(TimeUnit::SECOND), fixed_size_binary(0)};
for (const auto& ty : parametric_types) {
base.init = GetHashInit<Action>(ty->id());
base.signature = KernelSignature::Make({InputType::Array(ty->id())}, out_ty);
AddKernel<Action>(func, base, /*dummy=*/ty);
DCHECK_OK(func->AddKernel(base));
}

// Handle Decimal as a physical string, not a number
base.init = GetHashInit<Action>(Type::DECIMAL);
base.signature = KernelSignature::Make({InputType::Array(Type::DECIMAL)}, out_ty);
AddKernel<Action>(func, base, fixed_size_binary(0));
DCHECK_OK(func->AddKernel(base));
}

} // namespace
Expand Down
26 changes: 23 additions & 3 deletions cpp/src/arrow/compute/kernels/vector_sort.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <limits>
#include <numeric>

#include "arrow/array/data.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/kernels/common.h"
#include "arrow/util/optional.h"
Expand All @@ -37,11 +38,26 @@ struct PartitionIndicesState : public KernelState {
int64_t pivot;
};

Status GetPhysicalView(const std::shared_ptr<ArrayData>& arr,
const std::shared_ptr<DataType>& type,
std::shared_ptr<ArrayData>* out) {
if (!arr->type->Equals(*type)) {
return ::arrow::internal::GetArrayView(arr, type).Value(out);
} else {
*out = arr;
return Status::OK();
}
}

template <typename OutType, typename InType>
struct PartitionIndices {
using ArrayType = typename TypeTraits<InType>::ArrayType;
static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
ArrayType arr(batch[0].array());
std::shared_ptr<ArrayData> arg0;
KERNEL_RETURN_IF_ERROR(
ctx,
GetPhysicalView(batch[0].array(), TypeTraits<InType>::type_singleton(), &arg0));
ArrayType arr(arg0);

int64_t pivot = checked_cast<const PartitionIndicesState&>(*ctx->state()).pivot;
if (pivot > arr.length()) {
Expand Down Expand Up @@ -227,7 +243,11 @@ template <typename OutType, typename InType>
struct SortIndices {
using ArrayType = typename TypeTraits<InType>::ArrayType;
static void Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
ArrayType arr(batch[0].array());
std::shared_ptr<ArrayData> arg0;
KERNEL_RETURN_IF_ERROR(
ctx,
GetPhysicalView(batch[0].array(), TypeTraits<InType>::type_singleton(), &arg0));
ArrayType arr(arg0);
ArrayData* out_arr = out->mutable_array();
uint64_t* out_begin = out_arr->GetMutableValues<uint64_t>(1);
uint64_t* out_end = out_begin + arr.length();
Expand Down Expand Up @@ -259,7 +279,7 @@ void AddSortingKernels(VectorKernel base, VectorFunction* func) {
}
for (const auto& ty : BaseBinaryTypes()) {
base.signature = KernelSignature::Make({InputType::Array(ty)}, uint64());
base.exec = GenerateVarBinary<ExecTemplate, UInt64Type>(*ty);
base.exec = GenerateVarBinaryBase<ExecTemplate, UInt64Type>(*ty);
DCHECK_OK(func->AddKernel(base));
}
}
Expand Down