diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index c31b37b63b219..2123281217ba2 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -1296,7 +1296,7 @@ impl ScalarValue { } macro_rules! build_array_primitive_tz { - ($ARRAY_TY:ident, $SCALAR_TY:ident) => {{ + ($ARRAY_TY:ident, $SCALAR_TY:ident, $TZ:expr) => {{ { let array = scalars.map(|sv| { if let ScalarValue::$SCALAR_TY(v, _) = sv { @@ -1310,7 +1310,7 @@ impl ScalarValue { } }) .collect::<Result<$ARRAY_TY>>()?; - Arc::new(array) + Arc::new(array.with_timezone_opt($TZ.clone())) } }}; } @@ -1444,17 +1444,29 @@ impl ScalarValue { DataType::Time64(TimeUnit::Nanosecond) => { build_array_primitive!(Time64NanosecondArray, Time64Nanosecond) } - DataType::Timestamp(TimeUnit::Second, _) => { - build_array_primitive_tz!(TimestampSecondArray, TimestampSecond) + DataType::Timestamp(TimeUnit::Second, tz) => { + build_array_primitive_tz!(TimestampSecondArray, TimestampSecond, tz) } - DataType::Timestamp(TimeUnit::Millisecond, _) => { - build_array_primitive_tz!(TimestampMillisecondArray, TimestampMillisecond) + DataType::Timestamp(TimeUnit::Millisecond, tz) => { + build_array_primitive_tz!( + TimestampMillisecondArray, + TimestampMillisecond, + tz + ) } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - build_array_primitive_tz!(TimestampMicrosecondArray, TimestampMicrosecond) + DataType::Timestamp(TimeUnit::Microsecond, tz) => { + build_array_primitive_tz!( + TimestampMicrosecondArray, + TimestampMicrosecond, + tz + ) } - DataType::Timestamp(TimeUnit::Nanosecond, _) => { - build_array_primitive_tz!(TimestampNanosecondArray, TimestampNanosecond) + DataType::Timestamp(TimeUnit::Nanosecond, tz) => { + build_array_primitive_tz!( + TimestampNanosecondArray, + TimestampNanosecond, + tz + ) } DataType::Interval(IntervalUnit::DayTime) => { build_array_primitive!(IntervalDayTimeArray, IntervalDayTime) diff --git a/datafusion/core/tests/parquet/data/timestamp_with_tz.parquet 
b/datafusion/core/tests/parquet/data/timestamp_with_tz.parquet new file mode 100644 index 0000000000000..075f846a60cc3 Binary files /dev/null and b/datafusion/core/tests/parquet/data/timestamp_with_tz.parquet differ diff --git a/datafusion/core/tests/sql/parquet.rs b/datafusion/core/tests/sql/parquet.rs index 31cd0da21b936..b185289927bda 100644 --- a/datafusion/core/tests/sql/parquet.rs +++ b/datafusion/core/tests/sql/parquet.rs @@ -165,6 +165,39 @@ async fn fixed_size_binary_columns() { } } +#[tokio::test] +async fn window_fn_timestamp_tz() { + let ctx = SessionContext::new(); + ctx.register_parquet( + "t0", + "tests/parquet/data/timestamp_with_tz.parquet", + ParquetReadOptions::default(), + ) + .await + .unwrap(); + + let sql = "SELECT count, LAG(timestamp, 1) OVER (ORDER BY timestamp) FROM t0"; + let dataframe = ctx.sql(sql).await.unwrap(); + let results = dataframe.collect().await.unwrap(); + + let mut num_rows = 0; + for batch in results { + num_rows += batch.num_rows(); + assert_eq!(2, batch.num_columns()); + + let ty = batch.column(0).data_type().clone(); + assert_eq!(DataType::Int64, ty); + + let ty = batch.column(1).data_type().clone(); + assert_eq!( + DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".to_owned())), + ty + ); + } + + assert_eq!(131072, num_rows); +} + #[tokio::test] async fn parquet_single_nan_schema() { let ctx = SessionContext::new();