diff --git a/arrow-json/src/reader/boolean_array.rs b/arrow-json/src/reader/boolean_array.rs index 17c0586dfad2..2f18cff31800 100644 --- a/arrow-json/src/reader/boolean_array.rs +++ b/arrow-json/src/reader/boolean_array.rs @@ -21,11 +21,20 @@ use arrow_array::ArrayRef; use arrow_array::builder::BooleanBuilder; use arrow_schema::ArrowError; -use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::{ArrayDecoder, DecoderContext}; #[derive(Default)] -pub struct BooleanArrayDecoder {} +pub struct BooleanArrayDecoder { + ignore_type_conflicts: bool, +} +impl BooleanArrayDecoder { + pub fn new(ctx: &DecoderContext) -> Self { + Self { + ignore_type_conflicts: ctx.ignore_type_conflicts(), + } + } +} impl ArrayDecoder for BooleanArrayDecoder { fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { @@ -35,6 +44,7 @@ impl ArrayDecoder for BooleanArrayDecoder { TapeElement::Null => builder.append_null(), TapeElement::True => builder.append_value(true), TapeElement::False => builder.append_value(false), + _ if self.ignore_type_conflicts => builder.append_null(), _ => return Err(tape.error(*p, "boolean")), } } diff --git a/arrow-json/src/reader/decimal_array.rs b/arrow-json/src/reader/decimal_array.rs index c9936e04a454..4eaab4847dc0 100644 --- a/arrow-json/src/reader/decimal_array.rs +++ b/arrow-json/src/reader/decimal_array.rs @@ -24,21 +24,23 @@ use arrow_array::types::DecimalType; use arrow_cast::parse::parse_decimal; use arrow_schema::ArrowError; -use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::{ArrayDecoder, DecoderContext}; pub struct DecimalArrayDecoder { precision: u8, scale: i8, + ignore_type_conflicts: bool, // Invariant and Send phantom: PhantomData D>, } impl DecimalArrayDecoder { - pub fn new(precision: u8, scale: i8) -> Self { + pub fn new(ctx: &DecoderContext, precision: u8, scale: i8) -> Self { Self { precision, scale, + ignore_type_conflicts: ctx.ignore_type_conflicts(), phantom: PhantomData, } } @@ -51,46 +53,48 @@ where fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut builder = PrimitiveBuilder::::with_capacity(pos.len()); + #[allow(unused)] // initial value overwritten without ever being read + let mut anchor = String::default(); for p in pos { - match tape.get(*p) { - TapeElement::Null => builder.append_null(), - TapeElement::String(idx) => { - let s = tape.get_string(idx); - let value = parse_decimal::(s, self.precision, self.scale)?; - builder.append_value(value) - } - TapeElement::Number(idx) => { - let s = tape.get_string(idx); - let value = parse_decimal::(s, self.precision, self.scale)?; - builder.append_value(value) + let value = match tape.get(*p) { + TapeElement::Null => { + builder.append_null(); + continue; } + TapeElement::String(idx) | TapeElement::Number(idx) => tape.get_string(idx), TapeElement::I64(high) => match tape.get(*p + 1) { TapeElement::I32(low) => { - let val = (((high as i64) << 32) | (low as u32) as i64).to_string(); - let value = parse_decimal::(&val, self.precision, self.scale)?; - builder.append_value(value) + anchor = (((high as i64) << 32) | (low as u32) as i64).to_string(); + anchor.as_str() } _ => unreachable!(), }, TapeElement::I32(val) => { - let s = val.to_string(); - let value = parse_decimal::(&s, self.precision, self.scale)?; - builder.append_value(value) + anchor = val.to_string(); + anchor.as_str() } TapeElement::F64(high) => match tape.get(*p + 1) { TapeElement::F32(low) => { - let val = f64::from_bits(((high as u64) << 32) | low as u64).to_string(); - let value = parse_decimal::(&val, self.precision, self.scale)?; - builder.append_value(value) + anchor = f64::from_bits(((high as u64) << 32) | low as u64).to_string(); + anchor.as_str() } _ => unreachable!(), }, TapeElement::F32(val) => { - let s = f32::from_bits(val).to_string(); - let value = parse_decimal::(&s, self.precision, self.scale)?; - builder.append_value(value) + anchor = f32::from_bits(val).to_string(); + anchor.as_str() + } + _ if self.ignore_type_conflicts => { + builder.append_null(); + continue; } _ => return Err(tape.error(*p, "decimal")), + }; + + match parse_decimal::(value, self.precision, self.scale) { + Ok(value) => builder.append_value(value), + Err(_) if self.ignore_type_conflicts => builder.append_null(), + Err(e) => return Err(e), } } diff --git a/arrow-json/src/reader/list_array.rs b/arrow-json/src/reader/list_array.rs index b11124576df2..113e628541c6 100644 --- a/arrow-json/src/reader/list_array.rs +++ b/arrow-json/src/reader/list_array.rs @@ -35,6 +35,7 @@ pub struct ListLikeArrayDecoder { field: FieldRef, decoder: Box, phantom: PhantomData, + ignore_type_conflicts: bool, is_nullable: bool, } @@ -57,6 +58,7 @@ impl ListLikeArrayDecoder { field: field.clone(), decoder, phantom: Default::default(), + ignore_type_conflicts: ctx.ignore_type_conflicts(), is_nullable, }) } @@ -83,6 +85,10 @@ impl ArrayDecoder for ListLikeArrayDeco nulls.append(false); *p + 1 } + (_, Some(nulls)) if self.ignore_type_conflicts => { + nulls.append(false); + *p + 1 + } _ => return Err(tape.error(*p, "[")), }; diff --git a/arrow-json/src/reader/map_array.rs b/arrow-json/src/reader/map_array.rs index 4ec855a666c3..87cd84cc3e54 100644 --- a/arrow-json/src/reader/map_array.rs +++ b/arrow-json/src/reader/map_array.rs @@ -32,6 +32,7 @@ pub struct MapArrayDecoder { ordered: bool, keys: Box, values: Box, + ignore_type_conflicts: bool, is_nullable: bool, } @@ -75,6 +76,7 @@ impl MapArrayDecoder { ordered, keys, values, + ignore_type_conflicts: ctx.ignore_type_conflicts(), is_nullable, }) } @@ -103,6 +105,10 @@ impl ArrayDecoder for MapArrayDecoder { nulls.append(false); p + 1 } + (_, Some(nulls)) if self.ignore_type_conflicts => { + nulls.append(false); + p + 1 + } _ => return Err(tape.error(p, "{")), }; diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs index 497ec4c3f398..62c13c70ed99 100644 --- a/arrow-json/src/reader/mod.rs +++ b/arrow-json/src/reader/mod.rs @@ -187,6 +187,7 @@ pub struct ReaderBuilder { batch_size: usize, coerce_primitive: bool, strict_mode: bool, + ignore_type_conflicts: bool, is_field: bool, struct_mode: StructMode, @@ -207,6 +208,7 @@ impl ReaderBuilder { batch_size: 1024, coerce_primitive: false, strict_mode: false, + ignore_type_conflicts: false, is_field: false, struct_mode: Default::default(), schema, @@ -248,6 +250,7 @@ impl ReaderBuilder { batch_size: 1024, coerce_primitive: false, strict_mode: false, + ignore_type_conflicts: false, is_field: true, struct_mode: Default::default(), schema: Arc::new(Schema::new([field.into()])), @@ -290,6 +293,25 @@ impl ReaderBuilder { } } + /// Sets whether the decoder should produce NULL instead of returning an error if it encounters + /// value that can not be parsed into the specified column type. + /// + /// For example, if the type is declared to be a nullable array of `DataType::Int32` but the + /// reader encounters a string value `"foo"` and the value `ignore_type_conflicts` is: + /// + /// * `false` (the default): The reader will return an error. + /// + /// * `true`: The reader will fill in NULL value for that array element. + /// + /// NOTE: An inferred NULL due to a type conflict will still produce parsing errors for + /// non-nullable fields, the same as any other NULL or missing value. + pub fn with_ignore_type_conflicts(self, ignore_type_conflicts: bool) -> Self { + Self { + ignore_type_conflicts, + ..self + } + } + /// Create a [`Reader`] with the provided [`BufRead`] pub fn build(self, reader: R) -> Result, ArrowError> { Ok(Reader { @@ -313,6 +335,7 @@ impl ReaderBuilder { coerce_primitive: self.coerce_primitive, strict_mode: self.strict_mode, struct_mode: self.struct_mode, + ignore_type_conflicts: self.ignore_type_conflicts, }; let decoder = ctx.make_decoder(data_type.as_ref(), nullable)?; @@ -695,6 +718,8 @@ pub struct DecoderContext { strict_mode: bool, /// How to decode struct fields struct_mode: StructMode, + /// Whether to treat columns with incompatible types as missing (i.e. NULL) + ignore_type_conflicts: bool, } impl DecoderContext { @@ -713,6 +738,11 @@ impl DecoderContext { self.struct_mode } + /// Returns whether to treat columns with incompatible types as missing (i.e. NULL) + pub fn ignore_type_conflicts(&self) -> bool { + self.ignore_type_conflicts + } + /// Create a decoder for a type. /// /// This is the standard way to create child decoders from within a decoder @@ -726,51 +756,62 @@ impl DecoderContext { } } -macro_rules! primitive_decoder { - ($t:ty, $data_type:expr) => { - Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type))) - }; -} - fn make_decoder( ctx: &DecoderContext, data_type: &DataType, is_nullable: bool, ) -> Result, ArrowError> { - let coerce_primitive = ctx.coerce_primitive(); + macro_rules! primitive_decoder { + ($t:ty, $data_type:expr) => { + Ok(Box::new(PrimitiveArrayDecoder::<$t>::new(ctx, $data_type))) + }; + } + macro_rules! timestamp_decoder { + ($t:ty, $data_type:expr, $tz:expr) => {{ + Ok(Box::new(TimestampArrayDecoder::<$t, _>::new( + ctx, $data_type, $tz, + ))) + }}; + } + macro_rules! decimal_decoder { + ($t:ty, $p:expr, $s:expr) => { + Ok(Box::new(DecimalArrayDecoder::<$t>::new(ctx, $p, $s))) + }; + } + downcast_integer! { *data_type => (primitive_decoder, data_type), - DataType::Null => Ok(Box::::default()), + DataType::Null => Ok(Box::new(NullArrayDecoder::new(ctx))), DataType::Float16 => primitive_decoder!(Float16Type, data_type), DataType::Float32 => primitive_decoder!(Float32Type, data_type), DataType::Float64 => primitive_decoder!(Float64Type, data_type), DataType::Timestamp(TimeUnit::Second, None) => { - Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc))) + timestamp_decoder!(TimestampSecondType, data_type, Utc) }, DataType::Timestamp(TimeUnit::Millisecond, None) => { - Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc))) + timestamp_decoder!(TimestampMillisecondType, data_type, Utc) }, DataType::Timestamp(TimeUnit::Microsecond, None) => { - Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc))) + timestamp_decoder!(TimestampMicrosecondType, data_type, Utc) }, DataType::Timestamp(TimeUnit::Nanosecond, None) => { - Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc))) + timestamp_decoder!(TimestampNanosecondType, data_type, Utc) }, DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => { let tz: Tz = tz.parse()?; - Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz))) + timestamp_decoder!(TimestampSecondType, data_type, tz) }, DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => { let tz: Tz = tz.parse()?; - Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz))) + timestamp_decoder!(TimestampMillisecondType, data_type, tz) }, DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => { let tz: Tz = tz.parse()?; - Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz))) + timestamp_decoder!(TimestampMicrosecondType, data_type, tz) }, DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => { let tz: Tz = tz.parse()?; - Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz))) + timestamp_decoder!(TimestampNanosecondType, data_type, tz) }, DataType::Date32 => primitive_decoder!(Date32Type, data_type), DataType::Date64 => primitive_decoder!(Date64Type, data_type), @@ -782,14 +823,14 @@ fn make_decoder( DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type), DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type), DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type), - DataType::Decimal32(p, s) => Ok(Box::new(DecimalArrayDecoder::::new(p, s))), - DataType::Decimal64(p, s) => Ok(Box::new(DecimalArrayDecoder::::new(p, s))), - DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::::new(p, s))), - DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::::new(p, s))), - DataType::Boolean => Ok(Box::::default()), - DataType::Utf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive))), - DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))), - DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::::new(coerce_primitive))), + DataType::Decimal32(p, s) => decimal_decoder!(Decimal32Type, p, s), + DataType::Decimal64(p, s) => decimal_decoder!(Decimal64Type, p, s), + DataType::Decimal128(p, s) => decimal_decoder!(Decimal128Type, p, s), + DataType::Decimal256(p, s) => decimal_decoder!(Decimal256Type, p, s), + DataType::Boolean => Ok(Box::new(BooleanArrayDecoder::new(ctx))), + DataType::Utf8 => Ok(Box::new(StringArrayDecoder::::new(ctx))), + DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(ctx))), + DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::::new(ctx))), DataType::List(_) => Ok(Box::new(ListArrayDecoder::::new(ctx, data_type, is_nullable)?)), DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::::new(ctx, data_type, is_nullable)?)), DataType::ListView(_) => Ok(Box::new(ListViewArrayDecoder::::new(ctx, data_type, is_nullable)?)), @@ -812,19 +853,18 @@ fn make_decoder( #[cfg(test)] mod tests { - use serde_json::json; - use std::fs::File; - use std::io::{BufReader, Cursor, Seek}; - use arrow_array::cast::AsArray; use arrow_array::{ - Array, BooleanArray, Float64Array, GenericListViewArray, ListArray, OffsetSizeTrait, - StringArray, StringViewArray, StructArray, make_array, + Array, BooleanArray, Float64Array, GenericListViewArray, Int32Array, ListArray, MapArray, + NullArray, OffsetSizeTrait, StringArray, StringViewArray, StructArray, make_array, }; - use arrow_buffer::{ArrowNativeType, Buffer}; + use arrow_buffer::{ArrowNativeType, Buffer, NullBuffer}; use arrow_cast::display::{ArrayFormatter, FormatOptions}; use arrow_data::ArrayDataBuilder; use arrow_schema::{Field, Fields}; + use serde_json::json; + use std::fs::File; + use std::io::{BufReader, Cursor, Seek}; use super::*; @@ -2944,6 +2984,370 @@ mod tests { ); } + #[test] + fn test_type_conflict_nulls() { + let schema = Schema::new(vec![ + Field::new("null", DataType::Null, true), + Field::new("bool", DataType::Boolean, true), + Field::new("primitive", DataType::Int32, true), + Field::new("numeric", DataType::Decimal128(10, 3), true), + Field::new("string", DataType::Utf8, true), + Field::new("string_view", DataType::Utf8View, true), + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Second, None), + true, + ), + Field::new( + "array", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + ), + Field::new( + "map", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Utf8, true), + ])), + false, // not nullable + )), + false, // not sorted + ), + true, // nullable + ), + Field::new( + "struct", + DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])), + true, + ), + ]); + + // A compatible value for each schema field above, in schema order + let json_values = vec![ + json!(null), + json!(true), + json!(42), + json!(1.234), + json!("hi"), + json!("ho"), + json!("1970-01-01T00:00:00+02:00"), + json!([1, "ho", 3]), + json!({"k": "value"}), + json!({"a": 1}), + ]; + + // Create a set of JSON rows that rotates each value past every field + let json: Vec<_> = (0..json_values.len()) + .map(|i| { + let pairs = json_values[i..] + .iter() + .chain(json_values[..i].iter()) + .zip(&schema.fields) + .map(|(v, f)| (f.name().to_string(), v.clone())) + .collect(); + serde_json::Value::Object(pairs) + }) + .collect(); + let mut decoder = ReaderBuilder::new(Arc::new(schema)) + .with_ignore_type_conflicts(true) + .with_coerce_primitive(true) + .build_decoder() + .unwrap(); + decoder.serialize(&json).unwrap(); + let batch = decoder.flush().unwrap().unwrap(); + assert_eq!(batch.num_rows(), 10); + assert_eq!(batch.num_columns(), 10); + + // NOTE: NullArray doesn't materialize any values (they're all NULL by definition) + let _ = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + assert!( + batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .eq([ + Some(true), + None, + None, + None, + None, + None, + None, + None, + None, + None + ]) + ); + + assert!(batch.column(2).as_primitive::().iter().eq([ + Some(42), + Some(1), + None, + None, + None, + None, + None, + None, + None, + None + ])); + + assert!(batch.column(3).as_primitive::().iter().eq([ + Some(1234), + None, + None, + None, + None, + None, + None, + None, + None, + Some(42000) + ])); + + assert!( + batch + .column(4) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .eq([ + Some("hi"), + Some("ho"), + Some("1970-01-01T00:00:00+02:00"), + None, + None, + None, + None, + Some("true"), + Some("42"), + Some("1.234"), + ]) + ); + + assert!( + batch + .column(5) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .eq([ + Some("ho"), + Some("1970-01-01T00:00:00+02:00"), + None, + None, + None, + None, + Some("true"), + Some("42"), + Some("1.234"), + Some("hi"), + ]) + ); + + assert!( + batch + .column(6) + .as_primitive::() + .iter() + .eq([ + Some(-7200), + None, + None, + None, + None, + None, + Some(42), + None, + None, + None, + ]) + ); + + let arrays = batch + .column(7) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!( + arrays.nulls(), + Some(&NullBuffer::from( + &[ + true, false, false, false, false, false, false, false, false, false + ][..] + )) + ); + assert_eq!(arrays.offsets()[1], 3); + let array_values = arrays + .values() + .as_any() + .downcast_ref::() + .unwrap(); + assert!(array_values.iter().eq([Some(1), None, Some(3)])); + + let maps = batch.column(8).as_any().downcast_ref::().unwrap(); + assert_eq!( + maps.nulls(), + Some(&NullBuffer::from( + // Both map and struct can parse + &[ + true, true, false, false, false, false, false, false, false, false + ][..] + )) + ); + let map_keys = maps.keys().as_any().downcast_ref::().unwrap(); + assert!(map_keys.iter().eq([Some("k"), Some("a")])); + let map_values = maps + .values() + .as_any() + .downcast_ref::() + .unwrap(); + assert!(map_values.iter().eq([Some("value"), Some("1")])); + + let structs = batch + .column(9) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!( + structs.nulls(), + Some(&NullBuffer::from( + // Both map and struct can parse + &[ + true, false, false, false, false, false, false, false, false, true + ][..] + )) + ); + let struct_fields = structs + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(struct_fields.slice(0, 2).iter().eq([Some(1), None])); + } + + #[test] + fn test_type_conflict_non_nullable() { + let fields = [ + Field::new("bool", DataType::Boolean, false), + Field::new("primitive", DataType::Int32, false), + Field::new("numeric", DataType::Decimal128(10, 3), false), + Field::new("string", DataType::Utf8, false), + Field::new("string_view", DataType::Utf8View, false), + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Second, None), + false, + ), + Field::new( + "array", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + false, + ), + Field::new( + "map", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Utf8, true), + ])), + false, // not nullable + )), + false, // not sorted + ), + false, // not nullable + ), + Field::new( + "struct", + DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])), + false, + ), + ]; + + // Every field above will have a type conflict with at least one of these values + let json_values = vec![json!(true), json!({"a": 1})]; + + for field in fields { + let mut decoder = ReaderBuilder::new_with_field(field) + .with_ignore_type_conflicts(true) + .build_decoder() + .unwrap(); + decoder.serialize(&json_values).unwrap(); + decoder + .flush() + .expect_err("type conflict on non-nullable type"); + } + } + + #[test] + fn test_ignore_type_conflicts_disabled() { + let fields = [ + Field::new("null", DataType::Null, true), + Field::new("bool", DataType::Boolean, true), + Field::new("primitive", DataType::Int32, true), + Field::new("numeric", DataType::Decimal128(10, 3), true), + Field::new("string", DataType::Utf8, true), + Field::new("string_view", DataType::Utf8View, true), + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Second, None), + true, + ), + Field::new( + "array", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + ), + Field::new( + "map", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Utf8, true), + ])), + false, // not nullable + )), + false, // not sorted + ), + true, // not nullable + ), + Field::new( + "struct", + DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])), + true, + ), + ]; + + // Every field above will have a type conflict with at least one of these values + let json_values = vec![json!(true), json!({"a": 1})]; + + for field in fields { + let mut decoder = ReaderBuilder::new_with_field(field) + .build_decoder() + .unwrap(); + decoder.serialize(&json_values).unwrap(); + decoder + .flush() + .expect_err("type conflict on non-nullable type"); + } + } + #[test] fn test_read_run_end_encoded() { let buf = r#" diff --git a/arrow-json/src/reader/null_array.rs b/arrow-json/src/reader/null_array.rs index 9c6ac3d2886d..6a9660cfa35c 100644 --- a/arrow-json/src/reader/null_array.rs +++ b/arrow-json/src/reader/null_array.rs @@ -20,17 +20,28 @@ use std::sync::Arc; use arrow_array::{ArrayRef, NullArray}; use arrow_schema::ArrowError; -use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::{ArrayDecoder, DecoderContext}; #[derive(Default)] -pub struct NullArrayDecoder {} +pub struct NullArrayDecoder { + ignore_type_conflicts: bool, +} +impl NullArrayDecoder { + pub fn new(ctx: &DecoderContext) -> Self { + Self { + ignore_type_conflicts: ctx.ignore_type_conflicts(), + } + } +} impl ArrayDecoder for NullArrayDecoder { fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { - for p in pos { - if !matches!(tape.get(*p), TapeElement::Null) { - return Err(tape.error(*p, "null")); + if !self.ignore_type_conflicts { + for p in pos { + if !matches!(tape.get(*p), TapeElement::Null) { + return Err(tape.error(*p, "null")); + } } } Ok(Arc::new(NullArray::new(pos.len()))) diff --git a/arrow-json/src/reader/primitive_array.rs b/arrow-json/src/reader/primitive_array.rs index 559b82ea833d..b086954297a9 100644 --- a/arrow-json/src/reader/primitive_array.rs +++ b/arrow-json/src/reader/primitive_array.rs @@ -25,8 +25,8 @@ use arrow_schema::{ArrowError, DataType}; use half::f16; use num_traits::NumCast; -use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::{ArrayDecoder, DecoderContext}; /// A trait for JSON-specific primitive parsing logic /// @@ -75,14 +75,16 @@ impl ParseJsonNumber for f64 { pub struct PrimitiveArrayDecoder { data_type: DataType, + ignore_type_conflicts: bool, // Invariant and Send phantom: PhantomData P>, } impl PrimitiveArrayDecoder

{ - pub fn new(data_type: &DataType) -> Self { + pub fn new(ctx: &DecoderContext, data_type: &DataType) -> Self { Self { data_type: data_type.clone(), + ignore_type_conflicts: ctx.ignore_type_conflicts(), phantom: Default::default(), } } @@ -99,58 +101,56 @@ where let d = &self.data_type; for p in pos { - match tape.get(*p) { - TapeElement::Null => builder.append_null(), + let value = match tape.get(*p) { + TapeElement::Null => { + builder.append_null(); + continue; + } TapeElement::String(idx) => { let s = tape.get_string(idx); - let value = P::parse(s).ok_or_else(|| { + P::parse(s).ok_or_else(|| { ArrowError::JsonError(format!("failed to parse \"{s}\" as {d}",)) - })?; - - builder.append_value(value) + }) } TapeElement::Number(idx) => { let s = tape.get_string(idx); - let value = ParseJsonNumber::parse(s.as_bytes()).ok_or_else(|| { + ParseJsonNumber::parse(s.as_bytes()).ok_or_else(|| { ArrowError::JsonError(format!("failed to parse {s} as {d}",)) - })?; - - builder.append_value(value) + }) } TapeElement::F32(v) => { let v = f32::from_bits(v); - let value = NumCast::from(v).ok_or_else(|| { + NumCast::from(v).ok_or_else(|| { ArrowError::JsonError(format!("failed to parse {v} as {d}",)) - })?; - builder.append_value(value) - } - TapeElement::I32(v) => { - let value = NumCast::from(v).ok_or_else(|| { - ArrowError::JsonError(format!("failed to parse {v} as {d}",)) - })?; - builder.append_value(value) + }) } + TapeElement::I32(v) => NumCast::from(v) + .ok_or_else(|| ArrowError::JsonError(format!("failed to parse {v} as {d}",))), TapeElement::F64(high) => match tape.get(p + 1) { TapeElement::F32(low) => { let v = f64::from_bits(((high as u64) << 32) | low as u64); - let value = NumCast::from(v).ok_or_else(|| { + NumCast::from(v).ok_or_else(|| { ArrowError::JsonError(format!("failed to parse {v} as {d}",)) - })?; - builder.append_value(value) + }) } _ => unreachable!(), }, TapeElement::I64(high) => match tape.get(p + 1) { TapeElement::I32(low) => { let v = ((high as i64) << 32) | (low as u32) as i64; - let value = NumCast::from(v).ok_or_else(|| { + NumCast::from(v).ok_or_else(|| { ArrowError::JsonError(format!("failed to parse {v} as {d}",)) - })?; - builder.append_value(value) + }) } _ => unreachable!(), }, - _ => return Err(tape.error(*p, "primitive")), + _ => Err(tape.error(*p, "primitive")), + }; + + match value { + Ok(value) => builder.append_value(value), + Err(_) if self.ignore_type_conflicts => builder.append_null(), + Err(e) => return Err(e), } } diff --git a/arrow-json/src/reader/string_array.rs b/arrow-json/src/reader/string_array.rs index 6cdfa060138d..6b6abc21ede8 100644 --- a/arrow-json/src/reader/string_array.rs +++ b/arrow-json/src/reader/string_array.rs @@ -24,21 +24,23 @@ use arrow_schema::ArrowError; use itoa; use ryu; -use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::{ArrayDecoder, DecoderContext}; const TRUE: &str = "true"; const FALSE: &str = "false"; pub struct StringArrayDecoder { coerce_primitive: bool, + ignore_type_conflicts: bool, phantom: PhantomData, } impl StringArrayDecoder { - pub fn new(coerce_primitive: bool) -> Self { + pub fn new(ctx: &DecoderContext) -> Self { Self { - coerce_primitive, + coerce_primitive: ctx.coerce_primitive(), + ignore_type_conflicts: ctx.ignore_type_conflicts(), phantom: Default::default(), } } @@ -73,6 +75,7 @@ impl ArrayDecoder for StringArrayDecoder { // An arbitrary estimate data_capacity += 10; } + _ if self.ignore_type_conflicts => {} _ => { return Err(tape.error(*p, "string")); } @@ -126,6 +129,7 @@ impl ArrayDecoder for StringArrayDecoder { } _ => unreachable!(), }, + _ if self.ignore_type_conflicts => builder.append_null(), _ => unreachable!(), } } diff --git a/arrow-json/src/reader/string_view_array.rs b/arrow-json/src/reader/string_view_array.rs index 5364317dfd25..1fc4627d3a01 100644 --- a/arrow-json/src/reader/string_view_array.rs +++ b/arrow-json/src/reader/string_view_array.rs @@ -23,19 +23,23 @@ use arrow_array::builder::GenericByteViewBuilder; use arrow_array::types::StringViewType; use arrow_schema::ArrowError; -use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::{ArrayDecoder, DecoderContext}; const TRUE: &str = "true"; const FALSE: &str = "false"; pub struct StringViewArrayDecoder { coerce_primitive: bool, + ignore_type_conflicts: bool, } impl StringViewArrayDecoder { - pub fn new(coerce_primitive: bool) -> Self { - Self { coerce_primitive } + pub fn new(ctx: &DecoderContext) -> Self { + Self { + coerce_primitive: ctx.coerce_primitive(), + ignore_type_conflicts: ctx.ignore_type_conflicts(), + } } } @@ -100,6 +104,7 @@ impl ArrayDecoder for StringViewArrayDecoder { TapeElement::F64(_) if coerce => { data_capacity += 10; } + _ if self.ignore_type_conflicts => {} // treat type conflicts like nulls _ => { return Err(tape.error(p, "string")); } @@ -156,6 +161,9 @@ impl ArrayDecoder for StringViewArrayDecoder { } _ => unreachable!(), }, + _ if self.ignore_type_conflicts => { + builder.append_null(); + } _ => unreachable!(), } } diff --git a/arrow-json/src/reader/struct_array.rs b/arrow-json/src/reader/struct_array.rs index 00dc55a5fd66..cfad1ed61213 100644 --- a/arrow-json/src/reader/struct_array.rs +++ b/arrow-json/src/reader/struct_array.rs @@ -75,6 +75,7 @@ pub struct StructArrayDecoder { data_type: DataType, decoders: Vec>, strict_mode: bool, + ignore_type_conflicts: bool, is_nullable: bool, struct_mode: StructMode, field_name_to_index: Option>, @@ -110,6 +111,7 @@ impl StructArrayDecoder { data_type: data_type.clone(), decoders, strict_mode: ctx.strict_mode(), + ignore_type_conflicts: ctx.ignore_type_conflicts(), is_nullable, struct_mode, field_name_to_index, @@ -144,6 +146,10 @@ impl ArrayDecoder for StructArrayDecoder { nulls.append(false); continue; } + (_, Some(nulls)) if self.ignore_type_conflicts => { + nulls.append(false); + continue; + } (_, _) => return Err(tape.error(*p, "{")), }; @@ -189,6 +195,10 @@ impl ArrayDecoder for StructArrayDecoder { nulls.append(false); continue; } + (_, Some(nulls)) if self.ignore_type_conflicts => { + nulls.append(false); + continue; + } (_, _) => return Err(tape.error(*p, "[")), }; diff --git a/arrow-json/src/reader/timestamp_array.rs b/arrow-json/src/reader/timestamp_array.rs index 3fe4dc07af36..7a3937565f6c 100644 --- a/arrow-json/src/reader/timestamp_array.rs +++ b/arrow-json/src/reader/timestamp_array.rs @@ -25,22 +25,24 @@ use arrow_cast::parse::string_to_datetime; use arrow_schema::{ArrowError, DataType, TimeUnit}; use chrono::TimeZone; -use crate::reader::ArrayDecoder; use crate::reader::tape::{Tape, TapeElement}; +use crate::reader::{ArrayDecoder, DecoderContext}; /// A specialized [`ArrayDecoder`] for timestamps pub struct TimestampArrayDecoder { data_type: DataType, timezone: Tz, + ignore_type_conflicts: bool, // Invariant and Send phantom: PhantomData P>, } impl TimestampArrayDecoder { - pub fn new(data_type: &DataType, timezone: Tz) -> Self { + pub fn new(ctx: &DecoderContext, data_type: &DataType, timezone: Tz) -> Self { Self { data_type: data_type.clone(), timezone, + ignore_type_conflicts: ctx.ignore_type_conflicts(), phantom: Default::default(), } } @@ -54,10 +56,12 @@ where fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { let mut builder = PrimitiveBuilder::

::with_capacity(pos.len()).with_data_type(self.data_type.clone()); - for p in pos { - match tape.get(*p) { - TapeElement::Null => builder.append_null(), + let value = match tape.get(*p) { + TapeElement::Null => { + builder.append_null(); + continue; + } TapeElement::String(idx) => { let s = tape.get_string(idx); let date = string_to_datetime(&self.timezone, s).map_err(|e| { @@ -65,43 +69,44 @@ where "failed to parse \"{s}\" as {}: {}", self.data_type, e )) - })?; + }); - let value = match P::UNIT { - TimeUnit::Second => date.timestamp(), - TimeUnit::Millisecond => date.timestamp_millis(), - TimeUnit::Microsecond => date.timestamp_micros(), + date.and_then(|date| match P::UNIT { + TimeUnit::Second => Ok(date.timestamp()), + TimeUnit::Millisecond => Ok(date.timestamp_millis()), + TimeUnit::Microsecond => Ok(date.timestamp_micros()), TimeUnit::Nanosecond => date.timestamp_nanos_opt().ok_or_else(|| { ArrowError::ParseError(format!( "{} would overflow 64-bit signed nanoseconds", date.to_rfc3339(), )) - })?, - }; - builder.append_value(value) + }), + }) } TapeElement::Number(idx) => { let s = tape.get_string(idx); let b = s.as_bytes(); - let value = lexical_core::parse::(b) + lexical_core::parse::(b) .or_else(|_| lexical_core::parse::(b).map(|x| x as i64)) .map_err(|_| { ArrowError::JsonError(format!( "failed to parse {s} as {}", self.data_type )) - })?; - - builder.append_value(value) + }) } - TapeElement::I32(v) => builder.append_value(v as i64), + TapeElement::I32(v) => Ok(v as i64), TapeElement::I64(high) => match tape.get(p + 1) { - TapeElement::I32(low) => { - builder.append_value(((high as i64) << 32) | (low as u32) as i64) - } + TapeElement::I32(low) => Ok(((high as i64) << 32) | (low as u32) as i64), _ => unreachable!(), }, - _ => return Err(tape.error(*p, "primitive")), + _ => Err(tape.error(*p, "primitive")), + }; + + match value { + Ok(value) => builder.append_value(value), + Err(_) if self.ignore_type_conflicts => builder.append_null(), + Err(e) => return Err(e), } }