Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
ae0d3a3
Start of EXTRACT support for DataFusion
Dandandan Jan 29, 2021
b91632d
fmt
Dandandan Jan 29, 2021
304008b
Add temporal module
Dandandan Jan 29, 2021
3eb552b
Test, boilerplate
Dandandan Jan 29, 2021
8284f1c
Add todo!()
Dandandan Jan 29, 2021
dbbb647
Extract implementation
Dandandan Jan 29, 2021
a471f57
Add support for timestamps without time zone
Dandandan Jan 29, 2021
5de73f6
Small test changes
Dandandan Jan 29, 2021
7ce35ab
Clippy
Dandandan Jan 29, 2021
1e96898
Remove remaining todo!()
Dandandan Jan 29, 2021
bda3cab
Improve naming
Dandandan Jan 29, 2021
0cad90f
Undo whitespace changes
Dandandan Jan 29, 2021
0254e45
Merge changes
Dandandan Jan 30, 2021
ac62940
Comment fixes
Dandandan Jan 30, 2021
dc67999
Support all time units
Dandandan Jan 30, 2021
ae2489f
Merge remote-tracking branch 'upstream/master' into temporal_sql
Dandandan Feb 17, 2021
b4efb64
WIP date_part
Dandandan Feb 17, 2021
4c0dac6
WIP date_part 2
Dandandan Feb 17, 2021
6311917
WIP
Dandandan Feb 17, 2021
9b97b47
WIP
Dandandan Feb 17, 2021
a52e074
Test fix
Dandandan Feb 17, 2021
31f8d28
Merge remote-tracking branch 'upstream/master' into temporal_sql
Dandandan Feb 20, 2021
af8792f
Add support for more complex function types
Dandandan Feb 20, 2021
e538726
Support both year and hour based on argument
Dandandan Feb 20, 2021
ca771de
Fmt
Dandandan Feb 20, 2021
a23e4c9
Return scalar values
Dandandan Feb 20, 2021
aa149e7
Avoid copy
Dandandan Feb 20, 2021
85bb7b0
Merge remote-tracking branch 'upstream/master' into temporal_sql
Dandandan Feb 21, 2021
40e184b
Fix conflict
Dandandan Feb 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rust/datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1144,9 +1144,9 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
let expr = create_name(expr, input_schema)?;
let list = list.iter().map(|expr| create_name(expr, input_schema));
if *negated {
Ok(format!("{:?} NOT IN ({:?})", expr, list))
Ok(format!("{} NOT IN ({:?})", expr, list))
} else {
Ok(format!("{:?} IN ({:?})", expr, list))
Ok(format!("{} IN ({:?})", expr, list))
}
}
other => Err(DataFusionError::NotImplemented(format!(
Expand Down
1 change: 0 additions & 1 deletion rust/datafusion/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ mod extension;
mod operators;
mod plan;
mod registry;

pub use builder::LogicalPlanBuilder;
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
pub use display::display_schema;
Expand Down
109 changes: 102 additions & 7 deletions rust/datafusion/src/physical_plan/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,30 @@
// under the License.

//! DateTime expressions

use std::sync::Arc;

use super::ColumnarValue;
use crate::{
error::{DataFusionError, Result},
scalar::{ScalarType, ScalarValue},
};
use arrow::temporal_conversions::timestamp_ns_to_datetime;
use arrow::{
array::{Array, ArrayRef, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait},
datatypes::{ArrowPrimitiveType, DataType, TimestampNanosecondType},
};
use arrow::{
array::{
Array, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait,
TimestampNanosecondArray,
Date32Array, Date64Array, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray,
},
datatypes::{ArrowPrimitiveType, DataType, TimestampNanosecondType},
compute::kernels::temporal,
datatypes::TimeUnit,
temporal_conversions::timestamp_ns_to_datetime,
};
use chrono::prelude::*;
use chrono::Duration;
use chrono::LocalResult;

use super::ColumnarValue;

#[inline]
/// Accepts a string in RFC3339 / ISO8601 standard format and some
/// variants and converts it to a nanosecond precision timestamp.
Expand Down Expand Up @@ -344,6 +347,98 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
})
}

/// Dispatches a temporal kernel over a dynamically typed date/timestamp array.
///
/// `$ARRAY` is an expression exposing `data_type()` and `as_any()`; `$FN` is a
/// callable that accepts a reference to the concrete array type and returns a
/// `Result`. The macro expands to a `Result` expression:
///
/// * `Date32`, `Date64` and `Timestamp(_, None)` in any of the four time units
///   are downcast to their concrete array type and passed to `$FN`; errors
///   from `$FN` are propagated with `?` to the enclosing function.
/// * Every other data type (including timestamps whose second
///   `DataType::Timestamp` field is not `None`) yields
///   `DataFusionError::Internal`.
///
/// Note that `$ARRAY` is evaluated twice (once for the data type, once for the
/// downcast), so it should be a cheap, side-effect free expression.
macro_rules! extract_date_part {
    // Internal rule: view `$ARRAY` as the concrete `$TY` and run `$FN` on it.
    // The `unwrap` is expected to succeed because the public rule has already
    // matched the runtime data type; it panics if the two ever disagree.
    (@downcast $ARRAY:expr, $FN:expr, $TY:ty) => {{
        let typed = $ARRAY.as_any().downcast_ref::<$TY>().unwrap();
        Ok($FN(typed)?)
    }};
    // Public rule: pick the concrete array type from the runtime data type.
    ($ARRAY:expr, $FN:expr) => {
        match $ARRAY.data_type() {
            DataType::Date32 => extract_date_part!(@downcast $ARRAY, $FN, Date32Array),
            DataType::Date64 => extract_date_part!(@downcast $ARRAY, $FN, Date64Array),
            DataType::Timestamp(TimeUnit::Second, None) => {
                extract_date_part!(@downcast $ARRAY, $FN, TimestampSecondArray)
            }
            DataType::Timestamp(TimeUnit::Millisecond, None) => {
                extract_date_part!(@downcast $ARRAY, $FN, TimestampMillisecondArray)
            }
            DataType::Timestamp(TimeUnit::Microsecond, None) => {
                extract_date_part!(@downcast $ARRAY, $FN, TimestampMicrosecondArray)
            }
            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
                extract_date_part!(@downcast $ARRAY, $FN, TimestampNanosecondArray)
            }
            unsupported => Err(DataFusionError::Internal(format!(
                "Extract does not support datatype {:?}",
                unsupported
            ))),
        }
    };
}

/// DATE_PART SQL function.
///
/// Expects exactly two arguments:
///
/// 1. the date part to extract, as a non-null scalar `Utf8` value (matched
///    case-insensitively; only `"hour"` and `"year"` are handled here), and
/// 2. the date/timestamp input, either as an array or as a scalar.
///
/// Returns an array when the second argument is an array, and a scalar when
/// it is a scalar.
///
/// # Errors
///
/// * `DataFusionError::Execution` if the argument count is not two, if the
///   first argument is not a non-null scalar `Utf8`, or if the requested date
///   part is not supported.
/// * `DataFusionError::Internal` (raised by `extract_date_part!`) if the data
///   type of the second argument is not supported.
/// * Any error returned by the underlying temporal kernel or by
///   `ScalarValue::try_from_array` is propagated.
pub fn date_part(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 {
return Err(DataFusionError::Execution(
"Expected two arguments in DATE_PART".to_string(),
));
}
let (date_part, array) = (&args[0], &args[1]);

// The part name must be a literal string; array-valued or NULL part names are rejected.
let date_part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = date_part {
v
} else {
return Err(DataFusionError::Execution(
"First argument of `DATE_PART` must be non-null scalar Utf8".to_string(),
));
};

// Remember whether the input was a scalar so the result can be returned in the same shape.
let is_scalar = matches!(array, ColumnarValue::Scalar(_));

let array = match array {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume the longer term plan will be to handle the Scalar case more efficiently. This (converting to an array) is fine for now I think

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, indeed. For now we can use this approach to avoid reimplementing hours/years etc, with a bit of overhead.
Maybe longer term would be nice to have something like Datum in Arrow in order to both gain some performance and avoid reimplementing things for the scalar case.

ColumnarValue::Array(array) => array.clone(),
// Scalars are converted to an array so the same array kernels can be reused.
ColumnarValue::Scalar(scalar) => scalar.to_array(),
};

// Lowercase the part name so that e.g. 'YEAR' and 'year' select the same kernel.
let arr = match date_part.to_lowercase().as_str() {
"hour" => extract_date_part!(array, temporal::hour),
"year" => extract_date_part!(array, temporal::year),
_ => Err(DataFusionError::Execution(format!(
"Date part '{}' not supported",
date_part
))),
}?;

// For scalar input, read element 0 of the result back into a scalar value.
Ok(if is_scalar {
ColumnarValue::Scalar(ScalarValue::try_from_array(
&(Arc::new(arr) as ArrayRef),
0,
)?)
} else {
ColumnarValue::Array(Arc::new(arr))
})
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
1 change: 0 additions & 1 deletion rust/datafusion/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ pub use negative::{negative, NegativeExpr};
pub use not::{not, NotExpr};
pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
pub use sum::{sum_return_type, Sum};

/// returns the name of the state
pub fn format_state_name(name: &str, state_name: &str) -> String {
format!("{}[{}]", name, state_name)
Expand Down
27 changes: 27 additions & 0 deletions rust/datafusion/src/physical_plan/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub enum Signature {
Exact(Vec<DataType>),
/// fixed number of arguments of arbitrary types
Any(usize),
/// One of a list of signatures
OneOf(Vec<Signature>),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI @seddonm1 I am not sure how this affects your string functions / other postgres function plans

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I missed this but all good. This is actually better :D

}

/// Scalar function
Expand Down Expand Up @@ -138,6 +140,8 @@ pub enum BuiltinScalarFunction {
NullIf,
/// Date truncate
DateTrunc,
/// Date part
DatePart,
/// MD5
MD5,
/// SHA224
Expand Down Expand Up @@ -192,6 +196,7 @@ impl FromStr for BuiltinScalarFunction {
"upper" => BuiltinScalarFunction::Upper,
"to_timestamp" => BuiltinScalarFunction::ToTimestamp,
"date_trunc" => BuiltinScalarFunction::DateTrunc,
"date_part" => BuiltinScalarFunction::DatePart,
"array" => BuiltinScalarFunction::Array,
"nullif" => BuiltinScalarFunction::NullIf,
"md5" => BuiltinScalarFunction::MD5,
Expand Down Expand Up @@ -294,6 +299,7 @@ pub fn return_type(
BuiltinScalarFunction::DateTrunc => {
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
}
BuiltinScalarFunction::DatePart => Ok(DataType::Int32),
BuiltinScalarFunction::Array => Ok(DataType::FixedSizeList(
Box::new(Field::new("item", arg_types[0].clone(), true)),
arg_types.len() as i32,
Expand Down Expand Up @@ -463,6 +469,7 @@ pub fn create_physical_expr(
_ => unreachable!(),
},
},
BuiltinScalarFunction::DatePart => datetime_expressions::date_part,
});
// coerce
let args = coerce(args, input_schema, &signature(fun))?;
Expand Down Expand Up @@ -507,6 +514,26 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature {
DataType::Utf8,
DataType::Timestamp(TimeUnit::Nanosecond, None),
]),
BuiltinScalarFunction::DatePart => Signature::OneOf(vec![
Signature::Exact(vec![DataType::Utf8, DataType::Date32]),
Signature::Exact(vec![DataType::Utf8, DataType::Date64]),
Signature::Exact(vec![
DataType::Utf8,
DataType::Timestamp(TimeUnit::Second, None),
]),
Signature::Exact(vec![
DataType::Utf8,
DataType::Timestamp(TimeUnit::Microsecond, None),
]),
Signature::Exact(vec![
DataType::Utf8,
DataType::Timestamp(TimeUnit::Millisecond, None),
]),
Signature::Exact(vec![
DataType::Utf8,
DataType::Timestamp(TimeUnit::Nanosecond, None),
]),
]),
BuiltinScalarFunction::Array => {
Signature::Variadic(array_expressions::SUPPORTED_ARRAY_TYPES.to_vec())
}
Expand Down
51 changes: 35 additions & 16 deletions rust/datafusion/src/physical_plan/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
//! i64. However, i64 -> i32 is never performed as there are i64
//! values which can not be represented by i32 values.

use std::sync::Arc;
use std::{sync::Arc, vec};

use arrow::datatypes::{DataType, Schema, TimeUnit};

Expand Down Expand Up @@ -68,6 +68,32 @@ pub fn data_types(
current_types: &[DataType],
signature: &Signature,
) -> Result<Vec<DataType>> {
let valid_types = get_valid_types(signature, current_types)?;

if valid_types
.iter()
.any(|data_type| data_type == current_types)
{
return Ok(current_types.to_vec());
}

for valid_types in valid_types {
if let Some(types) = maybe_data_types(&valid_types, &current_types) {
return Ok(types);
}
}

// none possible -> Error
Err(DataFusionError::Plan(format!(
"Coercion from {:?} to the signature {:?} failed.",
current_types, signature
)))
}

fn get_valid_types(
signature: &Signature,
current_types: &[DataType],
) -> Result<Vec<Vec<DataType>>> {
let valid_types = match signature {
Signature::Variadic(valid_types) => valid_types
.iter()
Expand Down Expand Up @@ -95,23 +121,16 @@ pub fn data_types(
}
vec![(0..*number).map(|i| current_types[i].clone()).collect()]
}
};

if valid_types.contains(&current_types.to_owned()) {
return Ok(current_types.to_vec());
}

for valid_types in valid_types {
if let Some(types) = maybe_data_types(&valid_types, &current_types) {
return Ok(types);
Signature::OneOf(types) => {
let mut r = vec![];
for s in types {
r.extend(get_valid_types(s, current_types)?);
}
r
}
}
};

// none possible -> Error
Err(DataFusionError::Plan(format!(
"Coercion from {:?} to the signature {:?} failed.",
current_types, signature
)))
Ok(valid_types)
}

/// Try to coerce current_types into valid_types.
Expand Down
7 changes: 7 additions & 0 deletions rust/datafusion/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
SQLExpr::Value(Value::Boolean(n)) => Ok(lit(*n)),

SQLExpr::Value(Value::Null) => Ok(Expr::Literal(ScalarValue::Utf8(None))),
SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction {
fun: functions::BuiltinScalarFunction::DatePart,
args: vec![
Expr::Literal(ScalarValue::Utf8(Some(format!("{}", field)))),
self.sql_expr_to_logical_expr(expr)?,
],
}),

SQLExpr::Value(Value::Interval {
value,
Expand Down
20 changes: 19 additions & 1 deletion rust/datafusion/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1717,7 +1717,7 @@ fn make_timestamp_nano_table() -> Result<Arc<MemTable>> {
}

#[tokio::test]
async fn to_timstamp() -> Result<()> {
async fn to_timestamp() -> Result<()> {
let mut ctx = ExecutionContext::new();
ctx.register_table("ts_data", make_timestamp_nano_table()?);

Expand Down Expand Up @@ -2134,6 +2134,24 @@ async fn crypto_expressions() -> Result<()> {
Ok(())
}

// Checks `date_part(...)` and `EXTRACT(... FROM ...)` for the hour and year
// parts, on both DATE casts and `to_timestamp` results, with the part name
// written in lower and upper case.
#[tokio::test]
async fn extract_date_part() -> Result<()> {
let mut ctx = ExecutionContext::new();
// No table is registered: every expression in the query is built from literals.
let sql = "SELECT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

date_part('hour', CAST('2020-01-01' AS DATE)) AS hr1,
EXTRACT(HOUR FROM CAST('2020-01-01' AS DATE)) AS hr2,
EXTRACT(HOUR FROM to_timestamp('2020-09-08T12:00:00+00:00')) AS hr3,
date_part('YEAR', CAST('2000-01-01' AS DATE)) AS year1,
EXTRACT(year FROM to_timestamp('2020-09-08T12:00:00+00:00')) AS year2
";

// `execute` is a helper defined elsewhere in this test file; presumably it
// returns the result rows as strings — confirm against its definition.
let actual = execute(&mut ctx, sql).await;

// One row, in column order hr1, hr2, hr3, year1, year2.
let expected = vec![vec!["0", "0", "12", "2000", "2020"]];
assert_eq!(expected, actual);
Ok(())
}

#[tokio::test]
async fn in_list_array() -> Result<()> {
let mut ctx = ExecutionContext::new();
Expand Down