diff --git a/native/spark-expr/src/datetime_funcs/extract_date_part.rs b/native/spark-expr/src/datetime_funcs/extract_date_part.rs
index acb7d2266e..20c26c781a 100644
--- a/native/spark-expr/src/datetime_funcs/extract_date_part.rs
+++ b/native/spark-expr/src/datetime_funcs/extract_date_part.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::utils::array_with_timezone;
 use arrow::compute::{date_part, DatePart};
 use arrow::datatypes::{DataType, TimeUnit::Microsecond};
 use datafusion::common::{internal_datafusion_err, DataFusionError};
@@ -24,6 +23,8 @@ use datafusion::logical_expr::{
 };
 use std::{any::Any, fmt::Debug};
 
+use crate::utils::array_with_timezone;
+
 macro_rules! extract_date_part {
     ($struct_name:ident, $fn_name:expr, $date_part_variant:ident) => {
         #[derive(Debug, PartialEq, Eq, Hash)]
@@ -75,14 +76,39 @@ macro_rules! extract_date_part {
 
                match args {
                    [ColumnarValue::Array(array)] => {
-                        let array = array_with_timezone(
-                            array,
-                            self.timezone.clone(),
-                            Some(&DataType::Timestamp(
-                                Microsecond,
-                                Some(self.timezone.clone().into()),
-                            )),
-                        )?;
+                        // First, normalize dictionary-encoded arrays (common in Parquet/Iceberg)
+                        let array = match array.data_type() {
+                            DataType::Dictionary(_, value_type) => {
+                                // Cast dictionary to the underlying timestamp type
+                                arrow::compute::cast(&array, value_type.as_ref())
+                                    .map_err(|e| DataFusionError::Execution(e.to_string()))?
+                            }
+                            _ => array.clone(),
+                        };
+
+                        // Then handle timezone conversion based on timestamp type
+                        let array = match array.data_type() {
+                            // TimestampNTZ → DO NOT apply timezone conversion
+                            DataType::Timestamp(_, None) => array,
+
+                            // Timestamp with timezone → convert from UTC to session timezone
+                            DataType::Timestamp(_, Some(_)) => array_with_timezone(
+                                array,
+                                self.timezone.clone(),
+                                Some(&DataType::Timestamp(
+                                    Microsecond,
+                                    Some(self.timezone.clone().into()),
+                                )),
+                            )?,
+
+                            other => {
+                                return Err(DataFusionError::Execution(format!(
+                                    "extract_date_part expects a Timestamp input, got {:?}",
+                                    other
+                                )));
+                            }
+                        };
+
                        let result = date_part(&array, DatePart::$date_part_variant)?;
                        Ok(ColumnarValue::Array(result))
                    }
diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
index d36b6a3b40..1afc8b4ba8 100644
--- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala
@@ -177,6 +177,8 @@ object CometQuarter extends CometExpressionSerde[Quarter] with CometExprGetDateF
 }
 
 object CometHour extends CometExpressionSerde[Hour] {
+  override def getSupportLevel(expr: Hour): SupportLevel = Compatible()
+
   override def convert(
       expr: Hour,
       inputs: Seq[Attribute],
@@ -203,6 +205,8 @@ object CometHour extends CometExpressionSerde[Hour] {
 }
 
 object CometMinute extends CometExpressionSerde[Minute] {
+  override def getSupportLevel(expr: Minute): SupportLevel = Compatible()
+
   override def convert(
       expr: Minute,
       inputs: Seq[Attribute],
@@ -229,6 +233,8 @@ object CometMinute extends CometExpressionSerde[Minute] {
 }
 
 object CometSecond extends CometExpressionSerde[Second] {
+  override def getSupportLevel(expr: Second): SupportLevel = Compatible()
+
   override def convert(
       expr: Second,
       inputs: Seq[Attribute],
diff --git a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
index 1ae6926e05..d2e31dfd1d 100644
--- a/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometTemporalExpressionSuite.scala
@@ -395,4 +395,30 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
     // Test null handling
     checkSparkAnswerAndOperator("SELECT unix_date(NULL)")
   }
+
+  test("hour/minute/second with TimestampNTZ in non-UTC timezone") {
+    // Regression test for issue #3180
+    // TimestampNTZ stores local time without timezone information
+    // hour/minute/second should extract directly from local time without timezone conversion
+    val schema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true)))
+
+    // Create test data with known TimestampNTZ values
+    val data = Seq(
+      Row(java.time.LocalDateTime.of(2024, 1, 15, 10, 30, 45)), // 10:30:45
+      Row(java.time.LocalDateTime.of(2024, 6, 20, 14, 15, 20)), // 14:15:20
+      Row(java.time.LocalDateTime.of(2024, 12, 31, 23, 59, 59)), // 23:59:59
+      Row(null))
+
+    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+    df.createOrReplaceTempView("timestamp_ntz_tbl")
+
+    // Test in multiple timezones - results should be the same since TimestampNTZ has no timezone
+    for (timezone <- Seq("UTC", "America/Los_Angeles", "Asia/Tokyo")) {
+      withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timezone) {
+        // hour() should return the hour from local time directly
+        checkSparkAnswerAndOperator(
+          "SELECT ts_ntz, hour(ts_ntz), minute(ts_ntz), second(ts_ntz) FROM timestamp_ntz_tbl ORDER BY ts_ntz")
+      }
+    }
+  }
 }