diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs index 00e42615b90e..a6aec204c0d3 100644 --- a/rust/datafusion/src/bin/repl.rs +++ b/rust/datafusion/src/bin/repl.rs @@ -61,8 +61,11 @@ pub async fn main() { .map(|size| size.parse::().unwrap()) .unwrap_or(1_048_576); - let mut ctx = - ExecutionContext::with_config(ExecutionConfig::new().with_batch_size(batch_size)); + let mut ctx = ExecutionContext::with_config( + ExecutionConfig::new() + .with_batch_size(batch_size) + .with_information_schema(true), + ); let mut rl = Editor::<()>::new(); rl.load_history(".history").ok(); diff --git a/rust/datafusion/src/catalog/catalog.rs b/rust/datafusion/src/catalog/catalog.rs index 69059d13bb30..30fea1f45f2f 100644 --- a/rust/datafusion/src/catalog/catalog.rs +++ b/rust/datafusion/src/catalog/catalog.rs @@ -23,6 +23,67 @@ use std::any::Any; use std::collections::HashMap; use std::sync::{Arc, RwLock}; +/// Represents a list of named catalogs +pub trait CatalogList: Sync + Send { + /// Returns the catalog list as [`Any`](std::any::Any) + /// so that it can be downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Adds a new catalog to this catalog list. + /// If a catalog of the same name existed before, it is replaced in the list and returned. + fn register_catalog( + &self, + name: String, + catalog: Arc, + ) -> Option>; + + /// Retrieves the list of available catalog names + fn catalog_names(&self) -> Vec; + + /// Retrieves a specific catalog by name, provided it exists. 
+ fn catalog(&self, name: &str) -> Option>; +} + +/// Simple in-memory list of catalogs +pub struct MemoryCatalogList { + /// Collection of catalogs containing schemas and ultimately TableProviders + pub catalogs: RwLock>>, +} + +impl MemoryCatalogList { + /// Instantiates a new `MemoryCatalogList` with an empty collection of catalogs + pub fn new() -> Self { + Self { + catalogs: RwLock::new(HashMap::new()), + } + } +} + +impl CatalogList for MemoryCatalogList { + fn as_any(&self) -> &dyn Any { + self + } + + fn register_catalog( + &self, + name: String, + catalog: Arc, + ) -> Option> { + let mut catalogs = self.catalogs.write().unwrap(); + catalogs.insert(name, catalog) + } + + fn catalog_names(&self) -> Vec { + let catalogs = self.catalogs.read().unwrap(); + catalogs.keys().map(|s| s.to_string()).collect() + } + + fn catalog(&self, name: &str) -> Option> { + let catalogs = self.catalogs.read().unwrap(); + catalogs.get(name).cloned() + } +} + /// Represents a catalog, comprising a number of named schemas. pub trait CatalogProvider: Sync + Send { /// Returns the catalog provider as [`Any`](std::any::Any) diff --git a/rust/datafusion/src/catalog/information_schema.rs b/rust/datafusion/src/catalog/information_schema.rs new file mode 100644 index 000000000000..bfa71ff90fb6 --- /dev/null +++ b/rust/datafusion/src/catalog/information_schema.rs @@ -0,0 +1,223 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Implements the SQL [Information Schema] for DataFusion. +//! +//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema + +use std::{any, sync::Arc}; + +use arrow::{ + array::StringBuilder, + datatypes::{DataType, Field, Schema}, + record_batch::RecordBatch, +}; + +use crate::datasource::{MemTable, TableProvider}; + +use super::{ + catalog::{CatalogList, CatalogProvider}, + schema::SchemaProvider, +}; + +const INFORMATION_SCHEMA: &str = "information_schema"; +const TABLES: &str = "tables"; + +/// Wraps another [`CatalogProvider`] and adds an "information_schema" +/// schema that can introspect on tables in the catalog_list +pub(crate) struct CatalogWithInformationSchema { + catalog_list: Arc, + /// wrapped provider + inner: Arc, +} + +impl CatalogWithInformationSchema { + pub(crate) fn new( + catalog_list: Arc, + inner: Arc, + ) -> Self { + Self { + catalog_list, + inner, + } + } +} + +impl CatalogProvider for CatalogWithInformationSchema { + fn as_any(&self) -> &dyn any::Any { + self + } + + fn schema_names(&self) -> Vec { + self.inner + .schema_names() + .into_iter() + .chain(std::iter::once(INFORMATION_SCHEMA.to_string())) + .collect::>() + } + + fn schema(&self, name: &str) -> Option> { + if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) { + Some(Arc::new(InformationSchemaProvider { + catalog_list: self.catalog_list.clone(), + })) + } else { + self.inner.schema(name) + } + } +} + +/// Implements the `information_schema` virtual schema and tables +/// +/// The underlying tables in the `information_schema` are created on +/// 
demand. This means that if more tables are added to the underlying +/// providers, they will appear the next time the `information_schema` +/// table is queried. +struct InformationSchemaProvider { + catalog_list: Arc, +} + +impl SchemaProvider for InformationSchemaProvider { + fn as_any(&self) -> &(dyn any::Any + 'static) { + self + } + + fn table_names(&self) -> Vec { + vec![TABLES.to_string()] + } + + fn table(&self, name: &str) -> Option> { + if name.eq_ignore_ascii_case("tables") { + // create a mem table with the names of tables + let mut builder = InformationSchemaTablesBuilder::new(); + + for catalog_name in self.catalog_list.catalog_names() { + let catalog = self.catalog_list.catalog(&catalog_name).unwrap(); + + for schema_name in catalog.schema_names() { + if schema_name != INFORMATION_SCHEMA { + let schema = catalog.schema(&schema_name).unwrap(); + for table_name in schema.table_names() { + builder.add_base_table( + &catalog_name, + &schema_name, + table_name, + ) + } + } + } + + // Add a final row for the information schema tables themselves + builder.add_system_table(&catalog_name, INFORMATION_SCHEMA, TABLES); + } + + let mem_table = builder.build(); + + Some(Arc::new(mem_table)) + } else { + None + } + } +} + +/// Builds the `information_schema.TABLES` table row by row + +struct InformationSchemaTablesBuilder { + catalog_names: StringBuilder, + schema_names: StringBuilder, + table_names: StringBuilder, + table_types: StringBuilder, +} + +impl InformationSchemaTablesBuilder { + fn new() -> Self { + // StringBuilder requires providing an initial capacity, so + // pick 10 here arbitrarily as this is not performance + // critical code and the number of tables is unavailable here. 
+ let default_capacity = 10; + Self { + catalog_names: StringBuilder::new(default_capacity), + schema_names: StringBuilder::new(default_capacity), + table_names: StringBuilder::new(default_capacity), + table_types: StringBuilder::new(default_capacity), + } + } + + fn add_base_table( + &mut self, + catalog_name: impl AsRef, + schema_name: impl AsRef, + table_name: impl AsRef, + ) { + // Note: append_value is actually infallible. + self.catalog_names + .append_value(catalog_name.as_ref()) + .unwrap(); + self.schema_names + .append_value(schema_name.as_ref()) + .unwrap(); + self.table_names.append_value(table_name.as_ref()).unwrap(); + self.table_types.append_value("BASE TABLE").unwrap(); + } + + fn add_system_table( + &mut self, + catalog_name: impl AsRef, + schema_name: impl AsRef, + table_name: impl AsRef, + ) { + // Note: append_value is actually infallible. + self.catalog_names + .append_value(catalog_name.as_ref()) + .unwrap(); + self.schema_names + .append_value(schema_name.as_ref()) + .unwrap(); + self.table_names.append_value(table_name.as_ref()).unwrap(); + self.table_types.append_value("VIEW").unwrap(); + } + + fn build(self) -> MemTable { + let schema = Schema::new(vec![ + Field::new("table_catalog", DataType::Utf8, false), + Field::new("table_schema", DataType::Utf8, false), + Field::new("table_name", DataType::Utf8, false), + Field::new("table_type", DataType::Utf8, false), + ]); + + let Self { + mut catalog_names, + mut schema_names, + mut table_names, + mut table_types, + } = self; + + let schema = Arc::new(schema); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(catalog_names.finish()), + Arc::new(schema_names.finish()), + Arc::new(table_names.finish()), + Arc::new(table_types.finish()), + ], + ) + .unwrap(); + + MemTable::try_new(schema, vec![vec![batch]]).unwrap() + } +} diff --git a/rust/datafusion/src/catalog/mod.rs b/rust/datafusion/src/catalog/mod.rs index b61ed154acc3..10591f07e378 100644 --- 
a/rust/datafusion/src/catalog/mod.rs +++ b/rust/datafusion/src/catalog/mod.rs @@ -19,6 +19,7 @@ //! of table namespacing concepts, including catalogs and schemas. pub mod catalog; +pub mod information_schema; pub mod schema; use crate::error::DataFusionError; diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 01f4204dbae6..d183633be551 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -16,7 +16,13 @@ // under the License. //! ExecutionContext contains methods for registering data sources and executing queries -use crate::optimizer::hash_build_probe_order::HashBuildProbeOrder; +use crate::{ + catalog::{ + catalog::{CatalogList, MemoryCatalogList}, + information_schema::CatalogWithInformationSchema, + }, + optimizer::hash_build_probe_order::HashBuildProbeOrder, +}; use log::debug; use std::fs; use std::path::Path; @@ -116,24 +122,32 @@ impl ExecutionContext { /// Creates a new execution context using the provided configuration. 
pub fn with_config(config: ExecutionConfig) -> Self { - let mut catalogs = HashMap::new(); + let catalog_list = Arc::new(MemoryCatalogList::new()) as Arc; if config.create_default_catalog_and_schema { let default_catalog = MemoryCatalogProvider::new(); + default_catalog.register_schema( config.default_schema.clone(), Arc::new(MemorySchemaProvider::new()), ); - catalogs.insert( - config.default_catalog.clone(), - Arc::new(default_catalog) as Arc, - ); + let default_catalog: Arc = if config.information_schema { + Arc::new(CatalogWithInformationSchema::new( + catalog_list.clone(), + Arc::new(default_catalog), + )) + } else { + Arc::new(default_catalog) + }; + + catalog_list + .register_catalog(config.default_catalog.clone(), default_catalog); } Self { state: Arc::new(Mutex::new(ExecutionContextState { - catalogs, + catalog_list, scalar_functions: HashMap::new(), var_provider: HashMap::new(), aggregate_functions: HashMap::new(), @@ -310,16 +324,24 @@ impl ExecutionContext { name: impl Into, catalog: Arc, ) -> Option> { - self.state - .lock() - .unwrap() - .catalogs - .insert(name.into(), catalog) + let name = name.into(); + + let state = self.state.lock().unwrap(); + let catalog = if state.config.information_schema { + Arc::new(CatalogWithInformationSchema::new( + state.catalog_list.clone(), + catalog, + )) + } else { + catalog + }; + + state.catalog_list.register_catalog(name, catalog) } /// Retrieves a `CatalogProvider` instance by name pub fn catalog(&self, name: &str) -> Option> { - self.state.lock().unwrap().catalogs.get(name).cloned() + self.state.lock().unwrap().catalog_list.catalog(name) } /// Registers a table using a custom `TableProvider` so that @@ -579,6 +601,9 @@ pub struct ExecutionConfig { default_schema: String, /// Whether the default catalog and schema should be created automatically create_default_catalog_and_schema: bool, + /// Should DataFusion provide access to `information_schema` + /// virtual tables for displaying schema information + 
information_schema: bool, } impl ExecutionConfig { @@ -598,6 +623,7 @@ impl ExecutionConfig { default_catalog: "datafusion".to_owned(), default_schema: "public".to_owned(), create_default_catalog_and_schema: true, + information_schema: false, } } @@ -651,13 +677,19 @@ impl ExecutionConfig { self.create_default_catalog_and_schema = create; self } + + /// Enables or disables the inclusion of `information_schema` virtual tables + pub fn with_information_schema(mut self, enabled: bool) -> Self { + self.information_schema = enabled; + self + } } /// Execution context for registering data sources and executing queries #[derive(Clone)] pub struct ExecutionContextState { /// Collection of catalogs containing schemas and ultimately TableProviders - pub catalogs: HashMap>, + pub catalog_list: Arc, /// Scalar functions that are registered with the context pub scalar_functions: HashMap>, /// Variable provider that are registered with the context @@ -684,8 +716,8 @@ impl ExecutionContextState { ) -> Result> { let resolved_ref = self.resolve_table_ref(table_ref.into()); - self.catalogs - .get(resolved_ref.catalog) + self.catalog_list + .catalog(resolved_ref.catalog) .ok_or_else(|| { DataFusionError::Plan(format!( "failed to resolve catalog: {}", @@ -2048,6 +2080,130 @@ mod tests { Ok(Arc::new(MemTable::try_new(schema, partitions)?)) } + #[tokio::test] + async fn information_schema_tables_not_exist_by_default() { + let mut ctx = ExecutionContext::new(); + + let err = plan_and_collect(&mut ctx, "SELECT * from information_schema.tables") + .await + .unwrap_err(); + assert_eq!( + err.to_string(), + "Error during planning: Table or CTE with name 'information_schema.tables' not found" + ); + } + + #[tokio::test] + async fn information_schema_tables_no_tables() { + let mut ctx = ExecutionContext::with_config( + ExecutionConfig::new().with_information_schema(true), + ); + + let result = + plan_and_collect(&mut ctx, "SELECT * from information_schema.tables") + .await + .unwrap(); + + let 
expected = vec![ + "+---------------+--------------------+------------+------------+", + "| table_catalog | table_schema | table_name | table_type |", + "+---------------+--------------------+------------+------------+", + "| datafusion | information_schema | tables | VIEW |", + "+---------------+--------------------+------------+------------+", + ]; + assert_batches_eq!(expected, &result); + } + + #[tokio::test] + async fn information_schema_tables_tables_default_catalog() { + let mut ctx = ExecutionContext::with_config( + ExecutionConfig::new().with_information_schema(true), + ); + + // Now, register an empty table + ctx.register_table("t", table_with_sequence(1, 1).unwrap()) + .unwrap(); + + let result = + plan_and_collect(&mut ctx, "SELECT * from information_schema.tables") + .await + .unwrap(); + + let expected = vec![ + "+---------------+--------------------+------------+------------+", + "| table_catalog | table_schema | table_name | table_type |", + "+---------------+--------------------+------------+------------+", + "| datafusion | information_schema | tables | VIEW |", + "| datafusion | public | t | BASE TABLE |", + "+---------------+--------------------+------------+------------+", + ]; + assert_batches_sorted_eq!(expected, &result); + + // Newly added tables should appear + ctx.register_table("t2", table_with_sequence(1, 1).unwrap()) + .unwrap(); + + let result = + plan_and_collect(&mut ctx, "SELECT * from information_schema.tables") + .await + .unwrap(); + + let expected = vec![ + "+---------------+--------------------+------------+------------+", + "| table_catalog | table_schema | table_name | table_type |", + "+---------------+--------------------+------------+------------+", + "| datafusion | information_schema | tables | VIEW |", + "| datafusion | public | t | BASE TABLE |", + "| datafusion | public | t2 | BASE TABLE |", + "+---------------+--------------------+------------+------------+", + ]; + assert_batches_sorted_eq!(expected, &result); + } 
+ + #[tokio::test] + async fn information_schema_tables_tables_with_multiple_catalogs() { + let mut ctx = ExecutionContext::with_config( + ExecutionConfig::new().with_information_schema(true), + ); + let catalog = MemoryCatalogProvider::new(); + let schema = MemorySchemaProvider::new(); + schema + .register_table("t1".to_owned(), table_with_sequence(1, 1).unwrap()) + .unwrap(); + schema + .register_table("t2".to_owned(), table_with_sequence(1, 1).unwrap()) + .unwrap(); + catalog.register_schema("my_schema", Arc::new(schema)); + ctx.register_catalog("my_catalog", Arc::new(catalog)); + + let catalog = MemoryCatalogProvider::new(); + let schema = MemorySchemaProvider::new(); + schema + .register_table("t3".to_owned(), table_with_sequence(1, 1).unwrap()) + .unwrap(); + catalog.register_schema("my_other_schema", Arc::new(schema)); + ctx.register_catalog("my_other_catalog", Arc::new(catalog)); + + let result = + plan_and_collect(&mut ctx, "SELECT * from information_schema.tables") + .await + .unwrap(); + + let expected = vec![ + "+------------------+--------------------+------------+------------+", + "| table_catalog | table_schema | table_name | table_type |", + "+------------------+--------------------+------------+------------+", + "| datafusion | information_schema | tables | VIEW |", + "| my_catalog | information_schema | tables | VIEW |", + "| my_catalog | my_schema | t1 | BASE TABLE |", + "| my_catalog | my_schema | t2 | BASE TABLE |", + "| my_other_catalog | information_schema | tables | VIEW |", + "| my_other_catalog | my_other_schema | t3 | BASE TABLE |", + "+------------------+--------------------+------------+------------+", + ]; + assert_batches_sorted_eq!(expected, &result); + } + #[tokio::test] async fn disabled_default_catalog_and_schema() -> Result<()> { let mut ctx = ExecutionContext::with_config( diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs index 569eb59847e9..fce85e360743 100644 --- 
a/rust/datafusion/src/physical_plan/parquet.rs +++ b/rust/datafusion/src/physical_plan/parquet.rs @@ -30,7 +30,10 @@ use super::{ planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, }; -use crate::physical_plan::{common, ExecutionPlan, Partitioning}; +use crate::{ + catalog::catalog::MemoryCatalogList, + physical_plan::{common, ExecutionPlan, Partitioning}, +}; use crate::{ error::{DataFusionError, Result}, execution::context::ExecutionContextState, @@ -392,7 +395,7 @@ impl RowGroupPredicateBuilder { .collect::>(); let stat_schema = Schema::new(stat_fields); let execution_context_state = ExecutionContextState { - catalogs: HashMap::new(), + catalog_list: Arc::new(MemoryCatalogList::new()), scalar_functions: HashMap::new(), var_provider: HashMap::new(), aggregate_functions: HashMap::new(), diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index a9b1da179ca4..dd4184c5e1c0 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -749,10 +749,13 @@ fn tuple_err(value: (Result, Result)) -> Result<(T, R)> { #[cfg(test)] mod tests { use super::*; - use crate::logical_plan::{DFField, DFSchema, DFSchemaRef}; use crate::physical_plan::{csv::CsvReadOptions, expressions, Partitioning}; use crate::prelude::ExecutionConfig; use crate::scalar::ScalarValue; + use crate::{ + catalog::catalog::MemoryCatalogList, + logical_plan::{DFField, DFSchema, DFSchemaRef}, + }; use crate::{ logical_plan::{col, lit, sum, LogicalPlanBuilder}, physical_plan::SendableRecordBatchStream, @@ -764,7 +767,7 @@ mod tests { fn make_ctx_state() -> ExecutionContextState { ExecutionContextState { - catalogs: HashMap::new(), + catalog_list: Arc::new(MemoryCatalogList::new()), scalar_functions: HashMap::new(), var_provider: HashMap::new(), aggregate_functions: HashMap::new(),