Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,7 @@ impl DataFrame {
}

/// Create a physical plan
pub async fn create_physical_plan(mut self) -> Result<Arc<dyn ExecutionPlan>> {
self.create_physical_plan_impl().await
}

/// Temporary pending #4626
Copy link
Contributor Author

@tustvold tustvold Dec 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I no longer intend to do this, as it creates far too much churn to justify it, this PR instead opts to workaround the issue by punting responsibility for setting the execution start time onto the caller if they're using the lower-level APIs.

async fn create_physical_plan_impl(&mut self) -> Result<Arc<dyn ExecutionPlan>> {
self.session_state.execution_props.start_execution();
pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>> {
self.session_state.create_physical_plan(&self.plan).await
}

Expand Down Expand Up @@ -627,24 +621,24 @@ impl DataFrame {
}

/// Write a `DataFrame` to a CSV file.
pub async fn write_csv(mut self, path: &str) -> Result<()> {
let plan = self.create_physical_plan_impl().await?;
pub async fn write_csv(self, path: &str) -> Result<()> {
let plan = self.session_state.create_physical_plan(&self.plan).await?;
plan_to_csv(&self.session_state, plan, path).await
}

/// Write a `DataFrame` to a Parquet file.
pub async fn write_parquet(
mut self,
self,
path: &str,
writer_properties: Option<WriterProperties>,
) -> Result<()> {
let plan = self.create_physical_plan_impl().await?;
let plan = self.session_state.create_physical_plan(&self.plan).await?;
plan_to_parquet(&self.session_state, plan, path, writer_properties).await
}

/// Executes a query and writes the results to a partitioned JSON file.
pub async fn write_json(mut self, path: impl AsRef<str>) -> Result<()> {
let plan = self.create_physical_plan_impl().await?;
pub async fn write_json(self, path: impl AsRef<str>) -> Result<()> {
let plan = self.session_state.create_physical_plan(&self.plan).await?;
plan_to_json(&self.session_state, plan, path).await
}

Expand Down
5 changes: 1 addition & 4 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ impl TableProvider for ViewTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
// clone state and start_execution so that now() works in views
let mut state_cloned = state.clone();
state_cloned.execution_props.start_execution();
let plan = if let Some(projection) = projection {
// avoiding adding a redundant projection (e.g. SELECT * FROM view)
let current_projection =
Expand Down Expand Up @@ -144,7 +141,7 @@ impl TableProvider for ViewTable {
plan = plan.limit(0, Some(limit))?;
}

state_cloned.create_physical_plan(&plan.build()?).await
state.create_physical_plan(&plan.build()?).await
}
}

Expand Down
25 changes: 6 additions & 19 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1000,23 +1000,7 @@ impl SessionContext {
&self,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
let state_cloned = {
let mut state = self.state.write();
state.execution_props.start_execution();

// We need to clone `state` to release the lock that is not `Send`. We could
// make the lock `Send` by using `tokio::sync::Mutex`, but that would require to
// propagate async even to the `LogicalPlan` building methods.
// Cloning `state` here is fine as we then pass it as immutable `&state`, which
// means that we avoid write consistency issues as the cloned version will not
// be written to. As for eventual modifications that would be applied to the
// original state after it has been cloned, they will not be picked up by the
// clone but that is okay, as it is equivalent to postponing the state update
// by keeping the lock until the end of the function scope.
state.clone()
};

state_cloned.create_physical_plan(logical_plan).await
self.state().create_physical_plan(logical_plan).await
}

/// Executes a query and writes the results to a partitioned CSV file.
Expand Down Expand Up @@ -1055,9 +1039,12 @@ impl SessionContext {
Arc::new(TaskContext::from(self))
}

/// Get a copy of the [`SessionState`] of this [`SessionContext`]
/// Snapshots the [`SessionState`] of this [`SessionContext`] setting the
/// `query_execution_start_time` to the current time
pub fn state(&self) -> SessionState {
self.state.read().clone()
let mut state = self.state.read().clone();
state.execution_props.start_execution();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice change 👍 make sense

state
}
}

Expand Down
8 changes: 5 additions & 3 deletions datafusion/physical-expr/src/execution_props.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::var_provider::{VarProvider, VarType};
use chrono::{DateTime, Utc};
use chrono::{DateTime, TimeZone, Utc};
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -44,14 +44,16 @@ impl ExecutionProps {
/// Creates a new execution props
pub fn new() -> Self {
ExecutionProps {
query_execution_start_time: chrono::Utc::now(),
// Set this to a fixed sentinel to make it obvious if this is
// not being updated / propagated correctly
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

query_execution_start_time: Utc.timestamp_nanos(0),
var_providers: None,
}
}

/// Marks the execution of query started timestamp
pub fn start_execution(&mut self) -> &Self {
self.query_execution_start_time = chrono::Utc::now();
self.query_execution_start_time = Utc::now();
&*self
}

Expand Down