Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions cpp/src/arrow/compute/api_scalar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ static auto kSetLookupOptionsType = GetFunctionOptionsType<SetLookupOptions>(
static auto kStrptimeOptionsType = GetFunctionOptionsType<StrptimeOptions>(
DataMember("format", &StrptimeOptions::format),
DataMember("unit", &StrptimeOptions::unit));
static auto kStrftimeOptionsType = GetFunctionOptionsType<StrftimeOptions>(
DataMember("format", &StrftimeOptions::format));
static auto kPadOptionsType = GetFunctionOptionsType<PadOptions>(
DataMember("width", &PadOptions::width), DataMember("padding", &PadOptions::padding));
static auto kTrimOptionsType = GetFunctionOptionsType<TrimOptions>(
Expand Down Expand Up @@ -238,6 +240,14 @@ StrptimeOptions::StrptimeOptions(std::string format, TimeUnit::type unit)
StrptimeOptions::StrptimeOptions() : StrptimeOptions("", TimeUnit::SECOND) {}
constexpr char StrptimeOptions::kTypeName[];

StrftimeOptions::StrftimeOptions(std::string format, std::string locale)
: FunctionOptions(internal::kStrftimeOptionsType),
format(std::move(format)),
locale(std::move(locale)) {}
StrftimeOptions::StrftimeOptions() : StrftimeOptions(kDefaultFormat) {}
constexpr char StrftimeOptions::kTypeName[];
constexpr const char* StrftimeOptions::kDefaultFormat;

PadOptions::PadOptions(int64_t width, std::string padding)
: FunctionOptions(internal::kPadOptionsType),
width(width),
Expand Down Expand Up @@ -294,6 +304,7 @@ void RegisterScalarOptions(FunctionRegistry* registry) {
DCHECK_OK(registry->AddFunctionOptionsType(kExtractRegexOptionsType));
DCHECK_OK(registry->AddFunctionOptionsType(kSetLookupOptionsType));
DCHECK_OK(registry->AddFunctionOptionsType(kStrptimeOptionsType));
DCHECK_OK(registry->AddFunctionOptionsType(kStrftimeOptionsType));
DCHECK_OK(registry->AddFunctionOptionsType(kPadOptionsType));
DCHECK_OK(registry->AddFunctionOptionsType(kTrimOptionsType));
DCHECK_OK(registry->AddFunctionOptionsType(kSliceOptionsType));
Expand Down Expand Up @@ -495,5 +506,9 @@ Result<Datum> DayOfWeek(const Datum& arg, DayOfWeekOptions options, ExecContext*
return CallFunction("day_of_week", {arg}, &options, ctx);
}

Result<Datum> Strftime(const Datum& arg, StrftimeOptions options, ExecContext* ctx) {
return CallFunction("strftime", {arg}, &options, ctx);
}

} // namespace compute
} // namespace arrow
30 changes: 30 additions & 0 deletions cpp/src/arrow/compute/api_scalar.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,21 @@ class ARROW_EXPORT StrptimeOptions : public FunctionOptions {
TimeUnit::type unit;
};

class ARROW_EXPORT StrftimeOptions : public FunctionOptions {
public:
explicit StrftimeOptions(std::string format, std::string locale = "C");
StrftimeOptions();

constexpr static char const kTypeName[] = "StrftimeOptions";

constexpr static const char* kDefaultFormat = "%Y-%m-%dT%H:%M:%SZ";

/// The desired format string.
std::string format;
/// The desired output locale string.
std::string locale;
};

class ARROW_EXPORT PadOptions : public FunctionOptions {
public:
explicit PadOptions(int64_t width, std::string padding = " ");
Expand Down Expand Up @@ -999,5 +1014,20 @@ Result<Datum> Nanosecond(const Datum& values, ExecContext* ctx = NULLPTR);
/// \note API not yet finalized
ARROW_EXPORT Result<Datum> Subsecond(const Datum& values, ExecContext* ctx = NULLPTR);

/// \brief Format timestamps according to a format string
///
/// Return formatted time strings according to the format string
/// `StrftimeOptions::format` and to the locale specifier `Strftime::locale`.
///
/// \param[in] values input timestamps
/// \param[in] options for setting format string and locale
/// \param[in] ctx the function execution context, optional
/// \return the resulting datum
///
/// \since 6.0.0
/// \note API not yet finalized
ARROW_EXPORT Result<Datum> Strftime(const Datum& values, StrftimeOptions options,
ExecContext* ctx = NULLPTR);

} // namespace compute
} // namespace arrow
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/function_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ TEST(FunctionOptions, Equality) {
options.emplace_back(new SetLookupOptions(ArrayFromJSON(boolean(), "[true, false]")));
options.emplace_back(new StrptimeOptions("%Y", TimeUnit::type::MILLI));
options.emplace_back(new StrptimeOptions("%Y", TimeUnit::type::NANO));
options.emplace_back(new StrftimeOptions("%Y-%m-%dT%H:%M:%SZ", "C"));
options.emplace_back(new PadOptions(5, " "));
options.emplace_back(new PadOptions(10, "A"));
options.emplace_back(new TrimOptions(" "));
Expand Down
182 changes: 157 additions & 25 deletions cpp/src/arrow/compute/kernels/scalar_temporal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#include <sstream>

#include "arrow/builder.h"
#include "arrow/compute/api_scalar.h"
#include "arrow/compute/kernels/common.h"
Expand Down Expand Up @@ -55,13 +57,18 @@ using internal::applicator::ScalarUnaryNotNull;
using internal::applicator::SimpleUnary;

using DayOfWeekState = OptionsWrapper<DayOfWeekOptions>;
using StrftimeState = OptionsWrapper<StrftimeOptions>;

const std::shared_ptr<DataType>& IsoCalendarType() {
static auto type = struct_({field("iso_year", int64()), field("iso_week", int64()),
field("iso_day_of_week", int64())});
return type;
}

const std::string& GetInputTimezone(const DataType& type) {
return checked_cast<const TimestampType&>(type).timezone();
}

const std::string& GetInputTimezone(const Datum& datum) {
return checked_cast<const TimestampType&>(*datum.type()).timezone();
}
Expand All @@ -74,6 +81,22 @@ const std::string& GetInputTimezone(const ArrayData& array) {
return checked_cast<const TimestampType&>(*array.type).timezone();
}

Result<const time_zone*> LocateZone(const std::string& timezone) {
try {
return locate_zone(timezone);
} catch (const std::runtime_error& ex) {
return Status::Invalid("Cannot locate timezone '", timezone, "': ", ex.what());
}
}

Result<std::locale> GetLocale(const std::string& locale) {
try {
return std::locale(locale.c_str());
} catch (const std::runtime_error& ex) {
return Status::Invalid("Cannot find locale '", locale, "': ", ex.what());
}
}

struct NonZonedLocalizer {
// No-op conversions: UTC -> UTC
template <typename Duration>
Expand Down Expand Up @@ -117,12 +140,7 @@ struct TemporalComponentExtractBase {
op};
return kernel.Exec(ctx, batch, out);
} else {
const time_zone* tz;
try {
tz = locate_zone(timezone);
} catch (const std::runtime_error& ex) {
return Status::Invalid(ex.what());
}
ARROW_ASSIGN_OR_RAISE(auto tz, LocateZone(timezone));
using ExecTemplate = Op<Duration, ZonedLocalizer>;
auto op = ExecTemplate(options, ZonedLocalizer{tz});
applicator::ScalarUnaryNotNullStateful<OutType, TimestampType, ExecTemplate> kernel{
Expand Down Expand Up @@ -444,6 +462,114 @@ struct Nanosecond {
}
};

// ----------------------------------------------------------------------
// Convert timestamps to a string representation with an arbitrary format

#ifndef _WIN32
template <typename Duration>
struct Strftime {
const StrftimeOptions& options;
const time_zone* tz;
const std::locale locale;

static Result<Strftime> Make(KernelContext* ctx, const DataType& type) {
const StrftimeOptions& options = StrftimeState::Get(ctx);

const auto& timezone = GetInputTimezone(type);
if (timezone.empty()) {
return Status::Invalid(
"Timestamps without a time zone cannot be reliably formatted.");
}
ARROW_ASSIGN_OR_RAISE(const time_zone* tz, LocateZone(timezone));

ARROW_ASSIGN_OR_RAISE(std::locale locale, GetLocale(options.locale));

return Strftime{options, tz, std::move(locale)};
}

static Status Call(KernelContext* ctx, const Scalar& in, Scalar* out) {
ARROW_ASSIGN_OR_RAISE(auto self, Make(ctx, *in.type));
TimestampFormatter formatter{self.options.format, self.tz, self.locale};

if (in.is_valid) {
const int64_t in_val = internal::UnboxScalar<const TimestampType>::Unbox(in);
ARROW_ASSIGN_OR_RAISE(auto formatted, formatter(in_val));
checked_cast<StringScalar*>(out)->value = Buffer::FromString(std::move(formatted));
} else {
out->is_valid = false;
}
return Status::OK();
}

static Status Call(KernelContext* ctx, const ArrayData& in, ArrayData* out) {
ARROW_ASSIGN_OR_RAISE(auto self, Make(ctx, *in.type));
TimestampFormatter formatter{self.options.format, self.tz, self.locale};

StringBuilder string_builder;
// Presize string data using a heuristic
{
ARROW_ASSIGN_OR_RAISE(auto formatted, formatter(42));
const auto string_size = static_cast<int64_t>(ceil(formatted.size() * 1.1));
RETURN_NOT_OK(string_builder.Reserve(in.length));
RETURN_NOT_OK(
string_builder.ReserveData((in.length - in.GetNullCount()) * string_size));
}

auto visit_null = [&]() { return string_builder.AppendNull(); };
auto visit_value = [&](int64_t arg) {
ARROW_ASSIGN_OR_RAISE(auto formatted, formatter(arg));
return string_builder.Append(std::move(formatted));
};
RETURN_NOT_OK(VisitArrayDataInline<Int64Type>(in, visit_value, visit_null));

std::shared_ptr<Array> out_array;
RETURN_NOT_OK(string_builder.Finish(&out_array));
*out = *std::move(out_array->data());

return Status::OK();
}

struct TimestampFormatter {
const char* format;
const time_zone* tz;
std::ostringstream bufstream;

explicit TimestampFormatter(const std::string& format, const time_zone* tz,
const std::locale& locale)
: format(format.c_str()), tz(tz) {
bufstream.imbue(locale);
// Propagate errors as C++ exceptions (to get an actual error message)
bufstream.exceptions(std::ios::failbit | std::ios::badbit);
}

Result<std::string> operator()(int64_t arg) {
bufstream.str("");
const auto zt = arrow_vendored::date::zoned_time<Duration>{
tz, sys_time<Duration>(Duration{arg})};
try {
arrow_vendored::date::to_stream(bufstream, format, zt);
} catch (const std::runtime_error& ex) {
bufstream.clear();
return Status::Invalid("Failed formatting timestamp: ", ex.what());
}
// XXX could return a view with std::ostringstream::view() (C++20)
return std::move(bufstream).str();
}
};
};

#else
template <typename Duration>
struct Strftime {
static Status Call(KernelContext* ctx, const Scalar& in, Scalar* out) {
return Status::NotImplemented("Strftime not yet implemented on windows.");
}
static Status Call(KernelContext* ctx, const ArrayData& in, ArrayData* out) {
return Status::NotImplemented("Strftime not yet implemented on windows.");
}
};
#endif

// ----------------------------------------------------------------------
// Extract ISO calendar values from timestamp

Expand Down Expand Up @@ -473,12 +599,8 @@ struct ISOCalendar {
if (timezone.empty()) {
iso_calendar = GetIsoCalendar<Duration>(in_val, NonZonedLocalizer{});
} else {
try {
const time_zone* tz = locate_zone(timezone);
iso_calendar = GetIsoCalendar<Duration>(in_val, ZonedLocalizer{tz});
} catch (const std::runtime_error& ex) {
return Status::Invalid(ex.what());
}
ARROW_ASSIGN_OR_RAISE(auto tz, LocateZone(timezone));
iso_calendar = GetIsoCalendar<Duration>(in_val, ZonedLocalizer{tz});
}
ScalarVector values = {std::make_shared<Int64Scalar>(iso_calendar[0]),
std::make_shared<Int64Scalar>(iso_calendar[1]),
Expand Down Expand Up @@ -518,19 +640,15 @@ struct ISOCalendar {
};
RETURN_NOT_OK(VisitArrayDataInline<Int64Type>(in, visit_value, visit_null));
} else {
try {
const time_zone* tz = locate_zone(timezone);
auto visit_value = [&](int64_t arg) {
const auto iso_calendar = GetIsoCalendar<Duration>(arg, ZonedLocalizer{tz});
field_builders[0]->UnsafeAppend(iso_calendar[0]);
field_builders[1]->UnsafeAppend(iso_calendar[1]);
field_builders[2]->UnsafeAppend(iso_calendar[2]);
return struct_builder->Append();
};
RETURN_NOT_OK(VisitArrayDataInline<Int64Type>(in, visit_value, visit_null));
} catch (const std::runtime_error& ex) {
return Status::Invalid(ex.what());
}
ARROW_ASSIGN_OR_RAISE(auto tz, LocateZone(timezone));
auto visit_value = [&](int64_t arg) {
const auto iso_calendar = GetIsoCalendar<Duration>(arg, ZonedLocalizer{tz});
field_builders[0]->UnsafeAppend(iso_calendar[0]);
field_builders[1]->UnsafeAppend(iso_calendar[1]);
field_builders[2]->UnsafeAppend(iso_calendar[2]);
return struct_builder->Append();
};
RETURN_NOT_OK(VisitArrayDataInline<Int64Type>(in, visit_value, visit_null));
}

std::shared_ptr<Array> out_array;
Expand Down Expand Up @@ -713,6 +831,15 @@ const FunctionDoc subsecond_doc{
"Returns an error if timestamp has a defined timezone. Null values return null."),
{"values"}};

const FunctionDoc strftime_doc{
"Format timestamps according to a format string",
("For each input timestamp, emit a formatted string.\n"
"The time format string and locale can be set using StrftimeOptions.\n"
"An error is returned if the timestamps don't have a defined timezone,\n"
"or if the timezone cannot be found in the timezone database."),
{"timestamps"},
"StrftimeOptions"};

} // namespace

void RegisterScalarTemporal(FunctionRegistry* registry) {
Expand Down Expand Up @@ -782,6 +909,11 @@ void RegisterScalarTemporal(FunctionRegistry* registry) {
auto subsecond = MakeTemporal<Subsecond, TemporalComponentExtract, DoubleType>(
"subsecond", float64(), &subsecond_doc);
DCHECK_OK(registry->AddFunction(std::move(subsecond)));

static auto default_strftime_options = StrftimeOptions();
auto strftime = MakeSimpleUnaryTemporal<Strftime>(
"strftime", utf8(), &strftime_doc, &default_strftime_options, StrftimeState::Init);
DCHECK_OK(registry->AddFunction(std::move(strftime)));
}

} // namespace internal
Expand Down
Loading