From c70612720c7d8ef3d76cade23cbce8f821fc5ce9 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 12 Sep 2020 22:48:17 +0200 Subject: [PATCH 01/18] fixes to get more tests passing Tested with Rust and Java, but there are Java failures make ARROW_MAGIC and CONT_MARKER consts remove unused import --- dev/archery/archery/integration/datagen.py | 15 +- rust/arrow/src/compute/kernels/cast.rs | 15 + rust/arrow/src/datatypes.rs | 31 +- rust/arrow/src/ipc/mod.rs | 4 +- rust/arrow/src/ipc/reader.rs | 13 + .../src/bin/arrow-json-integration-test.rs | 708 ++++++++++-------- 6 files changed, 464 insertions(+), 322 deletions(-) diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index 3d50381f0d3..353b818c3d7 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1551,17 +1551,14 @@ def _temp_path(): .skip_category('JS') # TODO(ARROW-8716) .skip_category('Rust'), - generate_nested_case() - .skip_category('Rust'), + generate_nested_case(), generate_recursive_nested_case() - .skip_category('Go') # TODO(ARROW-8453) - .skip_category('Rust'), + .skip_category('Go'), # TODO(ARROW-8453) generate_nested_large_offsets_case() .skip_category('Go') - .skip_category('JS') - .skip_category('Rust'), + .skip_category('JS'), generate_unions_case() .skip_category('Go') @@ -1570,13 +1567,11 @@ def _temp_path(): generate_custom_metadata_case() .skip_category('Go') - .skip_category('JS') - .skip_category('Rust'), + .skip_category('JS'), generate_duplicate_fieldnames_case() .skip_category('Go') - .skip_category('JS') - .skip_category('Rust'), + .skip_category('JS'), # TODO(ARROW-3039, ARROW-5267): Dictionaries in GO generate_dictionary_case() diff --git a/rust/arrow/src/compute/kernels/cast.rs b/rust/arrow/src/compute/kernels/cast.rs index 9d1b7fc6011..b40577fc839 100644 --- a/rust/arrow/src/compute/kernels/cast.rs +++ b/rust/arrow/src/compute/kernels/cast.rs @@ -751,6 +751,21 @@ pub fn cast(array: &ArrayRef, to_type: &DataType) -> Result { } } // date64 to timestamp might not make sense, + (Int64, Duration(to_unit)) => { + use TimeUnit::*; + match to_unit { + Second => cast_array_data::(array, to_type.clone()), + Millisecond => { + cast_array_data::(array, to_type.clone()) + } + Microsecond => { + cast_array_data::(array, to_type.clone()) + } + Nanosecond => { + cast_array_data::(array, to_type.clone()) + } + } + } // null to primitive/flat types (Null, Int32) => Ok(Arc::new(Int32Array::from(vec![None; array.len()]))), diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs index 099355e7596..53340883d97 100644 --- a/rust/arrow/src/datatypes.rs +++ b/rust/arrow/src/datatypes.rs @@ -1696,9 +1696,24 @@ impl Schema { } /// Parse a `metadata` definition from a JSON representation + /// The JSON can either be an Object or an Array of Objects fn from_metadata(json: &Value) -> Result> { - if let Value::Object(md) = json { - md.iter() + match json { + Value::Array(_) => { + let mut hashmap = HashMap::new(); + let values: Vec = serde_json::from_value(json.clone()) + .map_err(|_| { + ArrowError::JsonError( + "Unable to parse object into key-value pair".to_string(), + ) + })?; + for meta in values { + hashmap.insert(meta.key.clone(), meta.value); + } + Ok(hashmap) + } + Value::Object(md) => md + .iter() .map(|(k, v)| { if let Value::String(v) = v { Ok((k.to_string(), v.to_string())) @@ -1708,11 +1723,10 @@ impl Schema { )) } }) - .collect::>() - } else { - Err(ArrowError::ParseError( + .collect::>(), + _ => 
Err(ArrowError::ParseError( "`metadata` field must be an object".to_string(), - )) + )), } } } @@ -1733,6 +1747,11 @@ impl fmt::Display for Schema { /// A reference-counted reference to a [`Schema`](crate::datatypes::Schema). pub type SchemaRef = Arc; +#[derive(Deserialize)] +struct MetadataKeyValue { + key: String, + value: String, +} #[cfg(test)] mod tests { use super::*; diff --git a/rust/arrow/src/ipc/mod.rs b/rust/arrow/src/ipc/mod.rs index cba8fb269a4..a2d7103aacf 100644 --- a/rust/arrow/src/ipc/mod.rs +++ b/rust/arrow/src/ipc/mod.rs @@ -35,5 +35,5 @@ pub use self::gen::Schema::*; pub use self::gen::SparseTensor::*; pub use self::gen::Tensor::*; -static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1']; -static CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; +const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1']; +const CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs index 05cf00b822b..930c7a9ff93 100644 --- a/rust/arrow/src/ipc/reader.rs +++ b/rust/arrow/src/ipc/reader.rs @@ -364,6 +364,19 @@ fn create_list_array( .null_bit_buffer(buffers[0].clone()) } make_array(builder.build()) + } else if let DataType::LargeList(_) = *data_type { + let null_count = field_node.null_count() as usize; + let mut builder = ArrayData::builder(data_type.clone()) + .len(field_node.length() as usize) + .buffers(buffers[1..2].to_vec()) + .offset(0) + .child_data(vec![child_array.data()]); + if null_count > 0 { + builder = builder + .null_count(null_count) + .null_bit_buffer(buffers[0].clone()) + } + make_array(builder.build()) } else if let DataType::FixedSizeList(_, _) = *data_type { let null_count = field_node.null_count() as usize; let mut builder = ArrayData::builder(data_type.clone()) diff --git a/rust/integration-testing/src/bin/arrow-json-integration-test.rs b/rust/integration-testing/src/bin/arrow-json-integration-test.rs index 5556c4cebc8..cb8143ea717 100644 --- a/rust/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/rust/integration-testing/src/bin/arrow-json-integration-test.rs @@ -15,23 +15,30 @@ // specific language governing permissions and limitations // under the License. 
+use std::env; +use std::fs::File; +use std::io::BufReader; +use std::sync::Arc; + use clap::{App, Arg}; +use hex::decode; use serde_json::Value; -use arrow::util::integration_util::{ArrowJson, ArrowJsonBatch, ArrowJsonSchema}; - use arrow::array::*; -use arrow::datatypes::{DataType, DateUnit, IntervalUnit, Schema}; +use arrow::datatypes::{DataType, DateUnit, Field, IntervalUnit, Schema}; use arrow::error::{ArrowError, Result}; use arrow::ipc::reader::FileReader; use arrow::ipc::writer::FileWriter; use arrow::record_batch::RecordBatch; - -use hex::decode; -use std::env; -use std::fs::File; -use std::io::BufReader; -use std::sync::Arc; +use arrow::{ + buffer::Buffer, + buffer::MutableBuffer, + datatypes::ToByteSlice, + util::{ + bit_util, + integration_util::{ArrowJson, ArrowJsonBatch, ArrowJsonColumn, ArrowJsonSchema}, + }, +}; fn main() -> Result<()> { let args: Vec = env::args().collect(); @@ -99,322 +106,415 @@ fn record_batch_from_json( let mut columns = vec![]; for (field, json_col) in schema.fields().iter().zip(json_batch.columns) { - let col: ArrayRef = match field.data_type() { - DataType::Null => Arc::new(NullArray::new(json_col.count)), - DataType::Boolean => { - let mut b = BooleanBuilder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_bool().unwrap()), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + let col = array_from_json(field, json_col)?; + columns.push(col); + } + + RecordBatch::try_new(Arc::new(schema.clone()), columns) +} + +fn array_from_json(field: &Field, json_col: ArrowJsonColumn) -> Result { + match field.data_type() { + DataType::Null => Ok(Arc::new(NullArray::new(json_col.count))), + DataType::Boolean => { + let mut b = BooleanBuilder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_bool().unwrap()), + _ => b.append_null(), + }?; } - DataType::Int8 => { - let mut b = Int8Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_i64().unwrap() as i8), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::Int8 => { + let mut b = Int8Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_i64().unwrap() as i8), + _ => b.append_null(), + }?; } - DataType::Int16 => { - let mut b = Int16Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_i64().unwrap() as i16), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::Int16 => { + let mut b = Int16Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_i64().unwrap() as i16), + _ => b.append_null(), + }?; } - DataType::Int32 - | DataType::Date32(DateUnit::Day) - | DataType::Time32(_) - | DataType::Interval(IntervalUnit::YearMonth) => { - let mut b = 
Int32Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_i64().unwrap() as i32), - _ => b.append_null(), - } - .unwrap(); - } - let array = Arc::new(b.finish()) as ArrayRef; - arrow::compute::cast(&array, field.data_type()).unwrap() + Ok(Arc::new(b.finish())) + } + DataType::Int32 + | DataType::Date32(DateUnit::Day) + | DataType::Time32(_) + | DataType::Interval(IntervalUnit::YearMonth) => { + let mut b = Int32Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_i64().unwrap() as i32), + _ => b.append_null(), + }?; } - DataType::Int64 - | DataType::Date64(DateUnit::Millisecond) - | DataType::Time64(_) - | DataType::Timestamp(_, _) - | DataType::Duration(_) - | DataType::Interval(IntervalUnit::DayTime) => { - let mut b = Int64Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value( - value - .as_str() - .unwrap() - .parse() - .expect("Unable to parse string as i64"), - ), - _ => b.append_null(), - } - .unwrap(); - } - let array = Arc::new(b.finish()) as ArrayRef; - arrow::compute::cast(&array, field.data_type()).unwrap() + let array = Arc::new(b.finish()) as ArrayRef; + arrow::compute::cast(&array, field.data_type()) + } + DataType::Int64 + | DataType::Date64(DateUnit::Millisecond) + | DataType::Time64(_) + | DataType::Timestamp(_, _) + | DataType::Duration(_) + | DataType::Interval(IntervalUnit::DayTime) => { + let mut b = Int64Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(match value { + Value::Number(n) => n.as_i64().unwrap(), + Value::String(s) => { + s.parse().expect("Unable to parse string as i64") + } + _ => panic!("Unable to parse {:?} as number", value), + }), + _ => b.append_null(), + }?; } - DataType::UInt8 => { - let mut b = UInt8Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_u64().unwrap() as u8), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + let array = Arc::new(b.finish()) as ArrayRef; + arrow::compute::cast(&array, field.data_type()) + } + DataType::UInt8 => { + let mut b = UInt8Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_u64().unwrap() as u8), + _ => b.append_null(), + }?; } - DataType::UInt16 => { - let mut b = UInt16Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_u64().unwrap() as u16), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::UInt16 => { + let mut b = UInt16Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_u64().unwrap() as u16), + _ => 
b.append_null(), + }?; } - DataType::UInt32 => { - let mut b = UInt32Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_u64().unwrap() as u32), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::UInt32 => { + let mut b = UInt32Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_u64().unwrap() as u32), + _ => b.append_null(), + }?; } - DataType::UInt64 => { - let mut b = UInt64Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value( - value - .as_str() - .unwrap() - .parse() - .expect("Unable to parse string as u64"), - ), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::UInt64 => { + let mut b = UInt64Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value( + value + .as_str() + .unwrap() + .parse() + .expect("Unable to parse string as u64"), + ), + _ => b.append_null(), + }?; } - DataType::Float32 => { - let mut b = Float32Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_f64().unwrap() as f32), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::Float32 => { + let mut b = Float32Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_f64().unwrap() as f32), + _ => b.append_null(), + }?; } - DataType::Float64 => { - let mut b = Float64Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_f64().unwrap()), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::Float64 => { + let mut b = Float64Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_f64().unwrap()), + _ => b.append_null(), + }?; } - DataType::Binary => { - let mut b = BinaryBuilder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => { - let v = decode(value.as_str().unwrap()).unwrap(); - b.append_value(&v) - } - _ => b.append_null(), + Ok(Arc::new(b.finish())) + } + DataType::Binary => { + let mut b = BinaryBuilder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => { + let v = decode(value.as_str().unwrap()).unwrap(); + b.append_value(&v) } - .unwrap(); - } - Arc::new(b.finish()) + _ => b.append_null(), + }?; } - DataType::LargeBinary => { - let mut b = 
LargeBinaryBuilder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => { - let v = decode(value.as_str().unwrap()).unwrap(); - b.append_value(&v) - } - _ => b.append_null(), + Ok(Arc::new(b.finish())) + } + DataType::LargeBinary => { + let mut b = LargeBinaryBuilder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => { + let v = decode(value.as_str().unwrap()).unwrap(); + b.append_value(&v) } - .unwrap(); - } - Arc::new(b.finish()) + _ => b.append_null(), + }?; } - DataType::Utf8 => { - let mut b = StringBuilder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_str().unwrap()), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::Utf8 => { + let mut b = StringBuilder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_str().unwrap()), + _ => b.append_null(), + }?; } - DataType::LargeUtf8 => { - let mut b = LargeStringBuilder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_str().unwrap()), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::LargeUtf8 => { + let mut b = LargeStringBuilder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_str().unwrap()), + _ => b.append_null(), + }?; } - DataType::FixedSizeBinary(len) => { - let mut b = FixedSizeBinaryBuilder::new(json_col.count, *len); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => { - let v = hex::decode(value.as_str().unwrap()).unwrap(); - b.append_value(&v) - } - _ => b.append_null(), + Ok(Arc::new(b.finish())) + } + DataType::FixedSizeBinary(len) => { + let mut b = FixedSizeBinaryBuilder::new(json_col.count, *len); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => { + let v = hex::decode(value.as_str().unwrap()).unwrap(); + b.append_value(&v) } - .unwrap(); - } - Arc::new(b.finish()) + _ => b.append_null(), + }?; } - t => { - return Err(ArrowError::JsonError(format!( - "data type {:?} not supported", - t - ))) + Ok(Arc::new(b.finish())) + } + DataType::List(child_type) => { + let child_field = Field::new("", *child_type.clone(), false); + let children = json_col.children.clone().unwrap(); + let child_array = + array_from_json(&child_field, children.get(0).unwrap().clone())?; + let offsets: Vec = json_col + .offset + .unwrap() + .iter() + .map(|v| v.as_i64().unwrap() as i32) + .collect(); + let num_bytes = bit_util::ceil(json_col.count, 8); + let mut null_buf = + MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + json_col + .validity + .unwrap() + .iter() + .enumerate() + .for_each(|(i, v)| { + let null_slice = null_buf.data_mut(); + if *v != 0 { + 
bit_util::set_bit(null_slice, i); + } + }); + let list_data = ArrayData::builder(field.data_type().clone()) + .len(json_col.count) + .offset(0) + .add_buffer(Buffer::from(&offsets.to_byte_slice())) + .add_child_data(child_array.data()) + .null_bit_buffer(null_buf.freeze()) + .build(); + Ok(Arc::new(ListArray::from(list_data))) + } + DataType::LargeList(child_type) => { + let child_field = Field::new("", *child_type.clone(), false); + let children = json_col.children.clone().unwrap(); + let child_array = + array_from_json(&child_field, children.get(0).unwrap().clone())?; + let offsets: Vec = json_col + .offset + .unwrap() + .iter() + .map(|v| match v { + Value::Number(n) => n.as_i64().unwrap(), + Value::String(s) => s.parse::().unwrap(), + _ => panic!("64-bit offset must be either string or number"), + }) + .collect(); + let num_bytes = bit_util::ceil(json_col.count, 8); + let mut null_buf = + MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + json_col + .validity + .unwrap() + .iter() + .enumerate() + .for_each(|(i, v)| { + let null_slice = null_buf.data_mut(); + if *v != 0 { + bit_util::set_bit(null_slice, i); + } + }); + let list_data = ArrayData::builder(field.data_type().clone()) + .len(json_col.count) + .offset(0) + .add_buffer(Buffer::from(&offsets.to_byte_slice())) + .add_child_data(child_array.data()) + .null_bit_buffer(null_buf.freeze()) + .build(); + Ok(Arc::new(LargeListArray::from(list_data))) + } + DataType::FixedSizeList(child_type, _) => { + let child_field = Field::new("", *child_type.clone(), false); + let children = json_col.children.clone().unwrap(); + let child_array = + array_from_json(&child_field, children.get(0).unwrap().clone())?; + let num_bytes = bit_util::ceil(json_col.count, 8); + let mut null_buf = + MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + json_col + .validity + .unwrap() + .iter() + .enumerate() + .for_each(|(i, v)| { + let null_slice = null_buf.data_mut(); + if *v != 0 { + bit_util::set_bit(null_slice, i); + } + }); + let list_data = ArrayData::builder(field.data_type().clone()) + .len(json_col.count) + .add_child_data(child_array.data()) + .null_bit_buffer(null_buf.freeze()) + .build(); + Ok(Arc::new(FixedSizeListArray::from(list_data))) + } + DataType::Struct(fields) => { + let mut children = Vec::with_capacity(fields.len()); + for (field, col) in fields.iter().zip(json_col.children.unwrap()) { + let array = array_from_json(field, col)?; + children.push((field.clone(), array)); } - }; - columns.push(col); + let array = StructArray::from(children); + Ok(Arc::new(array)) + } + t => Err(ArrowError::JsonError(format!( + "data type {:?} not supported", + t + ))), } - - RecordBatch::try_new(Arc::new(schema.clone()), columns) } fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { From c50c0f46c307302560713185f65fd4265313ff0c Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 10 Oct 2020 04:53:48 +0200 Subject: [PATCH 02/18] port Carol's changes to dictionary serialization --- rust/arrow/src/ipc/convert.rs | 8 +++++--- rust/arrow/src/ipc/writer.rs | 7 +++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index feb4133d383..127a3631553 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -311,7 +311,7 @@ pub(crate) fn build_field<'a: 'b, 'b>( field: &Field, ) -> WIPOffset> { let fb_field_name = fbb.create_string(field.name().as_str()); - let field_type = 
get_fb_field_type(field.data_type(), fbb); + let field_type = get_fb_field_type(field.data_type(), field.is_nullable(), fbb); let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() { Some(get_fb_dictionary( @@ -342,6 +342,7 @@ pub(crate) fn build_field<'a: 'b, 'b>( /// Get the IPC type of a data type pub(crate) fn get_fb_field_type<'a: 'b, 'b>( data_type: &DataType, + is_nullable: bool, fbb: &mut FlatBufferBuilder<'a>, ) -> FBFieldType<'b> { // some IPC implementations expect an empty list for child data, instead of a null value. @@ -558,7 +559,8 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>( // struct's fields are children let mut children = vec![]; for field in fields { - let inner_types = get_fb_field_type(field.data_type(), fbb); + let inner_types = + get_fb_field_type(field.data_type(), field.is_nullable(), fbb); let field_name = fbb.create_string(field.name()); children.push(ipc::Field::create( fbb, @@ -583,7 +585,7 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>( // In this library, the dictionary "type" is a logical construct. Here we // pass through to the value type, as we've already captured the index // type in the DictionaryEncoding metadata in the parent field - get_fb_field_type(value_type, fbb) + get_fb_field_type(value_type, is_nullable, fbb) } t => unimplemented!("Type {:?} not supported", t), } diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 7678a5cd200..cb861609f51 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -180,8 +180,11 @@ impl FileWriter { let mut fields = vec![]; for field in self.schema.fields() { let fb_field_name = fbb.create_string(field.name().as_str()); - let field_type = - ipc::convert::get_fb_field_type(field.data_type(), &mut fbb); + let field_type = ipc::convert::get_fb_field_type( + field.data_type(), + field.is_nullable(), + &mut fbb, + ); let mut field_builder = ipc::FieldBuilder::new(&mut fbb); field_builder.add_name(fb_field_name); field_builder.add_type_type(field_type.type_type); From cb331f7563ca20c2b17f31957344574119d6dde4 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 10 Oct 2020 04:54:47 +0200 Subject: [PATCH 03/18] fix IPC file reader's dictionary message offsets We weren't able to read dictionaries because of legacy issues --- rust/arrow/src/ipc/reader.rs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs index 930c7a9ff93..76ad6b77cf3 100644 --- a/rust/arrow/src/ipc/reader.rs +++ b/rust/arrow/src/ipc/reader.rs @@ -612,11 +612,18 @@ impl FileReader { let mut dictionaries_by_field = vec![None; schema.fields().len()]; for block in footer.dictionaries().unwrap() { // read length from end of offset - // TODO: ARROW-9848: dictionary metadata has not been tested - let meta_len = block.metaDataLength() - 4; + let mut message_size: [u8; 4] = [0; 4]; + reader.seek(SeekFrom::Start(block.offset() as u64))?; + reader.read_exact(&mut message_size)?; + let footer_len = if message_size == CONTINUATION_MARKER { + reader.read_exact(&mut message_size)?; + i32::from_le_bytes(message_size) + } else { + i32::from_le_bytes(message_size) + }; + + let mut block_data = vec![0; footer_len as usize]; - let mut block_data = vec![0; meta_len as usize]; - reader.seek(SeekFrom::Start(block.offset() as u64 + 4))?; reader.read_exact(&mut block_data)?; let message = ipc::get_root_as_message(&block_data[..]); @@ -640,10 +647,11 @@ impl FileReader { &mut dictionaries_by_field, )?; } - 
_ => { - return Err(ArrowError::IoError( - "Expecting DictionaryBatch in dictionary blocks.".to_string(), - )) + t => { + return Err(ArrowError::IoError(format!( + "Expecting DictionaryBatch in dictionary blocks, found {:?}.", + t + ))); } }; } From 782c92578a88b9457e596d43a1499f48a783d9a2 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 10 Oct 2020 04:55:11 +0200 Subject: [PATCH 04/18] add serde_json error expose Field's dictionary fields via methods some progress on dictionary integration --- dev/archery/archery/integration/datagen.py | 3 +- rust/arrow/src/datatypes.rs | 2 +- rust/arrow/src/error.rs | 6 + rust/arrow/src/util/integration_util.rs | 104 +++++++++- .../src/bin/arrow-json-integration-test.rs | 182 +++++++++++++++--- 5 files changed, 260 insertions(+), 37 deletions(-) diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index 353b818c3d7..8271eb147f5 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1567,7 +1567,8 @@ def _temp_path(): generate_custom_metadata_case() .skip_category('Go') - .skip_category('JS'), + .skip_category('JS') + .skip_category('Rust'), #TODO(ARROW-10259) generate_duplicate_fieldnames_case() .skip_category('Go') diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs index 53340883d97..00fb6a7cc53 100644 --- a/rust/arrow/src/datatypes.rs +++ b/rust/arrow/src/datatypes.rs @@ -873,7 +873,7 @@ impl ToByteSlice for T { impl DataType { /// Parse a data type from a JSON representation - fn from(json: &Value) -> Result { + pub(crate) fn from(json: &Value) -> Result { let default_field = Field::new("", DataType::Boolean, true); match *json { Value::Object(ref map) => match map.get("name") { diff --git a/rust/arrow/src/error.rs b/rust/arrow/src/error.rs index f428b4f0f99..51b614ae44c 100644 --- a/rust/arrow/src/error.rs +++ b/rust/arrow/src/error.rs @@ -79,6 +79,12 @@ impl From<::std::string::FromUtf8Error> for ArrowError { } } +impl From for ArrowError { + fn from(error: serde_json::Error) -> Self { + ArrowError::JsonError(error.to_string()) + } +} + impl Display for ArrowError { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { diff --git a/rust/arrow/src/util/integration_util.rs b/rust/arrow/src/util/integration_util.rs index 22d271618b6..523b7f34422 100644 --- a/rust/arrow/src/util/integration_util.rs +++ b/rust/arrow/src/util/integration_util.rs @@ -24,6 +24,7 @@ use serde_json::{Number as VNumber, Value}; use crate::array::*; use crate::datatypes::*; +use crate::error::Result; use crate::record_batch::{RecordBatch, RecordBatchReader}; /// A struct that represents an Arrow file with a schema and record batches @@ -31,6 +32,7 @@ use crate::record_batch::{RecordBatch, RecordBatchReader}; pub struct ArrowJson { pub schema: ArrowJsonSchema, pub batches: Vec, + #[serde(skip_serializing_if = "Option::is_none")] pub dictionaries: Option>, } @@ -39,7 +41,52 @@ pub struct ArrowJson { /// Fields are left as JSON `Value` as they vary by `DataType` #[derive(Deserialize, Serialize, Debug)] pub struct ArrowJsonSchema { - pub fields: Vec, + pub fields: Vec, +} + +/// Fields are left as JSON `Value` as they vary by `DataType` +#[derive(Deserialize, Serialize, Debug)] +pub struct ArrowJsonField { + pub name: String, + #[serde(rename = "type")] + pub field_type: Value, + pub nullable: bool, + pub children: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub dictionary: Option, + #[serde(skip_serializing_if = 
"Option::is_none")] + pub metadata: Option, +} + +impl From<&Field> for ArrowJsonField { + fn from(field: &Field) -> Self { + Self { + name: field.name().to_string(), + field_type: field.data_type().to_json(), + nullable: field.is_nullable(), + children: vec![], + dictionary: None, // TODO: not enough info + metadata: None, // TODO(ARROW-10259) + } + } +} + +#[derive(Deserialize, Serialize, Debug)] +pub struct ArrowJsonFieldDictionary { + pub id: i64, + #[serde(rename = "indexType")] + pub index_type: DictionaryIndexType, + #[serde(rename = "isOrdered")] + pub is_ordered: bool, +} + +#[derive(Deserialize, Serialize, Debug)] +pub struct DictionaryIndexType { + pub name: String, + #[serde(rename = "isSigned")] + pub is_signed: bool, + #[serde(rename = "bitWidth")] + pub bit_width: i64, } /// A struct that partially reads the Arrow JSON record batch @@ -53,8 +100,8 @@ pub struct ArrowJsonBatch { #[derive(Deserialize, Serialize, Debug)] #[allow(non_snake_case)] pub struct ArrowJsonDictionaryBatch { - id: i64, - data: ArrowJsonBatch, + pub id: i64, + pub data: ArrowJsonBatch, } /// A struct that partially reads the Arrow JSON column/array @@ -97,12 +144,43 @@ impl ArrowJsonSchema { for i in 0..field_len { let json_field = &self.fields[i]; let field = schema.field(i); - assert_eq!(json_field, &field.to_json()); + if !json_field.equals_field(field) { + return false; + } } true } } +impl ArrowJsonField { + /// Compare the Arrow JSON field with the Arrow `Field` + fn equals_field(&self, field: &Field) -> bool { + dbg!((&self, &field)); + // convert to a field + match self.to_arrow_field() { + Ok(self_field) => { + assert_eq!(&self_field, field, "Arrow fields not the same"); + true + } + Err(e) => { + eprintln!( + "Encountered error while converting JSON field to Arrow field: {:?}", + e + ); + false + } + } + } + + /// Convert to an Arrow Field + /// TODO: convert to use an Into + fn to_arrow_field(&self) -> Result { + // a bit regressive, but we have to convert the field to JSON in order to convert it + let field = serde_json::to_value(self)?; + Field::from(&field) + } +} + impl ArrowJsonBatch { /// Compare the Arrow JSON record batch with a `RecordBatch` fn equals_batch(&self, batch: &RecordBatch) -> bool { @@ -218,8 +296,9 @@ impl ArrowJsonBatch { let arr = arr.as_any().downcast_ref::().unwrap(); arr.equals_json(&json_array.iter().collect::>()[..]) } - DataType::Utf8 => { - let arr = arr.as_any().downcast_ref::().unwrap(); + DataType::LargeBinary => { + let arr = + arr.as_any().downcast_ref::().unwrap(); arr.equals_json(&json_array.iter().collect::>()[..]) } DataType::FixedSizeBinary(_) => { @@ -227,10 +306,23 @@ impl ArrowJsonBatch { arr.as_any().downcast_ref::().unwrap(); arr.equals_json(&json_array.iter().collect::>()[..]) } + DataType::Utf8 => { + let arr = arr.as_any().downcast_ref::().unwrap(); + arr.equals_json(&json_array.iter().collect::>()[..]) + } + DataType::LargeUtf8 => { + let arr = + arr.as_any().downcast_ref::().unwrap(); + arr.equals_json(&json_array.iter().collect::>()[..]) + } DataType::List(_) => { let arr = arr.as_any().downcast_ref::().unwrap(); arr.equals_json(&json_array.iter().collect::>()[..]) } + DataType::LargeList(_) => { + let arr = arr.as_any().downcast_ref::().unwrap(); + arr.equals_json(&json_array.iter().collect::>()[..]) + } DataType::FixedSizeList(_, _) => { let arr = arr.as_any().downcast_ref::().unwrap(); diff --git a/rust/integration-testing/src/bin/arrow-json-integration-test.rs b/rust/integration-testing/src/bin/arrow-json-integration-test.rs index 
cb8143ea717..84fb63c9bfe 100644 --- a/rust/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/rust/integration-testing/src/bin/arrow-json-integration-test.rs @@ -34,10 +34,7 @@ use arrow::{ buffer::Buffer, buffer::MutableBuffer, datatypes::ToByteSlice, - util::{ - bit_util, - integration_util::{ArrowJson, ArrowJsonBatch, ArrowJsonColumn, ArrowJsonSchema}, - }, + util::{bit_util, integration_util::*}, }; fn main() -> Result<()> { @@ -87,12 +84,18 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()> eprintln!("Converting {} to {}", json_name, arrow_name); } - let (schema, batches) = read_json_file(json_name)?; + let json_file = read_json_file(json_name)?; let arrow_file = File::create(arrow_name)?; - let mut writer = FileWriter::try_new(arrow_file, &schema)?; + let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?; - for b in batches { + if !json_file.dictionaries.is_empty() { + return Err(ArrowError::JsonError( + "Writing dictionaries not yet supported".to_string(), + )); + } + + for b in json_file.batches { writer.write(&b)?; } @@ -102,18 +105,23 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()> fn record_batch_from_json( schema: &Schema, json_batch: ArrowJsonBatch, + json_dictionaries: &[ArrowJsonDictionaryBatch], ) -> Result { let mut columns = vec![]; for (field, json_col) in schema.fields().iter().zip(json_batch.columns) { - let col = array_from_json(field, json_col)?; + let col = array_from_json(field, json_col, json_dictionaries)?; columns.push(col); } RecordBatch::try_new(Arc::new(schema.clone()), columns) } -fn array_from_json(field: &Field, json_col: ArrowJsonColumn) -> Result { +fn array_from_json( + field: &Field, + json_col: ArrowJsonColumn, + dictionaries: &[ArrowJsonDictionaryBatch], +) -> Result { match field.data_type() { DataType::Null => Ok(Arc::new(NullArray::new(json_col.count))), DataType::Boolean => { @@ -142,7 +150,12 @@ fn array_from_json(field: &Field, json_col: ArrowJsonColumn) -> Result .zip(json_col.data.unwrap()) { match is_valid { - 1 => b.append_value(value.as_i64().unwrap() as i8), + 1 => b.append_value(value.as_i64().ok_or_else(|| { + ArrowError::JsonError(format!( + "Unable to get {:?} as int64", + value + )) + })? 
as i8), _ => b.append_null(), }?; } @@ -404,10 +417,14 @@ fn array_from_json(field: &Field, json_col: ArrowJsonColumn) -> Result Ok(Arc::new(b.finish())) } DataType::List(child_type) => { - let child_field = Field::new("", *child_type.clone(), false); + let child_field = + Field::new("item", *child_type.clone(), field.is_nullable()); let children = json_col.children.clone().unwrap(); - let child_array = - array_from_json(&child_field, children.get(0).unwrap().clone())?; + let child_array = array_from_json( + &child_field, + children.get(0).unwrap().clone(), + dictionaries, + )?; let offsets: Vec = json_col .offset .unwrap() @@ -438,10 +455,14 @@ fn array_from_json(field: &Field, json_col: ArrowJsonColumn) -> Result Ok(Arc::new(ListArray::from(list_data))) } DataType::LargeList(child_type) => { - let child_field = Field::new("", *child_type.clone(), false); + let child_field = + Field::new("item", *child_type.clone(), field.is_nullable()); let children = json_col.children.clone().unwrap(); - let child_array = - array_from_json(&child_field, children.get(0).unwrap().clone())?; + let child_array = array_from_json( + &child_field, + children.get(0).unwrap().clone(), + dictionaries, + )?; let offsets: Vec = json_col .offset .unwrap() @@ -476,10 +497,13 @@ fn array_from_json(field: &Field, json_col: ArrowJsonColumn) -> Result Ok(Arc::new(LargeListArray::from(list_data))) } DataType::FixedSizeList(child_type, _) => { - let child_field = Field::new("", *child_type.clone(), false); + let child_field = Field::new("item", *child_type.clone(), true); // field.is_nullable() let children = json_col.children.clone().unwrap(); - let child_array = - array_from_json(&child_field, children.get(0).unwrap().clone())?; + let child_array = array_from_json( + &child_field, + children.get(0).unwrap().clone(), + dictionaries, + )?; let num_bytes = bit_util::ceil(json_col.count, 8); let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); @@ -504,12 +528,25 @@ fn array_from_json(field: &Field, json_col: ArrowJsonColumn) -> Result DataType::Struct(fields) => { let mut children = Vec::with_capacity(fields.len()); for (field, col) in fields.iter().zip(json_col.children.unwrap()) { - let array = array_from_json(field, col)?; + let array = array_from_json(field, col, dictionaries)?; children.push((field.clone(), array)); } let array = StructArray::from(children); Ok(Arc::new(array)) } + DataType::Dictionary(key_type, value_type) => { + // find dictionary + let dictionary = dictionaries.iter().find(|d| d.id == field.dict_id()); + match dictionary { + Some(dictionary) => dictionary_array_from_json( + field, json_col, key_type, value_type, dictionary, + ), + None => Err(ArrowError::JsonError(format!( + "Unable to find dictionary for field {:?}", + field + ))), + } + } t => Err(ArrowError::JsonError(format!( "data type {:?} not supported", t @@ -517,6 +554,66 @@ fn array_from_json(field: &Field, json_col: ArrowJsonColumn) -> Result } } +fn dictionary_array_from_json( + field: &Field, + json_col: ArrowJsonColumn, + dict_key: &DataType, + dict_value: &DataType, + dictionary: &ArrowJsonDictionaryBatch, +) -> Result { + match dict_key { + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 => { + // build the key data into a buffer, then construct values separately + let key_field = Field::new_dict( + "key", + dict_key.clone(), + field.is_nullable(), + field.dict_id(), + field.dict_is_ordered(), + ); + 
let keys = array_from_json(&key_field, json_col, &[])?; + // note: not enough info on nullability of dictionary + let value_field = Field::new("value", dict_value.clone(), true); + println!("dictionary value type: {:?}", dict_value); + let values = + array_from_json(&value_field, dictionary.data.columns[0].clone(), &[])?; + + // convert key and value to dictionary data + let dict_data = ArrayData::builder(field.data_type().clone()) + .len(keys.len()) + .add_buffer(keys.data().buffers()[0].clone()) + .add_child_data(values.data()) + .build(); + + let array = match dict_key { + DataType::Int8 => { + Arc::new(Int8DictionaryArray::from(dict_data)) as ArrayRef + } + DataType::Int16 => Arc::new(Int16DictionaryArray::from(dict_data)), + DataType::Int32 => Arc::new(Int32DictionaryArray::from(dict_data)), + DataType::Int64 => Arc::new(Int64DictionaryArray::from(dict_data)), + DataType::UInt8 => Arc::new(UInt8DictionaryArray::from(dict_data)), + DataType::UInt16 => Arc::new(UInt16DictionaryArray::from(dict_data)), + DataType::UInt32 => Arc::new(UInt32DictionaryArray::from(dict_data)), + DataType::UInt64 => Arc::new(UInt64DictionaryArray::from(dict_data)), + _ => unreachable!(), + }; + Ok(array) + } + _ => Err(ArrowError::JsonError(format!( + "Dictionary key type {:?} not supported", + dict_key + ))), + } +} + fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { if verbose { eprintln!("Converting {} to {}", arrow_name, json_name); @@ -525,9 +622,9 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> let arrow_file = File::open(arrow_name)?; let reader = FileReader::try_new(arrow_file)?; - let mut fields = vec![]; + let mut fields: Vec = vec![]; for f in reader.schema().fields() { - fields.push(f.to_json()); + fields.push(ArrowJsonField::from(f)); } let schema = ArrowJsonSchema { fields }; @@ -553,7 +650,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { } // open JSON file - let (json_schema, json_batches) = read_json_file(json_name)?; + let json_file = read_json_file(json_name)?; // open Arrow file let arrow_file = File::open(arrow_name)?; @@ -561,13 +658,15 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { let arrow_schema = arrow_reader.schema().as_ref().to_owned(); // compare schemas - if json_schema != arrow_schema { + if json_file.schema != arrow_schema { return Err(ArrowError::ComputeError(format!( "Schemas do not match. JSON: {:?}. 
Arrow: {:?}", - json_schema, arrow_schema + json_file.schema, arrow_schema ))); } + let json_batches = &json_file.batches; + // compare number of batches assert!( json_batches.len() == arrow_reader.num_batches(), @@ -581,7 +680,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { ); } - for json_batch in &json_batches { + for json_batch in json_batches { if let Some(Ok(arrow_batch)) = arrow_reader.next() { // compare batches assert!(arrow_batch.num_columns() == json_batch.num_columns()); @@ -608,16 +707,41 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { Ok(()) } -fn read_json_file(json_name: &str) -> Result<(Schema, Vec)> { +struct ArrowFile { + schema: Schema, + // we can evolve this into a concrete Arrow type + dictionaries: Vec, + batches: Vec, +} + +fn read_json_file(json_name: &str) -> Result { let json_file = File::open(json_name)?; let reader = BufReader::new(json_file); let arrow_json: Value = serde_json::from_reader(reader).unwrap(); let schema = Schema::from(&arrow_json["schema"])?; + // read dictionaries + let mut dictionaries = vec![]; + if let Some(dicts) = arrow_json.get("dictionaries") { + for d in dicts + .as_array() + .expect("Unable to get dictionaries as array") + { + let json_dict: ArrowJsonDictionaryBatch = serde_json::from_value(d.clone()) + .expect("Unable to get dictionary from JSON"); + // TODO: convert to a concrete Arrow type + dictionaries.push(json_dict); + } + } + let mut batches = vec![]; for b in arrow_json["batches"].as_array().unwrap() { let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap(); - let batch = record_batch_from_json(&schema, json_batch)?; + let batch = record_batch_from_json(&schema, json_batch, dictionaries.as_slice())?; batches.push(batch); } - Ok((schema, batches)) + Ok(ArrowFile { + schema, + dictionaries, + batches, + }) } From ffdd8aa56f179389cb90228cf3bca9e64f882956 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 10 Oct 2020 14:24:08 +0200 Subject: [PATCH 05/18] Add integration script helper The intention is not to commit this, but to share with other users who might be interested in helping out with the integration testing. I found the docker-compose process too slow, and it was returning errors on Rust :( --- .gitignore | 3 +++ integration-testing.sh | 25 +++++++++++++++++++++++++ 2 files changed, 28 insertions(+) create mode 100755 integration-testing.sh diff --git a/.gitignore b/.gitignore index 6f123362ef1..3f4c07218f6 100644 --- a/.gitignore +++ b/.gitignore @@ -80,3 +80,6 @@ cpp/Brewfile.lock.json # docker volumes used for caching .docker + +# local integration testing logs +integration-testing.log \ No newline at end of file diff --git a/integration-testing.sh b/integration-testing.sh new file mode 100755 index 00000000000..e18de14a734 --- /dev/null +++ b/integration-testing.sh @@ -0,0 +1,25 @@ +# prep work +export ARROW_HOME=~/arrow2/arrow +export ARROW_VERSION=2.0.0-SNAPSHOT +export ARROW_JAVA_INTEGRATION_JAR=$ARROW_HOME/java/tools/target/arrow-tools-$ARROW_VERSION-jar-with-dependencies.jar + +# build CPP testing binary +cd ./cpp +mkdir -p build +cd build +cmake -DCMAKE_BUILD_TYPE=Debug -DARROW_BUILD_INTEGRATION=ON -DARROW_BUILD_TESTS=ON .. 
+make +cd ../../ + +# build Java testing binary +cd ./java +mvn install -DskipTests +cd ../ + +# build Rust testing binary +cd ./rust && cargo build +cd ../ + +# run tests +rm integration-testing.log +archery integration --with-cpp=1 --with-rust=1 --with-java=0 --debug >> integration-testing.log \ No newline at end of file From 41fabed409bdf34e351ba3db1e0ca6943b8583f4 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 10 Oct 2020 14:24:49 +0200 Subject: [PATCH 06/18] don't serialize metadata to JSON if empty This is to be more in line with integration testing behaviour --- rust/arrow/src/datatypes.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs index 00fb6a7cc53..d6dd7a544cd 100644 --- a/rust/arrow/src/datatypes.rs +++ b/rust/arrow/src/datatypes.rs @@ -1489,7 +1489,7 @@ impl fmt::Display for Field { pub struct Schema { pub(crate) fields: Vec, /// A map of key-value pairs containing additional meta data. - #[serde(default)] + #[serde(skip_serializing_if = "HashMap::is_empty")] pub(crate) metadata: HashMap, } From 3ab2109c23a251a6fe7a6f2b53068ba7635530f8 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 10 Oct 2020 14:28:46 +0200 Subject: [PATCH 07/18] Temporarily compare buffer data lengths, not capacity After slicing buffers, we can end up with 2 buffers having the same len and data, but not capacity. Such buffers fail the equality test. CC @sunchao jorgecarleitao as you've worked on this. Perhaps we can find a suitable definition of equality, as we're otherwise failing integration tests because of this. --- rust/arrow/src/buffer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index 5b51a2f5203..d70cde15b94 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -70,7 +70,7 @@ struct BufferData { impl PartialEq for BufferData { fn eq(&self, other: &BufferData) -> bool { - if self.capacity != other.capacity { + if self.len != other.len { return false; } From fec4a17014fa1ab086e4b251ccc0ea01e6e807ef Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 10 Oct 2020 14:30:12 +0200 Subject: [PATCH 08/18] Add logic to compare array data We were only doing basic comparisons of array properties, such as whether data types and lengths are the same. This extends this to test the array data. 
fix lint --- dev/archery/archery/integration/datagen.py | 2 +- .../src/bin/arrow-json-integration-test.rs | 104 +++++++++--------- 2 files changed, 51 insertions(+), 55 deletions(-) diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index 8271eb147f5..37fe43d9e40 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1568,7 +1568,7 @@ def _temp_path(): generate_custom_metadata_case() .skip_category('Go') .skip_category('JS') - .skip_category('Rust'), #TODO(ARROW-10259) + .skip_category('Rust'), # TODO(ARROW-10259) generate_duplicate_fieldnames_case() .skip_category('Go') diff --git a/rust/integration-testing/src/bin/arrow-json-integration-test.rs b/rust/integration-testing/src/bin/arrow-json-integration-test.rs index 84fb63c9bfe..9f97e6818e7 100644 --- a/rust/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/rust/integration-testing/src/bin/arrow-json-integration-test.rs @@ -117,6 +117,7 @@ fn record_batch_from_json( RecordBatch::try_new(Arc::new(schema.clone()), columns) } +/// Construct an Arrow array from a partially typed JSON column fn array_from_json( field: &Field, json_col: ArrowJsonColumn, @@ -419,6 +420,7 @@ fn array_from_json( DataType::List(child_type) => { let child_field = Field::new("item", *child_type.clone(), field.is_nullable()); + let null_buf = create_null_buf(&json_col); let children = json_col.children.clone().unwrap(); let child_array = array_from_json( &child_field, @@ -431,32 +433,19 @@ fn array_from_json( .iter() .map(|v| v.as_i64().unwrap() as i32) .collect(); - let num_bytes = bit_util::ceil(json_col.count, 8); - let mut null_buf = - MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); - json_col - .validity - .unwrap() - .iter() - .enumerate() - .for_each(|(i, v)| { - let null_slice = null_buf.data_mut(); - if *v != 0 { - bit_util::set_bit(null_slice, i); - } - }); let list_data = ArrayData::builder(field.data_type().clone()) .len(json_col.count) .offset(0) .add_buffer(Buffer::from(&offsets.to_byte_slice())) .add_child_data(child_array.data()) - .null_bit_buffer(null_buf.freeze()) + .null_bit_buffer(null_buf) .build(); Ok(Arc::new(ListArray::from(list_data))) } DataType::LargeList(child_type) => { let child_field = Field::new("item", *child_type.clone(), field.is_nullable()); + let null_buf = create_null_buf(&json_col); let children = json_col.children.clone().unwrap(); let child_array = array_from_json( &child_field, @@ -473,26 +462,12 @@ fn array_from_json( _ => panic!("64-bit offset must be either string or number"), }) .collect(); - let num_bytes = bit_util::ceil(json_col.count, 8); - let mut null_buf = - MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); - json_col - .validity - .unwrap() - .iter() - .enumerate() - .for_each(|(i, v)| { - let null_slice = null_buf.data_mut(); - if *v != 0 { - bit_util::set_bit(null_slice, i); - } - }); let list_data = ArrayData::builder(field.data_type().clone()) .len(json_col.count) .offset(0) .add_buffer(Buffer::from(&offsets.to_byte_slice())) .add_child_data(child_array.data()) - .null_bit_buffer(null_buf.freeze()) + .null_bit_buffer(null_buf) .build(); Ok(Arc::new(LargeListArray::from(list_data))) } @@ -504,34 +479,27 @@ fn array_from_json( children.get(0).unwrap().clone(), dictionaries, )?; - let num_bytes = bit_util::ceil(json_col.count, 8); - let mut null_buf = - MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); - json_col - .validity - .unwrap() - .iter() - .enumerate() 
- .for_each(|(i, v)| { - let null_slice = null_buf.data_mut(); - if *v != 0 { - bit_util::set_bit(null_slice, i); - } - }); + let null_buf = create_null_buf(&json_col); let list_data = ArrayData::builder(field.data_type().clone()) .len(json_col.count) .add_child_data(child_array.data()) - .null_bit_buffer(null_buf.freeze()) + .null_bit_buffer(null_buf) .build(); Ok(Arc::new(FixedSizeListArray::from(list_data))) } DataType::Struct(fields) => { - let mut children = Vec::with_capacity(fields.len()); + // construct struct with null data + let null_buf = create_null_buf(&json_col); + let mut array_data = ArrayData::builder(field.data_type().clone()) + .len(json_col.count) + .null_bit_buffer(null_buf); + for (field, col) in fields.iter().zip(json_col.children.unwrap()) { let array = array_from_json(field, col, dictionaries)?; - children.push((field.clone(), array)); + array_data = array_data.add_child_data(array.data()); } - let array = StructArray::from(children); + + let array = StructArray::from(array_data.build()); Ok(Arc::new(array)) } DataType::Dictionary(key_type, value_type) => { @@ -614,6 +582,25 @@ fn dictionary_array_from_json( } } +/// A helper to create a null buffer from a Vec +fn create_null_buf(json_col: &ArrowJsonColumn) -> Buffer { + let num_bytes = bit_util::ceil(json_col.count, 8); + let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + json_col + .validity + .clone() + .unwrap() + .iter() + .enumerate() + .for_each(|(i, v)| { + let null_slice = null_buf.data_mut(); + if *v != 0 { + bit_util::set_bit(null_slice, i); + } + }); + null_buf.freeze() +} + fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { if verbose { eprintln!("Converting {} to {}", arrow_name, json_name); @@ -683,14 +670,23 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { for json_batch in json_batches { if let Some(Ok(arrow_batch)) = arrow_reader.next() { // compare batches - assert!(arrow_batch.num_columns() == json_batch.num_columns()); + let num_columns = arrow_batch.num_columns(); + assert!(num_columns == json_batch.num_columns()); assert!(arrow_batch.num_rows() == json_batch.num_rows()); - // TODO compare in more detail - eprintln!( - "Basic validation of {} and {} PASSES", - arrow_name, json_name - ); + for i in 0..num_columns { + // println!( + // "Comparing arrays with types: {:?} and {:?} and length {:?}", + // arrow_batch.column(i).data_type(), + // json_batch.column(i).data_type(), + // arrow_batch.num_rows() + // ); + assert_eq!( + arrow_batch.column(i).data(), + json_batch.column(i).data(), + "Arrow and JSON batch columns not the same" + ); + } } else { return Err(ArrowError::ComputeError( "no more arrow batches left".to_owned(), From f0bbcfb5f48430d9a8527d78c813883ba4074738 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Tue, 27 Oct 2020 07:07:09 +0200 Subject: [PATCH 09/18] disable failing tests, document IPC from tests remove my integration testing helper rebase, fix failing test --- .gitignore | 3 --- dev/archery/archery/integration/datagen.py | 9 +++++--- docs/source/status.rst | 26 +++++++++++----------- integration-testing.sh | 25 --------------------- rust/arrow/src/compute/kernels/cast.rs | 1 + 5 files changed, 20 insertions(+), 44 deletions(-) delete mode 100755 integration-testing.sh diff --git a/.gitignore b/.gitignore index 3f4c07218f6..6f123362ef1 100644 --- a/.gitignore +++ b/.gitignore @@ -80,6 +80,3 @@ cpp/Brewfile.lock.json # docker volumes used for caching .docker - -# local 
integration testing logs -integration-testing.log \ No newline at end of file diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index 37fe43d9e40..73f632692f0 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1551,14 +1551,17 @@ def _temp_path(): .skip_category('JS') # TODO(ARROW-8716) .skip_category('Rust'), - generate_nested_case(), + generate_nested_case() + .skip_category('Rust'), # TODO(ARROW-10261) generate_recursive_nested_case() - .skip_category('Go'), # TODO(ARROW-8453) + .skip_category('Go') # TODO(ARROW-8453) + .skip_category('Rust'), # TODO(ARROW-10261) generate_nested_large_offsets_case() .skip_category('Go') - .skip_category('JS'), + .skip_category('JS') + .skip_category('Rust'), # TODO(ARROW-10261) generate_unions_case() .skip_category('Go') diff --git a/docs/source/status.rst b/docs/source/status.rst index d43c3bc80ca..da951bda51a 100644 --- a/docs/source/status.rst +++ b/docs/source/status.rst @@ -30,39 +30,39 @@ Data Types | Data type | C++ | Java | Go | JavaScript | C# | Rust | | (primitive) | | | | | | | +===================+=======+=======+=======+============+=======+=======+ -| Null | ✓ | ✓ | | | | | +| Null | ✓ | ✓ | | | | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| Boolean | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Boolean | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| Int8/16/32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Int8/16/32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| UInt8/16/32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | | +| UInt8/16/32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ | Float16 | | | ✓ | | | | +-------------------+-------+-------+-------+------------+-------+-------+ -| Float32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Float32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ | Decimal128 | ✓ | ✓ | | | | | +-------------------+-------+-------+-------+------------+-------+-------+ -| Date32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Date32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| Time32/64 | ✓ | ✓ | ✓ | ✓ | | | +| Time32/64 | ✓ | ✓ | ✓ | ✓ | | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| Timestamp | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Timestamp | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ | Duration | ✓ | ✓ | ✓ | | | | +-------------------+-------+-------+-------+------------+-------+-------+ | Interval | ✓ | ✓ | ✓ | | | | +-------------------+-------+-------+-------+------------+-------+-------+ -| Fixed Size Binary | ✓ | ✓ | ✓ | ✓ | | | +| Fixed Size Binary | ✓ | ✓ | ✓ | ✓ | | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| Binary | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Binary | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| Large Binary | ✓ | ✓ | ✓ | ✓ | | | +| Large Binary | ✓ | ✓ | ✓ | ✓ | | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| Utf8 | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Utf8 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| Large Utf8 | ✓ | ✓ | ✓ | ✓ | | | +| Large Utf8 | ✓ | ✓ | ✓ | ✓ | | ✓ | 
+-------------------+-------+-------+-------+------------+-------+-------+ +-------------------+-------+-------+-------+------------+-------+-------+ diff --git a/integration-testing.sh b/integration-testing.sh deleted file mode 100755 index e18de14a734..00000000000 --- a/integration-testing.sh +++ /dev/null @@ -1,25 +0,0 @@ -# prep work -export ARROW_HOME=~/arrow2/arrow -export ARROW_VERSION=2.0.0-SNAPSHOT -export ARROW_JAVA_INTEGRATION_JAR=$ARROW_HOME/java/tools/target/arrow-tools-$ARROW_VERSION-jar-with-dependencies.jar - -# build CPP testing binary -cd ./cpp -mkdir -p build -cd build -cmake -DCMAKE_BUILD_TYPE=Debug -DARROW_BUILD_INTEGRATION=ON -DARROW_BUILD_TESTS=ON .. -make -cd ../../ - -# build Java testing binary -cd ./java -mvn install -DskipTests -cd ../ - -# build Rust testing binary -cd ./rust && cargo build -cd ../ - -# run tests -rm integration-testing.log -archery integration --with-cpp=1 --with-rust=1 --with-java=0 --debug >> integration-testing.log \ No newline at end of file diff --git a/rust/arrow/src/compute/kernels/cast.rs b/rust/arrow/src/compute/kernels/cast.rs index b40577fc839..f054542b079 100644 --- a/rust/arrow/src/compute/kernels/cast.rs +++ b/rust/arrow/src/compute/kernels/cast.rs @@ -202,6 +202,7 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool { (Timestamp(_, _), Date32(_)) => true, (Timestamp(_, _), Date64(_)) => true, // date64 to timestamp might not make sense, + (Int64, Duration(_)) => true, (Null, Int32) => true, (_, _) => false, } From 7f3fc879b0b19704f747ff8416b8771d09ffc9e6 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 6 Nov 2020 15:35:01 -0500 Subject: [PATCH 10/18] Fix Docker linter problem (cherry picked from commit 03b8aa4e3ce3824a3ee26548e1943464ee9d2476) --- dev/archery/archery/integration/datagen.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index 73f632692f0..abfd18e0bff 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1561,7 +1561,7 @@ def _temp_path(): generate_nested_large_offsets_case() .skip_category('Go') .skip_category('JS') - .skip_category('Rust'), # TODO(ARROW-10261) + .skip_category('Rust'), # TODO(ARROW-10261) generate_unions_case() .skip_category('Go') From 904113c3913bfc314eb31be4d7e0139c42412078 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 6 Nov 2020 16:20:52 -0500 Subject: [PATCH 11/18] Keep track of Dictionaries in an optional HashMap (cherry picked from commit 448172deb4b23d64f710e6933ae9922f8d286735) --- .../src/bin/arrow-json-integration-test.rs | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/rust/integration-testing/src/bin/arrow-json-integration-test.rs b/rust/integration-testing/src/bin/arrow-json-integration-test.rs index 9f97e6818e7..8a214fc4555 100644 --- a/rust/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/rust/integration-testing/src/bin/arrow-json-integration-test.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
+use std::collections::HashMap; use std::env; use std::fs::File; use std::io::BufReader; @@ -105,7 +106,7 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()> fn record_batch_from_json( schema: &Schema, json_batch: ArrowJsonBatch, - json_dictionaries: &[ArrowJsonDictionaryBatch], + json_dictionaries: Option<&HashMap>, ) -> Result { let mut columns = vec![]; @@ -121,7 +122,7 @@ fn record_batch_from_json( fn array_from_json( field: &Field, json_col: ArrowJsonColumn, - dictionaries: &[ArrowJsonDictionaryBatch], + dictionaries: Option<&HashMap>, ) -> Result { match field.data_type() { DataType::Null => Ok(Arc::new(NullArray::new(json_col.count))), @@ -504,7 +505,14 @@ fn array_from_json( } DataType::Dictionary(key_type, value_type) => { // find dictionary - let dictionary = dictionaries.iter().find(|d| d.id == field.dict_id()); + let dictionary = dictionaries + .ok_or_else(|| { + ArrowError::JsonError(format!( + "Unable to find any dictionaries for field {:?}", + field + )) + })? + .get(&field.dict_id()); match dictionary { Some(dictionary) => dictionary_array_from_json( field, json_col, key_type, value_type, dictionary, @@ -546,12 +554,12 @@ fn dictionary_array_from_json( field.dict_id(), field.dict_is_ordered(), ); - let keys = array_from_json(&key_field, json_col, &[])?; + let keys = array_from_json(&key_field, json_col, None)?; // note: not enough info on nullability of dictionary let value_field = Field::new("value", dict_value.clone(), true); println!("dictionary value type: {:?}", dict_value); let values = - array_from_json(&value_field, dictionary.data.columns[0].clone(), &[])?; + array_from_json(&value_field, dictionary.data.columns[0].clone(), None)?; // convert key and value to dictionary data let dict_data = ArrayData::builder(field.data_type().clone()) @@ -706,7 +714,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { struct ArrowFile { schema: Schema, // we can evolve this into a concrete Arrow type - dictionaries: Vec, + dictionaries: HashMap, batches: Vec, } @@ -716,7 +724,7 @@ fn read_json_file(json_name: &str) -> Result { let arrow_json: Value = serde_json::from_reader(reader).unwrap(); let schema = Schema::from(&arrow_json["schema"])?; // read dictionaries - let mut dictionaries = vec![]; + let mut dictionaries = HashMap::new(); if let Some(dicts) = arrow_json.get("dictionaries") { for d in dicts .as_array() @@ -725,14 +733,14 @@ fn read_json_file(json_name: &str) -> Result { let json_dict: ArrowJsonDictionaryBatch = serde_json::from_value(d.clone()) .expect("Unable to get dictionary from JSON"); // TODO: convert to a concrete Arrow type - dictionaries.push(json_dict); + dictionaries.insert(json_dict.id, json_dict); } } let mut batches = vec![]; for b in arrow_json["batches"].as_array().unwrap() { let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap(); - let batch = record_batch_from_json(&schema, json_batch, dictionaries.as_slice())?; + let batch = record_batch_from_json(&schema, json_batch, Some(&dictionaries))?; batches.push(batch); } Ok(ArrowFile { From 9fd9cbcf7c05c5150d375b22c7125a112752b129 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 6 Nov 2020 16:29:36 -0500 Subject: [PATCH 12/18] Only return dict_id/dict_is_ordered values for Dictionary types (cherry picked from commit 2eaae67081e9bdbad7b8c0c201c0d1d456f6c63d) --- .../src/bin/arrow-json-integration-test.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git 
a/rust/integration-testing/src/bin/arrow-json-integration-test.rs b/rust/integration-testing/src/bin/arrow-json-integration-test.rs index 8a214fc4555..f1df6fc4e90 100644 --- a/rust/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/rust/integration-testing/src/bin/arrow-json-integration-test.rs @@ -504,6 +504,12 @@ fn array_from_json( Ok(Arc::new(array)) } DataType::Dictionary(key_type, value_type) => { + let dict_id = field.dict_id().ok_or_else(|| { + ArrowError::JsonError(format!( + "Unable to find dict_id for field {:?}", + field + )) + })?; // find dictionary let dictionary = dictionaries .ok_or_else(|| { @@ -512,7 +518,7 @@ fn array_from_json( field )) })? - .get(&field.dict_id()); + .get(&dict_id); match dictionary { Some(dictionary) => dictionary_array_from_json( field, json_col, key_type, value_type, dictionary, @@ -551,8 +557,12 @@ fn dictionary_array_from_json( "key", dict_key.clone(), field.is_nullable(), - field.dict_id(), - field.dict_is_ordered(), + field + .dict_id() + .expect("Dictionary fields must have a dict_id value"), + field + .dict_is_ordered() + .expect("Dictionary fields must have a dict_is_ordered value"), ); let keys = array_from_json(&key_field, json_col, None)?; // note: not enough info on nullability of dictionary From b565295842a9ce451cf07d5da27afcc9fa463596 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 6 Nov 2020 16:36:23 -0500 Subject: [PATCH 13/18] Remove dbg and commented-out println (cherry picked from commit 0122cd53bf6f14facede4df00c5bd7595f2f2975) --- rust/arrow/src/util/integration_util.rs | 1 - .../src/bin/arrow-json-integration-test.rs | 6 ------ 2 files changed, 7 deletions(-) diff --git a/rust/arrow/src/util/integration_util.rs b/rust/arrow/src/util/integration_util.rs index 523b7f34422..94d0a9b75a0 100644 --- a/rust/arrow/src/util/integration_util.rs +++ b/rust/arrow/src/util/integration_util.rs @@ -155,7 +155,6 @@ impl ArrowJsonSchema { impl ArrowJsonField { /// Compare the Arrow JSON field with the Arrow `Field` fn equals_field(&self, field: &Field) -> bool { - dbg!((&self, &field)); // convert to a field match self.to_arrow_field() { Ok(self_field) => { diff --git a/rust/integration-testing/src/bin/arrow-json-integration-test.rs b/rust/integration-testing/src/bin/arrow-json-integration-test.rs index f1df6fc4e90..e801c040537 100644 --- a/rust/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/rust/integration-testing/src/bin/arrow-json-integration-test.rs @@ -693,12 +693,6 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { assert!(arrow_batch.num_rows() == json_batch.num_rows()); for i in 0..num_columns { - // println!( - // "Comparing arrays with types: {:?} and {:?} and length {:?}", - // arrow_batch.column(i).data_type(), - // json_batch.column(i).data_type(), - // arrow_batch.num_rows() - // ); assert_eq!( arrow_batch.column(i).data(), json_batch.column(i).data(), From ec2a1000bf3c5abaaf0f7ec137e3562b5f4f4f70 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 7 Nov 2020 14:41:27 +0200 Subject: [PATCH 14/18] fix issues from rebase --- .../src/bin/arrow-json-integration-test.rs | 26 +++++-------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/rust/integration-testing/src/bin/arrow-json-integration-test.rs b/rust/integration-testing/src/bin/arrow-json-integration-test.rs index e801c040537..d4afd13528d 100644 --- a/rust/integration-testing/src/bin/arrow-json-integration-test.rs +++ 
b/rust/integration-testing/src/bin/arrow-json-integration-test.rs @@ -418,9 +418,7 @@ fn array_from_json( } Ok(Arc::new(b.finish())) } - DataType::List(child_type) => { - let child_field = - Field::new("item", *child_type.clone(), field.is_nullable()); + DataType::List(child_field) => { let null_buf = create_null_buf(&json_col); let children = json_col.children.clone().unwrap(); let child_array = array_from_json( @@ -443,9 +441,7 @@ fn array_from_json( .build(); Ok(Arc::new(ListArray::from(list_data))) } - DataType::LargeList(child_type) => { - let child_field = - Field::new("item", *child_type.clone(), field.is_nullable()); + DataType::LargeList(child_field) => { let null_buf = create_null_buf(&json_col); let children = json_col.children.clone().unwrap(); let child_array = array_from_json( @@ -472,8 +468,7 @@ fn array_from_json( .build(); Ok(Arc::new(LargeListArray::from(list_data))) } - DataType::FixedSizeList(child_type, _) => { - let child_field = Field::new("item", *child_type.clone(), true); // field.is_nullable() + DataType::FixedSizeList(child_field, _) => { let children = json_col.children.clone().unwrap(); let child_array = array_from_json( &child_field, @@ -504,12 +499,7 @@ fn array_from_json( Ok(Arc::new(array)) } DataType::Dictionary(key_type, value_type) => { - let dict_id = field.dict_id().ok_or_else(|| { - ArrowError::JsonError(format!( - "Unable to find dict_id for field {:?}", - field - )) - })?; + let dict_id = field.dict_id(); // find dictionary let dictionary = dictionaries .ok_or_else(|| { @@ -557,12 +547,8 @@ fn dictionary_array_from_json( "key", dict_key.clone(), field.is_nullable(), - field - .dict_id() - .expect("Dictionary fields must have a dict_id value"), - field - .dict_is_ordered() - .expect("Dictionary fields must have a dict_is_ordered value"), + field.dict_id(), + field.dict_is_ordered(), ); let keys = array_from_json(&key_field, json_col, None)?; // note: not enough info on nullability of dictionary From dc2b01a97f1539f85b948393322c189e3b0e8b82 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 7 Nov 2020 15:58:07 +0200 Subject: [PATCH 15/18] C++ tests that now pass I suspect the failures are due to array equality. 
I'll have a look into them separately --- dev/archery/archery/integration/datagen.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index abfd18e0bff..bd849c7777c 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1551,17 +1551,14 @@ def _temp_path(): .skip_category('JS') # TODO(ARROW-8716) .skip_category('Rust'), - generate_nested_case() - .skip_category('Rust'), # TODO(ARROW-10261) + generate_nested_case(), generate_recursive_nested_case() - .skip_category('Go') # TODO(ARROW-8453) - .skip_category('Rust'), # TODO(ARROW-10261) + .skip_category('Go'), # TODO(ARROW-8453) generate_nested_large_offsets_case() .skip_category('Go') - .skip_category('JS') - .skip_category('Rust'), # TODO(ARROW-10261) + .skip_category('JS'), generate_unions_case() .skip_category('Go') From dde96c6d2628c7c0eedc4f7c15061e746b86fb36 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Tue, 10 Nov 2020 08:05:28 +0200 Subject: [PATCH 16/18] [EXPERIMENT] Test impact of changing logical eq --- rust/arrow/src/array/array_struct.rs | 2 +- rust/arrow/src/array/data.rs | 60 ++++------------------------ 2 files changed, 8 insertions(+), 54 deletions(-) diff --git a/rust/arrow/src/array/array_struct.rs b/rust/arrow/src/array/array_struct.rs index 7f190b83008..3715a8b1501 100644 --- a/rust/arrow/src/array/array_struct.rs +++ b/rust/arrow/src/array/array_struct.rs @@ -282,7 +282,7 @@ mod tests { .build(); let int_data = ArrayData::builder(DataType::Int64) .len(4) - .add_buffer(Buffer::from([42, 28, 19, 31].to_byte_slice())) + .add_buffer(Buffer::from([42i64, 28, 19, 31].to_byte_slice())) .build(); let mut field_types = vec![]; field_types.push(Field::new("a", DataType::Boolean, false)); diff --git a/rust/arrow/src/array/data.rs b/rust/arrow/src/array/data.rs index e328c01ee7a..839c20335fe 100644 --- a/rust/arrow/src/array/data.rs +++ b/rust/arrow/src/array/data.rs @@ -26,6 +26,8 @@ use crate::datatypes::DataType; use crate::util::bit_util; use crate::{bitmap::Bitmap, datatypes::ArrowNativeType}; +use super::equal::equal; + #[inline] fn count_nulls(null_bit_buffer: Option<&Buffer>, offset: usize, len: usize) -> usize { if let Some(ref buf) = null_bit_buffer { @@ -245,59 +247,10 @@ impl ArrayData { impl PartialEq for ArrayData { fn eq(&self, other: &Self) -> bool { - assert_eq!( - self.data_type(), - other.data_type(), - "Data types not the same" - ); - assert_eq!(self.len(), other.len(), "Lengths not the same"); - // TODO: when adding tests for this, test that we can compare with arrays that have offsets - assert_eq!(self.offset(), other.offset(), "Offsets not the same"); - assert_eq!(self.null_count(), other.null_count()); - // compare buffers excluding padding - let self_buffers = self.buffers(); - let other_buffers = other.buffers(); - assert_eq!(self_buffers.len(), other_buffers.len()); - self_buffers.iter().zip(other_buffers).for_each(|(s, o)| { - compare_buffer_regions( - s, - self.offset(), // TODO mul by data length - o, - other.offset(), // TODO mul by data len - ); - }); - // assert_eq!(self.buffers(), other.buffers()); - - assert_eq!(self.child_data(), other.child_data()); - // null arrays can skip the null bitmap, thus only compare if there are no nulls - if self.null_count() != 0 || other.null_count() != 0 { - compare_buffer_regions( - self.null_buffer().unwrap(), - self.offset(), - other.null_buffer().unwrap(), - other.offset(), - ) - } - true + 
equal(self, other) } } -/// A helper to compare buffer regions of 2 buffers. -/// Compares the length of the shorter buffer. -fn compare_buffer_regions( - left: &Buffer, - left_offset: usize, - right: &Buffer, - right_offset: usize, -) { - // for convenience, we assume that the buffer lengths are only unequal if one has padding, - // so we take the shorter length so we can discard the padding from the longer length - let shorter_len = left.len().min(right.len()); - let s_sliced = left.bit_slice(left_offset, shorter_len); - let o_sliced = right.bit_slice(right_offset, shorter_len); - assert_eq!(s_sliced, o_sliced); -} - /// Builder for `ArrayData` type #[derive(Debug)] pub struct ArrayDataBuilder { @@ -388,6 +341,7 @@ mod tests { use std::sync::Arc; use crate::buffer::Buffer; + use crate::datatypes::ToByteSlice; use crate::util::bit_util; #[test] @@ -403,16 +357,16 @@ mod tests { #[test] fn test_builder() { - let v = vec![0, 1, 2, 3]; let child_arr_data = Arc::new(ArrayData::new( DataType::Int32, - 10, + 5, Some(0), None, 0, - vec![], + vec![Buffer::from([1i32, 2, 3, 4, 5].to_byte_slice())], vec![], )); + let v = vec![0, 1, 2, 3]; let b1 = Buffer::from(&v[..]); let arr_data = ArrayData::builder(DataType::Int32) .len(20) From 3038bc443ddbd9b645b2d842eec616324d809693 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Tue, 10 Nov 2020 19:38:12 +0200 Subject: [PATCH 17/18] prevent panic in offset_value_equal --- rust/arrow/src/array/equal/variable_size.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rust/arrow/src/array/equal/variable_size.rs b/rust/arrow/src/array/equal/variable_size.rs index 237b353d287..c26ec6cc1b3 100644 --- a/rust/arrow/src/array/equal/variable_size.rs +++ b/rust/arrow/src/array/equal/variable_size.rs @@ -57,7 +57,11 @@ pub(super) fn variable_sized_equal( let lhs_values = &lhs.buffers()[1].data()[lhs.offset()..]; let rhs_values = &rhs.buffers()[1].data()[rhs.offset()..]; - if lhs.null_count() == 0 && rhs.null_count() == 0 { + if lhs.null_count() == 0 + && rhs.null_count() == 0 + && !lhs_values.is_empty() + && !rhs_values.is_empty() + { offset_value_equal( lhs_values, rhs_values, From e43c394441c8a48cc32ec97290b8d31e5c5763d8 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Tue, 10 Nov 2020 20:24:41 +0200 Subject: [PATCH 18/18] disable failing test, update status doc --- dev/archery/archery/integration/datagen.py | 3 ++- docs/source/status.rst | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index bd849c7777c..4a8bc8f1471 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1558,7 +1558,8 @@ def _temp_path(): generate_nested_large_offsets_case() .skip_category('Go') - .skip_category('JS'), + .skip_category('JS') + .skip_category('Rust'), generate_unions_case() .skip_category('Go') diff --git a/docs/source/status.rst b/docs/source/status.rst index da951bda51a..dc78dbbe3f4 100644 --- a/docs/source/status.rst +++ b/docs/source/status.rst @@ -69,13 +69,13 @@ Data Types | Data type | C++ | Java | Go | JavaScript | C# | Rust | | (nested) | | | | | | | +===================+=======+=======+=======+============+=======+=======+ -| Fixed Size List | ✓ | ✓ | ✓ | ✓ | | | +| Fixed Size List | ✓ | ✓ | ✓ | ✓ | | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| List | ✓ | ✓ | ✓ | ✓ | ✓ | | +| List | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | 
+-------------------+-------+-------+-------+------------+-------+-------+ | Large List | ✓ | ✓ | | | | | +-------------------+-------+-------+-------+------------+-------+-------+ -| Struct | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Struct | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ | Map | ✓ | ✓ | | ✓ | | | +-------------------+-------+-------+-------+------------+-------+-------+
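
A minimal, hand-written sketch (not part of any patch above) of what the PATCH 16 change buys: once `ArrayData`'s `PartialEq` delegates to the logical `equal` kernel, independently built arrays with the same logical contents compare equal, which is what the `validate` loop relies on when it asserts `arrow_batch.column(i).data() == json_batch.column(i).data()`. The `Int32Array` and `Array::data()` APIs are assumed to behave as they do elsewhere in this series.

    use std::sync::Arc;

    use arrow::array::{Array, ArrayRef, Int32Array};

    fn main() {
        // Two arrays holding the same logical values, built independently,
        // so their backing buffers need not be byte-identical (padding, offsets).
        let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
        let b: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));

        // With PartialEq delegating to `equal`, this compares values and validity
        // logically rather than comparing raw buffer bytes.
        assert_eq!(a.data(), b.data());
    }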
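Similarly, a small hedged sketch exercising the `(Int64, Duration(_))` arm added to `can_cast_types` in this series; the `arrow::compute::kernels::cast::cast` path follows the file touched in the diffs above, while the `DurationMillisecondArray` alias is assumed to be the crate's name for that primitive array type.

    use std::sync::Arc;

    use arrow::array::{Array, ArrayRef, DurationMillisecondArray, Int64Array};
    use arrow::compute::kernels::cast::cast;
    use arrow::datatypes::{DataType, TimeUnit};

    fn main() {
        // Plain 64-bit integers to be reinterpreted as millisecond durations.
        let millis: ArrayRef = Arc::new(Int64Array::from(vec![1_000i64, 2_500, 60_000]));

        // Int64 -> Duration keeps the underlying 64-bit values; only the
        // logical data type changes.
        let durations = cast(&millis, &DataType::Duration(TimeUnit::Millisecond)).unwrap();
        let durations = durations
            .as_any()
            .downcast_ref::<DurationMillisecondArray>()
            .unwrap();

        assert_eq!(durations.value(1), 2_500);
    }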