Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 103 additions & 1 deletion cpp/src/arrow/compute/kernels/scalar_cast_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "arrow/array/builder_binary.h"
#include "arrow/compute/kernels/common.h"
#include "arrow/compute/kernels/scalar_cast_internal.h"
#include "arrow/compute/kernels/temporal_internal.h"
#include "arrow/result.h"
#include "arrow/util/formatting.h"
#include "arrow/util/int_util.h"
Expand Down Expand Up @@ -105,6 +106,107 @@ struct TemporalToStringCastFunctor {
}
};

// Timestamp -> string cast.
//
// Timestamps without a time zone are rendered with StringFormatter.  Timestamps
// that carry a time zone are rendered through TimestampFormatter, using a
// literal "Z" suffix when the zone is exactly "UTC" and a "%z" offset for any
// other zone.  The zoned path is not available on Windows.
template <typename O>
struct TemporalToStringCastFunctor<O, TimestampType> {
  using value_type = typename TypeTraits<TimestampType>::CType;
  using BuilderType = typename TypeTraits<O>::BuilderType;
  using FormatterType = StringFormatter<TimestampType>;

  // Kernel entry point: casts the array in batch[0] into the array held by `out`.
  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
    DCHECK(out->is_array());
    return Convert(ctx, *batch[0].array(), out->mutable_array());
  }

  // Formats every non-null value of `input` and moves the finished string
  // array's data into `*output`.  Null slots are appended as nulls.
  static Status Convert(KernelContext* ctx, const ArrayData& input, ArrayData* output) {
    const auto& timezone = GetInputTimezone(*input.type);
    const auto& ts_type = checked_cast<const TimestampType&>(*input.type);
    const bool has_zone = !timezone.empty();
    BuilderType builder(input.type, ctx->memory_pool());

    // Estimate the size of one formatted value; only used to presize the
    // builder's data buffer.
    int64_t chars_per_value = 19;  // YYYY-MM-DD HH:MM:SS
    switch (ts_type.unit()) {
      case TimeUnit::MILLI:
        chars_per_value += 4;  // .SSS
        break;
      case TimeUnit::MICRO:
        chars_per_value += 7;  // .SSSSSS
        break;
      case TimeUnit::NANO:
        chars_per_value += 10;  // .SSSSSSSSS
        break;
      default:
        // Seconds have no fractional part.
        break;
    }
    if (has_zone) chars_per_value += 5;  // +0000
    RETURN_NOT_OK(builder.Reserve(input.length));
    const int64_t valid_count = input.length - input.GetNullCount();
    RETURN_NOT_OK(builder.ReserveData(valid_count * chars_per_value));

    if (has_zone) {
#ifndef _WIN32
      // Dispatch on the unit so that the formatter works with the matching
      // std::chrono duration.
      switch (ts_type.unit()) {
        case TimeUnit::SECOND:
          RETURN_NOT_OK(ConvertZoned<std::chrono::seconds>(input, timezone, &builder));
          break;
        case TimeUnit::MILLI:
          RETURN_NOT_OK(
              ConvertZoned<std::chrono::milliseconds>(input, timezone, &builder));
          break;
        case TimeUnit::MICRO:
          RETURN_NOT_OK(
              ConvertZoned<std::chrono::microseconds>(input, timezone, &builder));
          break;
        case TimeUnit::NANO:
          RETURN_NOT_OK(
              ConvertZoned<std::chrono::nanoseconds>(input, timezone, &builder));
          break;
        default:
          DCHECK(false);
          return Status::NotImplemented("Unimplemented time unit");
      }
#else
      // TODO(ARROW-13168):
      return Status::NotImplemented(
          "Casting a timestamp with time zone to string is not yet supported on "
          "Windows.");
#endif
    } else {
      FormatterType formatter(input.type);
      RETURN_NOT_OK(VisitArrayDataInline<TimestampType>(
          input,
          [&](value_type value) {
            return formatter(
                value, [&](util::string_view text) { return builder.Append(text); });
          },
          [&]() {
            // Slot capacity was reserved above.
            builder.UnsafeAppendNull();
            return Status::OK();
          }));
    }

    std::shared_ptr<Array> formatted_array;
    RETURN_NOT_OK(builder.Finish(&formatted_array));
    *output = std::move(*formatted_array->data());
    return Status::OK();
  }

  // Appends the values of `input`, rendered in the (non-empty) zone `timezone`
  // using the "C" locale, to `builder`.  `Duration` is the std::chrono duration
  // matching the unit of the input timestamps.
  template <typename Duration>
  static Status ConvertZoned(const ArrayData& input, const std::string& timezone,
                             BuilderType* builder) {
    // Function-local statics: they outlive the formatter constructed below.
    static const std::string kOffsetFormat = "%Y-%m-%d %H:%M:%S%z";
    static const std::string kUtcFormat = "%Y-%m-%d %H:%M:%SZ";
    DCHECK(!timezone.empty());
    ARROW_ASSIGN_OR_RAISE(const time_zone* tz, LocateZone(timezone));
    ARROW_ASSIGN_OR_RAISE(std::locale locale, GetLocale("C"));
    const std::string& chosen_format = (timezone == "UTC") ? kUtcFormat : kOffsetFormat;
    TimestampFormatter<Duration> formatter{chosen_format, tz, locale};
    return VisitArrayDataInline<TimestampType>(
        input,
        [&](value_type value) {
          ARROW_ASSIGN_OR_RAISE(auto text, formatter(value));
          return builder->Append(std::move(text));
        },
        [&]() {
          builder->UnsafeAppendNull();
          return Status::OK();
        });
  }
};

// ----------------------------------------------------------------------
// Binary-like to binary-like
//
Expand Down Expand Up @@ -304,7 +406,7 @@ void AddTemporalToStringCasts(CastFunction* func) {
auto out_ty = TypeTraits<OutType>::type_singleton();
for (const std::shared_ptr<DataType>& in_ty : TemporalTypes()) {
DCHECK_OK(func->AddKernel(
in_ty->id(), {in_ty}, out_ty,
in_ty->id(), {InputType(in_ty->id())}, out_ty,
TrivialScalarUnaryAsArraysExec(
GenerateTemporal<TemporalToStringCastFunctor, OutType>(*in_ty)),
NullHandling::COMPUTED_NO_PREALLOCATE));
Expand Down
69 changes: 69 additions & 0 deletions cpp/src/arrow/compute/kernels/scalar_cast_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1553,8 +1553,77 @@ TEST(Cast, TimestampToString) {
CheckCast(
ArrayFromJSON(timestamp(TimeUnit::SECOND), "[-30610224000, -5364662400]"),
ArrayFromJSON(string_type, R"(["1000-01-01 00:00:00", "1800-01-01 00:00:00"])"));

CheckCast(
ArrayFromJSON(timestamp(TimeUnit::MILLI), "[-30610224000000, -5364662400000]"),
ArrayFromJSON(string_type,
R"(["1000-01-01 00:00:00.000", "1800-01-01 00:00:00.000"])"));

CheckCast(
ArrayFromJSON(timestamp(TimeUnit::MICRO),
"[-30610224000000000, -5364662400000000]"),
ArrayFromJSON(string_type,
R"(["1000-01-01 00:00:00.000000", "1800-01-01 00:00:00.000000"])"));

CheckCast(
ArrayFromJSON(timestamp(TimeUnit::NANO),
"[-596933876543210988, 349837323456789012]"),
ArrayFromJSON(
string_type,
R"(["1951-02-01 01:02:03.456789012", "1981-02-01 01:02:03.456789012"])"));
}
}

#ifndef _WIN32
// Zoned timestamps are expected to be rendered in their own time zone: "UTC"
// gets a "Z" suffix, any other zone gets a numeric offset.
TEST(Cast, TimestampWithZoneToString) {
  for (const auto& string_type : {utf8(), large_utf8()}) {
    // Casts `values_json` (timestamps of the given unit and zone) and compares
    // the result against `expected_json`.
    const auto check = [&](TimeUnit::type unit, const char* zone, const char* values_json,
                           const char* expected_json) {
      CheckCast(ArrayFromJSON(timestamp(unit, zone), values_json),
                ArrayFromJSON(string_type, expected_json));
    };

    check(TimeUnit::SECOND, "UTC", "[-30610224000, -5364662400]",
          R"(["1000-01-01 00:00:00Z", "1800-01-01 00:00:00Z"])");

    check(TimeUnit::SECOND, "America/Phoenix", "[-34226955, 1456767743]",
          R"(["1968-11-30 13:30:45-0700", "2016-02-29 10:42:23-0700"])");

    check(TimeUnit::MILLI, "America/Phoenix", "[-34226955877, 1456767743456]",
          R"(["1968-11-30 13:30:44.123-0700", "2016-02-29 10:42:23.456-0700"])");

    check(TimeUnit::MICRO, "America/Phoenix", "[-34226955877000, 1456767743456789]",
          R"(["1968-11-30 13:30:44.123000-0700", "2016-02-29 10:42:23.456789-0700"])");

    check(
        TimeUnit::NANO, "America/Phoenix", "[-34226955876543211, 1456767743456789246]",
        R"(["1968-11-30 13:30:44.123456789-0700", "2016-02-29 10:42:23.456789246-0700"])");
  }
}
#else
// TODO(ARROW-13168): we lack tzdb on Windows
TEST(Cast, TimestampWithZoneToString) {
for (auto string_type : {utf8(), large_utf8()}) {
ASSERT_RAISES(NotImplemented, Cast(ArrayFromJSON(timestamp(TimeUnit::SECOND, "UTC"),
"[-34226955, 1456767743]"),
CastOptions::Safe(string_type)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UTC could probably be made to work on Windows, can you perhaps create a followup JIRA for that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed ARROW-14643.


ASSERT_RAISES(NotImplemented,
Cast(ArrayFromJSON(timestamp(TimeUnit::SECOND, "America/Phoenix"),
"[-34226955, 1456767743]"),
CastOptions::Safe(string_type)));
}
}
#endif

TEST(Cast, DateToDate) {
auto day_32 = ArrayFromJSON(date32(), "[0, null, 100, 1, 10]");
Expand Down
39 changes: 6 additions & 33 deletions cpp/src/arrow/compute/kernels/scalar_temporal_unary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ using arrow_vendored::date::local_time;
using arrow_vendored::date::locate_zone;
using arrow_vendored::date::sys_days;
using arrow_vendored::date::sys_time;
using arrow_vendored::date::time_zone;
using arrow_vendored::date::trunc;
using arrow_vendored::date::weekday;
using arrow_vendored::date::weeks;
Expand Down Expand Up @@ -479,7 +478,7 @@ struct Strftime {
if ((options.format.find("%c") != std::string::npos) && (options.locale != "C")) {
return Status::Invalid("%c flag is not supported in non-C locales.");
}
auto timezone = GetInputTimezone(type);
const auto& timezone = GetInputTimezone(type);

if (timezone.empty()) {
if ((options.format.find("%z") != std::string::npos) ||
Expand All @@ -488,10 +487,10 @@ struct Strftime {
"Timezone not present, cannot convert to string with timezone: ",
options.format);
}
timezone = "UTC";
}

ARROW_ASSIGN_OR_RAISE(const time_zone* tz, LocateZone(timezone));
ARROW_ASSIGN_OR_RAISE(const time_zone* tz,
LocateZone(timezone.empty() ? "UTC" : timezone));

ARROW_ASSIGN_OR_RAISE(std::locale locale, GetLocale(options.locale));

Expand All @@ -500,7 +499,7 @@ struct Strftime {

static Status Call(KernelContext* ctx, const Scalar& in, Scalar* out) {
ARROW_ASSIGN_OR_RAISE(auto self, Make(ctx, *in.type));
TimestampFormatter formatter{self.options.format, self.tz, self.locale};
TimestampFormatter<Duration> formatter{self.options.format, self.tz, self.locale};

if (in.is_valid) {
const int64_t in_val = internal::UnboxScalar<const InType>::Unbox(in);
Expand All @@ -514,7 +513,7 @@ struct Strftime {

static Status Call(KernelContext* ctx, const ArrayData& in, ArrayData* out) {
ARROW_ASSIGN_OR_RAISE(auto self, Make(ctx, *in.type));
TimestampFormatter formatter{self.options.format, self.tz, self.locale};
TimestampFormatter<Duration> formatter{self.options.format, self.tz, self.locale};

StringBuilder string_builder;
// Presize string data using a heuristic
Expand All @@ -539,35 +538,9 @@ struct Strftime {

return Status::OK();
}

// Formats int64 timestamp values as strings in the time zone `tz`, using a
// strftime-like format string.  Each value is interpreted as a count of
// `Duration` ticks and converted through sys_time / zoned_time.
struct TimestampFormatter {
  // Owned copy of the format string.  Keeping only a pointer into the caller's
  // string would dangle if that string were a temporary or were destroyed or
  // modified before formatting.
  std::string format;
  const time_zone* tz;
  // Reused across calls; it is reset at the start of every operator() call.
  std::ostringstream bufstream;

  explicit TimestampFormatter(const std::string& format, const time_zone* tz,
                              const std::locale& locale)
      : format(format), tz(tz) {
    bufstream.imbue(locale);
    // Propagate errors as C++ exceptions (to get an actual error message)
    bufstream.exceptions(std::ios::failbit | std::ios::badbit);
  }

  // Returns the formatted timestamp, or Status::Invalid if the stream raised a
  // std::runtime_error while formatting.
  Result<std::string> operator()(int64_t arg) {
    bufstream.str("");
    const auto zt = zoned_time<Duration>{tz, sys_time<Duration>(Duration{arg})};
    try {
      arrow_vendored::date::to_stream(bufstream, format.c_str(), zt);
    } catch (const std::runtime_error& ex) {
      // Reset the error flags so the stream can be used by the next call.
      bufstream.clear();
      return Status::Invalid("Failed formatting timestamp: ", ex.what());
    }
    // XXX could return a view with std::ostringstream::view() (C++20)
    return std::move(bufstream).str();
  }
};
};
#else
// TODO(ARROW-13168)
template <typename Duration, typename InType>
struct Strftime {
static Status Call(KernelContext* ctx, const Scalar& in, Scalar* out) {
Expand Down
39 changes: 38 additions & 1 deletion cpp/src/arrow/compute/kernels/temporal_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ using arrow_vendored::date::sys_days;
using arrow_vendored::date::sys_time;
using arrow_vendored::date::time_zone;
using arrow_vendored::date::year_month_day;
using arrow_vendored::date::zoned_time;

inline int64_t GetQuarter(const year_month_day& ymd) {
return static_cast<int64_t>((static_cast<uint32_t>(ymd.month()) - 1) / 3);
Expand Down Expand Up @@ -72,7 +73,7 @@ static inline const std::string& GetInputTimezone(const ArrayData& array) {
return checked_cast<const TimestampType&>(*array.type).timezone();
}

inline Status ValidateDayOfWeekOptions(const DayOfWeekOptions& options) {
static inline Status ValidateDayOfWeekOptions(const DayOfWeekOptions& options) {
if (options.week_start < 1 || 7 < options.week_start) {
return Status::Invalid(
"week_start must follow ISO convention (Monday=1, Sunday=7). Got week_start=",
Expand All @@ -81,6 +82,14 @@ inline Status ValidateDayOfWeekOptions(const DayOfWeekOptions& options) {
return Status::OK();
}

// Constructs the std::locale named `locale`.  A std::runtime_error raised by
// the std::locale constructor is reported as Status::Invalid.
static inline Result<std::locale> GetLocale(const std::string& locale) {
  const char* const locale_name = locale.c_str();
  try {
    return std::locale(locale_name);
  } catch (const std::runtime_error& lookup_error) {
    return Status::Invalid("Cannot find locale '", locale, "': ", lookup_error.what());
  }
}

struct NonZonedLocalizer {
using days_t = sys_days;

Expand All @@ -107,6 +116,34 @@ struct ZonedLocalizer {
local_days ConvertDays(sys_days d) const { return local_days(year_month_day(d)); }
};

// Formats int64 timestamp values as strings in the time zone `tz`, using a
// strftime-like format string.  Each value is interpreted as a count of
// `Duration` ticks and converted through sys_time / zoned_time.
template <typename Duration>
struct TimestampFormatter {
  // Owned copy of the format string.  Keeping only a pointer into the caller's
  // string would dangle if that string were a temporary or were destroyed or
  // modified before formatting.
  std::string format;
  const time_zone* tz;
  // Reused across calls; it is reset at the start of every operator() call.
  std::ostringstream bufstream;

  explicit TimestampFormatter(const std::string& format, const time_zone* tz,
                              const std::locale& locale)
      : format(format), tz(tz) {
    bufstream.imbue(locale);
    // Propagate errors as C++ exceptions (to get an actual error message)
    bufstream.exceptions(std::ios::failbit | std::ios::badbit);
  }

  // Returns the formatted timestamp, or Status::Invalid if the stream raised a
  // std::runtime_error while formatting.
  Result<std::string> operator()(int64_t arg) {
    bufstream.str("");
    const auto zt = zoned_time<Duration>{tz, sys_time<Duration>(Duration{arg})};
    try {
      arrow_vendored::date::to_stream(bufstream, format.c_str(), zt);
    } catch (const std::runtime_error& ex) {
      // Reset the error flags so the stream can be used by the next call.
      bufstream.clear();
      return Status::Invalid("Failed formatting timestamp: ", ex.what());
    }
    // XXX could return a view with std::ostringstream::view() (C++20)
    return std::move(bufstream).str();
  }
};

//
// Which types to generate a kernel for
//
Expand Down
Loading