From aaf05a40f4005fa5a9736f16af13642d9d8508fb Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 16 Dec 2024 09:46:09 -0800 Subject: [PATCH 1/6] Add asynchronous catalog traits to help users that have asynchronous catalogs --- datafusion/catalog/Cargo.toml | 3 + datafusion/catalog/src/async.rs | 764 ++++++++++++++++++++++++++++++++ datafusion/catalog/src/lib.rs | 2 + 3 files changed, 769 insertions(+) create mode 100644 datafusion/catalog/src/async.rs diff --git a/datafusion/catalog/Cargo.toml b/datafusion/catalog/Cargo.toml index f9801352087d8..32a87cc7611cf 100644 --- a/datafusion/catalog/Cargo.toml +++ b/datafusion/catalog/Cargo.toml @@ -36,5 +36,8 @@ datafusion-expr = { workspace = true } datafusion-physical-plan = { workspace = true } parking_lot = { workspace = true } +[dev-dependencies] +tokio = { workspace = true } + [lints] workspace = true diff --git a/datafusion/catalog/src/async.rs b/datafusion/catalog/src/async.rs new file mode 100644 index 0000000000000..3ec83ffd37470 --- /dev/null +++ b/datafusion/catalog/src/async.rs @@ -0,0 +1,764 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion_common::{ + error::{DataFusionError, Result}, + HashMap, TableReference, +}; +use datafusion_execution::config::SessionConfig; + +use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider}; + +/// A schema provider that looks up tables in a cache +/// +/// This is created by the [`AsyncSchemaProvider::resolve`] method +#[derive(Debug)] +struct ResolvedSchemaProvider { + owner_name: Option, + cached_tables: HashMap>, +} +#[async_trait] +impl SchemaProvider for ResolvedSchemaProvider { + fn owner_name(&self) -> Option<&str> { + self.owner_name.as_ref().map(|o| o.as_str()) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn table_names(&self) -> Vec { + self.cached_tables.keys().cloned().collect() + } + + async fn table(&self, name: &str) -> Result>> { + Ok(self.cached_tables.get(name).cloned()) + } + + #[allow(unused_variables)] + fn register_table( + &self, + name: String, + _table: Arc, + ) -> Result>> { + Err(DataFusionError::Execution(format!( + "Attempt to register table '{name}' with ResolvedSchemaProvider which is not supported" + ))) + } + + #[allow(unused_variables)] + fn deregister_table(&self, name: &str) -> Result>> { + Err(DataFusionError::Execution(format!("Attempt to deregister table '{name}' with ResolvedSchemaProvider which is not supported"))) + } + + fn table_exist(&self, name: &str) -> bool { + self.cached_tables.contains_key(name) + } +} + +/// Helper class for building a [`ResolvedSchemaProvider`] +struct ResolvedSchemaProviderBuilder { + owner_name: String, + async_provider: Arc, + cached_tables: HashMap>>, +} +impl ResolvedSchemaProviderBuilder { + fn new(owner_name: String, async_provider: Arc) -> Self { + Self { + owner_name, + async_provider, + cached_tables: HashMap::new(), + } + } + + fn finish(self) -> Arc { + let cached_tables = self + .cached_tables + .into_iter() + .filter_map(|(key, maybe_value)| maybe_value.map(|value| (key, value))) + .collect(); + Arc::new(ResolvedSchemaProvider { + owner_name: Some(self.owner_name), + cached_tables, + }) + } +} + +/// A catalog provider that looks up schemas in a cache +/// +/// This is created by the [`AsyncCatalogProvider::resolve`] method +#[derive(Debug)] +struct ResolvedCatalogProvider { + cached_schemas: HashMap>, +} +impl CatalogProvider for ResolvedCatalogProvider { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema_names(&self) -> Vec { + self.cached_schemas.keys().cloned().collect() + } + + fn schema(&self, name: &str) -> Option> { + self.cached_schemas.get(name).cloned() + } +} + +/// Helper class for building a [`ResolvedCatalogProvider`] +struct ResolvedCatalogProviderBuilder { + cached_schemas: HashMap>, + async_provider: Arc, +} +impl ResolvedCatalogProviderBuilder { + fn new(async_provider: Arc) -> Self { + Self { + cached_schemas: HashMap::new(), + async_provider, + } + } + fn finish(self) -> Arc { + let cached_schemas = self + .cached_schemas + .into_iter() + .filter_map(|(key, maybe_value)| { + maybe_value.map(|value| (key, value.finish())) + }) + .collect(); + Arc::new(ResolvedCatalogProvider { cached_schemas }) + } +} + +/// A catalog provider list that looks up catalogs in a cache +/// +/// This is created by the [`AsyncCatalogProviderList::resolve`] method +#[derive(Debug)] +struct ResolvedCatalogProviderList { + cached_catalogs: HashMap>, +} +impl CatalogProviderList for ResolvedCatalogProviderList { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn register_catalog( + &self, + _name: String, + _catalog: Arc, + ) -> Option> { + unimplemented!("resolved providers cannot handle registration APIs") + } + + fn catalog_names(&self) -> Vec { + self.cached_catalogs.keys().cloned().collect() + } + + fn catalog(&self, name: &str) -> Option> { + self.cached_catalogs.get(name).cloned() + } +} + +/// A trait for schema providers that must resolve tables asynchronously +/// +/// The [`SchemaProvider::table`] method _is_ asynchronous. However, this is primarily for convenience and +/// it is not a good idea for this method to be slow as this will cause poor planning performance. +/// +/// It is a better idea to resolve the tables once and cache them in memory for the duration of +/// planning. This trait helps implement that pattern. +/// +/// After implementing this trait you can call the [`AsyncSchemaProvider::resolve`] method to get an +/// `Arc` that contains a cached copy of the referenced tables. The `resolve` +/// method can be slow and asynchronous as it is only called once, before planning. +#[async_trait] +pub trait AsyncSchemaProvider: Send + Sync { + /// Return the name of the schema provided by this provider + /// + /// If a table reference's schema name does not match this name then the reference will be ignored + /// when calculating the cached set of tables (this allows other providers to supply the table) + fn name(&self) -> &str; + /// Return the name of the catalog this provider belongs to + /// + /// If a table reference's catalog name does not match this name then the reference will be ignored + /// when calculating the cached set of tables (this allows other providers to supply the table) + fn catalog_name(&self) -> &str; + /// Lookup a table in the schema provider + async fn table(&self, name: &str) -> Result>>; + /// Creates a cached provider that can be used to execute a query containing given references + /// + /// This method will walk through the references and look them up once, creating a cache of table + /// providers. This cache will be returned as a synchronous TableProvider that can be used to plan + /// and execute a query containing the given references. + async fn resolve( + &self, + references: &[TableReference], + config: &SessionConfig, + ) -> Result> { + let mut cached_tables = HashMap::>>::new(); + + for reference in references { + let catalog_name = reference + .catalog() + .unwrap_or(&config.options().catalog.default_catalog); + + // Maybe this is a reference to some other catalog provided in another way + if catalog_name != self.catalog_name() { + continue; + } + + let schema_name = reference + .schema() + .unwrap_or(&config.options().catalog.default_schema); + + if schema_name != self.name() { + continue; + } + + if !cached_tables.contains_key(reference.table()) { + let resolved_table = self.table(reference.table()).await?; + cached_tables.insert(reference.table().to_string(), resolved_table); + } + } + + let cached_tables = cached_tables + .into_iter() + .filter_map(|(key, maybe_value)| maybe_value.map(|value| (key, value))) + .collect(); + + Ok(Arc::new(ResolvedSchemaProvider { + cached_tables, + owner_name: Some(self.catalog_name().to_string()), + })) + } +} + +/// A trait for catalog providers that must resolve schemas asynchronously +/// +/// The [`CatalogProvider::schema`] method is synchronous because asynchronous operations should +/// not be used during planning. This trait makes it easy to lookup schema references once and cache +/// them for future planning use. See [`AsyncSchemaProvider`] for more details on motivation. + +#[async_trait] +pub trait AsyncCatalogProvider: Send + Sync { + /// Returns the name of the catalog being provided + /// + /// If a reference's catalog name does not match this name then the reference will be ignored. + /// This allows other providers to potentially provide the reference. + fn name(&self) -> &str; + + /// Lookup a schema in the provider + async fn schema(&self, name: &str) -> Result>>; + + /// Creates a cached provider that can be used to execute a query containing given references + /// + /// This method will walk through the references and look them up once, creating a cache of schema + /// providers (each with their own cache of table providers). This cache will be returned as a + /// synchronous CatalogProvider that can be used to plan and execute a query containing the given + /// references. + async fn resolve( + &self, + references: &[TableReference], + config: &SessionConfig, + ) -> Result> { + let mut cached_schemas = + HashMap::>::new(); + + for reference in references { + let catalog_name = reference + .catalog() + .unwrap_or(&config.options().catalog.default_catalog); + + // Maybe this is a reference to some other catalog provided in another way + if catalog_name != self.name() { + continue; + } + + let schema_name = reference + .schema() + .unwrap_or(&config.options().catalog.default_schema); + + let schema = if let Some(schema) = cached_schemas.get_mut(schema_name) { + schema + } else { + let resolved_schema = self.schema(schema_name).await?; + let resolved_schema = resolved_schema.map(|resolved_schema| { + ResolvedSchemaProviderBuilder::new( + catalog_name.to_string(), + resolved_schema, + ) + }); + cached_schemas.insert(schema_name.to_string(), resolved_schema); + cached_schemas.get_mut(schema_name).unwrap() + }; + + // If we can't find the catalog don't bother checking the table + let Some(schema) = schema else { continue }; + + if !schema.cached_tables.contains_key(reference.table()) { + let resolved_table = + schema.async_provider.table(reference.table()).await?; + schema + .cached_tables + .insert(reference.table().to_string(), resolved_table); + } + } + + let cached_schemas = cached_schemas + .into_iter() + .filter_map(|(key, maybe_builder)| { + maybe_builder.map(|schema_builder| (key, schema_builder.finish())) + }) + .collect::>(); + + Ok(Arc::new(ResolvedCatalogProvider { cached_schemas })) + } +} + +/// A trait for catalog provider lists that must resolve catalogs asynchronously +/// +/// The [`CatalogProviderList::catalog`] method is synchronous because asynchronous operations should +/// not be used during planning. This trait makes it easy to lookup catalog references once and cache +/// them for future planning use. See [`AsyncSchemaProvider`] for more details on motivation. +#[async_trait] +pub trait AsyncCatalogProviderList: Send + Sync { + /// Lookup a catalog in the provider + async fn catalog(&self, name: &str) -> Result>>; + + /// Creates a cached provider that can be used to execute a query containing given references + /// + /// This method will walk through the references and look them up once, creating a cache of catalog + /// providers, schema providers, and table providers. This cache will be returned as a + /// synchronous CatalogProvider that can be used to plan and execute a query containing the given + /// references. + async fn resolve( + &self, + references: &[TableReference], + config: &SessionConfig, + ) -> Result> { + let mut cached_catalogs = + HashMap::>::new(); + + for reference in references { + let catalog_name = reference + .catalog() + .unwrap_or(&config.options().catalog.default_catalog); + + // We will do three lookups here, one for the catalog, one for the schema, and one for the table + // We cache the result (both found results and not-found results) to speed up future lookups + // + // Note that a cache-miss is not an error at this point. We allow for the possibility that + // other providers may supply the reference. + // + // If this is the only provider then a not-found error will be raised during planning when it can't + // find the reference in the cache. + + let catalog = if let Some(catalog) = cached_catalogs.get_mut(catalog_name) { + catalog + } else { + let resolved_catalog = self.catalog(catalog_name).await?; + let resolved_catalog = resolved_catalog + .map(|async_cat| ResolvedCatalogProviderBuilder::new(async_cat)); + cached_catalogs.insert(catalog_name.to_string(), resolved_catalog); + cached_catalogs.get_mut(catalog_name).unwrap() + }; + + // If we can't find the catalog don't bother checking the schema / table + let Some(catalog) = catalog else { continue }; + + let schema_name = reference + .schema() + .unwrap_or(&config.options().catalog.default_schema); + + let schema = if let Some(schema) = catalog.cached_schemas.get_mut(schema_name) + { + schema + } else { + let resolved_schema = catalog.async_provider.schema(schema_name).await?; + let resolved_schema = resolved_schema.map(|async_schema| { + ResolvedSchemaProviderBuilder::new( + catalog_name.to_string(), + async_schema, + ) + }); + catalog + .cached_schemas + .insert(schema_name.to_string(), resolved_schema); + catalog.cached_schemas.get_mut(schema_name).unwrap() + }; + + // If we can't find the catalog don't bother checking the table + let Some(schema) = schema else { continue }; + + if !schema.cached_tables.contains_key(reference.table()) { + let resolved_table = + schema.async_provider.table(reference.table()).await?; + schema + .cached_tables + .insert(reference.table().to_string(), resolved_table); + } + } + + // Build the cached catalog provider list + let cached_catalogs = cached_catalogs + .into_iter() + .filter_map(|(key, maybe_builder)| { + maybe_builder.map(|catalog_builder| (key, catalog_builder.finish())) + }) + .collect::>(); + + Ok(Arc::new(ResolvedCatalogProviderList { cached_catalogs })) + } +} + +#[cfg(test)] +mod tests { + use std::{ + any::Any, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, + }; + + use arrow_schema::SchemaRef; + use async_trait::async_trait; + use datafusion_common::{error::Result, Statistics, TableReference}; + use datafusion_execution::config::SessionConfig; + use datafusion_expr::{Expr, TableType}; + use datafusion_physical_plan::ExecutionPlan; + + use crate::{Session, TableProvider}; + + use super::{AsyncCatalogProvider, AsyncCatalogProviderList, AsyncSchemaProvider}; + + #[derive(Debug)] + struct MockTableProvider {} + #[async_trait] + impl TableProvider for MockTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + /// Get a reference to the schema for this table + fn schema(&self) -> SchemaRef { + unimplemented!() + } + + fn table_type(&self) -> TableType { + unimplemented!() + } + + async fn scan( + &self, + _state: &dyn Session, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + unimplemented!() + } + + fn statistics(&self) -> Option { + unimplemented!() + } + } + + #[derive(Default)] + struct MockAsyncSchemaProvider { + lookup_count: AtomicU32, + } + + const MOCK_CATALOG: &str = "mock_catalog"; + const MOCK_SCHEMA: &str = "mock_schema"; + const MOCK_TABLE: &str = "mock_table"; + + #[async_trait] + impl AsyncSchemaProvider for MockAsyncSchemaProvider { + fn name(&self) -> &str { + MOCK_SCHEMA + } + fn catalog_name(&self) -> &str { + MOCK_CATALOG + } + async fn table(&self, name: &str) -> Result>> { + self.lookup_count.fetch_add(1, Ordering::Release); + if name == MOCK_TABLE { + Ok(Some(Arc::new(MockTableProvider {}))) + } else { + Ok(None) + } + } + } + + fn test_config() -> SessionConfig { + let mut config = SessionConfig::default(); + config.options_mut().catalog.default_catalog = MOCK_CATALOG.to_string(); + config.options_mut().catalog.default_schema = MOCK_SCHEMA.to_string(); + config + } + + #[tokio::test] + async fn test_async_schema_provider_resolve() { + async fn check( + refs: Vec, + expected_lookup_count: u32, + found_tables: &[&str], + not_found_tables: &[&str], + ) { + let async_provider = MockAsyncSchemaProvider::default(); + let cached_provider = + async_provider.resolve(&refs, &test_config()).await.unwrap(); + + assert_eq!( + async_provider.lookup_count.load(Ordering::Acquire), + expected_lookup_count + ); + + for table_ref in found_tables { + let table = cached_provider.table(table_ref).await.unwrap(); + assert!(table.is_some()); + } + + for table_ref in not_found_tables { + assert!(cached_provider.table(table_ref).await.unwrap().is_none()); + } + } + + // Basic full lookups + check( + vec![ + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE), + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "not_exists"), + ], + 2, + &[MOCK_TABLE], + &["not_exists"], + ) + .await; + + // Catalog / schema mismatch doesn't even search + check( + vec![ + TableReference::full(MOCK_CATALOG, "foo", MOCK_TABLE), + TableReference::full("foo", MOCK_SCHEMA, MOCK_TABLE), + ], + 0, + &[], + &[MOCK_TABLE], + ) + .await; + + // Both hits and misses cached + check( + vec![ + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE), + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE), + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "not_exists"), + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "not_exists"), + ], + 2, + &[MOCK_TABLE], + &["not_exists"], + ) + .await; + } + + #[derive(Default)] + struct MockAsyncCatalogProvider { + lookup_count: AtomicU32, + } + + #[async_trait] + impl AsyncCatalogProvider for MockAsyncCatalogProvider { + fn name(&self) -> &str { + MOCK_CATALOG + } + async fn schema( + &self, + name: &str, + ) -> Result>> { + self.lookup_count.fetch_add(1, Ordering::Release); + if name == MOCK_SCHEMA { + Ok(Some(Arc::new(MockAsyncSchemaProvider::default()))) + } else { + Ok(None) + } + } + } + + #[tokio::test] + async fn test_async_catalog_provider_resolve() { + async fn check( + refs: Vec, + expected_lookup_count: u32, + found_schemas: &[&str], + not_found_schemas: &[&str], + ) { + let async_provider = MockAsyncCatalogProvider::default(); + let cached_provider = + async_provider.resolve(&refs, &test_config()).await.unwrap(); + + assert_eq!( + async_provider.lookup_count.load(Ordering::Acquire), + expected_lookup_count + ); + + for schema_ref in found_schemas { + let schema = cached_provider.schema(schema_ref); + assert!(schema.is_some()); + } + + for schema_ref in not_found_schemas { + assert!(cached_provider.schema(schema_ref).is_none()); + } + } + + // Basic full lookups + check( + vec![ + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "x"), + TableReference::full(MOCK_CATALOG, "not_exists", "x"), + ], + 2, + &[MOCK_SCHEMA], + &["not_exists"], + ) + .await; + + // Catalog mismatch doesn't even search + check( + vec![TableReference::full("foo", MOCK_SCHEMA, "x")], + 0, + &[], + &[MOCK_SCHEMA], + ) + .await; + + // Both hits and misses cached + check( + vec![ + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "x"), + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, "x"), + TableReference::full(MOCK_CATALOG, "not_exists", "x"), + TableReference::full(MOCK_CATALOG, "not_exists", "x"), + ], + 2, + &[MOCK_SCHEMA], + &["not_exists"], + ) + .await; + } + + #[derive(Default)] + struct MockAsyncCatalogProviderList { + lookup_count: AtomicU32, + } + + #[async_trait] + impl AsyncCatalogProviderList for MockAsyncCatalogProviderList { + async fn catalog( + &self, + name: &str, + ) -> Result>> { + self.lookup_count.fetch_add(1, Ordering::Release); + if name == MOCK_CATALOG { + Ok(Some(Arc::new(MockAsyncCatalogProvider::default()))) + } else { + Ok(None) + } + } + } + + #[tokio::test] + async fn test_async_catalog_provider_list_resolve() { + async fn check( + refs: Vec, + expected_lookup_count: u32, + found_catalogs: &[&str], + not_found_catalogs: &[&str], + ) { + let async_provider = MockAsyncCatalogProviderList::default(); + let cached_provider = + async_provider.resolve(&refs, &test_config()).await.unwrap(); + + assert_eq!( + async_provider.lookup_count.load(Ordering::Acquire), + expected_lookup_count + ); + + for catalog_ref in found_catalogs { + let catalog = cached_provider.catalog(catalog_ref); + assert!(catalog.is_some()); + } + + for catalog_ref in not_found_catalogs { + assert!(cached_provider.catalog(catalog_ref).is_none()); + } + } + + // Basic full lookups + check( + vec![ + TableReference::full(MOCK_CATALOG, "x", "x"), + TableReference::full("not_exists", "x", "x"), + ], + 2, + &[MOCK_CATALOG], + &["not_exists"], + ) + .await; + + // Both hits and misses cached + check( + vec![ + TableReference::full(MOCK_CATALOG, "x", "x"), + TableReference::full(MOCK_CATALOG, "x", "x"), + TableReference::full("not_exists", "x", "x"), + TableReference::full("not_exists", "x", "x"), + ], + 2, + &[MOCK_CATALOG], + &["not_exists"], + ) + .await; + } + + #[tokio::test] + async fn test_defaults() { + for table_ref in &[ + TableReference::full(MOCK_CATALOG, MOCK_SCHEMA, MOCK_TABLE), + TableReference::partial(MOCK_SCHEMA, MOCK_TABLE), + TableReference::bare(MOCK_TABLE), + ] { + let async_provider = MockAsyncCatalogProviderList::default(); + let cached_provider = async_provider + .resolve(&[table_ref.clone()], &test_config()) + .await + .unwrap(); + + let catalog = cached_provider + .catalog(table_ref.catalog().unwrap_or(MOCK_CATALOG)) + .unwrap(); + let schema = catalog + .schema(table_ref.schema().unwrap_or(MOCK_SCHEMA)) + .unwrap(); + assert!(schema.table(table_ref.table()).await.unwrap().is_some()); + } + } +} diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs index 21630f267d2c7..3cf2a3b3cd332 100644 --- a/datafusion/catalog/src/lib.rs +++ b/datafusion/catalog/src/lib.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +mod r#async; mod catalog; mod dynamic_file; mod schema; @@ -23,6 +24,7 @@ mod table; pub use catalog::*; pub use dynamic_file::catalog::*; +pub use r#async::*; pub use schema::*; pub use session::*; pub use table::*; From 31f957db6fb365b060d108943a57627b7e0a65e8 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 16 Dec 2024 11:03:04 -0800 Subject: [PATCH 2/6] Apply clippy suggestions --- datafusion/catalog/src/async.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/catalog/src/async.rs b/datafusion/catalog/src/async.rs index 3ec83ffd37470..e256e8c42a0fa 100644 --- a/datafusion/catalog/src/async.rs +++ b/datafusion/catalog/src/async.rs @@ -37,7 +37,7 @@ struct ResolvedSchemaProvider { #[async_trait] impl SchemaProvider for ResolvedSchemaProvider { fn owner_name(&self) -> Option<&str> { - self.owner_name.as_ref().map(|o| o.as_str()) + self.owner_name.as_deref() } fn as_any(&self) -> &dyn std::any::Any { @@ -372,8 +372,8 @@ pub trait AsyncCatalogProviderList: Send + Sync { catalog } else { let resolved_catalog = self.catalog(catalog_name).await?; - let resolved_catalog = resolved_catalog - .map(|async_cat| ResolvedCatalogProviderBuilder::new(async_cat)); + let resolved_catalog = + resolved_catalog.map(ResolvedCatalogProviderBuilder::new); cached_catalogs.insert(catalog_name.to_string(), resolved_catalog); cached_catalogs.get_mut(catalog_name).unwrap() }; From 5a1c3fc80e1b21a5bc7038a72e2879e5300cbc1a Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 18 Dec 2024 06:08:51 -0800 Subject: [PATCH 3/6] Address PR reviews --- datafusion/catalog/src/async.rs | 51 +++++++++++++++++---------------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/datafusion/catalog/src/async.rs b/datafusion/catalog/src/async.rs index e256e8c42a0fa..10ec472bbfffb 100644 --- a/datafusion/catalog/src/async.rs +++ b/datafusion/catalog/src/async.rs @@ -18,17 +18,14 @@ use std::sync::Arc; use async_trait::async_trait; -use datafusion_common::{ - error::{DataFusionError, Result}, - HashMap, TableReference, -}; +use datafusion_common::{error::Result, not_impl_err, HashMap, TableReference}; use datafusion_execution::config::SessionConfig; use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider}; /// A schema provider that looks up tables in a cache /// -/// This is created by the [`AsyncSchemaProvider::resolve`] method +/// Instances are created by the [`AsyncSchemaProvider::resolve`] method #[derive(Debug)] struct ResolvedSchemaProvider { owner_name: Option, @@ -58,14 +55,14 @@ impl SchemaProvider for ResolvedSchemaProvider { name: String, _table: Arc, ) -> Result>> { - Err(DataFusionError::Execution(format!( + not_impl_err!( "Attempt to register table '{name}' with ResolvedSchemaProvider which is not supported" - ))) + ) } #[allow(unused_variables)] fn deregister_table(&self, name: &str) -> Result>> { - Err(DataFusionError::Execution(format!("Attempt to deregister table '{name}' with ResolvedSchemaProvider which is not supported"))) + not_impl_err!("Attempt to deregister table '{name}' with ResolvedSchemaProvider which is not supported") } fn table_exist(&self, name: &str) -> bool { @@ -88,6 +85,15 @@ impl ResolvedSchemaProviderBuilder { } } + async fn resolve_table(&mut self, table_name: &str) -> Result<()> { + if !self.cached_tables.contains_key(table_name) { + let resolved_table = self.async_provider.table(table_name).await?; + self.cached_tables + .insert(table_name.to_string(), resolved_table); + } + Ok(()) + } + fn finish(self) -> Arc { let cached_tables = self .cached_tables @@ -103,7 +109,7 @@ impl ResolvedSchemaProviderBuilder { /// A catalog provider that looks up schemas in a cache /// -/// This is created by the [`AsyncCatalogProvider::resolve`] method +/// Instances are created by the [`AsyncCatalogProvider::resolve`] method #[derive(Debug)] struct ResolvedCatalogProvider { cached_schemas: HashMap>, @@ -148,7 +154,7 @@ impl ResolvedCatalogProviderBuilder { /// A catalog provider list that looks up catalogs in a cache /// -/// This is created by the [`AsyncCatalogProviderList::resolve`] method +/// Instances are created by the [`AsyncCatalogProviderList::resolve`] method #[derive(Debug)] struct ResolvedCatalogProviderList { cached_catalogs: HashMap>, @@ -205,6 +211,9 @@ pub trait AsyncSchemaProvider: Send + Sync { /// This method will walk through the references and look them up once, creating a cache of table /// providers. This cache will be returned as a synchronous TableProvider that can be used to plan /// and execute a query containing the given references. + /// + /// This cache is intended to be short-lived for the execution of a single query. There is no mechanism + /// for refresh or eviction of stale entries. async fn resolve( &self, references: &[TableReference], @@ -271,6 +280,9 @@ pub trait AsyncCatalogProvider: Send + Sync { /// providers (each with their own cache of table providers). This cache will be returned as a /// synchronous CatalogProvider that can be used to plan and execute a query containing the given /// references. + /// + /// This cache is intended to be short-lived for the execution of a single query. There is no mechanism + /// for refresh or eviction of stale entries. async fn resolve( &self, references: &[TableReference], @@ -310,13 +322,7 @@ pub trait AsyncCatalogProvider: Send + Sync { // If we can't find the catalog don't bother checking the table let Some(schema) = schema else { continue }; - if !schema.cached_tables.contains_key(reference.table()) { - let resolved_table = - schema.async_provider.table(reference.table()).await?; - schema - .cached_tables - .insert(reference.table().to_string(), resolved_table); - } + schema.resolve_table(reference.table()).await?; } let cached_schemas = cached_schemas @@ -346,6 +352,9 @@ pub trait AsyncCatalogProviderList: Send + Sync { /// providers, schema providers, and table providers. This cache will be returned as a /// synchronous CatalogProvider that can be used to plan and execute a query containing the given /// references. + /// + /// This cache is intended to be short-lived for the execution of a single query. There is no mechanism + /// for refresh or eviction of stale entries. async fn resolve( &self, references: &[TableReference], @@ -405,13 +414,7 @@ pub trait AsyncCatalogProviderList: Send + Sync { // If we can't find the catalog don't bother checking the table let Some(schema) = schema else { continue }; - if !schema.cached_tables.contains_key(reference.table()) { - let resolved_table = - schema.async_provider.table(reference.table()).await?; - schema - .cached_tables - .insert(reference.table().to_string(), resolved_table); - } + schema.resolve_table(reference.table()).await?; } // Build the cached catalog provider list From d27dcab916747e58bb8df0a2012221ed1951825d Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 18 Dec 2024 06:12:41 -0800 Subject: [PATCH 4/6] Remove allow_unused exceptions --- datafusion/catalog/src/async.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/catalog/src/async.rs b/datafusion/catalog/src/async.rs index 10ec472bbfffb..bf02c056a274a 100644 --- a/datafusion/catalog/src/async.rs +++ b/datafusion/catalog/src/async.rs @@ -49,7 +49,6 @@ impl SchemaProvider for ResolvedSchemaProvider { Ok(self.cached_tables.get(name).cloned()) } - #[allow(unused_variables)] fn register_table( &self, name: String, @@ -60,7 +59,6 @@ impl SchemaProvider for ResolvedSchemaProvider { ) } - #[allow(unused_variables)] fn deregister_table(&self, name: &str) -> Result>> { not_impl_err!("Attempt to deregister table '{name}' with ResolvedSchemaProvider which is not supported") } From 29e8976053f86da8ae9cdeea47daa615f3e0f29e Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 2 Jan 2025 17:29:58 -0800 Subject: [PATCH 5/6] Update remote catalog example to demonstrate new helper structs --- .../examples/remote_catalog.rs | 196 +++++------------- 1 file changed, 49 insertions(+), 147 deletions(-) diff --git a/datafusion-examples/examples/remote_catalog.rs b/datafusion-examples/examples/remote_catalog.rs index 206b7ba9c4bef..94e66b8494658 100644 --- a/datafusion-examples/examples/remote_catalog.rs +++ b/datafusion-examples/examples/remote_catalog.rs @@ -32,22 +32,19 @@ use arrow::array::record_batch; use arrow_schema::{Field, Fields, Schema, SchemaRef}; use async_trait::async_trait; -use datafusion::catalog::{SchemaProvider, TableProvider}; -use datafusion::common::DataFusionError; +use datafusion::catalog::TableProvider; use datafusion::common::Result; use datafusion::execution::SendableRecordBatchStream; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::{DataFrame, SessionContext}; -use datafusion_catalog::Session; -use datafusion_common::{ - assert_batches_eq, internal_datafusion_err, plan_err, HashMap, TableReference, -}; +use datafusion_catalog::{AsyncSchemaProvider, Session}; +use datafusion_common::{assert_batches_eq, internal_datafusion_err, plan_err}; use datafusion_expr::{Expr, TableType}; use futures::TryStreamExt; use std::any::Any; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; #[tokio::main] async fn main() -> Result<()> { @@ -55,23 +52,18 @@ async fn main() -> Result<()> { let ctx = SessionContext::new(); // Make a connection to the remote catalog, asynchronously, and configure it - let remote_catalog_interface = RemoteCatalogInterface::connect().await?; - - // Register a SchemaProvider for tables in a schema named "remote_schema". - // - // This will let DataFusion query tables such as - // `datafusion.remote_schema.remote_table` - let remote_schema: Arc = - Arc::new(RemoteSchema::new(Arc::new(remote_catalog_interface))); - ctx.catalog("datafusion") - .ok_or_else(|| internal_datafusion_err!("default catalog was not installed"))? - .register_schema("remote_schema", Arc::clone(&remote_schema))?; + let remote_catalog_interface = Arc::new(RemoteCatalogInterface::connect().await?); + + // Create an adapter to provide the AsyncSchemaProvider interface to DataFusion + // based on our remote catalog interface + let remote_catalog_adapter = RemoteCatalogDatafusionAdapter(remote_catalog_interface); // Here is a query that selects data from a table in the remote catalog. let sql = "SELECT * from remote_schema.remote_table"; // The `SessionContext::sql` interface is async, but it does not - // support asynchronous access to catalogs, so the following query errors. + // support asynchronous access to catalogs, so we cannot register our schema provider + // directly and the following query fails to find our table. let results = ctx.sql(sql).await; assert_eq!( results.unwrap_err().to_string(), @@ -91,27 +83,26 @@ async fn main() -> Result<()> { // `remote_schema.remote_table`) let references = state.resolve_table_references(&statement)?; - // Call `load_tables` to load information from the remote catalog for each - // of the referenced tables. Best practice is to fetch the the information - // for all tables required by the query once (rather than one per table) to - // minimize network overhead - let table_names = references.iter().filter_map(|r| { - if refers_to_schema("datafusion", "remote_schema", r) { - Some(r.table()) - } else { - None - } - }); - remote_schema - .as_any() - .downcast_ref::() - .expect("correct types") - .load_tables(table_names) + // Now we can asynchronously resolve the table references to get a cached catalog + // that we can use for our query + let resolved_catalog = remote_catalog_adapter + .resolve(&references, state.config()) .await?; - // Now continue planing the query after having fetched the remote table and - // it can run as normal - let plan = state.statement_to_plan(statement).await?; + // This resolved catalog only makes sense for this query and so we create a clone + // of the session context with the resolved catalog + let query_ctx = ctx.clone(); + + query_ctx + .catalog("datafusion") + .ok_or_else(|| internal_datafusion_err!("default catalog was not installed"))? + .register_schema("remote_schema", resolved_catalog)?; + + // We can now continue planning the query with this new query-specific context that + // contains our cached catalog + let query_state = query_ctx.state(); + + let plan = query_state.statement_to_plan(statement).await?; let results = DataFrame::new(state, plan).collect().await?; assert_batches_eq!( [ @@ -145,9 +136,9 @@ impl RemoteCatalogInterface { } /// Fetches information for a specific table - pub async fn table_info(&self, name: &str) -> Result { + pub async fn table_info(&self, name: &str) -> Result> { if name != "remote_table" { - return plan_err!("Remote table not found: {}", name); + return Ok(None); } // In this example, we'll model a remote table with columns "id" and @@ -159,7 +150,7 @@ impl RemoteCatalogInterface { Field::new("id", arrow::datatypes::DataType::Int32, false), Field::new("name", arrow::datatypes::DataType::Utf8, false), ])); - Ok(Arc::new(schema)) + Ok(Some(Arc::new(schema))) } /// Fetches data for a table from a remote data source @@ -186,95 +177,30 @@ impl RemoteCatalogInterface { } } -/// Implements the DataFusion Catalog API interface for tables +/// Implements the DataFusion SchemaProvider API for tables /// stored in a remote catalog. -#[derive(Debug)] -struct RemoteSchema { - /// Connection with the remote catalog - remote_catalog_interface: Arc, - /// Local cache of tables that have been preloaded from the remote - /// catalog - tables: Mutex>>, -} - -impl RemoteSchema { - /// Create a new RemoteSchema - pub fn new(remote_catalog_interface: Arc) -> Self { - Self { - remote_catalog_interface, - tables: Mutex::new(HashMap::new()), - } - } - - /// Load information for the specified tables from the remote source into - /// the local cached copy. - pub async fn load_tables( - &self, - references: impl IntoIterator, - ) -> Result<()> { - for table_name in references { - if !self.table_exist(table_name) { - // Fetch information about the table from the remote catalog - // - // Note that a real remote catalog interface could return more - // information, but at the minimum, DataFusion requires the - // table's schema for planing. - let schema = self.remote_catalog_interface.table_info(table_name).await?; - let remote_table = RemoteTable::new( - Arc::clone(&self.remote_catalog_interface), - table_name, - schema, - ); - - // Add the table to our local cached list - self.tables - .lock() - .expect("mutex invalid") - .insert(table_name.to_string(), Arc::new(remote_table)); - }; - } - Ok(()) - } -} +struct RemoteCatalogDatafusionAdapter(Arc); -/// Implement the DataFusion Catalog API for [`RemoteSchema`] #[async_trait] -impl SchemaProvider for RemoteSchema { - fn as_any(&self) -> &dyn Any { - self - } - - fn table_names(&self) -> Vec { - // Note this API is not async so we can't directly call the RemoteCatalogInterface - // instead we use the cached list of loaded tables - self.tables - .lock() - .expect("mutex valid") - .keys() - .cloned() - .collect() +impl AsyncSchemaProvider for RemoteCatalogDatafusionAdapter { + fn name(&self) -> &str { + "remote_schema" } - // While this API is actually `async` and thus could consult a remote - // catalog directly it is more efficient to use a local cached copy instead, - // which is what we model in this example - async fn table( - &self, - name: &str, - ) -> Result>, DataFusionError> { - // Look for any pre-loaded tables - let table = self - .tables - .lock() - .expect("mutex valid") - .get(name) - .map(Arc::clone); - Ok(table) + fn catalog_name(&self) -> &str { + "datafusion" } - fn table_exist(&self, name: &str) -> bool { - // Look for any pre-loaded tables, note this function is also `async` - self.tables.lock().expect("mutex valid").contains_key(name) + async fn table(&self, name: &str) -> Result>> { + // Fetch information about the table from the remote catalog + // + // Note that a real remote catalog interface could return more + // information, but at the minimum, DataFusion requires the + // table's schema for planing. + Ok(self.0.table_info(name).await?.map(|schema| { + Arc::new(RemoteTable::new(Arc::clone(&self.0), name, schema)) + as Arc + })) } } @@ -343,27 +269,3 @@ impl TableProvider for RemoteTable { )?)) } } - -/// Return true if this `table_reference` might be for a table in the specified -/// catalog and schema. -fn refers_to_schema( - catalog_name: &str, - schema_name: &str, - table_reference: &TableReference, -) -> bool { - // Check the references are in the correct catalog and schema - // references like foo.bar.baz - if let Some(catalog) = table_reference.catalog() { - if catalog != catalog_name { - return false; - } - } - // references like bar.baz - if let Some(schema) = table_reference.schema() { - if schema != schema_name { - return false; - } - } - - true -} From a6e17f18523d502b8e32aaafd20b64ce3d5f260e Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 3 Jan 2025 05:54:25 -0800 Subject: [PATCH 6/6] Move schema_name / catalog_name parameters into resolve function and out of trait --- .../examples/remote_catalog.rs | 12 +---- datafusion/catalog/src/async.rs | 54 +++++++------------ 2 files changed, 20 insertions(+), 46 deletions(-) diff --git a/datafusion-examples/examples/remote_catalog.rs b/datafusion-examples/examples/remote_catalog.rs index 94e66b8494658..38629328d71c4 100644 --- a/datafusion-examples/examples/remote_catalog.rs +++ b/datafusion-examples/examples/remote_catalog.rs @@ -86,7 +86,7 @@ async fn main() -> Result<()> { // Now we can asynchronously resolve the table references to get a cached catalog // that we can use for our query let resolved_catalog = remote_catalog_adapter - .resolve(&references, state.config()) + .resolve(&references, state.config(), "datafusion", "remote_schema") .await?; // This resolved catalog only makes sense for this query and so we create a clone @@ -177,20 +177,12 @@ impl RemoteCatalogInterface { } } -/// Implements the DataFusion SchemaProvider API for tables +/// Implements an async version of the DataFusion SchemaProvider API for tables /// stored in a remote catalog. struct RemoteCatalogDatafusionAdapter(Arc); #[async_trait] impl AsyncSchemaProvider for RemoteCatalogDatafusionAdapter { - fn name(&self) -> &str { - "remote_schema" - } - - fn catalog_name(&self) -> &str { - "datafusion" - } - async fn table(&self, name: &str) -> Result>> { // Fetch information about the table from the remote catalog // diff --git a/datafusion/catalog/src/async.rs b/datafusion/catalog/src/async.rs index bf02c056a274a..504f20ff95431 100644 --- a/datafusion/catalog/src/async.rs +++ b/datafusion/catalog/src/async.rs @@ -192,16 +192,6 @@ impl CatalogProviderList for ResolvedCatalogProviderList { /// method can be slow and asynchronous as it is only called once, before planning. #[async_trait] pub trait AsyncSchemaProvider: Send + Sync { - /// Return the name of the schema provided by this provider - /// - /// If a table reference's schema name does not match this name then the reference will be ignored - /// when calculating the cached set of tables (this allows other providers to supply the table) - fn name(&self) -> &str; - /// Return the name of the catalog this provider belongs to - /// - /// If a table reference's catalog name does not match this name then the reference will be ignored - /// when calculating the cached set of tables (this allows other providers to supply the table) - fn catalog_name(&self) -> &str; /// Lookup a table in the schema provider async fn table(&self, name: &str) -> Result>>; /// Creates a cached provider that can be used to execute a query containing given references @@ -216,24 +206,26 @@ pub trait AsyncSchemaProvider: Send + Sync { &self, references: &[TableReference], config: &SessionConfig, + catalog_name: &str, + schema_name: &str, ) -> Result> { let mut cached_tables = HashMap::>>::new(); for reference in references { - let catalog_name = reference + let ref_catalog_name = reference .catalog() .unwrap_or(&config.options().catalog.default_catalog); // Maybe this is a reference to some other catalog provided in another way - if catalog_name != self.catalog_name() { + if ref_catalog_name != catalog_name { continue; } - let schema_name = reference + let ref_schema_name = reference .schema() .unwrap_or(&config.options().catalog.default_schema); - if schema_name != self.name() { + if ref_schema_name != schema_name { continue; } @@ -250,7 +242,7 @@ pub trait AsyncSchemaProvider: Send + Sync { Ok(Arc::new(ResolvedSchemaProvider { cached_tables, - owner_name: Some(self.catalog_name().to_string()), + owner_name: Some(catalog_name.to_string()), })) } } @@ -263,12 +255,6 @@ pub trait AsyncSchemaProvider: Send + Sync { #[async_trait] pub trait AsyncCatalogProvider: Send + Sync { - /// Returns the name of the catalog being provided - /// - /// If a reference's catalog name does not match this name then the reference will be ignored. - /// This allows other providers to potentially provide the reference. - fn name(&self) -> &str; - /// Lookup a schema in the provider async fn schema(&self, name: &str) -> Result>>; @@ -285,17 +271,18 @@ pub trait AsyncCatalogProvider: Send + Sync { &self, references: &[TableReference], config: &SessionConfig, + catalog_name: &str, ) -> Result> { let mut cached_schemas = HashMap::>::new(); for reference in references { - let catalog_name = reference + let ref_catalog_name = reference .catalog() .unwrap_or(&config.options().catalog.default_catalog); // Maybe this is a reference to some other catalog provided in another way - if catalog_name != self.name() { + if ref_catalog_name != catalog_name { continue; } @@ -491,12 +478,6 @@ mod tests { #[async_trait] impl AsyncSchemaProvider for MockAsyncSchemaProvider { - fn name(&self) -> &str { - MOCK_SCHEMA - } - fn catalog_name(&self) -> &str { - MOCK_CATALOG - } async fn table(&self, name: &str) -> Result>> { self.lookup_count.fetch_add(1, Ordering::Release); if name == MOCK_TABLE { @@ -523,8 +504,10 @@ mod tests { not_found_tables: &[&str], ) { let async_provider = MockAsyncSchemaProvider::default(); - let cached_provider = - async_provider.resolve(&refs, &test_config()).await.unwrap(); + let cached_provider = async_provider + .resolve(&refs, &test_config(), MOCK_CATALOG, MOCK_SCHEMA) + .await + .unwrap(); assert_eq!( async_provider.lookup_count.load(Ordering::Acquire), @@ -587,9 +570,6 @@ mod tests { #[async_trait] impl AsyncCatalogProvider for MockAsyncCatalogProvider { - fn name(&self) -> &str { - MOCK_CATALOG - } async fn schema( &self, name: &str, @@ -612,8 +592,10 @@ mod tests { not_found_schemas: &[&str], ) { let async_provider = MockAsyncCatalogProvider::default(); - let cached_provider = - async_provider.resolve(&refs, &test_config()).await.unwrap(); + let cached_provider = async_provider + .resolve(&refs, &test_config(), MOCK_CATALOG) + .await + .unwrap(); assert_eq!( async_provider.lookup_count.load(Ordering::Acquire),