From 5acff2cc6f673de75e730b18449591d6a8da8007 Mon Sep 17 00:00:00 2001 From: Morgan Cassels Date: Mon, 30 Mar 2020 11:50:02 -0700 Subject: [PATCH 01/13] implement list columns --- rust/arrow/src/array/array.rs | 8 + rust/arrow/src/compute/kernels/comparison.rs | 258 +++++++- rust/arrow/src/compute/kernels/filter.rs | 206 +++++- rust/arrow/src/ipc/convert.rs | 3 +- rust/arrow/src/record_batch.rs | 116 +++- rust/datafusion/Cargo.toml | 2 +- .../execution/physical_plan/expressions.rs | 136 +++- rust/datafusion/src/logicalplan.rs | 2 + .../src/optimizer/projection_push_down.rs | 3 + .../datafusion/src/optimizer/type_coercion.rs | 23 +- rust/datafusion/src/sql/planner.rs | 1 + rust/datafusion/tests/sql.rs | 44 ++ rust/parquet/src/arrow/array_reader.rs | 615 +++++++++++++++++- 13 files changed, 1361 insertions(+), 56 deletions(-) diff --git a/rust/arrow/src/array/array.rs b/rust/arrow/src/array/array.rs index 05620e383a1..9759b3c1cef 100644 --- a/rust/arrow/src/array/array.rs +++ b/rust/arrow/src/array/array.rs @@ -2323,6 +2323,14 @@ mod tests { BooleanArray::from(data); } + #[test] + fn test_empty_list_array() { + let values_builder = Int32Builder::new(10); + let mut builder = ListBuilder::new(values_builder); + let list_array = builder.finish(); + assert_eq!(0, list_array.len()); + } + #[test] fn test_list_array() { // Construct a value array diff --git a/rust/arrow/src/compute/kernels/comparison.rs b/rust/arrow/src/compute/kernels/comparison.rs index 0409fac0a96..b282d5cacea 100644 --- a/rust/arrow/src/compute/kernels/comparison.rs +++ b/rust/arrow/src/compute/kernels/comparison.rs @@ -27,9 +27,11 @@ use std::collections::HashMap; use std::sync::Arc; use crate::array::*; +use crate::buffer::{Buffer, MutableBuffer}; use crate::compute::util::apply_bin_op_to_option_bitmap; use crate::datatypes::{ArrowNumericType, BooleanType, DataType}; use crate::error::{ArrowError, Result}; +use crate::util::bit_util; /// Helper function to perform boolean lambda function on values from two arrays, this /// version does not attempt to use SIMD. 
@@ -347,10 +349,158 @@ where compare_op!(left, right, |a, b| a >= b) } +// create a buffer and fill it with valid bits +fn new_all_set_buffer(len: usize) -> Buffer { + let buffer = MutableBuffer::new(len); + let buffer = buffer.with_bitset(len, true); + let buffer = buffer.freeze(); + buffer +} + +pub fn contains(left: &PrimitiveArray, right: &ListArray) -> Result +where + T: ArrowNumericType, +{ + if left.len() != right.len() { + return Err(ArrowError::ComputeError( + "Cannot perform comparison operation on arrays of different length" + .to_string(), + )); + } + + let not_both_null_bit_buffer = match apply_bin_op_to_option_bitmap( + left.data().null_bitmap(), + right.data().null_bitmap(), + |a, b| a | b, + ) { + Ok(both_null_buff) => match both_null_buff { + Some(buff) => buff, + _ => new_all_set_buffer(left.len()), + }, + _ => new_all_set_buffer(left.len()), + }; + let not_both_null_bitmap = not_both_null_bit_buffer.data(); + + let left_data = left.data(); + let left_null_bitmap = match left_data.null_bitmap() { + Some(bitmap) => bitmap.clone().to_buffer(), + _ => new_all_set_buffer(left.len()), + }; + let left_null_bitmap = left_null_bitmap.data(); + + let mut result = BooleanBufferBuilder::new(left.len()); + + for i in 0..left.len() { + let mut is_in = false; + + // contains(null, null) = false + if bit_util::get_bit(not_both_null_bitmap, i) { + let list = right.value(i); + + // contains(null, [null]) = true + if !bit_util::get_bit(left_null_bitmap, i) { + if list.null_count() > 0 { + is_in = true; + } + } else { + let list = list.as_any().downcast_ref::>().unwrap(); + + for j in 0..list.len() { + if list.is_valid(j) && (left.value(i) == list.value(j)) { + is_in = true; + } + } + } + } + result.append(is_in)?; + } + + let data = ArrayData::new( + DataType::Boolean, + left.len(), + None, + None, + left.offset(), + vec![result.finish()], + vec![], + ); + Ok(PrimitiveArray::::from(Arc::new(data))) +} + +pub fn contains_utf8(left: &StringArray, right: &ListArray) -> Result { + if left.len() != right.len() { + return Err(ArrowError::ComputeError( + "Cannot perform comparison operation on arrays of different length" + .to_string(), + )); + } + + let not_both_null_bit_buffer = match apply_bin_op_to_option_bitmap( + left.data().null_bitmap(), + right.data().null_bitmap(), + |a, b| a | b, + ) { + Ok(both_null_buff) => match both_null_buff { + Some(buff) => buff, + _ => new_all_set_buffer(left.len()), + }, + _ => new_all_set_buffer(left.len()), + }; + let not_both_null_bitmap = not_both_null_bit_buffer.data(); + + let left_data = left.data(); + let left_null_bitmap = match left_data.null_bitmap() { + Some(bitmap) => bitmap.clone().to_buffer(), + _ => new_all_set_buffer(left.len()), + }; + let left_null_bitmap = left_null_bitmap.data(); + + let mut result = BooleanBufferBuilder::new(left.len()); + + for i in 0..left.len() { + let mut is_in = false; + + // contains(null, null) = false + if bit_util::get_bit(not_both_null_bitmap, i) { + let list = right.value(i); + + // contains(null, [null]) = true + if !bit_util::get_bit(left_null_bitmap, i) { + if list.null_count() > 0 { + is_in = true; + } + } else { + let list = list.as_any().downcast_ref::().unwrap(); + + for j in 0..list.len() { + if list.is_valid(j) && (left.value(i) == list.value(j)) { + is_in = true; + } + } + } + } + result.append(is_in)?; + } + + let data = ArrayData::new( + DataType::Boolean, + left.len(), + None, + None, + left.offset(), + vec![result.finish()], + vec![], + ); + Ok(PrimitiveArray::::from(Arc::new(data))) +} + 
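A quick usage sketch of the new kernel and its null semantics (the list contents, variable names, and assertions below are illustrative only and not part of the patch; `Int32Builder`, `ListBuilder`, `Int32Array`, and `contains` are the APIs introduced or used elsewhere in this change):

    // Right-hand side: [[0, 1, 2], null, [3, null]]
    let values_builder = Int32Builder::new(8);
    let mut builder = ListBuilder::new(values_builder);
    builder.values().append_value(0).unwrap();
    builder.values().append_value(1).unwrap();
    builder.values().append_value(2).unwrap();
    builder.append(true).unwrap();
    builder.append(false).unwrap(); // null list
    builder.values().append_value(3).unwrap();
    builder.values().append_null().unwrap();
    builder.append(true).unwrap();
    let lists = builder.finish();

    // Left-hand side: one needle per row
    let needles = Int32Array::from(vec![Some(2), Some(7), None]);

    // Row 0: contains(2, [0, 1, 2])    -> true
    // Row 1: contains(7, null)         -> false (contains(x, null) = false)
    // Row 2: contains(null, [3, null]) -> true  (contains(null, [.., null, ..]) = true)
    let result = contains(&needles, &lists).unwrap();
    assert_eq!(true, result.value(0));
    assert_eq!(false, result.value(1));
    assert_eq!(true, result.value(2));

The same behaviour is exercised row by row in the test module that follows.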
#[cfg(test)] mod tests { use super::*; use crate::array::Int32Array; + use crate::buffer::Buffer; + use crate::datatypes::ToByteSlice; + use std::convert::TryFrom; #[test] fn test_primitive_array_eq() { @@ -457,13 +607,119 @@ mod tests { #[test] fn test_primitive_array_gt_eq_nulls() { let a = Int32Array::from(vec![None, None, Some(1)]); - let b = Int32Array::from(vec![None, Some(1), None]); + let b = Int32Array::from(vec![None, Some(1), Some(1)]); let c = gt_eq(&a, &b).unwrap(); assert_eq!(true, c.value(0)); assert_eq!(false, c.value(1)); assert_eq!(true, c.value(2)); } + // Expected behaviour: + // contains(1, [1, 2, null]) = true + // contains(3, [1, 2, null]) = false + // contains(null, [1, 2, null]) = true + // contains(null, null) = false + #[test] + fn test_contains() { + let value_data = Int32Array::from(vec![ + Some(0), + Some(1), + Some(2), + Some(3), + Some(4), + Some(5), + Some(6), + None, + Some(7), + ]) + .data(); + let value_offsets = Buffer::from(&[0, 3, 6, 6, 9].to_byte_slice()); + let list_data_type = DataType::List(Box::new(DataType::Int32)); + let list_data = ArrayData::builder(list_data_type) + .len(4) + .add_buffer(value_offsets) + .null_count(1) + .add_child_data(value_data) + .null_bit_buffer(Buffer::from([0b00001011])) + .build(); + + // [[0, 1, 2], [3, 4, 5], null, [6, null, 7]] + let list_array = ListArray::from(list_data); + + let nulls = Int32Array::from(vec![None, None, None, None]); + let nulls_result = contains(&nulls, &list_array).unwrap(); + assert_eq!( + nulls_result + .as_any() + .downcast_ref::() + .unwrap(), + &BooleanArray::from(vec![false, false, false, true]), + ); + + let values = Int32Array::from(vec![Some(0), Some(0), Some(0), Some(0)]); + let values_result = contains(&values, &list_array).unwrap(); + assert_eq!( + values_result + .as_any() + .downcast_ref::() + .unwrap(), + &BooleanArray::from(vec![true, false, false, false]), + ); + } + + // Expected behaviour: + // contains("ab", ["ab", "cd", null]) = true + // contains("ef", ["ab", "cd", null]) = false + // contains(null, ["ab", "cd", null]) = true + // contains(null, null) = false + #[test] + fn test_contains_utf8() { + let values_builder = StringBuilder::new(10); + let mut builder = ListBuilder::new(values_builder); + + builder.values().append_value("Lorem").unwrap(); + builder.values().append_value("ipsum").unwrap(); + builder.values().append_null().unwrap(); + builder.append(true).unwrap(); + builder.values().append_value("sit").unwrap(); + builder.values().append_value("amet").unwrap(); + builder.values().append_value("Lorem").unwrap(); + builder.append(true).unwrap(); + builder.append(false).unwrap(); + builder.values().append_value("ipsum").unwrap(); + builder.append(true).unwrap(); + + // [["Lorem", "ipsum", null], ["sit", "amet", "Lorem"], null, ["ipsum"]] + // value_offsets = [0, 3, 6, 6] + let list_array = builder.finish(); + + let nulls = StringArray::try_from(vec![None, None, None, None]).unwrap(); + let nulls_result = contains_utf8(&nulls, &list_array).unwrap(); + assert_eq!( + nulls_result + .as_any() + .downcast_ref::() + .unwrap(), + &BooleanArray::from(vec![true, false, false, false]), + ); + + let values = StringArray::try_from(vec![ + Some("Lorem"), + Some("Lorem"), + Some("Lorem"), + Some("Lorem"), + ]) + .unwrap(); + let values_result = contains_utf8(&values, &list_array).unwrap(); + assert_eq!( + values_result + .as_any() + .downcast_ref::() + .unwrap(), + &BooleanArray::from(vec![true, true, false, false]), + ); + } + macro_rules! 
test_utf8 { ($test_name:ident, $left:expr, $right:expr, $op:expr, $expected:expr) => { #[test] diff --git a/rust/arrow/src/compute/kernels/filter.rs b/rust/arrow/src/compute/kernels/filter.rs index 52e12cfef19..0f143877144 100644 --- a/rust/arrow/src/compute/kernels/filter.rs +++ b/rust/arrow/src/compute/kernels/filter.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use crate::array::*; -use crate::datatypes::{ArrowNumericType, DataType, TimeUnit}; +use crate::datatypes::*; use crate::error::{ArrowError, Result}; /// Helper function to perform boolean lambda function on values from two arrays. @@ -73,6 +73,66 @@ macro_rules! filter_array { }}; } +macro_rules! filter_primitive_item_list_array { + ($array:expr, $filter:expr, $item_type:ident) => {{ + let list_of_lists = $array.as_any().downcast_ref::().unwrap(); + let values_builder = PrimitiveBuilder::<$item_type>::new(list_of_lists.len()); + let mut builder = ListBuilder::new(values_builder); + for i in 0..list_of_lists.len() { + if $filter.value(i) { + if list_of_lists.is_null(i) { + builder.append(false)?; + } else { + let this_inner_list = list_of_lists.value(i); + let inner_list = this_inner_list + .as_any() + .downcast_ref::>() + .unwrap(); + for j in 0..inner_list.len() { + if inner_list.is_null(j) { + builder.values().append_null()?; + } else { + builder.values().append_value(inner_list.value(j))?; + } + } + builder.append(true)?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +macro_rules! filter_non_primitive_item_list_array { + ($array:expr, $filter:expr, $item_array_type:ident, $item_builder:ident) => {{ + let list_of_lists = $array.as_any().downcast_ref::().unwrap(); + let values_builder = $item_builder::new(list_of_lists.len()); + let mut builder = ListBuilder::new(values_builder); + for i in 0..list_of_lists.len() { + if $filter.value(i) { + if list_of_lists.is_null(i) { + builder.append(false)?; + } else { + let this_inner_list = list_of_lists.value(i); + let inner_list = this_inner_list + .as_any() + .downcast_ref::<$item_array_type>() + .unwrap(); + for j in 0..inner_list.len() { + if inner_list.is_null(j) { + builder.values().append_null()?; + } else { + builder.values().append_value(inner_list.value(j))?; + } + } + builder.append(true)?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + /// Returns the array, taking only the elements matching the filter pub fn filter(array: &Array, filter: &BooleanArray) -> Result { match array.data_type() { @@ -125,6 +185,99 @@ pub fn filter(array: &Array, filter: &BooleanArray) -> Result { DataType::Timestamp(TimeUnit::Nanosecond, _) => { filter_array!(array, filter, TimestampNanosecondArray) } + DataType::List(dt) => match &**dt { + DataType::UInt8 => { + filter_primitive_item_list_array!(array, filter, UInt8Type) + } + DataType::UInt16 => { + filter_primitive_item_list_array!(array, filter, UInt16Type) + } + DataType::UInt32 => { + filter_primitive_item_list_array!(array, filter, UInt32Type) + } + DataType::UInt64 => { + filter_primitive_item_list_array!(array, filter, UInt64Type) + } + DataType::Int8 => filter_primitive_item_list_array!(array, filter, Int8Type), + DataType::Int16 => { + filter_primitive_item_list_array!(array, filter, Int16Type) + } + DataType::Int32 => { + filter_primitive_item_list_array!(array, filter, Int32Type) + } + DataType::Int64 => { + filter_primitive_item_list_array!(array, filter, Int64Type) + } + DataType::Float32 => { + filter_primitive_item_list_array!(array, filter, Float32Type) + } + DataType::Float64 => { + 
filter_primitive_item_list_array!(array, filter, Float64Type) + } + DataType::Boolean => { + filter_primitive_item_list_array!(array, filter, BooleanType) + } + DataType::Date32(_) => { + filter_primitive_item_list_array!(array, filter, Date32Type) + } + DataType::Date64(_) => { + filter_primitive_item_list_array!(array, filter, Date64Type) + } + DataType::Time32(TimeUnit::Second) => { + filter_primitive_item_list_array!(array, filter, Time32SecondType) + } + DataType::Time32(TimeUnit::Millisecond) => { + filter_primitive_item_list_array!(array, filter, Time32MillisecondType) + } + DataType::Time64(TimeUnit::Microsecond) => { + filter_primitive_item_list_array!(array, filter, Time64MicrosecondType) + } + DataType::Time64(TimeUnit::Nanosecond) => { + filter_primitive_item_list_array!(array, filter, Time64NanosecondType) + } + DataType::Duration(TimeUnit::Second) => { + filter_primitive_item_list_array!(array, filter, DurationSecondType) + } + DataType::Duration(TimeUnit::Millisecond) => { + filter_primitive_item_list_array!(array, filter, DurationMillisecondType) + } + DataType::Duration(TimeUnit::Microsecond) => { + filter_primitive_item_list_array!(array, filter, DurationMicrosecondType) + } + DataType::Duration(TimeUnit::Nanosecond) => { + filter_primitive_item_list_array!(array, filter, DurationNanosecondType) + } + DataType::Timestamp(TimeUnit::Second, _) => { + filter_primitive_item_list_array!(array, filter, TimestampSecondType) + } + DataType::Timestamp(TimeUnit::Millisecond, _) => { + filter_primitive_item_list_array!(array, filter, TimestampMillisecondType) + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + filter_primitive_item_list_array!(array, filter, TimestampMicrosecondType) + } + DataType::Timestamp(TimeUnit::Nanosecond, _) => { + filter_primitive_item_list_array!(array, filter, TimestampNanosecondType) + } + DataType::Binary => filter_non_primitive_item_list_array!( + array, + filter, + BinaryArray, + BinaryBuilder + ), + DataType::Utf8 => filter_non_primitive_item_list_array!( + array, + filter, + StringArray, + StringBuilder + ), + other => { + return Err(ArrowError::ComputeError(format!( + "filter not supported for List({:?})", + other + ))); + } + }, DataType::Binary => { let b = array.as_any().downcast_ref::().unwrap(); let mut values: Vec<&[u8]> = Vec::with_capacity(b.len()); @@ -155,6 +308,8 @@ pub fn filter(array: &Array, filter: &BooleanArray) -> Result { #[cfg(test)] mod tests { use super::*; + use crate::buffer::Buffer; + use crate::datatypes::ToByteSlice; macro_rules! 
def_temporal_test { ($test:ident, $array_type: ident, $data: expr) => { @@ -273,4 +428,53 @@ mod tests { assert_eq!(1, d.len()); assert_eq!(true, d.is_null(0)); } + + #[test] + fn test_filter_list_array() { + let value_data = ArrayData::builder(DataType::Int32) + .len(8) + .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice())) + .build(); + + let value_offsets = Buffer::from(&[0, 3, 6, 8, 8].to_byte_slice()); + + let list_data_type = DataType::List(Box::new(DataType::Int32)); + let list_data = ArrayData::builder(list_data_type.clone()) + .len(4) + .add_buffer(value_offsets.clone()) + .add_child_data(value_data.clone()) + .null_bit_buffer(Buffer::from([0b00000111])) + .build(); + + // a = [[0, 1, 2], [3, 4, 5], [6, 7], null] + let a = ListArray::from(list_data); + let b = BooleanArray::from(vec![false, true, false, true]); + let c = filter(&a, &b).unwrap(); + let d = c.as_ref().as_any().downcast_ref::().unwrap(); + + assert_eq!(DataType::Int32, d.value_type()); + + // result should be [[3, 4, 5], null] + assert_eq!(2, d.len()); + assert_eq!(1, d.null_count()); + assert_eq!(true, d.is_null(1)); + + assert_eq!(0, d.value_offset(0)); + assert_eq!(3, d.value_length(0)); + assert_eq!(3, d.value_offset(1)); + assert_eq!(0, d.value_length(1)); + assert_eq!( + Buffer::from(&[3, 4, 5].to_byte_slice()), + d.values().data().buffers()[0].clone() + ); + assert_eq!( + Buffer::from(&[0, 3, 3].to_byte_slice()), + d.data().buffers()[0].clone() + ); + let inner_list = d.value(0); + let inner_list = inner_list.as_any().downcast_ref::().unwrap(); + assert_eq!(3, inner_list.len()); + assert_eq!(0, inner_list.null_count()); + assert_eq!(inner_list, &Int32Array::from(vec![3, 4, 5])); + } } diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index 0429d905824..f395279d818 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -511,10 +511,11 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>( } List(ref list_type) => { let inner_types = get_fb_field_type(list_type, &mut fbb); + let field_name = fbb.create_string("list"); // field schema requires name to be not None let child = ipc::Field::create( &mut fbb, &ipc::FieldArgs { - name: None, + name: Some(field_name), nullable: false, type_type: inner_types.0, type_: Some(inner_types.1), diff --git a/rust/arrow/src/record_batch.rs b/rust/arrow/src/record_batch.rs index 8cfa22515b6..e1aa6f87f36 100644 --- a/rust/arrow/src/record_batch.rs +++ b/rust/arrow/src/record_batch.rs @@ -214,6 +214,25 @@ mod tests { .len(4) .add_buffer(Buffer::from([42, 28, 19, 31].to_byte_slice())) .build(); + + // Construct a value array + let value_data = ArrayData::builder(DataType::Int32) + .len(9) + .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7, 8].to_byte_slice())) + .build(); + + // Construct a buffer for value offsets, for the nested array: + // [[0, 1, 2], [3, 4, 5], [6, 7], [8]] + let value_offsets = Buffer::from(&[0, 3, 6, 8, 9].to_byte_slice()); + + // Construct a list array from the above two + let list_data_type = DataType::List(Box::new(DataType::Int32)); + let list_data = ArrayData::builder(list_data_type.clone()) + .len(4) + .add_buffer(value_offsets.clone()) + .add_child_data(value_data.clone()) + .build(); + let struct_array = StructArray::from(vec![ ( Field::new("b", DataType::Boolean, false), @@ -224,10 +243,14 @@ mod tests { Field::new("c", DataType::Int32, false), Arc::new(Int32Array::from(vec![42, 28, 19, 31])), ), + ( + Field::new("d", DataType::List(Box::new(DataType::Int32)), false), + 
Arc::new(ListArray::from(list_data.clone())), + ), ]); let batch = RecordBatch::from(&struct_array); - assert_eq!(2, batch.num_columns()); + assert_eq!(3, batch.num_columns()); assert_eq!(4, batch.num_rows()); assert_eq!( struct_array.data_type(), @@ -235,5 +258,96 @@ mod tests { ); assert_eq!(batch.column(0).data(), boolean_data); assert_eq!(batch.column(1).data(), int_data); + assert_eq!(batch.column(2).data(), list_data); + } + + #[test] + fn create_record_batch_with_list_column() { + let schema = Schema::new(vec![Field::new( + "a", + DataType::List(Box::new(DataType::Int32)), + false, + )]); + + // Construct a value array + let value_data = ArrayData::builder(DataType::Int32) + .len(8) + .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice())) + .build(); + + // Construct a buffer for value offsets, for the nested array: + // [[0, 1, 2], [3, 4, 5], [6, 7]] + let value_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice()); + + // Construct a list array from the above two + let list_data_type = DataType::List(Box::new(DataType::Int32)); + let list_data = ArrayData::builder(list_data_type.clone()) + .len(3) + .add_buffer(value_offsets.clone()) + .add_child_data(value_data.clone()) + .build(); + let a = ListArray::from(list_data); + + let record_batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap(); + + assert_eq!(3, record_batch.num_rows()); + assert_eq!(1, record_batch.num_columns()); + assert_eq!( + &DataType::List(Box::new(DataType::Int32)), + record_batch.schema().field(0).data_type() + ); + assert_eq!(3, record_batch.column(0).data().len()); + } + + #[test] + fn create_record_batch_with_list_column_nulls() { + let schema = Schema::new(vec![Field::new( + "a", + DataType::List(Box::new(DataType::Int32)), + false, + )]); + + let values_builder = PrimitiveBuilder::::new(10); + let mut builder = ListBuilder::new(values_builder); + + builder.values().append_null().unwrap(); + builder.values().append_null().unwrap(); + builder.append(true).unwrap(); + builder.append(false).unwrap(); + builder.append(true).unwrap(); + + // [[null, null], null, []] + let list_array = builder.finish(); + + let record_batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(list_array)]).unwrap(); + + assert_eq!(3, record_batch.num_rows()); + assert_eq!(1, record_batch.num_columns()); + assert_eq!( + &DataType::List(Box::new(DataType::Int32)), + record_batch.schema().field(0).data_type() + ); + assert_eq!(3, record_batch.column(0).data().len()); + + assert_eq!(false, record_batch.column(0).is_null(0)); + assert_eq!(true, record_batch.column(0).is_null(1)); + assert_eq!(false, record_batch.column(0).is_null(2)); + + let col_as_list_array = record_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(2, col_as_list_array.value(0).len()); + assert_eq!(0, col_as_list_array.value(2).len()); + + let sublist_0_val = col_as_list_array.value(0); + let sublist_0 = sublist_0_val.as_any().downcast_ref::().unwrap(); + + assert_eq!(true, sublist_0.is_null(0)); + assert_eq!(true, sublist_0.is_null(1)); } } diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml index 6a77c21ec30..49c0bc94a32 100644 --- a/rust/datafusion/Cargo.toml +++ b/rust/datafusion/Cargo.toml @@ -47,7 +47,7 @@ cli = ["rustyline"] fnv = "1.0" arrow = { path = "../arrow", version = "1.0.0-SNAPSHOT" } parquet = { path = "../parquet", version = "1.0.0-SNAPSHOT" } -sqlparser = "0.2.5" +sqlparser = { git = "https://github.com/urbanlogiq/sqlparser-rs", branch="ul_sqlparser" } clap = 
"2.33" prettytable-rs = "0.8.0" rustyline = {version = "6.0", optional = true} diff --git a/rust/datafusion/src/execution/physical_plan/expressions.rs b/rust/datafusion/src/execution/physical_plan/expressions.rs index 9969eeddf10..ff3dd6d3033 100644 --- a/rust/datafusion/src/execution/physical_plan/expressions.rs +++ b/rust/datafusion/src/execution/physical_plan/expressions.rs @@ -32,16 +32,16 @@ use arrow::array::{ }; use arrow::array::{ Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, - Int8Builder, StringBuilder, UInt16Builder, UInt32Builder, UInt64Builder, + Int8Builder, ListArray, StringBuilder, UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder, }; use arrow::compute; use arrow::compute::kernels::arithmetic::{add, divide, multiply, subtract}; use arrow::compute::kernels::boolean::{and, or}; use arrow::compute::kernels::cast::cast; -use arrow::compute::kernels::comparison::{eq, gt, gt_eq, lt, lt_eq, neq}; use arrow::compute::kernels::comparison::{ - eq_utf8, gt_eq_utf8, gt_utf8, like_utf8, lt_eq_utf8, lt_utf8, neq_utf8, nlike_utf8, + contains, contains_utf8, eq, eq_utf8, gt, gt_eq, gt_eq_utf8, gt_utf8, like_utf8, lt, + lt_eq, lt_eq_utf8, lt_utf8, neq, neq_utf8, nlike_utf8, }; use arrow::datatypes::{DataType, Schema, TimeUnit}; use arrow::record_batch::RecordBatch; @@ -918,6 +918,60 @@ macro_rules! compute_op { }}; } +macro_rules! list_array_op { + ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ + match $LEFT.data_type() { + DataType::Int8 => compute_list_array_op!($LEFT, $RIGHT, $OP, Int8Array), + DataType::Int16 => compute_list_array_op!($LEFT, $RIGHT, $OP, Int16Array), + DataType::Int32 => compute_list_array_op!($LEFT, $RIGHT, $OP, Int32Array), + DataType::Int64 => compute_list_array_op!($LEFT, $RIGHT, $OP, Int64Array), + DataType::UInt8 => compute_list_array_op!($LEFT, $RIGHT, $OP, UInt8Array), + DataType::UInt16 => compute_list_array_op!($LEFT, $RIGHT, $OP, UInt16Array), + DataType::UInt32 => compute_list_array_op!($LEFT, $RIGHT, $OP, UInt32Array), + DataType::UInt64 => compute_list_array_op!($LEFT, $RIGHT, $OP, UInt64Array), + DataType::Float32 => compute_list_array_op!($LEFT, $RIGHT, $OP, Float32Array), + DataType::Float64 => compute_list_array_op!($LEFT, $RIGHT, $OP, Float64Array), + DataType::Utf8 => { + compute_list_array_utf8_op!($LEFT, $RIGHT, $OP, StringArray) + } + other => Err(ExecutionError::General(format!( + "Unsupported data type {:?} for Contains operator", + other + ))), + } + }}; +} + +/// Invoke a compute kernel on a primitive array and a ListArray +macro_rules! compute_list_array_op { + ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{ + let ll = $LEFT + .as_any() + .downcast_ref::<$DT>() + .expect("compute_op failed to downcast array"); + let rr = $RIGHT + .as_any() + .downcast_ref::() + .expect("compute_op failed to downcast array"); + Ok(Arc::new($OP(&ll, &rr)?)) + }}; +} + +/// Invoke a compute kernel on a binary data array and a ListArray +macro_rules! compute_list_array_utf8_op { + ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{ + let ll = $LEFT + .as_any() + .downcast_ref::<$DT>() + .expect("compute_op failed to downcast array"); + let rr = $RIGHT + .as_any() + .downcast_ref::() + .expect("compute_op failed to downcast array"); + Ok(Arc::new(paste::expr! {[<$OP _utf8>]}(&ll, &rr)?)) + }}; +} + macro_rules! 
binary_string_array_op { ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ match $LEFT.data_type() { @@ -1026,12 +1080,22 @@ impl PhysicalExpr for BinaryExpr { let left = self.left.evaluate(batch)?; let right = self.right.evaluate(batch)?; if left.data_type() != right.data_type() { - return Err(ExecutionError::General(format!( - "Cannot evaluate binary expression {:?} with types {:?} and {:?}", - self.op, - left.data_type(), - right.data_type() - ))); + let mut invalid_type_combination = true; + if self.op == Operator::Contains { + if let DataType::List(dt) = right.data_type() { + if **dt == left.data_type().clone() { + invalid_type_combination = false; + } + } + } + if invalid_type_combination { + return Err(ExecutionError::General(format!( + "Cannot evaluate binary expression {:?} with types {:?} and {:?}", + self.op, + left.data_type(), + right.data_type() + ))); + } } match &self.op { Operator::Like => binary_string_array_op!(left, right, like), @@ -1042,6 +1106,7 @@ impl PhysicalExpr for BinaryExpr { Operator::GtEq => binary_array_op!(left, right, gt_eq), Operator::Eq => binary_array_op!(left, right, eq), Operator::NotEq => binary_array_op!(left, right, neq), + Operator::Contains => list_array_op!(left, right, contains), Operator::Plus => binary_primitive_array_op!(left, right, add), Operator::Minus => binary_primitive_array_op!(left, right, subtract), Operator::Multiply => binary_primitive_array_op!(left, right, multiply), @@ -1269,7 +1334,8 @@ mod tests { use super::*; use crate::error::Result; use crate::execution::physical_plan::common::get_scalar_value; - use arrow::array::{PrimitiveArray, StringArray, Time64NanosecondArray}; + use arrow::array::{ArrayData, PrimitiveArray, StringArray, Time64NanosecondArray}; + use arrow::buffer::Buffer; use arrow::datatypes::*; #[test] @@ -1302,6 +1368,56 @@ mod tests { Ok(()) } + #[test] + fn contains_comparison() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::List(Box::new(DataType::Int32)), false), + ]); + // Construct a value array + let value_data = ArrayData::builder(DataType::Int32) + .len(8) + .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice())) + .build(); + + // Construct a buffer for value offsets, for the nested array: + // [[0, 1, 2], [3, 4, 5], [6, 7]] + let value_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice()); + + // Construct a list array from the above two + let list_data_type = DataType::List(Box::new(DataType::Int32)); + let list_data = ArrayData::builder(list_data_type.clone()) + .len(3) + .add_buffer(value_offsets.clone()) + .add_child_data(value_data.clone()) + .build(); + + let a = Int32Array::from(vec![Some(1), Some(1), Some(1)]); + let b = ListArray::from(list_data); + + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(a), Arc::new(b)], + )?; + + // expression: "a >] b" + let contains = binary(col(0), Operator::Contains, col(1)); + + let result = contains.evaluate(&batch)?; + assert_eq!(result.len(), 3); + + let expected = vec![true, false, false]; + let result = result + .as_any() + .downcast_ref::() + .expect("failed to downcast to BooleanArray"); + for i in 0..3 { + assert_eq!(result.value(i), expected[i]); + } + + Ok(()) + } + #[test] fn binary_nested() -> Result<()> { let schema = Schema::new(vec![ diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs index 0b9464c1588..5bbf64a79ae 100644 --- a/rust/datafusion/src/logicalplan.rs +++ b/rust/datafusion/src/logicalplan.rs 
@@ -110,6 +110,8 @@ pub enum Operator { Divide, /// Remainder operator, like `%` Modulus, + /// Contains operator, like `>]` + Contains, /// Logical AND, like `&&` And, /// Logical OR, like `||` diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs index 9310c4c4195..c71332981a7 100644 --- a/rust/datafusion/src/optimizer/projection_push_down.rs +++ b/rust/datafusion/src/optimizer/projection_push_down.rs @@ -56,6 +56,9 @@ impl ProjectionPushDown { // collect all columns referenced by projection expressions utils::exprlist_to_column_indices(&expr, accum)?; + // push projection down + let input = self.optimize_plan(&input, accum, mapping)?; + LogicalPlanBuilder::from(&self.optimize_plan(&input, accum, mapping)?) .project(self.rewrite_expr_list(expr, mapping)?)? .build() diff --git a/rust/datafusion/src/optimizer/type_coercion.rs b/rust/datafusion/src/optimizer/type_coercion.rs index b8d90a90910..7ed1ed0bb36 100644 --- a/rust/datafusion/src/optimizer/type_coercion.rs +++ b/rust/datafusion/src/optimizer/type_coercion.rs @@ -98,12 +98,23 @@ fn rewrite_expr(expr: &Expr, schema: &Schema) -> Result { right: Arc::new(right), }) } else { - let super_type = utils::get_supertype(&left_type, &right_type)?; - Ok(Expr::BinaryExpr { - left: Arc::new(left.cast_to(&super_type, schema)?), - op: op.clone(), - right: Arc::new(right.cast_to(&super_type, schema)?), - }) + match op { + Operator::Contains => { + return Ok(Expr::BinaryExpr { + left: Arc::new(left.cast_to(&left_type, schema)?), + op: op.clone(), + right: Arc::new(right.cast_to(&right_type, schema)?), + }); + } + _ => { + let super_type = utils::get_supertype(&left_type, &right_type)?; + return Ok(Expr::BinaryExpr { + left: Arc::new(left.cast_to(&super_type, schema)?), + op: op.clone(), + right: Arc::new(right.cast_to(&super_type, schema)?), + }); + } + }; } } Expr::IsNull(e) => Ok(Expr::IsNull(Arc::new(rewrite_expr(e, schema)?))), diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index 714be1ebd0f..dac0e08f4cb 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -340,6 +340,7 @@ impl SqlToRel { SQLOperator::Not => Operator::Not, SQLOperator::Like => Operator::Like, SQLOperator::NotLike => Operator::NotLike, + SQLOperator::Contains => Operator::Contains, }; match operator { diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 293a2313768..babfb58f389 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -113,6 +113,50 @@ fn parquet_single_nan_schema() { } } +#[test] +fn parquet_query_int_array() { + //TO DO: this test file is not part of parquet-testing submodule (Morgan, 16/03/2020) + let mut ctx = ExecutionContext::new(); + let testdata = env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined"); + ctx.register_parquet( + "int_array_test", + &format!("{}/int_array_test_file.parquet", testdata), + ) + .unwrap(); + let sql = "SELECT int_array FROM int_array_test WHERE 7006 >] int_array"; + let plan = ctx.create_logical_plan(&sql).unwrap(); + let plan = ctx.optimize(&plan).unwrap(); + let plan = ctx.create_physical_plan(&plan, DEFAULT_BATCH_SIZE).unwrap(); + let results = ctx.collect(plan.as_ref()).unwrap(); + for batch in results { + assert_eq!(2, batch.num_rows()); + assert_eq!(1, batch.num_columns()); + } +} + +#[test] +fn string_parquet_query_array() { + //TODO: this test file is not part of parquet-testing submodule (Morgan, 16/03/2020) + let 
mut ctx = ExecutionContext::new(); + let testdata = env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined"); + ctx.register_parquet( + "string_array_table", + &format!("{}/string_array.parquet", testdata), + ) + .unwrap(); + let sql = "SELECT string_array FROM string_array_table WHERE 'abc' >] string_array"; + let plan = ctx.create_logical_plan(&sql).expect("logical"); + let plan = ctx.optimize(&plan).expect("optimize"); + let plan = ctx + .create_physical_plan(&plan, DEFAULT_BATCH_SIZE) + .expect("physical"); + let results = ctx.collect(plan.as_ref()).expect("record batches"); + for batch in results { + assert_eq!(8, batch.num_rows()); + assert_eq!(1, batch.num_columns()); + } +} + #[test] fn csv_count_star() { let mut ctx = ExecutionContext::new(); diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 342e3642c92..06173fb6be4 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -26,13 +26,6 @@ use std::slice::from_raw_parts_mut; use std::sync::Arc; use std::vec::Vec; -use arrow::array::{ - ArrayDataBuilder, ArrayDataRef, ArrayRef, BooleanBufferBuilder, BufferBuilderTrait, - Int16BufferBuilder, StructArray, -}; -use arrow::buffer::{Buffer, MutableBuffer}; -use arrow::datatypes::{DataType as ArrowType, Field, IntervalUnit}; - use crate::arrow::converter::{ BinaryConverter, BoolConverter, Converter, Float32Converter, Float64Converter, Int16Converter, Int32Converter, Int64Converter, Int8Converter, Int96Converter, @@ -53,7 +46,38 @@ use crate::schema::types::{ ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr, }; use crate::schema::visitor::TypeVisitor; +use arrow::array::{ + Array, ArrayData, ArrayDataBuilder, ArrayDataRef, ArrayRef, BinaryArray, + BinaryBuilder, BooleanBufferBuilder, BufferBuilderTrait, FixedSizeBinaryArray, + FixedSizeBinaryBuilder, Int16BufferBuilder, ListArray, ListBuilder, PrimitiveArray, + PrimitiveBuilder, StringArray, StringBuilder, StructArray, +}; +use arrow::buffer::{Buffer, MutableBuffer}; +use arrow::datatypes::{ + BooleanType as ArrowBooleanType, DataType as ArrowType, + Date32Type as ArrowDate32Type, Date64Type as ArrowDate64Type, + DurationMicrosecondType as ArrowDurationMicrosecondType, + DurationMillisecondType as ArrowDurationMillisecondType, + DurationNanosecondType as ArrowDurationNanosecondType, + DurationSecondType as ArrowDurationSecondType, Field, + Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type, + Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type, + Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, + Time32MillisecondType as ArrowTime32MillisecondType, + Time32SecondType as ArrowTime32SecondType, + Time64MicrosecondType as ArrowTime64MicrosecondType, + Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit, + TimestampMicrosecondType as ArrowTimestampMicrosecondType, + TimestampMillisecondType as ArrowTimestampMillisecondType, + TimestampNanosecondType as ArrowTimestampNanosecondType, + TimestampSecondType as ArrowTimestampSecondType, ToByteSlice, + UInt16Type as ArrowUInt16Type, UInt32Type as ArrowUInt32Type, + UInt64Type as ArrowUInt64Type, UInt8Type as ArrowUInt8Type, +}; + +use arrow::util::bit_util; use std::any::Any; +use std::convert::TryFrom; /// Array reader reads parquet data into arrow array. 
pub trait ArrayReader { @@ -136,7 +160,6 @@ impl ArrayReader for PrimitiveArrayReader { let records_read_once = self.record_reader.read_records(records_to_read)?; records_read = records_read + records_read_once; - // Record reader exhausted if records_read_once < records_to_read { if let Some(page_reader) = self.pages.next() { @@ -468,6 +491,452 @@ where } } +/// Implementation of struct array reader. +pub struct ListArrayReader { + item_reader: Box, + data_type: ArrowType, + item_type: ArrowType, + list_def_level: i16, + list_rep_level: i16, + def_level_buffer: Option, + rep_level_buffer: Option, +} + +impl ListArrayReader { + /// Construct struct array reader. + pub fn new( + item_reader: Box, + data_type: ArrowType, + item_type: ArrowType, + def_level: i16, + rep_level: i16, + ) -> Self { + Self { + item_reader, + data_type, + item_type, + list_def_level: def_level, + list_rep_level: rep_level, + def_level_buffer: None, + rep_level_buffer: None, + } + } +} + +macro_rules! build_empty_list_array_with_primitive_items { + ($item_type:ident) => {{ + let values_builder = PrimitiveBuilder::<$item_type>::new(0); + let mut builder = ListBuilder::new(values_builder); + let empty_list_array = builder.finish(); + Ok(Arc::new(empty_list_array)) + }}; +} + +macro_rules! build_empty_list_array_with_non_primitive_items { + ($builder:ident) => {{ + let values_builder = $builder::new(0); + let mut builder = ListBuilder::new(values_builder); + let empty_list_array = builder.finish(); + Ok(Arc::new(empty_list_array)) + }}; +} + +fn build_empty_list_array(item_type: ArrowType) -> Result { + match item_type { + ArrowType::UInt8 => build_empty_list_array_with_primitive_items!(ArrowUInt8Type), + ArrowType::UInt16 => { + build_empty_list_array_with_primitive_items!(ArrowUInt16Type) + } + ArrowType::UInt32 => { + build_empty_list_array_with_primitive_items!(ArrowUInt32Type) + } + ArrowType::UInt64 => { + build_empty_list_array_with_primitive_items!(ArrowUInt64Type) + } + ArrowType::Int8 => build_empty_list_array_with_primitive_items!(ArrowInt8Type), + ArrowType::Int16 => build_empty_list_array_with_primitive_items!(ArrowInt16Type), + ArrowType::Int32 => build_empty_list_array_with_primitive_items!(ArrowInt32Type), + ArrowType::Int64 => build_empty_list_array_with_primitive_items!(ArrowInt64Type), + ArrowType::Float32 => { + build_empty_list_array_with_primitive_items!(ArrowFloat32Type) + } + ArrowType::Float64 => { + build_empty_list_array_with_primitive_items!(ArrowFloat64Type) + } + ArrowType::Boolean => { + build_empty_list_array_with_primitive_items!(ArrowBooleanType) + } + ArrowType::Date32(_) => { + build_empty_list_array_with_primitive_items!(ArrowDate32Type) + } + ArrowType::Date64(_) => { + build_empty_list_array_with_primitive_items!(ArrowDate64Type) + } + ArrowType::Time32(ArrowTimeUnit::Second) => { + build_empty_list_array_with_primitive_items!(ArrowTime32SecondType) + } + ArrowType::Time32(ArrowTimeUnit::Millisecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType) + } + ArrowType::Time64(ArrowTimeUnit::Microsecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType) + } + ArrowType::Time64(ArrowTimeUnit::Nanosecond) => { + build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType) + } + ArrowType::Duration(ArrowTimeUnit::Second) => { + build_empty_list_array_with_primitive_items!(ArrowDurationSecondType) + } + ArrowType::Duration(ArrowTimeUnit::Millisecond) => { + 
build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType) + } + ArrowType::Duration(ArrowTimeUnit::Microsecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType) + } + ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { + build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Second, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType) + } + ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => { + build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType) + } + ArrowType::Utf8 => { + build_empty_list_array_with_non_primitive_items!(StringBuilder) + } + ArrowType::Binary => { + build_empty_list_array_with_non_primitive_items!(BinaryBuilder) + } + _ => Err(ParquetError::General(format!( + "ListArray of type List({:?}) is not supported by array_reader", + item_type + ))), + } +} + +macro_rules! remove_primitive_array_indices { + ($arr: expr, $item_type:ty, $indices:expr) => {{ + let mut new_data_vec = Vec::new(); + let array_data = match $arr.as_any().downcast_ref::>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + new_data_vec.push(None); + } else { + new_data_vec.push(Some(array_data.value(i).into())); + } + } + } + Ok(Arc::new(>::from(new_data_vec))) + }}; +} + +macro_rules! remove_timestamp_array_indices { + ($arr: expr, $array_type:ty, $indices:expr, $time_unit:expr) => {{ + let mut new_data_vec = Vec::new(); + let array_data = match $arr.as_any().downcast_ref::>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray<>", $arr))), + }; + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + new_data_vec.push(None); + } else { + new_data_vec.push(Some(array_data.value(i).into())); + } + } + } + Ok(Arc::new(>::from_opt_vec(new_data_vec, $time_unit))) + }}; +} + +macro_rules! remove_binary_array_indices { + ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<$array_type>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = BinaryBuilder::new(array_data.len()); + + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +macro_rules! 
remove_fixed_size_binary_array_indices { + ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr, $len:expr) => {{ + let array_data = match $arr.as_any().downcast_ref::<$array_type>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), + }; + let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len); + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + builder.append_null()?; + } else { + builder.append_value(array_data.value(i))?; + } + } + } + Ok(Arc::new(builder.finish())) + }}; +} + +macro_rules! remove_string_array_indices { + ($arr: expr, $array_type:ty, $indices:expr) => {{ + let mut new_data_vec = Vec::new(); + let array_data = match $arr.as_any().downcast_ref::<$array_type>() { + Some(a) => a, + _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray<>", $arr))), + }; + for i in 0..array_data.len() { + if !$indices.contains(&i) { + if array_data.is_null(i) { + new_data_vec.push(None); + } else { + new_data_vec.push(Some(array_data.value(i).into())); + } + } + } + match <$array_type>::try_from(new_data_vec.clone()) { + Ok(a) => Ok(Arc::new(a)), + _ => Err(ParquetError::General(format!("Error generating next batch for ListArray: non-primitive Arrow array cannot be built from {:?}", new_data_vec))) + } + }}; +} + +fn remove_indices( + arr: ArrayRef, + item_type: ArrowType, + indices: Vec, +) -> Result { + match item_type { + ArrowType::UInt8 => remove_primitive_array_indices!(arr, ArrowUInt8Type, indices), + ArrowType::UInt16 => { + remove_primitive_array_indices!(arr, ArrowUInt16Type, indices) + } + ArrowType::UInt32 => { + remove_primitive_array_indices!(arr, ArrowUInt32Type, indices) + } + ArrowType::UInt64 => { + remove_primitive_array_indices!(arr, ArrowUInt64Type, indices) + } + ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type, indices), + ArrowType::Int16 => remove_primitive_array_indices!(arr, ArrowInt16Type, indices), + ArrowType::Int32 => remove_primitive_array_indices!(arr, ArrowInt32Type, indices), + ArrowType::Int64 => remove_primitive_array_indices!(arr, ArrowInt64Type, indices), + ArrowType::Float32 => { + remove_primitive_array_indices!(arr, ArrowFloat32Type, indices) + } + ArrowType::Float64 => { + remove_primitive_array_indices!(arr, ArrowFloat64Type, indices) + } + ArrowType::Boolean => { + remove_primitive_array_indices!(arr, ArrowBooleanType, indices) + } + ArrowType::Date32(_) => { + remove_primitive_array_indices!(arr, ArrowDate32Type, indices) + } + ArrowType::Date64(_) => { + remove_primitive_array_indices!(arr, ArrowDate64Type, indices) + } + ArrowType::Time32(ArrowTimeUnit::Second) => { + remove_primitive_array_indices!(arr, ArrowTime32SecondType, indices) + } + ArrowType::Time32(ArrowTimeUnit::Millisecond) => { + remove_primitive_array_indices!(arr, ArrowTime32MillisecondType, indices) + } + ArrowType::Time64(ArrowTimeUnit::Microsecond) => { + remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType, indices) + } + ArrowType::Time64(ArrowTimeUnit::Nanosecond) => { + remove_primitive_array_indices!(arr, ArrowTime64NanosecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Second) => { + remove_primitive_array_indices!(arr, ArrowDurationSecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Millisecond) => { + remove_primitive_array_indices!(arr, 
ArrowDurationMillisecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Microsecond) => { + remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType, indices) + } + ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { + remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, indices) + } + ArrowType::Timestamp(ArrowTimeUnit::Second, time_unit) => { + remove_timestamp_array_indices!( + arr, + ArrowTimestampSecondType, + indices, + time_unit + ) + } + ArrowType::Timestamp(ArrowTimeUnit::Millisecond, time_unit) => { + remove_timestamp_array_indices!( + arr, + ArrowTimestampMillisecondType, + indices, + time_unit + ) + } + ArrowType::Timestamp(ArrowTimeUnit::Microsecond, time_unit) => { + remove_timestamp_array_indices!( + arr, + ArrowTimestampMicrosecondType, + indices, + time_unit + ) + } + ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, time_unit) => { + remove_timestamp_array_indices!( + arr, + ArrowTimestampNanosecondType, + indices, + time_unit + ) + } + ArrowType::Utf8 => remove_string_array_indices!(arr, StringArray, indices), + ArrowType::Binary => { + remove_binary_array_indices!(arr, BinaryArray, BinaryBuilder, indices) + } + ArrowType::FixedSizeBinary(size) => remove_fixed_size_binary_array_indices!( + arr, + FixedSizeBinaryArray, + FixedSizeBinaryBuilder, + indices, + size + ), + _ => Err(ParquetError::General(format!( + "ListArray of type List({:?}) is not supported by array_reader", + item_type + ))), + } +} + +impl ArrayReader for ListArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + /// Returns data type. + /// This must be a List. + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, batch_size: usize) -> Result { + let next_batch_array = self.item_reader.next_batch(batch_size).unwrap(); + let item_type = self.item_reader.get_data_type().clone(); + + if next_batch_array.len() == 0 { + return build_empty_list_array(item_type); + } + let def_levels = self.item_reader.get_def_levels().unwrap(); + let rep_levels = self.item_reader.get_rep_levels().unwrap(); + + if !((def_levels.len() == rep_levels.len()) + && (rep_levels.len() == next_batch_array.len())) + { + return Err(ArrowError( + "Expected item_reader def_level and rep_level arrays to have the same length as batch array".to_string(), + )); + } + + // Need to remove from the values array the nulls that represent null lists rather than null items + // null lists have def_level = 0 + let mut null_list_indices: Vec = Vec::new(); + for i in 0..def_levels.len() { + if def_levels[i] == 0 { + null_list_indices.push(i); + } + } + let batch_values = match null_list_indices.len() { + 0 => next_batch_array.clone(), + _ => remove_indices(next_batch_array.clone(), item_type, null_list_indices)?, + }; + + // null list has def_level = 0 + // empty list has def_level = 1 + // null item in a list has def_level = 2 + // non-null item has def_level = 3 + // first item in each list has rep_level = 0, subsequent items have rep_level = 1 + + let mut offsets = Vec::new(); + let mut cur_offset = 0; + for i in 0..rep_levels.len() { + if rep_levels[i] == (0 as i16) { + offsets.push(cur_offset) + } + if def_levels[i] > 0 { + cur_offset = cur_offset + 1; + } + } + offsets.push(cur_offset); + + let num_bytes = bit_util::ceil(offsets.len(), 8); + let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + let null_slice = null_buf.data_mut(); + let mut list_index = 0; + for i in 0..rep_levels.len() { + if rep_levels[i] == (0 as i16) && def_levels[i] != (0 as 
i16) { + bit_util::set_bit(null_slice, list_index); + } + if rep_levels[i] == (0 as i16) { + list_index = list_index + 1; + } + } + let value_offsets = Buffer::from(&offsets.to_byte_slice()); + + // null list has def_level = 0 + let null_count = def_levels.iter().filter(|x| x == &&(0 as i16)).count(); + + let list_data = ArrayData::builder(self.get_data_type().clone()) + .len(offsets.len() - 1) + .add_buffer(value_offsets.clone()) + .add_child_data(batch_values.data()) + .null_bit_buffer(null_buf.freeze()) + .null_count(null_count) + .offset(next_batch_array.offset()) + .build(); + + let result_array = ListArray::from(list_data); + return Ok(Arc::new(result_array)); + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_level_buffer + .as_ref() + .map(|buf| unsafe { buf.typed_data() }) + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_level_buffer + .as_ref() + .map(|buf| unsafe { buf.typed_data() }) + } +} + /// Implementation of struct array reader. pub struct StructArrayReader { children: Vec>, @@ -536,6 +1005,9 @@ impl ArrayReader for StructArrayReader { .children .iter_mut() .map(|reader| reader.next_batch(batch_size)) + .map(|batch| { + return batch; + }) .try_fold( Vec::new(), |mut result, child_array| -> Result> { @@ -573,7 +1045,9 @@ impl ArrayReader for StructArrayReader { for child in &self.children { if let Some(current_child_def_levels) = child.get_def_levels() { - if current_child_def_levels.len() != children_array_len { + if current_child_def_levels.len() != children_array_len + && children_array.len() != 1 + { return Err(general_err!("Child array length are not equal!")); } else { for i in 0..children_array_len { @@ -627,7 +1101,9 @@ impl ArrayReader for StructArrayReader { self.def_level_buffer = Some(def_level_data_buffer.freeze()); self.rep_level_buffer = rep_level_data; - Ok(Arc::new(StructArray::from(array_data))) + let result_array = StructArray::from(array_data); + + return Ok(Arc::new(result_array)); } fn get_def_levels(&self) -> Option<&[i16]> { @@ -712,8 +1188,6 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext for ArrayReaderBuilder { /// Build array reader for primitive type. - /// Currently we don't have a list reader implementation, so repeated type is not - /// supported yet. fn visit_primitive( &mut self, cur_type: TypePtr, @@ -721,8 +1195,9 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext ) -> Result>> { if self.is_included(cur_type.as_ref()) { let mut new_context = context.clone(); - new_context.path.append(vec![cur_type.name().to_string()]); - + if cur_type.name().to_string() != "item" { + new_context.path.append(vec![cur_type.name().to_string()]); + } match cur_type.get_basic_info().repetition() { Repetition::REPEATED => { new_context.def_level += 1; @@ -733,10 +1208,8 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext } _ => (), } - let reader = self.build_for_primitive_type_inner(cur_type.clone(), &new_context)?; - if cur_type.get_basic_info().repetition() == Repetition::REPEATED { Err(ArrowError( "Reading repeated field is not supported yet!".to_string(), @@ -799,16 +1272,67 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext } /// Build array reader for list type. - /// Currently this is not supported. 
+ /// List type is a special case of struct type (see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md) fn visit_list_with_item( &mut self, - _list_type: Rc, + list_type: Rc, _item_type: &Type, - _context: &'a ArrayReaderBuilderContext, + context: &'a ArrayReaderBuilderContext, ) -> Result>> { - Err(ArrowError( - "Reading parquet list array into arrow is not supported yet!".to_string(), - )) + let mut new_context = context.clone(); + + let list_child = &list_type.get_fields()[0]; + let item_child = &list_child.get_fields()[0]; + + new_context.path.append(vec![list_type.name().to_string()]); + + match list_type.get_basic_info().repetition() { + Repetition::REPEATED => { + new_context.def_level += 1; + new_context.rep_level += 1; + } + Repetition::OPTIONAL => { + new_context.def_level += 1; + } + _ => (), + } + + match list_child.get_basic_info().repetition() { + Repetition::REPEATED => { + new_context.def_level += 1; + new_context.rep_level += 1; + } + Repetition::OPTIONAL => { + new_context.def_level += 1; + } + _ => (), + } + + let item_reader = self + .dispatch(item_child.clone(), &new_context) + .unwrap() + .unwrap(); + let item_type = item_reader.get_data_type().clone(); + + match item_type { + ArrowType::List(_) + | ArrowType::FixedSizeList(_, _) + | ArrowType::Struct(_) + | ArrowType::Dictionary(_, _) => Err(ArrowError(format!( + "reading List({:?}) into arrow not supported yet", + item_type + ))), + _ => { + let arrow_type = ArrowType::List(Box::new(item_type.clone())); + Ok(Some(Box::new(ListArrayReader::new( + item_reader, + arrow_type, + item_type, + new_context.def_level, + new_context.rep_level, + )))) + } + } } } @@ -959,10 +1483,10 @@ mod tests { use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; use crate::util::test_common::page_util::InMemoryPageIterator; use crate::util::test_common::{get_test_file, make_pages}; - use arrow::array::{Array, ArrayRef, PrimitiveArray, StructArray}; + use arrow::array::{Array, ArrayRef, ListArray, PrimitiveArray, StructArray}; use arrow::datatypes::{ - DataType as ArrowType, Field, Int32Type as ArrowInt32, UInt32Type as ArrowUInt32, - UInt64Type as ArrowUInt64, + DataType as ArrowType, Field, Int32Type as ArrowInt32, Int64Type as ArrowInt64, + UInt32Type as ArrowUInt32, UInt64Type as ArrowUInt64, }; use rand::distributions::uniform::SampleUniform; use std::any::Any; @@ -1368,7 +1892,7 @@ mod tests { 1, ); - let struct_array = struct_array_reader.next_batch(5).unwrap(); + let struct_array = struct_array_reader.next_batch(1024).unwrap(); let struct_array = struct_array.as_any().downcast_ref::().unwrap(); assert_eq!(5, struct_array.len()); @@ -1388,25 +1912,46 @@ mod tests { ); } - #[test] - fn test_create_array_reader() { - let file = get_test_file("nulls.snappy.parquet"); + #[test] //TODO: this test file is not in the testing submodule yet (Morgan 03-18-2020) + fn test_create_list_array_reader() { + let file = get_test_file("tiny_int_array.parquet"); let file_reader = Rc::new(SerializedFileReader::new(file).unwrap()); - - let array_reader = build_array_reader( + let mut array_reader = build_array_reader( file_reader.metadata().file_metadata().schema_descr_ptr(), vec![0usize].into_iter(), file_reader, ) .unwrap(); - // Create arrow types let arrow_type = ArrowType::Struct(vec![Field::new( - "b_struct", - ArrowType::Struct(vec![Field::new("b_c_int", ArrowType::Int32, true)]), + "int_array", + ArrowType::List(Box::new(ArrowType::Int64)), true, )]); - assert_eq!(array_reader.get_data_type(), &arrow_type); + + let 
next_batch = array_reader.next_batch(1024).unwrap(); + + // parquets with primitive columns also produce a StructArray -- each struct is a field + let struct_array = next_batch.as_any().downcast_ref::().unwrap(); + let array = struct_array + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(8, array.len()); + + let sublist_0 = array.value(0); + let sublist_0 = sublist_0 + .as_any() + .downcast_ref::>() + .unwrap(); + + assert_eq!(2, sublist_0.len()); + assert_eq!(false, sublist_0.is_null(0)); + assert_eq!(false, sublist_0.is_null(1)); + assert_eq!(7000, sublist_0.value(0)); + assert_eq!(7001, sublist_0.value(1)); } } From 3fb8fb5752ca6254f830ca7fed4e6f5a5bf53dc4 Mon Sep 17 00:00:00 2001 From: Morgan Cassels Date: Mon, 30 Mar 2020 15:25:00 -0700 Subject: [PATCH 02/13] cleanup and fix tests --- rust/arrow/src/compute/kernels/comparison.rs | 247 ------------------ rust/datafusion/Cargo.toml | 2 +- .../execution/physical_plan/expressions.rs | 136 +--------- rust/datafusion/src/logicalplan.rs | 2 - .../datafusion/src/optimizer/type_coercion.rs | 23 +- rust/datafusion/src/sql/planner.rs | 3 +- rust/datafusion/tests/sql.rs | 117 ++++++--- rust/parquet/src/arrow/array_reader.rs | 192 +++++--------- 8 files changed, 172 insertions(+), 550 deletions(-) diff --git a/rust/arrow/src/compute/kernels/comparison.rs b/rust/arrow/src/compute/kernels/comparison.rs index b282d5cacea..aa5000f5a53 100644 --- a/rust/arrow/src/compute/kernels/comparison.rs +++ b/rust/arrow/src/compute/kernels/comparison.rs @@ -31,7 +31,6 @@ use crate::buffer::{Buffer, MutableBuffer}; use crate::compute::util::apply_bin_op_to_option_bitmap; use crate::datatypes::{ArrowNumericType, BooleanType, DataType}; use crate::error::{ArrowError, Result}; -use crate::util::bit_util; /// Helper function to perform boolean lambda function on values from two arrays, this /// version does not attempt to use SIMD. 
@@ -357,150 +356,10 @@ fn new_all_set_buffer(len: usize) -> Buffer { buffer } -pub fn contains(left: &PrimitiveArray, right: &ListArray) -> Result -where - T: ArrowNumericType, -{ - if left.len() != right.len() { - return Err(ArrowError::ComputeError( - "Cannot perform comparison operation on arrays of different length" - .to_string(), - )); - } - - let not_both_null_bit_buffer = match apply_bin_op_to_option_bitmap( - left.data().null_bitmap(), - right.data().null_bitmap(), - |a, b| a | b, - ) { - Ok(both_null_buff) => match both_null_buff { - Some(buff) => buff, - _ => new_all_set_buffer(left.len()), - }, - _ => new_all_set_buffer(left.len()), - }; - let not_both_null_bitmap = not_both_null_bit_buffer.data(); - - let left_data = left.data(); - let left_null_bitmap = match left_data.null_bitmap() { - Some(bitmap) => bitmap.clone().to_buffer(), - _ => new_all_set_buffer(left.len()), - }; - let left_null_bitmap = left_null_bitmap.data(); - - let mut result = BooleanBufferBuilder::new(left.len()); - - for i in 0..left.len() { - let mut is_in = false; - - // contains(null, null) = false - if bit_util::get_bit(not_both_null_bitmap, i) { - let list = right.value(i); - - // contains(null, [null]) = true - if !bit_util::get_bit(left_null_bitmap, i) { - if list.null_count() > 0 { - is_in = true; - } - } else { - let list = list.as_any().downcast_ref::>().unwrap(); - - for j in 0..list.len() { - if list.is_valid(j) && (left.value(i) == list.value(j)) { - is_in = true; - } - } - } - } - result.append(is_in)?; - } - - let data = ArrayData::new( - DataType::Boolean, - left.len(), - None, - None, - left.offset(), - vec![result.finish()], - vec![], - ); - Ok(PrimitiveArray::::from(Arc::new(data))) -} - -pub fn contains_utf8(left: &StringArray, right: &ListArray) -> Result { - if left.len() != right.len() { - return Err(ArrowError::ComputeError( - "Cannot perform comparison operation on arrays of different length" - .to_string(), - )); - } - - let not_both_null_bit_buffer = match apply_bin_op_to_option_bitmap( - left.data().null_bitmap(), - right.data().null_bitmap(), - |a, b| a | b, - ) { - Ok(both_null_buff) => match both_null_buff { - Some(buff) => buff, - _ => new_all_set_buffer(left.len()), - }, - _ => new_all_set_buffer(left.len()), - }; - let not_both_null_bitmap = not_both_null_bit_buffer.data(); - - let left_data = left.data(); - let left_null_bitmap = match left_data.null_bitmap() { - Some(bitmap) => bitmap.clone().to_buffer(), - _ => new_all_set_buffer(left.len()), - }; - let left_null_bitmap = left_null_bitmap.data(); - - let mut result = BooleanBufferBuilder::new(left.len()); - - for i in 0..left.len() { - let mut is_in = false; - - // contains(null, null) = false - if bit_util::get_bit(not_both_null_bitmap, i) { - let list = right.value(i); - - // contains(null, [null]) = true - if !bit_util::get_bit(left_null_bitmap, i) { - if list.null_count() > 0 { - is_in = true; - } - } else { - let list = list.as_any().downcast_ref::().unwrap(); - - for j in 0..list.len() { - if list.is_valid(j) && (left.value(i) == list.value(j)) { - is_in = true; - } - } - } - } - result.append(is_in)?; - } - - let data = ArrayData::new( - DataType::Boolean, - left.len(), - None, - None, - left.offset(), - vec![result.finish()], - vec![], - ); - Ok(PrimitiveArray::::from(Arc::new(data))) -} - #[cfg(test)] mod tests { use super::*; use crate::array::Int32Array; - use crate::buffer::Buffer; - use crate::datatypes::ToByteSlice; - use std::convert::TryFrom; #[test] fn test_primitive_array_eq() { @@ -614,112 +473,6 @@ mod 
tests { assert_eq!(true, c.value(2)); } - // Expected behaviour: - // contains(1, [1, 2, null]) = true - // contains(3, [1, 2, null]) = false - // contains(null, [1, 2, null]) = true - // contains(null, null) = false - #[test] - fn test_contains() { - let value_data = Int32Array::from(vec![ - Some(0), - Some(1), - Some(2), - Some(3), - Some(4), - Some(5), - Some(6), - None, - Some(7), - ]) - .data(); - let value_offsets = Buffer::from(&[0, 3, 6, 6, 9].to_byte_slice()); - let list_data_type = DataType::List(Box::new(DataType::Int32)); - let list_data = ArrayData::builder(list_data_type) - .len(4) - .add_buffer(value_offsets) - .null_count(1) - .add_child_data(value_data) - .null_bit_buffer(Buffer::from([0b00001011])) - .build(); - - // [[0, 1, 2], [3, 4, 5], null, [6, null, 7]] - let list_array = ListArray::from(list_data); - - let nulls = Int32Array::from(vec![None, None, None, None]); - let nulls_result = contains(&nulls, &list_array).unwrap(); - assert_eq!( - nulls_result - .as_any() - .downcast_ref::() - .unwrap(), - &BooleanArray::from(vec![false, false, false, true]), - ); - - let values = Int32Array::from(vec![Some(0), Some(0), Some(0), Some(0)]); - let values_result = contains(&values, &list_array).unwrap(); - assert_eq!( - values_result - .as_any() - .downcast_ref::() - .unwrap(), - &BooleanArray::from(vec![true, false, false, false]), - ); - } - - // Expected behaviour: - // contains("ab", ["ab", "cd", null]) = true - // contains("ef", ["ab", "cd", null]) = false - // contains(null, ["ab", "cd", null]) = true - // contains(null, null) = false - #[test] - fn test_contains_utf8() { - let values_builder = StringBuilder::new(10); - let mut builder = ListBuilder::new(values_builder); - - builder.values().append_value("Lorem").unwrap(); - builder.values().append_value("ipsum").unwrap(); - builder.values().append_null().unwrap(); - builder.append(true).unwrap(); - builder.values().append_value("sit").unwrap(); - builder.values().append_value("amet").unwrap(); - builder.values().append_value("Lorem").unwrap(); - builder.append(true).unwrap(); - builder.append(false).unwrap(); - builder.values().append_value("ipsum").unwrap(); - builder.append(true).unwrap(); - - // [["Lorem", "ipsum", null], ["sit", "amet", "Lorem"], null, ["ipsum"]] - // value_offsets = [0, 3, 6, 6] - let list_array = builder.finish(); - - let nulls = StringArray::try_from(vec![None, None, None, None]).unwrap(); - let nulls_result = contains_utf8(&nulls, &list_array).unwrap(); - assert_eq!( - nulls_result - .as_any() - .downcast_ref::() - .unwrap(), - &BooleanArray::from(vec![true, false, false, false]), - ); - - let values = StringArray::try_from(vec![ - Some("Lorem"), - Some("Lorem"), - Some("Lorem"), - Some("Lorem"), - ]) - .unwrap(); - let values_result = contains_utf8(&values, &list_array).unwrap(); - assert_eq!( - values_result - .as_any() - .downcast_ref::() - .unwrap(), - &BooleanArray::from(vec![true, true, false, false]), - ); - } - macro_rules! 
test_utf8 { ($test_name:ident, $left:expr, $right:expr, $op:expr, $expected:expr) => { #[test] diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml index 49c0bc94a32..6a77c21ec30 100644 --- a/rust/datafusion/Cargo.toml +++ b/rust/datafusion/Cargo.toml @@ -47,7 +47,7 @@ cli = ["rustyline"] fnv = "1.0" arrow = { path = "../arrow", version = "1.0.0-SNAPSHOT" } parquet = { path = "../parquet", version = "1.0.0-SNAPSHOT" } -sqlparser = { git = "https://github.com/urbanlogiq/sqlparser-rs", branch="ul_sqlparser" } +sqlparser = "0.2.5" clap = "2.33" prettytable-rs = "0.8.0" rustyline = {version = "6.0", optional = true} diff --git a/rust/datafusion/src/execution/physical_plan/expressions.rs b/rust/datafusion/src/execution/physical_plan/expressions.rs index ff3dd6d3033..08ce7e62695 100644 --- a/rust/datafusion/src/execution/physical_plan/expressions.rs +++ b/rust/datafusion/src/execution/physical_plan/expressions.rs @@ -32,7 +32,7 @@ use arrow::array::{ }; use arrow::array::{ Float32Builder, Float64Builder, Int16Builder, Int32Builder, Int64Builder, - Int8Builder, ListArray, StringBuilder, UInt16Builder, UInt32Builder, UInt64Builder, + Int8Builder, StringBuilder, UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder, }; use arrow::compute; @@ -40,8 +40,8 @@ use arrow::compute::kernels::arithmetic::{add, divide, multiply, subtract}; use arrow::compute::kernels::boolean::{and, or}; use arrow::compute::kernels::cast::cast; use arrow::compute::kernels::comparison::{ - contains, contains_utf8, eq, eq_utf8, gt, gt_eq, gt_eq_utf8, gt_utf8, like_utf8, lt, - lt_eq, lt_eq_utf8, lt_utf8, neq, neq_utf8, nlike_utf8, + eq, eq_utf8, gt, gt_eq, gt_eq_utf8, gt_utf8, like_utf8, lt, lt_eq, lt_eq_utf8, + lt_utf8, neq, neq_utf8, nlike_utf8, }; use arrow::datatypes::{DataType, Schema, TimeUnit}; use arrow::record_batch::RecordBatch; @@ -918,60 +918,6 @@ macro_rules! compute_op { }}; } -macro_rules! list_array_op { - ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ - match $LEFT.data_type() { - DataType::Int8 => compute_list_array_op!($LEFT, $RIGHT, $OP, Int8Array), - DataType::Int16 => compute_list_array_op!($LEFT, $RIGHT, $OP, Int16Array), - DataType::Int32 => compute_list_array_op!($LEFT, $RIGHT, $OP, Int32Array), - DataType::Int64 => compute_list_array_op!($LEFT, $RIGHT, $OP, Int64Array), - DataType::UInt8 => compute_list_array_op!($LEFT, $RIGHT, $OP, UInt8Array), - DataType::UInt16 => compute_list_array_op!($LEFT, $RIGHT, $OP, UInt16Array), - DataType::UInt32 => compute_list_array_op!($LEFT, $RIGHT, $OP, UInt32Array), - DataType::UInt64 => compute_list_array_op!($LEFT, $RIGHT, $OP, UInt64Array), - DataType::Float32 => compute_list_array_op!($LEFT, $RIGHT, $OP, Float32Array), - DataType::Float64 => compute_list_array_op!($LEFT, $RIGHT, $OP, Float64Array), - DataType::Utf8 => { - compute_list_array_utf8_op!($LEFT, $RIGHT, $OP, StringArray) - } - other => Err(ExecutionError::General(format!( - "Unsupported data type {:?} for Contains operator", - other - ))), - } - }}; -} - -/// Invoke a compute kernel on a primitive array and a ListArray -macro_rules! compute_list_array_op { - ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{ - let ll = $LEFT - .as_any() - .downcast_ref::<$DT>() - .expect("compute_op failed to downcast array"); - let rr = $RIGHT - .as_any() - .downcast_ref::() - .expect("compute_op failed to downcast array"); - Ok(Arc::new($OP(&ll, &rr)?)) - }}; -} - -/// Invoke a compute kernel on a binary data array and a ListArray -macro_rules! 
compute_list_array_utf8_op { - ($LEFT:expr, $RIGHT:expr, $OP:ident, $DT:ident) => {{ - let ll = $LEFT - .as_any() - .downcast_ref::<$DT>() - .expect("compute_op failed to downcast array"); - let rr = $RIGHT - .as_any() - .downcast_ref::() - .expect("compute_op failed to downcast array"); - Ok(Arc::new(paste::expr! {[<$OP _utf8>]}(&ll, &rr)?)) - }}; -} - macro_rules! binary_string_array_op { ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ match $LEFT.data_type() { @@ -1080,22 +1026,12 @@ impl PhysicalExpr for BinaryExpr { let left = self.left.evaluate(batch)?; let right = self.right.evaluate(batch)?; if left.data_type() != right.data_type() { - let mut invalid_type_combination = true; - if self.op == Operator::Contains { - if let DataType::List(dt) = right.data_type() { - if **dt == left.data_type().clone() { - invalid_type_combination = false; - } - } - } - if invalid_type_combination { - return Err(ExecutionError::General(format!( - "Cannot evaluate binary expression {:?} with types {:?} and {:?}", - self.op, - left.data_type(), - right.data_type() - ))); - } + return Err(ExecutionError::General(format!( + "Cannot evaluate binary expression {:?} with types {:?} and {:?}", + self.op, + left.data_type(), + right.data_type() + ))); } match &self.op { Operator::Like => binary_string_array_op!(left, right, like), @@ -1106,7 +1042,6 @@ impl PhysicalExpr for BinaryExpr { Operator::GtEq => binary_array_op!(left, right, gt_eq), Operator::Eq => binary_array_op!(left, right, eq), Operator::NotEq => binary_array_op!(left, right, neq), - Operator::Contains => list_array_op!(left, right, contains), Operator::Plus => binary_primitive_array_op!(left, right, add), Operator::Minus => binary_primitive_array_op!(left, right, subtract), Operator::Multiply => binary_primitive_array_op!(left, right, multiply), @@ -1334,8 +1269,7 @@ mod tests { use super::*; use crate::error::Result; use crate::execution::physical_plan::common::get_scalar_value; - use arrow::array::{ArrayData, PrimitiveArray, StringArray, Time64NanosecondArray}; - use arrow::buffer::Buffer; + use arrow::array::{PrimitiveArray, StringArray, Time64NanosecondArray}; use arrow::datatypes::*; #[test] @@ -1368,56 +1302,6 @@ mod tests { Ok(()) } - #[test] - fn contains_comparison() -> Result<()> { - let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::List(Box::new(DataType::Int32)), false), - ]); - // Construct a value array - let value_data = ArrayData::builder(DataType::Int32) - .len(8) - .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice())) - .build(); - - // Construct a buffer for value offsets, for the nested array: - // [[0, 1, 2], [3, 4, 5], [6, 7]] - let value_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice()); - - // Construct a list array from the above two - let list_data_type = DataType::List(Box::new(DataType::Int32)); - let list_data = ArrayData::builder(list_data_type.clone()) - .len(3) - .add_buffer(value_offsets.clone()) - .add_child_data(value_data.clone()) - .build(); - - let a = Int32Array::from(vec![Some(1), Some(1), Some(1)]); - let b = ListArray::from(list_data); - - let batch = RecordBatch::try_new( - Arc::new(schema.clone()), - vec![Arc::new(a), Arc::new(b)], - )?; - - // expression: "a >] b" - let contains = binary(col(0), Operator::Contains, col(1)); - - let result = contains.evaluate(&batch)?; - assert_eq!(result.len(), 3); - - let expected = vec![true, false, false]; - let result = result - .as_any() - .downcast_ref::() - .expect("failed to downcast to 
BooleanArray"); - for i in 0..3 { - assert_eq!(result.value(i), expected[i]); - } - - Ok(()) - } - #[test] fn binary_nested() -> Result<()> { let schema = Schema::new(vec![ diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs index 5bbf64a79ae..0b9464c1588 100644 --- a/rust/datafusion/src/logicalplan.rs +++ b/rust/datafusion/src/logicalplan.rs @@ -110,8 +110,6 @@ pub enum Operator { Divide, /// Remainder operator, like `%` Modulus, - /// Contains operator, like `>]` - Contains, /// Logical AND, like `&&` And, /// Logical OR, like `||` diff --git a/rust/datafusion/src/optimizer/type_coercion.rs b/rust/datafusion/src/optimizer/type_coercion.rs index 7ed1ed0bb36..0bd4edd9083 100644 --- a/rust/datafusion/src/optimizer/type_coercion.rs +++ b/rust/datafusion/src/optimizer/type_coercion.rs @@ -98,23 +98,12 @@ fn rewrite_expr(expr: &Expr, schema: &Schema) -> Result { right: Arc::new(right), }) } else { - match op { - Operator::Contains => { - return Ok(Expr::BinaryExpr { - left: Arc::new(left.cast_to(&left_type, schema)?), - op: op.clone(), - right: Arc::new(right.cast_to(&right_type, schema)?), - }); - } - _ => { - let super_type = utils::get_supertype(&left_type, &right_type)?; - return Ok(Expr::BinaryExpr { - left: Arc::new(left.cast_to(&super_type, schema)?), - op: op.clone(), - right: Arc::new(right.cast_to(&super_type, schema)?), - }); - } - }; + let super_type = utils::get_supertype(&left_type, &right_type)?; + return Ok(Expr::BinaryExpr { + left: Arc::new(left.cast_to(&super_type, schema)?), + op: op.clone(), + right: Arc::new(right.cast_to(&super_type, schema)?), + }); } } Expr::IsNull(e) => Ok(Expr::IsNull(Arc::new(rewrite_expr(e, schema)?))), diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index dac0e08f4cb..77deb58f2d5 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -340,7 +340,6 @@ impl SqlToRel { SQLOperator::Not => Operator::Not, SQLOperator::Like => Operator::Like, SQLOperator::NotLike => Operator::NotLike, - SQLOperator::Contains => Operator::Contains, }; match operator { @@ -580,7 +579,7 @@ mod tests { quick_test( "SELECT * from person", "Projection: #0, #1, #2, #3, #4, #5, #6\ - \n TableScan: person projection=None", + \n TableScan: person projection=None", ); } diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index babfb58f389..1918a9db2ae 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+use std::convert::TryFrom; use std::env; use std::sync::Arc; @@ -22,7 +23,7 @@ extern crate arrow; extern crate datafusion; use arrow::array::*; -use arrow::datatypes::{DataType, Field, Schema}; +use arrow::datatypes::{DataType, Field, Int64Type, Schema}; use arrow::record_batch::RecordBatch; use datafusion::error::Result; @@ -114,47 +115,99 @@ fn parquet_single_nan_schema() { } #[test] -fn parquet_query_int_array() { - //TO DO: this test file is not part of parquet-testing submodule (Morgan, 16/03/2020) +fn parquet_list_columns() { let mut ctx = ExecutionContext::new(); let testdata = env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined"); ctx.register_parquet( - "int_array_test", - &format!("{}/int_array_test_file.parquet", testdata), + "list_columns", + &format!("{}/list_columns.parquet", testdata), ) .unwrap(); - let sql = "SELECT int_array FROM int_array_test WHERE 7006 >] int_array"; + + let schema = Arc::new(Schema::new(vec![ + Field::new( + "int64_list", + DataType::List(Box::new(DataType::Int64)), + true, + ), + Field::new("utf8_list", DataType::List(Box::new(DataType::Utf8)), true), + ])); + + let sql = "SELECT int64_list, utf8_list FROM list_columns"; let plan = ctx.create_logical_plan(&sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); let plan = ctx.create_physical_plan(&plan, DEFAULT_BATCH_SIZE).unwrap(); let results = ctx.collect(plan.as_ref()).unwrap(); - for batch in results { - assert_eq!(2, batch.num_rows()); - assert_eq!(1, batch.num_columns()); - } -} -#[test] -fn string_parquet_query_array() { - //TODO: this test file is not part of parquet-testing submodule (Morgan, 16/03/2020) - let mut ctx = ExecutionContext::new(); - let testdata = env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined"); - ctx.register_parquet( - "string_array_table", - &format!("{}/string_array.parquet", testdata), - ) - .unwrap(); - let sql = "SELECT string_array FROM string_array_table WHERE 'abc' >] string_array"; - let plan = ctx.create_logical_plan(&sql).expect("logical"); - let plan = ctx.optimize(&plan).expect("optimize"); - let plan = ctx - .create_physical_plan(&plan, DEFAULT_BATCH_SIZE) - .expect("physical"); - let results = ctx.collect(plan.as_ref()).expect("record batches"); - for batch in results { - assert_eq!(8, batch.num_rows()); - assert_eq!(1, batch.num_columns()); - } + // int64_list utf8_list + // 0 [1, 2, 3] [abc, efg, hij] + // 1 [None, 1] None + // 2 [4] [efg, None, hij, xyz] + + assert_eq!(1, results.len()); + let batch = &results[0]; + assert_eq!(3, batch.num_rows()); + assert_eq!(2, batch.num_columns()); + assert_eq!(&schema, batch.schema()); + + let int_list_array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let utf8_list_array = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!( + int_list_array + .value(0) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![Some(1), Some(2), Some(3),]) + ); + + assert_eq!( + utf8_list_array + .value(0) + .as_any() + .downcast_ref::() + .unwrap(), + &StringArray::try_from(vec![Some("abc"), Some("efg"), Some("hij"),]).unwrap() + ); + + assert_eq!( + int_list_array + .value(1) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![None, Some(1),]) + ); + + assert!(utf8_list_array.is_null(1)); + + assert_eq!( + int_list_array + .value(2) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![Some(4),]) + ); + + assert_eq!( + utf8_list_array + .value(2) + .as_any() + .downcast_ref::() + 
.unwrap(), + &StringArray::try_from(vec![Some("efg"), None, Some("hij"), Some("xyz")]) + .unwrap() + ); } #[test] diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 06173fb6be4..2b87fb68109 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -77,7 +77,6 @@ use arrow::datatypes::{ use arrow::util::bit_util; use std::any::Any; -use std::convert::TryFrom; /// Array reader reads parquet data into arrow array. pub trait ArrayReader { @@ -623,51 +622,31 @@ fn build_empty_list_array(item_type: ArrowType) -> Result { macro_rules! remove_primitive_array_indices { ($arr: expr, $item_type:ty, $indices:expr) => {{ - let mut new_data_vec = Vec::new(); let array_data = match $arr.as_any().downcast_ref::>() { Some(a) => a, _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), }; + let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len()); for i in 0..array_data.len() { if !$indices.contains(&i) { if array_data.is_null(i) { - new_data_vec.push(None); - } else { - new_data_vec.push(Some(array_data.value(i).into())); - } - } - } - Ok(Arc::new(>::from(new_data_vec))) - }}; -} - -macro_rules! remove_timestamp_array_indices { - ($arr: expr, $array_type:ty, $indices:expr, $time_unit:expr) => {{ - let mut new_data_vec = Vec::new(); - let array_data = match $arr.as_any().downcast_ref::>() { - Some(a) => a, - _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray<>", $arr))), - }; - for i in 0..array_data.len() { - if !$indices.contains(&i) { - if array_data.is_null(i) { - new_data_vec.push(None); + builder.append_null()?; } else { - new_data_vec.push(Some(array_data.value(i).into())); + builder.append_value(array_data.value(i))?; } } } - Ok(Arc::new(>::from_opt_vec(new_data_vec, $time_unit))) + Ok(Arc::new(builder.finish())) }}; } -macro_rules! remove_binary_array_indices { +macro_rules! remove_array_indices_custom_builder { ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{ let array_data = match $arr.as_any().downcast_ref::<$array_type>() { Some(a) => a, _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))), }; - let mut builder = BinaryBuilder::new(array_data.len()); + let mut builder = $item_builder::new(array_data.len()); for i in 0..array_data.len() { if !$indices.contains(&i) { @@ -702,29 +681,6 @@ macro_rules! remove_fixed_size_binary_array_indices { }}; } -macro_rules! 
remove_string_array_indices { - ($arr: expr, $array_type:ty, $indices:expr) => {{ - let mut new_data_vec = Vec::new(); - let array_data = match $arr.as_any().downcast_ref::<$array_type>() { - Some(a) => a, - _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray<>", $arr))), - }; - for i in 0..array_data.len() { - if !$indices.contains(&i) { - if array_data.is_null(i) { - new_data_vec.push(None); - } else { - new_data_vec.push(Some(array_data.value(i).into())); - } - } - } - match <$array_type>::try_from(new_data_vec.clone()) { - Ok(a) => Ok(Arc::new(a)), - _ => Err(ParquetError::General(format!("Error generating next batch for ListArray: non-primitive Arrow array cannot be built from {:?}", new_data_vec))) - } - }}; -} - fn remove_indices( arr: ArrayRef, item_type: ArrowType, @@ -784,41 +740,23 @@ fn remove_indices( ArrowType::Duration(ArrowTimeUnit::Nanosecond) => { remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, indices) } - ArrowType::Timestamp(ArrowTimeUnit::Second, time_unit) => { - remove_timestamp_array_indices!( - arr, - ArrowTimestampSecondType, - indices, - time_unit - ) + ArrowType::Timestamp(ArrowTimeUnit::Second, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampSecondType, indices) } - ArrowType::Timestamp(ArrowTimeUnit::Millisecond, time_unit) => { - remove_timestamp_array_indices!( - arr, - ArrowTimestampMillisecondType, - indices, - time_unit - ) + ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampMillisecondType, indices) } - ArrowType::Timestamp(ArrowTimeUnit::Microsecond, time_unit) => { - remove_timestamp_array_indices!( - arr, - ArrowTimestampMicrosecondType, - indices, - time_unit - ) + ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampMicrosecondType, indices) } - ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, time_unit) => { - remove_timestamp_array_indices!( - arr, - ArrowTimestampNanosecondType, - indices, - time_unit - ) + ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => { + remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType, indices) + } + ArrowType::Utf8 => { + remove_array_indices_custom_builder!(arr, StringArray, StringBuilder, indices) } - ArrowType::Utf8 => remove_string_array_indices!(arr, StringArray, indices), ArrowType::Binary => { - remove_binary_array_indices!(arr, BinaryArray, BinaryBuilder, indices) + remove_array_indices_custom_builder!(arr, BinaryArray, BinaryBuilder, indices) } ArrowType::FixedSizeBinary(size) => remove_fixed_size_binary_array_indices!( arr, @@ -1472,21 +1410,20 @@ impl<'a> ArrayReaderBuilder { #[cfg(test)] mod tests { use crate::arrow::array_reader::{ - build_array_reader, ArrayReader, PrimitiveArrayReader, StructArrayReader, + ArrayReader, ListArrayReader, PrimitiveArrayReader, StructArrayReader, }; use crate::basic::{Encoding, Type as PhysicalType}; use crate::column::page::Page; use crate::data_type::{DataType, Int32Type, Int64Type}; use crate::errors::Result; - use crate::file::reader::{FileReader, SerializedFileReader}; use crate::schema::parser::parse_message_type; use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; + use crate::util::test_common::make_pages; use crate::util::test_common::page_util::InMemoryPageIterator; - use crate::util::test_common::{get_test_file, make_pages}; use arrow::array::{Array, ArrayRef, ListArray, PrimitiveArray, StructArray}; use 
arrow::datatypes::{ - DataType as ArrowType, Field, Int32Type as ArrowInt32, Int64Type as ArrowInt64, - UInt32Type as ArrowUInt32, UInt64Type as ArrowUInt64, + DataType as ArrowType, Field, Int32Type as ArrowInt32, UInt32Type as ArrowUInt32, + UInt64Type as ArrowUInt64, }; use rand::distributions::uniform::SampleUniform; use std::any::Any; @@ -1912,46 +1849,55 @@ mod tests { ); } - #[test] //TODO: this test file is not in the testing submodule yet (Morgan 03-18-2020) - fn test_create_list_array_reader() { - let file = get_test_file("tiny_int_array.parquet"); - let file_reader = Rc::new(SerializedFileReader::new(file).unwrap()); - let mut array_reader = build_array_reader( - file_reader.metadata().file_metadata().schema_descr_ptr(), - vec![0usize].into_iter(), - file_reader, - ) - .unwrap(); - - let arrow_type = ArrowType::Struct(vec![Field::new( - "int_array", - ArrowType::List(Box::new(ArrowType::Int64)), - true, - )]); - assert_eq!(array_reader.get_data_type(), &arrow_type); - - let next_batch = array_reader.next_batch(1024).unwrap(); - - // parquets with primitive columns also produce a StructArray -- each struct is a field - let struct_array = next_batch.as_any().downcast_ref::().unwrap(); - let array = struct_array - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); + #[test] + fn test_list_array_reader() { + // [[1, null, 2], null, [3, 4]] + let array = Arc::new(PrimitiveArray::::from(vec![ + Some(1), + None, + Some(2), + None, + Some(3), + Some(4), + ])); + let item_array_reader = InMemoryArrayReader::new( + ArrowType::Int32, + array.clone(), + Some(vec![3, 2, 3, 0, 3, 3]), + Some(vec![0, 1, 1, 0, 0, 1]), + ); + + let mut list_array_reader = ListArrayReader::new( + Box::new(item_array_reader), + ArrowType::List(Box::new(ArrowType::Int32)), + ArrowType::Int32, + 1, + 1, + ); - assert_eq!(8, array.len()); + let next_batch = list_array_reader.next_batch(1024).unwrap(); + let list_array = next_batch.as_any().downcast_ref::().unwrap(); - let sublist_0 = array.value(0); - let sublist_0 = sublist_0 - .as_any() - .downcast_ref::>() - .unwrap(); + assert_eq!(3, list_array.len()); - assert_eq!(2, sublist_0.len()); - assert_eq!(false, sublist_0.is_null(0)); - assert_eq!(false, sublist_0.is_null(1)); - assert_eq!(7000, sublist_0.value(0)); - assert_eq!(7001, sublist_0.value(1)); + assert_eq!( + list_array + .value(0) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![Some(1), None, Some(2)]) + ); + + assert!(list_array.is_null(1)); + + assert_eq!( + list_array + .value(2) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![Some(3), Some(4)]) + ); } } From 48ee841a238c344aed5f897cb4eb81f282a0b611 Mon Sep 17 00:00:00 2001 From: Morgan Cassels Date: Mon, 30 Mar 2020 16:11:02 -0700 Subject: [PATCH 03/13] more cleanup --- rust/arrow/src/compute/kernels/comparison.rs | 11 +- rust/arrow/src/compute/kernels/filter.rs | 204 +------------------ rust/parquet/src/arrow/array_reader.rs | 51 +++-- 3 files changed, 36 insertions(+), 230 deletions(-) diff --git a/rust/arrow/src/compute/kernels/comparison.rs b/rust/arrow/src/compute/kernels/comparison.rs index aa5000f5a53..0409fac0a96 100644 --- a/rust/arrow/src/compute/kernels/comparison.rs +++ b/rust/arrow/src/compute/kernels/comparison.rs @@ -27,7 +27,6 @@ use std::collections::HashMap; use std::sync::Arc; use crate::array::*; -use crate::buffer::{Buffer, MutableBuffer}; use crate::compute::util::apply_bin_op_to_option_bitmap; use crate::datatypes::{ArrowNumericType, BooleanType, DataType}; use 
crate::error::{ArrowError, Result}; @@ -348,14 +347,6 @@ where compare_op!(left, right, |a, b| a >= b) } -// create a buffer and fill it with valid bits -fn new_all_set_buffer(len: usize) -> Buffer { - let buffer = MutableBuffer::new(len); - let buffer = buffer.with_bitset(len, true); - let buffer = buffer.freeze(); - buffer -} - #[cfg(test)] mod tests { use super::*; @@ -466,7 +457,7 @@ mod tests { #[test] fn test_primitive_array_gt_eq_nulls() { let a = Int32Array::from(vec![None, None, Some(1)]); - let b = Int32Array::from(vec![None, Some(1), Some(1)]); + let b = Int32Array::from(vec![None, Some(1), None]); let c = gt_eq(&a, &b).unwrap(); assert_eq!(true, c.value(0)); assert_eq!(false, c.value(1)); diff --git a/rust/arrow/src/compute/kernels/filter.rs b/rust/arrow/src/compute/kernels/filter.rs index 0f143877144..502ac892641 100644 --- a/rust/arrow/src/compute/kernels/filter.rs +++ b/rust/arrow/src/compute/kernels/filter.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use crate::array::*; -use crate::datatypes::*; +use crate::datatypes::{ArrowNumericType, DataType, TimeUnit}; use crate::error::{ArrowError, Result}; /// Helper function to perform boolean lambda function on values from two arrays. @@ -73,66 +73,6 @@ macro_rules! filter_array { }}; } -macro_rules! filter_primitive_item_list_array { - ($array:expr, $filter:expr, $item_type:ident) => {{ - let list_of_lists = $array.as_any().downcast_ref::().unwrap(); - let values_builder = PrimitiveBuilder::<$item_type>::new(list_of_lists.len()); - let mut builder = ListBuilder::new(values_builder); - for i in 0..list_of_lists.len() { - if $filter.value(i) { - if list_of_lists.is_null(i) { - builder.append(false)?; - } else { - let this_inner_list = list_of_lists.value(i); - let inner_list = this_inner_list - .as_any() - .downcast_ref::>() - .unwrap(); - for j in 0..inner_list.len() { - if inner_list.is_null(j) { - builder.values().append_null()?; - } else { - builder.values().append_value(inner_list.value(j))?; - } - } - builder.append(true)?; - } - } - } - Ok(Arc::new(builder.finish())) - }}; -} - -macro_rules! 
filter_non_primitive_item_list_array { - ($array:expr, $filter:expr, $item_array_type:ident, $item_builder:ident) => {{ - let list_of_lists = $array.as_any().downcast_ref::().unwrap(); - let values_builder = $item_builder::new(list_of_lists.len()); - let mut builder = ListBuilder::new(values_builder); - for i in 0..list_of_lists.len() { - if $filter.value(i) { - if list_of_lists.is_null(i) { - builder.append(false)?; - } else { - let this_inner_list = list_of_lists.value(i); - let inner_list = this_inner_list - .as_any() - .downcast_ref::<$item_array_type>() - .unwrap(); - for j in 0..inner_list.len() { - if inner_list.is_null(j) { - builder.values().append_null()?; - } else { - builder.values().append_value(inner_list.value(j))?; - } - } - builder.append(true)?; - } - } - } - Ok(Arc::new(builder.finish())) - }}; -} - /// Returns the array, taking only the elements matching the filter pub fn filter(array: &Array, filter: &BooleanArray) -> Result { match array.data_type() { @@ -185,99 +125,6 @@ pub fn filter(array: &Array, filter: &BooleanArray) -> Result { DataType::Timestamp(TimeUnit::Nanosecond, _) => { filter_array!(array, filter, TimestampNanosecondArray) } - DataType::List(dt) => match &**dt { - DataType::UInt8 => { - filter_primitive_item_list_array!(array, filter, UInt8Type) - } - DataType::UInt16 => { - filter_primitive_item_list_array!(array, filter, UInt16Type) - } - DataType::UInt32 => { - filter_primitive_item_list_array!(array, filter, UInt32Type) - } - DataType::UInt64 => { - filter_primitive_item_list_array!(array, filter, UInt64Type) - } - DataType::Int8 => filter_primitive_item_list_array!(array, filter, Int8Type), - DataType::Int16 => { - filter_primitive_item_list_array!(array, filter, Int16Type) - } - DataType::Int32 => { - filter_primitive_item_list_array!(array, filter, Int32Type) - } - DataType::Int64 => { - filter_primitive_item_list_array!(array, filter, Int64Type) - } - DataType::Float32 => { - filter_primitive_item_list_array!(array, filter, Float32Type) - } - DataType::Float64 => { - filter_primitive_item_list_array!(array, filter, Float64Type) - } - DataType::Boolean => { - filter_primitive_item_list_array!(array, filter, BooleanType) - } - DataType::Date32(_) => { - filter_primitive_item_list_array!(array, filter, Date32Type) - } - DataType::Date64(_) => { - filter_primitive_item_list_array!(array, filter, Date64Type) - } - DataType::Time32(TimeUnit::Second) => { - filter_primitive_item_list_array!(array, filter, Time32SecondType) - } - DataType::Time32(TimeUnit::Millisecond) => { - filter_primitive_item_list_array!(array, filter, Time32MillisecondType) - } - DataType::Time64(TimeUnit::Microsecond) => { - filter_primitive_item_list_array!(array, filter, Time64MicrosecondType) - } - DataType::Time64(TimeUnit::Nanosecond) => { - filter_primitive_item_list_array!(array, filter, Time64NanosecondType) - } - DataType::Duration(TimeUnit::Second) => { - filter_primitive_item_list_array!(array, filter, DurationSecondType) - } - DataType::Duration(TimeUnit::Millisecond) => { - filter_primitive_item_list_array!(array, filter, DurationMillisecondType) - } - DataType::Duration(TimeUnit::Microsecond) => { - filter_primitive_item_list_array!(array, filter, DurationMicrosecondType) - } - DataType::Duration(TimeUnit::Nanosecond) => { - filter_primitive_item_list_array!(array, filter, DurationNanosecondType) - } - DataType::Timestamp(TimeUnit::Second, _) => { - filter_primitive_item_list_array!(array, filter, TimestampSecondType) - } - DataType::Timestamp(TimeUnit::Millisecond, 
_) => { - filter_primitive_item_list_array!(array, filter, TimestampMillisecondType) - } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - filter_primitive_item_list_array!(array, filter, TimestampMicrosecondType) - } - DataType::Timestamp(TimeUnit::Nanosecond, _) => { - filter_primitive_item_list_array!(array, filter, TimestampNanosecondType) - } - DataType::Binary => filter_non_primitive_item_list_array!( - array, - filter, - BinaryArray, - BinaryBuilder - ), - DataType::Utf8 => filter_non_primitive_item_list_array!( - array, - filter, - StringArray, - StringBuilder - ), - other => { - return Err(ArrowError::ComputeError(format!( - "filter not supported for List({:?})", - other - ))); - } - }, DataType::Binary => { let b = array.as_any().downcast_ref::().unwrap(); let mut values: Vec<&[u8]> = Vec::with_capacity(b.len()); @@ -428,53 +275,4 @@ mod tests { assert_eq!(1, d.len()); assert_eq!(true, d.is_null(0)); } - - #[test] - fn test_filter_list_array() { - let value_data = ArrayData::builder(DataType::Int32) - .len(8) - .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice())) - .build(); - - let value_offsets = Buffer::from(&[0, 3, 6, 8, 8].to_byte_slice()); - - let list_data_type = DataType::List(Box::new(DataType::Int32)); - let list_data = ArrayData::builder(list_data_type.clone()) - .len(4) - .add_buffer(value_offsets.clone()) - .add_child_data(value_data.clone()) - .null_bit_buffer(Buffer::from([0b00000111])) - .build(); - - // a = [[0, 1, 2], [3, 4, 5], [6, 7], null] - let a = ListArray::from(list_data); - let b = BooleanArray::from(vec![false, true, false, true]); - let c = filter(&a, &b).unwrap(); - let d = c.as_ref().as_any().downcast_ref::().unwrap(); - - assert_eq!(DataType::Int32, d.value_type()); - - // result should be [[3, 4, 5], null] - assert_eq!(2, d.len()); - assert_eq!(1, d.null_count()); - assert_eq!(true, d.is_null(1)); - - assert_eq!(0, d.value_offset(0)); - assert_eq!(3, d.value_length(0)); - assert_eq!(3, d.value_offset(1)); - assert_eq!(0, d.value_length(1)); - assert_eq!( - Buffer::from(&[3, 4, 5].to_byte_slice()), - d.values().data().buffers()[0].clone() - ); - assert_eq!( - Buffer::from(&[0, 3, 3].to_byte_slice()), - d.data().buffers()[0].clone() - ); - let inner_list = d.value(0); - let inner_list = inner_list.as_any().downcast_ref::().unwrap(); - assert_eq!(3, inner_list.len()); - assert_eq!(0, inner_list.null_count()); - assert_eq!(inner_list, &Int32Array::from(vec![3, 4, 5])); - } } diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 2b87fb68109..4e8141f4029 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -490,7 +490,7 @@ where } } -/// Implementation of struct array reader. +/// Implementation of list array reader. pub struct ListArrayReader { item_reader: Box, data_type: ArrowType, @@ -502,7 +502,7 @@ pub struct ListArrayReader { } impl ListArrayReader { - /// Construct struct array reader. + /// Construct list array reader. 
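    // Reviewer aside (illustration only, not part of the diff): the reader below turns the
    // item reader's definition/repetition levels back into list structure. With the usual
    // three-level list encoding (optional list -> repeated group -> optional item, so max
    // definition level 3 and max repetition level 1):
    //   def 0 -> the list itself is null        def 1 -> the list is present but empty
    //   def 2 -> this item slot is null         def 3 -> the item is present
    //   rep 0 -> the item starts a new list     rep 1 -> it continues the current list
    // Worked example (the same fixture test_list_array_reader uses later in this series):
    //   item values = [1, null, 2, null, 3, 4]
    //   def levels  = [3, 2,    3, 0,    3, 3]
    //   rep levels  = [0, 1,    1, 0,    0, 1]
    // decodes to [[1, null, 2], null, [3, 4]]: child values [1, null, 2, 3, 4] (the
    // placeholder entry for the null list is dropped), offsets [0, 3, 3, 5], and list
    // validity [true, false, true].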
pub fn new( item_reader: Box, data_type: ArrowType, @@ -943,9 +943,6 @@ impl ArrayReader for StructArrayReader { .children .iter_mut() .map(|reader| reader.next_batch(batch_size)) - .map(|batch| { - return batch; - }) .try_fold( Vec::new(), |mut result, child_array| -> Result> { @@ -983,9 +980,7 @@ impl ArrayReader for StructArrayReader { for child in &self.children { if let Some(current_child_def_levels) = child.get_def_levels() { - if current_child_def_levels.len() != children_array_len - && children_array.len() != 1 - { + if current_child_def_levels.len() != children_array_len { return Err(general_err!("Child array length are not equal!")); } else { for i in 0..children_array_len { @@ -1039,9 +1034,7 @@ impl ArrayReader for StructArrayReader { self.def_level_buffer = Some(def_level_data_buffer.freeze()); self.rep_level_buffer = rep_level_data; - let result_array = StructArray::from(array_data); - - return Ok(Arc::new(result_array)); + Ok(Arc::new(StructArray::from(array_data))) } fn get_def_levels(&self) -> Option<&[i16]> { @@ -1133,9 +1126,8 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext ) -> Result>> { if self.is_included(cur_type.as_ref()) { let mut new_context = context.clone(); - if cur_type.name().to_string() != "item" { - new_context.path.append(vec![cur_type.name().to_string()]); - } + new_context.path.append(vec![cur_type.name().to_string()]); + match cur_type.get_basic_info().repetition() { Repetition::REPEATED => { new_context.def_level += 1; @@ -1146,8 +1138,10 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext } _ => (), } + let reader = self.build_for_primitive_type_inner(cur_type.clone(), &new_context)?; + if cur_type.get_basic_info().repetition() == Repetition::REPEATED { Err(ArrowError( "Reading repeated field is not supported yet!".to_string(), @@ -1210,7 +1204,6 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext } /// Build array reader for list type. 
- /// List type is a special case of struct type (see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md) fn visit_list_with_item( &mut self, list_type: Rc, @@ -1410,16 +1403,18 @@ impl<'a> ArrayReaderBuilder { #[cfg(test)] mod tests { use crate::arrow::array_reader::{ - ArrayReader, ListArrayReader, PrimitiveArrayReader, StructArrayReader, + build_array_reader, ArrayReader, ListArrayReader, PrimitiveArrayReader, + StructArrayReader, }; use crate::basic::{Encoding, Type as PhysicalType}; use crate::column::page::Page; use crate::data_type::{DataType, Int32Type, Int64Type}; use crate::errors::Result; + use crate::file::reader::{FileReader, SerializedFileReader}; use crate::schema::parser::parse_message_type; use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; - use crate::util::test_common::make_pages; use crate::util::test_common::page_util::InMemoryPageIterator; + use crate::util::test_common::{get_test_file, make_pages}; use arrow::array::{Array, ArrayRef, ListArray, PrimitiveArray, StructArray}; use arrow::datatypes::{ DataType as ArrowType, Field, Int32Type as ArrowInt32, UInt32Type as ArrowUInt32, @@ -1849,6 +1844,28 @@ mod tests { ); } + #[test] + fn test_create_array_reader() { + let file = get_test_file("nulls.snappy.parquet"); + let file_reader = Rc::new(SerializedFileReader::new(file).unwrap()); + + let array_reader = build_array_reader( + file_reader.metadata().file_metadata().schema_descr_ptr(), + vec![0usize].into_iter(), + file_reader, + ) + .unwrap(); + + // Create arrow types + let arrow_type = ArrowType::Struct(vec![Field::new( + "b_struct", + ArrowType::Struct(vec![Field::new("b_c_int", ArrowType::Int32, true)]), + true, + )]); + + assert_eq!(array_reader.get_data_type(), &arrow_type); + } + #[test] fn test_list_array_reader() { // [[1, null, 2], null, [3, 4]] From dd4b10987e8197c8f1bfca34ebeb01464dbde52d Mon Sep 17 00:00:00 2001 From: Morgan Cassels Date: Mon, 30 Mar 2020 16:19:55 -0700 Subject: [PATCH 04/13] remove unused import --- rust/arrow/src/compute/kernels/filter.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust/arrow/src/compute/kernels/filter.rs b/rust/arrow/src/compute/kernels/filter.rs index 502ac892641..52e12cfef19 100644 --- a/rust/arrow/src/compute/kernels/filter.rs +++ b/rust/arrow/src/compute/kernels/filter.rs @@ -155,8 +155,6 @@ pub fn filter(array: &Array, filter: &BooleanArray) -> Result { #[cfg(test)] mod tests { use super::*; - use crate::buffer::Buffer; - use crate::datatypes::ToByteSlice; macro_rules! 
def_temporal_test { ($test:ident, $array_type: ident, $data: expr) => { From f9411f135318cf71d458023f46eded178a0b1708 Mon Sep 17 00:00:00 2001 From: Morgan Cassels Date: Mon, 30 Mar 2020 16:25:56 -0700 Subject: [PATCH 05/13] more cleanup --- rust/datafusion/src/logicalplan.rs | 11 ++++++----- .../src/optimizer/projection_push_down.rs | 19 ++++++++----------- .../datafusion/src/optimizer/type_coercion.rs | 4 ++-- rust/parquet/src/arrow/array_reader.rs | 3 ++- 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs index 0b9464c1588..25c8c94efe6 100644 --- a/rust/datafusion/src/logicalplan.rs +++ b/rust/datafusion/src/logicalplan.rs @@ -828,8 +828,8 @@ mod tests { .build()?; let expected = "Projection: #id\ - \n Selection: #state Eq Utf8(\"CO\")\ - \n TableScan: employee.csv projection=Some([0, 3])"; + \n Selection: #state Eq Utf8(\"CO\")\ + \n TableScan: employee.csv projection=Some([0, 3])"; assert_eq!(expected, format!("{:?}", plan)); @@ -852,9 +852,10 @@ mod tests { .project(vec![col("state"), col("total_salary")])? .build()?; - let expected = "Projection: #state, #total_salary\ - \n Aggregate: groupBy=[[#state]], aggr=[[SUM(#salary) AS total_salary]]\ - \n TableScan: employee.csv projection=Some([3, 4])"; + let expected = + "Projection: #state, #total_salary\ + \n Aggregate: groupBy=[[#state]], aggr=[[SUM(#salary) AS total_salary]]\ + \n TableScan: employee.csv projection=Some([3, 4])"; assert_eq!(expected, format!("{:?}", plan)); diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs index c71332981a7..744b8dcef08 100644 --- a/rust/datafusion/src/optimizer/projection_push_down.rs +++ b/rust/datafusion/src/optimizer/projection_push_down.rs @@ -56,9 +56,6 @@ impl ProjectionPushDown { // collect all columns referenced by projection expressions utils::exprlist_to_column_indices(&expr, accum)?; - // push projection down - let input = self.optimize_plan(&input, accum, mapping)?; - LogicalPlanBuilder::from(&self.optimize_plan(&input, accum, mapping)?) .project(self.rewrite_expr_list(expr, mapping)?)? 
.build() @@ -277,7 +274,7 @@ mod tests { .build()?; let expected = "Aggregate: groupBy=[[]], aggr=[[MAX(#0)]]\ - \n TableScan: test projection=Some([1])"; + \n TableScan: test projection=Some([1])"; assert_optimized_plan_eq(&plan, expected); @@ -293,7 +290,7 @@ mod tests { .build()?; let expected = "Aggregate: groupBy=[[#1]], aggr=[[MAX(#0)]]\ - \n TableScan: test projection=Some([1, 2])"; + \n TableScan: test projection=Some([1, 2])"; assert_optimized_plan_eq(&plan, expected); @@ -310,8 +307,8 @@ mod tests { .build()?; let expected = "Aggregate: groupBy=[[]], aggr=[[MAX(#0)]]\ - \n Selection: #1\ - \n TableScan: test projection=Some([1, 2])"; + \n Selection: #1\ + \n TableScan: test projection=Some([1, 2])"; assert_optimized_plan_eq(&plan, expected); @@ -330,7 +327,7 @@ mod tests { .build()?; let expected = "Projection: CAST(#0 AS Float64)\ - \n TableScan: test projection=Some([2])"; + \n TableScan: test projection=Some([2])"; assert_optimized_plan_eq(&projection, expected); @@ -350,7 +347,7 @@ mod tests { assert_fields_eq(&plan, vec!["a", "b"]); let expected = "Projection: #0, #1\ - \n TableScan: test projection=Some([0, 1])"; + \n TableScan: test projection=Some([0, 1])"; assert_optimized_plan_eq(&plan, expected); @@ -371,8 +368,8 @@ mod tests { assert_fields_eq(&plan, vec!["c", "a"]); let expected = "Limit: UInt32(5)\ - \n Projection: #1, #0\ - \n TableScan: test projection=Some([0, 2])"; + \n Projection: #1, #0\ + \n TableScan: test projection=Some([0, 2])"; assert_optimized_plan_eq(&plan, expected); diff --git a/rust/datafusion/src/optimizer/type_coercion.rs b/rust/datafusion/src/optimizer/type_coercion.rs index 0bd4edd9083..b8d90a90910 100644 --- a/rust/datafusion/src/optimizer/type_coercion.rs +++ b/rust/datafusion/src/optimizer/type_coercion.rs @@ -99,11 +99,11 @@ fn rewrite_expr(expr: &Expr, schema: &Schema) -> Result { }) } else { let super_type = utils::get_supertype(&left_type, &right_type)?; - return Ok(Expr::BinaryExpr { + Ok(Expr::BinaryExpr { left: Arc::new(left.cast_to(&super_type, schema)?), op: op.clone(), right: Arc::new(right.cast_to(&super_type, schema)?), - }); + }) } } Expr::IsNull(e) => Ok(Expr::IsNull(Arc::new(rewrite_expr(e, schema)?))), diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 4e8141f4029..76d833cebd7 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -159,6 +159,7 @@ impl ArrayReader for PrimitiveArrayReader { let records_read_once = self.record_reader.read_records(records_to_read)?; records_read = records_read + records_read_once; + // Record reader exhausted if records_read_once < records_to_read { if let Some(page_reader) = self.pages.next() { @@ -1824,7 +1825,7 @@ mod tests { 1, ); - let struct_array = struct_array_reader.next_batch(1024).unwrap(); + let struct_array = struct_array_reader.next_batch(5).unwrap(); let struct_array = struct_array.as_any().downcast_ref::().unwrap(); assert_eq!(5, struct_array.len()); From f4e70fdacb93ecdf174c354f9ce1e4debdc15d78 Mon Sep 17 00:00:00 2001 From: Morgan Cassels Date: Mon, 30 Mar 2020 16:28:21 -0700 Subject: [PATCH 06/13] remove unnecessary test --- rust/arrow/src/array/array.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/rust/arrow/src/array/array.rs b/rust/arrow/src/array/array.rs index 9759b3c1cef..05620e383a1 100644 --- a/rust/arrow/src/array/array.rs +++ b/rust/arrow/src/array/array.rs @@ -2323,14 +2323,6 @@ mod tests { BooleanArray::from(data); } - #[test] - fn test_empty_list_array() { - 
let values_builder = Int32Builder::new(10); - let mut builder = ListBuilder::new(values_builder); - let list_array = builder.finish(); - assert_eq!(0, list_array.len()); - } - #[test] fn test_list_array() { // Construct a value array From 8c8efa1c61d7bfa44cc9f1b06f9a39f6e7b33f63 Mon Sep 17 00:00:00 2001 From: Morgan Cassels Date: Tue, 31 Mar 2020 13:12:39 -0700 Subject: [PATCH 07/13] rebase and cleanup repl list printing --- rust/datafusion/src/utils.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/rust/datafusion/src/utils.rs b/rust/datafusion/src/utils.rs index 826d6e23629..ea3bdf85958 100644 --- a/rust/datafusion/src/utils.rs +++ b/rust/datafusion/src/utils.rs @@ -74,6 +74,29 @@ macro_rules! make_string { }}; } +macro_rules! make_string_from_list { + ($column: ident, $row: ident) => {{ + let list = $column + .as_any() + .downcast_ref::() + .unwrap() + .value($row); + let string_values = match (0..list.len()) + .map(|i| array_value_to_string(list.clone(), i)) + .collect::>>() + { + Ok(values) => values, + _ => { + return Err(ExecutionError::ExecutionError(format!( + "Unsupported {:?} type for repl.", + $column.data_type() + ))) + } + }; + Ok(format!("[{}]", string_values.join(", "))) + }}; +} + /// Get the value at the given row in an array as a string pub fn array_value_to_string(column: array::ArrayRef, row: usize) -> Result { match column.data_type() { @@ -120,6 +143,7 @@ pub fn array_value_to_string(column: array::ArrayRef, row: usize) -> Result { make_string!(array::Time64NanosecondArray, column, row) } + DataType::List(_) => make_string_from_list!(column, row), _ => Err(ExecutionError::ExecutionError(format!( "Unsupported {:?} type for repl.", column.data_type() From feb72d356cd9d13a2000ce3e436e6ea2b401b40f Mon Sep 17 00:00:00 2001 From: Morgan Cassels Date: Thu, 23 Apr 2020 21:01:17 -0700 Subject: [PATCH 08/13] address comment --- rust/datafusion/src/utils.rs | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/rust/datafusion/src/utils.rs b/rust/datafusion/src/utils.rs index ea3bdf85958..7d045ed0caf 100644 --- a/rust/datafusion/src/utils.rs +++ b/rust/datafusion/src/utils.rs @@ -79,20 +79,13 @@ macro_rules! make_string_from_list { let list = $column .as_any() .downcast_ref::() - .unwrap() + .ok_or(ExecutionError::ExecutionError(format!( + "Repl error: could not convert list column to list array." + )))? .value($row); - let string_values = match (0..list.len()) + let string_values = (0..list.len()) .map(|i| array_value_to_string(list.clone(), i)) - .collect::>>() - { - Ok(values) => values, - _ => { - return Err(ExecutionError::ExecutionError(format!( - "Unsupported {:?} type for repl.", - $column.data_type() - ))) - } - }; + .collect::>>()?; Ok(format!("[{}]", string_values.join(", "))) }}; } From 6f906369aac17ae5e568e21765d588793d2ad0ff Mon Sep 17 00:00:00 2001 From: Morgan Cassels Date: Sun, 3 May 2020 22:39:01 -0700 Subject: [PATCH 09/13] cargo fmt --- rust/datafusion/src/logicalplan.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs index ba507618973..90ae9b02a82 100644 --- a/rust/datafusion/src/logicalplan.rs +++ b/rust/datafusion/src/logicalplan.rs @@ -890,8 +890,7 @@ mod tests { .project(vec![col("state"), col("total_salary")])? 
.build()?; - let expected = - "Projection: #state, #total_salary\ + let expected = "Projection: #state, #total_salary\ \n Aggregate: groupBy=[[#state]], aggr=[[SUM(#salary) AS total_salary]]\ \n TableScan: employee.csv projection=Some([3, 4])"; From cef96c012c8d274ac547768a698bec633392ebdf Mon Sep 17 00:00:00 2001 From: Morgan Cassels Date: Tue, 5 May 2020 08:44:21 -0700 Subject: [PATCH 10/13] test file submodule update --- cpp/submodules/parquet-testing | 2 +- rust/datafusion/tests/sql.rs | 16 +++++++--------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing index 46c9e977f58..40379b3c582 160000 --- a/cpp/submodules/parquet-testing +++ b/cpp/submodules/parquet-testing @@ -1 +1 @@ -Subproject commit 46c9e977f58f6c5ef1b81f782f3746b3656e5a8c +Subproject commit 40379b3c58298fd22589dec7e41748375b5a8e82 diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 9f514bd300f..65b029469cb 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -200,15 +200,13 @@ fn parquet_list_columns() { &PrimitiveArray::::from(vec![Some(4),]) ); - assert_eq!( - utf8_list_array - .value(2) - .as_any() - .downcast_ref::() - .unwrap(), - &StringArray::try_from(vec![Some("efg"), None, Some("hij"), Some("xyz")]) - .unwrap() - ); + let result = utf8_list_array.value(2); + let result = result.as_any().downcast_ref::().unwrap(); + + assert_eq!(result.value(0), "efg"); + assert!(result.is_null(1)); + assert_eq!(result.value(2), "hij"); + assert_eq!(result.value(3), "xyz"); } #[test] From abc2a700bb84e85854c74c7cd10614a400490284 Mon Sep 17 00:00:00 2001 From: Max Burke Date: Fri, 15 May 2020 11:02:40 -0700 Subject: [PATCH 11/13] Fix merge issues --- rust/parquet/src/arrow/array_reader.rs | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 679023e0048..06b2dba1a88 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -25,11 +25,12 @@ use std::sync::Arc; use std::vec::Vec; use arrow::array::{ - ArrayDataBuilder, ArrayDataRef, ArrayRef, BooleanBufferBuilder, BufferBuilderTrait, - Int16BufferBuilder, StructArray, + Array, ArrayData, ArrayDataBuilder, ArrayDataRef, ArrayRef, BinaryArray, + BinaryBuilder, BooleanBufferBuilder, BufferBuilderTrait, FixedSizeBinaryArray, + FixedSizeBinaryBuilder, Int16BufferBuilder, ListArray, ListBuilder, PrimitiveArray, + PrimitiveBuilder, StringArray, StringBuilder, StructArray, }; use arrow::buffer::{Buffer, MutableBuffer}; -use arrow::datatypes::{DataType as ArrowType, Field, IntervalUnit, TimeUnit}; use crate::arrow::converter::{ BinaryArrayConverter, BinaryConverter, BoolConverter, BooleanArrayConverter, @@ -54,13 +55,6 @@ use crate::schema::types::{ ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr, }; use crate::schema::visitor::TypeVisitor; -use arrow::array::{ - Array, ArrayData, ArrayDataBuilder, ArrayDataRef, ArrayRef, BinaryArray, - BinaryBuilder, BooleanBufferBuilder, BufferBuilderTrait, FixedSizeBinaryArray, - FixedSizeBinaryBuilder, Int16BufferBuilder, ListArray, ListBuilder, PrimitiveArray, - PrimitiveBuilder, StringArray, StringBuilder, StructArray, -}; -use arrow::buffer::{Buffer, MutableBuffer}; use arrow::datatypes::{ BooleanType as ArrowBooleanType, DataType as ArrowType, Date32Type as ArrowDate32Type, Date64Type as ArrowDate64Type, @@ -74,8 +68,8 @@ use 
arrow::datatypes::{ Time32MillisecondType as ArrowTime32MillisecondType, Time32SecondType as ArrowTime32SecondType, Time64MicrosecondType as ArrowTime64MicrosecondType, - Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit, - TimestampMicrosecondType as ArrowTimestampMicrosecondType, + Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit, + TimeUnit as ArrowTimeUnit, TimestampMicrosecondType as ArrowTimestampMicrosecondType, TimestampMillisecondType as ArrowTimestampMillisecondType, TimestampNanosecondType as ArrowTimestampNanosecondType, TimestampSecondType as ArrowTimestampSecondType, ToByteSlice, From 70d40db4117c5692e3e354dfed590420380e95d0 Mon Sep 17 00:00:00 2001 From: Morgan Cassels Date: Mon, 8 Jun 2020 20:51:44 -0700 Subject: [PATCH 12/13] code review --- rust/parquet/src/arrow/array_reader.rs | 35 +++++++++++++++----------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 06b2dba1a88..e651c2e37b6 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -732,6 +732,7 @@ fn remove_indices( } } +/// Implementation of ListArrayReader. Nested lists and lists of structs are not yet supported. impl ArrayReader for ListArrayReader { fn as_any(&self) -> &dyn Any { self @@ -744,20 +745,26 @@ impl ArrayReader for ListArrayReader { } fn next_batch(&mut self, batch_size: usize) -> Result { - let next_batch_array = self.item_reader.next_batch(batch_size).unwrap(); + let next_batch_array = self.item_reader.next_batch(batch_size)?; let item_type = self.item_reader.get_data_type().clone(); if next_batch_array.len() == 0 { return build_empty_list_array(item_type); } - let def_levels = self.item_reader.get_def_levels().unwrap(); - let rep_levels = self.item_reader.get_rep_levels().unwrap(); + let def_levels = self + .item_reader + .get_def_levels() + .ok_or(ArrowError("item_reader def levels are None.".to_string()))?; + let rep_levels = self + .item_reader + .get_rep_levels() + .ok_or(ArrowError("item_reader rep levels are None.".to_string()))?; if !((def_levels.len() == rep_levels.len()) && (rep_levels.len() == next_batch_array.len())) { return Err(ArrowError( - "Expected item_reader def_level and rep_level arrays to have the same length as batch array".to_string(), + "Expected item_reader def_levels and rep_levels to be same length as batch".to_string(), )); } @@ -1151,18 +1158,15 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext )) } - /// Build array reader for list type. + /// Build array reader for list type. Nested lists and lists of structs are not yet supported. 
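    // Reviewer aside (illustration only, not part of the diff): for a list column written
    // with the parquet spec's three-level layout, e.g. the int64_list column the SQL tests
    // read,
    //
    //   optional group int64_list (LIST) {
    //     repeated group list {
    //       optional int64 item;
    //     }
    //   }
    //
    // the match on the outer group's repetition below adds one definition level (it is
    // OPTIONAL), the match on the inner repeated group adds one definition and one
    // repetition level, and the optional leaf adds a final definition level when the item
    // reader itself is built -- yielding the max def level 3 / max rep level 1 that the
    // worked example on ListArrayReader assumes.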
fn visit_list_with_item( &mut self, list_type: Rc, - _item_type: &Type, + item_type: &Type, context: &'a ArrayReaderBuilderContext, ) -> Result>> { let mut new_context = context.clone(); - let list_child = &list_type.get_fields()[0]; - let item_child = &list_child.get_fields()[0]; - new_context.path.append(vec![list_type.name().to_string()]); match list_type.get_basic_info().repetition() { @@ -1176,7 +1180,7 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext _ => (), } - match list_child.get_basic_info().repetition() { + match item_type.get_basic_info().repetition() { Repetition::REPEATED => { new_context.def_level += 1; new_context.rep_level += 1; @@ -1188,12 +1192,13 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext } let item_reader = self - .dispatch(item_child.clone(), &new_context) + .dispatch(Rc::new(item_type.clone()), &new_context) .unwrap() .unwrap(); - let item_type = item_reader.get_data_type().clone(); - match item_type { + let item_reader_type = item_reader.get_data_type().clone(); + + match item_reader_type { ArrowType::List(_) | ArrowType::FixedSizeList(_, _) | ArrowType::Struct(_) @@ -1202,11 +1207,11 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext item_type ))), _ => { - let arrow_type = ArrowType::List(Box::new(item_type.clone())); + let arrow_type = ArrowType::List(Box::new(item_reader_type.clone())); Ok(Some(Box::new(ListArrayReader::new( item_reader, arrow_type, - item_type, + item_reader_type, new_context.def_level, new_context.rep_level, )))) From e24df6ea8a9ccd012edfe2f641e6abf2885a15e3 Mon Sep 17 00:00:00 2001 From: Morgan Cassels Date: Mon, 8 Jun 2020 22:26:53 -0700 Subject: [PATCH 13/13] pass item type as Rc to visit_list_with_item --- rust/parquet/src/arrow/array_reader.rs | 10 +++++++--- rust/parquet/src/schema/visitor.rs | 11 ++++++----- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 6c77172efea..a0fa13594c0 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -1179,9 +1179,13 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext fn visit_list_with_item( &mut self, list_type: Rc, - item_type: &Type, + item_type: Rc, context: &'a ArrayReaderBuilderContext, ) -> Result>> { + let list_child = &list_type + .get_fields() + .first() + .ok_or(ArrowError("List field must have a child.".to_string()))?; let mut new_context = context.clone(); new_context.path.append(vec![list_type.name().to_string()]); @@ -1197,7 +1201,7 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext _ => (), } - match item_type.get_basic_info().repetition() { + match list_child.get_basic_info().repetition() { Repetition::REPEATED => { new_context.def_level += 1; new_context.rep_level += 1; @@ -1209,7 +1213,7 @@ impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext } let item_reader = self - .dispatch(Rc::new(item_type.clone()), &new_context) + .dispatch(item_type.clone(), &new_context) .unwrap() .unwrap(); diff --git a/rust/parquet/src/schema/visitor.rs b/rust/parquet/src/schema/visitor.rs index 6970f9ed47a..0ed818e9ce2 100644 --- a/rust/parquet/src/schema/visitor.rs +++ b/rust/parquet/src/schema/visitor.rs @@ -50,7 +50,7 @@ pub trait TypeVisitor { { self.visit_list_with_item( list_type.clone(), - list_item, + list_item.clone(), context, ) } else { @@ -70,13 +70,13 @@ pub trait TypeVisitor { { self.visit_list_with_item( list_type.clone(), - fields.first().unwrap(), + fields.first().unwrap().clone(), context, ) } else { 
self.visit_list_with_item( list_type.clone(), - list_item, + list_item.clone(), context, ) } @@ -98,6 +98,7 @@ pub trait TypeVisitor { /// A utility method which detects input type and calls corresponding method. fn dispatch(&mut self, cur_type: TypePtr, context: C) -> Result { if cur_type.is_primitive() { + println!("visiting primitive"); self.visit_primitive(cur_type, context) } else { match cur_type.get_basic_info().logical_type() { @@ -114,7 +115,7 @@ pub trait TypeVisitor { fn visit_list_with_item( &mut self, list_type: TypePtr, - item_type: &Type, + item_type: TypePtr, context: C, ) -> Result; } @@ -174,7 +175,7 @@ mod tests { fn visit_list_with_item( &mut self, list_type: TypePtr, - item_type: &Type, + item_type: TypePtr, _context: TestVisitorContext, ) -> Result { assert_eq!(
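With item_type now arriving as a TypePtr, an implementor can hand the item straight back to dispatch with a reference-count clone instead of rebuilding an Rc around a deep copy. Below is a rough, self-contained sketch of that shape; Ty, Visitor, and NameCollector are hypothetical stand-ins for illustration, not the parquet crate's own Type and TypeVisitor definitions.

    use std::rc::Rc;

    // Hypothetical stand-in types, used only to illustrate the Rc-based
    // visit_list_with_item signature.
    enum Ty {
        Primitive(String),
        List { item: Rc<Ty> },
    }

    trait Visitor {
        fn visit_primitive(&mut self, t: Rc<Ty>) -> Result<String, String>;
        fn visit_list_with_item(&mut self, list: Rc<Ty>, item: Rc<Ty>) -> Result<String, String>;

        // Default dispatch: primitives go one way, lists are unpacked and
        // forwarded together with their item type.
        fn dispatch(&mut self, t: Rc<Ty>) -> Result<String, String> {
            match &*t {
                Ty::Primitive(_) => self.visit_primitive(t.clone()),
                Ty::List { item } => {
                    let item = item.clone(); // cheap Rc clone, not a deep copy
                    self.visit_list_with_item(t.clone(), item)
                }
            }
        }
    }

    struct NameCollector;

    impl Visitor for NameCollector {
        fn visit_primitive(&mut self, t: Rc<Ty>) -> Result<String, String> {
            match &*t {
                Ty::Primitive(name) => Ok(name.clone()),
                _ => Err("expected a primitive type".to_string()),
            }
        }

        fn visit_list_with_item(&mut self, _list: Rc<Ty>, item: Rc<Ty>) -> Result<String, String> {
            // Forwarding the item is just another Rc clone via dispatch.
            Ok(format!("list<{}>", self.dispatch(item)?))
        }
    }

    fn main() {
        let schema = Rc::new(Ty::List {
            item: Rc::new(Ty::Primitive("int32".to_string())),
        });
        let mut visitor = NameCollector;
        assert_eq!(visitor.dispatch(schema).unwrap(), "list<int32>");
    }

This is the same pattern the array_reader change above relies on when it calls dispatch(item_type.clone(), &new_context) instead of wrapping a cloned Type in Rc::new.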