From a660bc3512822792962184a69f53a2d32e36b072 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 15 Mar 2023 14:00:07 +0000 Subject: [PATCH 1/3] Cleanup direct uses of ArrayData --- datafusion/common/src/scalar.rs | 2 +- .../src/avro_to_arrow/arrow_array_reader.rs | 8 +++---- .../file_format/parquet/row_filter.rs | 5 ++--- .../core/src/physical_plan/joins/utils.rs | 21 +++++++++---------- .../physical-expr/src/aggregate/count.rs | 6 +++--- datafusion/physical-expr/src/aggregate/sum.rs | 4 ++-- .../physical-expr/src/expressions/case.rs | 6 +++--- .../physical-expr/src/expressions/in_list.rs | 7 +++---- 8 files changed, 27 insertions(+), 32 deletions(-) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 5dc0425869c22..73352941afa76 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -2358,7 +2358,7 @@ impl ScalarValue { None => v.is_null(), } } - ScalarValue::Null => array.data().is_null(index), + ScalarValue::Null => array.is_null(index), } } diff --git a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs index 1f06078e46270..313c2a1596ea1 100644 --- a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs @@ -435,7 +435,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { }); let valid_len = cur_offset.to_usize().unwrap(); let array_data = match list_field.data_type() { - DataType::Null => NullArray::new(valid_len).data().clone(), + DataType::Null => NullArray::new(valid_len).into_data(), DataType::Boolean => { let num_bytes = bit_util::ceil(valid_len, 8); let mut bool_values = MutableBuffer::from_len_zeroed(num_bytes); @@ -496,13 +496,11 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { DataType::Utf8 => flatten_string_values(rows) .into_iter() .collect::() - .data() - .clone(), + .into_data(), DataType::LargeUtf8 => flatten_string_values(rows) .into_iter() .collect::() - .data() - .clone(), + .into_data(), DataType::List(field) => { let child = self.build_nested_list_array::(&flatten_values(rows), field)?; diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs index 54478edc73c5e..91ae8afe6f297 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::{Array, BooleanArray}; +use arrow::array::BooleanArray; use arrow::datatypes::{DataType, Schema}; use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; @@ -131,8 +131,7 @@ impl ArrowPredicate for DatafusionArrowPredicate { .map(|v| v.into_array(batch.num_rows())) { Ok(array) => { - let mask = as_boolean_array(&array)?; - let bool_arr = BooleanArray::from(mask.data().clone()); + let bool_arr = as_boolean_array(&array)?.clone(); let num_filtered = bool_arr.len() - bool_arr.true_count(); self.rows_filtered.add(num_filtered); timer.stop(); diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs index a756d2ba89381..f10ad7a15ae35 100644 --- a/datafusion/core/src/physical_plan/joins/utils.rs +++ b/datafusion/core/src/physical_plan/joins/utils.rs @@ -18,11 +18,11 @@ //! Join related functionality used both on logical and physical plans use arrow::array::{ - new_null_array, Array, BooleanBufferBuilder, PrimitiveArray, UInt32Array, + downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array, }; use arrow::compute; -use arrow::datatypes::{Field, Schema, UInt32Type, UInt64Type}; +use arrow::datatypes::{Field, Schema}; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use futures::future::{BoxFuture, Shared}; use futures::{ready, FutureExt}; @@ -783,8 +783,8 @@ pub(crate) fn apply_join_filter_to_indices( filter.schema(), build_input_buffer, probe_batch, - PrimitiveArray::from(build_indices.data().clone()), - PrimitiveArray::from(probe_indices.data().clone()), + build_indices.clone(), + probe_indices.clone(), filter.column_indices(), build_side, )?; @@ -794,13 +794,12 @@ pub(crate) fn apply_join_filter_to_indices( .into_array(intermediate_batch.num_rows()); let mask = as_boolean_array(&filter_result)?; - let left_filtered = PrimitiveArray::::from( - compute::filter(&build_indices, mask)?.data().clone(), - ); - let right_filtered = PrimitiveArray::::from( - compute::filter(&probe_indices, mask)?.data().clone(), - ); - Ok((left_filtered, right_filtered)) + let left_filtered = compute::filter(&build_indices, mask)?; + let right_filtered = compute::filter(&probe_indices, mask)?; + Ok(( + downcast_array(left_filtered.as_ref()), + downcast_array(right_filtered.as_ref()), + )) } /// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`. diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index 38b193ebf270d..dc77b794a283b 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -137,13 +137,13 @@ impl Accumulator for CountAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let array = &values[0]; - self.count += (array.len() - array.data().null_count()) as i64; + self.count += (array.len() - array.null_count()) as i64; Ok(()) } fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let array = &values[0]; - self.count -= (array.len() - array.data().null_count()) as i64; + self.count -= (array.len() - array.null_count()) as i64; Ok(()) } @@ -183,7 +183,7 @@ impl RowAccumulator for CountRowAccumulator { accessor: &mut RowAccessor, ) -> Result<()> { let array = &values[0]; - let delta = (array.len() - array.data().null_count()) as u64; + let delta = (array.len() - array.null_count()) as u64; accessor.add_u64(self.state_index, delta); Ok(()) } diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs index 0a21de22d32d0..a815a33c8c7fe 100644 --- a/datafusion/physical-expr/src/aggregate/sum.rs +++ b/datafusion/physical-expr/src/aggregate/sum.rs @@ -236,7 +236,7 @@ impl Accumulator for SumAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let values = &values[0]; - self.count += (values.len() - values.data().null_count()) as u64; + self.count += (values.len() - values.null_count()) as u64; let delta = sum_batch(values, &self.sum.get_datatype())?; self.sum = self.sum.add(&delta)?; Ok(()) @@ -244,7 +244,7 @@ impl Accumulator for SumAccumulator { fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let values = &values[0]; - self.count -= (values.len() - values.data().null_count()) as u64; + self.count -= (values.len() - values.null_count()) as u64; let delta = sum_batch(values, &self.sum.get_datatype())?; self.sum = self.sum.sub(&delta)?; Ok(()) diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index 3dff4ae9745a4..d649474455dbf 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -716,10 +716,10 @@ mod tests { //let valid_array = vec![true, false, false, true, false, tru let null_buffer = Buffer::from([0b00101001u8]); - let load4 = ArrayDataBuilder::new(load4.data_type().clone()) - .len(load4.len()) + let load4 = load4 + .into_data() + .into_builder() .null_bit_buffer(Some(null_buffer)) - .buffers(load4.data().buffers().to_vec()) .build() .unwrap(); let load4: Float64Array = load4.into(); diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 3a5a25ff66cf2..e37d5cd7498a2 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -86,7 +86,7 @@ where { fn new(array: &T, hash_set: ArrayHashSet) -> Self { Self { - array: T::from(array.data().clone()), + array: downcast_array(array), hash_set, } } @@ -103,15 +103,14 @@ where v => { let values_contains = self.contains(v.values().as_ref(), negated)?; let result = take(&values_contains, v.keys(), None)?; - return Ok(BooleanArray::from(result.data().clone())) + return Ok(downcast_array(result.as_ref())) } _ => {} } let v = v.as_any().downcast_ref::().unwrap(); - let in_data = self.array.data(); let in_array = &self.array; - let has_nulls = in_data.null_count() != 0; + let has_nulls = in_array.null_count() != 0; Ok(ArrayIter::new(v) .map(|v| { From 3ea3f517e7aef741ea999a1ce79265e5b7115926 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 15 Mar 2023 16:28:19 +0000 Subject: [PATCH 2/3] Fix DataFrame::count --- datafusion/core/src/dataframe.rs | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index af41725db5186..f56497ca2a0ca 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -20,11 +20,11 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::{Array, ArrayRef, Int64Array, StringArray}; +use arrow::array::{ArrayRef, StringArray}; use arrow::compute::{cast, concat}; use arrow::datatypes::{DataType, Field}; use async_trait::async_trait; -use datafusion_common::DataFusionError; +use futures::StreamExt; use parquet::file::properties::WriterProperties; use datafusion_common::from_slice::FromSlice; @@ -627,22 +627,12 @@ impl DataFrame { /// # } /// ``` pub async fn count(self) -> Result { - let rows = self - .aggregate( - vec![], - vec![datafusion_expr::count(Expr::Literal(ScalarValue::Null))], - )? - .collect() - .await?; - let len = *rows - .first() - .and_then(|r| r.columns().first()) - .and_then(|c| c.as_any().downcast_ref::()) - .and_then(|a| a.values().first()) - .ok_or(DataFusionError::Internal( - "Unexpected output when collecting for count()".to_string(), - ))? as usize; - Ok(len) + let mut stream = self.execute_stream().await?; + let mut rows = 0; + while let Some(b) = stream.next().await { + rows += b?.num_rows(); + } + Ok(rows) } /// Convert the logical plan represented by this DataFrame into a physical plan and From 134626a3e0abf00f687fdbaf1e959fe1e3ec49e4 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 15 Mar 2023 18:56:36 +0000 Subject: [PATCH 3/3] COUNT_STAR_EXPANSION --- datafusion/core/src/dataframe.rs | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index f56497ca2a0ca..1b8b742c2ae1b 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -20,18 +20,18 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::{ArrayRef, StringArray}; +use arrow::array::{Array, ArrayRef, Int64Array, StringArray}; use arrow::compute::{cast, concat}; use arrow::datatypes::{DataType, Field}; use async_trait::async_trait; -use futures::StreamExt; +use datafusion_common::DataFusionError; use parquet::file::properties::WriterProperties; use datafusion_common::from_slice::FromSlice; use datafusion_common::{Column, DFSchema, ScalarValue}; use datafusion_expr::{ - avg, count, is_null, max, median, min, stddev, TableProviderFilterPushDown, - UNNAMED_TABLE, + avg, count, is_null, max, median, min, stddev, utils::COUNT_STAR_EXPANSION, + TableProviderFilterPushDown, UNNAMED_TABLE, }; use crate::arrow::datatypes::Schema; @@ -627,12 +627,22 @@ impl DataFrame { /// # } /// ``` pub async fn count(self) -> Result { - let mut stream = self.execute_stream().await?; - let mut rows = 0; - while let Some(b) = stream.next().await { - rows += b?.num_rows(); - } - Ok(rows) + let rows = self + .aggregate( + vec![], + vec![datafusion_expr::count(Expr::Literal(COUNT_STAR_EXPANSION))], + )? + .collect() + .await?; + let len = *rows + .first() + .and_then(|r| r.columns().first()) + .and_then(|c| c.as_any().downcast_ref::()) + .and_then(|a| a.values().first()) + .ok_or(DataFusionError::Internal( + "Unexpected output when collecting for count()".to_string(), + ))? as usize; + Ok(len) } /// Convert the logical plan represented by this DataFrame into a physical plan and