From 5c9f8b4e7dcd24e9f9475627a42b6c01d1398889 Mon Sep 17 00:00:00 2001 From: zhanglinwei Date: Mon, 16 Oct 2023 17:58:38 +0800 Subject: [PATCH 1/4] Improve naming --- datafusion-examples/examples/rewrite_expr.rs | 2 +- datafusion/core/src/execution/context.rs | 2 +- .../optimizer/tests/optimizer_integration.rs | 2 +- datafusion/sql/examples/sql.rs | 2 +- datafusion/sql/src/expr/function.rs | 8 ++-- datafusion/sql/src/expr/identifier.rs | 4 +- datafusion/sql/src/expr/mod.rs | 5 +-- datafusion/sql/src/planner.rs | 8 ++-- datafusion/sql/src/relation/mod.rs | 2 +- datafusion/sql/src/statement.rs | 37 +++++++++---------- datafusion/sql/tests/sql_integration.rs | 2 +- 11 files changed, 35 insertions(+), 39 deletions(-) diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs index e657baab3df83..5e95562033e60 100644 --- a/datafusion-examples/examples/rewrite_expr.rs +++ b/datafusion-examples/examples/rewrite_expr.rs @@ -191,7 +191,7 @@ struct MyContextProvider { } impl ContextProvider for MyContextProvider { - fn get_table_provider(&self, name: TableReference) -> Result> { + fn get_table_source(&self, name: TableReference) -> Result> { if name.table() == "person" { Ok(Arc::new(MyTableSource { schema: Arc::new(Schema::new(vec![ diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index ca6da6cfa047e..11c6d83253e6b 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -2017,7 +2017,7 @@ struct SessionContextProvider<'a> { } impl<'a> ContextProvider for SessionContextProvider<'a> { - fn get_table_provider(&self, name: TableReference) -> Result> { + fn get_table_source(&self, name: TableReference) -> Result> { let name = self.state.resolve_table_ref(name).to_string(); self.tables .get(&name) diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs index 46023cfc30bc4..8c629ddd9ffd6 100644 --- a/datafusion/optimizer/tests/optimizer_integration.rs +++ b/datafusion/optimizer/tests/optimizer_integration.rs @@ -362,7 +362,7 @@ struct MySchemaProvider { } impl ContextProvider for MySchemaProvider { - fn get_table_provider(&self, name: TableReference) -> Result> { + fn get_table_source(&self, name: TableReference) -> Result> { let table_name = name.table(); if table_name.starts_with("test") { let schema = Schema::new_with_metadata( diff --git a/datafusion/sql/examples/sql.rs b/datafusion/sql/examples/sql.rs index 8a12cc32b6412..7a826b06b8db2 100644 --- a/datafusion/sql/examples/sql.rs +++ b/datafusion/sql/examples/sql.rs @@ -105,7 +105,7 @@ fn create_table_source(fields: Vec) -> Arc { } impl ContextProvider for MySchemaProvider { - fn get_table_provider(&self, name: TableReference) -> Result> { + fn get_table_source(&self, name: TableReference) -> Result> { match self.tables.get(name.table()) { Some(table) => Ok(table.clone()), _ => plan_err!("Table not found: {}", name.table()), diff --git a/datafusion/sql/src/expr/function.rs b/datafusion/sql/src/expr/function.rs index 3861b4848d9ba..c9d70cbe77ec5 100644 --- a/datafusion/sql/src/expr/function.rs +++ b/datafusion/sql/src/expr/function.rs @@ -47,7 +47,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; // user-defined function (UDF) should have precedence in case it has the same name as a scalar built-in function - if let Some(fm) = self.schema_provider.get_function_meta(&name) { + if let Some(fm) = self.context_provider.get_function_meta(&name) { let args = self.function_args_to_expr(function.args, schema, planner_context)?; return Ok(Expr::ScalarUDF(ScalarUDF::new(fm, args))); @@ -125,7 +125,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } else { // User defined aggregate functions (UDAF) have precedence in case it has the same name as a scalar built-in function - if let Some(fm) = self.schema_provider.get_aggregate_meta(&name) { + if let Some(fm) = self.context_provider.get_aggregate_meta(&name) { let args = self.function_args_to_expr(function.args, schema, planner_context)?; return Ok(Expr::AggregateUDF(expr::AggregateUDF::new( @@ -178,13 +178,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { window_function::find_df_window_func(name) // next check user defined aggregates .or_else(|| { - self.schema_provider + self.context_provider .get_aggregate_meta(name) .map(WindowFunction::AggregateUDF) }) // next check user defined window functions .or_else(|| { - self.schema_provider + self.context_provider .get_window_meta(name) .map(WindowFunction::WindowUDF) }) diff --git a/datafusion/sql/src/expr/identifier.rs b/datafusion/sql/src/expr/identifier.rs index 5e03f14e5337c..c1ee75cfb2bee 100644 --- a/datafusion/sql/src/expr/identifier.rs +++ b/datafusion/sql/src/expr/identifier.rs @@ -33,7 +33,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // TODO: figure out if ScalarVariables should be insensitive. let var_names = vec![id.value]; let ty = self - .schema_provider + .context_provider .get_variable_type(&var_names) .ok_or_else(|| { DataFusionError::Plan(format!( @@ -99,7 +99,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|id| self.normalizer.normalize(id)) .collect(); let ty = self - .schema_provider + .context_provider .get_variable_type(&var_names) .ok_or_else(|| { DataFusionError::Execution(format!( diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index cb34b6ca36e80..844ddde6a6243 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -797,10 +797,7 @@ mod tests { } impl ContextProvider for TestSchemaProvider { - fn get_table_provider( - &self, - name: TableReference, - ) -> Result> { + fn get_table_source(&self, name: TableReference) -> Result> { match self.tables.get(name.table()) { Some(table) => Ok(table.clone()), _ => plan_err!("Table not found: {}", name.table()), diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index a2d790d438ccb..bd34813f81fcd 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -46,7 +46,7 @@ use crate::utils::make_decimal_type; /// functions referenced in SQL statements pub trait ContextProvider { /// Getter for a datasource - fn get_table_provider(&self, name: TableReference) -> Result>; + fn get_table_source(&self, name: TableReference) -> Result>; /// Getter for a UDF description fn get_function_meta(&self, name: &str) -> Option>; /// Getter for a UDAF description @@ -186,7 +186,7 @@ impl PlannerContext { /// SQL query planner pub struct SqlToRel<'a, S: ContextProvider> { - pub(crate) schema_provider: &'a S, + pub(crate) context_provider: &'a S, pub(crate) options: ParserOptions, pub(crate) normalizer: IdentNormalizer, } @@ -201,7 +201,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { pub fn new_with_options(schema_provider: &'a S, options: ParserOptions) -> Self { let normalize = options.enable_ident_normalization; SqlToRel { - schema_provider, + context_provider: schema_provider, options, normalizer: IdentNormalizer::new(normalize), } @@ -334,7 +334,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // Timestamp With Time Zone // INPUT : [SQLDataType] TimestampTz + [RuntimeConfig] Time Zone // OUTPUT: [ArrowDataType] Timestamp - self.schema_provider.options().execution.time_zone.clone() + self.context_provider.options().execution.time_zone.clone() } else { // Timestamp Without Time zone None diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index a01a9a2fb8db3..180743d19b7bd 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -38,7 +38,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ( match ( cte, - self.schema_provider.get_table_provider(table_ref.clone()), + self.context_provider.get_table_source(table_ref.clone()), ) { (Some(cte_plan), _) => Ok(cte_plan.clone()), (_, Ok(provider)) => { diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 059a32be441b8..af51d4d1daa85 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -611,7 +611,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let DescribeTableStmt { table_name } = statement; let table_ref = self.object_name_to_table_reference(table_name)?; - let table_source = self.schema_provider.get_table_provider(table_ref)?; + let table_source = self.context_provider.get_table_source(table_ref)?; let schema = table_source.schema(); @@ -630,7 +630,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { CopyToSource::Relation(object_name) => { let table_ref = self.object_name_to_table_reference(object_name.clone())?; - let table_source = self.schema_provider.get_table_provider(table_ref)?; + let table_source = self.context_provider.get_table_source(table_ref)?; LogicalPlanBuilder::scan( object_name_to_string(&object_name), table_source, @@ -912,12 +912,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ) -> Result { // Do a table lookup to verify the table exists let table_ref = self.object_name_to_table_reference(table_name.clone())?; - let provider = self.schema_provider.get_table_provider(table_ref.clone())?; - let schema = (*provider.schema()).clone(); + let table_source = self.context_provider.get_table_source(table_ref.clone())?; + let schema = (*table_source.schema()).clone(); let schema = DFSchema::try_from(schema)?; - let scan = - LogicalPlanBuilder::scan(object_name_to_string(&table_name), provider, None)? - .build()?; + let scan = LogicalPlanBuilder::scan( + object_name_to_string(&table_name), + table_source, + None, + )? + .build()?; let mut planner_context = PlannerContext::new(); let source = match predicate_expr { @@ -960,10 +963,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // Do a table lookup to verify the table exists let table_name = self.object_name_to_table_reference(table_name)?; - let provider = self - .schema_provider - .get_table_provider(table_name.clone())?; - let arrow_schema = (*provider.schema()).clone(); + let table_source = self.context_provider.get_table_source(table_name.clone())?; + let arrow_schema = (*table_source.schema()).clone(); let table_schema = Arc::new(DFSchema::try_from_qualified_schema( table_name.clone(), &arrow_schema, @@ -1064,10 +1065,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ) -> Result { // Do a table lookup to verify the table exists let table_name = self.object_name_to_table_reference(table_name)?; - let provider = self - .schema_provider - .get_table_provider(table_name.clone())?; - let arrow_schema = (*provider.schema()).clone(); + let table_source = self.context_provider.get_table_source(table_name.clone())?; + let arrow_schema = (*table_source.schema()).clone(); let table_schema = DFSchema::try_from(arrow_schema)?; // Get insert fields and index_mapping @@ -1193,7 +1192,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // Do a table lookup to verify the table exists let table_ref = self.object_name_to_table_reference(sql_table_name)?; - let _ = self.schema_provider.get_table_provider(table_ref)?; + let _ = self.context_provider.get_table_source(table_ref)?; // treat both FULL and EXTENDED as the same let select_list = if full || extended { @@ -1228,7 +1227,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // Do a table lookup to verify the table exists let table_ref = self.object_name_to_table_reference(sql_table_name)?; - let _ = self.schema_provider.get_table_provider(table_ref)?; + let _ = self.context_provider.get_table_source(table_ref)?; let query = format!( "SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}" @@ -1245,8 +1244,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { schema: schema.into(), table: table.into(), }; - self.schema_provider - .get_table_provider(tables_reference) + self.context_provider + .get_table_source(tables_reference) .is_ok() } } diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 661890e125335..8a8ddaefc55f1 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -2691,7 +2691,7 @@ struct MockContextProvider { } impl ContextProvider for MockContextProvider { - fn get_table_provider(&self, name: TableReference) -> Result> { + fn get_table_source(&self, name: TableReference) -> Result> { let schema = match name.table() { "test" => Ok(Schema::new(vec![ Field::new("t_date32", DataType::Date32, false), From 383c60bc4ebf27d76a5f860d50a7ca2a22a10875 Mon Sep 17 00:00:00 2001 From: Linwei Zhang Date: Mon, 16 Oct 2023 22:32:00 +0800 Subject: [PATCH 2/4] Add deprecated warning --- datafusion/sql/src/planner.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index bd34813f81fcd..19612ad311f99 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -45,6 +45,10 @@ use crate::utils::make_decimal_type; /// The ContextProvider trait allows the query planner to obtain meta-data about tables and /// functions referenced in SQL statements pub trait ContextProvider { + #[deprecated(since = "32.0.0", note = "please use `get_table_source` instead")] + fn get_table_provider(&self, name: TableReference) -> Result> { + self.get_table_source(name) + } /// Getter for a datasource fn get_table_source(&self, name: TableReference) -> Result>; /// Getter for a UDF description From 39b3943ee74fcce608bbb4bc4be3c9556105c852 Mon Sep 17 00:00:00 2001 From: Linwei Zhang Date: Tue, 17 Oct 2023 00:30:02 +0800 Subject: [PATCH 3/4] Improve more --- datafusion/optimizer/tests/optimizer_integration.rs | 8 ++++---- datafusion/sql/examples/sql.rs | 10 +++++----- datafusion/sql/src/planner.rs | 8 ++++---- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs index 8c629ddd9ffd6..872071e52fa7a 100644 --- a/datafusion/optimizer/tests/optimizer_integration.rs +++ b/datafusion/optimizer/tests/optimizer_integration.rs @@ -339,8 +339,8 @@ fn test_sql(sql: &str) -> Result { let statement = &ast[0]; // create a logical query plan - let schema_provider = MySchemaProvider::default(); - let sql_to_rel = SqlToRel::new(&schema_provider); + let context_provider = MyContextProvider::default(); + let sql_to_rel = SqlToRel::new(&context_provider); let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); // hard code the return value of now() @@ -357,11 +357,11 @@ fn test_sql(sql: &str) -> Result { } #[derive(Default)] -struct MySchemaProvider { +struct MyContextProvider { options: ConfigOptions, } -impl ContextProvider for MySchemaProvider { +impl ContextProvider for MyContextProvider { fn get_table_source(&self, name: TableReference) -> Result> { let table_name = name.table(); if table_name.starts_with("test") { diff --git a/datafusion/sql/examples/sql.rs b/datafusion/sql/examples/sql.rs index 7a826b06b8db2..9df65b99a748a 100644 --- a/datafusion/sql/examples/sql.rs +++ b/datafusion/sql/examples/sql.rs @@ -49,20 +49,20 @@ fn main() { let statement = &ast[0]; // create a logical query plan - let schema_provider = MySchemaProvider::new(); - let sql_to_rel = SqlToRel::new(&schema_provider); + let context_provider = MyContextProvider::new(); + let sql_to_rel = SqlToRel::new(&context_provider); let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); // show the plan println!("{plan:?}"); } -struct MySchemaProvider { +struct MyContextProvider { options: ConfigOptions, tables: HashMap>, } -impl MySchemaProvider { +impl MyContextProvider { fn new() -> Self { let mut tables = HashMap::new(); tables.insert( @@ -104,7 +104,7 @@ fn create_table_source(fields: Vec) -> Arc { ))) } -impl ContextProvider for MySchemaProvider { +impl ContextProvider for MyContextProvider { fn get_table_source(&self, name: TableReference) -> Result> { match self.tables.get(name.table()) { Some(table) => Ok(table.clone()), diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 19612ad311f99..f7d8307d33a05 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -197,15 +197,15 @@ pub struct SqlToRel<'a, S: ContextProvider> { impl<'a, S: ContextProvider> SqlToRel<'a, S> { /// Create a new query planner - pub fn new(schema_provider: &'a S) -> Self { - Self::new_with_options(schema_provider, ParserOptions::default()) + pub fn new(context_provider: &'a S) -> Self { + Self::new_with_options(context_provider, ParserOptions::default()) } /// Create a new query planner - pub fn new_with_options(schema_provider: &'a S, options: ParserOptions) -> Self { + pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self { let normalize = options.enable_ident_normalization; SqlToRel { - context_provider: schema_provider, + context_provider, options, normalizer: IdentNormalizer::new(normalize), } From d89a9981682b29aa1a07824e15f26337877582aa Mon Sep 17 00:00:00 2001 From: Linwei Zhang Date: Tue, 17 Oct 2023 00:35:33 +0800 Subject: [PATCH 4/4] Improve tests --- datafusion/sql/src/expr/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 844ddde6a6243..03ee602bd3600 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -772,12 +772,12 @@ mod tests { use crate::TableReference; - struct TestSchemaProvider { + struct TestContextProvider { options: ConfigOptions, tables: HashMap>, } - impl TestSchemaProvider { + impl TestContextProvider { pub fn new() -> Self { let mut tables = HashMap::new(); tables.insert( @@ -796,7 +796,7 @@ mod tests { } } - impl ContextProvider for TestSchemaProvider { + impl ContextProvider for TestContextProvider { fn get_table_source(&self, name: TableReference) -> Result> { match self.tables.get(name.table()) { Some(table) => Ok(table.clone()), @@ -850,8 +850,8 @@ mod tests { .unwrap(); let sql_expr = parser.parse_expr().unwrap(); - let schema_provider = TestSchemaProvider::new(); - let sql_to_rel = SqlToRel::new(&schema_provider); + let context_provider = TestContextProvider::new(); + let sql_to_rel = SqlToRel::new(&context_provider); // Should not stack overflow sql_to_rel.sql_expr_to_logical_expr(