Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions rust/datafusion/src/physical_plan/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,4 +415,32 @@ mod tests {
}
Ok(output_partitions)
}

#[tokio::test(flavor = "multi_thread")]
Copy link
Contributor

@alamb alamb Mar 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @seddonm1 -- as you say this test hangs when I comment out the call to

tokio::task::yield_now().await;

async fn many_to_many_round_robin_within_tokio_task() -> Result<()> {
let join_handle: JoinHandle<Result<Vec<Vec<RecordBatch>>>> =
tokio::spawn(async move {
// define input partitions
let schema = test_schema();
let partition = create_vec_batches(&schema, 50);
let partitions =
vec![partition.clone(), partition.clone(), partition.clone()];

// repartition from 3 input to 5 output
repartition(&schema, partitions, Partitioning::RoundRobinBatch(5)).await
});

let output_partitions = join_handle
.await
.map_err(|e| DataFusionError::Internal(e.to_string()))??;

assert_eq!(5, output_partitions.len());
assert_eq!(30, output_partitions[0].len());
assert_eq!(30, output_partitions[1].len());
assert_eq!(30, output_partitions[2].len());
assert_eq!(30, output_partitions[3].len());
assert_eq!(30, output_partitions[4].len());

Ok(())
}
}