From 69248bc7260e2b350790b60fff27a8ea9fdd4132 Mon Sep 17 00:00:00 2001 From: Mike Seddon Date: Mon, 1 Mar 2021 11:34:05 +1100 Subject: [PATCH] add test for tokio tasks within tasks --- .../src/physical_plan/repartition.rs | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/rust/datafusion/src/physical_plan/repartition.rs b/rust/datafusion/src/physical_plan/repartition.rs index 94c3aab64e1..12b29cbeabe 100644 --- a/rust/datafusion/src/physical_plan/repartition.rs +++ b/rust/datafusion/src/physical_plan/repartition.rs @@ -415,4 +415,32 @@ mod tests { } Ok(output_partitions) } + + #[tokio::test(flavor = "multi_thread")] + async fn many_to_many_round_robin_within_tokio_task() -> Result<()> { + let join_handle: JoinHandle>>> = + tokio::spawn(async move { + // define input partitions + let schema = test_schema(); + let partition = create_vec_batches(&schema, 50); + let partitions = + vec![partition.clone(), partition.clone(), partition.clone()]; + + // repartition from 3 input to 5 output + repartition(&schema, partitions, Partitioning::RoundRobinBatch(5)).await + }); + + let output_partitions = join_handle + .await + .map_err(|e| DataFusionError::Internal(e.to_string()))??; + + assert_eq!(5, output_partitions.len()); + assert_eq!(30, output_partitions[0].len()); + assert_eq!(30, output_partitions[1].len()); + assert_eq!(30, output_partitions[2].len()); + assert_eq!(30, output_partitions[3].len()); + assert_eq!(30, output_partitions[4].len()); + + Ok(()) + } }