Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 90 additions & 8 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -987,8 +987,24 @@ impl SessionStateBuilder {

/// Returns a new [SessionStateBuilder] based on an existing [SessionState]
/// The session id for the new builder will be unset; all other fields will
/// be cloned from what is set in the provided session state
/// be cloned from what is set in the provided session state. If the default
/// catalog exists in existing session state, the new session state will not
/// create default catalog and schema.
pub fn new_from_existing(existing: SessionState) -> Self {
let default_catalog_exist = existing
.catalog_list()
.catalog(&existing.config.options().catalog.default_catalog)
.is_some();
// The new `with_create_default_catalog_and_schema` should be false if the default catalog exists
let create_default_catalog_and_schema = existing
.config
.options()
.catalog
.create_default_catalog_and_schema
&& !default_catalog_exist;
let new_config = existing
.config
.with_create_default_catalog_and_schema(create_default_catalog_and_schema);
Self {
session_id: None,
analyzer: Some(existing.analyzer),
Expand All @@ -1005,7 +1021,7 @@ impl SessionStateBuilder {
window_functions: Some(existing.window_functions.into_values().collect_vec()),
serializer_registry: Some(existing.serializer_registry),
file_formats: Some(existing.file_formats.into_values().collect_vec()),
config: Some(existing.config),
config: Some(new_config),
table_options: Some(existing.table_options),
execution_props: Some(existing.execution_props),
table_factories: Some(existing.table_factories),
Expand Down Expand Up @@ -1801,17 +1817,19 @@ impl<'a> SimplifyInfo for SessionSimplifyProvider<'a> {

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use super::{SessionContextProvider, SessionStateBuilder};
use crate::catalog_common::MemoryCatalogProviderList;
use crate::datasource::MemTable;
use crate::execution::context::SessionState;
use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::DFSchema;
use datafusion_common::Result;
use datafusion_execution::config::SessionConfig;
use datafusion_expr::Expr;
use datafusion_sql::planner::{PlannerContext, SqlToRel};

use crate::execution::context::SessionState;

use super::{SessionContextProvider, SessionStateBuilder};
use std::collections::HashMap;
use std::sync::Arc;

#[test]
fn test_session_state_with_default_features() {
Expand Down Expand Up @@ -1841,4 +1859,68 @@ mod tests {

assert!(sql_to_expr(&state).is_err())
}

#[test]
fn test_from_existing() -> Result<()> {
fn employee_batch() -> RecordBatch {
let name: ArrayRef =
Arc::new(StringArray::from_iter_values(["Andy", "Andrew"]));
let age: ArrayRef = Arc::new(Int32Array::from(vec![11, 22]));
RecordBatch::try_from_iter(vec![("name", name), ("age", age)]).unwrap()
}
let batch = employee_batch();
let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;

let session_state = SessionStateBuilder::new()
.with_catalog_list(Arc::new(MemoryCatalogProviderList::new()))
.build();
let table_ref = session_state.resolve_table_ref("employee").to_string();
session_state
.schema_for_ref(&table_ref)?
.register_table("employee".to_string(), Arc::new(table))?;

let default_catalog = session_state
.config
.options()
.catalog
.default_catalog
.clone();
let default_schema = session_state
.config
.options()
.catalog
.default_schema
.clone();
let is_exist = session_state
.catalog_list()
.catalog(default_catalog.as_str())
.unwrap()
.schema(default_schema.as_str())
.unwrap()
.table_exist("employee");
assert!(is_exist);
let new_state = SessionStateBuilder::new_from_existing(session_state).build();
assert!(new_state
.catalog_list()
.catalog(default_catalog.as_str())
.unwrap()
.schema(default_schema.as_str())
.unwrap()
.table_exist("employee"));

// if `with_create_default_catalog_and_schema` is disabled, the new one shouldn't create default catalog and schema
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran this test locally without the code changes in this PR and the test fails (as expected) thus verifying test coverage for the new code 👍


assertion failed: new_state.catalog_list().catalog(default_catalog.as_str()).unwrap().schema(default_schema.as_str()).unwrap().table_exist("employee")
thread 'execution::session_state::tests::test_from_existing' panicked at datafusion/core/src/execution/session_state.rs:1887:9:
assertion failed: new_state.catalog_list().catalog(default_catalog.as_str()).unwrap().schema(default_schema.as_str()).unwrap().table_exist("employee")
stack backtrace:
   0: rust_begin_unwind
             at /rustc/3f5fd8dd41153bc5fdca9427e9e05be2c767ba23/library/std/src/panicking.rs:652:5
   1: core::panicking::panic_fmt
             at /rustc/3f5fd8dd41153bc5fdca9427e9e05be2c767ba23/library/core/src/panicking.rs:72:14
   2: core::panicking::panic
             at /rustc/3f5fd8dd41153bc5fdca9427e9e05be2c767ba23/library/core/src/panicking.rs:146:5
   3: datafusion::execution::session_state::tests::test_from_existing
             at ./src/execution/session_state.rs:1887:9
   4: datafusion::execution::session_state::tests::test_from_existing::{{closure}}
             at ./src/execution/session_state.rs:1848:32
   5: core::ops::function::FnOnce::call_once
             at /rustc/3f5fd8dd41153bc5fdca9427e9e05be2c767ba23/library/core/src/ops/function.rs:250:5
   6: core::ops::function::FnOnce::call_once
             at /rustc/3f5fd8dd41153bc5fdca9427e9e05be2c767ba23/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

let disable_create_default =
SessionConfig::default().with_create_default_catalog_and_schema(false);
let without_default_state = SessionStateBuilder::new()
.with_config(disable_create_default)
.build();
assert!(without_default_state
.catalog_list()
.catalog(&default_catalog)
.is_none());
let new_state =
SessionStateBuilder::new_from_existing(without_default_state).build();
assert!(new_state.catalog_list().catalog(&default_catalog).is_none());
Ok(())
}
}