From 48655f1e7140d9044bf112ee74857f0506f955fd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 10 Oct 2020 12:50:18 -0600 Subject: [PATCH 1/2] MemTable::load() now loads partitions in parallel --- rust/datafusion/src/datasource/memory.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs index de1c31918549..b17ca846061f 100644 --- a/rust/datafusion/src/datasource/memory.rs +++ b/rust/datafusion/src/datasource/memory.rs @@ -30,6 +30,8 @@ use crate::error::{ExecutionError, Result}; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::ExecutionPlan; +use tokio::task::{self, JoinHandle}; + /// In-memory table pub struct MemTable { schema: SchemaRef, @@ -59,13 +61,22 @@ impl MemTable { pub async fn load(t: &dyn TableProvider, batch_size: usize) -> Result { let schema = t.schema(); let exec = t.scan(&None, batch_size)?; + let partition_count = exec.output_partitioning().partition_count(); + + let mut tasks = Vec::with_capacity(partition_count); + for partition in 0..partition_count { + let exec = exec.clone(); + let task: JoinHandle>> = task::spawn(async move { + let it = exec.execute(partition).await?; + Ok(it.into_iter().collect::>>()?) + }); + tasks.push(task) + } - let mut data: Vec> = - Vec::with_capacity(exec.output_partitioning().partition_count()); - for partition in 0..exec.output_partitioning().partition_count() { - let it = exec.execute(partition).await?; - let partition_batches = it.into_iter().collect::>>()?; - data.push(partition_batches); + let mut data: Vec> = Vec::with_capacity(partition_count); + for task in tasks { + let result = task.await.unwrap()?; + data.push(result); } MemTable::new(schema.clone(), data) From e5d3032b988340a824ba8bd11c4143438523abc6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 10 Oct 2020 15:20:15 -0600 Subject: [PATCH 2/2] address feedback --- rust/datafusion/src/datasource/memory.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs index b17ca846061f..b454315ad906 100644 --- a/rust/datafusion/src/datasource/memory.rs +++ b/rust/datafusion/src/datasource/memory.rs @@ -68,14 +68,16 @@ impl MemTable { let exec = exec.clone(); let task: JoinHandle>> = task::spawn(async move { let it = exec.execute(partition).await?; - Ok(it.into_iter().collect::>>()?) + it.into_iter() + .collect::>>() + .map_err(ExecutionError::from) }); tasks.push(task) } let mut data: Vec> = Vec::with_capacity(partition_count); for task in tasks { - let result = task.await.unwrap()?; + let result = task.await.expect("MemTable::load could not join task")?; data.push(result); }