Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ async fn get_table(
let options = ListingOptions::new(format)
.with_file_extension(extension)
.with_target_partitions(target_partitions)
.with_collect_stat(state.config.collect_statistics());
.with_collect_stat(state.config().collect_statistics());

let table_path = ListingTableUrl::parse(path)?;
let config = ListingTableConfig::new(table_path).with_listing_options(options);
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl ListingTableConfig {
/// Infer `ListingOptions` based on `table_path` suffix.
pub async fn infer_options(self, state: &SessionState) -> Result<Self> {
let store = state
.runtime_env
.runtime_env()
.object_store(self.table_paths.get(0).unwrap())?;

let file = self
Expand All @@ -163,7 +163,7 @@ impl ListingTableConfig {

let listing_options = ListingOptions::new(format)
.with_file_extension(file_extension)
.with_target_partitions(state.config.target_partitions());
.with_target_partitions(state.config().target_partitions());

Ok(Self {
table_paths: self.table_paths,
Expand Down Expand Up @@ -364,7 +364,7 @@ impl ListingOptions {
state: &SessionState,
table_path: &'a ListingTableUrl,
) -> Result<SchemaRef> {
let store = state.runtime_env.object_store(table_path)?;
let store = state.runtime_env().object_store(table_path)?;

let files: Vec<_> = table_path
.list_all_files(store.as_ref(), &self.file_extension)
Expand Down Expand Up @@ -622,7 +622,7 @@ impl ListingTable {
limit: Option<usize>,
) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
let store = ctx
.runtime_env
.runtime_env()
.object_store(self.table_paths.get(0).unwrap())?;
// list files (with partitions)
let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ impl TableProviderFactory for ListingTableFactory {
};

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config.collect_statistics())
.with_collect_stat(state.config().collect_statistics())
.with_file_extension(file_extension)
.with_target_partitions(state.config.target_partitions())
.with_target_partitions(state.config().target_partitions())
.with_table_partition_cols(table_partition_cols)
.with_file_sort_order(None);

Expand Down
45 changes: 35 additions & 10 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1473,25 +1473,25 @@ impl SessionConfig {
#[derive(Clone)]
pub struct SessionState {
/// Uuid for the session
pub session_id: String,
session_id: String,
/// Responsible for optimizing a logical plan
pub optimizer: Optimizer,
optimizer: Optimizer,
/// Responsible for optimizing a physical execution plan
pub physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>>,
/// Responsible for planning `LogicalPlan`s, and `ExecutionPlan`
pub query_planner: Arc<dyn QueryPlanner + Send + Sync>,
query_planner: Arc<dyn QueryPlanner + Send + Sync>,
/// Collection of catalogs containing schemas and ultimately TableProviders
pub catalog_list: Arc<dyn CatalogList>,
catalog_list: Arc<dyn CatalogList>,
/// Scalar functions that are registered with the context
pub scalar_functions: HashMap<String, Arc<ScalarUDF>>,
scalar_functions: HashMap<String, Arc<ScalarUDF>>,
/// Aggregate functions registered in the context
pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
/// Session configuration
pub config: SessionConfig,
config: SessionConfig,
/// Execution properties
pub execution_props: ExecutionProps,
execution_props: ExecutionProps,
/// Runtime environment
pub runtime_env: Arc<RuntimeEnv>,
runtime_env: Arc<RuntimeEnv>,
}

impl Debug for SessionState {
Expand Down Expand Up @@ -1798,6 +1798,31 @@ impl SessionState {
.await
}

/// Return the session ID
pub fn session_id(&self) -> &str {
&self.session_id
}

/// Return the runtime env
pub fn runtime_env(&self) -> &Arc<RuntimeEnv> {
&self.runtime_env
}

/// Return the execution properties
pub fn execution_props(&self) -> &ExecutionProps {
&self.execution_props
}

/// Return the [`SessionConfig`]
pub fn config(&self) -> &SessionConfig {
&self.config
}

/// Return the physical optimizers
pub fn physical_optimizers(&self) -> &[Arc<dyn PhysicalOptimizerRule + Send + Sync>] {
&self.physical_optimizers
}

/// return the configuration options
pub fn config_options(&self) -> &ConfigOptions {
self.config.config_options()
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ mod tests {
let state = session_ctx.state();

state
.runtime_env
.runtime_env()
.register_object_store("file", "", store.clone());

let testdata = crate::test_util::arrow_test_data();
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ mod tests {
file_compression_type: FileCompressionType,
) -> (ObjectStoreUrl, Vec<Vec<PartitionedFile>>, SchemaRef) {
let store_url = ObjectStoreUrl::local_filesystem();
let store = state.runtime_env.object_store(&store_url).unwrap();
let store = state.runtime_env().object_store(&store_url).unwrap();

let filename = "1.json";
let file_groups = partitioned_file_groups(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1401,7 +1401,7 @@ mod tests {
let task_ctx = session_ctx.task_ctx();

let object_store_url = ObjectStoreUrl::local_filesystem();
let store = state.runtime_env.object_store(&object_store_url).unwrap();
let store = state.runtime_env().object_store(&object_store_url).unwrap();

let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/alltypes_plain.parquet", testdata);
Expand Down
45 changes: 21 additions & 24 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
expr,
input_dfschema,
input_schema,
&session_state.execution_props,
session_state.execution_props(),
)
}
}
Expand Down Expand Up @@ -527,8 +527,8 @@ impl DefaultPhysicalPlanner {
let partition_keys = window_expr_common_partition_keys(window_expr)?;

let can_repartition = !partition_keys.is_empty()
&& session_state.config.target_partitions() > 1
&& session_state.config.repartition_window_functions();
&& session_state.config().target_partitions() > 1
&& session_state.config().repartition_window_functions();

let physical_partition_keys = if can_repartition
{
Expand Down Expand Up @@ -596,7 +596,7 @@ impl DefaultPhysicalPlanner {
descending: !*asc,
nulls_first: *nulls_first,
},
&session_state.execution_props,
session_state.execution_props(),
),
_ => unreachable!(),
})
Expand All @@ -612,7 +612,7 @@ impl DefaultPhysicalPlanner {
e,
logical_input_schema,
&physical_input_schema,
&session_state.execution_props,
session_state.execution_props(),
)
})
.collect::<Result<Vec<_>>>()?;
Expand Down Expand Up @@ -649,7 +649,7 @@ impl DefaultPhysicalPlanner {
e,
logical_input_schema,
&physical_input_schema,
&session_state.execution_props,
session_state.execution_props(),
)
})
.collect::<Result<Vec<_>>>()?;
Expand All @@ -666,8 +666,8 @@ impl DefaultPhysicalPlanner {
let final_group: Vec<Arc<dyn PhysicalExpr>> = initial_aggr.output_group_expr();

let can_repartition = !groups.is_empty()
&& session_state.config.target_partitions() > 1
&& session_state.config.repartition_aggregations();
&& session_state.config().target_partitions() > 1
&& session_state.config().repartition_aggregations();

let (initial_aggr, next_partition_mode): (
Arc<dyn ExecutionPlan>,
Expand Down Expand Up @@ -833,7 +833,7 @@ impl DefaultPhysicalPlanner {
descending: !*asc,
nulls_first: *nulls_first,
},
&session_state.execution_props,
session_state.execution_props(),
),
_ => Err(DataFusionError::Plan(
"Sort only accepts sort expressions".to_string(),
Expand Down Expand Up @@ -982,7 +982,7 @@ impl DefaultPhysicalPlanner {
expr,
&filter_df_schema,
&filter_schema,
&session_state.execution_props,
session_state.execution_props(),
)?;
let column_indices = join_utils::JoinFilter::build_column_indices(left_field_indices, right_field_indices);

Expand All @@ -995,7 +995,7 @@ impl DefaultPhysicalPlanner {
_ => None
};

let prefer_hash_join = session_state.config.config_options()
let prefer_hash_join = session_state.config().config_options()
.get_bool(OPT_PREFER_HASH_JOIN)
.unwrap_or_default();
if join_on.is_empty() {
Expand All @@ -1007,8 +1007,8 @@ impl DefaultPhysicalPlanner {
join_filter,
join_type,
)?))
} else if session_state.config.target_partitions() > 1
&& session_state.config.repartition_joins()
} else if session_state.config().target_partitions() > 1
&& session_state.config().repartition_joins()
&& !prefer_hash_join
{
// Use SortMergeJoin if hash join is not preferred
Expand All @@ -1027,11 +1027,11 @@ impl DefaultPhysicalPlanner {
*null_equals_null,
)?))
}
} else if session_state.config.target_partitions() > 1
&& session_state.config.repartition_joins()
} else if session_state.config().target_partitions() > 1
&& session_state.config().repartition_joins()
&& prefer_hash_join {
let partition_mode = {
if session_state.config.collect_statistics() {
if session_state.config().collect_statistics() {
PartitionMode::Auto
} else {
PartitionMode::Partitioned
Expand Down Expand Up @@ -1454,7 +1454,7 @@ fn get_null_physical_expr_pair(
expr,
input_dfschema,
input_schema,
&session_state.execution_props,
session_state.execution_props(),
)?;
let physical_name = physical_name(&expr.clone())?;

Expand All @@ -1475,7 +1475,7 @@ fn get_physical_expr_pair(
expr,
input_dfschema,
input_schema,
&session_state.execution_props,
session_state.execution_props(),
)?;
let physical_name = physical_name(expr)?;
Ok((physical_expr, physical_name))
Expand Down Expand Up @@ -1716,7 +1716,6 @@ impl DefaultPhysicalPlanner {
let mut stringified_plans = vec![];

if !session_state
.config
.config_options()
.get_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY)
.unwrap_or_default()
Expand All @@ -1727,7 +1726,6 @@ impl DefaultPhysicalPlanner {
}

if !session_state
.config
.config_options()
.get_bool(OPT_EXPLAIN_LOGICAL_PLAN_ONLY)
.unwrap_or_default()
Expand Down Expand Up @@ -1773,7 +1771,7 @@ impl DefaultPhysicalPlanner {
where
F: FnMut(&dyn ExecutionPlan, &dyn PhysicalOptimizerRule),
{
let optimizers = &session_state.physical_optimizers;
let optimizers = session_state.physical_optimizers();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is quite a peculiar API, I'm not sure why the physical planner would be calling the optimizers and not SessionState itself

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it may predate when the list of optimizers was on the session state 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok - I will add it to my list

debug!(
"Input physical plan:\n{}\n",
displayable(plan.as_ref()).indent()
Expand Down Expand Up @@ -1844,15 +1842,14 @@ mod tests {

fn make_session_state() -> SessionState {
let runtime = Arc::new(RuntimeEnv::default());
let config = SessionConfig::new();
let config = SessionConfig::new().with_target_partitions(4);
// TODO we should really test that no optimizer rules are failing here
// let config = config.set_bool(crate::config::OPT_OPTIMIZER_SKIP_FAILED_RULES, false);
SessionState::with_config_rt(config, runtime)
}

async fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
let mut session_state = make_session_state();
session_state.config = session_state.config.with_target_partitions(4);
let session_state = make_session_state();
// optimize the logical plan
let logical_plan = session_state.optimize(logical_plan)?;
let planner = DefaultPhysicalPlanner::default();
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/parquet/page_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use tokio_stream::StreamExt;

async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
let object_store_url = ObjectStoreUrl::local_filesystem();
let store = state.runtime_env.object_store(&object_store_url).unwrap();
let store = state.runtime_env().object_store(&object_store_url).unwrap();

let testdata = datafusion::test_util::parquet_test_data();
let filename = format!("{}/alltypes_tiny_pages.parquet", testdata);
Expand Down