diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 60d79490cf6fb..0d6bae09aedb5 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -87,13 +87,7 @@ impl DataFrame { } /// Create a physical plan - pub async fn create_physical_plan(mut self) -> Result> { - self.create_physical_plan_impl().await - } - - /// Temporary pending #4626 - async fn create_physical_plan_impl(&mut self) -> Result> { - self.session_state.execution_props.start_execution(); + pub async fn create_physical_plan(self) -> Result> { self.session_state.create_physical_plan(&self.plan).await } @@ -627,24 +621,24 @@ impl DataFrame { } /// Write a `DataFrame` to a CSV file. - pub async fn write_csv(mut self, path: &str) -> Result<()> { - let plan = self.create_physical_plan_impl().await?; + pub async fn write_csv(self, path: &str) -> Result<()> { + let plan = self.session_state.create_physical_plan(&self.plan).await?; plan_to_csv(&self.session_state, plan, path).await } /// Write a `DataFrame` to a Parquet file. pub async fn write_parquet( - mut self, + self, path: &str, writer_properties: Option, ) -> Result<()> { - let plan = self.create_physical_plan_impl().await?; + let plan = self.session_state.create_physical_plan(&self.plan).await?; plan_to_parquet(&self.session_state, plan, path, writer_properties).await } /// Executes a query and writes the results to a partitioned JSON file. - pub async fn write_json(mut self, path: impl AsRef) -> Result<()> { - let plan = self.create_physical_plan_impl().await?; + pub async fn write_json(self, path: impl AsRef) -> Result<()> { + let plan = self.session_state.create_physical_plan(&self.plan).await?; plan_to_json(&self.session_state, plan, path).await } diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index 271051096dc94..4fae03fad7175 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -108,9 +108,6 @@ impl TableProvider for ViewTable { filters: &[Expr], limit: Option, ) -> Result> { - // clone state and start_execution so that now() works in views - let mut state_cloned = state.clone(); - state_cloned.execution_props.start_execution(); let plan = if let Some(projection) = projection { // avoiding adding a redundant projection (e.g. SELECT * FROM view) let current_projection = @@ -144,7 +141,7 @@ impl TableProvider for ViewTable { plan = plan.limit(0, Some(limit))?; } - state_cloned.create_physical_plan(&plan.build()?).await + state.create_physical_plan(&plan.build()?).await } } diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index fcfcf1ca69035..9c909d5d69c59 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1000,23 +1000,7 @@ impl SessionContext { &self, logical_plan: &LogicalPlan, ) -> Result> { - let state_cloned = { - let mut state = self.state.write(); - state.execution_props.start_execution(); - - // We need to clone `state` to release the lock that is not `Send`. We could - // make the lock `Send` by using `tokio::sync::Mutex`, but that would require to - // propagate async even to the `LogicalPlan` building methods. - // Cloning `state` here is fine as we then pass it as immutable `&state`, which - // means that we avoid write consistency issues as the cloned version will not - // be written to. As for eventual modifications that would be applied to the - // original state after it has been cloned, they will not be picked up by the - // clone but that is okay, as it is equivalent to postponing the state update - // by keeping the lock until the end of the function scope. - state.clone() - }; - - state_cloned.create_physical_plan(logical_plan).await + self.state().create_physical_plan(logical_plan).await } /// Executes a query and writes the results to a partitioned CSV file. @@ -1055,9 +1039,12 @@ impl SessionContext { Arc::new(TaskContext::from(self)) } - /// Get a copy of the [`SessionState`] of this [`SessionContext`] + /// Snapshots the [`SessionState`] of this [`SessionContext`] setting the + /// `query_execution_start_time` to the current time pub fn state(&self) -> SessionState { - self.state.read().clone() + let mut state = self.state.read().clone(); + state.execution_props.start_execution(); + state } } diff --git a/datafusion/physical-expr/src/execution_props.rs b/datafusion/physical-expr/src/execution_props.rs index 2e68206668eaa..800e355831b77 100644 --- a/datafusion/physical-expr/src/execution_props.rs +++ b/datafusion/physical-expr/src/execution_props.rs @@ -16,7 +16,7 @@ // under the License. use crate::var_provider::{VarProvider, VarType}; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, TimeZone, Utc}; use std::collections::HashMap; use std::sync::Arc; @@ -44,14 +44,16 @@ impl ExecutionProps { /// Creates a new execution props pub fn new() -> Self { ExecutionProps { - query_execution_start_time: chrono::Utc::now(), + // Set this to a fixed sentinel to make it obvious if this is + // not being updated / propagated correctly + query_execution_start_time: Utc.timestamp_nanos(0), var_providers: None, } } /// Marks the execution of query started timestamp pub fn start_execution(&mut self) -> &Self { - self.query_execution_start_time = chrono::Utc::now(); + self.query_execution_start_time = Utc::now(); &*self }