diff --git a/rust/arrow/src/json/reader.rs b/rust/arrow/src/json/reader.rs index aaec8459102..a8375dcb33e 100644 --- a/rust/arrow/src/json/reader.rs +++ b/rust/arrow/src/json/reader.rs @@ -44,6 +44,7 @@ use indexmap::map::IndexMap as HashMap; use indexmap::set::IndexSet as HashSet; +use std::cell::RefCell; use std::io::{BufRead, BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; @@ -448,153 +449,185 @@ impl Reader { let rows = &rows[..]; let projection = self.projection.clone().unwrap_or_else(Vec::new); - let arrays: Result> = self - .schema - .clone() - .fields() - .iter() - .filter(|field| { - if projection.is_empty() { - return true; - } - projection.contains(field.name()) - }) - .map(|field| { - match field.data_type().clone() { - DataType::Null => unimplemented!(), - DataType::Boolean => self.build_boolean_array(rows, field.name()), - DataType::Float64 => { - self.build_primitive_array::(rows, field.name()) - } - DataType::Float32 => { - self.build_primitive_array::(rows, field.name()) - } - DataType::Int64 => self.build_primitive_array::(rows, field.name()), - DataType::Int32 => self.build_primitive_array::(rows, field.name()), - DataType::Int16 => self.build_primitive_array::(rows, field.name()), - DataType::Int8 => self.build_primitive_array::(rows, field.name()), - DataType::UInt64 => { - self.build_primitive_array::(rows, field.name()) - } - DataType::UInt32 => { - self.build_primitive_array::(rows, field.name()) - } - DataType::UInt16 => { - self.build_primitive_array::(rows, field.name()) + let arrays: Result> = + self.schema + .clone() + .fields() + .iter() + .filter(|field| { + if projection.is_empty() { + return true; } - DataType::UInt8 => self.build_primitive_array::(rows, field.name()), - DataType::Timestamp(unit, _) => - match unit { - TimeUnit::Second => self.build_primitive_array::(rows, field.name()), - TimeUnit::Microsecond => self.build_primitive_array::(rows, field.name()), - TimeUnit::Millisecond => self.build_primitive_array::(rows, field.name()), - TimeUnit::Nanosecond => self.build_primitive_array::(rows, field.name()), + projection.contains(field.name()) + }) + .map(|field| { + match field.data_type().clone() { + DataType::Null => unimplemented!(), + DataType::Boolean => self.build_boolean_array(rows, field.name()), + DataType::Float64 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::Float32 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::Int64 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::Int32 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::Int16 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::Int8 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::UInt64 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::UInt32 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::UInt16 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::UInt8 => { + self.build_primitive_array::(rows, field.name()) + } + DataType::Timestamp(unit, _) => match unit { + TimeUnit::Second => self + .build_primitive_array::( + rows, + field.name(), + ), + TimeUnit::Microsecond => self + .build_primitive_array::( + rows, + field.name(), + ), + TimeUnit::Millisecond => self + .build_primitive_array::( + rows, + field.name(), + ), + TimeUnit::Nanosecond => self + .build_primitive_array::( + rows, + field.name(), + ), }, - DataType::Date64(_) => self.build_primitive_array::(rows, field.name()), - DataType::Date32(_) => self.build_primitive_array::(rows, field.name()), - DataType::Time64(unit) => - match unit { - TimeUnit::Microsecond => self.build_primitive_array::(rows, field.name()), - TimeUnit::Nanosecond => self.build_primitive_array::(rows, field.name()), + DataType::Date64(_) => { + self.build_primitive_array::(rows, field.name()) + } + DataType::Date32(_) => { + self.build_primitive_array::(rows, field.name()) + } + DataType::Time64(unit) => match unit { + TimeUnit::Microsecond => self + .build_primitive_array::( + rows, + field.name(), + ), + TimeUnit::Nanosecond => self + .build_primitive_array::( + rows, + field.name(), + ), _ => unimplemented!(), }, - DataType::Time32(unit) => - match unit { - TimeUnit::Second => self.build_primitive_array::(rows, field.name()), - TimeUnit::Millisecond => self.build_primitive_array::(rows, field.name()), + DataType::Time32(unit) => match unit { + TimeUnit::Second => self + .build_primitive_array::( + rows, + field.name(), + ), + TimeUnit::Millisecond => self + .build_primitive_array::( + rows, + field.name(), + ), _ => unimplemented!(), }, - DataType::Utf8 => { - let mut builder = StringBuilder::new(rows.len()); - for row in rows { - if let Some(value) = row.get(field.name()) { - if let Some(str_v) = value.as_str() { - builder.append_value(str_v)? - } else { - builder.append(false)? - } - } else { - builder.append(false)? - } - } - Ok(Arc::new(builder.finish()) as ArrayRef) - } - DataType::List(ref t) => match **t { - DataType::Int8 => self.build_list_array::(rows, field.name()), - DataType::Int16 => self.build_list_array::(rows, field.name()), - DataType::Int32 => self.build_list_array::(rows, field.name()), - DataType::Int64 => self.build_list_array::(rows, field.name()), - DataType::UInt8 => self.build_list_array::(rows, field.name()), - DataType::UInt16 => self.build_list_array::(rows, field.name()), - DataType::UInt32 => self.build_list_array::(rows, field.name()), - DataType::UInt64 => self.build_list_array::(rows, field.name()), - DataType::Float32 => self.build_list_array::(rows, field.name()), - DataType::Float64 => self.build_list_array::(rows, field.name()), - DataType::Null => unimplemented!(), - DataType::Boolean => self.build_boolean_list_array(rows, field.name()), DataType::Utf8 => { - let values_builder = StringBuilder::new(rows.len() * 5); - let mut builder = ListBuilder::new(values_builder); + let mut builder = StringBuilder::new(rows.len()); for row in rows { if let Some(value) = row.get(field.name()) { - // value can be an array or a scalar - let vals: Vec> = if let Value::String(v) = value { - vec![Some(v.to_string())] - } else if let Value::Array(n) = value { - n.iter().map(|v: &Value| { - if v.is_string() { - Some(v.as_str().unwrap().to_string()) - } else if v.is_array() || v.is_object() { - // implicitly drop nested values - // TODO support deep-nesting - None - } else { - Some(v.to_string()) - } - }).collect() - } else if let Value::Null = value { - vec![None] - } else if !value.is_object() { - vec![Some(value.to_string())] + if let Some(str_v) = value.as_str() { + builder.append_value(str_v)? } else { - return Err(ArrowError::JsonError("Only scalars are currently supported in JSON arrays".to_string())); - }; - for val in vals { - if let Some(v) = val { - builder.values().append_value(&v)? - } else { - builder.values().append_null()? - }; + builder.append(false)? } + } else { + builder.append(false)? } - builder.append(true)? } Ok(Arc::new(builder.finish()) as ArrayRef) } - _ => Err(ArrowError::JsonError("Data type is currently not supported in a list".to_string())), - }, - DataType::Dictionary(ref key_typ, ref value_type) => { - if let DataType::Utf8 = **value_type { - match **key_typ { - DataType::Int8 => self.build_dictionary_array::(rows, field.name()), - DataType::Int16 => self.build_dictionary_array::(rows, field.name()), - DataType::Int32 => self.build_dictionary_array::(rows, field.name()), - DataType::Int64 => self.build_dictionary_array::(rows, field.name()), - DataType::UInt8 => self.build_dictionary_array::(rows, field.name()), - DataType::UInt16 => self.build_dictionary_array::(rows, field.name()), - DataType::UInt32 => self.build_dictionary_array::(rows, field.name()), - DataType::UInt64 => self.build_dictionary_array::(rows, field.name()), - _ => Err(ArrowError::JsonError("unsupported dictionary key type".to_string())) + DataType::List(ref t) => { + match **t { + DataType::Int8 => { + self.build_list_array::(rows, field.name()) + } + DataType::Int16 => { + self.build_list_array::(rows, field.name()) + } + DataType::Int32 => { + self.build_list_array::(rows, field.name()) + } + DataType::Int64 => { + self.build_list_array::(rows, field.name()) + } + DataType::UInt8 => { + self.build_list_array::(rows, field.name()) + } + DataType::UInt16 => self + .build_list_array::(rows, field.name()), + DataType::UInt32 => self + .build_list_array::(rows, field.name()), + DataType::UInt64 => self + .build_list_array::(rows, field.name()), + DataType::Float32 => self + .build_list_array::(rows, field.name()), + DataType::Float64 => self + .build_list_array::(rows, field.name()), + DataType::Null => unimplemented!(), + DataType::Boolean => { + self.build_boolean_list_array(rows, field.name()) + } + ref dtype @ DataType::Utf8 => { + // UInt64Type passed down below is a fake type for dictionary builder. + // It is there to make compiler happy. + self.list_array_string_array_builder::( + &dtype, + field.name(), + rows, + ) + } + DataType::Dictionary(ref key_ty, _) => self + .build_wrapped_list_array(rows, field.name(), key_ty), + ref e => Err(ArrowError::JsonError(format!( + "Data type is currently not supported in a list : {:?}", + e + ))), } - } else { - Err(ArrowError::JsonError("dictionary types other than UTF-8 not yet supported".to_string())) } + DataType::Dictionary(ref key_ty, ref val_ty) => self + .build_string_dictionary_array( + rows, + field.name(), + key_ty, + val_ty, + ), + DataType::Struct(_) => Err(ArrowError::JsonError( + "struct types are not yet supported".to_string(), + )), + _ => Err(ArrowError::JsonError(format!( + "{:?} type is not supported", + field.data_type() + ))), } - DataType::Struct(_) => Err(ArrowError::JsonError("struct types are not yet supported".to_string())), - _ => Err(ArrowError::JsonError(format!("{:?} type is not supported", field.data_type()))), - } - }) - .collect(); + }) + .collect(); let projected_fields: Vec = if projection.is_empty() { self.schema.fields().to_vec() @@ -612,6 +645,240 @@ impl Reader { arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr).map(Some)) } + fn build_wrapped_list_array( + &self, + rows: &[Value], + col_name: &str, + key_type: &DataType, + ) -> Result { + match *key_type { + DataType::Int8 => { + let dtype = DataType::Dictionary( + Box::new(DataType::Int8), + Box::new(DataType::Utf8), + ); + self.list_array_string_array_builder::(&dtype, col_name, rows) + } + DataType::Int16 => { + let dtype = DataType::Dictionary( + Box::new(DataType::Int16), + Box::new(DataType::Utf8), + ); + self.list_array_string_array_builder::(&dtype, col_name, rows) + } + DataType::Int32 => { + let dtype = DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Utf8), + ); + self.list_array_string_array_builder::(&dtype, col_name, rows) + } + DataType::Int64 => { + let dtype = DataType::Dictionary( + Box::new(DataType::Int64), + Box::new(DataType::Utf8), + ); + self.list_array_string_array_builder::(&dtype, col_name, rows) + } + DataType::UInt8 => { + let dtype = DataType::Dictionary( + Box::new(DataType::UInt8), + Box::new(DataType::Utf8), + ); + self.list_array_string_array_builder::(&dtype, col_name, rows) + } + DataType::UInt16 => { + let dtype = DataType::Dictionary( + Box::new(DataType::UInt16), + Box::new(DataType::Utf8), + ); + self.list_array_string_array_builder::(&dtype, col_name, rows) + } + DataType::UInt32 => { + let dtype = DataType::Dictionary( + Box::new(DataType::UInt32), + Box::new(DataType::Utf8), + ); + self.list_array_string_array_builder::(&dtype, col_name, rows) + } + DataType::UInt64 => { + let dtype = DataType::Dictionary( + Box::new(DataType::UInt64), + Box::new(DataType::Utf8), + ); + self.list_array_string_array_builder::(&dtype, col_name, rows) + } + ref e => Err(ArrowError::JsonError(format!( + "Data type is currently not supported for dictionaries in list : {:?}", + e + ))), + } + } + + #[inline(always)] + fn list_array_string_array_builder( + &self, + data_type: &DataType, + col_name: &str, + rows: &[Value], + ) -> Result + where + DICT_TY: ArrowPrimitiveType + ArrowDictionaryKeyType, + { + let mut builder: RefCell> = match data_type { + DataType::Utf8 => { + let values_builder = StringBuilder::new(rows.len() * 5); + RefCell::new(Box::new(ListBuilder::new(values_builder))) + } + DataType::Dictionary(_, _) => { + let values_builder = + self.build_string_dictionary_builder::(rows.len() * 5)?; + RefCell::new(Box::new(ListBuilder::new(values_builder))) + } + e => { + return Err(ArrowError::JsonError(format!( + "Nested list data builder type is not supported: {:?}", + e + ))) + } + }; + + for row in rows { + if let Some(value) = row.get(col_name) { + // value can be an array or a scalar + let vals: Vec> = if let Value::String(v) = value { + vec![Some(v.to_string())] + } else if let Value::Array(n) = value { + n.iter() + .map(|v: &Value| { + if v.is_string() { + Some(v.as_str().unwrap().to_string()) + } else if v.is_array() || v.is_object() { + // implicitly drop nested values + // TODO support deep-nesting + None + } else { + Some(v.to_string()) + } + }) + .collect() + } else if let Value::Null = value { + vec![None] + } else if !value.is_object() { + vec![Some(value.to_string())] + } else { + return Err(ArrowError::JsonError( + "Only scalars are currently supported in JSON arrays".to_string(), + )); + }; + + // TODO: ARROW-10335: APIs of dictionary arrays and others are different. Unify + // them. + match data_type { + DataType::Utf8 => { + let builder = &mut builder.borrow_mut(); + let builder = builder + .as_any_mut() + .downcast_mut::>() + .ok_or(ArrowError::JsonError( + "Cast failed for ListBuilder during nested data parsing".to_string(), + ))?; + for val in vals { + if let Some(v) = val { + builder.values().append_value(&v)? + } else { + builder.values().append_null()? + }; + } + + // Append to the list + builder.append(true)?; + } + DataType::Dictionary(_, _) => { + let builder = &mut builder.borrow_mut(); + let builder = builder.as_any_mut().downcast_mut::>>().ok_or(ArrowError::JsonError( + "Cast failed for ListBuilder during nested data parsing".to_string(), + ))?; + for val in vals { + if let Some(v) = val { + let _ = builder.values().append(&v)?; + } else { + builder.values().append_null()? + }; + } + + // Append to the list + builder.append(true)?; + } + e => { + return Err(ArrowError::JsonError(format!( + "Nested list data builder type is not supported: {:?}", + e + ))) + } + } + } + } + + Ok(builder.get_mut().finish() as ArrayRef) + } + + #[inline(always)] + fn build_string_dictionary_builder( + &self, + row_len: usize, + ) -> Result> + where + T: ArrowPrimitiveType + ArrowDictionaryKeyType, + { + let key_builder = PrimitiveBuilder::::new(row_len); + let values_builder = StringBuilder::new(row_len * 5); + Ok(StringDictionaryBuilder::new(key_builder, values_builder)) + } + + #[inline(always)] + fn build_string_dictionary_array( + &self, + rows: &[Value], + col_name: &str, + key_type: &DataType, + value_type: &DataType, + ) -> Result { + if let DataType::Utf8 = *value_type { + match *key_type { + DataType::Int8 => self.build_dictionary_array::(rows, col_name), + DataType::Int16 => { + self.build_dictionary_array::(rows, col_name) + } + DataType::Int32 => { + self.build_dictionary_array::(rows, col_name) + } + DataType::Int64 => { + self.build_dictionary_array::(rows, col_name) + } + DataType::UInt8 => { + self.build_dictionary_array::(rows, col_name) + } + DataType::UInt16 => { + self.build_dictionary_array::(rows, col_name) + } + DataType::UInt32 => { + self.build_dictionary_array::(rows, col_name) + } + DataType::UInt64 => { + self.build_dictionary_array::(rows, col_name) + } + _ => Err(ArrowError::JsonError( + "unsupported dictionary key type".to_string(), + )), + } + } else { + Err(ArrowError::JsonError( + "dictionary types other than UTF-8 not yet supported".to_string(), + )) + } + } + fn build_boolean_array(&self, rows: &[Value], col_name: &str) -> Result { let mut builder = BooleanBuilder::new(rows.len()); for row in rows { @@ -722,18 +989,18 @@ impl Reader { Ok(Arc::new(builder.finish())) } - fn build_dictionary_array( + #[inline(always)] + fn build_dictionary_array( &self, rows: &[Value], col_name: &str, ) -> Result where T::Native: num::NumCast, - T: ArrowDictionaryKeyType, + T: ArrowPrimitiveType + ArrowDictionaryKeyType, { - let key_builder = PrimitiveBuilder::::new(rows.len()); - let value_builder = StringBuilder::new(100); - let mut builder = StringDictionaryBuilder::new(key_builder, value_builder); + let mut builder: StringDictionaryBuilder = + self.build_string_dictionary_builder(rows.len())?; for row in rows { if let Some(value) = row.get(&col_name) { if let Some(str_v) = value.as_str() { @@ -855,7 +1122,7 @@ impl ReaderBuilder { #[cfg(test)] mod tests { - use crate::datatypes::DataType::Dictionary; + use crate::datatypes::DataType::{Dictionary, List}; use super::*; use flate2::read::GzDecoder; @@ -1385,6 +1652,63 @@ mod tests { ); } + #[test] + fn test_list_of_string_dictionary_from_json() { + let schema = Schema::new(vec![Field::new( + "events", + List(Box::new(Dictionary( + Box::new(DataType::UInt64), + Box::new(DataType::Utf8), + ))), + true, + )]); + let builder = ReaderBuilder::new() + .with_schema(Arc::new(schema)) + .with_batch_size(64); + let mut reader: Reader = builder + .build::(File::open("test/data/list_string_dict_nested.json").unwrap()) + .unwrap(); + let batch = reader.next().unwrap().unwrap(); + + assert_eq!(1, batch.num_columns()); + assert_eq!(3, batch.num_rows()); + + let schema = reader.schema(); + let batch_schema = batch.schema(); + assert_eq!(schema, batch_schema); + + let events = schema.column_with_name("events").unwrap(); + assert_eq!( + &List(Box::new(Dictionary( + Box::new(DataType::UInt64), + Box::new(DataType::Utf8) + ))), + events.1.data_type() + ); + + let evs_list = batch + .column(events.0) + .as_any() + .downcast_ref::() + .unwrap(); + let evs_list = evs_list.values(); + let evs_list = evs_list + .as_any() + .downcast_ref::>() + .unwrap(); + assert_eq!(6, evs_list.len()); + assert_eq!(true, evs_list.is_valid(1)); + assert_eq!(DataType::Utf8, evs_list.value_type()); + + // dict from the events list + let dict_el = evs_list.values(); + let dict_el = dict_el.as_any().downcast_ref::().unwrap(); + assert_eq!(3, dict_el.len()); + assert_eq!("Elect Leader", dict_el.value(0)); + assert_eq!("Do Ballot", dict_el.value(1)); + assert_eq!("Send Data", dict_el.value(2)); + } + #[test] fn test_dictionary_from_json_uint8() { let schema = Schema::new(vec![Field::new( @@ -1471,6 +1795,7 @@ mod tests { d.1.data_type() ); } + #[test] fn test_with_multiple_batches() { let builder = ReaderBuilder::new() diff --git a/rust/arrow/test/data/list_string_dict_nested.json b/rust/arrow/test/data/list_string_dict_nested.json new file mode 100644 index 00000000000..d215b318bae --- /dev/null +++ b/rust/arrow/test/data/list_string_dict_nested.json @@ -0,0 +1,3 @@ +{"machine": "a", "events": ["Elect Leader", "Do Ballot"]} +{"machine": "b", "events": ["Do Ballot", "Send Data", "Elect Leader"]} +{"machine": "c", "events": ["Send Data"]}