From 44e6cf8cab16adc0bdb8735f5b353f2c0270bf66 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 28 Dec 2022 20:18:49 +0000 Subject: [PATCH 1/2] Make SessionState members private --- benchmarks/src/bin/tpch.rs | 2 +- .../core/src/datasource/listing/table.rs | 8 ++-- .../src/datasource/listing_table_factory.rs | 4 +- datafusion/core/src/execution/context.rs | 45 ++++++++++++++----- .../src/physical_plan/file_format/json.rs | 2 +- .../src/physical_plan/file_format/parquet.rs | 2 +- datafusion/core/src/physical_plan/planner.rs | 45 +++++++++---------- datafusion/core/tests/parquet/page_pruning.rs | 2 +- 8 files changed, 66 insertions(+), 44 deletions(-) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 8a78c357b951e..b1e4635cb40d2 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -408,7 +408,7 @@ async fn get_table( let options = ListingOptions::new(format) .with_file_extension(extension) .with_target_partitions(target_partitions) - .with_collect_stat(state.config.collect_statistics()); + .with_collect_stat(state.config().collect_statistics()); let table_path = ListingTableUrl::parse(path)?; let config = ListingTableConfig::new(table_path).with_listing_options(options); diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 26ee98ea4ae87..2ee057fd1f7ce 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -146,7 +146,7 @@ impl ListingTableConfig { /// Infer `ListingOptions` based on `table_path` suffix. pub async fn infer_options(self, state: &SessionState) -> Result { let store = state - .runtime_env + .runtime_env() .object_store(self.table_paths.get(0).unwrap())?; let file = self @@ -163,7 +163,7 @@ impl ListingTableConfig { let listing_options = ListingOptions::new(format) .with_file_extension(file_extension) - .with_target_partitions(state.config.target_partitions()); + .with_target_partitions(state.config().target_partitions()); Ok(Self { table_paths: self.table_paths, @@ -364,7 +364,7 @@ impl ListingOptions { state: &SessionState, table_path: &'a ListingTableUrl, ) -> Result { - let store = state.runtime_env.object_store(table_path)?; + let store = state.runtime_env().object_store(table_path)?; let files: Vec<_> = table_path .list_all_files(store.as_ref(), &self.file_extension) @@ -622,7 +622,7 @@ impl ListingTable { limit: Option, ) -> Result<(Vec>, Statistics)> { let store = ctx - .runtime_env + .runtime_env() .object_store(self.table_paths.get(0).unwrap())?; // list files (with partitions) let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| { diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index b48a1058aeb65..fe4393cb29a5a 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -117,9 +117,9 @@ impl TableProviderFactory for ListingTableFactory { }; let options = ListingOptions::new(file_format) - .with_collect_stat(state.config.collect_statistics()) + .with_collect_stat(state.config().collect_statistics()) .with_file_extension(file_extension) - .with_target_partitions(state.config.target_partitions()) + .with_target_partitions(state.config().target_partitions()) .with_table_partition_cols(table_partition_cols) .with_file_sort_order(None); diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index db6ab99fb5203..a4d42e1300c43 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1473,25 +1473,25 @@ impl SessionConfig { #[derive(Clone)] pub struct SessionState { /// Uuid for the session - pub session_id: String, + session_id: String, /// Responsible for optimizing a logical plan - pub optimizer: Optimizer, + optimizer: Optimizer, /// Responsible for optimizing a physical execution plan - pub physical_optimizers: Vec>, + physical_optimizers: Vec>, /// Responsible for planning `LogicalPlan`s, and `ExecutionPlan` - pub query_planner: Arc, + query_planner: Arc, /// Collection of catalogs containing schemas and ultimately TableProviders - pub catalog_list: Arc, + catalog_list: Arc, /// Scalar functions that are registered with the context - pub scalar_functions: HashMap>, + scalar_functions: HashMap>, /// Aggregate functions registered in the context - pub aggregate_functions: HashMap>, + aggregate_functions: HashMap>, /// Session configuration - pub config: SessionConfig, + config: SessionConfig, /// Execution properties - pub execution_props: ExecutionProps, + execution_props: ExecutionProps, /// Runtime environment - pub runtime_env: Arc, + runtime_env: Arc, } impl Debug for SessionState { @@ -1798,6 +1798,31 @@ impl SessionState { .await } + /// Return the session ID + pub fn session_id(&self) -> &str { + &self.session_id + } + + /// Return the runtime env + pub fn runtime_env(&self) -> &Arc { + &self.runtime_env + } + + /// Return the execution properties + pub fn execution_props(&self) -> &ExecutionProps { + &self.execution_props + } + + /// Return the [`SessionConfig`] + pub fn config(&self) -> &SessionConfig { + &self.config + } + + /// Return the physical optimizers + pub fn physical_optimizers(&self) -> &[Arc] { + &self.physical_optimizers + } + /// return the configuration options pub fn config_options(&self) -> &ConfigOptions { self.config.config_options() diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index a4be015c51a0e..78a0000783a12 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -271,7 +271,7 @@ mod tests { file_compression_type: FileCompressionType, ) -> (ObjectStoreUrl, Vec>, SchemaRef) { let store_url = ObjectStoreUrl::local_filesystem(); - let store = state.runtime_env.object_store(&store_url).unwrap(); + let store = state.runtime_env().object_store(&store_url).unwrap(); let filename = "1.json"; let file_groups = partitioned_file_groups( diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index a8d3002a907b2..b54854fb8394c 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -1401,7 +1401,7 @@ mod tests { let task_ctx = session_ctx.task_ctx(); let object_store_url = ObjectStoreUrl::local_filesystem(); - let store = state.runtime_env.object_store(&object_store_url).unwrap(); + let store = state.runtime_env().object_store(&object_store_url).unwrap(); let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/alltypes_plain.parquet", testdata); diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 6aa3f627df18e..5b001f01678ec 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -448,7 +448,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { expr, input_dfschema, input_schema, - &session_state.execution_props, + session_state.execution_props(), ) } } @@ -527,8 +527,8 @@ impl DefaultPhysicalPlanner { let partition_keys = window_expr_common_partition_keys(window_expr)?; let can_repartition = !partition_keys.is_empty() - && session_state.config.target_partitions() > 1 - && session_state.config.repartition_window_functions(); + && session_state.config().target_partitions() > 1 + && session_state.config().repartition_window_functions(); let physical_partition_keys = if can_repartition { @@ -596,7 +596,7 @@ impl DefaultPhysicalPlanner { descending: !*asc, nulls_first: *nulls_first, }, - &session_state.execution_props, + session_state.execution_props(), ), _ => unreachable!(), }) @@ -612,7 +612,7 @@ impl DefaultPhysicalPlanner { e, logical_input_schema, &physical_input_schema, - &session_state.execution_props, + session_state.execution_props(), ) }) .collect::>>()?; @@ -649,7 +649,7 @@ impl DefaultPhysicalPlanner { e, logical_input_schema, &physical_input_schema, - &session_state.execution_props, + session_state.execution_props(), ) }) .collect::>>()?; @@ -666,8 +666,8 @@ impl DefaultPhysicalPlanner { let final_group: Vec> = initial_aggr.output_group_expr(); let can_repartition = !groups.is_empty() - && session_state.config.target_partitions() > 1 - && session_state.config.repartition_aggregations(); + && session_state.config().target_partitions() > 1 + && session_state.config().repartition_aggregations(); let (initial_aggr, next_partition_mode): ( Arc, @@ -833,7 +833,7 @@ impl DefaultPhysicalPlanner { descending: !*asc, nulls_first: *nulls_first, }, - &session_state.execution_props, + session_state.execution_props(), ), _ => Err(DataFusionError::Plan( "Sort only accepts sort expressions".to_string(), @@ -982,7 +982,7 @@ impl DefaultPhysicalPlanner { expr, &filter_df_schema, &filter_schema, - &session_state.execution_props, + session_state.execution_props(), )?; let column_indices = join_utils::JoinFilter::build_column_indices(left_field_indices, right_field_indices); @@ -995,7 +995,7 @@ impl DefaultPhysicalPlanner { _ => None }; - let prefer_hash_join = session_state.config.config_options() + let prefer_hash_join = session_state.config().config_options() .get_bool(OPT_PREFER_HASH_JOIN) .unwrap_or_default(); if join_on.is_empty() { @@ -1007,8 +1007,8 @@ impl DefaultPhysicalPlanner { join_filter, join_type, )?)) - } else if session_state.config.target_partitions() > 1 - && session_state.config.repartition_joins() + } else if session_state.config().target_partitions() > 1 + && session_state.config().repartition_joins() && !prefer_hash_join { // Use SortMergeJoin if hash join is not preferred @@ -1027,11 +1027,11 @@ impl DefaultPhysicalPlanner { *null_equals_null, )?)) } - } else if session_state.config.target_partitions() > 1 - && session_state.config.repartition_joins() + } else if session_state.config().target_partitions() > 1 + && session_state.config().repartition_joins() && prefer_hash_join { let partition_mode = { - if session_state.config.collect_statistics() { + if session_state.config().collect_statistics() { PartitionMode::Auto } else { PartitionMode::Partitioned @@ -1454,7 +1454,7 @@ fn get_null_physical_expr_pair( expr, input_dfschema, input_schema, - &session_state.execution_props, + session_state.execution_props(), )?; let physical_name = physical_name(&expr.clone())?; @@ -1475,7 +1475,7 @@ fn get_physical_expr_pair( expr, input_dfschema, input_schema, - &session_state.execution_props, + session_state.execution_props(), )?; let physical_name = physical_name(expr)?; Ok((physical_expr, physical_name)) @@ -1716,7 +1716,6 @@ impl DefaultPhysicalPlanner { let mut stringified_plans = vec![]; if !session_state - .config .config_options() .get_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY) .unwrap_or_default() @@ -1727,7 +1726,6 @@ impl DefaultPhysicalPlanner { } if !session_state - .config .config_options() .get_bool(OPT_EXPLAIN_LOGICAL_PLAN_ONLY) .unwrap_or_default() @@ -1773,7 +1771,7 @@ impl DefaultPhysicalPlanner { where F: FnMut(&dyn ExecutionPlan, &dyn PhysicalOptimizerRule), { - let optimizers = &session_state.physical_optimizers; + let optimizers = session_state.physical_optimizers(); debug!( "Input physical plan:\n{}\n", displayable(plan.as_ref()).indent() @@ -1844,15 +1842,14 @@ mod tests { fn make_session_state() -> SessionState { let runtime = Arc::new(RuntimeEnv::default()); - let config = SessionConfig::new(); + let config = SessionConfig::new().with_target_partitions(4); // TODO we should really test that no optimizer rules are failing here // let config = config.set_bool(crate::config::OPT_OPTIMIZER_SKIP_FAILED_RULES, false); SessionState::with_config_rt(config, runtime) } async fn plan(logical_plan: &LogicalPlan) -> Result> { - let mut session_state = make_session_state(); - session_state.config = session_state.config.with_target_partitions(4); + let session_state = make_session_state(); // optimize the logical plan let logical_plan = session_state.optimize(logical_plan)?; let planner = DefaultPhysicalPlanner::default(); diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index e62124a333fc3..ddbe6f28fa693 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -33,7 +33,7 @@ use tokio_stream::StreamExt; async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec { let object_store_url = ObjectStoreUrl::local_filesystem(); - let store = state.runtime_env.object_store(&object_store_url).unwrap(); + let store = state.runtime_env().object_store(&object_store_url).unwrap(); let testdata = datafusion::test_util::parquet_test_data(); let filename = format!("{}/alltypes_tiny_pages.parquet", testdata); From b7441c1e2cf77657e4f1d62eb23995d7bac798b2 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 28 Dec 2022 20:39:23 +0000 Subject: [PATCH 2/2] Fix avro --- datafusion/core/src/physical_plan/file_format/avro.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index ab522dc94a53c..3d0b606987b16 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -239,7 +239,7 @@ mod tests { let state = session_ctx.state(); state - .runtime_env + .runtime_env() .register_object_store("file", "", store.clone()); let testdata = crate::test_util::arrow_test_data();