From 42ab37d4b47e4b129e96e1e68b783354a6d62fe5 Mon Sep 17 00:00:00 2001 From: Mike Seddon Date: Thu, 18 Feb 2021 18:03:59 +1100 Subject: [PATCH 1/2] fix tokio race condition --- rust/datafusion/src/physical_plan/repartition.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 20e7122de12..48aad1a4acc 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -125,7 +125,7 @@ impl ExecutionPlan for RepartitionExec { let input = self.input.clone(); let mut channels = channels.clone(); let partitioning = self.partitioning.clone(); - let _: JoinHandle> = tokio::spawn(async move { + let join_handle: JoinHandle> = tokio::spawn(async move { let mut stream = input.execute(i).await?; let mut counter = 0; while let Some(result) = stream.next().await { @@ -157,6 +157,7 @@ impl ExecutionPlan for RepartitionExec { } Ok(()) }); + join_handle.await.map(|_| ()).map_err(|e| DataFusionError::Execution(e.to_string()))?; } } From 59b51bed844c119b5e88fa27071f9ae292c79952 Mon Sep 17 00:00:00 2001 From: Mike Seddon Date: Fri, 19 Feb 2021 07:03:43 +1100 Subject: [PATCH 2/2] cargo fmt --- rust/datafusion/src/physical_plan/repartition.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 48aad1a4acc..edabfde27c4 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -157,7 +157,10 @@ impl ExecutionPlan for RepartitionExec { } Ok(()) }); - join_handle.await.map(|_| ()).map_err(|e| DataFusionError::Execution(e.to_string()))?; + join_handle + .await + .map(|_| ()) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; } }