diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index 3d50381f0d3..4a8bc8f1471 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1551,12 +1551,10 @@ def _temp_path(): .skip_category('JS') # TODO(ARROW-8716) .skip_category('Rust'), - generate_nested_case() - .skip_category('Rust'), + generate_nested_case(), generate_recursive_nested_case() - .skip_category('Go') # TODO(ARROW-8453) - .skip_category('Rust'), + .skip_category('Go'), # TODO(ARROW-8453) generate_nested_large_offsets_case() .skip_category('Go') @@ -1571,12 +1569,11 @@ def _temp_path(): generate_custom_metadata_case() .skip_category('Go') .skip_category('JS') - .skip_category('Rust'), + .skip_category('Rust'), # TODO(ARROW-10259) generate_duplicate_fieldnames_case() .skip_category('Go') - .skip_category('JS') - .skip_category('Rust'), + .skip_category('JS'), # TODO(ARROW-3039, ARROW-5267): Dictionaries in GO generate_dictionary_case() diff --git a/docs/source/status.rst b/docs/source/status.rst index d43c3bc80ca..dc78dbbe3f4 100644 --- a/docs/source/status.rst +++ b/docs/source/status.rst @@ -30,52 +30,52 @@ Data Types | Data type | C++ | Java | Go | JavaScript | C# | Rust | | (primitive) | | | | | | | +===================+=======+=======+=======+============+=======+=======+ -| Null | ✓ | ✓ | | | | | +| Null | ✓ | ✓ | | | | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| Boolean | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Boolean | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| Int8/16/32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Int8/16/32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| UInt8/16/32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | | +| UInt8/16/32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ | Float16 | | | ✓ | | | | +-------------------+-------+-------+-------+------------+-------+-------+ -| Float32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Float32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ | Decimal128 | ✓ | ✓ | | | | | +-------------------+-------+-------+-------+------------+-------+-------+ -| Date32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Date32/64 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| Time32/64 | ✓ | ✓ | ✓ | ✓ | | | +| Time32/64 | ✓ | ✓ | ✓ | ✓ | | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| Timestamp | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Timestamp | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ | Duration | ✓ | ✓ | ✓ | | | | +-------------------+-------+-------+-------+------------+-------+-------+ | Interval | ✓ | ✓ | ✓ | | | | +-------------------+-------+-------+-------+------------+-------+-------+ -| Fixed Size Binary | ✓ | ✓ | ✓ | ✓ | | | +| Fixed Size Binary | ✓ | ✓ | ✓ | ✓ | | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| Binary | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Binary | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| Large Binary | ✓ | ✓ | ✓ | ✓ | | | +| Large Binary | ✓ | ✓ | ✓ | ✓ | | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| Utf8 | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Utf8 | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | 
+-------------------+-------+-------+-------+------------+-------+-------+ -| Large Utf8 | ✓ | ✓ | ✓ | ✓ | | | +| Large Utf8 | ✓ | ✓ | ✓ | ✓ | | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ +-------------------+-------+-------+-------+------------+-------+-------+ | Data type | C++ | Java | Go | JavaScript | C# | Rust | | (nested) | | | | | | | +===================+=======+=======+=======+============+=======+=======+ -| Fixed Size List | ✓ | ✓ | ✓ | ✓ | | | +| Fixed Size List | ✓ | ✓ | ✓ | ✓ | | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ -| List | ✓ | ✓ | ✓ | ✓ | ✓ | | +| List | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ | Large List | ✓ | ✓ | | | | | +-------------------+-------+-------+-------+------------+-------+-------+ -| Struct | ✓ | ✓ | ✓ | ✓ | ✓ | | +| Struct | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+ | Map | ✓ | ✓ | | ✓ | | | +-------------------+-------+-------+-------+------------+-------+-------+ diff --git a/rust/arrow/src/array/array_struct.rs b/rust/arrow/src/array/array_struct.rs index 7f190b83008..3715a8b1501 100644 --- a/rust/arrow/src/array/array_struct.rs +++ b/rust/arrow/src/array/array_struct.rs @@ -282,7 +282,7 @@ mod tests { .build(); let int_data = ArrayData::builder(DataType::Int64) .len(4) - .add_buffer(Buffer::from([42, 28, 19, 31].to_byte_slice())) + .add_buffer(Buffer::from([42i64, 28, 19, 31].to_byte_slice())) .build(); let mut field_types = vec![]; field_types.push(Field::new("a", DataType::Boolean, false)); diff --git a/rust/arrow/src/array/data.rs b/rust/arrow/src/array/data.rs index e328c01ee7a..839c20335fe 100644 --- a/rust/arrow/src/array/data.rs +++ b/rust/arrow/src/array/data.rs @@ -26,6 +26,8 @@ use crate::datatypes::DataType; use crate::util::bit_util; use crate::{bitmap::Bitmap, datatypes::ArrowNativeType}; +use super::equal::equal; + #[inline] fn count_nulls(null_bit_buffer: Option<&Buffer>, offset: usize, len: usize) -> usize { if let Some(ref buf) = null_bit_buffer { @@ -245,59 +247,10 @@ impl ArrayData { impl PartialEq for ArrayData { fn eq(&self, other: &Self) -> bool { - assert_eq!( - self.data_type(), - other.data_type(), - "Data types not the same" - ); - assert_eq!(self.len(), other.len(), "Lengths not the same"); - // TODO: when adding tests for this, test that we can compare with arrays that have offsets - assert_eq!(self.offset(), other.offset(), "Offsets not the same"); - assert_eq!(self.null_count(), other.null_count()); - // compare buffers excluding padding - let self_buffers = self.buffers(); - let other_buffers = other.buffers(); - assert_eq!(self_buffers.len(), other_buffers.len()); - self_buffers.iter().zip(other_buffers).for_each(|(s, o)| { - compare_buffer_regions( - s, - self.offset(), // TODO mul by data length - o, - other.offset(), // TODO mul by data len - ); - }); - // assert_eq!(self.buffers(), other.buffers()); - - assert_eq!(self.child_data(), other.child_data()); - // null arrays can skip the null bitmap, thus only compare if there are no nulls - if self.null_count() != 0 || other.null_count() != 0 { - compare_buffer_regions( - self.null_buffer().unwrap(), - self.offset(), - other.null_buffer().unwrap(), - other.offset(), - ) - } - true + equal(self, other) } } -/// A helper to compare buffer regions of 2 buffers. -/// Compares the length of the shorter buffer. 
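// A rough sketch (illustrative, not part of this patch) of what delegating PartialEq to
// the `equal` kernel is meant to enable, assuming the kernel compares logical values so
// that differing offsets or buffer padding no longer trip the old assertions:
fn _equality_sketch() {
    use crate::array::{Array, Int32Array};

    let a = Int32Array::from(vec![1, 2, 3]);
    let b = Int32Array::from(vec![0, 1, 2, 3]).slice(1, 3);
    // expected to compare equal despite b's non-zero offset
    assert_eq!(a.data(), b.data());
}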
-fn compare_buffer_regions( - left: &Buffer, - left_offset: usize, - right: &Buffer, - right_offset: usize, -) { - // for convenience, we assume that the buffer lengths are only unequal if one has padding, - // so we take the shorter length so we can discard the padding from the longer length - let shorter_len = left.len().min(right.len()); - let s_sliced = left.bit_slice(left_offset, shorter_len); - let o_sliced = right.bit_slice(right_offset, shorter_len); - assert_eq!(s_sliced, o_sliced); -} - /// Builder for `ArrayData` type #[derive(Debug)] pub struct ArrayDataBuilder { @@ -388,6 +341,7 @@ mod tests { use std::sync::Arc; use crate::buffer::Buffer; + use crate::datatypes::ToByteSlice; use crate::util::bit_util; #[test] @@ -403,16 +357,16 @@ mod tests { #[test] fn test_builder() { - let v = vec![0, 1, 2, 3]; let child_arr_data = Arc::new(ArrayData::new( DataType::Int32, - 10, + 5, Some(0), None, 0, - vec![], + vec![Buffer::from([1i32, 2, 3, 4, 5].to_byte_slice())], vec![], )); + let v = vec![0, 1, 2, 3]; let b1 = Buffer::from(&v[..]); let arr_data = ArrayData::builder(DataType::Int32) .len(20) diff --git a/rust/arrow/src/array/equal/variable_size.rs b/rust/arrow/src/array/equal/variable_size.rs index 237b353d287..c26ec6cc1b3 100644 --- a/rust/arrow/src/array/equal/variable_size.rs +++ b/rust/arrow/src/array/equal/variable_size.rs @@ -57,7 +57,11 @@ pub(super) fn variable_sized_equal( let lhs_values = &lhs.buffers()[1].data()[lhs.offset()..]; let rhs_values = &rhs.buffers()[1].data()[rhs.offset()..]; - if lhs.null_count() == 0 && rhs.null_count() == 0 { + if lhs.null_count() == 0 + && rhs.null_count() == 0 + && !lhs_values.is_empty() + && !rhs_values.is_empty() + { offset_value_equal( lhs_values, rhs_values, diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index 5b51a2f5203..d70cde15b94 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -70,7 +70,7 @@ struct BufferData { impl PartialEq for BufferData { fn eq(&self, other: &BufferData) -> bool { - if self.capacity != other.capacity { + if self.len != other.len { return false; } diff --git a/rust/arrow/src/compute/kernels/cast.rs b/rust/arrow/src/compute/kernels/cast.rs index 9d1b7fc6011..f054542b079 100644 --- a/rust/arrow/src/compute/kernels/cast.rs +++ b/rust/arrow/src/compute/kernels/cast.rs @@ -202,6 +202,7 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool { (Timestamp(_, _), Date32(_)) => true, (Timestamp(_, _), Date64(_)) => true, // date64 to timestamp might not make sense, + (Int64, Duration(_)) => true, (Null, Int32) => true, (_, _) => false, } @@ -751,6 +752,21 @@ pub fn cast(array: &ArrayRef, to_type: &DataType) -> Result { } } // date64 to timestamp might not make sense, + (Int64, Duration(to_unit)) => { + use TimeUnit::*; + match to_unit { + Second => cast_array_data::(array, to_type.clone()), + Millisecond => { + cast_array_data::(array, to_type.clone()) + } + Microsecond => { + cast_array_data::(array, to_type.clone()) + } + Nanosecond => { + cast_array_data::(array, to_type.clone()) + } + } + } // null to primitive/flat types (Null, Int32) => Ok(Arc::new(Int32Array::from(vec![None; array.len()]))), diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs index 099355e7596..d6dd7a544cd 100644 --- a/rust/arrow/src/datatypes.rs +++ b/rust/arrow/src/datatypes.rs @@ -873,7 +873,7 @@ impl ToByteSlice for T { impl DataType { /// Parse a data type from a JSON representation - fn from(json: &Value) -> Result { + pub(crate) fn from(json: &Value) -> Result { 
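        // A brief sketch of the input this parses, assuming the standard Arrow
        // integration JSON representation of types: an Int32 field's "type" value
        // looks like
        //     {"name": "int", "bitWidth": 32, "isSigned": true}
        // and a Utf8 field's like
        //     {"name": "utf8"}
        // The visibility is widened to pub(crate), presumably so other modules in the
        // crate (such as the integration utilities) can parse type definitions without
        // going through a full Field.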
let default_field = Field::new("", DataType::Boolean, true); match *json { Value::Object(ref map) => match map.get("name") { @@ -1489,7 +1489,7 @@ impl fmt::Display for Field { pub struct Schema { pub(crate) fields: Vec, /// A map of key-value pairs containing additional meta data. - #[serde(default)] + #[serde(skip_serializing_if = "HashMap::is_empty")] pub(crate) metadata: HashMap, } @@ -1696,9 +1696,24 @@ impl Schema { } /// Parse a `metadata` definition from a JSON representation + /// The JSON can either be an Object or an Array of Objects fn from_metadata(json: &Value) -> Result> { - if let Value::Object(md) = json { - md.iter() + match json { + Value::Array(_) => { + let mut hashmap = HashMap::new(); + let values: Vec = serde_json::from_value(json.clone()) + .map_err(|_| { + ArrowError::JsonError( + "Unable to parse object into key-value pair".to_string(), + ) + })?; + for meta in values { + hashmap.insert(meta.key.clone(), meta.value); + } + Ok(hashmap) + } + Value::Object(md) => md + .iter() .map(|(k, v)| { if let Value::String(v) = v { Ok((k.to_string(), v.to_string())) @@ -1708,11 +1723,10 @@ impl Schema { )) } }) - .collect::>() - } else { - Err(ArrowError::ParseError( + .collect::>(), + _ => Err(ArrowError::ParseError( "`metadata` field must be an object".to_string(), - )) + )), } } } @@ -1733,6 +1747,11 @@ impl fmt::Display for Schema { /// A reference-counted reference to a [`Schema`](crate::datatypes::Schema). pub type SchemaRef = Arc; +#[derive(Deserialize)] +struct MetadataKeyValue { + key: String, + value: String, +} #[cfg(test)] mod tests { use super::*; diff --git a/rust/arrow/src/error.rs b/rust/arrow/src/error.rs index f428b4f0f99..51b614ae44c 100644 --- a/rust/arrow/src/error.rs +++ b/rust/arrow/src/error.rs @@ -79,6 +79,12 @@ impl From<::std::string::FromUtf8Error> for ArrowError { } } +impl From for ArrowError { + fn from(error: serde_json::Error) -> Self { + ArrowError::JsonError(error.to_string()) + } +} + impl Display for ArrowError { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index feb4133d383..127a3631553 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -311,7 +311,7 @@ pub(crate) fn build_field<'a: 'b, 'b>( field: &Field, ) -> WIPOffset> { let fb_field_name = fbb.create_string(field.name().as_str()); - let field_type = get_fb_field_type(field.data_type(), fbb); + let field_type = get_fb_field_type(field.data_type(), field.is_nullable(), fbb); let fb_dictionary = if let Dictionary(index_type, _) = field.data_type() { Some(get_fb_dictionary( @@ -342,6 +342,7 @@ pub(crate) fn build_field<'a: 'b, 'b>( /// Get the IPC type of a data type pub(crate) fn get_fb_field_type<'a: 'b, 'b>( data_type: &DataType, + is_nullable: bool, fbb: &mut FlatBufferBuilder<'a>, ) -> FBFieldType<'b> { // some IPC implementations expect an empty list for child data, instead of a null value. @@ -558,7 +559,8 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>( // struct's fields are children let mut children = vec![]; for field in fields { - let inner_types = get_fb_field_type(field.data_type(), fbb); + let inner_types = + get_fb_field_type(field.data_type(), field.is_nullable(), fbb); let field_name = fbb.create_string(field.name()); children.push(ipc::Field::create( fbb, @@ -583,7 +585,7 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>( // In this library, the dictionary "type" is a logical construct. 
Here we // pass through to the value type, as we've already captured the index // type in the DictionaryEncoding metadata in the parent field - get_fb_field_type(value_type, fbb) + get_fb_field_type(value_type, is_nullable, fbb) } t => unimplemented!("Type {:?} not supported", t), } diff --git a/rust/arrow/src/ipc/mod.rs b/rust/arrow/src/ipc/mod.rs index cba8fb269a4..a2d7103aacf 100644 --- a/rust/arrow/src/ipc/mod.rs +++ b/rust/arrow/src/ipc/mod.rs @@ -35,5 +35,5 @@ pub use self::gen::Schema::*; pub use self::gen::SparseTensor::*; pub use self::gen::Tensor::*; -static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1']; -static CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; +const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1']; +const CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs index 05cf00b822b..76ad6b77cf3 100644 --- a/rust/arrow/src/ipc/reader.rs +++ b/rust/arrow/src/ipc/reader.rs @@ -364,6 +364,19 @@ fn create_list_array( .null_bit_buffer(buffers[0].clone()) } make_array(builder.build()) + } else if let DataType::LargeList(_) = *data_type { + let null_count = field_node.null_count() as usize; + let mut builder = ArrayData::builder(data_type.clone()) + .len(field_node.length() as usize) + .buffers(buffers[1..2].to_vec()) + .offset(0) + .child_data(vec![child_array.data()]); + if null_count > 0 { + builder = builder + .null_count(null_count) + .null_bit_buffer(buffers[0].clone()) + } + make_array(builder.build()) } else if let DataType::FixedSizeList(_, _) = *data_type { let null_count = field_node.null_count() as usize; let mut builder = ArrayData::builder(data_type.clone()) @@ -599,11 +612,18 @@ impl FileReader { let mut dictionaries_by_field = vec![None; schema.fields().len()]; for block in footer.dictionaries().unwrap() { // read length from end of offset - // TODO: ARROW-9848: dictionary metadata has not been tested - let meta_len = block.metaDataLength() - 4; + let mut message_size: [u8; 4] = [0; 4]; + reader.seek(SeekFrom::Start(block.offset() as u64))?; + reader.read_exact(&mut message_size)?; + let footer_len = if message_size == CONTINUATION_MARKER { + reader.read_exact(&mut message_size)?; + i32::from_le_bytes(message_size) + } else { + i32::from_le_bytes(message_size) + }; + + let mut block_data = vec![0; footer_len as usize]; - let mut block_data = vec![0; meta_len as usize]; - reader.seek(SeekFrom::Start(block.offset() as u64 + 4))?; reader.read_exact(&mut block_data)?; let message = ipc::get_root_as_message(&block_data[..]); @@ -627,10 +647,11 @@ impl FileReader { &mut dictionaries_by_field, )?; } - _ => { - return Err(ArrowError::IoError( - "Expecting DictionaryBatch in dictionary blocks.".to_string(), - )) + t => { + return Err(ArrowError::IoError(format!( + "Expecting DictionaryBatch in dictionary blocks, found {:?}.", + t + ))); } }; } diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 7678a5cd200..cb861609f51 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -180,8 +180,11 @@ impl FileWriter { let mut fields = vec![]; for field in self.schema.fields() { let fb_field_name = fbb.create_string(field.name().as_str()); - let field_type = - ipc::convert::get_fb_field_type(field.data_type(), &mut fbb); + let field_type = ipc::convert::get_fb_field_type( + field.data_type(), + field.is_nullable(), + &mut fbb, + ); let mut field_builder = ipc::FieldBuilder::new(&mut fbb); field_builder.add_name(fb_field_name); 
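            // Note: flatbuffers does not allow building nested tables, so the name
            // string and the field type have to be created before FieldBuilder::new
            // starts the Field table. get_fb_field_type now also receives the field's
            // nullability, presumably so nested children (e.g. list items) can carry a
            // consistent nullable flag in the IPC schema.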
field_builder.add_type_type(field_type.type_type); diff --git a/rust/arrow/src/util/integration_util.rs b/rust/arrow/src/util/integration_util.rs index 22d271618b6..94d0a9b75a0 100644 --- a/rust/arrow/src/util/integration_util.rs +++ b/rust/arrow/src/util/integration_util.rs @@ -24,6 +24,7 @@ use serde_json::{Number as VNumber, Value}; use crate::array::*; use crate::datatypes::*; +use crate::error::Result; use crate::record_batch::{RecordBatch, RecordBatchReader}; /// A struct that represents an Arrow file with a schema and record batches @@ -31,6 +32,7 @@ use crate::record_batch::{RecordBatch, RecordBatchReader}; pub struct ArrowJson { pub schema: ArrowJsonSchema, pub batches: Vec, + #[serde(skip_serializing_if = "Option::is_none")] pub dictionaries: Option>, } @@ -39,7 +41,52 @@ pub struct ArrowJson { /// Fields are left as JSON `Value` as they vary by `DataType` #[derive(Deserialize, Serialize, Debug)] pub struct ArrowJsonSchema { - pub fields: Vec, + pub fields: Vec, +} + +/// Fields are left as JSON `Value` as they vary by `DataType` +#[derive(Deserialize, Serialize, Debug)] +pub struct ArrowJsonField { + pub name: String, + #[serde(rename = "type")] + pub field_type: Value, + pub nullable: bool, + pub children: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub dictionary: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata: Option, +} + +impl From<&Field> for ArrowJsonField { + fn from(field: &Field) -> Self { + Self { + name: field.name().to_string(), + field_type: field.data_type().to_json(), + nullable: field.is_nullable(), + children: vec![], + dictionary: None, // TODO: not enough info + metadata: None, // TODO(ARROW-10259) + } + } +} + +#[derive(Deserialize, Serialize, Debug)] +pub struct ArrowJsonFieldDictionary { + pub id: i64, + #[serde(rename = "indexType")] + pub index_type: DictionaryIndexType, + #[serde(rename = "isOrdered")] + pub is_ordered: bool, +} + +#[derive(Deserialize, Serialize, Debug)] +pub struct DictionaryIndexType { + pub name: String, + #[serde(rename = "isSigned")] + pub is_signed: bool, + #[serde(rename = "bitWidth")] + pub bit_width: i64, } /// A struct that partially reads the Arrow JSON record batch @@ -53,8 +100,8 @@ pub struct ArrowJsonBatch { #[derive(Deserialize, Serialize, Debug)] #[allow(non_snake_case)] pub struct ArrowJsonDictionaryBatch { - id: i64, - data: ArrowJsonBatch, + pub id: i64, + pub data: ArrowJsonBatch, } /// A struct that partially reads the Arrow JSON column/array @@ -97,12 +144,42 @@ impl ArrowJsonSchema { for i in 0..field_len { let json_field = &self.fields[i]; let field = schema.field(i); - assert_eq!(json_field, &field.to_json()); + if !json_field.equals_field(field) { + return false; + } } true } } +impl ArrowJsonField { + /// Compare the Arrow JSON field with the Arrow `Field` + fn equals_field(&self, field: &Field) -> bool { + // convert to a field + match self.to_arrow_field() { + Ok(self_field) => { + assert_eq!(&self_field, field, "Arrow fields not the same"); + true + } + Err(e) => { + eprintln!( + "Encountered error while converting JSON field to Arrow field: {:?}", + e + ); + false + } + } + } + + /// Convert to an Arrow Field + /// TODO: convert to use an Into + fn to_arrow_field(&self) -> Result { + // a bit regressive, but we have to convert the field to JSON in order to convert it + let field = serde_json::to_value(self)?; + Field::from(&field) + } +} + impl ArrowJsonBatch { /// Compare the Arrow JSON record batch with a `RecordBatch` fn equals_batch(&self, batch: 
&RecordBatch) -> bool { @@ -218,8 +295,9 @@ impl ArrowJsonBatch { let arr = arr.as_any().downcast_ref::().unwrap(); arr.equals_json(&json_array.iter().collect::>()[..]) } - DataType::Utf8 => { - let arr = arr.as_any().downcast_ref::().unwrap(); + DataType::LargeBinary => { + let arr = + arr.as_any().downcast_ref::().unwrap(); arr.equals_json(&json_array.iter().collect::>()[..]) } DataType::FixedSizeBinary(_) => { @@ -227,10 +305,23 @@ impl ArrowJsonBatch { arr.as_any().downcast_ref::().unwrap(); arr.equals_json(&json_array.iter().collect::>()[..]) } + DataType::Utf8 => { + let arr = arr.as_any().downcast_ref::().unwrap(); + arr.equals_json(&json_array.iter().collect::>()[..]) + } + DataType::LargeUtf8 => { + let arr = + arr.as_any().downcast_ref::().unwrap(); + arr.equals_json(&json_array.iter().collect::>()[..]) + } DataType::List(_) => { let arr = arr.as_any().downcast_ref::().unwrap(); arr.equals_json(&json_array.iter().collect::>()[..]) } + DataType::LargeList(_) => { + let arr = arr.as_any().downcast_ref::().unwrap(); + arr.equals_json(&json_array.iter().collect::>()[..]) + } DataType::FixedSizeList(_, _) => { let arr = arr.as_any().downcast_ref::().unwrap(); diff --git a/rust/integration-testing/src/bin/arrow-json-integration-test.rs b/rust/integration-testing/src/bin/arrow-json-integration-test.rs index 5556c4cebc8..d4afd13528d 100644 --- a/rust/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/rust/integration-testing/src/bin/arrow-json-integration-test.rs @@ -15,23 +15,28 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; +use std::env; +use std::fs::File; +use std::io::BufReader; +use std::sync::Arc; + use clap::{App, Arg}; +use hex::decode; use serde_json::Value; -use arrow::util::integration_util::{ArrowJson, ArrowJsonBatch, ArrowJsonSchema}; - use arrow::array::*; -use arrow::datatypes::{DataType, DateUnit, IntervalUnit, Schema}; +use arrow::datatypes::{DataType, DateUnit, Field, IntervalUnit, Schema}; use arrow::error::{ArrowError, Result}; use arrow::ipc::reader::FileReader; use arrow::ipc::writer::FileWriter; use arrow::record_batch::RecordBatch; - -use hex::decode; -use std::env; -use std::fs::File; -use std::io::BufReader; -use std::sync::Arc; +use arrow::{ + buffer::Buffer, + buffer::MutableBuffer, + datatypes::ToByteSlice, + util::{bit_util, integration_util::*}, +}; fn main() -> Result<()> { let args: Vec = env::args().collect(); @@ -80,12 +85,18 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()> eprintln!("Converting {} to {}", json_name, arrow_name); } - let (schema, batches) = read_json_file(json_name)?; + let json_file = read_json_file(json_name)?; let arrow_file = File::create(arrow_name)?; - let mut writer = FileWriter::try_new(arrow_file, &schema)?; + let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?; - for b in batches { + if !json_file.dictionaries.is_empty() { + return Err(ArrowError::JsonError( + "Writing dictionaries not yet supported".to_string(), + )); + } + + for b in json_file.batches { writer.write(&b)?; } @@ -95,326 +106,503 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()> fn record_batch_from_json( schema: &Schema, json_batch: ArrowJsonBatch, + json_dictionaries: Option<&HashMap>, ) -> Result { let mut columns = vec![]; for (field, json_col) in schema.fields().iter().zip(json_batch.columns) { - let col: ArrayRef = match field.data_type() { - DataType::Null => 
Arc::new(NullArray::new(json_col.count)), - DataType::Boolean => { - let mut b = BooleanBuilder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_bool().unwrap()), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + let col = array_from_json(field, json_col, json_dictionaries)?; + columns.push(col); + } + + RecordBatch::try_new(Arc::new(schema.clone()), columns) +} + +/// Construct an Arrow array from a partially typed JSON column +fn array_from_json( + field: &Field, + json_col: ArrowJsonColumn, + dictionaries: Option<&HashMap>, +) -> Result { + match field.data_type() { + DataType::Null => Ok(Arc::new(NullArray::new(json_col.count))), + DataType::Boolean => { + let mut b = BooleanBuilder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_bool().unwrap()), + _ => b.append_null(), + }?; } - DataType::Int8 => { - let mut b = Int8Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_i64().unwrap() as i8), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::Int8 => { + let mut b = Int8Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_i64().ok_or_else(|| { + ArrowError::JsonError(format!( + "Unable to get {:?} as int64", + value + )) + })? 
as i8), + _ => b.append_null(), + }?; } - DataType::Int16 => { - let mut b = Int16Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_i64().unwrap() as i16), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::Int16 => { + let mut b = Int16Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_i64().unwrap() as i16), + _ => b.append_null(), + }?; } - DataType::Int32 - | DataType::Date32(DateUnit::Day) - | DataType::Time32(_) - | DataType::Interval(IntervalUnit::YearMonth) => { - let mut b = Int32Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_i64().unwrap() as i32), - _ => b.append_null(), - } - .unwrap(); - } - let array = Arc::new(b.finish()) as ArrayRef; - arrow::compute::cast(&array, field.data_type()).unwrap() + Ok(Arc::new(b.finish())) + } + DataType::Int32 + | DataType::Date32(DateUnit::Day) + | DataType::Time32(_) + | DataType::Interval(IntervalUnit::YearMonth) => { + let mut b = Int32Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_i64().unwrap() as i32), + _ => b.append_null(), + }?; } - DataType::Int64 - | DataType::Date64(DateUnit::Millisecond) - | DataType::Time64(_) - | DataType::Timestamp(_, _) - | DataType::Duration(_) - | DataType::Interval(IntervalUnit::DayTime) => { - let mut b = Int64Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value( - value - .as_str() - .unwrap() - .parse() - .expect("Unable to parse string as i64"), - ), - _ => b.append_null(), - } - .unwrap(); - } - let array = Arc::new(b.finish()) as ArrayRef; - arrow::compute::cast(&array, field.data_type()).unwrap() + let array = Arc::new(b.finish()) as ArrayRef; + arrow::compute::cast(&array, field.data_type()) + } + DataType::Int64 + | DataType::Date64(DateUnit::Millisecond) + | DataType::Time64(_) + | DataType::Timestamp(_, _) + | DataType::Duration(_) + | DataType::Interval(IntervalUnit::DayTime) => { + let mut b = Int64Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(match value { + Value::Number(n) => n.as_i64().unwrap(), + Value::String(s) => { + s.parse().expect("Unable to parse string as i64") + } + _ => panic!("Unable to parse {:?} as number", value), + }), + _ => b.append_null(), + }?; } - DataType::UInt8 => { - let mut b = UInt8Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_u64().unwrap() as u8), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + let array = Arc::new(b.finish()) as ArrayRef; + arrow::compute::cast(&array, field.data_type()) + } + DataType::UInt8 => { + let mut b = UInt8Builder::new(json_col.count); + for (is_valid, 
value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_u64().unwrap() as u8), + _ => b.append_null(), + }?; } - DataType::UInt16 => { - let mut b = UInt16Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_u64().unwrap() as u16), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::UInt16 => { + let mut b = UInt16Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_u64().unwrap() as u16), + _ => b.append_null(), + }?; } - DataType::UInt32 => { - let mut b = UInt32Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_u64().unwrap() as u32), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::UInt32 => { + let mut b = UInt32Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_u64().unwrap() as u32), + _ => b.append_null(), + }?; } - DataType::UInt64 => { - let mut b = UInt64Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value( - value - .as_str() - .unwrap() - .parse() - .expect("Unable to parse string as u64"), - ), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::UInt64 => { + let mut b = UInt64Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value( + value + .as_str() + .unwrap() + .parse() + .expect("Unable to parse string as u64"), + ), + _ => b.append_null(), + }?; } - DataType::Float32 => { - let mut b = Float32Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_f64().unwrap() as f32), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::Float32 => { + let mut b = Float32Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_f64().unwrap() as f32), + _ => b.append_null(), + }?; } - DataType::Float64 => { - let mut b = Float64Builder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_f64().unwrap()), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::Float64 => { + let mut b = Float64Builder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => 
b.append_value(value.as_f64().unwrap()), + _ => b.append_null(), + }?; } - DataType::Binary => { - let mut b = BinaryBuilder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => { - let v = decode(value.as_str().unwrap()).unwrap(); - b.append_value(&v) - } - _ => b.append_null(), + Ok(Arc::new(b.finish())) + } + DataType::Binary => { + let mut b = BinaryBuilder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => { + let v = decode(value.as_str().unwrap()).unwrap(); + b.append_value(&v) } - .unwrap(); - } - Arc::new(b.finish()) + _ => b.append_null(), + }?; } - DataType::LargeBinary => { - let mut b = LargeBinaryBuilder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => { - let v = decode(value.as_str().unwrap()).unwrap(); - b.append_value(&v) - } - _ => b.append_null(), + Ok(Arc::new(b.finish())) + } + DataType::LargeBinary => { + let mut b = LargeBinaryBuilder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => { + let v = decode(value.as_str().unwrap()).unwrap(); + b.append_value(&v) } - .unwrap(); - } - Arc::new(b.finish()) + _ => b.append_null(), + }?; } - DataType::Utf8 => { - let mut b = StringBuilder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_str().unwrap()), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::Utf8 => { + let mut b = StringBuilder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_str().unwrap()), + _ => b.append_null(), + }?; } - DataType::LargeUtf8 => { - let mut b = LargeStringBuilder::new(json_col.count); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => b.append_value(value.as_str().unwrap()), - _ => b.append_null(), - } - .unwrap(); - } - Arc::new(b.finish()) + Ok(Arc::new(b.finish())) + } + DataType::LargeUtf8 => { + let mut b = LargeStringBuilder::new(json_col.count); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => b.append_value(value.as_str().unwrap()), + _ => b.append_null(), + }?; } - DataType::FixedSizeBinary(len) => { - let mut b = FixedSizeBinaryBuilder::new(json_col.count, *len); - for (is_valid, value) in json_col - .validity - .as_ref() - .unwrap() - .iter() - .zip(json_col.data.unwrap()) - { - match is_valid { - 1 => { - let v = hex::decode(value.as_str().unwrap()).unwrap(); - b.append_value(&v) - } - _ => b.append_null(), + Ok(Arc::new(b.finish())) + } + DataType::FixedSizeBinary(len) => { + let mut b = FixedSizeBinaryBuilder::new(json_col.count, *len); + for (is_valid, value) in json_col + .validity + .as_ref() + .unwrap() + .iter() + .zip(json_col.data.unwrap()) + { + match is_valid { + 1 => { + let v = hex::decode(value.as_str().unwrap()).unwrap(); + b.append_value(&v) } - 
.unwrap(); - } - Arc::new(b.finish()) + _ => b.append_null(), + }?; + } + Ok(Arc::new(b.finish())) + } + DataType::List(child_field) => { + let null_buf = create_null_buf(&json_col); + let children = json_col.children.clone().unwrap(); + let child_array = array_from_json( + &child_field, + children.get(0).unwrap().clone(), + dictionaries, + )?; + let offsets: Vec = json_col + .offset + .unwrap() + .iter() + .map(|v| v.as_i64().unwrap() as i32) + .collect(); + let list_data = ArrayData::builder(field.data_type().clone()) + .len(json_col.count) + .offset(0) + .add_buffer(Buffer::from(&offsets.to_byte_slice())) + .add_child_data(child_array.data()) + .null_bit_buffer(null_buf) + .build(); + Ok(Arc::new(ListArray::from(list_data))) + } + DataType::LargeList(child_field) => { + let null_buf = create_null_buf(&json_col); + let children = json_col.children.clone().unwrap(); + let child_array = array_from_json( + &child_field, + children.get(0).unwrap().clone(), + dictionaries, + )?; + let offsets: Vec = json_col + .offset + .unwrap() + .iter() + .map(|v| match v { + Value::Number(n) => n.as_i64().unwrap(), + Value::String(s) => s.parse::().unwrap(), + _ => panic!("64-bit offset must be either string or number"), + }) + .collect(); + let list_data = ArrayData::builder(field.data_type().clone()) + .len(json_col.count) + .offset(0) + .add_buffer(Buffer::from(&offsets.to_byte_slice())) + .add_child_data(child_array.data()) + .null_bit_buffer(null_buf) + .build(); + Ok(Arc::new(LargeListArray::from(list_data))) + } + DataType::FixedSizeList(child_field, _) => { + let children = json_col.children.clone().unwrap(); + let child_array = array_from_json( + &child_field, + children.get(0).unwrap().clone(), + dictionaries, + )?; + let null_buf = create_null_buf(&json_col); + let list_data = ArrayData::builder(field.data_type().clone()) + .len(json_col.count) + .add_child_data(child_array.data()) + .null_bit_buffer(null_buf) + .build(); + Ok(Arc::new(FixedSizeListArray::from(list_data))) + } + DataType::Struct(fields) => { + // construct struct with null data + let null_buf = create_null_buf(&json_col); + let mut array_data = ArrayData::builder(field.data_type().clone()) + .len(json_col.count) + .null_bit_buffer(null_buf); + + for (field, col) in fields.iter().zip(json_col.children.unwrap()) { + let array = array_from_json(field, col, dictionaries)?; + array_data = array_data.add_child_data(array.data()); } - t => { - return Err(ArrowError::JsonError(format!( - "data type {:?} not supported", - t - ))) + + let array = StructArray::from(array_data.build()); + Ok(Arc::new(array)) + } + DataType::Dictionary(key_type, value_type) => { + let dict_id = field.dict_id(); + // find dictionary + let dictionary = dictionaries + .ok_or_else(|| { + ArrowError::JsonError(format!( + "Unable to find any dictionaries for field {:?}", + field + )) + })? 
+ .get(&dict_id); + match dictionary { + Some(dictionary) => dictionary_array_from_json( + field, json_col, key_type, value_type, dictionary, + ), + None => Err(ArrowError::JsonError(format!( + "Unable to find dictionary for field {:?}", + field + ))), } - }; - columns.push(col); + } + t => Err(ArrowError::JsonError(format!( + "data type {:?} not supported", + t + ))), } +} - RecordBatch::try_new(Arc::new(schema.clone()), columns) +fn dictionary_array_from_json( + field: &Field, + json_col: ArrowJsonColumn, + dict_key: &DataType, + dict_value: &DataType, + dictionary: &ArrowJsonDictionaryBatch, +) -> Result { + match dict_key { + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 => { + // build the key data into a buffer, then construct values separately + let key_field = Field::new_dict( + "key", + dict_key.clone(), + field.is_nullable(), + field.dict_id(), + field.dict_is_ordered(), + ); + let keys = array_from_json(&key_field, json_col, None)?; + // note: not enough info on nullability of dictionary + let value_field = Field::new("value", dict_value.clone(), true); + println!("dictionary value type: {:?}", dict_value); + let values = + array_from_json(&value_field, dictionary.data.columns[0].clone(), None)?; + + // convert key and value to dictionary data + let dict_data = ArrayData::builder(field.data_type().clone()) + .len(keys.len()) + .add_buffer(keys.data().buffers()[0].clone()) + .add_child_data(values.data()) + .build(); + + let array = match dict_key { + DataType::Int8 => { + Arc::new(Int8DictionaryArray::from(dict_data)) as ArrayRef + } + DataType::Int16 => Arc::new(Int16DictionaryArray::from(dict_data)), + DataType::Int32 => Arc::new(Int32DictionaryArray::from(dict_data)), + DataType::Int64 => Arc::new(Int64DictionaryArray::from(dict_data)), + DataType::UInt8 => Arc::new(UInt8DictionaryArray::from(dict_data)), + DataType::UInt16 => Arc::new(UInt16DictionaryArray::from(dict_data)), + DataType::UInt32 => Arc::new(UInt32DictionaryArray::from(dict_data)), + DataType::UInt64 => Arc::new(UInt64DictionaryArray::from(dict_data)), + _ => unreachable!(), + }; + Ok(array) + } + _ => Err(ArrowError::JsonError(format!( + "Dictionary key type {:?} not supported", + dict_key + ))), + } +} + +/// A helper to create a null buffer from a Vec +fn create_null_buf(json_col: &ArrowJsonColumn) -> Buffer { + let num_bytes = bit_util::ceil(json_col.count, 8); + let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + json_col + .validity + .clone() + .unwrap() + .iter() + .enumerate() + .for_each(|(i, v)| { + let null_slice = null_buf.data_mut(); + if *v != 0 { + bit_util::set_bit(null_slice, i); + } + }); + null_buf.freeze() } fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { @@ -425,9 +613,9 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> let arrow_file = File::open(arrow_name)?; let reader = FileReader::try_new(arrow_file)?; - let mut fields = vec![]; + let mut fields: Vec = vec![]; for f in reader.schema().fields() { - fields.push(f.to_json()); + fields.push(ArrowJsonField::from(f)); } let schema = ArrowJsonSchema { fields }; @@ -453,7 +641,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { } // open JSON file - let (json_schema, json_batches) = read_json_file(json_name)?; + let json_file = read_json_file(json_name)?; // open Arrow file let arrow_file = 
File::open(arrow_name)?; @@ -461,13 +649,15 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { let arrow_schema = arrow_reader.schema().as_ref().to_owned(); // compare schemas - if json_schema != arrow_schema { + if json_file.schema != arrow_schema { return Err(ArrowError::ComputeError(format!( "Schemas do not match. JSON: {:?}. Arrow: {:?}", - json_schema, arrow_schema + json_file.schema, arrow_schema ))); } + let json_batches = &json_file.batches; + // compare number of batches assert!( json_batches.len() == arrow_reader.num_batches(), @@ -481,17 +671,20 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { ); } - for json_batch in &json_batches { + for json_batch in json_batches { if let Some(Ok(arrow_batch)) = arrow_reader.next() { // compare batches - assert!(arrow_batch.num_columns() == json_batch.num_columns()); + let num_columns = arrow_batch.num_columns(); + assert!(num_columns == json_batch.num_columns()); assert!(arrow_batch.num_rows() == json_batch.num_rows()); - // TODO compare in more detail - eprintln!( - "Basic validation of {} and {} PASSES", - arrow_name, json_name - ); + for i in 0..num_columns { + assert_eq!( + arrow_batch.column(i).data(), + json_batch.column(i).data(), + "Arrow and JSON batch columns not the same" + ); + } } else { return Err(ArrowError::ComputeError( "no more arrow batches left".to_owned(), @@ -508,16 +701,41 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { Ok(()) } -fn read_json_file(json_name: &str) -> Result<(Schema, Vec)> { +struct ArrowFile { + schema: Schema, + // we can evolve this into a concrete Arrow type + dictionaries: HashMap, + batches: Vec, +} + +fn read_json_file(json_name: &str) -> Result { let json_file = File::open(json_name)?; let reader = BufReader::new(json_file); let arrow_json: Value = serde_json::from_reader(reader).unwrap(); let schema = Schema::from(&arrow_json["schema"])?; + // read dictionaries + let mut dictionaries = HashMap::new(); + if let Some(dicts) = arrow_json.get("dictionaries") { + for d in dicts + .as_array() + .expect("Unable to get dictionaries as array") + { + let json_dict: ArrowJsonDictionaryBatch = serde_json::from_value(d.clone()) + .expect("Unable to get dictionary from JSON"); + // TODO: convert to a concrete Arrow type + dictionaries.insert(json_dict.id, json_dict); + } + } + let mut batches = vec![]; for b in arrow_json["batches"].as_array().unwrap() { let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap(); - let batch = record_batch_from_json(&schema, json_batch)?; + let batch = record_batch_from_json(&schema, json_batch, Some(&dictionaries))?; batches.push(batch); } - Ok((schema, batches)) + Ok(ArrowFile { + schema, + dictionaries, + batches, + }) }
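// A minimal usage sketch (the helper name is illustrative, not part of this patch) of
// the richer ArrowFile result, now that dictionaries are read from the integration JSON
// alongside the schema and record batches:
fn summarize(json_name: &str) -> Result<()> {
    let file = read_json_file(json_name)?;
    println!(
        "schema: {} fields, {} dictionary batch(es), {} record batch(es)",
        file.schema.fields().len(),
        file.dictionaries.len(),
        file.batches.len()
    );
    for batch in &file.batches {
        // each batch was built via record_batch_from_json with the parsed dictionaries
        println!("batch: {} rows x {} columns", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}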