diff --git a/docs/spark_expressions_support.md b/docs/spark_expressions_support.md
index 9d9e8f7017..214948608b 100644
--- a/docs/spark_expressions_support.md
+++ b/docs/spark_expressions_support.md
@@ -204,7 +204,7 @@
 - [ ] second
 - [ ] timestamp_micros
 - [ ] timestamp_millis
-- [ ] timestamp_seconds
+- [x] timestamp_seconds
 - [ ] to_date
 - [ ] to_timestamp
 - [ ] to_timestamp_ltz
diff --git a/native/spark-expr/src/comet_scalar_funcs.rs b/native/spark-expr/src/comet_scalar_funcs.rs
index f6f6ef29c4..a2ffcc8bad 100644
--- a/native/spark-expr/src/comet_scalar_funcs.rs
+++ b/native/spark-expr/src/comet_scalar_funcs.rs
@@ -24,7 +24,7 @@ use crate::{
     spark_ceil, spark_decimal_div, spark_decimal_integral_div, spark_floor, spark_isnan,
     spark_lpad, spark_make_decimal, spark_read_side_padding, spark_round, spark_rpad,
     spark_unhex, spark_unscaled_value, EvalMode, SparkArrayCompact, SparkContains, SparkDateDiff,
-    SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, SparkSizeFunc,
+    SparkDateFromUnixDate, SparkDateTrunc, SparkMakeDate, SparkSecondsToTimestamp, SparkSizeFunc,
 };
 use arrow::datatypes::DataType;
 use datafusion::common::{DataFusionError, Result as DataFusionResult};
@@ -206,6 +206,7 @@ fn all_scalar_functions() -> Vec<Arc<ScalarUDF>> {
         Arc::new(ScalarUDF::new_from_impl(SparkDateFromUnixDate::default())),
         Arc::new(ScalarUDF::new_from_impl(SparkDateTrunc::default())),
         Arc::new(ScalarUDF::new_from_impl(SparkMakeDate::default())),
+        Arc::new(ScalarUDF::new_from_impl(SparkSecondsToTimestamp::default())),
         Arc::new(ScalarUDF::new_from_impl(SparkSizeFunc::default())),
     ]
 }
diff --git a/native/spark-expr/src/datetime_funcs/mod.rs b/native/spark-expr/src/datetime_funcs/mod.rs
index fb641c3e61..a94bf16ce2 100644
--- a/native/spark-expr/src/datetime_funcs/mod.rs
+++ b/native/spark-expr/src/datetime_funcs/mod.rs
@@ -21,6 +21,7 @@
 mod date_trunc;
 mod extract_date_part;
 mod hours;
 mod make_date;
+mod seconds_to_timestamp;
 mod timestamp_trunc;
 mod unix_timestamp;
@@ -32,5 +33,6 @@ pub use extract_date_part::SparkMinute;
 pub use extract_date_part::SparkSecond;
 pub use hours::SparkHoursTransform;
 pub use make_date::SparkMakeDate;
+pub use seconds_to_timestamp::SparkSecondsToTimestamp;
 pub use timestamp_trunc::TimestampTruncExpr;
 pub use unix_timestamp::SparkUnixTimestamp;
diff --git a/native/spark-expr/src/datetime_funcs/seconds_to_timestamp.rs b/native/spark-expr/src/datetime_funcs/seconds_to_timestamp.rs
new file mode 100644
index 0000000000..2da1ac73b0
--- /dev/null
+++ b/native/spark-expr/src/datetime_funcs/seconds_to_timestamp.rs
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
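+//
+// Semantics sketch (a hand-written summary, assumed to mirror Spark's
+// `TimestampSeconds` expression; the exact numbers below are not taken
+// from this patch): the conversion is a plain scale to microseconds since
+// the Unix epoch, for example
+//
+//   timestamp_seconds(1640995200) = 1_640_995_200_000_000 us = 2022-01-01T00:00:00 UTC
+//   timestamp_seconds(-86400)     = -86_400_000_000 us       = 1969-12-31T00:00:00 UTC
+//
+// Integral inputs multiply exactly and raise "long overflow" on i64 overflow
+// (like `Math.multiplyExact`); float inputs map NaN and +/-Infinity to NULL.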
+
+use arrow::array::{
+    Array, Float32Array, Float64Array, Int32Array, Int64Array, TimestampMicrosecondArray,
+};
+use arrow::compute::try_unary;
+use arrow::datatypes::{DataType, TimeUnit};
+use datafusion::common::{utils::take_function_args, DataFusionError, Result, ScalarValue};
+use datafusion::logical_expr::{
+    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
+};
+use std::any::Any;
+use std::sync::Arc;
+
+const MICROS_PER_SECOND: i64 = 1_000_000;
+
+/// Spark-compatible `seconds_to_timestamp` (`timestamp_seconds`) function.
+/// Converts seconds since the Unix epoch to a microsecond-precision timestamp.
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct SparkSecondsToTimestamp {
+    signature: Signature,
+    aliases: Vec<String>,
+}
+
+impl SparkSecondsToTimestamp {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::one_of(
+                vec![
+                    TypeSignature::Exact(vec![DataType::Int32]),
+                    TypeSignature::Exact(vec![DataType::Int64]),
+                    TypeSignature::Exact(vec![DataType::Float32]),
+                    TypeSignature::Exact(vec![DataType::Float64]),
+                ],
+                Volatility::Immutable,
+            ),
+            aliases: vec!["timestamp_seconds".to_string()],
+        }
+    }
+}
+
+impl Default for SparkSecondsToTimestamp {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ScalarUDFImpl for SparkSecondsToTimestamp {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "seconds_to_timestamp"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
+    }
+
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        let [seconds] = take_function_args(self.name(), args.args)?;
+
+        match seconds {
+            ColumnarValue::Array(arr) => {
+                // Handle Int32 input: no overflow is possible since i32 * 1_000_000 always fits in i64
+                if let Some(int_array) = arr.as_any().downcast_ref::<Int32Array>() {
+                    let result: TimestampMicrosecondArray =
+                        try_unary(int_array, |s| Ok((s as i64) * MICROS_PER_SECOND))?;
+                    return Ok(ColumnarValue::Array(Arc::new(result)));
+                }
+
+                // Handle Int64 input: error on overflow to match Spark's Math.multiplyExact
+                if let Some(int_array) = arr.as_any().downcast_ref::<Int64Array>() {
+                    let result: TimestampMicrosecondArray = try_unary(int_array, |s| {
+                        s.checked_mul(MICROS_PER_SECOND).ok_or_else(|| {
+                            arrow::error::ArrowError::ComputeError("long overflow".to_string())
+                        })
+                    })?;
+                    return Ok(ColumnarValue::Array(Arc::new(result)));
+                }
+
+                // Handle Float32 input: cast to f64, then apply the same logic as the Float64 path
+                if let Some(float_array) = arr.as_any().downcast_ref::<Float32Array>() {
+                    let result: TimestampMicrosecondArray = float_array
+                        .iter()
+                        .map(|opt| {
+                            opt.and_then(|s| {
+                                let s = s as f64;
+                                if s.is_nan() || s.is_infinite() {
+                                    None
+                                } else {
+                                    Some((s * (MICROS_PER_SECOND as f64)) as i64)
+                                }
+                            })
+                        })
+                        .collect();
+                    return Ok(ColumnarValue::Array(Arc::new(result)));
+                }
+
+                // Handle Float64 input: NaN and Infinity become null, matching Spark's behavior
+                if let Some(float_array) = arr.as_any().downcast_ref::<Float64Array>() {
+                    let result: TimestampMicrosecondArray = float_array
+                        .iter()
+                        .map(|opt| {
+                            opt.and_then(|s| {
+                                if s.is_nan() || s.is_infinite() {
+                                    None
+                                } else {
+                                    Some((s * (MICROS_PER_SECOND as f64)) as i64)
+                                }
+                            })
+                        })
+                        .collect();
+                    return Ok(ColumnarValue::Array(Arc::new(result)));
+                }
+
+                Err(DataFusionError::Execution(format!(
+                    "seconds_to_timestamp expects Int32, Int64, Float32 or Float64 input, got {:?}",
+                    arr.data_type()
+                )))
+            }
+            ColumnarValue::Scalar(scalar) => {
+                let ts_micros = 
match &scalar { + ScalarValue::Int32(Some(s)) => Some((*s as i64) * MICROS_PER_SECOND), + ScalarValue::Int64(Some(s)) => { + Some(s.checked_mul(MICROS_PER_SECOND).ok_or_else(|| { + DataFusionError::ArrowError( + Box::new(arrow::error::ArrowError::ComputeError( + "long overflow".to_string(), + )), + None, + ) + })?) + } + ScalarValue::Float32(Some(s)) => { + let s = *s as f64; + if s.is_nan() || s.is_infinite() { + None + } else { + Some((s * (MICROS_PER_SECOND as f64)) as i64) + } + } + ScalarValue::Float64(Some(s)) => { + if s.is_nan() || s.is_infinite() { + None + } else { + Some((s * (MICROS_PER_SECOND as f64)) as i64) + } + } + ScalarValue::Int32(None) + | ScalarValue::Int64(None) + | ScalarValue::Float32(None) + | ScalarValue::Float64(None) + | ScalarValue::Null => None, + _ => { + return Err(DataFusionError::Execution(format!( + "seconds_to_timestamp expects numeric scalar input, got {:?}", + scalar.data_type() + ))) + } + }; + Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond( + ts_micros, None, + ))) + } + } + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index 963ce62e2b..c7385c949f 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -72,7 +72,8 @@ pub use comet_scalar_funcs::{ pub use csv_funcs::*; pub use datetime_funcs::{ SparkDateDiff, SparkDateFromUnixDate, SparkDateTrunc, SparkHour, SparkHoursTransform, - SparkMakeDate, SparkMinute, SparkSecond, SparkUnixTimestamp, TimestampTruncExpr, + SparkMakeDate, SparkMinute, SparkSecond, SparkSecondsToTimestamp, SparkUnixTimestamp, + TimestampTruncExpr, }; pub use error::{decimal_overflow_error, SparkError, SparkErrorWithContext, SparkResult}; pub use hash_funcs::*; diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 810d9bd7da..741629a0c4 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -212,6 +212,7 @@ object QueryPlanSerde extends Logging with CometExprShim { classOf[Minute] -> CometMinute, classOf[NextDay] -> CometNextDay, classOf[Second] -> CometSecond, + classOf[SecondsToTimestamp] -> CometSecondsToTimestamp, classOf[TruncDate] -> CometTruncDate, classOf[TruncTimestamp] -> CometTruncTimestamp, classOf[UnixTimestamp] -> CometUnixTimestamp, diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index 5413e8b439..65de490f12 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} +import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, GetDateField, Hour, Hours, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, SecondsToTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} import 
org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampNTZType, TimestampType} import org.apache.spark.unsafe.types.UTF8String @@ -352,6 +352,9 @@ object CometNextDay extends CometScalarFunction[NextDay]("next_day") object CometMakeDate extends CometScalarFunction[MakeDate]("make_date") +object CometSecondsToTimestamp + extends CometScalarFunction[SecondsToTimestamp]("seconds_to_timestamp") + object CometLastDay extends CometScalarFunction[LastDay]("last_day") object CometDateFromUnixDate extends CometScalarFunction[DateFromUnixDate]("date_from_unix_date") diff --git a/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_seconds.sql b/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_seconds.sql new file mode 100644 index 0000000000..b1605b3284 --- /dev/null +++ b/spark/src/test/resources/sql-tests/expressions/datetime/timestamp_seconds.sql @@ -0,0 +1,80 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Config: spark.sql.session.timeZone=UTC +-- ConfigMatrix: parquet.enable.dictionary=false,true + +-- bigint column +statement +CREATE TABLE test_ts_seconds_bigint(c0 bigint) USING parquet + +statement +INSERT INTO test_ts_seconds_bigint VALUES (0), (1640995200), (-86400), (4102444800), (-2208988800), (NULL) + +query +SELECT c0, timestamp_seconds(c0) FROM test_ts_seconds_bigint + +-- int column +statement +CREATE TABLE test_ts_seconds_int(c0 int) USING parquet + +statement +INSERT INTO test_ts_seconds_int VALUES (0), (1640995200), (-86400), (NULL) + +query +SELECT c0, timestamp_seconds(c0) FROM test_ts_seconds_int + +-- double column +statement +CREATE TABLE test_ts_seconds_double(c0 double) USING parquet + +statement +INSERT INTO test_ts_seconds_double VALUES (0.0), (1640995200.123), (-86400.5), (NULL) + +query +SELECT c0, timestamp_seconds(c0) FROM test_ts_seconds_double + +-- literal arguments +query +SELECT timestamp_seconds(0) + +query +SELECT timestamp_seconds(1640995200) + +-- negative value (before epoch) +query +SELECT timestamp_seconds(-86400) + +-- decimal seconds (fractional) +query +SELECT timestamp_seconds(CAST(1640995200.123 AS DOUBLE)) + +-- null handling +query +SELECT timestamp_seconds(NULL) + +-- NaN input (should return null) +query +SELECT timestamp_seconds(CAST('NaN' AS DOUBLE)) + +-- Infinity input (should return null) +query +SELECT timestamp_seconds(CAST('Infinity' AS DOUBLE)) + +-- Negative infinity input (should return null) +query +SELECT timestamp_seconds(CAST('-Infinity' AS DOUBLE))
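+
+-- Hand-written expectation sketch (an assumption for review, not generated
+-- test output; relies on the UTC session time zone configured above and the
+-- native semantics in seconds_to_timestamp.rs):
+--   timestamp_seconds(1640995200) -> 2022-01-01 00:00:00
+--   timestamp_seconds(-86400)     -> 1969-12-31 00:00:00
+--   timestamp_seconds(9223372036854775807) should raise "long overflow",
+--   mirroring Spark's Math.multiplyExact check for bigint input.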