4 changes: 1 addition & 3 deletions benchmarks/src/tpch/convert.rs
@@ -88,9 +88,7 @@ impl ConvertOpt {
.schema()
.iter()
.take(schema.fields.len() - 1)
.map(|(qualifier, field)| {
Expr::Column(Column::from((qualifier, field.as_ref())))
})
.map(Expr::from)
Contributor Author: This is now pretty nice I think given @comphead's suggestion ❤️

.collect();

csv = csv.select(selection)?;
11 changes: 10 additions & 1 deletion datafusion/common/src/column.rs
@@ -17,7 +17,7 @@

//! Column

use arrow_schema::Field;
use arrow_schema::{Field, FieldRef};

use crate::error::_schema_err;
use crate::utils::{parse_identifiers_normalized, quote_identifier};
@@ -63,6 +63,8 @@ impl Column {
}

/// Create Column from unqualified name.
///
/// Alias for `Column::new_unqualified`
pub fn from_name(name: impl Into<String>) -> Self {
Self {
relation: None,
@@ -346,6 +348,13 @@ impl From<(Option<&TableReference>, &Field)> for Column {
}
}

/// Create a column, use qualifier and field name
impl From<(Option<&TableReference>, &FieldRef)> for Column {
Contributor Author: This is the new API -- the rest of this PR updates the code to use it when possible

fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
Self::new(relation.cloned(), field.name())
}
}

impl FromStr for Column {
type Err = Infallible;

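To make the new conversion concrete, here is a minimal standalone sketch (not part of the diff) of how `Column::from((Option<&TableReference>, &FieldRef))` behaves. It assumes only the `arrow_schema` and `datafusion_common` crates; the table and field names are made up for illustration.

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, FieldRef};
use datafusion_common::{Column, TableReference};

fn main() {
    // DFSchema::iter() yields (Option<&TableReference>, &FieldRef) pairs,
    // so this mirrors what the updated call sites receive.
    let field: FieldRef = Arc::new(Field::new("c1", DataType::Int32, false));
    let table = TableReference::bare("t1");

    // New conversion added in this PR: no `field.as_ref()` needed.
    let qualified = Column::from((Some(&table), &field));
    assert_eq!(qualified.flat_name(), "t1.c1");

    // Without a qualifier, the resulting column is unqualified.
    let unqualified = Column::from((None, &field));
    assert_eq!(unqualified.flat_name(), "c1");

    // `Column::from_name` is documented above as an alias for `Column::new_unqualified`.
    assert_eq!(Column::from_name("c1"), Column::new_unqualified("c1"));
}
```

The tuple form matches what `DFSchema::iter()` yields, which is why the call sites below can switch to `Column::from((qualifier, field))` without the `field.as_ref()` dance.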
6 changes: 3 additions & 3 deletions datafusion/core/src/dataframe/mod.rs
@@ -1332,7 +1332,7 @@ impl DataFrame {
col_exists = true;
new_column.clone()
} else {
col(Column::from((qualifier, field.as_ref())))
col(Column::from((qualifier, field)))
}
})
.collect();
@@ -1402,9 +1402,9 @@
.iter()
.map(|(qualifier, field)| {
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
col(Column::from((qualifier, field.as_ref()))).alias(new_name)
col(Column::from((qualifier, field))).alias(new_name)
} else {
col(Column::from((qualifier, field.as_ref())))
col(Column::from((qualifier, field)))
}
})
.collect::<Vec<_>>();
11 changes: 2 additions & 9 deletions datafusion/core/src/physical_planner.rs
@@ -1261,15 +1261,8 @@ impl DefaultPhysicalPlanner {

// Remove temporary projected columns
if left_projected || right_projected {
let final_join_result = join_schema
.iter()
.map(|(qualifier, field)| {
Expr::Column(datafusion_common::Column::from((
qualifier,
field.as_ref(),
)))
})
.collect::<Vec<_>>();
let final_join_result =
join_schema.iter().map(Expr::from).collect::<Vec<_>>();
let projection = LogicalPlan::Projection(Projection::try_new(
final_join_result,
Arc::new(new_join),
42 changes: 41 additions & 1 deletion datafusion/expr/src/expr.rs
@@ -32,7 +32,7 @@ use crate::{
Signature,
};

use arrow::datatypes::DataType;
use arrow::datatypes::{DataType, FieldRef};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{
internal_err, plan_err, Column, DFSchema, Result, ScalarValue, TableReference,
@@ -84,6 +84,29 @@ use sqlparser::ast::NullTreatment;
/// assert_eq!(binary_expr.op, Operator::Eq);
/// }
/// ```
///
/// ## Return a list of [`Expr::Column`] from a schema's columns
/// ```
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion_common::{DFSchema, Column};
/// # use datafusion_expr::Expr;
///
/// let arrow_schema = Schema::new(vec![
/// Field::new("c1", DataType::Int32, false),
/// Field::new("c2", DataType::Float64, false),
/// ]);
/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
///
/// // Form a list of expressions for each item in the schema
/// let exprs: Vec<_> = df_schema.iter()
/// .map(Expr::from)
/// .collect();
///
/// assert_eq!(exprs, vec![
/// Expr::from(Column::from_qualified_name("t1.c1")),
/// Expr::from(Column::from_qualified_name("t1.c2")),
/// ]);
/// ```
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub enum Expr {
/// An expression with a specific name.
@@ -190,6 +213,23 @@ impl Default for Expr {
}
}

/// Create an [`Expr`] from a [`Column`]
impl From<Column> for Expr {
fn from(value: Column) -> Self {
Expr::Column(value)
}
}

/// Create an [`Expr`] from an optional qualifier and a [`FieldRef`]. This is
/// useful for creating [`Expr`] from a [`DFSchema`].
///
/// See example on [`Expr`]
impl<'a> From<(Option<&'a TableReference>, &'a FieldRef)> for Expr {
fn from(value: (Option<&'a TableReference>, &'a FieldRef)) -> Self {
Expr::from(Column::from(value))
}
}

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Unnest {
pub expr: Box<Expr>,
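As a rough illustration of how the two new `From` impls compose (again not part of the diff; the names follow the doc example above and are otherwise illustrative):

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, FieldRef};
use datafusion_common::{Column, TableReference};
use datafusion_expr::Expr;

fn main() {
    // `From<Column> for Expr` simply wraps the column in `Expr::Column`.
    let col_expr = Expr::from(Column::from_qualified_name("t1.c1"));
    assert_eq!(col_expr, Expr::Column(Column::from_qualified_name("t1.c1")));

    // `From<(Option<&TableReference>, &FieldRef)> for Expr` routes through the
    // new Column conversion, so these two expressions are equal.
    let table = TableReference::bare("t1");
    let field: FieldRef = Arc::new(Field::new("c2", DataType::Float64, false));

    let from_tuple = Expr::from((Some(&table), &field));
    let from_column = Expr::from(Column::new(Some(table.clone()), field.name()));
    assert_eq!(from_tuple, from_column);
}
```

The tuple impl is just sugar over `Expr::from(Column::from(value))`, which is what lets call sites like `schema.iter().map(Expr::from)` in the rest of this PR stay so short.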
8 changes: 1 addition & 7 deletions datafusion/expr/src/expr_rewriter/mod.rs
@@ -218,13 +218,7 @@ pub fn coerce_plan_expr_for_schema(
Ok(LogicalPlan::Projection(projection))
}
_ => {
let exprs: Vec<Expr> = plan
.schema()
.iter()
.map(|(qualifier, field)| {
Expr::Column(Column::from((qualifier, field.as_ref())))
})
.collect();
let exprs: Vec<Expr> = plan.schema().iter().map(Expr::from).collect();

let new_exprs = coerce_exprs_for_schema(exprs, plan.schema(), schema)?;
let add_project = new_exprs.iter().any(|expr| expr.try_into_col().is_err());
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/builder.rs
@@ -1577,7 +1577,7 @@ pub fn unnest_with_options(
return Ok(input);
}
};
qualified_columns.push(Column::from((unnest_qualifier, unnested_field.as_ref())));
qualified_columns.push(Column::from((unnest_qualifier, &unnested_field)));
unnested_fields.insert(index, unnested_field);
}

9 changes: 2 additions & 7 deletions datafusion/expr/src/utils.rs
@@ -356,12 +356,7 @@ fn get_exprs_except_skipped(
columns_to_skip: HashSet<Column>,
) -> Vec<Expr> {
if columns_to_skip.is_empty() {
schema
.iter()
.map(|(qualifier, field)| {
Expr::Column(Column::from((qualifier, field.as_ref())))
})
.collect::<Vec<Expr>>()
schema.iter().map(Expr::from).collect::<Vec<Expr>>()
} else {
schema
.columns()
@@ -855,7 +850,7 @@ pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
match expr {
Expr::Column(col) => {
let (qualifier, field) = plan.schema().qualified_field_from_column(col)?;
Ok(Expr::Column(Column::from((qualifier, field))))
Ok(Expr::from(Column::from((qualifier, field))))
}
_ => Ok(Expr::Column(Column::from_name(expr.display_name()?))),
}
7 changes: 2 additions & 5 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -506,7 +506,7 @@ fn build_common_expr_project_plan(

for (qualifier, field) in input.schema().iter() {
if fields_set.insert(qualified_name(qualifier, field.name())) {
project_exprs.push(Expr::Column(Column::from((qualifier, field.as_ref()))));
project_exprs.push(Expr::from((qualifier, field)));
}
}

@@ -525,10 +525,7 @@ fn build_recover_project_plan(
schema: &DFSchema,
input: LogicalPlan,
) -> Result<LogicalPlan> {
let col_exprs = schema
.iter()
.map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field.as_ref()))))
.collect();
let col_exprs = schema.iter().map(Expr::from).collect();
Ok(LogicalPlan::Projection(Projection::try_new(
col_exprs,
Arc::new(input),
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/replace_distinct_aggregate.rs
@@ -127,7 +127,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
.skip(on_expr.len())
.zip(schema.iter())
.map(|((new_qualifier, new_field), (old_qualifier, old_field))| {
Ok(col(Column::from((new_qualifier, new_field.as_ref())))
Ok(col(Column::from((new_qualifier, new_field)))
.alias_qualified(old_qualifier.cloned(), old_field.name()))
})
.collect::<Result<Vec<Expr>>>()?;
8 changes: 3 additions & 5 deletions datafusion/sql/src/expr/mod.rs
@@ -21,8 +21,8 @@ use sqlparser::ast::{ArrayAgg, Expr as SQLExpr, JsonOperator, TrimWhereField, Va
use sqlparser::parser::ParserError::ParserError;

use datafusion_common::{
internal_datafusion_err, internal_err, not_impl_err, plan_err, Column, DFSchema,
Result, ScalarValue,
internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, Result,
ScalarValue,
};
use datafusion_expr::expr::AggregateFunctionDefinition;
use datafusion_expr::expr::InList;
@@ -142,9 +142,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
_ => false,
}) {
Some((qualifier, df_field)) => {
Expr::Column(Column::from((qualifier, df_field.as_ref())))
}
Some((qualifier, df_field)) => Expr::from((qualifier, df_field)),
None => Expr::Column(col),
}
}
3 changes: 1 addition & 2 deletions datafusion/sql/src/statement.rs
@@ -1307,8 +1307,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
))
} else {
datafusion_expr::Expr::Column(Column::from((
qualifier,
field.as_ref(),
qualifier, field,
)))
}
}
Expand Down