From 465f3971e1e602108b58c47a14299e06d988131e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 26 Mar 2021 12:08:10 -0400 Subject: [PATCH 1/8] ARROW-12106: [Rust][DataFusion] Support `SELECT * from information_schema.tables` --- rust/datafusion/src/bin/repl.rs | 7 +- rust/datafusion/src/catalog/catalog.rs | 61 +++++ .../src/catalog/information_schema.rs | 225 ++++++++++++++++++ rust/datafusion/src/catalog/mod.rs | 1 + rust/datafusion/src/execution/context.rs | 184 ++++++++++++-- rust/datafusion/src/physical_plan/parquet.rs | 7 +- rust/datafusion/src/physical_plan/planner.rs | 7 +- 7 files changed, 470 insertions(+), 22 deletions(-) create mode 100644 rust/datafusion/src/catalog/information_schema.rs diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs index 00e42615b90e..4a6237c4bb33 100644 --- a/rust/datafusion/src/bin/repl.rs +++ b/rust/datafusion/src/bin/repl.rs @@ -61,8 +61,11 @@ pub async fn main() { .map(|size| size.parse::().unwrap()) .unwrap_or(1_048_576); - let mut ctx = - ExecutionContext::with_config(ExecutionConfig::new().with_batch_size(batch_size)); + let mut ctx = ExecutionContext::with_config( + ExecutionConfig::new() + .with_batch_size(batch_size) + .with_information_schema(), + ); let mut rl = Editor::<()>::new(); rl.load_history(".history").ok(); diff --git a/rust/datafusion/src/catalog/catalog.rs b/rust/datafusion/src/catalog/catalog.rs index 69059d13bb30..9bc10fe8bd14 100644 --- a/rust/datafusion/src/catalog/catalog.rs +++ b/rust/datafusion/src/catalog/catalog.rs @@ -23,6 +23,67 @@ use std::any::Any; use std::collections::HashMap; use std::sync::{Arc, RwLock}; +/// Represent a list of named catalogs +pub trait CatalogList: Sync + Send { + /// Returns the catalog list as [`Any`](std::any::Any) + /// so that it can be downcast to a specific implementation. 
+ fn as_any(&self) -> &dyn Any; + + /// Adds a new catalog to this catalog list + /// If a catalog of the same name existed before, it is replaced in the list and returned. + fn register_catalog( + &self, + name: String, + catalog: Arc, + ) -> Option>; + + /// Retrieves the list of available catalog names + fn catalog_names(&self) -> Vec; + + /// Retrieves a specific catalog by name, provided it exists. + fn catalog(&self, name: &str) -> Option>; +} + +/// Simple in-memory list of catalogs +pub struct MemoryCatalogList { + /// Collection of catalogs containing schemas and ultimately TableProviders + pub catalogs: RwLock>>, +} + +impl MemoryCatalogList { + /// Instantiates a new MemoryDatabase with an empty collection of catalogs + pub fn new() -> Self { + Self { + catalogs: RwLock::new(HashMap::new()), + } + } +} + +impl CatalogList for MemoryCatalogList { + fn as_any(&self) -> &dyn Any { + self + } + + fn register_catalog( + &self, + name: String, + catalog: Arc, + ) -> Option> { + let mut catalogs = self.catalogs.write().unwrap(); + catalogs.insert(name, catalog) + } + + fn catalog_names(&self) -> Vec { + let catalogs = self.catalogs.read().unwrap(); + catalogs.keys().map(|s| s.to_string()).collect() + } + + fn catalog(&self, name: &str) -> Option> { + let catalogs = self.catalogs.read().unwrap(); + catalogs.get(name).cloned() + } +} + /// Represents a catalog, comprising a number of named schemas. pub trait CatalogProvider: Sync + Send { /// Returns the catalog provider as [`Any`](std::any::Any) diff --git a/rust/datafusion/src/catalog/information_schema.rs b/rust/datafusion/src/catalog/information_schema.rs new file mode 100644 index 000000000000..cf30c64046e8 --- /dev/null +++ b/rust/datafusion/src/catalog/information_schema.rs @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Implements the SQL [Information Schema] for DataFusion. +//! +//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema + +use std::{any, sync::Arc}; + +use arrow::{ + array::StringBuilder, + datatypes::{DataType, Field, Schema}, + record_batch::RecordBatch, +}; + +use crate::datasource::{MemTable, TableProvider}; + +use super::{ + catalog::{CatalogList, CatalogProvider}, + schema::SchemaProvider, +}; + +const INFORMATION_SCHEMA: &str = "information_schema"; +const TABLES: &str = "tables"; + +/// Wraps a [`CatalogList`] so that it also provides an +/// `information_schema` schema view in each catalog. +pub(crate) struct CatalogListWithInformationSchema { + inner: Arc, +} + +impl CatalogListWithInformationSchema { + pub(crate) fn new(inner: Arc) -> Self { + Self { inner } + } + + /// Returns a catalog provider that also contains the information schema table provider. 
+ pub(crate) fn catalog(&self, name: &str) -> Option> { + self.inner.catalog(name).map(|catalog| { + let catalog_list = self.inner.clone(); + + Arc::new(CatalogWithInformationSchema { + catalog_list, + inner: catalog, + }) as Arc + }) + } +} + +/// Wraps another [`CatalogProvider`] and adds an "information_schema" +/// schema that can introspect on tables in the catalog_list +struct CatalogWithInformationSchema { + catalog_list: Arc, + /// wrapped provider + inner: Arc, +} + +impl CatalogProvider for CatalogWithInformationSchema { + fn as_any(&self) -> &dyn any::Any { + self + } + + fn schema_names(&self) -> Vec { + self.inner + .schema_names() + .into_iter() + .chain(std::iter::once(INFORMATION_SCHEMA.to_string())) + .collect::>() + } + + fn schema(&self, name: &str) -> Option> { + if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) { + Some(Arc::new(InformationSchemaProvider { + catalog_list: self.catalog_list.clone(), + })) + } else { + self.inner.schema(name) + } + } +} + +/// Implements the `information_schema` virtual schema and tables +/// +/// The underlying tables in the `information_schema` are created on +/// demand. This means that if more tables are added to the underlying +/// providers, they will appear the next time the `information_schema` +/// table is queried. 
+struct InformationSchemaProvider { + catalog_list: Arc, +} + +impl SchemaProvider for InformationSchemaProvider { + fn as_any(&self) -> &(dyn any::Any + 'static) { + self + } + + fn table_names(&self) -> Vec { + vec![TABLES.to_string()] + } + + fn table(&self, name: &str) -> Option> { + if name.eq_ignore_ascii_case("tables") { + // create a mem table with the names of tables + let mut builder = InformationSchemaTablesBuilder::new(); + + for catalog_name in self.catalog_list.catalog_names() { + let catalog = self.catalog_list.catalog(&catalog_name).unwrap(); + + for schema_name in catalog.schema_names() { + let schema = catalog.schema(&schema_name).unwrap(); + for table_name in schema.table_names() { + builder.add_base_table(&catalog_name, &schema_name, table_name) + } + } + + // Add a final list for the information schema tables themselves + builder.add_system_table(&catalog_name, INFORMATION_SCHEMA, TABLES); + } + + let mem_table = builder.build(); + + Some(Arc::new(mem_table)) + } else { + None + } + } +} + +/// Builds the `information_schema.tables` table row by row + +struct InformationSchemaTablesBuilder { + catalog_names: StringBuilder, + schema_names: StringBuilder, + table_names: StringBuilder, + table_types: StringBuilder, +} + +impl InformationSchemaTablesBuilder { + fn new() -> Self { + Self { + catalog_names: StringBuilder::new(10), + schema_names: StringBuilder::new(10), + table_names: StringBuilder::new(10), + table_types: StringBuilder::new(10), + } + } + + fn add_base_table( + &mut self, + catalog_name: impl AsRef, + schema_name: impl AsRef, + table_name: impl AsRef, + ) { + // Note: append_value is actually infallible. 
+ self.catalog_names + .append_value(catalog_name.as_ref()) + .unwrap(); + self.schema_names + .append_value(schema_name.as_ref()) + .unwrap(); + self.table_names.append_value(table_name.as_ref()).unwrap(); + self.table_types.append_value("BASE TABLE").unwrap(); + } + + fn add_system_table( + &mut self, + catalog_name: impl AsRef, + schema_name: impl AsRef, + table_name: impl AsRef, + ) { + // Note: append_value is actually infallible. + self.catalog_names + .append_value(catalog_name.as_ref()) + .unwrap(); + self.schema_names + .append_value(schema_name.as_ref()) + .unwrap(); + self.table_names.append_value(table_name.as_ref()).unwrap(); + self.table_types.append_value("SYSTEM TABLE").unwrap(); + } + + fn build(self) -> MemTable { + let schema = Schema::new(vec![ + Field::new("table_catalog", DataType::Utf8, false), + Field::new("table_schema", DataType::Utf8, false), + Field::new("table_name", DataType::Utf8, false), + Field::new("table_type", DataType::Utf8, false), + ]); + + let Self { + mut catalog_names, + mut schema_names, + mut table_names, + mut table_types, + } = self; + + let schema = Arc::new(schema); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(catalog_names.finish()), + Arc::new(schema_names.finish()), + Arc::new(table_names.finish()), + Arc::new(table_types.finish()), + ], + ) + .unwrap(); + + MemTable::try_new(schema, vec![vec![batch]]).unwrap() + } +} diff --git a/rust/datafusion/src/catalog/mod.rs b/rust/datafusion/src/catalog/mod.rs index b61ed154acc3..10591f07e378 100644 --- a/rust/datafusion/src/catalog/mod.rs +++ b/rust/datafusion/src/catalog/mod.rs @@ -19,6 +19,7 @@ //! of table namespacing concepts, including catalogs and schemas. 
pub mod catalog; +pub mod information_schema; pub mod schema; use crate::error::DataFusionError; diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 01f4204dbae6..206c07b1fe6f 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -16,7 +16,13 @@ // under the License. //! ExecutionContext contains methods for registering data sources and executing queries -use crate::optimizer::hash_build_probe_order::HashBuildProbeOrder; +use crate::{ + catalog::{ + catalog::{CatalogList, MemoryCatalogList}, + information_schema::CatalogListWithInformationSchema, + }, + optimizer::hash_build_probe_order::HashBuildProbeOrder, +}; use log::debug; use std::fs; use std::path::Path; @@ -116,7 +122,7 @@ impl ExecutionContext { /// Creates a new execution context using the provided configuration. pub fn with_config(config: ExecutionConfig) -> Self { - let mut catalogs = HashMap::new(); + let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc; if config.create_default_catalog_and_schema { let default_catalog = MemoryCatalogProvider::new(); @@ -125,7 +131,7 @@ impl ExecutionContext { Arc::new(MemorySchemaProvider::new()), ); - catalogs.insert( + catalog_list.register_catalog( config.default_catalog.clone(), Arc::new(default_catalog) as Arc, ); @@ -133,7 +139,7 @@ impl ExecutionContext { Self { state: Arc::new(Mutex::new(ExecutionContextState { - catalogs, + catalog_list, scalar_functions: HashMap::new(), var_provider: HashMap::new(), aggregate_functions: HashMap::new(), @@ -310,16 +316,17 @@ impl ExecutionContext { name: impl Into, catalog: Arc, ) -> Option> { + let name = name.into(); self.state .lock() .unwrap() - .catalogs - .insert(name.into(), catalog) + .catalog_list + .register_catalog(name, catalog) } /// Retrieves a `CatalogProvider` instance by name pub fn catalog(&self, name: &str) -> Option> { - self.state.lock().unwrap().catalogs.get(name).cloned() + 
self.state.lock().unwrap().catalog_list.catalog(name) } /// Registers a table using a custom `TableProvider` so that @@ -579,6 +586,9 @@ pub struct ExecutionConfig { default_schema: String, /// Whether the default catalog and schema should be created automatically create_default_catalog_and_schema: bool, + /// Should DataFusion provide access to `information_schema` + /// virtual tables for displaying schema information + information_schema: bool, } impl ExecutionConfig { @@ -598,6 +608,7 @@ impl ExecutionConfig { default_catalog: "datafusion".to_owned(), default_schema: "public".to_owned(), create_default_catalog_and_schema: true, + information_schema: false, } } @@ -651,13 +662,19 @@ impl ExecutionConfig { self.create_default_catalog_and_schema = create; self } + + /// Enables the `information_schema` virtual tables + pub fn with_information_schema(mut self) -> Self { + self.information_schema = true; + self + } } /// Execution context for registering data sources and executing queries #[derive(Clone)] pub struct ExecutionContextState { /// Collection of catalogs containing schemas and ultimately TableProviders - pub catalogs: HashMap>, + pub catalog_list: Arc, /// Scalar functions that are registered with the context pub scalar_functions: HashMap>, /// Variable provider that are registered with the context @@ -678,20 +695,31 @@ impl ExecutionContextState { .resolve(&self.config.default_catalog, &self.config.default_schema) } + fn catalog_for_ref<'a>( + &'a self, + resolved_ref: &ResolvedTableReference<'a>, + ) -> Result> { + if self.config.information_schema { + CatalogListWithInformationSchema::new(self.catalog_list.clone()) + .catalog(resolved_ref.catalog) + } else { + self.catalog_list.catalog(resolved_ref.catalog) + } + .ok_or_else(|| { + DataFusionError::Plan(format!( + "failed to resolve catalog: {}", + resolved_ref.catalog + )) + }) + } + fn schema_for_ref<'a>( &'a self, table_ref: impl Into>, ) -> Result> { let resolved_ref = 
self.resolve_table_ref(table_ref.into()); - self.catalogs - .get(resolved_ref.catalog) - .ok_or_else(|| { - DataFusionError::Plan(format!( - "failed to resolve catalog: {}", - resolved_ref.catalog - )) - })? + self.catalog_for_ref(&resolved_ref)? .schema(resolved_ref.schema) .ok_or_else(|| { DataFusionError::Plan(format!( @@ -2048,6 +2076,130 @@ mod tests { Ok(Arc::new(MemTable::try_new(schema, partitions)?)) } + #[tokio::test] + async fn information_schema_tables_not_exist_by_default() { + let mut ctx = ExecutionContext::new(); + + let err = plan_and_collect(&mut ctx, "SELECT * from information_schema.tables") + .await + .unwrap_err(); + assert_eq!( + err.to_string(), + "Error during planning: Table or CTE with name 'information_schema.tables' not found" + ); + } + + #[tokio::test] + async fn information_schema_tables_no_tables() { + let mut ctx = ExecutionContext::with_config( + ExecutionConfig::new().with_information_schema(), + ); + + let result = + plan_and_collect(&mut ctx, "SELECT * from information_schema.tables") + .await + .unwrap(); + + let expected = vec![ + "+---------------+--------------------+------------+--------------+", + "| table_catalog | table_schema | table_name | table_type |", + "+---------------+--------------------+------------+--------------+", + "| datafusion | information_schema | TABLES | SYSTEM TABLE |", + "+---------------+--------------------+------------+--------------+", + ]; + assert_batches_eq!(expected, &result); + } + + #[tokio::test] + async fn information_schema_tables_tables_default_catalog() { + let mut ctx = ExecutionContext::with_config( + ExecutionConfig::new().with_information_schema(), + ); + + // Now, register an empty table + ctx.register_table("t", table_with_sequence(1, 1).unwrap()) + .unwrap(); + + let result = + plan_and_collect(&mut ctx, "SELECT * from information_schema.tables") + .await + .unwrap(); + + let expected = vec![ + "+---------------+--------------------+------------+--------------+", + "| 
table_catalog | table_schema | table_name | table_type |", + "+---------------+--------------------+------------+--------------+", + "| datafusion | public | t | BASE TABLE |", + "| datafusion | information_schema | TABLES | SYSTEM TABLE |", + "+---------------+--------------------+------------+--------------+", + ]; + assert_batches_sorted_eq!(expected, &result); + + // Newly added tables should appear + ctx.register_table("t2", table_with_sequence(1, 1).unwrap()) + .unwrap(); + + let result = + plan_and_collect(&mut ctx, "SELECT * from information_schema.tables") + .await + .unwrap(); + + let expected = vec![ + "+---------------+--------------------+------------+--------------+", + "| table_catalog | table_schema | table_name | table_type |", + "+---------------+--------------------+------------+--------------+", + "| datafusion | public | t2 | BASE TABLE |", + "| datafusion | public | t | BASE TABLE |", + "| datafusion | information_schema | TABLES | SYSTEM TABLE |", + "+---------------+--------------------+------------+--------------+", + ]; + assert_batches_sorted_eq!(expected, &result); + } + + #[tokio::test] + async fn information_schema_tables_tables_with_multiple_catalogs() { + let mut ctx = ExecutionContext::with_config( + ExecutionConfig::new().with_information_schema(), + ); + let catalog = MemoryCatalogProvider::new(); + let schema = MemorySchemaProvider::new(); + schema + .register_table("t1".to_owned(), table_with_sequence(1, 1).unwrap()) + .unwrap(); + schema + .register_table("t2".to_owned(), table_with_sequence(1, 1).unwrap()) + .unwrap(); + catalog.register_schema("my_schema", Arc::new(schema)); + ctx.register_catalog("my_catalog", Arc::new(catalog)); + + let catalog = MemoryCatalogProvider::new(); + let schema = MemorySchemaProvider::new(); + schema + .register_table("t3".to_owned(), table_with_sequence(1, 1).unwrap()) + .unwrap(); + catalog.register_schema("my_other_schema", Arc::new(schema)); + ctx.register_catalog("my_other_catalog", 
Arc::new(catalog)); + + let result = + plan_and_collect(&mut ctx, "SELECT * from information_schema.tables") + .await + .unwrap(); + + let expected = vec![ + "+------------------+--------------------+------------+--------------+", + "| table_catalog | table_schema | table_name | table_type |", + "+------------------+--------------------+------------+--------------+", + "| datafusion | information_schema | TABLES | SYSTEM TABLE |", + "| my_other_catalog | my_other_schema | t3 | BASE TABLE |", + "| my_other_catalog | information_schema | TABLES | SYSTEM TABLE |", + "| my_catalog | my_schema | t2 | BASE TABLE |", + "| my_catalog | my_schema | t1 | BASE TABLE |", + "| my_catalog | information_schema | TABLES | SYSTEM TABLE |", + "+------------------+--------------------+------------+--------------+", + ]; + assert_batches_sorted_eq!(expected, &result); + } + #[tokio::test] async fn disabled_default_catalog_and_schema() -> Result<()> { let mut ctx = ExecutionContext::with_config( diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs index 569eb59847e9..fce85e360743 100644 --- a/rust/datafusion/src/physical_plan/parquet.rs +++ b/rust/datafusion/src/physical_plan/parquet.rs @@ -30,7 +30,10 @@ use super::{ planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, }; -use crate::physical_plan::{common, ExecutionPlan, Partitioning}; +use crate::{ + catalog::catalog::MemoryCatalogList, + physical_plan::{common, ExecutionPlan, Partitioning}, +}; use crate::{ error::{DataFusionError, Result}, execution::context::ExecutionContextState, @@ -392,7 +395,7 @@ impl RowGroupPredicateBuilder { .collect::>(); let stat_schema = Schema::new(stat_fields); let execution_context_state = ExecutionContextState { - catalogs: HashMap::new(), + catalog_list: Arc::new(MemoryCatalogList::new()), scalar_functions: HashMap::new(), var_provider: HashMap::new(), aggregate_functions: HashMap::new(), diff 
--git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index a9b1da179ca4..dd4184c5e1c0 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -749,10 +749,13 @@ fn tuple_err(value: (Result, Result)) -> Result<(T, R)> { #[cfg(test)] mod tests { use super::*; - use crate::logical_plan::{DFField, DFSchema, DFSchemaRef}; use crate::physical_plan::{csv::CsvReadOptions, expressions, Partitioning}; use crate::prelude::ExecutionConfig; use crate::scalar::ScalarValue; + use crate::{ + catalog::catalog::MemoryCatalogList, + logical_plan::{DFField, DFSchema, DFSchemaRef}, + }; use crate::{ logical_plan::{col, lit, sum, LogicalPlanBuilder}, physical_plan::SendableRecordBatchStream, @@ -764,7 +767,7 @@ mod tests { fn make_ctx_state() -> ExecutionContextState { ExecutionContextState { - catalogs: HashMap::new(), + catalog_list: Arc::new(MemoryCatalogList::new()), scalar_functions: HashMap::new(), var_provider: HashMap::new(), aggregate_functions: HashMap::new(), From cea11e34fe5b10172dd09c3636b8bc6ba88a4cf3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 26 Mar 2021 17:31:37 -0400 Subject: [PATCH 2/8] fixup test --- rust/datafusion/src/execution/context.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 206c07b1fe6f..b7350edfea03 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -2104,7 +2104,7 @@ mod tests { "+---------------+--------------------+------------+--------------+", "| table_catalog | table_schema | table_name | table_type |", "+---------------+--------------------+------------+--------------+", - "| datafusion | information_schema | TABLES | SYSTEM TABLE |", + "| datafusion | information_schema | tables | SYSTEM TABLE |", 
"+---------------+--------------------+------------+--------------+", ]; assert_batches_eq!(expected, &result); @@ -2130,7 +2130,7 @@ mod tests { "| table_catalog | table_schema | table_name | table_type |", "+---------------+--------------------+------------+--------------+", "| datafusion | public | t | BASE TABLE |", - "| datafusion | information_schema | TABLES | SYSTEM TABLE |", + "| datafusion | information_schema | tables | SYSTEM TABLE |", "+---------------+--------------------+------------+--------------+", ]; assert_batches_sorted_eq!(expected, &result); @@ -2150,7 +2150,7 @@ mod tests { "+---------------+--------------------+------------+--------------+", "| datafusion | public | t2 | BASE TABLE |", "| datafusion | public | t | BASE TABLE |", - "| datafusion | information_schema | TABLES | SYSTEM TABLE |", + "| datafusion | information_schema | tables | SYSTEM TABLE |", "+---------------+--------------------+------------+--------------+", ]; assert_batches_sorted_eq!(expected, &result); @@ -2189,12 +2189,12 @@ mod tests { "+------------------+--------------------+------------+--------------+", "| table_catalog | table_schema | table_name | table_type |", "+------------------+--------------------+------------+--------------+", - "| datafusion | information_schema | TABLES | SYSTEM TABLE |", + "| datafusion | information_schema | tables | SYSTEM TABLE |", "| my_other_catalog | my_other_schema | t3 | BASE TABLE |", - "| my_other_catalog | information_schema | TABLES | SYSTEM TABLE |", + "| my_other_catalog | information_schema | tables | SYSTEM TABLE |", "| my_catalog | my_schema | t2 | BASE TABLE |", "| my_catalog | my_schema | t1 | BASE TABLE |", - "| my_catalog | information_schema | TABLES | SYSTEM TABLE |", + "| my_catalog | information_schema | tables | SYSTEM TABLE |", "+------------------+--------------------+------------+--------------+", ]; assert_batches_sorted_eq!(expected, &result); From ea3586b15cede59f5e6e2fe425de72af22135dd8 Mon Sep 17 
00:00:00 2001 From: Andrew Lamb Date: Sun, 28 Mar 2021 13:12:02 -0400 Subject: [PATCH 3/8] fixup comment --- rust/datafusion/src/catalog/catalog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/datafusion/src/catalog/catalog.rs b/rust/datafusion/src/catalog/catalog.rs index 9bc10fe8bd14..30fea1f45f2f 100644 --- a/rust/datafusion/src/catalog/catalog.rs +++ b/rust/datafusion/src/catalog/catalog.rs @@ -51,7 +51,7 @@ pub struct MemoryCatalogList { } impl MemoryCatalogList { - /// Instantiates a new MemoryDatabase with an empty collection of catalogs + /// Instantiates a new `MemoryCatalogList` with an empty collection of catalogs pub fn new() -> Self { Self { catalogs: RwLock::new(HashMap::new()), From df99e080f3fcbad02899ade97360d6a9b2c5757c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 28 Mar 2021 13:15:53 -0400 Subject: [PATCH 4/8] use VIEW and clarify intent of initial capacity --- rust/datafusion/src/catalog/information_schema.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/rust/datafusion/src/catalog/information_schema.rs b/rust/datafusion/src/catalog/information_schema.rs index cf30c64046e8..b7066e72f2c0 100644 --- a/rust/datafusion/src/catalog/information_schema.rs +++ b/rust/datafusion/src/catalog/information_schema.rs @@ -151,11 +151,15 @@ struct InformationSchemaTablesBuilder { impl InformationSchemaTablesBuilder { fn new() -> Self { + // StringBuilder requires providing an initial capacity, so + // pick 10 here arbitrarily as this is not performance + // critical code and the number of tables is unavailable here. 
+ let default_capacity = 10; Self { - catalog_names: StringBuilder::new(10), - schema_names: StringBuilder::new(10), - table_names: StringBuilder::new(10), - table_types: StringBuilder::new(10), + catalog_names: StringBuilder::new(default_capacity), + schema_names: StringBuilder::new(default_capacity), + table_names: StringBuilder::new(default_capacity), + table_types: StringBuilder::new(default_capacity), } } @@ -190,7 +194,7 @@ impl InformationSchemaTablesBuilder { .append_value(schema_name.as_ref()) .unwrap(); self.table_names.append_value(table_name.as_ref()).unwrap(); - self.table_types.append_value("SYSTEM TABLE").unwrap(); + self.table_types.append_value("VIEW").unwrap(); } fn build(self) -> MemTable { From f0bc9b0cdf92b5dc43cac0d7e2a770681d2e5bac Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 28 Mar 2021 13:25:02 -0400 Subject: [PATCH 5/8] Add parameter, update tests --- rust/datafusion/src/bin/repl.rs | 2 +- rust/datafusion/src/execution/context.rs | 54 ++++++++++++------------ 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs index 4a6237c4bb33..a6aec204c0d3 100644 --- a/rust/datafusion/src/bin/repl.rs +++ b/rust/datafusion/src/bin/repl.rs @@ -64,7 +64,7 @@ pub async fn main() { let mut ctx = ExecutionContext::with_config( ExecutionConfig::new() .with_batch_size(batch_size) - .with_information_schema(), + .with_information_schema(true), ); let mut rl = Editor::<()>::new(); diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index b7350edfea03..8c4671ccc93a 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -663,9 +663,9 @@ impl ExecutionConfig { self } - /// Enables the `information_schema` virtual tables - pub fn with_information_schema(mut self) -> Self { - self.information_schema = true; + /// Enables or disables the inclusion of `information_schema` virtual tables + pub 
fn with_information_schema(mut self, enabled: bool) -> Self { + self.information_schema = enabled; self } } @@ -2092,7 +2092,7 @@ mod tests { #[tokio::test] async fn information_schema_tables_no_tables() { let mut ctx = ExecutionContext::with_config( - ExecutionConfig::new().with_information_schema(), + ExecutionConfig::new().with_information_schema(true), ); let result = @@ -2101,11 +2101,11 @@ mod tests { .unwrap(); let expected = vec![ - "+---------------+--------------------+------------+--------------+", - "| table_catalog | table_schema | table_name | table_type |", - "+---------------+--------------------+------------+--------------+", - "| datafusion | information_schema | tables | SYSTEM TABLE |", - "+---------------+--------------------+------------+--------------+", + "+---------------+--------------------+------------+------------+", + "| table_catalog | table_schema | table_name | table_type |", + "+---------------+--------------------+------------+------------+", + "| datafusion | information_schema | tables | VIEW |", + "+---------------+--------------------+------------+------------+", ]; assert_batches_eq!(expected, &result); } @@ -2113,7 +2113,7 @@ mod tests { #[tokio::test] async fn information_schema_tables_tables_default_catalog() { let mut ctx = ExecutionContext::with_config( - ExecutionConfig::new().with_information_schema(), + ExecutionConfig::new().with_information_schema(true), ); // Now, register an empty table @@ -2126,12 +2126,12 @@ mod tests { .unwrap(); let expected = vec![ - "+---------------+--------------------+------------+--------------+", - "| table_catalog | table_schema | table_name | table_type |", - "+---------------+--------------------+------------+--------------+", - "| datafusion | public | t | BASE TABLE |", - "| datafusion | information_schema | tables | SYSTEM TABLE |", - "+---------------+--------------------+------------+--------------+", + "+---------------+--------------------+------------+------------+", + "| 
table_catalog | table_schema | table_name | table_type |", + "+---------------+--------------------+------------+------------+", + "| datafusion | information_schema | tables | VIEW |", + "| datafusion | public | t | BASE TABLE |", + "+---------------+--------------------+------------+------------+", ]; assert_batches_sorted_eq!(expected, &result); @@ -2159,7 +2159,7 @@ mod tests { #[tokio::test] async fn information_schema_tables_tables_with_multiple_catalogs() { let mut ctx = ExecutionContext::with_config( - ExecutionConfig::new().with_information_schema(), + ExecutionConfig::new().with_information_schema(true), ); let catalog = MemoryCatalogProvider::new(); let schema = MemorySchemaProvider::new(); @@ -2186,16 +2186,16 @@ mod tests { .unwrap(); let expected = vec![ - "+------------------+--------------------+------------+--------------+", - "| table_catalog | table_schema | table_name | table_type |", - "+------------------+--------------------+------------+--------------+", - "| datafusion | information_schema | tables | SYSTEM TABLE |", - "| my_other_catalog | my_other_schema | t3 | BASE TABLE |", - "| my_other_catalog | information_schema | tables | SYSTEM TABLE |", - "| my_catalog | my_schema | t2 | BASE TABLE |", - "| my_catalog | my_schema | t1 | BASE TABLE |", - "| my_catalog | information_schema | tables | SYSTEM TABLE |", - "+------------------+--------------------+------------+--------------+", + "+------------------+--------------------+------------+------------+", + "| table_catalog | table_schema | table_name | table_type |", + "+------------------+--------------------+------------+------------+", + "| datafusion | information_schema | tables | VIEW |", + "| my_catalog | information_schema | tables | VIEW |", + "| my_catalog | my_schema | t1 | BASE TABLE |", + "| my_catalog | my_schema | t2 | BASE TABLE |", + "| my_other_catalog | information_schema | tables | VIEW |", + "| my_other_catalog | my_other_schema | t3 | BASE TABLE |", + 
"+------------------+--------------------+------------+------------+", ]; assert_batches_sorted_eq!(expected, &result); } From 721c5ddf6461da684207ad52f3c8fbbd13e092c9 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 28 Mar 2021 13:25:30 -0400 Subject: [PATCH 6/8] update test --- rust/datafusion/src/execution/context.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 8c4671ccc93a..5c23e52f4941 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -2145,13 +2145,13 @@ mod tests { .unwrap(); let expected = vec![ - "+---------------+--------------------+------------+--------------+", - "| table_catalog | table_schema | table_name | table_type |", - "+---------------+--------------------+------------+--------------+", - "| datafusion | public | t2 | BASE TABLE |", - "| datafusion | public | t | BASE TABLE |", - "| datafusion | information_schema | tables | SYSTEM TABLE |", - "+---------------+--------------------+------------+--------------+", + "+---------------+--------------------+------------+------------+", + "| table_catalog | table_schema | table_name | table_type |", + "+---------------+--------------------+------------+------------+", + "| datafusion | information_schema | tables | VIEW |", + "| datafusion | public | t | BASE TABLE |", + "| datafusion | public | t2 | BASE TABLE |", + "+---------------+--------------------+------------+------------+", ]; assert_batches_sorted_eq!(expected, &result); } From 8c57e9aaa5f7c2854f17febaf5cdf5d8742da60b Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 28 Mar 2021 14:19:33 -0400 Subject: [PATCH 7/8] Inject schema wrapper once rather than on each lookup --- .../src/catalog/information_schema.rs | 50 +++++++-------- rust/datafusion/src/execution/context.rs | 63 ++++++++++--------- 2 files changed, 56 insertions(+), 57 deletions(-) diff --git 
a/rust/datafusion/src/catalog/information_schema.rs b/rust/datafusion/src/catalog/information_schema.rs index b7066e72f2c0..bfa71ff90fb6 100644 --- a/rust/datafusion/src/catalog/information_schema.rs +++ b/rust/datafusion/src/catalog/information_schema.rs @@ -37,38 +37,26 @@ use super::{ const INFORMATION_SCHEMA: &str = "information_schema"; const TABLES: &str = "tables"; -/// Wraps a [`CatalogList`] so that it also provides an -/// `information_schema` schema view as well in each catalog. -pub(crate) struct CatalogListWithInformationSchema { - inner: Arc, -} - -impl CatalogListWithInformationSchema { - pub(crate) fn new(inner: Arc) -> Self { - Self { inner } - } - - /// Returns a catalog provider that also contains the information schema table provider. - pub(crate) fn catalog(&self, name: &str) -> Option> { - self.inner.catalog(name).map(|catalog| { - let catalog_list = self.inner.clone(); - - Arc::new(CatalogWithInformationSchema { - catalog_list, - inner: catalog, - }) as Arc - }) - } -} - /// Wraps another [`CatalogProvider`] and adds a "information_schema" /// schema that can introspect on tables in the catalog_list -struct CatalogWithInformationSchema { +pub(crate) struct CatalogWithInformationSchema { catalog_list: Arc, /// wrapped provider inner: Arc, } +impl CatalogWithInformationSchema { + pub(crate) fn new( + catalog_list: Arc, + inner: Arc, + ) -> Self { + Self { + catalog_list, + inner, + } + } +} + impl CatalogProvider for CatalogWithInformationSchema { fn as_any(&self) -> &dyn any::Any { self @@ -121,9 +109,15 @@ impl SchemaProvider for InformationSchemaProvider { let catalog = self.catalog_list.catalog(&catalog_name).unwrap(); for schema_name in catalog.schema_names() { - let schema = catalog.schema(&schema_name).unwrap(); - for table_name in schema.table_names() { - builder.add_base_table(&catalog_name, &schema_name, table_name) + if schema_name != INFORMATION_SCHEMA { + let schema = catalog.schema(&schema_name).unwrap(); + for table_name in 
schema.table_names() { + builder.add_base_table( + &catalog_name, + &schema_name, + table_name, + ) + } } } diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 5c23e52f4941..098930cb4184 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -19,7 +19,7 @@ use crate::{ catalog::{ catalog::{CatalogList, MemoryCatalogList}, - information_schema::CatalogListWithInformationSchema, + information_schema::CatalogWithInformationSchema, }, optimizer::hash_build_probe_order::HashBuildProbeOrder, }; @@ -126,15 +126,23 @@ impl ExecutionContext { if config.create_default_catalog_and_schema { let default_catalog = MemoryCatalogProvider::new(); + default_catalog.register_schema( config.default_schema.clone(), Arc::new(MemorySchemaProvider::new()), ); - catalog_list.register_catalog( - config.default_catalog.clone(), - Arc::new(default_catalog) as Arc, - ); + let default_catalog: Arc = if config.information_schema { + Arc::new(CatalogWithInformationSchema::new( + catalog_list.clone(), + Arc::new(default_catalog), + )) + } else { + Arc::new(default_catalog) + }; + + catalog_list + .register_catalog(config.default_catalog.clone(), default_catalog); } Self { @@ -317,11 +325,19 @@ impl ExecutionContext { catalog: Arc, ) -> Option> { let name = name.into(); - self.state - .lock() - .unwrap() - .catalog_list - .register_catalog(name, catalog) + + let state = self.state.lock().unwrap(); + println!("Registering catalog for {}", name); + let catalog = if state.config.information_schema { + Arc::new(CatalogWithInformationSchema::new( + state.catalog_list.clone(), + catalog, + )) + } else { + catalog + }; + + state.catalog_list.register_catalog(name, catalog) } /// Retrieves a `CatalogProvider` instance by name @@ -695,31 +711,20 @@ impl ExecutionContextState { .resolve(&self.config.default_catalog, &self.config.default_schema) } - fn catalog_for_ref<'a>( - &'a self, - resolved_ref: 
&ResolvedTableReference<'a>, - ) -> Result> { - if self.config.information_schema { - CatalogListWithInformationSchema::new(self.catalog_list.clone()) - .catalog(resolved_ref.catalog) - } else { - self.catalog_list.catalog(resolved_ref.catalog) - } - .ok_or_else(|| { - DataFusionError::Plan(format!( - "failed to resolve catalog: {}", - resolved_ref.catalog - )) - }) - } - fn schema_for_ref<'a>( &'a self, table_ref: impl Into>, ) -> Result> { let resolved_ref = self.resolve_table_ref(table_ref.into()); - self.catalog_for_ref(&resolved_ref)? + self.catalog_list + .catalog(resolved_ref.catalog) + .ok_or_else(|| { + DataFusionError::Plan(format!( + "failed to resolve catalog: {}", + resolved_ref.catalog + )) + })? .schema(resolved_ref.schema) .ok_or_else(|| { DataFusionError::Plan(format!( From be47c11a026ef5817fe9827d58cf2159d0d15c37 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 29 Mar 2021 10:39:36 -0400 Subject: [PATCH 8/8] Remove println --- rust/datafusion/src/execution/context.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 098930cb4184..d183633be551 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -327,7 +327,6 @@ impl ExecutionContext { let name = name.into(); let state = self.state.lock().unwrap(); - println!("Registering catalog for {}", name); let catalog = if state.config.information_schema { Arc::new(CatalogWithInformationSchema::new( state.catalog_list.clone(),