Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 35 additions & 9 deletions native/spark-expr/src/datetime_funcs/extract_date_part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use crate::utils::array_with_timezone;
use arrow::compute::{date_part, DatePart};
use arrow::datatypes::{DataType, TimeUnit::Microsecond};
use datafusion::common::{internal_datafusion_err, DataFusionError};
Expand All @@ -24,6 +23,8 @@ use datafusion::logical_expr::{
};
use std::{any::Any, fmt::Debug};

use crate::utils::array_with_timezone;

macro_rules! extract_date_part {
($struct_name:ident, $fn_name:expr, $date_part_variant:ident) => {
#[derive(Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -75,14 +76,39 @@ macro_rules! extract_date_part {

match args {
[ColumnarValue::Array(array)] => {
let array = array_with_timezone(
array,
self.timezone.clone(),
Some(&DataType::Timestamp(
Microsecond,
Some(self.timezone.clone().into()),
)),
)?;
// First, normalize dictionary-encoded arrays (common in Parquet/Iceberg)
let array = match array.data_type() {
DataType::Dictionary(_, value_type) => {
// Cast dictionary to the underlying timestamp type
arrow::compute::cast(&array, value_type.as_ref())
.map_err(|e| DataFusionError::Execution(e.to_string()))?
}
_ => array.clone(),
};

// Then handle timezone conversion based on timestamp type
let array = match array.data_type() {
// TimestampNTZ → DO NOT apply timezone conversion
DataType::Timestamp(_, None) => array,

// Timestamp with timezone → convert from UTC to session timezone
DataType::Timestamp(_, Some(_)) => array_with_timezone(
array,
Comment thread
andygrove marked this conversation as resolved.
self.timezone.clone(),
Some(&DataType::Timestamp(
Microsecond,
Some(self.timezone.clone().into()),
)),
)?,

other => {
return Err(DataFusionError::Execution(format!(
"extract_date_part expects a Timestamp input, got {:?}",
other
)));
}
};

let result = date_part(&array, DatePart::$date_part_variant)?;
Ok(ColumnarValue::Array(result))
}
Expand Down
6 changes: 6 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/datetime.scala
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ object CometQuarter extends CometExpressionSerde[Quarter] with CometExprGetDateF
}

object CometHour extends CometExpressionSerde[Hour] {
override def getSupportLevel(expr: Hour): SupportLevel = Compatible()

override def convert(
expr: Hour,
inputs: Seq[Attribute],
Expand All @@ -203,6 +205,8 @@ object CometHour extends CometExpressionSerde[Hour] {
}

object CometMinute extends CometExpressionSerde[Minute] {
override def getSupportLevel(expr: Minute): SupportLevel = Compatible()

override def convert(
expr: Minute,
inputs: Seq[Attribute],
Expand All @@ -229,6 +233,8 @@ object CometMinute extends CometExpressionSerde[Minute] {
}

object CometSecond extends CometExpressionSerde[Second] {
override def getSupportLevel(expr: Second): SupportLevel = Compatible()

override def convert(
expr: Second,
inputs: Seq[Attribute],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,4 +395,30 @@ class CometTemporalExpressionSuite extends CometTestBase with AdaptiveSparkPlanH
// Test null handling
checkSparkAnswerAndOperator("SELECT unix_date(NULL)")
}

test("hour/minute/second with TimestampNTZ in non-UTC timezone") {
  // Regression test for issue #3180.
  // A TimestampNTZ value is a wall-clock reading with no timezone attached, so
  // hour/minute/second are expected to come straight from the stored local time,
  // with no shift applied for the session timezone.
  val ntzSchema = StructType(Seq(StructField("ts_ntz", DataTypes.TimestampNTZType, true)))

  // Three known wall-clock values (10:30:45, 14:15:20, 23:59:59) plus a null row.
  val rows = Seq(
    Row(java.time.LocalDateTime.of(2024, 1, 15, 10, 30, 45)),
    Row(java.time.LocalDateTime.of(2024, 6, 20, 14, 15, 20)),
    Row(java.time.LocalDateTime.of(2024, 12, 31, 23, 59, 59)),
    Row(null))

  spark
    .createDataFrame(spark.sparkContext.parallelize(rows), ntzSchema)
    .createOrReplaceTempView("timestamp_ntz_tbl")

  val query =
    "SELECT ts_ntz, hour(ts_ntz), minute(ts_ntz), second(ts_ntz) FROM timestamp_ntz_tbl ORDER BY ts_ntz"

  // Run the same query under several session timezones; the comparison against Spark
  // must hold in each of them.
  Seq("UTC", "America/Los_Angeles", "Asia/Tokyo").foreach { sessionTz =>
    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> sessionTz) {
      checkSparkAnswerAndOperator(query)
    }
  }
}
}