From 833f8abf1947ea92e3a4fbc402313509d0d40a6b Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Fri, 5 Mar 2021 20:27:16 +0100
Subject: [PATCH 1/7] ctx.sql should optimize plan

---
 rust/arrow/benches/filter_kernels.rs     |  11 +++
 rust/arrow/src/array/array_binary.rs     |  34 ++++---
 rust/datafusion/src/execution/context.rs | 109 ++++++++++-------------
 3 files changed, 72 insertions(+), 82 deletions(-)

diff --git a/rust/arrow/benches/filter_kernels.rs b/rust/arrow/benches/filter_kernels.rs
index 479f9abe155..ded745a7fad 100644
--- a/rust/arrow/benches/filter_kernels.rs
+++ b/rust/arrow/benches/filter_kernels.rs
@@ -100,6 +100,17 @@ fn add_benchmark(c: &mut Criterion) {
     c.bench_function("filter context string low selectivity", |b| {
         b.iter(|| bench_built_filter(&sparse_filter, &data_array))
     });
+
+    let binary_array = BinaryArray::from(
+        data_array
+            .into_iter()
+            .map(|x| x.map(|x| x.as_bytes()))
+            .collect::<Vec<Option<&[u8]>>>(),
+    );
+
+    c.bench_function("filter context binary high selectivity", |b| {
+        b.iter(|| bench_built_filter(&dense_filter, &binary_array))
+    });
 }
 
 criterion_group!(benches, add_benchmark);
diff --git a/rust/arrow/src/array/array_binary.rs b/rust/arrow/src/array/array_binary.rs
index 9bc182eebb7..527811e7ddc 100644
--- a/rust/arrow/src/array/array_binary.rs
+++ b/rust/arrow/src/array/array_binary.rs
@@ -249,33 +249,31 @@ where
         let (_, data_len) = iter.size_hint();
         let data_len = data_len.expect("Iterator must be sized"); // panic if no upper bound.
 
-        let mut offsets = Vec::with_capacity(data_len + 1);
-        let mut values = Vec::new();
+        let offset_size = std::mem::size_of::<OffsetSize>();
+        let mut offsets = MutableBuffer::new((data_len + 1) * offset_size);
+        let mut values = MutableBuffer::new(0);
         let mut null_buf = MutableBuffer::new_null(data_len);
-        let mut length_so_far: OffsetSize = OffsetSize::zero();
+        let null_slice = null_buf.as_slice_mut();
+        let mut length_so_far = OffsetSize::zero();
         offsets.push(length_so_far);
 
-        {
-            let null_slice = null_buf.as_slice_mut();
-
-            for (i, s) in iter.enumerate() {
-                if let Some(s) = s {
-                    let s = s.as_ref();
-                    bit_util::set_bit(null_slice, i);
-                    length_so_far += OffsetSize::from_usize(s.len()).unwrap();
-                    values.extend_from_slice(s);
-                }
-                // always add an element in offsets
-                offsets.push(length_so_far);
+        for (i, s) in iter.enumerate() {
+            if let Some(s) = s {
+                let s = s.as_ref();
+                bit_util::set_bit(null_slice, i);
+                length_so_far += OffsetSize::from_usize(s.len()).unwrap();
+                values.extend_from_slice(s);
             }
+            // always add an element in offsets
+            offsets.push(length_so_far);
         }
 
         // calculate actual data_len, which may be different from the iterator's upper bound
-        let data_len = offsets.len() - 1;
+        let data_len = (offsets.len() / offset_size) - 1;
         let array_data = ArrayData::builder(OffsetSize::DATA_TYPE)
             .len(data_len)
-            .add_buffer(Buffer::from_slice_ref(&offsets))
-            .add_buffer(Buffer::from_slice_ref(&values))
+            .add_buffer(offsets.into())
+            .add_buffer(values.into())
             .null_bit_buffer(null_buf.into())
             .build();
         Self::from(array_data)
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 850ce745c8c..82ef55fc658 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -37,9 +37,7 @@ use crate::datasource::parquet::ParquetTable;
 use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
 use crate::execution::dataframe_impl::DataFrameImpl;
-use crate::logical_plan::{
-    FunctionRegistry, LogicalPlan, LogicalPlanBuilder, ToDFSchema,
-};
+use crate::logical_plan::{FunctionRegistry, LogicalPlan, LogicalPlanBuilder, ToDFSchema};
 use crate::optimizer::constant_folding::ConstantFolding;
 use crate::optimizer::filter_push_down::FilterPushDown;
 use crate::optimizer::optimizer::OptimizerRule;
@@ -123,7 +121,7 @@ impl ExecutionContext {
 
     /// Creates a dataframe that will execute a SQL query.
     pub fn sql(&mut self, sql: &str) -> Result<Arc<dyn DataFrame>> {
-        let plan = self.create_logical_plan(sql)?;
+        let plan = self.optimize(&self.create_logical_plan(sql)?)?;
         match plan {
             LogicalPlan::CreateExternalTable {
                 ref schema,
@@ -266,10 +264,8 @@ impl ExecutionContext {
     /// Registers a Parquet data source so that it can be referenced from SQL statements
     /// executed against this context.
     pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> {
-        let table = ParquetTable::try_new(
-            &filename,
-            self.state.lock().unwrap().config.concurrency,
-        )?;
+        let table =
+            ParquetTable::try_new(&filename, self.state.lock().unwrap().config.concurrency)?;
         self.register_table(name, Arc::new(table));
         Ok(())
     }
@@ -295,10 +291,7 @@ impl ExecutionContext {
     /// Deregisters the named table.
     ///
     /// Returns the registered provider, if any
-    pub fn deregister_table(
-        &mut self,
-        name: &str,
-    ) -> Option<Arc<dyn TableProvider>> {
+    pub fn deregister_table(&mut self, name: &str) -> Option<Arc<dyn TableProvider>> {
         self.state.lock().unwrap().datasources.remove(name)
     }
 
@@ -370,11 +363,7 @@ impl ExecutionContext {
     }
 
     /// Executes a query and writes the results to a partitioned CSV file.
-    pub async fn write_csv(
-        &self,
-        plan: Arc<dyn ExecutionPlan>,
-        path: String,
-    ) -> Result<()> {
+    pub async fn write_csv(&self, plan: Arc<dyn ExecutionPlan>, path: String) -> Result<()> {
         // create directory to contain the CSV files (one per partition)
         let fs_path = Path::new(&path);
         match fs::create_dir(fs_path) {
@@ -575,10 +564,7 @@ pub struct ExecutionContextState {
 }
 
 impl ContextProvider for ExecutionContextState {
-    fn get_table_provider(
-        &self,
-        name: &str,
-    ) -> Option<Arc<dyn TableProvider>> {
+    fn get_table_provider(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
         self.datasources.get(name).map(|ds| Arc::clone(ds))
     }
 
@@ -632,12 +618,9 @@ mod tests {
         logical_plan::{col, create_udf, sum},
     };
     use crate::{
-        datasource::MemTable, logical_plan::create_udaf,
-        physical_plan::expressions::AvgAccumulator,
-    };
-    use arrow::array::{
-        Array, ArrayRef, DictionaryArray, Float64Array, Int32Array, Int64Array,
+        datasource::MemTable, logical_plan::create_udaf, physical_plan::expressions::AvgAccumulator,
     };
+    use arrow::array::{Array, ArrayRef, DictionaryArray, Float64Array, Int32Array, Int64Array};
     use arrow::compute::add;
     use arrow::datatypes::*;
     use arrow::record_batch::RecordBatch;
@@ -726,8 +709,7 @@ mod tests {
         let provider = test::create_table_dual();
         ctx.register_table("dual", provider);
 
-        let results =
-            plan_and_collect(&mut ctx, "SELECT @@version, @name FROM dual").await?;
+        let results = plan_and_collect(&mut ctx, "SELECT @@version, @name FROM dual").await?;
 
         let expected = vec![
             "+----------------------+------------------------+",
@@ -950,8 +932,7 @@ mod tests {
 
     #[tokio::test]
     async fn sort() -> Result<()> {
-        let results =
-            execute("SELECT c1, c2 FROM test ORDER BY c1 DESC, c2 ASC", 4).await?;
+        let results = execute("SELECT c1, c2 FROM test ORDER BY c1 DESC, c2 ASC", 4).await?;
         assert_eq!(results.len(), 1);
 
         let expected: Vec<&str> = vec![
@@ -1152,8 +1133,7 @@ mod tests {
 
     #[tokio::test]
     async fn boolean_literal() -> Result<()> {
-        let results =
-            execute("SELECT c1, c3 FROM test WHERE c1 > 2 AND c3 = true", 4).await?;
+        let results = execute("SELECT c1, c3 FROM test WHERE c1 > 2 AND c3 = true", 4).await?;
         assert_eq!(results.len(), 1);
 
         let expected = vec![
@@ -1174,8 +1154,7 @@ mod tests {
 
     #[tokio::test]
    async fn aggregate_grouped_empty() -> Result<()> {
-        let results =
-            execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;
+        let results = execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;
         assert_eq!(results.len(), 1);
 
         let expected = vec!["++", "||", "++", "++"];
@@ -1351,18 +1330,15 @@ mod tests {
             Field::new("val", val_array.data_type().clone(), false),
         ]));
 
-        let batch = RecordBatch::try_new(schema.clone(), vec![dict_array, val_array])
-            .unwrap();
+        let batch = RecordBatch::try_new(schema.clone(), vec![dict_array, val_array]).unwrap();
 
         let provider = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
         ctx.register_table("t", Arc::new(provider));
 
-        let results = plan_and_collect(
-            &mut ctx,
-            "SELECT dict, count(val) FROM t GROUP BY dict",
-        )
-        .await
-        .expect("ran plan correctly");
+        let results =
+            plan_and_collect(&mut ctx, "SELECT dict, count(val) FROM t GROUP BY dict")
+                .await
+                .expect("ran plan correctly");
 
         let expected = vec![
             "+------+------------+",
@@ -1376,10 +1352,9 @@ mod tests {
         assert_batches_sorted_eq!(expected, &results);
 
         // Now, use dict as an aggregate
-        let results =
-            plan_and_collect(&mut ctx, "SELECT val, count(dict) FROM t GROUP BY val")
-                .await
-                .expect("ran plan correctly");
+        let results = plan_and_collect(&mut ctx, "SELECT val, count(dict) FROM t GROUP BY val")
+            .await
+            .expect("ran plan correctly");
 
         let expected = vec![
             "+-----+-------------+",
@@ -1660,8 +1635,7 @@ mod tests {
                 .file_extension(file_extension),
         )?;
         let results =
-            plan_and_collect(&mut ctx, "SELECT SUM(c1), SUM(c2), COUNT(*) FROM test")
-                .await?;
+            plan_and_collect(&mut ctx, "SELECT SUM(c1), SUM(c2), COUNT(*) FROM test").await?;
         assert_eq!(results.len(), 1);
 
         let expected = vec![
@@ -1690,9 +1664,7 @@ mod tests {
                 thread::spawn(move || {
                     let ctx = ctx_clone.lock().expect("Locked context");
                     // Ensure we can create logical plan code on a separate thread.
-                    ctx.create_logical_plan(
-                        "SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3",
-                    )
+                    ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
                 })
             })
             .collect();
 
         for handle in threads {
             handle.join().expect("Failed to join thread")?;
         }
         Ok(())
     }
+    #[test]
+    fn ctx_sql_should_optimize_plan() -> Result<()> {
+        let mut ctx = ExecutionContext::new();
+        let plan1 = ctx.create_logical_plan("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
+
+        let opt_plan1 = ctx.optimize(&plan1)?;
+
+        let plan2 = ctx.sql("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
+
+        println!("{:?}", plan2.to_logical_plan());
+
+        assert_eq!(
+            format!("{:?}", opt_plan1),
+            format!("{:?}", plan2.to_logical_plan())
+        );
+
+        Ok(())
+    }
 
     #[tokio::test]
     async fn scalar_udf() -> Result<()> {
@@ -1821,8 +1811,7 @@ mod tests {
 
         let mut ctx = ExecutionContext::new();
 
-        let provider =
-            MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
+        let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
         ctx.register_table("t", Arc::new(provider));
 
         let result = plan_and_collect(&mut ctx, "SELECT AVG(a) FROM t").await?;
@@ -1858,8 +1847,7 @@ mod tests {
 
         let mut ctx = ExecutionContext::new();
 
-        let provider =
-            MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
+        let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
         ctx.register_table("t", Arc::new(provider));
 
         // define a udaf, using a DataFusion's accumulator
@@ -1926,10 +1914,7 @@ mod tests {
     }
 
     /// Execute SQL and return results
-    async fn plan_and_collect(
-        ctx: &mut ExecutionContext,
-        sql: &str,
-    ) -> Result<Vec<RecordBatch>> {
+    async fn plan_and_collect(ctx: &mut ExecutionContext, sql: &str) -> Result<Vec<RecordBatch>> {
         let logical_plan = ctx.create_logical_plan(sql)?;
         let logical_plan = ctx.optimize(&logical_plan)?;
         let physical_plan = ctx.create_physical_plan(&logical_plan)?;
@@ -1944,11 +1929,7 @@ mod tests {
     }
 
     /// Execute SQL and write results to partitioned csv files
-    async fn write_csv(
-        ctx: &mut ExecutionContext,
-        sql: &str,
-        out_dir: &str,
-    ) -> Result<()> {
+    async fn write_csv(ctx: &mut ExecutionContext, sql: &str, out_dir: &str) -> Result<()> {
         let logical_plan = ctx.create_logical_plan(sql)?;
         let logical_plan = ctx.optimize(&logical_plan)?;
         let physical_plan = ctx.create_physical_plan(&logical_plan)?;

From ce423cfbc46e1e499404e072b801ea199e08cbfb Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Fri, 5 Mar 2021 20:30:34 +0100
Subject: [PATCH 2/7] fmt

---
 rust/datafusion/src/execution/context.rs | 92 +++++++++++++++++-------
 1 file changed, 65 insertions(+), 27 deletions(-)

diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 82ef55fc658..f2fccb6f1ed 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -37,7 +37,9 @@ use crate::datasource::parquet::ParquetTable;
 use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
 use crate::execution::dataframe_impl::DataFrameImpl;
-use crate::logical_plan::{FunctionRegistry, LogicalPlan, LogicalPlanBuilder, ToDFSchema};
+use crate::logical_plan::{
+    FunctionRegistry, LogicalPlan, LogicalPlanBuilder, ToDFSchema,
+};
 use crate::optimizer::constant_folding::ConstantFolding;
 use crate::optimizer::filter_push_down::FilterPushDown;
 use crate::optimizer::optimizer::OptimizerRule;
@@ -264,8 +266,10 @@ impl ExecutionContext {
 
     /// Registers a Parquet data source so that it can be referenced from SQL statements
     /// executed against this context.
     pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> {
-        let table =
-            ParquetTable::try_new(&filename, self.state.lock().unwrap().config.concurrency)?;
+        let table = ParquetTable::try_new(
+            &filename,
+            self.state.lock().unwrap().config.concurrency,
+        )?;
         self.register_table(name, Arc::new(table));
         Ok(())
     }
@@ -291,7 +295,10 @@ impl ExecutionContext {
     /// Deregisters the named table.
     ///
     /// Returns the registered provider, if any
-    pub fn deregister_table(&mut self, name: &str) -> Option<Arc<dyn TableProvider>> {
+    pub fn deregister_table(
+        &mut self,
+        name: &str,
+    ) -> Option<Arc<dyn TableProvider>> {
         self.state.lock().unwrap().datasources.remove(name)
     }
 
@@ -363,7 +370,11 @@ impl ExecutionContext {
     }
 
     /// Executes a query and writes the results to a partitioned CSV file.
-    pub async fn write_csv(&self, plan: Arc<dyn ExecutionPlan>, path: String) -> Result<()> {
+    pub async fn write_csv(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        path: String,
+    ) -> Result<()> {
         // create directory to contain the CSV files (one per partition)
         let fs_path = Path::new(&path);
         match fs::create_dir(fs_path) {
@@ -564,7 +575,10 @@ pub struct ExecutionContextState {
 }
 
 impl ContextProvider for ExecutionContextState {
-    fn get_table_provider(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
+    fn get_table_provider(
+        &self,
+        name: &str,
+    ) -> Option<Arc<dyn TableProvider>> {
         self.datasources.get(name).map(|ds| Arc::clone(ds))
     }
 
@@ -618,9 +632,12 @@ mod tests {
         logical_plan::{col, create_udf, sum},
     };
     use crate::{
-        datasource::MemTable, logical_plan::create_udaf, physical_plan::expressions::AvgAccumulator,
+        datasource::MemTable, logical_plan::create_udaf,
+        physical_plan::expressions::AvgAccumulator,
+    };
+    use arrow::array::{
+        Array, ArrayRef, DictionaryArray, Float64Array, Int32Array, Int64Array,
     };
-    use arrow::array::{Array, ArrayRef, DictionaryArray, Float64Array, Int32Array, Int64Array};
     use arrow::compute::add;
     use arrow::datatypes::*;
     use arrow::record_batch::RecordBatch;
@@ -709,7 +726,8 @@ mod tests {
         let provider = test::create_table_dual();
         ctx.register_table("dual", provider);
 
-        let results = plan_and_collect(&mut ctx, "SELECT @@version, @name FROM dual").await?;
+        let results =
+            plan_and_collect(&mut ctx, "SELECT @@version, @name FROM dual").await?;
 
         let expected = vec![
             "+----------------------+------------------------+",
@@ -932,7 +950,8 @@ mod tests {
 
     #[tokio::test]
     async fn sort() -> Result<()> {
-        let results = execute("SELECT c1, c2 FROM test ORDER BY c1 DESC, c2 ASC", 4).await?;
+        let results =
+            execute("SELECT c1, c2 FROM test ORDER BY c1 DESC, c2 ASC", 4).await?;
         assert_eq!(results.len(), 1);
 
         let expected: Vec<&str> = vec![
@@ -1133,7 +1152,8 @@ mod tests {
 
     #[tokio::test]
     async fn boolean_literal() -> Result<()> {
-        let results = execute("SELECT c1, c3 FROM test WHERE c1 > 2 AND c3 = true", 4).await?;
+        let results =
+            execute("SELECT c1, c3 FROM test WHERE c1 > 2 AND c3 = true", 4).await?;
         assert_eq!(results.len(), 1);
 
         let expected = vec![
@@ -1154,7 +1174,8 @@ mod tests {
 
     #[tokio::test]
     async fn aggregate_grouped_empty() -> Result<()> {
-        let results = execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;
+        let results =
+            execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?;
         assert_eq!(results.len(), 1);
 
         let expected = vec!["++", "||", "++", "++"];
@@ -1330,15 +1351,18 @@ mod tests {
             Field::new("val", val_array.data_type().clone(), false),
         ]));
 
-        let batch = RecordBatch::try_new(schema.clone(), vec![dict_array, val_array]).unwrap();
+        let batch = RecordBatch::try_new(schema.clone(), vec![dict_array, val_array])
+            .unwrap();
 
         let provider = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();
         ctx.register_table("t", Arc::new(provider));
 
-        let results =
-            plan_and_collect(&mut ctx, "SELECT dict, count(val) FROM t GROUP BY dict")
-                .await
-                .expect("ran plan correctly");
+        let results = plan_and_collect(
+            &mut ctx,
+            "SELECT dict, count(val) FROM t GROUP BY dict",
+        )
+        .await
+        .expect("ran plan correctly");
 
         let expected = vec![
             "+------+------------+",
@@ -1352,9 +1376,10 @@ mod tests {
         assert_batches_sorted_eq!(expected, &results);
 
         // Now, use dict as an aggregate
-        let results = plan_and_collect(&mut ctx, "SELECT val, count(dict) FROM t GROUP BY val")
-            .await
-            .expect("ran plan correctly");
+        let results =
+            plan_and_collect(&mut ctx, "SELECT val, count(dict) FROM t GROUP BY val")
+                .await
+                .expect("ran plan correctly");
 
         let expected = vec![
             "+-----+-------------+",
@@ -1635,7 +1660,8 @@ mod tests {
                 .file_extension(file_extension),
         )?;
         let results =
-            plan_and_collect(&mut ctx, "SELECT SUM(c1), SUM(c2), COUNT(*) FROM test").await?;
+            plan_and_collect(&mut ctx, "SELECT SUM(c1), SUM(c2), COUNT(*) FROM test")
+                .await?;
         assert_eq!(results.len(), 1);
 
         let expected = vec![
@@ -1664,7 +1690,9 @@ mod tests {
                 thread::spawn(move || {
                     let ctx = ctx_clone.lock().expect("Locked context");
                     // Ensure we can create logical plan code on a separate thread.
-                    ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")
+                    ctx.create_logical_plan(
+                        "SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3",
+                    )
                 })
             })
             .collect();
@@ -1677,7 +1705,8 @@ mod tests {
     #[test]
     fn ctx_sql_should_optimize_plan() -> Result<()> {
         let mut ctx = ExecutionContext::new();
-        let plan1 = ctx.create_logical_plan("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
+        let plan1 =
+            ctx.create_logical_plan("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
 
         let opt_plan1 = ctx.optimize(&plan1)?;
 
@@ -1811,7 +1840,8 @@ mod tests {
 
         let mut ctx = ExecutionContext::new();
 
-        let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
+        let provider =
+            MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
         ctx.register_table("t", Arc::new(provider));
 
         let result = plan_and_collect(&mut ctx, "SELECT AVG(a) FROM t").await?;
@@ -1847,7 +1877,8 @@ mod tests {
 
         let mut ctx = ExecutionContext::new();
 
-        let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
+        let provider =
+            MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
         ctx.register_table("t", Arc::new(provider));
 
         // define a udaf, using a DataFusion's accumulator
@@ -1914,7 +1945,10 @@ mod tests {
     }
 
     /// Execute SQL and return results
-    async fn plan_and_collect(ctx: &mut ExecutionContext, sql: &str) -> Result<Vec<RecordBatch>> {
+    async fn plan_and_collect(
+        ctx: &mut ExecutionContext,
+        sql: &str,
+    ) -> Result<Vec<RecordBatch>> {
         let logical_plan = ctx.create_logical_plan(sql)?;
         let logical_plan = ctx.optimize(&logical_plan)?;
         let physical_plan = ctx.create_physical_plan(&logical_plan)?;
@@ -1929,7 +1963,11 @@ mod tests {
     }
 
     /// Execute SQL and write results to partitioned csv files
-    async fn write_csv(ctx: &mut ExecutionContext, sql: &str, out_dir: &str) -> Result<()> {
+    async fn write_csv(
+        ctx: &mut ExecutionContext,
+        sql: &str,
+        out_dir: &str,
+    ) -> Result<()> {
         let logical_plan = ctx.create_logical_plan(sql)?;
         let logical_plan = ctx.optimize(&logical_plan)?;
         let physical_plan = ctx.create_physical_plan(&logical_plan)?;

From d0f1f2752f136b80d41e0cc5694cff99442edb8c Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Fri, 5 Mar 2021 20:34:19 +0100
Subject: [PATCH 3/7] Revert unrelated

---
 rust/arrow/benches/filter_kernels.rs | 11 ---------
 rust/arrow/src/array/array_binary.rs | 34 +++++++++++++++-------------
 2 files changed, 18 insertions(+), 27 deletions(-)

diff --git a/rust/arrow/benches/filter_kernels.rs b/rust/arrow/benches/filter_kernels.rs
index ded745a7fad..479f9abe155 100644
--- a/rust/arrow/benches/filter_kernels.rs
+++ b/rust/arrow/benches/filter_kernels.rs
@@ -100,17 +100,6 @@ fn add_benchmark(c: &mut Criterion) {
     c.bench_function("filter context string low selectivity", |b| {
         b.iter(|| bench_built_filter(&sparse_filter, &data_array))
     });
-
-    let binary_array = BinaryArray::from(
-        data_array
-            .into_iter()
-            .map(|x| x.map(|x| x.as_bytes()))
-            .collect::<Vec<Option<&[u8]>>>(),
-    );
-
-    c.bench_function("filter context binary high selectivity", |b| {
-        b.iter(|| bench_built_filter(&dense_filter, &binary_array))
-    });
 }
 
 criterion_group!(benches, add_benchmark);
diff --git a/rust/arrow/src/array/array_binary.rs b/rust/arrow/src/array/array_binary.rs
index 527811e7ddc..9bc182eebb7 100644
--- a/rust/arrow/src/array/array_binary.rs
+++ b/rust/arrow/src/array/array_binary.rs
@@ -249,31 +249,33 @@ where
         let (_, data_len) = iter.size_hint();
         let data_len = data_len.expect("Iterator must be sized"); // panic if no upper bound.
 
-        let offset_size = std::mem::size_of::<OffsetSize>();
-        let mut offsets = MutableBuffer::new((data_len + 1) * offset_size);
-        let mut values = MutableBuffer::new(0);
+        let mut offsets = Vec::with_capacity(data_len + 1);
+        let mut values = Vec::new();
         let mut null_buf = MutableBuffer::new_null(data_len);
-        let null_slice = null_buf.as_slice_mut();
-        let mut length_so_far = OffsetSize::zero();
+        let mut length_so_far: OffsetSize = OffsetSize::zero();
         offsets.push(length_so_far);
 
-        for (i, s) in iter.enumerate() {
-            if let Some(s) = s {
-                let s = s.as_ref();
-                bit_util::set_bit(null_slice, i);
-                length_so_far += OffsetSize::from_usize(s.len()).unwrap();
-                values.extend_from_slice(s);
+        {
+            let null_slice = null_buf.as_slice_mut();
+
+            for (i, s) in iter.enumerate() {
+                if let Some(s) = s {
+                    let s = s.as_ref();
+                    bit_util::set_bit(null_slice, i);
+                    length_so_far += OffsetSize::from_usize(s.len()).unwrap();
+                    values.extend_from_slice(s);
+                }
+                // always add an element in offsets
+                offsets.push(length_so_far);
             }
-            // always add an element in offsets
-            offsets.push(length_so_far);
         }
 
         // calculate actual data_len, which may be different from the iterator's upper bound
-        let data_len = (offsets.len() / offset_size) - 1;
+        let data_len = offsets.len() - 1;
         let array_data = ArrayData::builder(OffsetSize::DATA_TYPE)
             .len(data_len)
-            .add_buffer(offsets.into())
-            .add_buffer(values.into())
+            .add_buffer(Buffer::from_slice_ref(&offsets))
+            .add_buffer(Buffer::from_slice_ref(&values))
             .null_bit_buffer(null_buf.into())
             .build();
         Self::from(array_data)

From a02d91e81845a4600a7ff336ac0fdccc4d125f56 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Fri, 5 Mar 2021 20:37:27 +0100
Subject: [PATCH 4/7] Clean up

---
 rust/datafusion/src/execution/context.rs | 2 --
 1 file changed, 2 deletions(-)

diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index f2fccb6f1ed..93ccbfde927 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -1712,8 +1712,6 @@ mod tests {
 
         let plan2 = ctx.sql("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?;
 
-        println!("{:?}", plan2.to_logical_plan());
-
         assert_eq!(
             format!("{:?}", opt_plan1),
             format!("{:?}", plan2.to_logical_plan())

From 99cca328a65261575d56b1ab59a001c9dee8a4d9 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Fri, 5 Mar 2021 20:56:56 +0100
Subject: [PATCH 5/7] Move optimize

---
 rust/datafusion/src/execution/context.rs | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 93ccbfde927..c2202c59110 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -123,7 +123,7 @@ impl ExecutionContext {
 
     /// Creates a dataframe that will execute a SQL query.
     pub fn sql(&mut self, sql: &str) -> Result<Arc<dyn DataFrame>> {
-        let plan = self.optimize(&self.create_logical_plan(sql)?)?;
+        let plan = self.create_logical_plan(sql)?;
         match plan {
             LogicalPlan::CreateExternalTable {
                 ref schema,
@@ -154,7 +154,10 @@ impl ExecutionContext {
                 ))),
             },
 
-            plan => Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))),
+            plan => Ok(Arc::new(DataFrameImpl::new(
+                self.state.clone(),
+                &self.optimize(&plan)?,
+            ))),
         }
     }

From 3bfaae4e39a2103b78196d1dacce1bc9ca273748 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Sat, 6 Mar 2021 10:26:45 +0100
Subject: [PATCH 6/7] Remove existing projection check

---
 rust/datafusion/src/optimizer/projection_push_down.rs | 11 +----------
 1 file changed, 1 insertion(+), 10 deletions(-)

diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs
index 115da4b3010..b3f91afdc32 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -18,7 +18,7 @@
 //! Projection Push Down optimizer rule ensures that only referenced columns are
 //! loaded into memory
 
-use crate::error::{DataFusionError, Result};
+use crate::error::Result;
 use crate::logical_plan::{DFField, DFSchema, DFSchemaRef, LogicalPlan, ToDFSchema};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
@@ -57,16 +57,9 @@ impl ProjectionPushDown {
 
 fn get_projected_schema(
     schema: &Schema,
-    projection: &Option<Vec<usize>>,
     required_columns: &HashSet<String>,
     has_projection: bool,
 ) -> Result<(Vec<usize>, DFSchemaRef)> {
-    if projection.is_some() {
-        return Err(DataFusionError::Internal(
-            "Cannot run projection push-down rule more than once".to_string(),
-        ));
-    }
-
     // once we reach the table scan, we can use the accumulated set of column
     // names to construct the set of column indexes in the scan
     //
@@ -242,13 +235,11 @@ fn optimize_plan(
         LogicalPlan::TableScan {
             table_name,
             source,
-            projection,
             filters,
             ..
         } => {
             let (projection, projected_schema) = get_projected_schema(
                 &source.schema(),
-                projection,
                 required_columns,
                 has_projection,
             )?;

From 42420b3d7d68564acc9aa892078a5899a43631e1 Mon Sep 17 00:00:00 2001
From: "Heres, Daniel"
Date: Sat, 6 Mar 2021 10:33:23 +0100
Subject: [PATCH 7/7] Add test for running optimizer twice in projection push down

---
 .../src/optimizer/projection_push_down.rs | 27 +++++++++++++++----
 1 file changed, 22 insertions(+), 5 deletions(-)

diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs
index b3f91afdc32..432b64501a3 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -238,11 +238,8 @@ fn optimize_plan(
             filters,
             ..
         } => {
-            let (projection, projected_schema) = get_projected_schema(
-                &source.schema(),
-                required_columns,
-                has_projection,
-            )?;
+            let (projection, projected_schema) =
+                get_projected_schema(&source.schema(), required_columns, has_projection)?;
 
             // return the table scan with projection
             Ok(LogicalPlan::TableScan {
@@ -482,6 +479,26 @@ mod tests {
         Ok(())
     }
 
+    /// tests that optimizing twice yields same plan
+    #[test]
+    fn test_double_optimization() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(&table_scan)
+            .project(&[col("b")])?
+            .project(&[lit(1).alias("a")])?
+            .build()?;
+
+        let optimized_plan1 = optimize(&plan).expect("failed to optimize plan");
+        let optimized_plan2 =
+            optimize(&optimized_plan1).expect("failed to optimize plan");
+
+        let formatted_plan1 = format!("{:?}", optimized_plan1);
+        let formatted_plan2 = format!("{:?}", optimized_plan2);
+        assert_eq!(formatted_plan1, formatted_plan2);
+        Ok(())
+    }
+
     /// tests that it removes an aggregate is never used downstream
     #[test]
     fn table_unused_aggregate() -> Result<()> {