diff --git a/rust/datafusion/src/execution/physical_plan/hash.rs b/rust/datafusion/src/execution/physical_plan/hash.rs new file mode 100644 index 00000000000..883f3f941a3 --- /dev/null +++ b/rust/datafusion/src/execution/physical_plan/hash.rs @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Defines auxiliary functions for hashing keys
+
+use std::cell::RefCell;
+use std::rc::Rc;
+use std::sync::Arc;
+
+use crate::error::{ExecutionError, Result};
+use crate::execution::physical_plan::Accumulator;
+
+use arrow::array::{
+    ArrayRef, BooleanArray, Int16Array, Int32Array, Int64Array, Int8Array, StringArray,
+    UInt16Array, UInt32Array, UInt64Array, UInt8Array,
+};
+use arrow::array::{
+    BooleanBuilder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, StringBuilder,
+    UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder,
+};
+use arrow::datatypes::DataType;
+
+use fnv::FnvHashMap;
+
+/// Scalar values that can be used in any expression that relies on a hash: booleans,
+/// signed and unsigned integers, and strings. Floating point numerics are not included.
+#[derive(Debug, PartialEq, Eq, Hash, Clone)]
+pub enum KeyScalar {
+    /// Boolean value
+    Boolean(bool),
+    /// Unsigned 8-bit integer
+    UInt8(u8),
+    /// Unsigned 16-bit integer
+    UInt16(u16),
+    /// Unsigned 32-bit integer
+    UInt32(u32),
+    /// Unsigned 64-bit integer
+    UInt64(u64),
+    /// Signed 8-bit integer
+    Int8(i8),
+    /// Signed 16-bit integer
+    Int16(i16),
+    /// Signed 32-bit integer
+    Int32(i32),
+    /// Signed 64-bit integer
+    Int64(i64),
+    /// Owned UTF-8 string
+    Utf8(String),
+}
+
+// Downcasts `$col` to `$ArrayType` and *returns from the enclosing function*: either
+// `Ok($keyscalar(value at $row))` or an `ExecutionError::General` naming `$datatype`
+// when the downcast fails. Both arms `return`, so the expansion never yields a value.
+macro_rules! safe_cast {
+    ($col:expr, $row:expr, $datatype:expr, $ArrayType:ty, $keyscalar: expr) => {{
+        match $col.as_any().downcast_ref::<$ArrayType>() {
+            Some(typed) => return Ok($keyscalar(typed.value($row))),
+            None => {
+                return Err(ExecutionError::General(format!(
+                    "Failed to downcast {:?} to {:?}",
+                    $col, $datatype
+                )))
+            }
+        }
+    }};
+}
+
+/// Return a KeyScalar from an ArrayRef of a given row.
+///
+/// The concrete array type is selected from `col.data_type()`; the value stored at
+/// `row` is then wrapped in the matching [`KeyScalar`] variant. String values are
+/// copied into an owned `String`.
+///
+/// NOTE(review): the validity (null-ness) of `row` is not checked before reading the
+/// value; presumably callers only pass key columns without nulls -- confirm.
+///
+/// # Errors
+///
+/// * `ExecutionError::General` when `col` cannot be downcast to the array type implied
+///   by its data type.
+/// * `ExecutionError::ExecutionError` when the data type is not one of the supported
+///   key types (boolean, integers, Utf8).
+pub fn create_key(col: &ArrayRef, row: usize) -> Result<KeyScalar> {
+    match col.data_type() {
+        // every `safe_cast!` arm returns from this function directly
+        DataType::Boolean => safe_cast!(
+            col,
+            row,
+            DataType::Boolean,
+            BooleanArray,
+            KeyScalar::Boolean
+        ),
+        DataType::UInt8 => {
+            safe_cast!(col, row, DataType::UInt8, UInt8Array, KeyScalar::UInt8)
+        }
+        DataType::UInt16 => {
+            safe_cast!(col, row, DataType::UInt16, UInt16Array, KeyScalar::UInt16)
+        }
+        DataType::UInt32 => {
+            safe_cast!(col, row, DataType::UInt32, UInt32Array, KeyScalar::UInt32)
+        }
+        DataType::UInt64 => {
+            safe_cast!(col, row, DataType::UInt64, UInt64Array, KeyScalar::UInt64)
+        }
+        DataType::Int8 => {
+            safe_cast!(col, row, DataType::Int8, Int8Array, KeyScalar::Int8)
+        }
+        DataType::Int16 => {
+            safe_cast!(col, row, DataType::Int16, Int16Array, KeyScalar::Int16)
+        }
+        DataType::Int32 => {
+            safe_cast!(col, row, DataType::Int32, Int32Array, KeyScalar::Int32)
+        }
+        DataType::Int64 => {
+            safe_cast!(col, row, DataType::Int64, Int64Array, KeyScalar::Int64)
+        }
+        // strings need an owned copy, so they are handled outside of `safe_cast!`
+        DataType::Utf8 => match col.as_any().downcast_ref::<StringArray>() {
+            Some(array) => Ok(KeyScalar::Utf8(String::from(array.value(row)))),
+            None => Err(ExecutionError::General(format!(
+                "Failed to downcast {:?} to {:?}",
+                col,
+                DataType::Utf8
+            ))),
+        },
+        _ => Err(ExecutionError::ExecutionError(
+            "Unsupported key data type".to_string(),
+        )),
+    }
+}
+
+/// Create array from `key` attribute in map entry (representing a grouping scalar value)
+macro_rules!
key_array_from_map_entries {
+    ($BUILDER:ident, $TY:ident, $MAP:expr, $COL_INDEX:expr) => {{
+        let mut builder = $BUILDER::new($MAP.len());
+        // a mismatching variant only raises this flag; the error is reported once the
+        // whole map has been visited
+        let mut err = false;
+        for k in $MAP.keys() {
+            match k[$COL_INDEX] {
+                KeyScalar::$TY(n) => builder.append_value(n).unwrap(),
+                _ => err = true,
+            }
+        }
+        if err {
+            Err(ExecutionError::ExecutionError(
+                "unexpected type when creating grouping array from aggregate map"
+                    .to_string(),
+            ))
+        } else {
+            Ok(Arc::new(builder.finish()) as ArrayRef)
+        }
+    }};
+}
+
+/// A set of accumulators, each one shared (`Rc`) and mutable in place (`RefCell`).
+pub type AccumulatorSet = Vec<Rc<RefCell<dyn Accumulator>>>;
+
+/// Builds the array of KeyScalars from `data_type`.
+///
+/// The result holds the `i`-th component of every key of `map`, one entry per key, in
+/// the iteration order of `map.keys()`.
+///
+/// # Errors
+///
+/// Returns `ExecutionError::ExecutionError` when `data_type` is not a supported key
+/// type, or when the `i`-th component of a key is not the [`KeyScalar`] variant that
+/// corresponds to `data_type`.
+///
+/// # Panics
+///
+/// Panics if a key has no component at index `i`, or if appending to the underlying
+/// array builder fails.
+pub fn create_key_array(
+    i: usize,
+    data_type: DataType,
+    map: &FnvHashMap<Vec<KeyScalar>, Rc<AccumulatorSet>>,
+) -> Result<ArrayRef> {
+    match data_type {
+        DataType::Boolean => key_array_from_map_entries!(BooleanBuilder, Boolean, map, i),
+        DataType::UInt8 => key_array_from_map_entries!(UInt8Builder, UInt8, map, i),
+        DataType::UInt16 => key_array_from_map_entries!(UInt16Builder, UInt16, map, i),
+        DataType::UInt32 => key_array_from_map_entries!(UInt32Builder, UInt32, map, i),
+        DataType::UInt64 => key_array_from_map_entries!(UInt64Builder, UInt64, map, i),
+        DataType::Int8 => key_array_from_map_entries!(Int8Builder, Int8, map, i),
+        DataType::Int16 => key_array_from_map_entries!(Int16Builder, Int16, map, i),
+        DataType::Int32 => key_array_from_map_entries!(Int32Builder, Int32, map, i),
+        DataType::Int64 => key_array_from_map_entries!(Int64Builder, Int64, map, i),
+        // strings are borrowed from the key instead of copied, hence no macro here
+        DataType::Utf8 => {
+            let mut builder = StringBuilder::new(1);
+            for k in map.keys() {
+                match &k[i] {
+                    KeyScalar::Utf8(s) => builder.append_value(s).unwrap(),
+                    _ => {
+                        return Err(ExecutionError::ExecutionError(
+                            "Unexpected value for Utf8 group column".to_string(),
+                        ))
+                    }
+                }
+            }
+            Ok(Arc::new(builder.finish()) as ArrayRef)
+        }
+        _ => Err(ExecutionError::ExecutionError(
+            "Unsupported key by expr".to_string(),
+        )),
+    }
+} diff --git a/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs b/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs index 19836fd864d..d4e374de316 100644 --- a/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs +++ b/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs @@ -26,20 +26,17 @@ use crate::execution::physical_plan::{ Accumulator, AggregateExpr, ExecutionPlan, Partition, PhysicalExpr, }; -use arrow::array::{ - ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, - StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, -}; -use arrow::array::{ - Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, - Int8Builder, StringBuilder, UInt16Builder, UInt32Builder, UInt64Builder, - UInt8Builder, -}; +use arrow::array::ArrayRef; +use arrow::array::{Float32Builder, Float64Builder, Int64Builder, UInt64Builder}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::{RecordBatch, RecordBatchReader}; +use crate::execution::physical_plan::common::get_scalar_value; use crate::execution::physical_plan::expressions::col; +use crate::execution::physical_plan::hash::{ + create_key, create_key_array, AccumulatorSet, KeyScalar, +}; use crate::logicalplan::ScalarValue; use fnv::FnvHashMap; @@ -164,28 +161,6 @@ impl Partition for HashAggregatePartition { } } -/// Create array from `key` attribute in map entry (representing a grouping scalar value) -macro_rules! 
group_array_from_map_entries { - ($BUILDER:ident, $TY:ident, $MAP:expr, $COL_INDEX:expr) => {{ - let mut builder = $BUILDER::new($MAP.len()); - let mut err = false; - for k in $MAP.keys() { - match k[$COL_INDEX] { - GroupByScalar::$TY(n) => builder.append_value(n).unwrap(), - _ => err = true, - } - } - if err { - Err(ExecutionError::ExecutionError( - "unexpected type when creating grouping array from aggregate map" - .to_string(), - )) - } else { - Ok(Arc::new(builder.finish()) as ArrayRef) - } - }}; -} - /// Create array from `value` attribute in map entry (representing an aggregate scalar /// value) macro_rules! aggr_array_from_map_entries { @@ -236,12 +211,6 @@ macro_rules! aggr_array_from_accumulator { }}; } -#[derive(Debug)] -struct MapEntry { - k: Vec, - v: Vec>, -} - struct GroupedHashAggregateIterator { schema: SchemaRef, group_expr: Vec>, @@ -268,24 +237,6 @@ impl GroupedHashAggregateIterator { } } -type AccumulatorSet = Vec>>; - -macro_rules! update_accumulators { - ($ARRAY:ident, $ARRAY_TY:ident, $SCALAR_TY:expr, $COL:expr, $ACCUM:expr) => {{ - let primitive_array = $ARRAY.as_any().downcast_ref::<$ARRAY_TY>().unwrap(); - - for row in 0..$ARRAY.len() { - if $ARRAY.is_valid(row) { - let value = Some($SCALAR_TY(primitive_array.value(row))); - let mut accum = $ACCUM[row][$COL].borrow_mut(); - accum - .accumulate_scalar(value) - .map_err(ExecutionError::into_arrow_external_error)?; - } - } - }}; -} - impl RecordBatchReader for GroupedHashAggregateIterator { fn schema(&self) -> SchemaRef { self.schema.clone() @@ -299,7 +250,7 @@ impl RecordBatchReader for GroupedHashAggregateIterator { self.finished = true; // create map to store accumulators for each unique grouping key - let mut map: FnvHashMap, Rc> = + let mut map: FnvHashMap, Rc> = FnvHashMap::default(); // iterate over all input batches and update the accumulators @@ -327,120 +278,47 @@ impl RecordBatchReader for GroupedHashAggregateIterator { }) .collect::>>()?; - // create vector large enough to hold 
the grouping key + // create vector to hold the grouping key let mut key = Vec::with_capacity(group_values.len()); for _ in 0..group_values.len() { - key.push(GroupByScalar::UInt32(0)); + key.push(KeyScalar::UInt32(0)); } // iterate over each row in the batch and create the accumulators for each grouping key - let mut accumulators: Vec> = - Vec::with_capacity(batch.num_rows()); - for row in 0..batch.num_rows() { - // create grouping key for this row - create_key(&group_values, row, &mut key) - .map_err(ExecutionError::into_arrow_external_error)?; - - if let Some(accumulator_set) = map.get(&key) { - accumulators.push(accumulator_set.clone()); - } else { - let accumulator_set: AccumulatorSet = self - .aggr_expr - .iter() - .map(|expr| expr.create_accumulator()) - .collect(); - - let accumulator_set = Rc::new(accumulator_set); - - map.insert(key.clone(), accumulator_set.clone()); - accumulators.push(accumulator_set); + // create and assign the grouping key of this row + for i in 0..group_values.len() { + key[i] = create_key(&group_values[i], row) + .map_err(ExecutionError::into_arrow_external_error)?; } - } - // iterate over each non-grouping column in the batch and update the accumulator - // for each row - for col in 0..aggr_input_values.len() { - let array = &aggr_input_values[col]; - - match array.data_type() { - DataType::Int8 => update_accumulators!( - array, - Int8Array, - ScalarValue::Int8, - col, - accumulators - ), - DataType::Int16 => update_accumulators!( - array, - Int16Array, - ScalarValue::Int16, - col, - accumulators - ), - DataType::Int32 => update_accumulators!( - array, - Int32Array, - ScalarValue::Int32, - col, - accumulators - ), - DataType::Int64 => update_accumulators!( - array, - Int64Array, - ScalarValue::Int64, - col, - accumulators - ), - DataType::UInt8 => update_accumulators!( - array, - UInt8Array, - ScalarValue::UInt8, - col, - accumulators - ), - DataType::UInt16 => update_accumulators!( - array, - UInt16Array, - ScalarValue::UInt16, - 
col, - accumulators - ), - DataType::UInt32 => update_accumulators!( - array, - UInt32Array, - ScalarValue::UInt32, - col, - accumulators - ), - DataType::UInt64 => update_accumulators!( - array, - UInt64Array, - ScalarValue::UInt64, - col, - accumulators - ), - DataType::Float32 => update_accumulators!( - array, - Float32Array, - ScalarValue::Float32, - col, - accumulators - ), - DataType::Float64 => update_accumulators!( - array, - Float64Array, - ScalarValue::Float64, - col, - accumulators - ), - other => { - return Err(ExecutionError::ExecutionError(format!( - "Unsupported data type {:?} for result of aggregate expression", - other - )).into_arrow_external_error()); + // for each new key on the map, add an accumulatorSet to the map + + // get (or create) the accumulator set for this key + let accumulator_set = match map.get(&key) { + None => { + let accumulator_set: AccumulatorSet = self + .aggr_expr + .iter() + .map(|expr| expr.create_accumulator()) + .collect(); + let accumulator_set = Rc::new(accumulator_set); + map.insert(key.clone(), accumulator_set.clone()); + accumulator_set } + Some(accumulator_set) => accumulator_set.clone(), }; + + // update each accumulator with the values from the aggregated input + for col in 0..aggr_input_values.len() { + let value = get_scalar_value(&aggr_input_values[col], row) + .map_err(ExecutionError::into_arrow_external_error)?; + + let mut accum = accumulator_set[col].borrow_mut(); + accum + .accumulate_scalar(value) + .map_err(ExecutionError::into_arrow_external_error)?; + } } } @@ -452,54 +330,12 @@ impl RecordBatchReader for GroupedHashAggregateIterator { // grouping values for i in 0..self.group_expr.len() { - let array: Result = match self.group_expr[i] + let data_type = self.group_expr[i] .data_type(&input_schema) - .map_err(ExecutionError::into_arrow_external_error)? 
- { - DataType::UInt8 => { - group_array_from_map_entries!(UInt8Builder, UInt8, map, i) - } - DataType::UInt16 => { - group_array_from_map_entries!(UInt16Builder, UInt16, map, i) - } - DataType::UInt32 => { - group_array_from_map_entries!(UInt32Builder, UInt32, map, i) - } - DataType::UInt64 => { - group_array_from_map_entries!(UInt64Builder, UInt64, map, i) - } - DataType::Int8 => { - group_array_from_map_entries!(Int8Builder, Int8, map, i) - } - DataType::Int16 => { - group_array_from_map_entries!(Int16Builder, Int16, map, i) - } - DataType::Int32 => { - group_array_from_map_entries!(Int32Builder, Int32, map, i) - } - DataType::Int64 => { - group_array_from_map_entries!(Int64Builder, Int64, map, i) - } - DataType::Utf8 => { - let mut builder = StringBuilder::new(1); - for k in map.keys() { - match &k[i] { - GroupByScalar::Utf8(s) => builder.append_value(&s).unwrap(), - _ => { - return Err(ExecutionError::ExecutionError( - "Unexpected value for Utf8 group column".to_string(), - ) - .into_arrow_external_error()) - } - } - } - Ok(Arc::new(builder.finish()) as ArrayRef) - } - _ => Err(ExecutionError::ExecutionError( - "Unsupported group by expr".to_string(), - )), - }; - result_arrays.push(array.map_err(ExecutionError::into_arrow_external_error)?); + .map_err(ExecutionError::into_arrow_external_error)?; + let array = create_key_array(i, data_type, &map) + .map_err(ExecutionError::into_arrow_external_error)?; + result_arrays.push(array); // aggregate values for i in 0..self.aggr_expr.len() { @@ -677,76 +513,6 @@ impl RecordBatchReader for HashAggregateIterator { } } -/// Enumeration of types that can be used in a GROUP BY expression (all primitives except -/// for floating point numerics) -#[derive(Debug, PartialEq, Eq, Hash, Clone)] -enum GroupByScalar { - UInt8(u8), - UInt16(u16), - UInt32(u32), - UInt64(u64), - Int8(i8), - Int16(i16), - Int32(i32), - Int64(i64), - Utf8(String), -} - -/// Create a Vec that can be used as a map key -fn create_key( - group_by_keys: 
&[ArrayRef], - row: usize, - vec: &mut Vec, -) -> Result<()> { - for i in 0..group_by_keys.len() { - let col = &group_by_keys[i]; - match col.data_type() { - DataType::UInt8 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec[i] = GroupByScalar::UInt8(array.value(row)) - } - DataType::UInt16 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec[i] = GroupByScalar::UInt16(array.value(row)) - } - DataType::UInt32 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec[i] = GroupByScalar::UInt32(array.value(row)) - } - DataType::UInt64 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec[i] = GroupByScalar::UInt64(array.value(row)) - } - DataType::Int8 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec[i] = GroupByScalar::Int8(array.value(row)) - } - DataType::Int16 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec[i] = GroupByScalar::Int16(array.value(row)) - } - DataType::Int32 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec[i] = GroupByScalar::Int32(array.value(row)) - } - DataType::Int64 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec[i] = GroupByScalar::Int64(array.value(row)) - } - DataType::Utf8 => { - let array = col.as_any().downcast_ref::().unwrap(); - vec[i] = GroupByScalar::Utf8(String::from(array.value(row))) - } - _ => { - return Err(ExecutionError::ExecutionError( - "Unsupported GROUP BY data type".to_string(), - )) - } - } - } - Ok(()) -} - #[cfg(test)] mod tests { @@ -755,6 +521,7 @@ mod tests { use crate::execution::physical_plan::expressions::{col, sum}; use crate::execution::physical_plan::merge::MergeExec; use crate::test; + use arrow::array::{Int64Array, UInt32Array}; #[test] fn aggregate() -> Result<()> { diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs index 2e191784678..4d5c56e349b 100644 --- a/rust/datafusion/src/execution/physical_plan/mod.rs +++ 
b/rust/datafusion/src/execution/physical_plan/mod.rs @@ -80,6 +80,7 @@ pub mod common; pub mod csv; pub mod datasource; pub mod expressions; +pub mod hash; pub mod hash_aggregate; pub mod limit; pub mod math_expressions;