Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub use error::{field_not_found, DataFusionError, Result, SchemaError, SharedRes
pub use parsers::parse_interval;
pub use scalar::{ScalarType, ScalarValue};
pub use stats::{ColumnStatistics, Statistics};
pub use table_reference::{OwnedTableReference, ResolvedTableReference, TableReference};
pub use table_reference::{ResolvedTableReference, TableReference};

/// Downcast an Arrow Array to a concrete type, return an `DataFusionError::Internal` if the cast is
/// not possible. In normal usage of DataFusion the downcast should always succeed.
Expand Down
112 changes: 44 additions & 68 deletions datafusion/common/src/table_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<'a> std::fmt::Display for ResolvedTableReference<'a> {
}

/// Represents a path to a table that may require further resolution
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableReference<'a> {
/// An unqualified table reference, e.g. "table"
Bare {
Expand All @@ -67,56 +67,23 @@ pub enum TableReference<'a> {
},
}

/// Represents a path to a table that may require further resolution
/// that owns the underlying names
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum OwnedTableReference {
/// An unqualified table reference, e.g. "table"
Bare {
/// The table name
table: String,
},
/// A partially resolved table reference, e.g. "schema.table"
Partial {
/// The schema containing the table
schema: String,
/// The table name
table: String,
},
/// A fully resolved table reference, e.g. "catalog.schema.table"
Full {
/// The catalog (aka database) containing the table
catalog: String,
/// The schema containing the table
schema: String,
/// The table name
table: String,
},
}

impl OwnedTableReference {
/// Return a `TableReference` view of this `OwnedTableReference`
pub fn as_table_reference(&self) -> TableReference<'_> {
impl<'a> std::fmt::Display for TableReference<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Bare { table } => TableReference::Bare {
table: table.into(),
},
Self::Partial { schema, table } => TableReference::Partial {
schema: schema.into(),
table: table.into(),
},
Self::Bare { table } => write!(f, "{table}"),
Self::Partial { schema, table } => {
write!(f, "{schema}.{table}")
}
Self::Full {
catalog,
schema,
table,
} => TableReference::Full {
catalog: catalog.into(),
schema: schema.into(),
table: table.into(),
},
} => write!(f, "{catalog}.{schema}.{table}"),
}
}
}

impl<'a> TableReference<'a> {
/// Retrieve the actual table name, regardless of qualification
pub fn table(&self) -> &str {
match self {
Expand All @@ -125,39 +92,48 @@ impl OwnedTableReference {
| Self::Bare { table } => table,
}
}
}

impl std::fmt::Display for OwnedTableReference {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
/// converts this table reference to a new TableReference borrowed from `self`
pub fn as_borrowed<'b: 'a>(&'b self) -> Self {
match self {
OwnedTableReference::Bare { table } => write!(f, "{table}"),
OwnedTableReference::Partial { schema, table } => {
write!(f, "{schema}.{table}")
}
OwnedTableReference::Full {
catalog,
Self::Bare { table } => Self::Bare {
table: Cow::from(table.as_ref()),
},
Self::Partial { schema, table } => Self::Partial {
schema: Cow::from(schema.as_ref()),
table: Cow::from(table.as_ref()),
},
Self::Full {
schema,
table,
} => write!(f, "{catalog}.{schema}.{table}"),
catalog,
} => Self::Full {
schema: Cow::from(schema.as_ref()),
table: Cow::from(table.as_ref()),
catalog: Cow::from(catalog.as_ref()),
},
}
}
}

/// Convert `OwnedTableReference` into a `TableReference`. Somewhat
/// awkward to use but 'idiomatic': `(&table_ref).into()`
impl<'a> From<&'a OwnedTableReference> for TableReference<'a> {
fn from(r: &'a OwnedTableReference) -> Self {
r.as_table_reference()
}
}

impl<'a> TableReference<'a> {
/// Retrieve the actual table name, regardless of qualification
pub fn table(&self) -> &str {
/// converts this table reference to an owned table reference (with `'static` lifetime)
pub fn to_owned(&self) -> TableReference<'static> {
match self {
Self::Full { table, .. }
| Self::Partial { table, .. }
| Self::Bare { table } => table,
Self::Bare { table } => TableReference::Bare {
table: Cow::from(table.to_string()),
},
Self::Partial { schema, table } => TableReference::Partial {
table: Cow::from(table.to_string()),
schema: Cow::from(schema.to_string()),
},
Self::Full {
schema,
table,
catalog,
} => TableReference::Full {
table: Cow::from(table.to_string()),
schema: Cow::from(schema.to_string()),
catalog: Cow::from(catalog.to_string()),
},
}
}

Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/catalog/listing_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ use crate::datasource::TableProvider;
use crate::execution::context::SessionState;
use async_trait::async_trait;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{DFSchema, DataFusionError, OwnedTableReference};
use datafusion_common::{DFSchema, DataFusionError, TableReference};
use datafusion_expr::CreateExternalTable;
use futures::TryStreamExt;
use itertools::Itertools;
use object_store::ObjectStore;
use std::any::Any;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -128,8 +129,8 @@ impl ListingSchemaProvider {
if !self.table_exist(table_name) {
let table_url = format!("{}/{}", self.authority, table_path);

let name = OwnedTableReference::Bare {
table: table_name.to_string(),
let name = TableReference::Bare {
table: Cow::from(table_name.to_string()),
};
let provider = self
.factory
Expand Down
34 changes: 19 additions & 15 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ use crate::physical_plan::PhysicalPlanner;
use crate::variable::{VarProvider, VarType};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_common::{OwnedTableReference, ScalarValue};
use datafusion_common::ScalarValue;
use datafusion_sql::{
parser::DFParser,
planner::{ContextProvider, SqlToRel},
Expand Down Expand Up @@ -316,19 +316,19 @@ impl SessionContext {
or_replace,
}) => {
let input = Arc::try_unwrap(input).unwrap_or_else(|e| e.as_ref().clone());
let table = self.table(&name).await;
let table = self.table(name.as_borrowed()).await;

match (if_not_exists, or_replace, table) {
(true, false, Ok(_)) => self.return_empty_dataframe(),
(false, true, Ok(_)) => {
self.deregister_table(&name)?;
self.deregister_table(name.as_borrowed())?;
let schema = Arc::new(input.schema().as_ref().into());
let physical = DataFrame::new(self.state(), input);

let batches: Vec<_> = physical.collect_partitioned().await?;
let table = Arc::new(MemTable::try_new(schema, batches)?);

self.register_table(&name, table)?;
self.register_table(name.as_borrowed(), table)?;
self.return_empty_dataframe()
}
(true, true, Ok(_)) => Err(DataFusionError::Execution(
Expand All @@ -341,7 +341,7 @@ impl SessionContext {
let batches: Vec<_> = physical.collect_partitioned().await?;
let table = Arc::new(MemTable::try_new(schema, batches)?);

self.register_table(&name, table)?;
self.register_table(name.as_borrowed(), table)?;
self.return_empty_dataframe()
}
(false, false, Ok(_)) => Err(DataFusionError::Execution(format!(
Expand All @@ -356,22 +356,22 @@ impl SessionContext {
or_replace,
definition,
}) => {
let view = self.table(&name).await;
let view = self.table(name.as_borrowed()).await;

match (or_replace, view) {
(true, Ok(_)) => {
self.deregister_table(&name)?;
self.deregister_table(name.as_borrowed())?;
let table =
Arc::new(ViewTable::try_new((*input).clone(), definition)?);

self.register_table(&name, table)?;
self.register_table(name.as_borrowed(), table)?;
self.return_empty_dataframe()
}
(_, Err(_)) => {
let table =
Arc::new(ViewTable::try_new((*input).clone(), definition)?);

self.register_table(&name, table)?;
self.register_table(name.as_borrowed(), table)?;
self.return_empty_dataframe()
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
Expand All @@ -383,7 +383,9 @@ impl SessionContext {
LogicalPlan::DropTable(DropTable {
name, if_exists, ..
}) => {
let result = self.find_and_deregister(&name, TableType::Base).await;
let result = self
.find_and_deregister(name.as_borrowed(), TableType::Base)
.await;
match (result, if_exists) {
(Ok(true), _) => self.return_empty_dataframe(),
(_, true) => self.return_empty_dataframe(),
Expand All @@ -396,7 +398,9 @@ impl SessionContext {
LogicalPlan::DropView(DropView {
name, if_exists, ..
}) => {
let result = self.find_and_deregister(&name, TableType::View).await;
let result = self
.find_and_deregister(name.as_borrowed(), TableType::View)
.await;
match (result, if_exists) {
(Ok(true), _) => self.return_empty_dataframe(),
(_, true) => self.return_empty_dataframe(),
Expand Down Expand Up @@ -554,7 +558,7 @@ impl SessionContext {
&self,
cmd: &CreateExternalTable,
) -> Result<DataFrame> {
let exist = self.table_exist(&cmd.name)?;
let exist = self.table_exist(cmd.name.as_borrowed())?;
if exist {
match cmd.if_not_exists {
true => return self.return_empty_dataframe(),
Expand All @@ -569,7 +573,7 @@ impl SessionContext {

let table_provider: Arc<dyn TableProvider> =
self.create_custom_table(cmd).await?;
self.register_table(&cmd.name, table_provider)?;
self.register_table(cmd.name.as_borrowed(), table_provider)?;
self.return_empty_dataframe()
}

Expand Down Expand Up @@ -1779,7 +1783,7 @@ impl SessionState {
pub fn resolve_table_references(
&self,
statement: &datafusion_sql::parser::Statement,
) -> Result<Vec<OwnedTableReference>> {
) -> Result<Vec<TableReference<'static>>> {
use crate::catalog::information_schema::INFORMATION_SCHEMA_TABLES;
use datafusion_sql::parser::Statement as DFStatement;
use sqlparser::ast::*;
Expand Down Expand Up @@ -1865,7 +1869,7 @@ impl SessionState {
self.config.options.sql_parser.parse_float_as_decimal;
for reference in references {
let table = reference.table();
let resolved = self.resolve_table_ref(reference.as_table_reference());
let resolved = self.resolve_table_ref(reference.as_borrowed());
if let Entry::Vacant(v) = provider.tables.entry(resolved.to_string()) {
if let Ok(schema) = self.schema_for_ref(resolved) {
if let Some(table) = schema.table(table).await {
Expand Down
15 changes: 7 additions & 8 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ use crate::{
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference,
ScalarValue,
plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, ScalarValue, TableReference,
};
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Debug, Display, Formatter};
Expand Down Expand Up @@ -1283,7 +1282,7 @@ pub struct CreateCatalogSchema {
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DropTable {
/// The table name
pub name: OwnedTableReference,
pub name: TableReference<'static>,
/// If the table exists
pub if_exists: bool,
/// Dummy schema
Expand All @@ -1294,7 +1293,7 @@ pub struct DropTable {
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DropView {
/// The view name
pub name: OwnedTableReference,
pub name: TableReference<'static>,
/// If the view exists
pub if_exists: bool,
/// Dummy schema
Expand Down Expand Up @@ -1570,7 +1569,7 @@ pub struct Union {
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct CreateMemoryTable {
/// The table name
pub name: OwnedTableReference,
pub name: TableReference<'static>,
/// The logical plan
pub input: Arc<LogicalPlan>,
/// Option to not error if table already exists
Expand All @@ -1583,7 +1582,7 @@ pub struct CreateMemoryTable {
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct CreateView {
/// The table name
pub name: OwnedTableReference,
pub name: TableReference<'static>,
/// The logical plan
pub input: Arc<LogicalPlan>,
/// Option to not error if table already exists
Expand All @@ -1598,7 +1597,7 @@ pub struct CreateExternalTable {
/// The table schema
pub schema: DFSchemaRef,
/// The table name
pub name: OwnedTableReference,
pub name: TableReference<'static>,
/// The physical location
pub location: String,
/// The file type of physical file
Expand Down Expand Up @@ -1660,7 +1659,7 @@ impl Display for WriteOp {
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DmlStatement {
/// The table name
pub table_name: OwnedTableReference,
pub table_name: TableReference<'static>,
/// The schema of the table (must align with Rel input)
pub table_schema: DFSchemaRef,
/// The type of operation to perform
Expand Down
Loading