From 213e19ebd28e9af9c9c07f899c713f56fc76878b Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Thu, 15 Aug 2024 00:06:57 +0800 Subject: [PATCH] disable `with_create_default_catalog_and_schema` if the default catalog exists --- .../core/src/execution/session_state.rs | 98 +++++++++++++++++-- 1 file changed, 90 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 0a057d6f1417..e9c876291845 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -987,8 +987,24 @@ impl SessionStateBuilder { /// Returns a new [SessionStateBuilder] based on an existing [SessionState] /// The session id for the new builder will be unset; all other fields will - /// be cloned from what is set in the provided session state + /// be cloned from what is set in the provided session state. If the default + /// catalog already exists in the existing session state, the new session state + /// will not create the default catalog and schema. 
pub fn new_from_existing(existing: SessionState) -> Self { + let default_catalog_exist = existing + .catalog_list() + .catalog(&existing.config.options().catalog.default_catalog) + .is_some(); + // The new `with_create_default_catalog_and_schema` should be false if the default catalog exists + let create_default_catalog_and_schema = existing + .config + .options() + .catalog + .create_default_catalog_and_schema + && !default_catalog_exist; + let new_config = existing + .config + .with_create_default_catalog_and_schema(create_default_catalog_and_schema); Self { session_id: None, analyzer: Some(existing.analyzer), @@ -1005,7 +1021,7 @@ impl SessionStateBuilder { window_functions: Some(existing.window_functions.into_values().collect_vec()), serializer_registry: Some(existing.serializer_registry), file_formats: Some(existing.file_formats.into_values().collect_vec()), - config: Some(existing.config), + config: Some(new_config), table_options: Some(existing.table_options), execution_props: Some(existing.execution_props), table_factories: Some(existing.table_factories), @@ -1801,17 +1817,19 @@ impl<'a> SimplifyInfo for SessionSimplifyProvider<'a> { #[cfg(test)] mod tests { - use std::collections::HashMap; - + use super::{SessionContextProvider, SessionStateBuilder}; + use crate::catalog_common::MemoryCatalogProviderList; + use crate::datasource::MemTable; + use crate::execution::context::SessionState; + use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema}; use datafusion_common::DFSchema; use datafusion_common::Result; + use datafusion_execution::config::SessionConfig; use datafusion_expr::Expr; use datafusion_sql::planner::{PlannerContext, SqlToRel}; - - use crate::execution::context::SessionState; - - use super::{SessionContextProvider, SessionStateBuilder}; + use std::collections::HashMap; + use std::sync::Arc; #[test] fn test_session_state_with_default_features() { @@ -1841,4 +1859,68 @@ mod tests { 
assert!(sql_to_expr(&state).is_err()) } + + #[test] + fn test_from_existing() -> Result<()> { + fn employee_batch() -> RecordBatch { + let name: ArrayRef = + Arc::new(StringArray::from_iter_values(["Andy", "Andrew"])); + let age: ArrayRef = Arc::new(Int32Array::from(vec![11, 22])); + RecordBatch::try_from_iter(vec![("name", name), ("age", age)]).unwrap() + } + let batch = employee_batch(); + let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?; + + let session_state = SessionStateBuilder::new() + .with_catalog_list(Arc::new(MemoryCatalogProviderList::new())) + .build(); + let table_ref = session_state.resolve_table_ref("employee").to_string(); + session_state + .schema_for_ref(&table_ref)? + .register_table("employee".to_string(), Arc::new(table))?; + + let default_catalog = session_state + .config + .options() + .catalog + .default_catalog + .clone(); + let default_schema = session_state + .config + .options() + .catalog + .default_schema + .clone(); + let is_exist = session_state + .catalog_list() + .catalog(default_catalog.as_str()) + .unwrap() + .schema(default_schema.as_str()) + .unwrap() + .table_exist("employee"); + assert!(is_exist); + let new_state = SessionStateBuilder::new_from_existing(session_state).build(); + assert!(new_state + .catalog_list() + .catalog(default_catalog.as_str()) + .unwrap() + .schema(default_schema.as_str()) + .unwrap() + .table_exist("employee")); + + // if `with_create_default_catalog_and_schema` is disabled, the new one shouldn't create default catalog and schema + let disable_create_default = + SessionConfig::default().with_create_default_catalog_and_schema(false); + let without_default_state = SessionStateBuilder::new() + .with_config(disable_create_default) + .build(); + assert!(without_default_state + .catalog_list() + .catalog(&default_catalog) + .is_none()); + let new_state = + SessionStateBuilder::new_from_existing(without_default_state).build(); + 
assert!(new_state.catalog_list().catalog(&default_catalog).is_none()); + Ok(()) + } }