diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index e015ef5c40827..d77e74a3450c2 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -18,7 +18,10 @@ //! DFSchema is an extended schema struct that DataFusion uses to provide support for //! fields with optional relation names. -use std::collections::{HashMap, HashSet}; +use std::borrow::Cow; +use std::cmp::Ordering; +use std::collections::btree_map::Entry; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::convert::TryFrom; use std::fmt::{Display, Formatter}; use std::hash::Hash; @@ -35,11 +38,122 @@ use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; /// A reference-counted reference to a `DFSchema`. pub type DFSchemaRef = Arc; +/// [`FieldReference`]s represent a multi part identifier (path) to a +/// field that may require further resolution. +#[derive(Debug, Clone, PartialEq, Eq)] +struct FieldReference<'a> { + /// The field name + name: Cow<'a, str>, + /// Optional qualifier (usually a table or relation name) + qualifier: Option>, +} + +impl<'a> PartialOrd for FieldReference<'a> { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl<'a> Ord for FieldReference<'a> { + fn cmp(&self, other: &Self) -> Ordering { + if self == other { + return Ordering::Equal; + } + + match self.field().cmp(other.field()) { + Ordering::Less => return Ordering::Less, + Ordering::Greater => return Ordering::Greater, + Ordering::Equal => {} + } + + match (self.table(), other.table()) { + (Some(lhs), Some(rhs)) => match lhs.cmp(rhs) { + Ordering::Less => return Ordering::Less, + Ordering::Greater => return Ordering::Greater, + Ordering::Equal => {} + }, + (Some(_), None) => return Ordering::Greater, + (None, Some(_)) => return Ordering::Less, + _ => {} + } + + match (self.schema(), other.schema()) { + (Some(lhs), Some(rhs)) => match lhs.cmp(rhs) { + Ordering::Less => return Ordering::Less, + Ordering::Greater => return Ordering::Greater, + Ordering::Equal => {} + }, + (Some(_), None) => return Ordering::Greater, + (None, Some(_)) => return Ordering::Less, + _ => {} + } + + match (self.catalog(), other.catalog()) { + (Some(lhs), Some(rhs)) => match lhs.cmp(rhs) { + Ordering::Less => return Ordering::Less, + Ordering::Greater => return Ordering::Greater, + Ordering::Equal => {} + }, + (Some(_), None) => return Ordering::Greater, + (None, Some(_)) => return Ordering::Less, + _ => {} + } + + Ordering::Equal + } +} + +/// This is a [`FieldReference`] that has 'static lifetime (aka it +/// owns the underlying strings) +type OwnedFieldReference = FieldReference<'static>; + +impl<'a> FieldReference<'a> { + /// Convenience method for creating a [`FieldReference`]. + pub fn new( + name: impl Into>, + qualifier: Option>, + ) -> Self { + Self { + name: name.into(), + qualifier, + } + } + + /// Compare with another [`FieldReference`] as if both are resolved. + /// This allows comparing across variants, where if a field is not present + /// in both variants being compared then it is ignored in the comparison. + pub fn resolved_eq(&self, other: &Self) -> bool { + self.name == other.name + && match (&self.qualifier, &other.qualifier) { + (Some(lhs), Some(rhs)) => lhs.resolved_eq(rhs), + _ => true, + } + } + + fn field(&self) -> &str { + &self.name + } + + fn table(&self) -> Option<&str> { + self.qualifier.as_ref().map(|q| q.table()) + } + + fn schema(&self) -> Option<&str> { + self.qualifier.as_ref().and_then(|q| q.schema()) + } + + fn catalog(&self) -> Option<&str> { + self.qualifier.as_ref().and_then(|q| q.catalog()) + } +} + /// DFSchema wraps an Arrow schema and adds relation names #[derive(Debug, Clone, PartialEq, Eq)] pub struct DFSchema { /// Fields fields: Vec, + /// Fields index + fields_index: BTreeMap>, /// Additional metadata in form of key value pairs metadata: HashMap, /// Stores functional dependencies in the schema. @@ -51,6 +165,7 @@ impl DFSchema { pub fn empty() -> Self { Self { fields: vec![], + fields_index: BTreeMap::new(), metadata: HashMap::new(), functional_dependencies: FunctionalDependencies::empty(), } @@ -102,8 +217,12 @@ impl DFSchema { )); } } + + let fields_index = build_index(&fields); + Ok(Self { fields, + fields_index, metadata, functional_dependencies: FunctionalDependencies::empty(), }) @@ -159,6 +278,20 @@ impl DFSchema { }; if !duplicated_field { self.fields.push(field.clone()); + let idx = self.fields.len() - 1; + + let field_ref = OwnedFieldReference::new( + field.name().clone(), + field.qualifier().map(|q| q.to_owned_reference()), + ); + match self.fields_index.entry(field_ref) { + Entry::Vacant(entry) => { + entry.insert(vec![idx]); + } + Entry::Occupied(mut entry) => { + entry.get_mut().push(idx); + } + } } } self.metadata.extend(other_schema.metadata.clone()) @@ -206,34 +339,15 @@ impl DFSchema { qualifier: Option<&TableReference>, name: &str, ) -> Result> { + let field_ref = FieldReference::new(name, qualifier.cloned()); let mut matches = self - .fields - .iter() - .enumerate() - .filter(|(_, field)| match (qualifier, &field.qualifier) { - // field to lookup is qualified. - // current field is qualified and not shared between relations, compare both - // qualifier and name. - (Some(q), Some(field_q)) => { - q.resolved_eq(field_q) && field.name() == name - } - // field to lookup is qualified but current field is unqualified. - (Some(qq), None) => { - // the original field may now be aliased with a name that matches the - // original qualified name - let column = Column::from_qualified_name(field.name()); - match column { - Column { - relation: Some(r), - name: column_name, - } => &r == qq && column_name == name, - _ => false, - } - } - // field to lookup is unqualified, no need to compare qualifier - (None, Some(_)) | (None, None) => field.name() == name, + .fields_index + .range(field_ref..) + .take_while(|(q, _indices)| { + q.resolved_eq(&FieldReference::new(name, qualifier.cloned())) }) - .map(|(idx, _)| idx); + .flat_map(|(_q, indices)| indices) + .copied(); Ok(matches.next()) } @@ -272,9 +386,11 @@ impl DFSchema { /// Find all fields match the given name pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> { - self.fields - .iter() - .filter(|field| field.name() == name) + self.fields_index + .range(FieldReference::new(name, None)..) + .take_while(|(q, _indices)| q.field() == name) + .flat_map(|(_q, indices)| indices) + .map(|idx| self.field(*idx)) .collect() } @@ -454,12 +570,15 @@ impl DFSchema { /// Strip all field qualifier in schema pub fn strip_qualifiers(self) -> Self { + let fields: Vec = self + .fields + .into_iter() + .map(|f| f.strip_qualifier()) + .collect(); + let fields_index = build_index(&fields); DFSchema { - fields: self - .fields - .into_iter() - .map(|f| f.strip_qualifier()) - .collect(), + fields, + fields_index, ..self } } @@ -467,12 +586,15 @@ impl DFSchema { /// Replace all field qualifier with new value in schema pub fn replace_qualifier(self, qualifier: impl Into) -> Self { let qualifier = qualifier.into(); + let fields: Vec = self + .fields + .into_iter() + .map(|f| DFField::from_qualified(qualifier.clone(), f.field)) + .collect(); + let fields_index = build_index(&fields); DFSchema { - fields: self - .fields - .into_iter() - .map(|f| DFField::from_qualified(qualifier.clone(), f.field)) - .collect(), + fields, + fields_index, ..self } } @@ -496,6 +618,30 @@ impl DFSchema { } } +fn build_index(fields: &[DFField]) -> BTreeMap> { + let mut index = BTreeMap::new(); + let iter = fields + .iter() + .map(|field| { + OwnedFieldReference::new( + field.name().clone(), + field.qualifier().map(|q| q.to_owned_reference()), + ) + }) + .enumerate(); + for (idx, field) in iter { + match index.entry(field) { + Entry::Vacant(entry) => { + entry.insert(vec![idx]); + } + Entry::Occupied(mut entry) => { + entry.get_mut().push(idx); + } + } + } + index +} + impl From for Schema { /// Convert DFSchema into a Schema fn from(df_schema: DFSchema) -> Self { diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 35119f374fa37..c58db133d214c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2327,7 +2327,9 @@ mod tests { dict_id: 0, \ dict_is_ordered: false, \ metadata: {} } }\ - ], metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \ + ], fields_index: {\ + FieldReference { name: \"a\", qualifier: None }: [0]\ + }, metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \ ExecutionPlan schema: Schema { fields: [\ Field { \ name: \"b\", \ diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index d3bdd47c5cb34..231977067a0ed 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -499,11 +499,15 @@ mod tests { assert_eq!( "Optimizer rule 'get table_scan rule' failed\ncaused by\nget table_scan rule\ncaused by\n\ Internal error: Failed due to generate a different schema, \ - original schema: DFSchema { fields: [], metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \ + original schema: DFSchema { fields: [], fields_index: {}, metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }, \ new schema: DFSchema { fields: [\ DFField { qualifier: Some(Bare { table: \"test\" }), field: Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, \ DFField { qualifier: Some(Bare { table: \"test\" }), field: Field { name: \"b\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, \ DFField { qualifier: Some(Bare { table: \"test\" }), field: Field { name: \"c\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }], \ + fields_index: {\ + FieldReference { name: \"a\", qualifier: Some(Bare { table: \"test\" }) }: [0], \ + FieldReference { name: \"b\", qualifier: Some(Bare { table: \"test\" }) }: [1], \ + FieldReference { name: \"c\", qualifier: Some(Bare { table: \"test\" }) }: [2]}, \ metadata: {}, functional_dependencies: FunctionalDependencies { deps: [] } }.\ \nThis was likely caused by a bug in DataFusion's code \ and we would welcome that you file an bug report in our issue tracker",