Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 233 additions & 3 deletions datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! DateTime expressions

use std::ops::{Add, Sub};
use std::str::FromStr;
use std::sync::Arc;

Expand All @@ -43,7 +44,7 @@ use arrow_array::types::{ArrowTimestampType, Date32Type, Int32Type};
use arrow_array::GenericStringArray;
use chrono::prelude::*;
use chrono::LocalResult::Single;
use chrono::{Duration, Months, NaiveDate};
use chrono::{Duration, LocalResult, Months, NaiveDate};
use itertools::Either;

use datafusion_common::cast::{
Expand Down Expand Up @@ -662,8 +663,42 @@ fn _date_trunc_coarse_with_tz(
granularity: &str,
value: Option<DateTime<Tz>>,
) -> Result<Option<i64>> {
let value = _date_trunc_coarse::<DateTime<Tz>>(granularity, value)?;
Ok(value.and_then(|value| value.timestamp_nanos_opt()))
if let Some(value) = value {
let local = value.naive_local();
let truncated = _date_trunc_coarse::<NaiveDateTime>(granularity, Some(local))?;
let truncated = truncated.and_then(|truncated| {
match truncated.and_local_timezone(value.timezone()) {
LocalResult::None => {
// This can happen if the date_trunc operation moves the time into
// an hour that doesn't exist due to daylight saving time. One known example where
// this can happen is with historic dates in the America/Sao_Paulo time zone.
// To account for this, adjust the time by a few hours, convert to local time,
// and then adjust the time back.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timezones continue to blow my mind

truncated
.sub(Duration::hours(3))
.and_local_timezone(value.timezone())
.single()
.map(|v| v.add(Duration::hours(3)))
}
LocalResult::Single(datetime) => Some(datetime),
LocalResult::Ambiguous(datetime1, datetime2) => {
// Because we are truncating from an equally or more specific time
// the original time must have been within the ambiguous local time
// period. Therefore the offset of one of these times should match the
// offset of the original time.
if datetime1.offset().fix() == value.offset().fix() {
Some(datetime1)
} else {
Some(datetime2)
}
}
}
});
Ok(truncated.and_then(|value| value.timestamp_nanos_opt()))
} else {
_date_trunc_coarse::<NaiveDateTime>(granularity, None)?;
Ok(None)
}
}

fn _date_trunc_coarse_without_tz(
Expand Down Expand Up @@ -1784,6 +1819,44 @@ mod tests {
"2020-09-08T00:00:00+08",
],
),
(
vec![
"2024-10-26T23:00:00Z",
"2024-10-27T00:00:00Z",
"2024-10-27T01:00:00Z",
"2024-10-27T02:00:00Z",
],
Some("Europe/Berlin".into()),
vec![
"2024-10-27T00:00:00+02",
"2024-10-27T00:00:00+02",
"2024-10-27T00:00:00+02",
"2024-10-27T00:00:00+02",
],
),
(
vec![
"2018-02-18T00:00:00Z",
"2018-02-18T01:00:00Z",
"2018-02-18T02:00:00Z",
"2018-02-18T03:00:00Z",
"2018-11-04T01:00:00Z",
"2018-11-04T02:00:00Z",
"2018-11-04T03:00:00Z",
"2018-11-04T04:00:00Z",
],
Some("America/Sao_Paulo".into()),
vec![
"2018-02-17T00:00:00-02",
"2018-02-17T00:00:00-02",
"2018-02-17T00:00:00-02",
"2018-02-18T00:00:00-03",
"2018-11-03T00:00:00-03",
"2018-11-03T00:00:00-03",
"2018-11-04T01:00:00-02",
"2018-11-04T01:00:00-02",
],
),
];

cases.iter().for_each(|(original, tz_opt, expected)| {
Expand Down Expand Up @@ -1815,6 +1888,163 @@ mod tests {
});
}

/// Checks `date_trunc` with the `"hour"` granularity on nanosecond timestamp
/// arrays carrying different timezone annotations.
///
/// Each case is `(input timestamps, timezone annotation, expected timestamps)`.
/// The cases cover fixed offsets, no timezone at all, and the named zones
/// `Europe/Berlin` and `America/Sao_Paulo`, where the expected UTC offset
/// changes between consecutive rows.
#[test]
fn test_date_trunc_hour_timezones() {
    let cases = vec![
        // Explicit zero offset.
        (
            vec![
                "2020-09-08T00:30:00Z",
                "2020-09-08T01:30:00Z",
                "2020-09-08T02:30:00Z",
                "2020-09-08T03:30:00Z",
                "2020-09-08T04:30:00Z",
            ],
            Some("+00".into()),
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T01:00:00Z",
                "2020-09-08T02:00:00Z",
                "2020-09-08T03:00:00Z",
                "2020-09-08T04:00:00Z",
            ],
        ),
        // No timezone annotation.
        (
            vec![
                "2020-09-08T00:30:00Z",
                "2020-09-08T01:30:00Z",
                "2020-09-08T02:30:00Z",
                "2020-09-08T03:30:00Z",
                "2020-09-08T04:30:00Z",
            ],
            None,
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T01:00:00Z",
                "2020-09-08T02:00:00Z",
                "2020-09-08T03:00:00Z",
                "2020-09-08T04:00:00Z",
            ],
        ),
        // Negative fixed offset.
        (
            vec![
                "2020-09-08T00:30:00Z",
                "2020-09-08T01:30:00Z",
                "2020-09-08T02:30:00Z",
                "2020-09-08T03:30:00Z",
                "2020-09-08T04:30:00Z",
            ],
            Some("-02".into()),
            vec![
                "2020-09-08T00:00:00Z",
                "2020-09-08T01:00:00Z",
                "2020-09-08T02:00:00Z",
                "2020-09-08T03:00:00Z",
                "2020-09-08T04:00:00Z",
            ],
        ),
        // Positive fixed offsets.
        (
            vec![
                "2020-09-08T00:30:00+05",
                "2020-09-08T01:30:00+05",
                "2020-09-08T02:30:00+05",
                "2020-09-08T03:30:00+05",
                "2020-09-08T04:30:00+05",
            ],
            Some("+05".into()),
            vec![
                "2020-09-08T00:00:00+05",
                "2020-09-08T01:00:00+05",
                "2020-09-08T02:00:00+05",
                "2020-09-08T03:00:00+05",
                "2020-09-08T04:00:00+05",
            ],
        ),
        (
            vec![
                "2020-09-08T00:30:00+08",
                "2020-09-08T01:30:00+08",
                "2020-09-08T02:30:00+08",
                "2020-09-08T03:30:00+08",
                "2020-09-08T04:30:00+08",
            ],
            Some("+08".into()),
            vec![
                "2020-09-08T00:00:00+08",
                "2020-09-08T01:00:00+08",
                "2020-09-08T02:00:00+08",
                "2020-09-08T03:00:00+08",
                "2020-09-08T04:00:00+08",
            ],
        ),
        // Named zone: the expected offset switches from +02 to +01.
        (
            vec![
                "2024-10-26T23:30:00Z",
                "2024-10-27T00:30:00Z",
                "2024-10-27T01:30:00Z",
                "2024-10-27T02:30:00Z",
            ],
            Some("Europe/Berlin".into()),
            vec![
                "2024-10-27T01:00:00+02",
                "2024-10-27T02:00:00+02",
                "2024-10-27T02:00:00+01",
                "2024-10-27T03:00:00+01",
            ],
        ),
        // Named zone: the expected offset switches in both directions.
        (
            vec![
                "2018-02-18T00:30:00Z",
                "2018-02-18T01:30:00Z",
                "2018-02-18T02:30:00Z",
                "2018-02-18T03:30:00Z",
                "2018-11-04T01:00:00Z",
                "2018-11-04T02:00:00Z",
                "2018-11-04T03:00:00Z",
                "2018-11-04T04:00:00Z",
            ],
            Some("America/Sao_Paulo".into()),
            vec![
                "2018-02-17T22:00:00-02",
                "2018-02-17T23:00:00-02",
                "2018-02-17T23:00:00-03",
                "2018-02-18T00:00:00-03",
                "2018-11-03T22:00:00-03",
                "2018-11-03T23:00:00-03",
                "2018-11-04T01:00:00-02",
                "2018-11-04T02:00:00-02",
            ],
        ),
    ];

    for (original, tz_opt, expected) in cases.iter() {
        // Expected output, annotated with the same timezone as the input.
        let want = expected
            .iter()
            .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
            .collect::<TimestampNanosecondArray>()
            .with_timezone_opt(tz_opt.clone());

        let timestamps = original
            .iter()
            .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
            .collect::<TimestampNanosecondArray>()
            .with_timezone_opt(tz_opt.clone());

        let args = [
            ColumnarValue::Scalar(ScalarValue::from("hour")),
            ColumnarValue::Array(Arc::new(timestamps)),
        ];

        match date_trunc(&args).unwrap() {
            ColumnarValue::Array(actual) => {
                // The timezone annotation must survive truncation.
                assert_eq!(
                    actual.data_type(),
                    &DataType::Timestamp(TimeUnit::Nanosecond, tz_opt.clone())
                );
                let got = as_primitive_array::<TimestampNanosecondType>(&actual);
                assert_eq!(got, &want);
            }
            _ => panic!("unexpected column type"),
        }
    }
}

#[test]
fn test_date_bin_single() {
use chrono::Duration;
Expand Down
Loading