Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 232 additions & 14 deletions native/spark-expr/src/conversion_funcs/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use arrow::datatypes::{
i256, is_validate_decimal_precision, DataType, Date32Type, Decimal256Type, Float32Type,
Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, TimestampMicrosecondType,
};
use chrono::{DateTime, LocalResult, NaiveDate, NaiveTime, Offset, TimeZone, Timelike};
use chrono::{DateTime, LocalResult, NaiveDate, NaiveTime, Offset, TimeZone, Timelike, Utc};
use num::traits::CheckedNeg;
use num::{CheckedSub, Integer};
use regex::Regex;
Expand All @@ -35,10 +35,14 @@ use std::sync::{Arc, LazyLock};

macro_rules! cast_utf8_to_timestamp {
// $tz is a Timezone:Tz object and contains the session timezone.
// $to_tz_str is a string containing the to_type timezone
($array:expr, $eval_mode:expr, $array_type:ty, $cast_method:ident, $tz:expr, $to_tz_str:expr, $is_spark4_plus:expr) => {{
// $with_tz is an Option<&str> containing the to_type timezone (None for NTZ)
($array:expr, $eval_mode:expr, $array_type:ty, $cast_method:ident, $tz:expr, $with_tz:expr, $is_spark4_plus:expr) => {{
let len = $array.len();
let mut cast_array = PrimitiveArray::<$array_type>::builder(len).with_timezone($to_tz_str);
let mut cast_array = if let Some(tz_str) = $with_tz {
PrimitiveArray::<$array_type>::builder(len).with_timezone(tz_str)
} else {
PrimitiveArray::<$array_type>::builder(len)
};
let mut cast_err: Option<SparkError> = None;
for i in 0..len {
if $array.is_null(i) {
Expand Down Expand Up @@ -700,15 +704,28 @@ pub(crate) fn cast_string_to_timestamp(
.map_err(|_| SparkError::Internal(format!("Invalid timezone string: {timezone_str}")))?;

let cast_array: ArrayRef = match to_type {
DataType::Timestamp(_, tz_opt) => {
let to_tz = tz_opt.as_deref().unwrap_or("UTC");
DataType::Timestamp(_, Some(tz_opt)) => {
let to_tz = tz_opt.as_ref();
cast_utf8_to_timestamp!(
string_array,
eval_mode,
TimestampMicrosecondType,
timestamp_parser,
tz,
to_tz,
Some(to_tz),
is_spark4_plus
)?
}
DataType::Timestamp(_, None) => {
// TimestampNTZ: use a dedicated parser that strips timezone offsets
// before parsing, matching Spark's behavior of ignoring offsets for NTZ.
cast_utf8_to_timestamp!(
string_array,
eval_mode,
TimestampMicrosecondType,
timestamp_ntz_parser,
&Utc,
None::<&str>,
is_spark4_plus
)?
}
Expand Down Expand Up @@ -1236,6 +1253,30 @@ fn parse_str_to_microsecond_timestamp<T: TimeZone>(
get_timestamp_values(value, "microsecond", tz)
}

/// Regex to match a timezone offset suffix in a timestamp string.
/// Matches: Z, +HH:MM, -HH:MM, +HHMM, -HHMM, +HH, -HH
fn strip_timezone_offset(value: &str) -> &str {
// This regex is anchored to the end of the string and matches common timezone offset formats.
// We use a lazy_static-style approach via Regex::new (consistent with the rest of this file).
let tz_suffix = Regex::new(r"([+-]\d{2}(:\d{2})?|Z)$").unwrap();
match tz_suffix.find(value) {
Some(m) => &value[..m.start()],
None => value,
}
}

/// Parser used when the cast target is `TimestampNTZ`.
///
/// Spark ignores any explicit UTC offset when casting a string to a
/// timestamp without timezone, so the offset suffix is removed before
/// delegating to `timestamp_parser`. Kept separate from that function
/// so casts to Timestamp WITH timezone still honor offsets.
fn timestamp_ntz_parser<T: TimeZone>(
    value: &str,
    eval_mode: EvalMode,
    tz: &T,
    is_spark4_plus: bool,
) -> SparkResult<Option<i64>> {
    let trimmed = value.trim();
    let without_offset = strip_timezone_offset(trimmed);
    timestamp_parser(without_offset, eval_mode, tz, is_spark4_plus)
}

fn timestamp_parser<T: TimeZone>(
value: &str,
eval_mode: EvalMode,
Expand Down Expand Up @@ -1769,7 +1810,7 @@ mod tests {
TimestampMicrosecondType,
timestamp_parser,
tz,
"UTC",
Some("UTC"),
true
)
.unwrap();
Expand All @@ -1781,14 +1822,54 @@ mod tests {
assert_eq!(result.len(), 4);
}

// Casting strings to TimestampNTZ: valid timestamps parse, invalid input
// becomes null (Legacy mode), and the result carries no timezone metadata.
#[test]
#[cfg_attr(miri, ignore)]
fn test_cast_string_to_timestamp_ntz() {
    // One full timestamp, one invalid string, one date-only value.
    let input: ArrayRef = Arc::new(StringArray::from(vec![
        Some("2020-01-01T12:34:56.123456"),
        Some("not_a_timestamp"),
        Some("2020-01-01"),
    ]));
    let strings = input
        .as_any()
        .downcast_ref::<GenericStringArray<i32>>()
        .expect("Expected a string array");

    let result = cast_utf8_to_timestamp!(
        &strings,
        EvalMode::Legacy,
        TimestampMicrosecondType,
        timestamp_ntz_parser,
        &Utc,
        None::<&str>,
        true
    )
    .unwrap();

    // NTZ output must not carry timezone metadata.
    assert_eq!(
        result.data_type(),
        &DataType::Timestamp(TimeUnit::Microsecond, None)
    );
    assert_eq!(result.len(), 3);

    let values = result
        .as_any()
        .downcast_ref::<PrimitiveArray<TimestampMicrosecondType>>()
        .expect("Expected a timestamp array");

    // 2020-01-01T12:34:56.123456 as microseconds since the epoch.
    assert_eq!(values.value(0), 1577882096123456);
    // Invalid input becomes null in Legacy mode rather than erroring.
    assert!(values.is_null(1));
    // Date-only input resolves to midnight of that day.
    assert_eq!(values.value(2), 1577836800000000);
}

#[test]
fn test_cast_string_to_timestamp_ansi_error() {
// In ANSI mode, an invalid timestamp string must produce an error rather than null.
let array: ArrayRef = Arc::new(StringArray::from(vec![
Some("2020-01-01T12:34:56.123456"),
Some("not_a_timestamp"),
]));
let tz = &timezone::Tz::from_str("UTC").unwrap();
let string_array = array
.as_any()
.downcast_ref::<GenericStringArray<i32>>()
Expand All @@ -1800,14 +1881,151 @@ mod tests {
eval_mode,
TimestampMicrosecondType,
timestamp_parser,
tz,
"UTC",
&Utc,
None::<&str>,
true
);

assert!(result.is_err());
if let Err(SparkError::InvalidInputInCastToDatetime { value, from_type, to_type }) = result {
assert_eq!(value, "not_a_timestamp");
assert_eq!(from_type, "STRING");
assert_eq!(to_type, "TIMESTAMP");
} else {
panic!("Expected InvalidInputInCastToDatetime error, got {:?}", result);
}
}

// Verifies Spark's NTZ offset handling: a string carrying an explicit UTC
// offset is accepted, the offset is stripped, and the local wall-clock
// datetime is preserved (no conversion to UTC).
//
// NOTE: the assertion for index 3 was garbled in the pasted diff — lines
// removed from the ANSI-error test (`result.is_err(), "ANSI mode …"`) were
// spliced into the middle of the `!ts_array.is_null(3)` assert, which does
// not compile. Reconstructed the intended assertion here.
#[test]
#[cfg_attr(miri, ignore)]
fn test_cast_string_with_timezone_offset_to_timestamp_ntz() {
    // Offset strings: offset stripped, local datetime preserved (Spark NTZ behavior)
    let array: ArrayRef = Arc::new(StringArray::from(vec![
        Some("2020-01-01T12:34:56+05:00"),
        Some("2020-01-01T12:34:56-08:00"),
        Some("2020-01-01T12:34:56Z"),
        Some("2020-01-01T12:34:56.123456+00:00"),
        Some("2020-01-01T12:34:56.123456"),
    ]));

    let string_array = array
        .as_any()
        .downcast_ref::<GenericStringArray<i32>>()
        .expect("Expected a string array");

    let eval_mode = EvalMode::Legacy;
    let result = cast_utf8_to_timestamp!(
        &string_array,
        eval_mode,
        TimestampMicrosecondType,
        timestamp_ntz_parser,
        &Utc,
        None::<&str>,
        true
    )
    .unwrap();

    assert_eq!(
        result.data_type(),
        &DataType::Timestamp(TimeUnit::Microsecond, None)
    );
    assert_eq!(result.len(), 5);

    let ts_array = result
        .as_any()
        .downcast_ref::<PrimitiveArray<TimestampMicrosecondType>>()
        .expect("Expected a timestamp array");

    // The first three inputs share the local time 2020-01-01T12:34:56, which
    // is 1577882096000000 microseconds since the epoch when read as UTC —
    // identical results prove the differing offsets were ignored, not applied.
    assert!(!ts_array.is_null(0), "'+05:00' offset should be parsed");
    assert_eq!(ts_array.value(0), 1577882096000000);
    assert!(!ts_array.is_null(1), "'-08:00' offset should be parsed");
    assert_eq!(ts_array.value(1), 1577882096000000);
    assert!(!ts_array.is_null(2), "'Z' suffix should be parsed");
    assert_eq!(ts_array.value(2), 1577882096000000);
    assert!(
        !ts_array.is_null(3),
        "'+00:00' offset with micros should be parsed"
    );
    assert_eq!(ts_array.value(3), 1577882096123456);

    // The input without an offset must also parse, unchanged.
    assert!(!ts_array.is_null(4));
    assert_eq!(ts_array.value(4), 1577882096123456);
}

// Exercises the public cast_array() entry point for string -> timestamp
// casts, checking the Arrow type metadata for both a WITH-timezone target
// and a TimestampNTZ target, plus the stored NTZ value.
#[test]
fn test_cast_string_to_timestamp_ntz_via_cast_array() -> DataFusionResult<()> {
    let array: ArrayRef = Arc::new(StringArray::from(vec![
        Some("2020-01-01T12:34:56.123456"),
        Some("T2"), // invalid input; expected to become null in Legacy mode
    ]));

    // Session timezone deliberately differs from UTC so that any accidental
    // session-timezone dependence in the NTZ path would change value(0).
    let timezone = "America/New_York".to_string();
    let cast_options = SparkCastOptions::new(EvalMode::Legacy, &timezone, false);

    // Cast to Timestamp with timezone
    let result_tz = cast_array(
        Arc::clone(&array),
        &DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
        &cast_options,
    )?;
    assert_eq!(
        *result_tz.data_type(),
        DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
    );

    // Cast to TimestampNTZ (no timezone)
    let result_ntz = cast_array(
        Arc::clone(&array),
        &DataType::Timestamp(TimeUnit::Microsecond, None),
        &cast_options,
    )?;
    assert_eq!(
        *result_ntz.data_type(),
        DataType::Timestamp(TimeUnit::Microsecond, None)
    );

    // The NTZ result should NOT have timezone metadata
    let ntz_array = result_ntz
        .as_any()
        .downcast_ref::<PrimitiveArray<TimestampMicrosecondType>>()
        .expect("Expected a timestamp array");
    // 2020-01-01T12:34:56.123456 stored as-is (no timezone conversion)
    assert_eq!(ntz_array.value(0), 1577882096123456);

    Ok(())
}

// Verifies via the public cast_array() entry point that a string with an
// explicit UTC offset casts to TimestampNTZ: the offset is stripped and the
// local wall-clock datetime is kept, regardless of the session timezone.
#[test]
fn test_cast_string_with_offset_via_cast_array() -> DataFusionResult<()> {
    let array: ArrayRef = Arc::new(StringArray::from(vec![
        Some("2020-01-01T12:34:56+05:00"),
        Some("2020-01-01T12:34:56"),
    ]));

    // Non-UTC session timezone: NTZ results must not depend on it.
    let timezone = "America/Los_Angeles".to_string();
    let cast_options = SparkCastOptions::new(EvalMode::Legacy, &timezone, false);

    let result = cast_array(
        Arc::clone(&array),
        &DataType::Timestamp(TimeUnit::Microsecond, None),
        &cast_options,
    )?;

    let ts_array = result
        .as_any()
        .downcast_ref::<PrimitiveArray<TimestampMicrosecondType>>()
        .expect("Expected a timestamp array");

    // Offset string should be parsed: offset stripped, local datetime kept
    assert!(
        !ts_array.is_null(0),
        "Offset string should be parsed for NTZ"
    );
    assert_eq!(ts_array.value(0), 1577882096000000); // 2020-01-01T12:34:56
    // Non-offset string should parse correctly
    assert!(!ts_array.is_null(1));
    Ok(())
}

#[test]
Expand All @@ -1830,7 +2048,7 @@ mod tests {
TimestampMicrosecondType,
timestamp_parser,
tz,
"UTC",
Some("UTC"),
true
);
match result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.comet.expressions

import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression, Literal}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType, TimestampType}
import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType, TimestampNTZType, TimestampType}

import org.apache.comet.CometConf
import org.apache.comet.CometSparkSessionExtensions.{isSpark40Plus, withInfo}
Expand Down Expand Up @@ -219,6 +219,10 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim {
Compatible(Some("Only supports years between 262143 BC and 262142 AD"))
case DataTypes.TimestampType =>
Compatible()
case _: TimestampNTZType if evalMode == CometEvalMode.ANSI =>
Incompatible(Some("ANSI mode not supported"))
case _: TimestampNTZType =>
Incompatible(Some("Not all valid formats are supported"))
case _ =>
unsupported(DataTypes.StringType, toType)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- Config: spark.comet.expression.Cast.allowIncompatible=true
-- ConfigMatrix: parquet.enable.dictionary=false,true

-- Test casting string to timestamp_ntz
-- https://github.com/apache/datafusion-comet/issues/3179

statement
CREATE TABLE test_cast_ts_ntz(s string) USING parquet

statement
INSERT INTO test_cast_ts_ntz VALUES ('2020-01-01T12:34:56.123456'), ('2020-01-01'), ('2020-01-01T12:34:56'), ('2020'), ('2020-01'), (NULL), ('not_a_timestamp'), ('2020-01-01T12:34:56+05:00')

-- Cast string to timestamp_ntz: valid formats should parse, invalid should be null
query
SELECT s, cast(s AS timestamp_ntz) FROM test_cast_ts_ntz

-- Verify that timestamp_ntz values are not affected by session timezone
query
SELECT s, cast(s AS timestamp_ntz) FROM test_cast_ts_ntz WHERE s = '2020-01-01T12:34:56.123456'

-- Compare timestamp_ntz vs timestamp (with timezone) to show they differ
-- Exclude offset string: cast(s AS timestamp) does not yet parse offsets natively
query
SELECT s, cast(s AS timestamp_ntz) as ts_ntz, cast(s AS timestamp) as ts FROM test_cast_ts_ntz WHERE s IS NOT NULL AND s != 'not_a_timestamp' AND s != '2020-01-01T12:34:56+05:00'