diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index cc2ff2f24790f..43f214607f9fc 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -16,6 +16,7 @@ // under the License. use std::any::Any; +use std::fmt; use std::fmt::{Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; use std::sync::Arc; @@ -53,6 +54,12 @@ pub type PhysicalExprRef = Arc; /// * [`SessionContext::create_physical_expr`]: A high level API /// * [`create_physical_expr`]: A low level API /// +/// # Formatting `PhysicalExpr` as strings +/// There are three ways to format `PhysicalExpr` as a string: +/// * [`Debug`]: Standard Rust debugging format (e.g. `Constant { value: ... }`) +/// * [`Display`]: Detailed SQL-like format that shows expression structure (e.g. (`Utf8 ("foobar")`). This is often used for debugging and tests +/// * [`Self::fmt_sql`]: SQL-like human readable format (e.g. ('foobar')`), See also [`sql_fmt`] +/// /// [`SessionContext::create_physical_expr`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.create_physical_expr /// [`PhysicalPlanner`]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html /// [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html @@ -266,6 +273,16 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { fn get_properties(&self, _children: &[ExprProperties]) -> Result { Ok(ExprProperties::new_unknown()) } + + /// Format this `PhysicalExpr` in nice human readable "SQL" format + /// + /// Specifically, this format is designed to be readable by humans, at the + /// expense of details. Use `Display` or `Debug` for more detailed + /// representation. + /// + /// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL. + /// + fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result; } /// [`PhysicalExpr`] can't be constrained by [`Eq`] directly because it must remain object @@ -363,7 +380,7 @@ where I: Iterator + Clone, I::Item: Display, { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { let mut iter = self.0.clone(); write!(f, "[")?; if let Some(expr) = iter.next() { @@ -379,3 +396,53 @@ where DisplayWrapper(exprs.into_iter()) } + +/// Prints a [`PhysicalExpr`] in a SQL-like format +/// +/// # Example +/// ``` +/// # // The boiler plate needed to create a `PhysicalExpr` for the example +/// # use std::any::Any; +/// # use std::fmt::Formatter; +/// # use std::sync::Arc; +/// # use arrow::array::RecordBatch; +/// # use arrow::datatypes::{DataType, Schema}; +/// # use datafusion_common::Result; +/// # use datafusion_expr_common::columnar_value::ColumnarValue; +/// # use datafusion_physical_expr_common::physical_expr::{fmt_sql, DynEq, PhysicalExpr}; +/// # #[derive(Debug, Hash, PartialOrd, PartialEq)] +/// # struct MyExpr {}; +/// # impl PhysicalExpr for MyExpr {fn as_any(&self) -> &dyn Any { unimplemented!() } +/// # fn data_type(&self, input_schema: &Schema) -> Result { unimplemented!() } +/// # fn nullable(&self, input_schema: &Schema) -> Result { unimplemented!() } +/// # fn evaluate(&self, batch: &RecordBatch) -> Result { unimplemented!() } +/// # fn children(&self) -> Vec<&Arc>{ unimplemented!() } +/// # fn with_new_children(self: Arc, children: Vec>) -> Result> { unimplemented!() } +/// # fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "CASE a > b THEN 1 ELSE 0 END") } +/// # } +/// # impl std::fmt::Display for MyExpr {fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { unimplemented!() } } +/// # impl DynEq for MyExpr {fn dyn_eq(&self, other: &dyn Any) -> bool { unimplemented!() } } +/// # fn make_physical_expr() -> Arc { Arc::new(MyExpr{}) } +/// let expr: Arc = make_physical_expr(); +/// // wrap the expression in `sql_fmt` which can be used with +/// // `format!`, `to_string()`, etc +/// let expr_as_sql = fmt_sql(expr.as_ref()); +/// assert_eq!( +/// "The SQL: CASE a > b THEN 1 ELSE 0 END", +/// format!("The SQL: {expr_as_sql}") +/// ); +/// ``` +pub fn fmt_sql(expr: &dyn PhysicalExpr) -> impl Display + '_ { + struct Wrapper<'a> { + expr: &'a dyn PhysicalExpr, + } + + impl Display for Wrapper<'_> { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + self.expr.fmt_sql(f)?; + Ok(()) + } + } + + Wrapper { expr } +} diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 38b820edc544e..fe5c2ecfdf621 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -37,7 +37,7 @@ use itertools::Itertools; /// Example: /// ``` /// # use std::any::Any; -/// # use std::fmt::Display; +/// # use std::fmt::{Display, Formatter}; /// # use std::hash::Hasher; /// # use std::sync::Arc; /// # use arrow::array::RecordBatch; @@ -58,6 +58,7 @@ use itertools::Itertools; /// # fn evaluate(&self, batch: &RecordBatch) -> Result {todo!() } /// # fn children(&self) -> Vec<&Arc> {todo!()} /// # fn with_new_children(self: Arc, children: Vec>) -> Result> {todo!()} +/// # fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { todo!() } /// # } /// # impl Display for MyPhysicalExpr { /// # fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "a") } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 1f16c5471ed77..872773b06fa6f 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -571,6 +571,32 @@ impl PhysicalExpr for BinaryExpr { _ => Ok(ExprProperties::new_unknown()), } } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn write_child( + f: &mut std::fmt::Formatter, + expr: &dyn PhysicalExpr, + precedence: u8, + ) -> std::fmt::Result { + if let Some(child) = expr.as_any().downcast_ref::() { + let p = child.op.precedence(); + if p == 0 || p < precedence { + write!(f, "(")?; + child.fmt_sql(f)?; + write!(f, ")") + } else { + child.fmt_sql(f) + } + } else { + expr.fmt_sql(f) + } + } + + let precedence = self.op.precedence(); + write_child(f, self.left.as_ref(), precedence)?; + write!(f, " {} ", self.op)?; + write_child(f, self.right.as_ref(), precedence) + } } /// Casts dictionary array to result type for binary numerical operators. Such operators @@ -770,6 +796,7 @@ mod tests { use crate::expressions::{col, lit, try_cast, Column, Literal}; use datafusion_common::plan_datafusion_err; + use datafusion_physical_expr_common::physical_expr::fmt_sql; /// Performs a binary operation, applying any type coercion necessary fn binary_op( @@ -4672,4 +4699,72 @@ mod tests { Ok(()) } + + #[test] + fn test_fmt_sql() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + + // Test basic binary expressions + let simple_expr = binary_expr( + col("a", &schema)?, + Operator::Plus, + col("b", &schema)?, + &schema, + )?; + let display_string = simple_expr.to_string(); + assert_eq!(display_string, "a@0 + b@1"); + let sql_string = fmt_sql(&simple_expr).to_string(); + assert_eq!(sql_string, "a + b"); + + // Test nested expressions with different operator precedence + let nested_expr = binary_expr( + Arc::new(binary_expr( + col("a", &schema)?, + Operator::Plus, + col("b", &schema)?, + &schema, + )?), + Operator::Multiply, + col("b", &schema)?, + &schema, + )?; + let display_string = nested_expr.to_string(); + assert_eq!(display_string, "(a@0 + b@1) * b@1"); + let sql_string = fmt_sql(&nested_expr).to_string(); + assert_eq!(sql_string, "(a + b) * b"); + + // Test nested expressions with same operator precedence + let nested_same_prec = binary_expr( + Arc::new(binary_expr( + col("a", &schema)?, + Operator::Plus, + col("b", &schema)?, + &schema, + )?), + Operator::Plus, + col("b", &schema)?, + &schema, + )?; + let display_string = nested_same_prec.to_string(); + assert_eq!(display_string, "a@0 + b@1 + b@1"); + let sql_string = fmt_sql(&nested_same_prec).to_string(); + assert_eq!(sql_string, "a + b + b"); + + // Test with literals + let lit_expr = binary_expr( + col("a", &schema)?, + Operator::Eq, + lit(ScalarValue::Int32(Some(42))), + &schema, + )?; + let display_string = lit_expr.to_string(); + assert_eq!(display_string, "a@0 = 42"); + let sql_string = fmt_sql(&lit_expr).to_string(); + assert_eq!(sql_string, "a = 42"); + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index 78606f05ae817..67fab3912c6a4 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -559,6 +559,29 @@ impl PhysicalExpr for CaseExpr { )?)) } } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "CASE ")?; + if let Some(e) = &self.expr { + e.fmt_sql(f)?; + write!(f, " ")?; + } + + for (w, t) in &self.when_then_expr { + write!(f, "WHEN ")?; + w.fmt_sql(f)?; + write!(f, " THEN ")?; + t.fmt_sql(f)?; + write!(f, " ")?; + } + + if let Some(e) = &self.else_expr { + write!(f, "ELSE ")?; + e.fmt_sql(f)?; + write!(f, " ")?; + } + write!(f, "END") + } } /// Create a CASE expression @@ -583,6 +606,7 @@ mod tests { use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_expr::type_coercion::binary::comparison_coercion; use datafusion_expr::Operator; + use datafusion_physical_expr_common::physical_expr::fmt_sql; #[test] fn case_with_expr() -> Result<()> { @@ -1378,4 +1402,35 @@ mod tests { comparison_coercion(&left_type, right_type) }) } + + #[test] + fn test_fmt_sql() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]); + + // CASE WHEN a = 'foo' THEN 123.3 ELSE 999 END + let when = binary(col("a", &schema)?, Operator::Eq, lit("foo"), &schema)?; + let then = lit(123.3f64); + let else_value = lit(999i32); + + let expr = generate_case_when_with_type_coercion( + None, + vec![(when, then)], + Some(else_value), + &schema, + )?; + + let display_string = expr.to_string(); + assert_eq!( + display_string, + "CASE WHEN a@0 = foo THEN 123.3 ELSE TRY_CAST(999 AS Float64) END" + ); + + let sql_string = fmt_sql(expr.as_ref()).to_string(); + assert_eq!( + sql_string, + "CASE WHEN a = foo THEN 123.3 ELSE TRY_CAST(999 AS Float64) END" + ); + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 8a093e0ae92ea..a6766687a881a 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -194,6 +194,14 @@ impl PhysicalExpr for CastExpr { Ok(ExprProperties::new_unknown().with_range(unbounded)) } } + + fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "CAST(")?; + self.expr.fmt_sql(f)?; + write!(f, " AS {:?}", self.cast_type)?; + + write!(f, ")") + } } /// Return a PhysicalExpression representing `expr` casted to @@ -243,6 +251,7 @@ mod tests { datatypes::*, }; use datafusion_common::assert_contains; + use datafusion_physical_expr_common::physical_expr::fmt_sql; // runs an end-to-end test of physical type cast // 1. construct a record batch with a column "a" of type A @@ -766,4 +775,26 @@ mod tests { expression.evaluate(&batch)?; Ok(()) } + + #[test] + fn test_fmt_sql() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", Int32, true)]); + + // Test numeric casting + let expr = cast(col("a", &schema)?, &schema, Int64)?; + let display_string = expr.to_string(); + assert_eq!(display_string, "CAST(a@0 AS Int64)"); + let sql_string = fmt_sql(expr.as_ref()).to_string(); + assert_eq!(sql_string, "CAST(a AS Int64)"); + + // Test string casting + let schema = Schema::new(vec![Field::new("b", Utf8, true)]); + let expr = cast(col("b", &schema)?, &schema, Int32)?; + let display_string = expr.to_string(); + assert_eq!(display_string, "CAST(b@0 AS Int32)"); + let sql_string = fmt_sql(expr.as_ref()).to_string(); + assert_eq!(sql_string, "CAST(b AS Int32)"); + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/expressions/column.rs b/datafusion/physical-expr/src/expressions/column.rs index 0ec985887c3f8..ab5b359847535 100644 --- a/datafusion/physical-expr/src/expressions/column.rs +++ b/datafusion/physical-expr/src/expressions/column.rs @@ -137,6 +137,10 @@ impl PhysicalExpr for Column { ) -> Result> { Ok(self) } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.name) + } } impl Column { diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index dfe9a905dfeaa..469f7bbee3173 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -398,6 +398,22 @@ impl PhysicalExpr for InListExpr { self.static_filter.clone(), ))) } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.expr.fmt_sql(f)?; + if self.negated { + write!(f, " NOT")?; + } + + write!(f, " IN (")?; + for (i, expr) in self.list.iter().enumerate() { + if i > 0 { + write!(f, ", ")?; + } + expr.fmt_sql(f)?; + } + write!(f, ")") + } } impl PartialEq for InListExpr { @@ -453,6 +469,7 @@ mod tests { use crate::expressions::{col, lit, try_cast}; use datafusion_common::plan_err; use datafusion_expr::type_coercion::binary::comparison_coercion; + use datafusion_physical_expr_common::physical_expr::fmt_sql; type InListCastResult = (Arc, Vec>); @@ -1422,4 +1439,44 @@ mod tests { Ok(()) } + + #[test] + fn test_fmt_sql() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]); + let col_a = col("a", &schema)?; + + // Test: a IN ('a', 'b') + let list = vec![lit("a"), lit("b")]; + let expr = in_list(Arc::clone(&col_a), list, &false, &schema)?; + let sql_string = fmt_sql(expr.as_ref()).to_string(); + let display_string = expr.to_string(); + assert_eq!(sql_string, "a IN (a, b)"); + assert_eq!(display_string, "Use a@0 IN (SET) ([Literal { value: Utf8(\"a\") }, Literal { value: Utf8(\"b\") }])"); + + // Test: a NOT IN ('a', 'b') + let list = vec![lit("a"), lit("b")]; + let expr = in_list(Arc::clone(&col_a), list, &true, &schema)?; + let sql_string = fmt_sql(expr.as_ref()).to_string(); + let display_string = expr.to_string(); + assert_eq!(sql_string, "a NOT IN (a, b)"); + assert_eq!(display_string, "a@0 NOT IN (SET) ([Literal { value: Utf8(\"a\") }, Literal { value: Utf8(\"b\") }])"); + + // Test: a IN ('a', 'b', NULL) + let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))]; + let expr = in_list(Arc::clone(&col_a), list, &false, &schema)?; + let sql_string = fmt_sql(expr.as_ref()).to_string(); + let display_string = expr.to_string(); + assert_eq!(sql_string, "a IN (a, b, NULL)"); + assert_eq!(display_string, "Use a@0 IN (SET) ([Literal { value: Utf8(\"a\") }, Literal { value: Utf8(\"b\") }, Literal { value: Utf8(NULL) }])"); + + // Test: a NOT IN ('a', 'b', NULL) + let list = vec![lit("a"), lit("b"), lit(ScalarValue::Utf8(None))]; + let expr = in_list(Arc::clone(&col_a), list, &true, &schema)?; + let sql_string = fmt_sql(expr.as_ref()).to_string(); + let display_string = expr.to_string(); + assert_eq!(sql_string, "a NOT IN (a, b, NULL)"); + assert_eq!(display_string, "a@0 NOT IN (SET) ([Literal { value: Utf8(\"a\") }, Literal { value: Utf8(\"b\") }, Literal { value: Utf8(NULL) }])"); + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index 47dc53d125550..0619e72488581 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -104,6 +104,11 @@ impl PhysicalExpr for IsNotNullExpr { ) -> Result> { Ok(Arc::new(IsNotNullExpr::new(Arc::clone(&children[0])))) } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.arg.fmt_sql(f)?; + write!(f, " IS NOT NULL") + } } /// Create an IS NOT NULL expression @@ -121,6 +126,7 @@ mod tests { use arrow::buffer::ScalarBuffer; use arrow::datatypes::*; use datafusion_common::cast::as_boolean_array; + use datafusion_physical_expr_common::physical_expr::fmt_sql; #[test] fn is_not_null_op() -> Result<()> { @@ -187,4 +193,29 @@ mod tests { assert_eq!(expected, actual); } + + #[test] + fn test_fmt_sql() -> Result<()> { + let union_fields: UnionFields = [ + (0, Arc::new(Field::new("A", DataType::Int32, true))), + (1, Arc::new(Field::new("B", DataType::Float64, true))), + ] + .into_iter() + .collect(); + + let field = Field::new( + "my_union", + DataType::Union(union_fields, UnionMode::Sparse), + true, + ); + + let schema = Schema::new(vec![field]); + let expr = is_not_null(col("my_union", &schema).unwrap()).unwrap(); + let display_string = expr.to_string(); + assert_eq!(display_string, "my_union@0 IS NOT NULL"); + let sql_string = fmt_sql(expr.as_ref()).to_string(); + assert_eq!(sql_string, "my_union IS NOT NULL"); + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index 5e883dff997aa..4c6081f35cad7 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -103,6 +103,11 @@ impl PhysicalExpr for IsNullExpr { ) -> Result> { Ok(Arc::new(IsNullExpr::new(Arc::clone(&children[0])))) } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.arg.fmt_sql(f)?; + write!(f, " IS NULL") + } } /// Create an IS NULL expression @@ -120,6 +125,7 @@ mod tests { use arrow::buffer::ScalarBuffer; use arrow::datatypes::*; use datafusion_common::cast::as_boolean_array; + use datafusion_physical_expr_common::physical_expr::fmt_sql; #[test] fn is_null_op() -> Result<()> { @@ -209,4 +215,18 @@ mod tests { let expected = &BooleanArray::from(vec![false, true, false, true, false, true]); assert_eq!(expected, &result); } + + #[test] + fn test_fmt_sql() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]); + + // expression: "a is null" + let expr = is_null(col("a", &schema)?).unwrap(); + let display_string = expr.to_string(); + assert_eq!(display_string, "a@0 IS NULL"); + let sql_string = fmt_sql(expr.as_ref()).to_string(); + assert_eq!(sql_string, "a IS NULL"); + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs index b26927b77f1ff..ebf9882665ba0 100644 --- a/datafusion/physical-expr/src/expressions/like.rs +++ b/datafusion/physical-expr/src/expressions/like.rs @@ -145,6 +145,12 @@ impl PhysicalExpr for LikeExpr { Arc::clone(&children[1]), ))) } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.expr.fmt_sql(f)?; + write!(f, " {} ", self.op_name())?; + self.pattern.fmt_sql(f) + } } /// used for optimize Dictionary like @@ -185,6 +191,7 @@ mod test { use arrow::array::*; use arrow::datatypes::Field; use datafusion_common::cast::as_boolean_array; + use datafusion_physical_expr_common::physical_expr::fmt_sql; macro_rules! test_like { ($A_VEC:expr, $B_VEC:expr, $VEC:expr, $NULLABLE: expr, $NEGATED:expr, $CASE_INSENSITIVE:expr,) => {{ @@ -256,4 +263,30 @@ mod test { Ok(()) } + + #[test] + fn test_fmt_sql() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + ]); + + let expr = like( + false, + false, + col("a", &schema)?, + col("b", &schema)?, + &schema, + )?; + + // Display format + let display_string = expr.to_string(); + assert_eq!(display_string, "a@0 LIKE b@1"); + + // fmt_sql format + let sql_string = fmt_sql(expr.as_ref()).to_string(); + assert_eq!(sql_string, "a LIKE b"); + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/expressions/literal.rs b/datafusion/physical-expr/src/expressions/literal.rs index 232f9769b056a..0d0c0ecc62c79 100644 --- a/datafusion/physical-expr/src/expressions/literal.rs +++ b/datafusion/physical-expr/src/expressions/literal.rs @@ -93,6 +93,10 @@ impl PhysicalExpr for Literal { preserves_lex_ordering: true, }) } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(self, f) + } } /// Create a literal expression @@ -110,6 +114,7 @@ mod tests { use arrow::array::Int32Array; use arrow::datatypes::*; use datafusion_common::cast::as_int32_array; + use datafusion_physical_expr_common::physical_expr::fmt_sql; #[test] fn literal_i32() -> Result<()> { @@ -136,4 +141,16 @@ mod tests { Ok(()) } + + #[test] + fn test_fmt_sql() -> Result<()> { + // create and evaluate a literal expression + let expr = lit(42i32); + let display_string = expr.to_string(); + assert_eq!(display_string, "42"); + let sql_string = fmt_sql(expr.as_ref()).to_string(); + assert_eq!(sql_string, "42"); + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/expressions/negative.rs b/datafusion/physical-expr/src/expressions/negative.rs index 8795545274a2e..33a1bae14d420 100644 --- a/datafusion/physical-expr/src/expressions/negative.rs +++ b/datafusion/physical-expr/src/expressions/negative.rs @@ -167,6 +167,12 @@ impl PhysicalExpr for NegativeExpr { preserves_lex_ordering: false, }) } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "(- ")?; + self.arg.fmt_sql(f)?; + write!(f, ")") + } } /// Creates a unary expression NEGATIVE @@ -202,6 +208,7 @@ mod tests { use datafusion_common::cast::as_primitive_array; use datafusion_common::{DataFusionError, ScalarValue}; + use datafusion_physical_expr_common::physical_expr::fmt_sql; use paste::paste; macro_rules! test_array_negative_op { @@ -379,4 +386,15 @@ mod tests { matches!(expr, DataFusionError::Plan(_)); Ok(()) } + + #[test] + fn test_fmt_sql() -> Result<()> { + let expr = NegativeExpr::new(Arc::new(Column::new("a", 0))); + let display_string = expr.to_string(); + assert_eq!(display_string, "(- a@0)"); + let sql_string = fmt_sql(&expr).to_string(); + assert_eq!(sql_string, "(- a)"); + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/expressions/no_op.rs b/datafusion/physical-expr/src/expressions/no_op.rs index c17b52f5cdfff..24d2f4d9e074d 100644 --- a/datafusion/physical-expr/src/expressions/no_op.rs +++ b/datafusion/physical-expr/src/expressions/no_op.rs @@ -77,4 +77,8 @@ impl PhysicalExpr for NoOp { ) -> Result> { Ok(self) } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(self, f) + } } diff --git a/datafusion/physical-expr/src/expressions/not.rs b/datafusion/physical-expr/src/expressions/not.rs index ddf7c739b692a..8a3348b43d20b 100644 --- a/datafusion/physical-expr/src/expressions/not.rs +++ b/datafusion/physical-expr/src/expressions/not.rs @@ -175,6 +175,11 @@ impl PhysicalExpr for NotExpr { _ => internal_err!("NotExpr can only operate on Boolean datatypes"), } } + + fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "NOT ")?; + self.arg.fmt_sql(f) + } } /// Creates a unary expression NOT @@ -190,6 +195,7 @@ mod tests { use crate::expressions::{col, Column}; use arrow::{array::BooleanArray, datatypes::*}; + use datafusion_physical_expr_common::physical_expr::fmt_sql; #[test] fn neg_op() -> Result<()> { @@ -322,6 +328,21 @@ mod tests { Ok(()) } + #[test] + fn test_fmt_sql() -> Result<()> { + let schema = schema(); + + let expr = not(col("a", &schema)?)?; + + let display_string = expr.to_string(); + assert_eq!(display_string, "NOT a@0"); + + let sql_string = fmt_sql(expr.as_ref()).to_string(); + assert_eq!(sql_string, "NOT a"); + + Ok(()) + } + fn schema() -> SchemaRef { static SCHEMA: LazyLock = LazyLock::new(|| { Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, true)])) diff --git a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr/src/expressions/try_cast.rs index 06f4e929992e5..e49815cd8b644 100644 --- a/datafusion/physical-expr/src/expressions/try_cast.rs +++ b/datafusion/physical-expr/src/expressions/try_cast.rs @@ -123,6 +123,12 @@ impl PhysicalExpr for TryCastExpr { self.cast_type.clone(), ))) } + + fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "TRY_CAST(")?; + self.expr.fmt_sql(f)?; + write!(f, " AS {:?})", self.cast_type) + } } /// Return a PhysicalExpression representing `expr` casted to @@ -158,6 +164,7 @@ mod tests { }, datatypes::*, }; + use datafusion_physical_expr_common::physical_expr::fmt_sql; // runs an end-to-end test of physical type cast // 1. construct a record batch with a column "a" of type A @@ -573,4 +580,26 @@ mod tests { .with_precision_and_scale(precision, scale) .unwrap() } + + #[test] + fn test_fmt_sql() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); + + // Test numeric casting + let expr = try_cast(col("a", &schema)?, &schema, DataType::Int64)?; + let display_string = expr.to_string(); + assert_eq!(display_string, "TRY_CAST(a@0 AS Int64)"); + let sql_string = fmt_sql(expr.as_ref()).to_string(); + assert_eq!(sql_string, "TRY_CAST(a AS Int64)"); + + // Test string casting + let schema = Schema::new(vec![Field::new("b", DataType::Utf8, true)]); + let expr = try_cast(col("b", &schema)?, &schema, DataType::Int32)?; + let display_string = expr.to_string(); + assert_eq!(display_string, "TRY_CAST(b@0 AS Int32)"); + let sql_string = fmt_sql(expr.as_ref()).to_string(); + assert_eq!(sql_string, "TRY_CAST(b AS Int32)"); + + Ok(()) + } } diff --git a/datafusion/physical-expr/src/expressions/unknown_column.rs b/datafusion/physical-expr/src/expressions/unknown_column.rs index a63caf7e13056..2face4eb6bdb6 100644 --- a/datafusion/physical-expr/src/expressions/unknown_column.rs +++ b/datafusion/physical-expr/src/expressions/unknown_column.rs @@ -86,6 +86,10 @@ impl PhysicalExpr for UnKnownColumn { ) -> Result> { Ok(self) } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(self, f) + } } impl Hash for UnKnownColumn { diff --git a/datafusion/physical-expr/src/scalar_function.rs b/datafusion/physical-expr/src/scalar_function.rs index cf8cc6e00c805..44bbcc4928c68 100644 --- a/datafusion/physical-expr/src/scalar_function.rs +++ b/datafusion/physical-expr/src/scalar_function.rs @@ -260,4 +260,15 @@ impl PhysicalExpr for ScalarFunctionExpr { preserves_lex_ordering, }) } + + fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{}(", self.name)?; + for (i, expr) in self.args.iter().enumerate() { + if i > 0 { + write!(f, ", ")?; + } + expr.fmt_sql(f)?; + } + write!(f, ")") + } } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 3f901311a053c..1d3e23ea90974 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -48,6 +48,7 @@ use datafusion_physical_expr::equivalence::ProjectionMapping; use datafusion_physical_expr::utils::collect_columns; use datafusion_physical_expr::PhysicalExprRef; +use datafusion_physical_expr_common::physical_expr::fmt_sql; use futures::stream::{Stream, StreamExt}; use itertools::Itertools; use log::trace; @@ -169,13 +170,14 @@ impl DisplayAs for ProjectionExec { } DisplayFormatType::TreeRender => { for (i, (e, alias)) in self.expr().iter().enumerate() { - let e = e.to_string(); - if &e == alias { - writeln!(f, "expr{i}={e}")?; + let expr_sql = fmt_sql(e.as_ref()); + if &e.to_string() == alias { + writeln!(f, "expr{i}={expr_sql}")?; } else { - writeln!(f, "{alias}={e}")?; + writeln!(f, "{alias}={expr_sql}")?; } } + Ok(()) } } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index b5bfef99a6f39..738ac9064d87d 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -16,7 +16,7 @@ // under the License. use std::any::Any; -use std::fmt::Display; +use std::fmt::{Display, Formatter}; use std::ops::Deref; use std::sync::Arc; use std::vec; @@ -860,6 +860,10 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { ) -> Result> { todo!() } + + fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(self, f) + } } #[derive(Debug)] diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 99190122f75ae..a9444da6c16d2 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -710,7 +710,7 @@ physical_plan 07)│ count(Int64(1)) ROWS │ 08)│ BETWEEN UNBOUNDED │ 09)│ PRECEDING AND UNBOUNDED │ -10)│ FOLLOWING@0 │ +10)│ FOLLOWING │ 11)└─────────────┬─────────────┘ 12)┌─────────────┴─────────────┐ 13)│ WindowAggExec │ @@ -743,9 +743,9 @@ physical_plan 05)│ sum(t1.v1) ORDER BY [t1.v1│ 06)│ ASC NULLS LAST] ROWS │ 07)│ BETWEEN 1 PRECEDING │ -08)│ AND CURRENT ROW@1 │ +08)│ AND CURRENT ROW │ 09)│ │ -10)│ v1: v1@0 │ +10)│ v1: v1 │ 11)└─────────────┬─────────────┘ 12)┌─────────────┴─────────────┐ 13)│ BoundedWindowAggExec │ @@ -766,7 +766,7 @@ physical_plan 28)┌─────────────┴─────────────┐ 29)│ ProjectionExec │ 30)│ -------------------- │ -31)│ v1: value@0 │ +31)│ v1: value │ 32)└─────────────┬─────────────┘ 33)┌─────────────┴─────────────┐ 34)│ LazyMemoryExec │ @@ -793,14 +793,14 @@ physical_plan 07)│ count(Int64(1)) ROWS │ 08)│ BETWEEN UNBOUNDED │ 09)│ PRECEDING AND UNBOUNDED │ -10)│ FOLLOWING@0 │ +10)│ FOLLOWING │ 11)│ │ 12)│ row_number() ROWS BETWEEN │ 13)│ UNBOUNDED PRECEDING AND │ 14)│ UNBOUNDED FOLLOWING: │ 15)│ row_number() ROWS BETWEEN │ 16)│ UNBOUNDED PRECEDING AND │ -17)│ UNBOUNDED FOLLOWING@1 │ +17)│ UNBOUNDED FOLLOWING │ 18)└─────────────┬─────────────┘ 19)┌─────────────┴─────────────┐ 20)│ WindowAggExec │ @@ -866,13 +866,13 @@ physical_plan 02)│ ProjectionExec │ 03)│ -------------------- │ 04)│ bigint_col: │ -05)│ bigint_col@1 │ +05)│ bigint_col │ 06)│ │ -07)│ int_col: int_col@0 │ +07)│ int_col: int_col │ 08)│ │ 09)│ sum_col: │ -10)│ CAST(int_col@0 AS Int64) +│ -11)│ bigint_col@1 │ +10)│ CAST(int_col AS Int64) + │ +11)│ bigint_col │ 12)└─────────────┬─────────────┘ 13)┌─────────────┴─────────────┐ 14)│ RepartitionExec │ @@ -909,7 +909,7 @@ physical_plan 10)│ .int_col DESC NULLS │ 11)│ FIRST] RANGE BETWEEN │ 12)│ UNBOUNDED PRECEDING AND │ -13)│ CURRENT ROW@1 │ +13)│ CURRENT ROW │ 14)│ │ 15)│ row_number() ORDER BY │ 16)│ [table1.int_col ASC │ @@ -922,7 +922,7 @@ physical_plan 23)│ NULLS LAST] RANGE │ 24)│ BETWEEN UNBOUNDED │ 25)│ PRECEDING AND CURRENT │ -26)│ ROW@2 │ +26)│ ROW │ 27)└─────────────┬─────────────┘ 28)┌─────────────┴─────────────┐ 29)│ BoundedWindowAggExec │ @@ -975,13 +975,13 @@ physical_plan 02)│ ProjectionExec │ 03)│ -------------------- │ 04)│ bigint_col: │ -05)│ bigint_col@1 │ +05)│ bigint_col │ 06)│ │ -07)│ int_col: int_col@0 │ +07)│ int_col: int_col │ 08)│ │ 09)│ sum_col: │ -10)│ CAST(int_col@0 AS Int64) +│ -11)│ bigint_col@1 │ +10)│ CAST(int_col AS Int64) + │ +11)│ bigint_col │ 12)└─────────────┬─────────────┘ 13)┌─────────────┴─────────────┐ 14)│ RepartitionExec │ @@ -1009,13 +1009,13 @@ physical_plan 02)│ ProjectionExec │ 03)│ -------------------- │ 04)│ bigint_col: │ -05)│ bigint_col@1 │ +05)│ bigint_col │ 06)│ │ -07)│ int_col: int_col@0 │ +07)│ int_col: int_col │ 08)│ │ 09)│ sum_col: │ -10)│ CAST(int_col@0 AS Int64) +│ -11)│ bigint_col@1 │ +10)│ CAST(int_col AS Int64) + │ +11)│ bigint_col │ 12)└─────────────┬─────────────┘ 13)┌─────────────┴─────────────┐ 14)│ DataSourceExec │ @@ -1034,12 +1034,12 @@ physical_plan 02)│ ProjectionExec │ 03)│ -------------------- │ 04)│ bigint_col: │ -05)│ bigint_col@0 │ +05)│ bigint_col │ 06)│ │ -07)│ int_col: int_col@1 │ +07)│ int_col: int_col │ 08)│ │ 09)│ sum_col: │ -10)│ int_col@1 + bigint_col@0 │ +10)│ int_col + bigint_col │ 11)└─────────────┬─────────────┘ 12)┌─────────────┴─────────────┐ 13)│ RepartitionExec │ @@ -1067,13 +1067,13 @@ physical_plan 02)│ ProjectionExec │ 03)│ -------------------- │ 04)│ bigint_col: │ -05)│ bigint_col@1 │ +05)│ bigint_col │ 06)│ │ -07)│ int_col: int_col@0 │ +07)│ int_col: int_col │ 08)│ │ 09)│ sum_col: │ -10)│ CAST(int_col@0 AS Int64) +│ -11)│ bigint_col@1 │ +10)│ CAST(int_col AS Int64) + │ +11)│ bigint_col │ 12)└─────────────┬─────────────┘ 13)┌─────────────┴─────────────┐ 14)│ RepartitionExec │ @@ -1177,17 +1177,17 @@ physical_plan 35)│ -------------------- ││ -------------------- │ 36)│ CAST(table1.string_col AS ││ output_partition_count: │ 37)│ Utf8View): ││ 1 │ -38)│ CAST(string_col@1 AS ││ │ +38)│ CAST(string_col AS ││ │ 39)│ Utf8View) ││ partitioning_scheme: │ 40)│ ││ RoundRobinBatch(4) │ 41)│ bigint_col: ││ │ -42)│ bigint_col@2 ││ │ +42)│ bigint_col ││ │ 43)│ ││ │ -44)│ date_col: date_col@3 ││ │ -45)│ int_col: int_col@0 ││ │ +44)│ date_col: date_col ││ │ +45)│ int_col: int_col ││ │ 46)│ ││ │ 47)│ string_col: ││ │ -48)│ string_col@1 ││ │ +48)│ string_col ││ │ 49)└─────────────┬─────────────┘└─────────────┬─────────────┘ 50)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ 51)│ RepartitionExec ││ DataSourceExec │ @@ -1249,17 +1249,17 @@ physical_plan 37)│ -------------------- ││ -------------------- │ 38)│ CAST(table1.string_col AS ││ output_partition_count: │ 39)│ Utf8View): ││ 1 │ -40)│ CAST(string_col@1 AS ││ │ +40)│ CAST(string_col AS ││ │ 41)│ Utf8View) ││ partitioning_scheme: │ 42)│ ││ RoundRobinBatch(4) │ 43)│ bigint_col: ││ │ -44)│ bigint_col@2 ││ │ +44)│ bigint_col ││ │ 45)│ ││ │ -46)│ date_col: date_col@3 ││ │ -47)│ int_col: int_col@0 ││ │ +46)│ date_col: date_col ││ │ +47)│ int_col: int_col ││ │ 48)│ ││ │ 49)│ string_col: ││ │ -50)│ string_col@1 ││ │ +50)│ string_col ││ │ 51)└─────────────┬─────────────┘└─────────────┬─────────────┘ 52)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ 53)│ RepartitionExec ││ DataSourceExec │ @@ -1429,7 +1429,7 @@ physical_plan 02)│ ProjectionExec │ 03)│ -------------------- │ 04)│ count(*): │ -05)│ count(Int64(1))@1 │ +05)│ count(Int64(1)) │ 06)└─────────────┬─────────────┘ 07)┌─────────────┴─────────────┐ 08)│ AggregateExec │ @@ -1792,7 +1792,7 @@ physical_plan 09)┌─────────────┴─────────────┐┌─────────────┴─────────────┐ 10)│ PlaceholderRowExec ││ ProjectionExec │ 11)│ ││ -------------------- │ -12)│ ││ id: id@0 + 1 │ +12)│ ││ id: id + 1 │ 13)└───────────────────────────┘└─────────────┬─────────────┘ 14)-----------------------------┌─────────────┴─────────────┐ 15)-----------------------------│ CoalesceBatchesExec │ @@ -1864,7 +1864,7 @@ physical_plan 02)│ ProjectionExec │ 03)│ -------------------- │ 04)│ count(*): │ -05)│ count(Int64(1))@0 │ +05)│ count(Int64(1)) │ 06)└─────────────┬─────────────┘ 07)┌─────────────┴─────────────┐ 08)│ AggregateExec │