Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 179 additions & 41 deletions cpp/src/arrow/compute/kernels/scalar_cast_temporal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "arrow/array/builder_time.h"
#include "arrow/compute/kernels/common.h"
#include "arrow/compute/kernels/scalar_cast_internal.h"
#include "arrow/compute/kernels/temporal_internal.h"
#include "arrow/util/bitmap_reader.h"
#include "arrow/util/time.h"
#include "arrow/util/value_parsing.h"
Expand Down Expand Up @@ -117,6 +118,28 @@ Status ShiftTime(KernelContext* ctx, const util::DivideOrMultiply factor_op,
return Status::OK();
}

template <template <typename...> class Op, typename OutType, typename... Args>
Status ExtractTemporal(KernelContext* ctx, const ExecBatch& batch, Datum* out,
Args... args) {
const auto& ty = checked_cast<const TimestampType&>(*batch[0].type());

switch (ty.unit()) {
case TimeUnit::SECOND:
return TemporalComponentExtract<Op, std::chrono::seconds, TimestampType, OutType,
Args...>::Exec(ctx, batch, out, args...);
case TimeUnit::MILLI:
return TemporalComponentExtract<Op, std::chrono::milliseconds, TimestampType,
OutType, Args...>::Exec(ctx, batch, out, args...);
case TimeUnit::MICRO:
return TemporalComponentExtract<Op, std::chrono::microseconds, TimestampType,
OutType, Args...>::Exec(ctx, batch, out, args...);
case TimeUnit::NANO:
return TemporalComponentExtract<Op, std::chrono::nanoseconds, TimestampType,
OutType, Args...>::Exec(ctx, batch, out, args...);
}
return Status::Invalid("Unknown timestamp unit: ", ty);
}

// <TimestampType, TimestampType> and <DurationType, DurationType>
template <typename O, typename I>
struct CastFunctor<
Expand All @@ -142,68 +165,175 @@ struct CastFunctor<
}
};

// ----------------------------------------------------------------------
// From timestamp to date32 or date64

template <>
struct CastFunctor<Date32Type, TimestampType> {
static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
DCHECK_EQ(batch[0].kind(), Datum::ARRAY);

const ArrayData& input = *batch[0].array();
ArrayData* output = out->mutable_array();

const auto& in_type = checked_cast<const TimestampType&>(*input.type);
template <typename Duration, typename Localizer>
struct Date32 {
Date32(const FunctionOptions* options, Localizer&& localizer)
: localizer_(std::move(localizer)) {}

template <typename T, typename Arg0>
T Call(KernelContext*, Arg0 arg, Status*) const {
return static_cast<T>(static_cast<const int32_t>(
floor<days>(localizer_.template ConvertTimePoint<Duration>(arg))
.time_since_epoch()
.count()));
}

static const int64_t kTimestampToDateFactors[4] = {
86400LL, // SECOND
86400LL * 1000LL, // MILLI
86400LL * 1000LL * 1000LL, // MICRO
86400LL * 1000LL * 1000LL * 1000LL, // NANO
};
Localizer localizer_;
};

const int64_t factor = kTimestampToDateFactors[static_cast<int>(in_type.unit())];
return ShiftTime<int64_t, int32_t>(ctx, util::DIVIDE, factor, input, output);
static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
return ExtractTemporal<Date32, Date32Type>(ctx, batch, out);
}
};

template <>
struct CastFunctor<Date64Type, TimestampType> {
template <typename Duration, typename Localizer>
struct Date64 {
constexpr static int64_t kMillisPerDay = 86400000;
Date64(const FunctionOptions* options, Localizer&& localizer)
: localizer_(std::move(localizer)) {}

template <typename T, typename Arg0>
T Call(KernelContext*, Arg0 arg, Status*) const {
return static_cast<T>(
kMillisPerDay *
static_cast<const int32_t>(
floor<days>(localizer_.template ConvertTimePoint<Duration>(arg))
.time_since_epoch()
.count()));
}

Localizer localizer_;
};

static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
return ExtractTemporal<Date64, Date64Type>(ctx, batch, out);
}
};

const CastOptions& options = checked_cast<const CastState&>(*ctx->state()).options;
const ArrayData& input = *batch[0].array();
ArrayData* output = out->mutable_array();
const auto& in_type = checked_cast<const TimestampType&>(*input.type);
// ----------------------------------------------------------------------
// From timestamp to time32 or time64

template <typename Duration, typename Localizer>
struct ExtractTimeDownscaled {
ExtractTimeDownscaled(const FunctionOptions* options, Localizer&& localizer,
const int64_t factor)
: localizer_(std::move(localizer)), factor_(factor) {}

template <typename T, typename Arg0>
T Call(KernelContext*, Arg0 arg, Status* st) const {
const auto t = localizer_.template ConvertTimePoint<Duration>(arg);
const int64_t orig_value = (t - floor<days>(t)).count();
const T scaled = static_cast<T>(orig_value / factor_);
const int64_t unscaled = static_cast<int64_t>(scaled) * factor_;
if (unscaled != orig_value) {
*st = Status::Invalid("Cast would lose data: ", orig_value);
return 0;
}
return scaled;
}

Localizer localizer_;
const int64_t factor_;
};

auto conversion = util::GetTimestampConversion(in_type.unit(), TimeUnit::MILLI);
RETURN_NOT_OK((ShiftTime<int64_t, int64_t>(ctx, conversion.first, conversion.second,
input, output)));
template <typename Duration, typename Localizer>
struct ExtractTimeUpscaledUnchecked {
ExtractTimeUpscaledUnchecked(const FunctionOptions* options, Localizer&& localizer,
const int64_t factor)
: localizer_(std::move(localizer)), factor_(factor) {}

template <typename T, typename Arg0>
T Call(KernelContext*, Arg0 arg, Status*) const {
const auto t = localizer_.template ConvertTimePoint<Duration>(arg);
const int64_t orig_value = (t - floor<days>(t)).count();
return static_cast<T>(orig_value * factor_);
}

Localizer localizer_;
const int64_t factor_;
};

// Ensure that intraday milliseconds have been zeroed out
auto out_data = output->GetMutableValues<int64_t>(1);
template <typename Duration, typename Localizer>
struct ExtractTimeDownscaledUnchecked {
ExtractTimeDownscaledUnchecked(const FunctionOptions* options, Localizer&& localizer,
const int64_t factor)
: localizer_(std::move(localizer)), factor_(factor) {}

template <typename T, typename Arg0>
T Call(KernelContext*, Arg0 arg, Status*) const {
const auto t = localizer_.template ConvertTimePoint<Duration>(arg);
const int64_t orig_value = (t - floor<days>(t)).count();
return static_cast<T>(orig_value / factor_);
}

if (input.null_count != 0) {
BitmapReader bit_reader(input.buffers[0]->data(), input.offset, input.length);
Localizer localizer_;
const int64_t factor_;
};

template <>
struct CastFunctor<Time32Type, TimestampType> {
static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
const auto& in_type = checked_cast<const TimestampType&>(*batch[0].type());
const auto& out_type = checked_cast<const Time32Type&>(*out->type());
const CastOptions& options = checked_cast<const CastState&>(*ctx->state()).options;

for (int64_t i = 0; i < input.length; ++i) {
const int64_t remainder = out_data[i] % kMillisecondsInDay;
if (ARROW_PREDICT_FALSE(!options.allow_time_truncate && bit_reader.IsSet() &&
remainder > 0)) {
return Status::Invalid("Timestamp value had non-zero intraday milliseconds");
// Shifting before extraction won't work since the timestamp may not fit
// even if the time itself fits
if (in_type.unit() != out_type.unit()) {
auto conversion = util::GetTimestampConversion(in_type.unit(), out_type.unit());
if (conversion.first == util::MULTIPLY) {
return ExtractTemporal<ExtractTimeUpscaledUnchecked, Time32Type>(
ctx, batch, out, conversion.second);
} else {
if (options.allow_time_truncate) {
return ExtractTemporal<ExtractTimeDownscaledUnchecked, Time32Type>(
ctx, batch, out, conversion.second);
} else {
return ExtractTemporal<ExtractTimeDownscaled, Time32Type>(ctx, batch, out,
conversion.second);
}
out_data[i] -= remainder;
bit_reader.Next();
}
} else {
for (int64_t i = 0; i < input.length; ++i) {
const int64_t remainder = out_data[i] % kMillisecondsInDay;
if (ARROW_PREDICT_FALSE(!options.allow_time_truncate && remainder > 0)) {
return Status::Invalid("Timestamp value had non-zero intraday milliseconds");
}
return ExtractTemporal<ExtractTimeUpscaledUnchecked, Time32Type>(ctx, batch, out, 1);
}
};

template <>
struct CastFunctor<Time64Type, TimestampType> {
static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
DCHECK_EQ(batch[0].kind(), Datum::ARRAY);
const auto& in_type = checked_cast<const TimestampType&>(*batch[0].type());
const auto& out_type = checked_cast<const Time64Type&>(*out->type());
const CastOptions& options = checked_cast<const CastState&>(*ctx->state()).options;

// Shifting before extraction won't work since the timestamp may not fit
// even if the time itself fits
if (in_type.unit() != out_type.unit()) {
auto conversion = util::GetTimestampConversion(in_type.unit(), out_type.unit());
if (conversion.first == util::MULTIPLY) {
return ExtractTemporal<ExtractTimeUpscaledUnchecked, Time64Type>(
ctx, batch, out, conversion.second);
} else {
if (options.allow_time_truncate) {
return ExtractTemporal<ExtractTimeDownscaledUnchecked, Time64Type>(
ctx, batch, out, conversion.second);
} else {
return ExtractTemporal<ExtractTimeDownscaled, Time64Type>(ctx, batch, out,
conversion.second);
}
out_data[i] -= remainder;
}
}

return Status::OK();
return ExtractTemporal<ExtractTimeUpscaledUnchecked, Time64Type>(ctx, batch, out, 1);
}
};

Expand Down Expand Up @@ -389,6 +519,10 @@ std::shared_ptr<CastFunction> GetTime32Cast() {
// time32 -> time32
AddCrossUnitCast<Time32Type>(func.get());

// timestamp -> time32
AddSimpleCast<TimestampType, Time32Type>(InputType(Type::TIMESTAMP), kOutputTargetType,
func.get());

return func;
}

Expand All @@ -406,6 +540,10 @@ std::shared_ptr<CastFunction> GetTime64Cast() {
// Between durations
AddCrossUnitCast<Time64Type>(func.get());

// timestamp -> time64
AddSimpleCast<TimestampType, Time64Type>(InputType(Type::TIMESTAMP), kOutputTargetType,
func.get());

return func;
}

Expand Down
Loading