diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 36ea78580d14b..44bf278b9674c 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -89,8 +89,14 @@ pub enum ScalarValue { Date32(Option), /// Date stored as a signed 64bit int milliseconds since UNIX epoch 1970-01-01 Date64(Option), + /// Time stored as a signed 32bit int as seconds since midnight + Time32Second(Option), + /// Time stored as a signed 32bit int as milliseconds since midnight + Time32Millisecond(Option), + /// Time stored as a signed 64bit int as microseconds since midnight + Time64Microsecond(Option), /// Time stored as a signed 64bit int as nanoseconds since midnight - Time64(Option), + Time64Nanosecond(Option), /// Timestamp Second TimestampSecond(Option, Option), /// Timestamp Milliseconds @@ -170,8 +176,14 @@ impl PartialEq for ScalarValue { (Date32(_), _) => false, (Date64(v1), Date64(v2)) => v1.eq(v2), (Date64(_), _) => false, - (Time64(v1), Time64(v2)) => v1.eq(v2), - (Time64(_), _) => false, + (Time32Second(v1), Time32Second(v2)) => v1.eq(v2), + (Time32Second(_), _) => false, + (Time32Millisecond(v1), Time32Millisecond(v2)) => v1.eq(v2), + (Time32Millisecond(_), _) => false, + (Time64Microsecond(v1), Time64Microsecond(v2)) => v1.eq(v2), + (Time64Microsecond(_), _) => false, + (Time64Nanosecond(v1), Time64Nanosecond(v2)) => v1.eq(v2), + (Time64Nanosecond(_), _) => false, (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.eq(v2), (TimestampSecond(_, _), _) => false, (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => v1.eq(v2), @@ -263,8 +275,14 @@ impl PartialOrd for ScalarValue { (Date32(_), _) => None, (Date64(v1), Date64(v2)) => v1.partial_cmp(v2), (Date64(_), _) => None, - (Time64(v1), Time64(v2)) => v1.partial_cmp(v2), - (Time64(_), _) => None, + (Time32Second(v1), Time32Second(v2)) => v1.partial_cmp(v2), + (Time32Second(_), _) => None, + (Time32Millisecond(v1), Time32Millisecond(v2)) => v1.partial_cmp(v2), + 
(Time32Millisecond(_), _) => None, + (Time64Microsecond(v1), Time64Microsecond(v2)) => v1.partial_cmp(v2), + (Time64Microsecond(_), _) => None, + (Time64Nanosecond(v1), Time64Nanosecond(v2)) => v1.partial_cmp(v2), + (Time64Nanosecond(_), _) => None, (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.partial_cmp(v2), (TimestampSecond(_, _), _) => None, (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => { @@ -670,7 +688,10 @@ impl std::hash::Hash for ScalarValue { } Date32(v) => v.hash(state), Date64(v) => v.hash(state), - Time64(v) => v.hash(state), + Time32Second(v) => v.hash(state), + Time32Millisecond(v) => v.hash(state), + Time64Microsecond(v) => v.hash(state), + Time64Nanosecond(v) => v.hash(state), TimestampSecond(v, _) => v.hash(state), TimestampMillisecond(v, _) => v.hash(state), TimestampMicrosecond(v, _) => v.hash(state), @@ -1036,7 +1057,10 @@ impl ScalarValue { ))), ScalarValue::Date32(_) => DataType::Date32, ScalarValue::Date64(_) => DataType::Date64, - ScalarValue::Time64(_) => DataType::Time64(TimeUnit::Nanosecond), + ScalarValue::Time32Second(_) => DataType::Time32(TimeUnit::Second), + ScalarValue::Time32Millisecond(_) => DataType::Time32(TimeUnit::Millisecond), + ScalarValue::Time64Microsecond(_) => DataType::Time64(TimeUnit::Microsecond), + ScalarValue::Time64Nanosecond(_) => DataType::Time64(TimeUnit::Nanosecond), ScalarValue::IntervalYearMonth(_) => { DataType::Interval(IntervalUnit::YearMonth) } @@ -1120,7 +1144,10 @@ impl ScalarValue { ScalarValue::List(v, _) => v.is_none(), ScalarValue::Date32(v) => v.is_none(), ScalarValue::Date64(v) => v.is_none(), - ScalarValue::Time64(v) => v.is_none(), + ScalarValue::Time32Second(v) => v.is_none(), + ScalarValue::Time32Millisecond(v) => v.is_none(), + ScalarValue::Time64Microsecond(v) => v.is_none(), + ScalarValue::Time64Nanosecond(v) => v.is_none(), ScalarValue::TimestampSecond(v, _) => v.is_none(), ScalarValue::TimestampMillisecond(v, _) => v.is_none(), 
ScalarValue::TimestampMicrosecond(v, _) => v.is_none(), @@ -1380,8 +1407,17 @@ impl ScalarValue { DataType::LargeBinary => build_array_string!(LargeBinaryArray, LargeBinary), DataType::Date32 => build_array_primitive!(Date32Array, Date32), DataType::Date64 => build_array_primitive!(Date64Array, Date64), + DataType::Time32(TimeUnit::Second) => { + build_array_primitive!(Time32SecondArray, Time32Second) + } + DataType::Time32(TimeUnit::Millisecond) => { + build_array_primitive!(Time32MillisecondArray, Time32Millisecond) + } + DataType::Time64(TimeUnit::Microsecond) => { + build_array_primitive!(Time64MicrosecondArray, Time64Microsecond) + } DataType::Time64(TimeUnit::Nanosecond) => { - build_array_primitive!(Time64NanosecondArray, Time64) + build_array_primitive!(Time64NanosecondArray, Time64Nanosecond) } DataType::Timestamp(TimeUnit::Second, _) => { build_array_primitive_tz!(TimestampSecondArray, TimestampSecond) @@ -1541,10 +1577,15 @@ impl ScalarValue { Arc::new(array) } // explicitly enumerate unsupported types so newly added - // types must be aknowledged + // types must be acknowledged. Time32 and Time64 types are + // not supported if the TimeUnit is not valid (Time32 can + // only be used with Second and Millisecond, Time64 only + // with Microsecond and Nanosecond) DataType::Float16 - | DataType::Time32(_) - | DataType::Time64(_) + | DataType::Time32(TimeUnit::Microsecond) + | DataType::Time32(TimeUnit::Nanosecond) + | DataType::Time64(TimeUnit::Second) + | DataType::Time64(TimeUnit::Millisecond) | DataType::Duration(_) | DataType::FixedSizeList(_, _) | DataType::Interval(_) @@ -1813,7 +1854,34 @@ impl ScalarValue { ScalarValue::Date64(e) => { build_array_from_option!(Date64, Date64Array, e, size) } - ScalarValue::Time64(e) => { + ScalarValue::Time32Second(e) => { + build_array_from_option!( + Time32, + TimeUnit::Second, + Time32SecondArray, + e, + size + ) + } + ScalarValue::Time32Millisecond(e) => { + build_array_from_option!( + Time32, +
TimeUnit::Millisecond, + Time32MillisecondArray, + e, + size + ) + } + ScalarValue::Time64Microsecond(e) => { + build_array_from_option!( + Time64, + TimeUnit::Microsecond, + Time64MicrosecondArray, + e, + size + ) + } + ScalarValue::Time64Nanosecond(e) => { build_array_from_option!( Time64, TimeUnit::Nanosecond, @@ -1957,8 +2025,17 @@ impl ScalarValue { DataType::Date64 => { typed_cast!(array, index, Date64Array, Date64) } + DataType::Time32(TimeUnit::Second) => { + typed_cast!(array, index, Time32SecondArray, Time32Second) + } + DataType::Time32(TimeUnit::Millisecond) => { + typed_cast!(array, index, Time32MillisecondArray, Time32Millisecond) + } + DataType::Time64(TimeUnit::Microsecond) => { + typed_cast!(array, index, Time64MicrosecondArray, Time64Microsecond) + } DataType::Time64(TimeUnit::Nanosecond) => { - typed_cast!(array, index, Time64NanosecondArray, Time64) + typed_cast!(array, index, Time64NanosecondArray, Time64Nanosecond) } DataType::Timestamp(TimeUnit::Second, tz_opt) => { typed_cast_tz!( @@ -2163,7 +2240,16 @@ impl ScalarValue { ScalarValue::Date64(val) => { eq_array_primitive!(array, index, Date64Array, val) } - ScalarValue::Time64(val) => { + ScalarValue::Time32Second(val) => { + eq_array_primitive!(array, index, Time32SecondArray, val) + } + ScalarValue::Time32Millisecond(val) => { + eq_array_primitive!(array, index, Time32MillisecondArray, val) + } + ScalarValue::Time64Microsecond(val) => { + eq_array_primitive!(array, index, Time64MicrosecondArray, val) + } + ScalarValue::Time64Nanosecond(val) => { eq_array_primitive!(array, index, Time64NanosecondArray, val) } ScalarValue::TimestampSecond(val, _) => { @@ -2295,14 +2381,16 @@ macro_rules! 
impl_try_from { impl_try_from!(Int8, i8); impl_try_from!(Int16, i16); -// special implementation for i32 because of Date32 +// special implementation for i32 because of Date32 and Time32 impl TryFrom for i32 { type Error = DataFusionError; fn try_from(value: ScalarValue) -> Result { match value { ScalarValue::Int32(Some(inner_value)) - | ScalarValue::Date32(Some(inner_value)) => Ok(inner_value), + | ScalarValue::Date32(Some(inner_value)) + | ScalarValue::Time32Second(Some(inner_value)) + | ScalarValue::Time32Millisecond(Some(inner_value)) => Ok(inner_value), _ => Err(DataFusionError::Internal(format!( "Cannot convert {:?} to {}", value, @@ -2312,7 +2400,7 @@ impl TryFrom for i32 { } } -// special implementation for i64 because of TimeNanosecond +// special implementation for i64 because of Date64, Time64 and Timestamp impl TryFrom for i64 { type Error = DataFusionError; @@ -2320,7 +2408,8 @@ impl TryFrom for i64 { match value { ScalarValue::Int64(Some(inner_value)) | ScalarValue::Date64(Some(inner_value)) - | ScalarValue::Time64(Some(inner_value)) + | ScalarValue::Time64Microsecond(Some(inner_value)) + | ScalarValue::Time64Nanosecond(Some(inner_value)) | ScalarValue::TimestampNanosecond(Some(inner_value), _) | ScalarValue::TimestampMicrosecond(Some(inner_value), _) | ScalarValue::TimestampMillisecond(Some(inner_value), _) @@ -2394,7 +2483,14 @@ impl TryFrom<&DataType> for ScalarValue { DataType::LargeBinary => ScalarValue::LargeBinary(None), DataType::Date32 => ScalarValue::Date32(None), DataType::Date64 => ScalarValue::Date64(None), - DataType::Time64(TimeUnit::Nanosecond) => ScalarValue::Time64(None), + DataType::Time32(TimeUnit::Second) => ScalarValue::Time32Second(None), + DataType::Time32(TimeUnit::Millisecond) => { + ScalarValue::Time32Millisecond(None) + } + DataType::Time64(TimeUnit::Microsecond) => { + ScalarValue::Time64Microsecond(None) + } + DataType::Time64(TimeUnit::Nanosecond) => ScalarValue::Time64Nanosecond(None), 
DataType::Timestamp(TimeUnit::Second, tz_opt) => { ScalarValue::TimestampSecond(None, tz_opt.clone()) } @@ -2515,7 +2611,10 @@ impl fmt::Display for ScalarValue { }, ScalarValue::Date32(e) => format_option!(f, e)?, ScalarValue::Date64(e) => format_option!(f, e)?, - ScalarValue::Time64(e) => format_option!(f, e)?, + ScalarValue::Time32Second(e) => format_option!(f, e)?, + ScalarValue::Time32Millisecond(e) => format_option!(f, e)?, + ScalarValue::Time64Microsecond(e) => format_option!(f, e)?, + ScalarValue::Time64Nanosecond(e) => format_option!(f, e)?, ScalarValue::IntervalDayTime(e) => format_option!(f, e)?, ScalarValue::IntervalYearMonth(e) => format_option!(f, e)?, ScalarValue::IntervalMonthDayNano(e) => format_option!(f, e)?, @@ -2582,7 +2681,16 @@ impl fmt::Debug for ScalarValue { ScalarValue::List(_, _) => write!(f, "List([{}])", self), ScalarValue::Date32(_) => write!(f, "Date32(\"{}\")", self), ScalarValue::Date64(_) => write!(f, "Date64(\"{}\")", self), - ScalarValue::Time64(_) => write!(f, "Time64(\"{}\")", self), + ScalarValue::Time32Second(_) => write!(f, "Time32Second(\"{}\")", self), + ScalarValue::Time32Millisecond(_) => { + write!(f, "Time32Millisecond(\"{}\")", self) + } + ScalarValue::Time64Microsecond(_) => { + write!(f, "Time64Microsecond(\"{}\")", self) + } + ScalarValue::Time64Nanosecond(_) => { + write!(f, "Time64Nanosecond(\"{}\")", self) + } ScalarValue::IntervalDayTime(_) => { write!(f, "IntervalDayTime(\"{}\")", self) } @@ -3212,7 +3320,10 @@ mod tests { make_binary_test_case!(str_vals, LargeBinaryArray, LargeBinary), make_test_case!(i32_vals, Date32Array, Date32), make_test_case!(i64_vals, Date64Array, Date64), - make_test_case!(i64_vals, Time64NanosecondArray, Time64), + make_test_case!(i32_vals, Time32SecondArray, Time32Second), + make_test_case!(i32_vals, Time32MillisecondArray, Time32Millisecond), + make_test_case!(i64_vals, Time64MicrosecondArray, Time64Microsecond), + make_test_case!(i64_vals, Time64NanosecondArray, 
Time64Nanosecond), make_test_case!(i64_vals, TimestampSecondArray, TimestampSecond, None), make_test_case!( i64_vals, diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 86448ff58b2fe..d557ca6ed2aff 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -24,8 +24,10 @@ use arrow::{ array::{ as_dictionary_array, ArrayData, ArrayRef, BooleanArray, Date32Array, Date64Array, Decimal128Array, DictionaryArray, LargeStringArray, PrimitiveArray, - TimestampMicrosecondArray, TimestampMillisecondArray, TimestampSecondArray, - UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder, UInt64Builder, + Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, + Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampSecondArray, UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder, + UInt64Builder, }, compute, datatypes::{ @@ -1218,6 +1220,34 @@ fn equal_rows( DataType::Date64 => { equal_rows_elem!(Date64Array, l, r, left, right, null_equals_null) } + DataType::Time32(time_unit) => match time_unit { + TimeUnit::Second => { + equal_rows_elem!(Time32SecondArray, l, r, left, right, null_equals_null) + } + TimeUnit::Millisecond => { + equal_rows_elem!(Time32MillisecondArray, l, r, left, right, null_equals_null) + } + _ => { + err = Some(Err(DataFusionError::Internal( + "Unsupported data type in hasher".to_string(), + ))); + false + } + } + DataType::Time64(time_unit) => match time_unit { + TimeUnit::Microsecond => { + equal_rows_elem!(Time64MicrosecondArray, l, r, left, right, null_equals_null) + } + TimeUnit::Nanosecond => { + equal_rows_elem!(Time64NanosecondArray, l, r, left, right, null_equals_null) + } + _ => { + err = Some(Err(DataFusionError::Internal( + "Unsupported data type in hasher".to_string(), + ))); + false + } + } DataType::Timestamp(time_unit, None) => match time_unit { 
TimeUnit::Second => { equal_rows_elem!( diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 88ae4d00dfeee..8b57a69fa61d3 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -1539,6 +1539,92 @@ async fn aggregate_timestamps_max() -> Result<()> { Ok(()) } +#[tokio::test] +async fn aggregate_times_sum() -> Result<()> { + let ctx = SessionContext::new(); + ctx.register_table("t", table_with_times()).unwrap(); + + let results = plan_and_collect( + &ctx, + "SELECT sum(nanos), sum(micros), sum(millis), sum(secs) FROM t", + ) + .await + .unwrap_err(); + + assert_eq!(results.to_string(), "Error during planning: The function Sum does not support inputs of type Time64(Nanosecond)."); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_times_count() -> Result<()> { + let ctx = SessionContext::new(); + ctx.register_table("t", table_with_times()).unwrap(); + + let results = execute_to_batches( + &ctx, + "SELECT count(nanos), count(micros), count(millis), count(secs) FROM t", + ) + .await; + + let expected = vec![ + "+----------------+-----------------+-----------------+---------------+", + "| COUNT(t.nanos) | COUNT(t.micros) | COUNT(t.millis) | COUNT(t.secs) |", + "+----------------+-----------------+-----------------+---------------+", + "| 4 | 4 | 4 | 4 |", + "+----------------+-----------------+-----------------+---------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_times_min() -> Result<()> { + let ctx = SessionContext::new(); + ctx.register_table("t", table_with_times()).unwrap(); + + let results = execute_to_batches( + &ctx, + "SELECT min(nanos), min(micros), min(millis), min(secs) FROM t", + ) + .await; + + let expected = vec![ + "+--------------------+-----------------+---------------+-------------+", + "| MIN(t.nanos) | MIN(t.micros) | MIN(t.millis) | MIN(t.secs) |", + 
"+--------------------+-----------------+---------------+-------------+", + "| 18:06:30.243620451 | 18:06:30.243620 | 18:06:30.243 | 18:06:30 |", + "+--------------------+-----------------+---------------+-------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn aggregate_times_max() -> Result<()> { + let ctx = SessionContext::new(); + ctx.register_table("t", table_with_times()).unwrap(); + + let results = execute_to_batches( + &ctx, + "SELECT max(nanos), max(micros), max(millis), max(secs) FROM t", + ) + .await; + + let expected = vec![ + "+--------------------+-----------------+---------------+-------------+", + "| MAX(t.nanos) | MAX(t.micros) | MAX(t.millis) | MAX(t.secs) |", + "+--------------------+-----------------+---------------+-------------+", + "| 21:06:28.247821084 | 21:06:28.247821 | 21:06:28.247 | 21:06:28 |", + "+--------------------+-----------------+---------------+-------------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + #[tokio::test] async fn aggregate_timestamps_avg() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/core/tests/sql/group_by.rs b/datafusion/core/tests/sql/group_by.rs index 56044862cdb92..fffcc356d4335 100644 --- a/datafusion/core/tests/sql/group_by.rs +++ b/datafusion/core/tests/sql/group_by.rs @@ -485,6 +485,186 @@ async fn csv_group_by_date() -> Result<()> { Ok(()) } +#[tokio::test] +async fn csv_group_by_time32second() -> Result<()> { + let ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![ + Field::new("time", DataType::Time32(TimeUnit::Second), false), + Field::new("cnt", DataType::Int32, false), + ])); + let data = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Time32SecondArray::from(vec![ + Some(5_000), + Some(5_000), + Some(5_500), + Some(5_500), + Some(5_900), + Some(5_900), + ])), + Arc::new(Int32Array::from(vec![ + Some(1), + Some(1), + Some(1), + Some(2), + Some(1), + Some(3), + 
])), + ], + )?; + + ctx.register_batch("times", data)?; + let sql = "SELECT SUM(cnt) FROM times GROUP BY time"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+----------------+", + "| SUM(times.cnt) |", + "+----------------+", + "| 2 |", + "| 3 |", + "| 4 |", + "+----------------+", + ]; + assert_batches_sorted_eq!(expected, &actual); + Ok(()) +} + +#[tokio::test] +async fn csv_group_by_time32millisecond() -> Result<()> { + let ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![ + Field::new("time", DataType::Time32(TimeUnit::Millisecond), false), + Field::new("cnt", DataType::Int32, false), + ])); + let data = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Time32MillisecondArray::from(vec![ + Some(5_000_000), + Some(5_000_000), + Some(5_500_000), + Some(5_500_000), + Some(5_900_000), + Some(5_900_000), + ])), + Arc::new(Int32Array::from(vec![ + Some(1), + Some(1), + Some(1), + Some(2), + Some(1), + Some(3), + ])), + ], + )?; + + ctx.register_batch("times", data)?; + let sql = "SELECT SUM(cnt) FROM times GROUP BY time"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+----------------+", + "| SUM(times.cnt) |", + "+----------------+", + "| 2 |", + "| 3 |", + "| 4 |", + "+----------------+", + ]; + assert_batches_sorted_eq!(expected, &actual); + Ok(()) +} + +#[tokio::test] +async fn csv_group_by_time64microsecond() -> Result<()> { + let ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![ + Field::new("time", DataType::Time64(TimeUnit::Microsecond), false), + Field::new("cnt", DataType::Int64, false), + ])); + let data = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Time64MicrosecondArray::from(vec![ + Some(5_000_000_000), + Some(5_000_000_000), + Some(5_500_000_000), + Some(5_500_000_000), + Some(5_900_000_000), + Some(5_900_000_000), + ])), + Arc::new(Int64Array::from(vec![ + Some(1), + Some(1), + Some(1), + Some(2), + Some(1), + Some(3), + 
])), + ], + )?; + + ctx.register_batch("times", data)?; + let sql = "SELECT SUM(cnt) FROM times GROUP BY time"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+----------------+", + "| SUM(times.cnt) |", + "+----------------+", + "| 2 |", + "| 3 |", + "| 4 |", + "+----------------+", + ]; + assert_batches_sorted_eq!(expected, &actual); + Ok(()) +} + +#[tokio::test] +async fn csv_group_by_time64nanosecond() -> Result<()> { + let ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![ + Field::new("time", DataType::Time64(TimeUnit::Nanosecond), false), + Field::new("cnt", DataType::Int64, false), + ])); + let data = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Time64NanosecondArray::from(vec![ + Some(5_000_000_000_000), + Some(5_000_000_000_000), + Some(5_500_000_000_000), + Some(5_500_000_000_000), + Some(5_900_000_000_000), + Some(5_900_000_000_000), + ])), + Arc::new(Int64Array::from(vec![ + Some(1), + Some(1), + Some(1), + Some(2), + Some(1), + Some(3), + ])), + ], + )?; + + ctx.register_batch("times", data)?; + let sql = "SELECT SUM(cnt) FROM times GROUP BY time"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+----------------+", + "| SUM(times.cnt) |", + "+----------------+", + "| 2 |", + "| 3 |", + "| 4 |", + "+----------------+", + ]; + assert_batches_sorted_eq!(expected, &actual); + Ok(()) +} + #[tokio::test] async fn group_by_date_trunc() -> Result<()> { let tmp_dir = TempDir::new()?; diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 8466e795c4a7e..619270f20f461 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -1181,6 +1181,99 @@ pub fn make_timestamps() -> RecordBatch { .unwrap() } +/// Return a new table provider containing all of the supported timestamp types +pub fn table_with_times() -> Arc { + let batch = make_times(); + let schema = batch.schema(); + let partitions = vec![vec![batch]]; 
+ Arc::new(MemTable::try_new(schema, partitions).unwrap()) +} + +/// Return record batch with all of the supported time types +/// values +/// +/// Columns are named: +/// "nanos" --> Time64NanosecondArray +/// "micros" --> Time64MicrosecondArray +/// "millis" --> Time32MillisecondArray +/// "secs" --> Time32SecondArray +/// "name" --> StringArray +pub fn make_times() -> RecordBatch { + let ts_strings = vec![ + Some("18:06:30.243620451"), + Some("20:08:28.161121654"), + Some("19:11:04.156423842"), + Some("21:06:28.247821084"), + ]; + + let ts_nanos = ts_strings + .into_iter() + .map(|t| { + t.map(|t| { + let integer_sec = t + .parse::() + .unwrap() + .num_seconds_from_midnight() as i64; + let extra_nano = + t.parse::().unwrap().nanosecond() as i64; + // Total time in nanoseconds given by integer number of seconds multiplied by 10^9 + // plus number of nanoseconds corresponding to the extra fraction of second + integer_sec * 1_000_000_000 + extra_nano + }) + }) + .collect::>(); + + let ts_micros = ts_nanos + .iter() + .map(|t| t.as_ref().map(|ts_nanos| ts_nanos / 1000)) + .collect::>(); + + let ts_millis = ts_nanos + .iter() + .map(|t| t.as_ref().map(|ts_nanos| { ts_nanos / 1000000 } as i32)) + .collect::>(); + + let ts_secs = ts_nanos + .iter() + .map(|t| t.as_ref().map(|ts_nanos| { ts_nanos / 1000000000 } as i32)) + .collect::>(); + + let names = ts_nanos + .iter() + .enumerate() + .map(|(i, _)| format!("Row {}", i)) + .collect::>(); + + let arr_nanos = Time64NanosecondArray::from(ts_nanos); + let arr_micros = Time64MicrosecondArray::from(ts_micros); + let arr_millis = Time32MillisecondArray::from(ts_millis); + let arr_secs = Time32SecondArray::from(ts_secs); + + let names = names.iter().map(|s| s.as_str()).collect::>(); + let arr_names = StringArray::from(names); + + let schema = Schema::new(vec![ + Field::new("nanos", arr_nanos.data_type().clone(), true), + Field::new("micros", arr_micros.data_type().clone(), true), + Field::new("millis",
arr_millis.data_type().clone(), true), + Field::new("secs", arr_secs.data_type().clone(), true), + Field::new("name", arr_names.data_type().clone(), true), + ]); + let schema = Arc::new(schema); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(arr_nanos), + Arc::new(arr_micros), + Arc::new(arr_millis), + Arc::new(arr_secs), + Arc::new(arr_names), + ], + ) + .unwrap() +} + #[tokio::test] async fn nyc() -> Result<()> { // schema for nyxtaxi csv files diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs index c3890fac40617..6d7014507bb09 100644 --- a/datafusion/core/tests/sql/select.rs +++ b/datafusion/core/tests/sql/select.rs @@ -909,6 +909,182 @@ async fn query_on_string_dictionary() -> Result<()> { Ok(()) } +#[tokio::test] +async fn filter_with_time32second() -> Result<()> { + let ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![ + Field::new("time", DataType::Time32(TimeUnit::Second), false), + Field::new("value", DataType::Int64, false), + ])); + let data = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Time32SecondArray::from(vec![ + Some(5_000), + Some(5_000), + Some(5_500), + Some(5_500), + Some(5_900), + Some(5_900), + ])), + Arc::new(Int64Array::from(vec![ + Some(2505), + Some(2436), + Some(2384), + Some(1815), + Some(2330), + Some(2065), + ])), + ], + )?; + + ctx.register_batch("temporal", data)?; + let sql = "SELECT value FROM temporal WHERE time = '01:23:20'"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-------+", + "| value |", + "+-------+", + "| 2436 |", + "| 2505 |", + "+-------+", + ]; + assert_batches_sorted_eq!(expected, &actual); + Ok(()) +} + +#[tokio::test] +async fn filter_with_time32millisecond() -> Result<()> { + let ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![ + Field::new("time", DataType::Time32(TimeUnit::Millisecond), false), + Field::new("value", DataType::Int64, false), + ])); + let data = 
RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Time32MillisecondArray::from(vec![ + Some(5_000_000), + Some(5_000_000), + Some(5_500_000), + Some(5_500_000), + Some(5_900_000), + Some(5_900_000), + ])), + Arc::new(Int64Array::from(vec![ + Some(2505), + Some(2436), + Some(2384), + Some(1815), + Some(2330), + Some(2065), + ])), + ], + )?; + + ctx.register_batch("temporal", data)?; + let sql = "SELECT value FROM temporal WHERE time = '01:23:20'"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-------+", + "| value |", + "+-------+", + "| 2436 |", + "| 2505 |", + "+-------+", + ]; + assert_batches_sorted_eq!(expected, &actual); + Ok(()) +} + +#[tokio::test] +async fn filter_with_time64microsecond() -> Result<()> { + let ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![ + Field::new("time", DataType::Time64(TimeUnit::Microsecond), false), + Field::new("value", DataType::Int64, false), + ])); + let data = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Time64MicrosecondArray::from(vec![ + Some(5_000_000_000), + Some(5_000_000_000), + Some(5_500_000_000), + Some(5_500_000_000), + Some(5_900_000_000), + Some(5_900_000_000), + ])), + Arc::new(Int64Array::from(vec![ + Some(2505), + Some(2436), + Some(2384), + Some(1815), + Some(2330), + Some(2065), + ])), + ], + )?; + + ctx.register_batch("temporal", data)?; + let sql = "SELECT value FROM temporal WHERE time = '01:23:20'"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-------+", + "| value |", + "+-------+", + "| 2436 |", + "| 2505 |", + "+-------+", + ]; + assert_batches_sorted_eq!(expected, &actual); + Ok(()) +} + +#[tokio::test] +async fn filter_with_time64nanosecond() -> Result<()> { + let ctx = SessionContext::new(); + let schema = Arc::new(Schema::new(vec![ + Field::new("time", DataType::Time64(TimeUnit::Nanosecond), false), + Field::new("value", DataType::Int64, false), + ])); + let data = RecordBatch::try_new( 
+ schema.clone(), + vec![ + Arc::new(Time64NanosecondArray::from(vec![ + Some(5_000_000_000_000), + Some(5_000_000_000_000), + Some(5_500_000_000_000), + Some(5_500_000_000_000), + Some(5_900_000_000_000), + Some(5_900_000_000_000), + ])), + Arc::new(Int64Array::from(vec![ + Some(2505), + Some(2436), + Some(2384), + Some(1815), + Some(2330), + Some(2065), + ])), + ], + )?; + + ctx.register_batch("temporal", data)?; + let sql = "SELECT value FROM temporal WHERE time = '01:23:20'"; + let actual = execute_to_batches(&ctx, sql).await; + let expected = vec![ + "+-------+", + "| value |", + "+-------+", + "| 2436 |", + "| 2505 |", + "+-------+", + ]; + assert_batches_sorted_eq!(expected, &actual); + Ok(()) +} + #[tokio::test] async fn query_cte_with_alias() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/expr/src/type_coercion/binary.rs b/datafusion/expr/src/type_coercion/binary.rs index 911874fb09ba2..70e09f2fbdf2b 100644 --- a/datafusion/expr/src/type_coercion/binary.rs +++ b/datafusion/expr/src/type_coercion/binary.rs @@ -20,7 +20,9 @@ use crate::type_coercion::is_numeric; use crate::Operator; use arrow::compute::can_cast_types; -use arrow::datatypes::{DataType, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE}; +use arrow::datatypes::{ + DataType, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, +}; use datafusion_common::DataFusionError; use datafusion_common::Result; @@ -517,11 +519,23 @@ fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option { .or_else(|| null_coercion(lhs_type, rhs_type)) } +/// Checks if the TimeUnit associated with a Time32 or Time64 type is consistent, +/// as Time32 can only be used to Second and Millisecond accuracy, while Time64 +/// is exclusively used to Microsecond and Nanosecond accuracy +fn is_time_with_valid_unit(datatype: DataType) -> bool { + matches!( + datatype, + DataType::Time32(TimeUnit::Second) + | DataType::Time32(TimeUnit::Millisecond) + | DataType::Time64(TimeUnit::Microsecond) 
+ | DataType::Time64(TimeUnit::Nanosecond) + ) +} + /// Coercion rules for Temporal columns: the type that both lhs and rhs can be /// casted to for the purpose of a date computation fn temporal_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option { use arrow::datatypes::DataType::*; - use arrow::datatypes::TimeUnit; match (lhs_type, rhs_type) { (Date64, Date32) => Some(Date64), (Date32, Date64) => Some(Date64), @@ -529,6 +543,22 @@ fn temporal_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option Some(Date32), (Utf8, Date64) => Some(Date64), (Date64, Utf8) => Some(Date64), + (Utf8, Time32(unit)) => match is_time_with_valid_unit(Time32(unit.clone())) { + false => None, + true => Some(Time32(unit.clone())), + }, + (Time32(unit), Utf8) => match is_time_with_valid_unit(Time32(unit.clone())) { + false => None, + true => Some(Time32(unit.clone())), + }, + (Utf8, Time64(unit)) => match is_time_with_valid_unit(Time64(unit.clone())) { + false => None, + true => Some(Time64(unit.clone())), + }, + (Time64(unit), Utf8) => match is_time_with_valid_unit(Time64(unit.clone())) { + false => None, + true => Some(Time64(unit.clone())), + }, (Timestamp(lhs_unit, lhs_tz), Timestamp(rhs_unit, rhs_tz)) => { let tz = match (lhs_tz, rhs_tz) { // can't cast across timezones @@ -830,6 +860,30 @@ mod tests { Operator::Lt, DataType::Date64 ); + test_coercion_binary_rule!( + DataType::Utf8, + DataType::Time32(TimeUnit::Second), + Operator::Eq, + DataType::Time32(TimeUnit::Second) + ); + test_coercion_binary_rule!( + DataType::Utf8, + DataType::Time32(TimeUnit::Millisecond), + Operator::Eq, + DataType::Time32(TimeUnit::Millisecond) + ); + test_coercion_binary_rule!( + DataType::Utf8, + DataType::Time64(TimeUnit::Microsecond), + Operator::Eq, + DataType::Time64(TimeUnit::Microsecond) + ); + test_coercion_binary_rule!( + DataType::Utf8, + DataType::Time64(TimeUnit::Nanosecond), + Operator::Eq, + DataType::Time64(TimeUnit::Nanosecond) + ); test_coercion_binary_rule!( DataType::Utf8, 
DataType::Utf8, diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index c415e7db51644..2d6961bfcb880 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -28,6 +28,7 @@ use arrow::{ array::{ ArrayRef, Date32Array, Date64Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeStringArray, StringArray, + Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, @@ -251,8 +252,32 @@ macro_rules! min_max_batch { ), DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP), DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP), + DataType::Time32(TimeUnit::Second) => { + typed_min_max_batch!($VALUES, Time32SecondArray, Time32Second, $OP) + } + DataType::Time32(TimeUnit::Millisecond) => { + typed_min_max_batch!( + $VALUES, + Time32MillisecondArray, + Time32Millisecond, + $OP + ) + } + DataType::Time64(TimeUnit::Microsecond) => { + typed_min_max_batch!( + $VALUES, + Time64MicrosecondArray, + Time64Microsecond, + $OP + ) + } DataType::Time64(TimeUnit::Nanosecond) => { - typed_min_max_batch!($VALUES, Time64NanosecondArray, Time64, $OP) + typed_min_max_batch!( + $VALUES, + Time64NanosecondArray, + Time64Nanosecond, + $OP + ) } other => { // This should have been handled before @@ -417,10 +442,28 @@ macro_rules! 
min_max { typed_min_max!(lhs, rhs, Date64, $OP) } ( - ScalarValue::Time64(lhs), - ScalarValue::Time64(rhs), + ScalarValue::Time32Second(lhs), + ScalarValue::Time32Second(rhs), + ) => { + typed_min_max!(lhs, rhs, Time32Second, $OP) + } + ( + ScalarValue::Time32Millisecond(lhs), + ScalarValue::Time32Millisecond(rhs), ) => { - typed_min_max!(lhs, rhs, Time64, $OP) + typed_min_max!(lhs, rhs, Time32Millisecond, $OP) + } + ( + ScalarValue::Time64Microsecond(lhs), + ScalarValue::Time64Microsecond(rhs), + ) => { + typed_min_max!(lhs, rhs, Time64Microsecond, $OP) + } + ( + ScalarValue::Time64Nanosecond(lhs), + ScalarValue::Time64Nanosecond(rhs), + ) => { + typed_min_max!(lhs, rhs, Time64Nanosecond, $OP) } e => { return Err(DataFusionError::Internal(format!( @@ -1073,24 +1116,90 @@ mod tests { } #[test] - fn min_time64() -> Result<()> { + fn min_time32second() -> Result<()> { + let a: ArrayRef = Arc::new(Time32SecondArray::from(vec![1, 2, 3, 4, 5])); + generic_test_op!( + a, + DataType::Time32(TimeUnit::Second), + Min, + ScalarValue::Time32Second(Some(1)) + ) + } + + #[test] + fn max_time32second() -> Result<()> { + let a: ArrayRef = Arc::new(Time32SecondArray::from(vec![1, 2, 3, 4, 5])); + generic_test_op!( + a, + DataType::Time32(TimeUnit::Second), + Max, + ScalarValue::Time32Second(Some(5)) + ) + } + + #[test] + fn min_time32millisecond() -> Result<()> { + let a: ArrayRef = Arc::new(Time32MillisecondArray::from(vec![1, 2, 3, 4, 5])); + generic_test_op!( + a, + DataType::Time32(TimeUnit::Millisecond), + Min, + ScalarValue::Time32Millisecond(Some(1)) + ) + } + + #[test] + fn max_time32millisecond() -> Result<()> { + let a: ArrayRef = Arc::new(Time32MillisecondArray::from(vec![1, 2, 3, 4, 5])); + generic_test_op!( + a, + DataType::Time32(TimeUnit::Millisecond), + Max, + ScalarValue::Time32Millisecond(Some(5)) + ) + } + + #[test] + fn min_time64microsecond() -> Result<()> { + let a: ArrayRef = Arc::new(Time64MicrosecondArray::from(vec![1, 2, 3, 4, 5])); + generic_test_op!( + 
a, + DataType::Time64(TimeUnit::Microsecond), + Min, + ScalarValue::Time64Microsecond(Some(1)) + ) + } + + #[test] + fn max_time64microsecond() -> Result<()> { + let a: ArrayRef = Arc::new(Time64MicrosecondArray::from(vec![1, 2, 3, 4, 5])); + generic_test_op!( + a, + DataType::Time64(TimeUnit::Microsecond), + Max, + ScalarValue::Time64Microsecond(Some(5)) + ) + } + + #[test] + fn min_time64nanosecond() -> Result<()> { let a: ArrayRef = Arc::new(Time64NanosecondArray::from(vec![1, 2, 3, 4, 5])); generic_test_op!( a, DataType::Time64(TimeUnit::Nanosecond), - Max, - ScalarValue::Time64(Some(5)) + Min, + ScalarValue::Time64Nanosecond(Some(1)) ) } #[test] - fn max_time64() -> Result<()> { + fn max_time64nanosecond() -> Result<()> { let a: ArrayRef = Arc::new(Time64NanosecondArray::from(vec![1, 2, 3, 4, 5])); generic_test_op!( a, DataType::Time64(TimeUnit::Nanosecond), Max, - ScalarValue::Time64(Some(5)) + ScalarValue::Time64Nanosecond(Some(5)) ) } } diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index 0a76edde5912d..02d9b9e826cf2 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -213,7 +213,7 @@ pub fn make_current_time( now_ts: DateTime, ) -> impl Fn(&[ColumnarValue]) -> Result { let nano = Some(now_ts.timestamp_nanos() % 86400000000000); - move |_arg| Ok(ColumnarValue::Scalar(ScalarValue::Time64(nano))) + move |_arg| Ok(ColumnarValue::Scalar(ScalarValue::Time64Nanosecond(nano))) } fn quarter_month(date: &NaiveDateTime) -> u32 { diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 2d5f0854aec03..dc8ee635f8cba 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -458,6 +458,18 @@ macro_rules! 
binary_array_op { DataType::Date64 => { compute_op!($LEFT, $RIGHT, $OP, Date64Array) } + DataType::Time32(TimeUnit::Second) => { + compute_op!($LEFT, $RIGHT, $OP, Time32SecondArray) + } + DataType::Time32(TimeUnit::Millisecond) => { + compute_op!($LEFT, $RIGHT, $OP, Time32MillisecondArray) + } + DataType::Time64(TimeUnit::Microsecond) => { + compute_op!($LEFT, $RIGHT, $OP, Time64MicrosecondArray) + } + DataType::Time64(TimeUnit::Nanosecond) => { + compute_op!($LEFT, $RIGHT, $OP, Time64NanosecondArray) + } DataType::Boolean => compute_bool_op!($LEFT, $RIGHT, $OP, BooleanArray), other => Err(DataFusionError::Internal(format!( "Data type {:?} not supported for binary operation '{}' on dyn arrays", @@ -806,6 +818,10 @@ macro_rules! binary_array_op_dyn_scalar { ScalarValue::Float64(v) => compute_op_dyn_scalar!($LEFT, v, $OP, $OP_TYPE), ScalarValue::Date32(_) => compute_op_scalar!($LEFT, right, $OP, Date32Array), ScalarValue::Date64(_) => compute_op_scalar!($LEFT, right, $OP, Date64Array), + ScalarValue::Time32Second(_) => compute_op_scalar!($LEFT, right, $OP, Time32SecondArray), + ScalarValue::Time32Millisecond(_) => compute_op_scalar!($LEFT, right, $OP, Time32MillisecondArray), + ScalarValue::Time64Microsecond(_) => compute_op_scalar!($LEFT, right, $OP, Time64MicrosecondArray), + ScalarValue::Time64Nanosecond(_) => compute_op_scalar!($LEFT, right, $OP, Time64NanosecondArray), ScalarValue::TimestampSecond(..) => compute_op_scalar!($LEFT, right, $OP, TimestampSecondArray), ScalarValue::TimestampMillisecond(..) => compute_op_scalar!($LEFT, right, $OP, TimestampMillisecondArray), ScalarValue::TimestampMicrosecond(..) 
=> compute_op_scalar!($LEFT, right, $OP, TimestampMicrosecondArray), diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 48716c558e598..b1ba0ec2d95e4 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -746,10 +746,24 @@ message ScalarListValue{ repeated ScalarValue values = 2; } +message ScalarTime32Value { + oneof value { + int32 time32_second_value = 1; + int32 time32_millisecond_value = 2; + }; +} + +message ScalarTime64Value { + oneof value { + int64 time64_microsecond_value = 1; + int64 time64_nanosecond_value = 2; + }; +} + message ScalarTimestampValue { oneof value { - int64 time_microsecond_value = 1; - int64 time_nanosecond_value = 2; + int64 time_microsecond_value = 1; + int64 time_nanosecond_value = 2; int64 time_second_value = 3; int64 time_millisecond_value = 4; }; @@ -804,6 +818,7 @@ message ScalarValue{ double float64_value = 13; // Literal Date32 value always has a unit of day int32 date_32_value = 14; + ScalarTime32Value time32_value = 15; ScalarListValue list_value = 17; //WAS: ScalarType null_list_value = 18; @@ -815,7 +830,7 @@ message ScalarValue{ ScalarDictionaryValue dictionary_value = 27; bytes binary_value = 28; bytes large_binary_value = 29; - int64 time64_value = 30; + ScalarTime64Value time64_value = 30; IntervalMonthDayNanoValue interval_month_day_nano = 31; StructValue struct_value = 32; ScalarFixedSizeBinary fixed_size_binary_value = 34; diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index 8f6377739470b..1de84ad94aa77 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -583,7 +583,30 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue { ) } Value::Date64Value(v) => Self::Date64(Some(*v)), - Value::Time64Value(v) => Self::Time64(Some(*v)), + Value::Time32Value(v) => { + let time_value = + v.value.as_ref().ok_or_else(|| Error::required("value"))?; + match 
time_value { + protobuf::scalar_time32_value::Value::Time32SecondValue(t) => { + Self::Time32Second(Some(*t)) + } + protobuf::scalar_time32_value::Value::Time32MillisecondValue(t) => { + Self::Time32Millisecond(Some(*t)) + } + } + } + Value::Time64Value(v) => { + let time_value = + v.value.as_ref().ok_or_else(|| Error::required("value"))?; + match time_value { + protobuf::scalar_time64_value::Value::Time64MicrosecondValue(t) => { + Self::Time64Microsecond(Some(*t)) + } + protobuf::scalar_time64_value::Value::Time64NanosecondValue(t) => { + Self::Time64Nanosecond(Some(*t)) + } + } + } Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)), Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(*v)), Value::TimestampValue(v) => { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 6f9636f6b7ce2..ee0142011b9a7 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -10085,6 +10085,224 @@ impl<'de> serde::Deserialize<'de> for ScalarListValue { deserializer.deserialize_struct("datafusion.ScalarListValue", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for ScalarTime32Value { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.value.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.ScalarTime32Value", len)?; + if let Some(v) = self.value.as_ref() { + match v { + scalar_time32_value::Value::Time32SecondValue(v) => { + struct_ser.serialize_field("time32SecondValue", v)?; + } + scalar_time32_value::Value::Time32MillisecondValue(v) => { + struct_ser.serialize_field("time32MillisecondValue", v)?; + } + } + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for ScalarTime32Value { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: 
serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "time32_second_value", + "time32SecondValue", + "time32_millisecond_value", + "time32MillisecondValue", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Time32SecondValue, + Time32MillisecondValue, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "time32SecondValue" | "time32_second_value" => Ok(GeneratedField::Time32SecondValue), + "time32MillisecondValue" | "time32_millisecond_value" => Ok(GeneratedField::Time32MillisecondValue), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ScalarTime32Value; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.ScalarTime32Value") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut value__ = None; + while let Some(k) = map.next_key()? 
{ + match k { + GeneratedField::Time32SecondValue => { + if value__.is_some() { + return Err(serde::de::Error::duplicate_field("time32SecondValue")); + } + value__ = map.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| scalar_time32_value::Value::Time32SecondValue(x.0)); + } + GeneratedField::Time32MillisecondValue => { + if value__.is_some() { + return Err(serde::de::Error::duplicate_field("time32MillisecondValue")); + } + value__ = map.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| scalar_time32_value::Value::Time32MillisecondValue(x.0)); + } + } + } + Ok(ScalarTime32Value { + value: value__, + }) + } + } + deserializer.deserialize_struct("datafusion.ScalarTime32Value", FIELDS, GeneratedVisitor) + } +} +impl serde::Serialize for ScalarTime64Value { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.value.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.ScalarTime64Value", len)?; + if let Some(v) = self.value.as_ref() { + match v { + scalar_time64_value::Value::Time64MicrosecondValue(v) => { + struct_ser.serialize_field("time64MicrosecondValue", ToString::to_string(&v).as_str())?; + } + scalar_time64_value::Value::Time64NanosecondValue(v) => { + struct_ser.serialize_field("time64NanosecondValue", ToString::to_string(&v).as_str())?; + } + } + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for ScalarTime64Value { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "time64_microsecond_value", + "time64MicrosecondValue", + "time64_nanosecond_value", + "time64NanosecondValue", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Time64MicrosecondValue, + Time64NanosecondValue, + } + 
impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "time64MicrosecondValue" | "time64_microsecond_value" => Ok(GeneratedField::Time64MicrosecondValue), + "time64NanosecondValue" | "time64_nanosecond_value" => Ok(GeneratedField::Time64NanosecondValue), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ScalarTime64Value; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.ScalarTime64Value") + } + + fn visit_map(self, mut map: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut value__ = None; + while let Some(k) = map.next_key()? 
{ + match k { + GeneratedField::Time64MicrosecondValue => { + if value__.is_some() { + return Err(serde::de::Error::duplicate_field("time64MicrosecondValue")); + } + value__ = map.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| scalar_time64_value::Value::Time64MicrosecondValue(x.0)); + } + GeneratedField::Time64NanosecondValue => { + if value__.is_some() { + return Err(serde::de::Error::duplicate_field("time64NanosecondValue")); + } + value__ = map.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| scalar_time64_value::Value::Time64NanosecondValue(x.0)); + } + } + } + Ok(ScalarTime64Value { + value: value__, + }) + } + } + deserializer.deserialize_struct("datafusion.ScalarTime64Value", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for ScalarTimestampValue { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -10405,6 +10623,9 @@ impl serde::Serialize for ScalarValue { scalar_value::Value::Date32Value(v) => { struct_ser.serialize_field("date32Value", v)?; } + scalar_value::Value::Time32Value(v) => { + struct_ser.serialize_field("time32Value", v)?; + } scalar_value::Value::ListValue(v) => { struct_ser.serialize_field("listValue", v)?; } @@ -10433,7 +10654,7 @@ impl serde::Serialize for ScalarValue { struct_ser.serialize_field("largeBinaryValue", pbjson::private::base64::encode(&v).as_str())?; } scalar_value::Value::Time64Value(v) => { - struct_ser.serialize_field("time64Value", ToString::to_string(&v).as_str())?; + struct_ser.serialize_field("time64Value", v)?; } scalar_value::Value::IntervalMonthDayNano(v) => { struct_ser.serialize_field("intervalMonthDayNano", v)?; @@ -10486,6 +10707,8 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { "float64Value", "date_32_value", "date32Value", + "time32_value", + "time32Value", "list_value", "listValue", "decimal128_value", @@ -10531,6 +10754,7 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { 
Float32Value, Float64Value, Date32Value, + Time32Value, ListValue, Decimal128Value, Date64Value, @@ -10580,6 +10804,7 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { "float32Value" | "float32_value" => Ok(GeneratedField::Float32Value), "float64Value" | "float64_value" => Ok(GeneratedField::Float64Value), "date32Value" | "date_32_value" => Ok(GeneratedField::Date32Value), + "time32Value" | "time32_value" => Ok(GeneratedField::Time32Value), "listValue" | "list_value" => Ok(GeneratedField::ListValue), "decimal128Value" | "decimal128_value" => Ok(GeneratedField::Decimal128Value), "date64Value" | "date_64_value" => Ok(GeneratedField::Date64Value), @@ -10706,6 +10931,13 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { } value__ = map.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| scalar_value::Value::Date32Value(x.0)); } + GeneratedField::Time32Value => { + if value__.is_some() { + return Err(serde::de::Error::duplicate_field("time32Value")); + } + value__ = map.next_value::<::std::option::Option<_>>()?.map(scalar_value::Value::Time32Value) +; + } GeneratedField::ListValue => { if value__.is_some() { return Err(serde::de::Error::duplicate_field("listValue")); @@ -10768,7 +11000,8 @@ impl<'de> serde::Deserialize<'de> for ScalarValue { if value__.is_some() { return Err(serde::de::Error::duplicate_field("time64Value")); } - value__ = map.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| scalar_value::Value::Time64Value(x.0)); + value__ = map.next_value::<::std::option::Option<_>>()?.map(scalar_value::Value::Time64Value) +; } GeneratedField::IntervalMonthDayNano => { if value__.is_some() { diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index f34cc5c2a1133..25297787a1660 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -851,6 +851,36 @@ pub struct ScalarListValue { pub values: 
::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct ScalarTime32Value { + #[prost(oneof = "scalar_time32_value::Value", tags = "1, 2")] + pub value: ::core::option::Option, +} +/// Nested message and enum types in `ScalarTime32Value`. +pub mod scalar_time32_value { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Value { + #[prost(int32, tag = "1")] + Time32SecondValue(i32), + #[prost(int32, tag = "2")] + Time32MillisecondValue(i32), + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ScalarTime64Value { + #[prost(oneof = "scalar_time64_value::Value", tags = "1, 2")] + pub value: ::core::option::Option, +} +/// Nested message and enum types in `ScalarTime64Value`. +pub mod scalar_time64_value { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Value { + #[prost(int64, tag = "1")] + Time64MicrosecondValue(i64), + #[prost(int64, tag = "2")] + Time64NanosecondValue(i64), + } +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ScalarTimestampValue { #[prost(string, tag = "5")] pub timezone: ::prost::alloc::string::String, @@ -908,7 +938,7 @@ pub struct ScalarFixedSizeBinary { pub struct ScalarValue { #[prost( oneof = "scalar_value::Value", - tags = "33, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 17, 20, 21, 24, 25, 26, 27, 28, 29, 30, 31, 32, 34" + tags = "33, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 17, 20, 21, 24, 25, 26, 27, 28, 29, 30, 31, 32, 34" )] pub value: ::core::option::Option, } @@ -949,6 +979,8 @@ pub mod scalar_value { /// Literal Date32 value always has a unit of day #[prost(int32, tag = "14")] Date32Value(i32), + #[prost(message, tag = "15")] + Time32Value(super::ScalarTime32Value), /// WAS: ScalarType null_list_value = 18; #[prost(message, tag = "17")] ListValue(super::ScalarListValue), @@ -968,8 +1000,8 @@ pub mod scalar_value { BinaryValue(::prost::alloc::vec::Vec), #[prost(bytes, tag = "29")] LargeBinaryValue(::prost::alloc::vec::Vec), - #[prost(int64, tag = "30")] 
- Time64Value(i64), + #[prost(message, tag = "30")] + Time64Value(super::ScalarTime64Value), #[prost(message, tag = "31")] IntervalMonthDayNano(super::IntervalMonthDayNanoValue), #[prost(message, tag = "32")] diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index 2166b1d8b8d2f..315fbd0dda0e6 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -606,9 +606,21 @@ mod roundtrip_tests { ScalarValue::Date32(Some(0)), ScalarValue::Date32(Some(i32::MAX)), ScalarValue::Date32(None), - ScalarValue::Time64(Some(0)), - ScalarValue::Time64(Some(i64::MAX)), - ScalarValue::Time64(None), + ScalarValue::Date64(Some(0)), + ScalarValue::Date64(Some(i64::MAX)), + ScalarValue::Date64(None), + ScalarValue::Time32Second(Some(0)), + ScalarValue::Time32Second(Some(i32::MAX)), + ScalarValue::Time32Second(None), + ScalarValue::Time32Millisecond(Some(0)), + ScalarValue::Time32Millisecond(Some(i32::MAX)), + ScalarValue::Time32Millisecond(None), + ScalarValue::Time64Microsecond(Some(0)), + ScalarValue::Time64Microsecond(Some(i64::MAX)), + ScalarValue::Time64Microsecond(None), + ScalarValue::Time64Nanosecond(Some(0)), + ScalarValue::Time64Nanosecond(Some(i64::MAX)), + ScalarValue::Time64Nanosecond(None), ScalarValue::TimestampNanosecond(Some(0), None), ScalarValue::TimestampNanosecond(Some(i64::MAX), None), ScalarValue::TimestampNanosecond(Some(0), Some("UTC".to_string())), diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs index 95224bf8e9f2a..6e152cc0c3b85 100644 --- a/datafusion/proto/src/to_proto.rs +++ b/datafusion/proto/src/to_proto.rs @@ -1075,8 +1075,50 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { }) } - datafusion::scalar::ScalarValue::Time64(v) => { - create_proto_scalar(v, &data_type, |v| Value::Time64Value(*v)) + datafusion::scalar::ScalarValue::Time32Second(v) => { + create_proto_scalar(v, &data_type, |v| { + Value::Time32Value(protobuf::ScalarTime32Value { + value: Some( + 
protobuf::scalar_time32_value::Value::Time32SecondValue(*v), + ), + }) + }) + } + + datafusion::scalar::ScalarValue::Time32Millisecond(v) => { + create_proto_scalar(v, &data_type, |v| { + Value::Time32Value(protobuf::ScalarTime32Value { + value: Some( + protobuf::scalar_time32_value::Value::Time32MillisecondValue( + *v, + ), + ), + }) + }) + } + + datafusion::scalar::ScalarValue::Time64Microsecond(v) => { + create_proto_scalar(v, &data_type, |v| { + Value::Time64Value(protobuf::ScalarTime64Value { + value: Some( + protobuf::scalar_time64_value::Value::Time64MicrosecondValue( + *v, + ), + ), + }) + }) + } + + datafusion::scalar::ScalarValue::Time64Nanosecond(v) => { + create_proto_scalar(v, &data_type, |v| { + Value::Time64Value(protobuf::ScalarTime64Value { + value: Some( + protobuf::scalar_time64_value::Value::Time64NanosecondValue( + *v, + ), + ), + }) + }) } datafusion::scalar::ScalarValue::IntervalMonthDayNano(v) => {