From 0ac80db1c7d1946c787d40c1ae36b81cce4c5176 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 10 Mar 2025 03:00:16 -0500 Subject: [PATCH 01/19] add public json encoder trait --- arrow-json/Cargo.toml | 4 + arrow-json/benches/encode.rs | 220 +++++++++++++++ arrow-json/src/lib.rs | 5 +- arrow-json/src/writer/encoder.rs | 344 ++++++++++++++++------- arrow-json/src/writer/mod.rs | 464 ++++++++++++++++++++++++++++++- 5 files changed, 934 insertions(+), 103 deletions(-) create mode 100644 arrow-json/benches/encode.rs diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml index cbca108cfda2..09d66c23d40c 100644 --- a/arrow-json/Cargo.toml +++ b/arrow-json/Cargo.toml @@ -63,3 +63,7 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng" [[bench]] name = "serde" harness = false + +[[bench]] +name = "encode" +harness = false diff --git a/arrow-json/benches/encode.rs b/arrow-json/benches/encode.rs new file mode 100644 index 000000000000..f814c97d2cc3 --- /dev/null +++ b/arrow-json/benches/encode.rs @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder}; +use arrow_array::RecordBatch; +use arrow_json::{LineDelimitedWriter, WriterBuilder}; +use arrow_json::writer::LineDelimited; +use arrow_schema::{DataType, Field, Schema}; +use criterion::{criterion_group, criterion_main, Criterion, BenchmarkId, Throughput}; +use rand::Rng; + +#[derive(Debug, Clone, Copy)] +enum TestCaseNulls { + None, + All, +} + +impl std::fmt::Display for TestCaseNulls { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TestCaseNulls::None => write!(f, "none"), + TestCaseNulls::All => write!(f, "all"), + } + } +} + +impl TestCaseNulls { + fn is_nullable(&self) -> bool { + match self { + TestCaseNulls::None => false, + TestCaseNulls::All => true, + } + } +} + +#[derive(Debug, Clone, Copy)] +enum TestCaseNumRows { + Small, + Large, +} + +impl TestCaseNumRows { + fn row_count(&self) -> usize { + match self { + TestCaseNumRows::Small => 10, + TestCaseNumRows::Large => 100_000, + } + } +} + +/// Represents a test case configuration for benchmarking +#[derive(Debug)] +struct TestCase { + name: String, + data: RecordBatch, + explicit_nulls: bool, + null_generation: TestCaseNulls, + row_count: TestCaseNumRows, +} + +impl TestCase { + fn name(&self) -> String { + format!( + "{}_explicit_nulls:{}_nulls:{}_row_count:{}", + self.name, + self.explicit_nulls, + self.null_generation, + self.row_count.row_count(), + ) + } +} + +fn create_int32_array( + nulls: TestCaseNulls, + num_rows: TestCaseNumRows, +) -> RecordBatch { + let mut builder = Int32Builder::new(); + for _ in 0..num_rows.row_count() { + match nulls { + TestCaseNulls::None => builder.append_value(rand::random::()), + TestCaseNulls::All => builder.append_null(), + } + } + let array = builder.finish(); + let schema = Arc::new(Schema::new(vec![Field::new("int", DataType::Int32, nulls.is_nullable())])); + RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() +} + 
+fn create_string_array( + nulls: TestCaseNulls, + num_rows: TestCaseNumRows, +) -> RecordBatch { + let mut rng = rand::thread_rng(); + let mut builder = StringBuilder::new(); + for _ in 0..num_rows.row_count() { + match nulls { + TestCaseNulls::None => builder.append_value(rng.gen_range(0..100).to_string()), + TestCaseNulls::All => builder.append_null(), + } + } + let array = builder.finish(); + let schema = Arc::new(Schema::new(vec![Field::new("string", DataType::Utf8, nulls.is_nullable())])); + RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() +} + +fn crete_list_array( + nulls: TestCaseNulls, + num_rows: TestCaseNumRows, +) -> RecordBatch { + // create a list aray with 1-100 elements per row + let mut rng = rand::thread_rng(); + let mut builder= ListBuilder::new(Int32Builder::new()); + for _ in 0..num_rows.row_count() { + let len = rng.gen_range(1..100); + for _ in 0..len { + match nulls { + TestCaseNulls::None => builder.values().append_value(rng.gen_range(0..100)), + TestCaseNulls::All => builder.values().append_null(), + } + } + builder.append(true); + } + let array = builder.finish(); + let schema = Arc::new(Schema::new(vec![Field::new_list("list", Field::new("item", DataType::Int32, true), nulls.is_nullable())])); + RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() +} + + +/// Runs a benchmark with the default writer (skip nulls) +fn bench_default_writer(record_batch: &RecordBatch) { + let mut buffer = std::io::sink(); + let mut writer = LineDelimitedWriter::new(&mut buffer); + writer.write(record_batch).unwrap(); +} + +/// Runs a benchmark with explicit nulls +fn bench_explicit_nulls_writer(record_batch: &RecordBatch) { + let mut buffer = std::io::sink(); + let mut writer = WriterBuilder::new() + .with_explicit_nulls(true) + .build::<_, LineDelimited>(&mut buffer); + writer.write(record_batch).unwrap(); +} + +//------------------------------------------------------------------------------ +// Benchmark Definition 
+//------------------------------------------------------------------------------ + +fn bench_json_encoding(c: &mut Criterion) { + // Define all test cases + let mut cases = vec![]; + for nulls in [TestCaseNulls::None, TestCaseNulls::All] { + for size in [TestCaseNumRows::Large, TestCaseNumRows::Small] { + for explicit_nulls in [false, true] { + cases.push(TestCase { + name: "int32".to_string(), + data: create_int32_array(nulls, size), + explicit_nulls, + null_generation: nulls, + row_count: size, + }); + cases.push(TestCase { + name: "string".to_string(), + data: create_string_array(nulls, size), + explicit_nulls, + null_generation: nulls, + row_count: size, + }); + cases.push(TestCase { + name: "list".to_string(), + data: crete_list_array(nulls, size), + explicit_nulls, + null_generation: nulls, + row_count: size, + }); + } + } + } + + let mut group = c.benchmark_group("JSON Encoding"); + + // Run benchmarks for each test case + for test_case in cases { + + // Set up the benchmark + group.throughput(Throughput::Elements(test_case.row_count.row_count() as u64)); + group.bench_with_input( + BenchmarkId::new(test_case.name(), test_case.explicit_nulls), + &test_case.data, + |b, batch| { + if test_case.explicit_nulls { + b.iter(|| bench_explicit_nulls_writer(batch)); + } else { + b.iter(|| bench_default_writer(batch)); + } + }, + ); + } + + group.finish(); +} + +criterion_group!(benches, bench_json_encoding); +criterion_main!(benches); \ No newline at end of file diff --git a/arrow-json/src/lib.rs b/arrow-json/src/lib.rs index ea0446c3d6b3..6d7ab4400b6e 100644 --- a/arrow-json/src/lib.rs +++ b/arrow-json/src/lib.rs @@ -75,7 +75,10 @@ pub mod reader; pub mod writer; pub use self::reader::{Reader, ReaderBuilder}; -pub use self::writer::{ArrayWriter, LineDelimitedWriter, Writer, WriterBuilder}; +pub use self::writer::{ + ArrayWriter, Encoder, EncoderFactory, EncoderOptions, LineDelimitedWriter, Writer, + WriterBuilder, +}; use half::f16; use serde_json::{Number, Value}; 
diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs index 0b3c788d5519..5c8d832ba6d9 100644 --- a/arrow-json/src/writer/encoder.rs +++ b/arrow-json/src/writer/encoder.rs @@ -14,6 +14,8 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +use std::io::Write; +use std::sync::Arc; use crate::StructMode; use arrow_array::cast::AsArray; @@ -25,45 +27,78 @@ use arrow_schema::{ArrowError, DataType, FieldRef}; use half::f16; use lexical_core::FormattedSize; use serde::Serializer; -use std::io::Write; +/// Configuration options for the JSON encoder. #[derive(Debug, Clone, Default)] pub struct EncoderOptions { + /// Whether to include nulls in the output or elide them. pub explicit_nulls: bool, + /// Whether to encode structs as JSON objects or JSON arrays of their values. pub struct_mode: StructMode, + /// An optional hook for customizing encoding behavior. + pub encoder_factory: Option>, +} + +/// A trait to create custom encoders for specific data types. +/// +/// This allows overriding the default encoders for specific data types, +/// or adding new encoders for custom data types. +pub trait EncoderFactory: std::fmt::Debug + Send + Sync { + /// Make an encoder that if returned runs before all of the default encoders. + /// This can be used to override how e.g. binary data is encoded so that it is an encoded string or an array of integers. + /// + /// Note that the type of the field may not match the type of the array: for dictionary arrays unless the top-level dictionary is handled this + /// will be called again for the keys and values of the dictionary, at which point the field type will still be the outer dictionary type but the + /// array will have a different type. + /// For example, `field`` might have the type `Dictionary(i32, Utf8)` but `array` will be `Utf8`. 
+ fn make_default_encoder<'a>( + &self, + _field: &'a FieldRef, + _array: &'a dyn Array, + _options: &'a EncoderOptions, + ) -> Result>, ArrowError> { + Ok(None) + } } /// A trait to format array values as JSON values /// /// Nullability is handled by the caller to allow encoding nulls implicitly, i.e. `{}` instead of `{"a": null}` pub trait Encoder { - /// Encode the non-null value at index `idx` to `out` + /// Encode the non-null value at index `idx` to `out`. /// - /// The behaviour is unspecified if `idx` corresponds to a null index + /// The behaviour is unspecified if `idx` corresponds to a null index. fn encode(&mut self, idx: usize, out: &mut Vec); + + /// Returns the nullability buffer for this encoder, if any. + /// + /// This replaces the `is_null` and `has_nulls` methods by returning the underlying + /// buffer state directly, avoiding dynamic dispatch for null checks. + fn nulls(&self) -> Option; } +/// Creates an encoder for the given array and field. +/// +/// This first calls the EncoderFactory if one is provided, and then falls back to the default encoders. pub fn make_encoder<'a>( + field: &'a FieldRef, array: &'a dyn Array, - options: &EncoderOptions, + options: &'a EncoderOptions, ) -> Result, ArrowError> { - let (encoder, nulls) = make_encoder_impl(array, options)?; - assert!(nulls.is_none(), "root cannot be nullable"); - Ok(encoder) -} - -fn make_encoder_impl<'a>( - array: &'a dyn Array, - options: &EncoderOptions, -) -> Result<(Box, Option), ArrowError> { macro_rules! primitive_helper { ($t:ty) => {{ let array = array.as_primitive::<$t>(); let nulls = array.nulls().cloned(); - (Box::new(PrimitiveEncoder::new(array)) as _, nulls) + Box::new(PrimitiveEncoder::new(array, nulls)) }}; } + if let Some(factory) = &options.encoder_factory { + if let Some(encoder) = factory.make_default_encoder(field, array, options)? { + return Ok(encoder); + } + } + Ok(downcast_integer! 
{ array.data_type() => (primitive_helper), DataType::Float16 => primitive_helper!(Float16Type), @@ -71,80 +106,81 @@ fn make_encoder_impl<'a>( DataType::Float64 => primitive_helper!(Float64Type), DataType::Boolean => { let array = array.as_boolean(); - (Box::new(BooleanEncoder(array)), array.nulls().cloned()) + Box::new(BooleanEncoder(array)) } - DataType::Null => (Box::new(NullEncoder), array.logical_nulls()), + DataType::Null => Box::new(NullEncoder), DataType::Utf8 => { let array = array.as_string::(); - (Box::new(StringEncoder(array)) as _, array.nulls().cloned()) + Box::new(StringEncoder(array)) } DataType::LargeUtf8 => { let array = array.as_string::(); - (Box::new(StringEncoder(array)) as _, array.nulls().cloned()) + Box::new(StringEncoder(array)) as _ } DataType::Utf8View => { let array = array.as_string_view(); - (Box::new(StringViewEncoder(array)) as _, array.nulls().cloned()) + Box::new(StringViewEncoder(array)) as _ } DataType::List(_) => { let array = array.as_list::(); - (Box::new(ListEncoder::try_new(array, options)?) as _, array.nulls().cloned()) + Box::new(ListEncoder::try_new(field, array, options)?) as _ } DataType::LargeList(_) => { let array = array.as_list::(); - (Box::new(ListEncoder::try_new(array, options)?) as _, array.nulls().cloned()) + Box::new(ListEncoder::try_new(field, array, options)?) as _ } DataType::FixedSizeList(_, _) => { let array = array.as_fixed_size_list(); - (Box::new(FixedSizeListEncoder::try_new(array, options)?) as _, array.nulls().cloned()) + Box::new(FixedSizeListEncoder::try_new(field, array, options)?) as _ } DataType::Dictionary(_, _) => downcast_dictionary_array! { - array => (Box::new(DictionaryEncoder::try_new(array, options)?) as _, array.logical_nulls()), + array => Box::new(DictionaryEncoder::try_new(field, array, options)?) as _, _ => unreachable!() } DataType::Map(_, _) => { let array = array.as_map(); - (Box::new(MapEncoder::try_new(array, options)?) 
as _, array.nulls().cloned()) + Box::new(MapEncoder::try_new(field, array, options)?) as _ } DataType::FixedSizeBinary(_) => { let array = array.as_fixed_size_binary(); - (Box::new(BinaryEncoder::new(array)) as _, array.nulls().cloned()) + Box::new(BinaryEncoder::new(array)) as _ } DataType::Binary => { let array: &BinaryArray = array.as_binary(); - (Box::new(BinaryEncoder::new(array)) as _, array.nulls().cloned()) + Box::new(BinaryEncoder::new(array)) as _ } DataType::LargeBinary => { let array: &LargeBinaryArray = array.as_binary(); - (Box::new(BinaryEncoder::new(array)) as _, array.nulls().cloned()) + Box::new(BinaryEncoder::new(array)) as _ } DataType::Struct(fields) => { let array = array.as_struct(); let encoders = fields.iter().zip(array.columns()).map(|(field, array)| { - let (encoder, nulls) = make_encoder_impl(array, options)?; + let encoder = make_encoder(field, array, options)?; Ok(FieldEncoder{ field: field.clone(), - encoder, nulls + encoder, }) }).collect::, ArrowError>>()?; let encoder = StructArrayEncoder{ encoders, + nulls: array.nulls(), explicit_nulls: options.explicit_nulls, struct_mode: options.struct_mode, }; - (Box::new(encoder) as _, array.nulls().cloned()) + Box::new(encoder) as _ } DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => { let options = FormatOptions::new().with_display_error(true); - let formatter = ArrayFormatter::try_new(array, &options)?; - (Box::new(RawArrayFormatter(formatter)) as _, array.nulls().cloned()) + let formatter = JsonArrayFormatter::new(ArrayFormatter::try_new(array, &options)?, array.nulls()); + Box::new(RawArrayFormatter(formatter)) } d => match d.is_temporal() { true => { @@ -154,9 +190,13 @@ fn make_encoder_impl<'a>( // may need to be revisited let options = FormatOptions::new().with_display_error(true); let formatter = ArrayFormatter::try_new(array, &options)?; - (Box::new(formatter) as _, array.nulls().cloned()) + let formatter = JsonArrayFormatter::new(formatter, array.nulls()); + 
Box::new(formatter) } - false => return Err(ArrowError::InvalidArgumentError(format!("JSON Writer does not support data type: {d}"))), + false => return Err(ArrowError::JsonError(format!( + "Unsupported data type for JSON encoding: {:?}", + d + ))) } }) } @@ -169,11 +209,11 @@ fn encode_string(s: &str, out: &mut Vec) { struct FieldEncoder<'a> { field: FieldRef, encoder: Box, - nulls: Option, } struct StructArrayEncoder<'a> { encoders: Vec>, + nulls: Option<&'a NullBuffer>, explicit_nulls: bool, struct_mode: StructMode, } @@ -196,9 +236,18 @@ impl Encoder for StructArrayEncoder<'_> { let mut is_first = true; // Nulls can only be dropped in explicit mode let drop_nulls = (self.struct_mode == StructMode::ObjectOnly) && !self.explicit_nulls; - for field_encoder in &mut self.encoders { - let is_null = is_some_and(field_encoder.nulls.as_ref(), |n| n.is_null(idx)); - if drop_nulls && is_null { + + // Collect all the field nulls buffers up front to avoid dynamic dispatch in the loop + // This creates a temporary Vec, but avoids repeated virtual calls which should be a net win + let field_nulls: Vec<_> = self + .encoders + .iter() + .map(|field_encoder| field_encoder.encoder.nulls()) + .collect(); + + for (i, field_encoder) in self.encoders.iter_mut().enumerate() { + let is_null = field_nulls[i].is_null(idx); + if is_null && drop_nulls { continue; } @@ -212,9 +261,10 @@ impl Encoder for StructArrayEncoder<'_> { out.push(b':'); } - match is_null { - true => out.extend_from_slice(b"null"), - false => field_encoder.encoder.encode(idx, out), + if is_null { + out.extend_from_slice(b"null"); + } else { + field_encoder.encoder.encode(idx, out); } } match self.struct_mode { @@ -222,6 +272,10 @@ impl Encoder for StructArrayEncoder<'_> { StructMode::ListOnly => out.push(b']'), } } + + fn nulls(&self) -> Option { + self.nulls.cloned() + } } trait PrimitiveEncode: ArrowNativeType { @@ -290,15 +344,41 @@ impl PrimitiveEncode for f16 { } } +/// Extension trait providing null-related 
methods to `Option` +pub trait NullBufferExt { + /// Check if the value at `idx` is null. + fn is_null(&self, idx: usize) -> bool; + + /// Check if this buffer contains any nulls. + fn has_nulls(&self) -> bool; +} + +impl NullBufferExt for Option { + fn is_null(&self, idx: usize) -> bool { + self.as_ref() + .map(|nulls| nulls.is_null(idx)) + .unwrap_or_default() + } + + fn has_nulls(&self) -> bool { + self.is_some() + } +} + struct PrimitiveEncoder { values: ScalarBuffer, + nulls: Option, buffer: N::Buffer, } impl PrimitiveEncoder { - fn new>(array: &PrimitiveArray

) -> Self { + fn new>( + array: &PrimitiveArray

, + nulls: Option, + ) -> Self { Self { values: array.values().clone(), + nulls, buffer: N::init_buffer(), } } @@ -308,6 +388,10 @@ impl Encoder for PrimitiveEncoder { fn encode(&mut self, idx: usize, out: &mut Vec) { out.extend_from_slice(self.values[idx].encode(&mut self.buffer)); } + + fn nulls(&self) -> Option { + self.nulls.clone() + } } struct BooleanEncoder<'a>(&'a BooleanArray); @@ -319,6 +403,10 @@ impl Encoder for BooleanEncoder<'_> { false => out.extend_from_slice(b"false"), } } + + fn nulls(&self) -> Option { + self.0.nulls().cloned() + } } struct StringEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray); @@ -327,6 +415,10 @@ impl Encoder for StringEncoder<'_, O> { fn encode(&mut self, idx: usize, out: &mut Vec) { encode_string(self.0.value(idx), out); } + + fn nulls(&self) -> Option { + self.0.nulls().cloned() + } } struct StringViewEncoder<'a>(&'a StringViewArray); @@ -335,20 +427,26 @@ impl Encoder for StringViewEncoder<'_> { fn encode(&mut self, idx: usize, out: &mut Vec) { encode_string(self.0.value(idx), out); } + + fn nulls(&self) -> Option { + self.0.nulls().cloned() + } } struct ListEncoder<'a, O: OffsetSizeTrait> { offsets: OffsetBuffer, - nulls: Option, encoder: Box, + nulls: Option, } impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> { fn try_new( + field: &'a FieldRef, array: &'a GenericListArray, - options: &EncoderOptions, + options: &'a EncoderOptions, ) -> Result { - let (encoder, nulls) = make_encoder_impl(array.values().as_ref(), options)?; + let nulls = array.logical_nulls(); + let encoder = make_encoder(field, array.values().as_ref(), options)?; Ok(Self { offsets: array.offsets().clone(), encoder, @@ -362,43 +460,46 @@ impl Encoder for ListEncoder<'_, O> { let end = self.offsets[idx + 1].as_usize(); let start = self.offsets[idx].as_usize(); out.push(b'['); - match self.nulls.as_ref() { - Some(n) => (start..end).for_each(|idx| { - if idx != start { - out.push(b',') - } - match n.is_null(idx) { - true => 
out.extend_from_slice(b"null"), - false => self.encoder.encode(idx, out), - } - }), - None => (start..end).for_each(|idx| { - if idx != start { - out.push(b',') - } + + let item_nulls = self.encoder.nulls(); + + for idx in start..end { + if idx != start { + out.push(b',') + } + + if item_nulls.is_null(idx) { + out.extend_from_slice(b"null"); + } else { self.encoder.encode(idx, out); - }), + } } out.push(b']'); } + + fn nulls(&self) -> Option { + self.nulls.clone() + } } struct FixedSizeListEncoder<'a> { value_length: usize, - nulls: Option, encoder: Box, + nulls: Option, } impl<'a> FixedSizeListEncoder<'a> { fn try_new( + field: &'a FieldRef, array: &'a FixedSizeListArray, - options: &EncoderOptions, + options: &'a EncoderOptions, ) -> Result { - let (encoder, nulls) = make_encoder_impl(array.values().as_ref(), options)?; + let nulls = array.logical_nulls(); + let encoder = make_encoder(field, array.values().as_ref(), options)?; Ok(Self { encoder, - nulls, value_length: array.value_length().as_usize(), + nulls, }) } } @@ -408,42 +509,45 @@ impl Encoder for FixedSizeListEncoder<'_> { let start = idx * self.value_length; let end = start + self.value_length; out.push(b'['); - match self.nulls.as_ref() { - Some(n) => (start..end).for_each(|idx| { - if idx != start { - out.push(b','); - } - if n.is_null(idx) { - out.extend_from_slice(b"null"); - } else { - self.encoder.encode(idx, out); - } - }), - None => (start..end).for_each(|idx| { - if idx != start { - out.push(b','); - } + + let item_nulls = self.encoder.nulls(); + + for idx in start..end { + if idx != start { + out.push(b','); + } + if item_nulls.is_null(idx) { + out.extend_from_slice(b"null"); + } else { self.encoder.encode(idx, out); - }), + } } out.push(b']'); } + + fn nulls(&self) -> Option { + self.nulls.clone() + } } struct DictionaryEncoder<'a, K: ArrowDictionaryKeyType> { keys: ScalarBuffer, + nulls: Option, encoder: Box, } impl<'a, K: ArrowDictionaryKeyType> DictionaryEncoder<'a, K> { fn try_new( + 
field: &'a FieldRef, array: &'a DictionaryArray, - options: &EncoderOptions, + options: &'a EncoderOptions, ) -> Result { - let (encoder, _) = make_encoder_impl(array.values().as_ref(), options)?; + let nulls = array.logical_nulls(); + let encoder = make_encoder(field, array.values().as_ref(), options)?; Ok(Self { keys: array.keys().values().clone(), + nulls, encoder, }) } @@ -453,24 +557,48 @@ impl Encoder for DictionaryEncoder<'_, K> { fn encode(&mut self, idx: usize, out: &mut Vec) { self.encoder.encode(self.keys[idx].as_usize(), out) } + + fn nulls(&self) -> Option { + self.nulls.clone() + } +} + +/// A newtype wrapper around [`ArrayFormatter`] to keep our usage of it private and not implement `Encoder` for the public type +struct JsonArrayFormatter<'a> { + formatter: ArrayFormatter<'a>, + nulls: Option<&'a NullBuffer>, +} + +impl<'a> JsonArrayFormatter<'a> { + fn new(formatter: ArrayFormatter<'a>, nulls: Option<&'a NullBuffer>) -> Self { + Self { formatter, nulls } + } } -impl Encoder for ArrayFormatter<'_> { +impl Encoder for JsonArrayFormatter<'_> { fn encode(&mut self, idx: usize, out: &mut Vec) { out.push(b'"'); // Should be infallible // Note: We are making an assumption that the formatter does not produce characters that require escaping - let _ = write!(out, "{}", self.value(idx)); + let _ = write!(out, "{}", self.formatter.value(idx)); out.push(b'"') } + + fn nulls(&self) -> Option { + self.nulls.cloned() + } } -/// A newtype wrapper around [`ArrayFormatter`] that skips surrounding the value with `"` -struct RawArrayFormatter<'a>(ArrayFormatter<'a>); +/// A newtype wrapper around [`JsonArrayFormatter`] that skips surrounding the value with `"` +struct RawArrayFormatter<'a>(JsonArrayFormatter<'a>); impl Encoder for RawArrayFormatter<'_> { fn encode(&mut self, idx: usize, out: &mut Vec) { - let _ = write!(out, "{}", self.0.value(idx)); + let _ = write!(out, "{}", self.0.formatter.value(idx)); + } + + fn nulls(&self) -> Option { + self.0.nulls.cloned() } 
} @@ -480,20 +608,30 @@ impl Encoder for NullEncoder { fn encode(&mut self, _idx: usize, _out: &mut Vec) { unreachable!() } + + fn nulls(&self) -> Option { + // Indicate that all values in this encoder are null + Some(NullBuffer::new_null(64)) // Create a buffer larger than needed to accommodate any index + } } struct MapEncoder<'a> { offsets: OffsetBuffer, keys: Box, values: Box, - value_nulls: Option, explicit_nulls: bool, + nulls: Option, } impl<'a> MapEncoder<'a> { - fn try_new(array: &'a MapArray, options: &EncoderOptions) -> Result { + fn try_new( + field: &'a FieldRef, + array: &'a MapArray, + options: &'a EncoderOptions, + ) -> Result { let values = array.values(); let keys = array.keys(); + let nulls = array.logical_nulls(); if !matches!(keys.data_type(), DataType::Utf8 | DataType::LargeUtf8) { return Err(ArrowError::JsonError(format!( @@ -502,11 +640,11 @@ impl<'a> MapEncoder<'a> { ))); } - let (keys, key_nulls) = make_encoder_impl(keys, options)?; - let (values, value_nulls) = make_encoder_impl(values, options)?; + let keys = make_encoder(field, keys, options)?; + let values = make_encoder(field, values, options)?; // We sanity check nulls as these are currently not enforced by MapArray (#1697) - if is_some_and(key_nulls, |x| x.null_count() != 0) { + if keys.nulls().is_some() { return Err(ArrowError::InvalidArgumentError( "Encountered nulls in MapArray keys".to_string(), )); @@ -522,8 +660,8 @@ impl<'a> MapEncoder<'a> { offsets: array.offsets().clone(), keys, values, - value_nulls, explicit_nulls: options.explicit_nulls, + nulls, }) } } @@ -536,8 +674,11 @@ impl Encoder for MapEncoder<'_> { let mut is_first = true; out.push(b'{'); + + let value_nulls = self.values.nulls(); + for idx in start..end { - let is_null = is_some_and(self.value_nulls.as_ref(), |n| n.is_null(idx)); + let is_null = value_nulls.is_null(idx); if is_null && !self.explicit_nulls { continue; } @@ -550,13 +691,18 @@ impl Encoder for MapEncoder<'_> { self.keys.encode(idx, out); 
out.push(b':'); - match is_null { - true => out.extend_from_slice(b"null"), - false => self.values.encode(idx, out), + if is_null { + out.extend_from_slice(b"null"); + } else { + self.values.encode(idx, out); } } out.push(b'}'); } + + fn nulls(&self) -> Option { + self.nulls.clone() + } } /// New-type wrapper for encoding the binary types in arrow: `Binary`, `LargeBinary` @@ -584,4 +730,8 @@ where } out.push(b'"'); } + + fn nulls(&self) -> Option { + self.0.nulls().cloned() + } } diff --git a/arrow-json/src/writer/mod.rs b/arrow-json/src/writer/mod.rs index 5d3e558480ca..b1c7391d9329 100644 --- a/arrow-json/src/writer/mod.rs +++ b/arrow-json/src/writer/mod.rs @@ -106,13 +106,13 @@ //! ``` mod encoder; -use std::{fmt::Debug, io::Write}; +use std::{fmt::Debug, io::Write, sync::Arc}; use crate::StructMode; use arrow_array::*; use arrow_schema::*; -use encoder::{make_encoder, EncoderOptions}; +pub use encoder::{make_encoder, Encoder, EncoderFactory, EncoderOptions, NullBufferExt}; /// This trait defines how to format a sequence of JSON objects to a /// byte stream. @@ -270,6 +270,15 @@ impl WriterBuilder { self } + /// Set an encoder factory to use when creating encoders for writing JSON. + /// + /// This can be used to override how some types are encoded or to provide + /// a fallback for types that are not supported by the default encoder. + pub fn with_encoder_factory(mut self, factory: Arc) -> Self { + self.0.encoder_factory = Some(factory); + self + } + /// Create a new `Writer` with specified `JsonFormat` and builder options. 
pub fn build(self, writer: W) -> Writer where @@ -351,8 +360,23 @@ where } let array = StructArray::from(batch.clone()); - let mut encoder = make_encoder(&array, &self.options)?; + let field = Arc::new(Field::new_struct( + "", + batch.schema().fields().iter().cloned().collect::>(), + false, + )); + let encoder = make_encoder(&field, &array, &self.options)?; + // This call has dynamic dispatch, but it's only called once per batch + // rather than in the inner loop below + let nulls = encoder.nulls(); + + // Validate that the root is not nullable + for idx in 0..array.len() { + assert!(!nulls.is_null(idx), "root cannot be nullable"); + } + + let mut encoder = make_encoder(&field, &array, &self.options)?; for idx in 0..batch.num_rows() { self.format.start_row(&mut buffer, is_first_row)?; is_first_row = false; @@ -419,15 +443,19 @@ where #[cfg(test)] mod tests { use core::str; + use std::collections::HashMap; use std::fs::{read_to_string, File}; use std::io::{BufReader, Seek}; use std::sync::Arc; + use arrow_array::cast::AsArray; use serde_json::{json, Value}; + use super::LineDelimited; + use super::{Encoder, WriterBuilder}; use arrow_array::builder::*; use arrow_array::types::*; - use arrow_buffer::{i256, Buffer, NullBuffer, OffsetBuffer, ToByteSlice}; + use arrow_buffer::{i256, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer, ToByteSlice}; use arrow_data::ArrayData; use crate::reader::*; @@ -446,7 +474,7 @@ mod tests { .map(|s| (!s.is_empty()).then(|| serde_json::from_slice(s).unwrap())) .collect(); - assert_eq!(expected, actual); + assert_eq!(actual, expected); } #[test] @@ -2036,4 +2064,430 @@ mod tests { } assert_json_eq(&buf, expected); } + + fn make_fallback_encoder_test_data() -> (RecordBatch, Arc) { + // Note: this is not intended to be an efficient implementation. + // Just a simple example to demonstrate how to implement a custom encoder. 
+ #[derive(Debug)] + enum UnionValue { + Int32(i32), + String(String), + } + + #[derive(Debug)] + struct UnionEncoder { + array: Vec>, + } + + impl Encoder for UnionEncoder { + fn encode(&mut self, idx: usize, out: &mut Vec) { + match &self.array[idx] { + None => out.extend_from_slice(b"null"), + Some(UnionValue::Int32(v)) => out.extend_from_slice(v.to_string().as_bytes()), + Some(UnionValue::String(v)) => { + out.extend_from_slice(format!("\"{}\"", v).as_bytes()) + } + } + } + + fn nulls(&self) -> Option { + if self.array.iter().any(Option::is_none) { + let mut builder = NullBufferBuilder::new(self.array.len()); + for val in self.array.iter() { + builder.append(val.is_some()); + } + builder.finish() + } else { + None + } + } + } + + #[derive(Debug)] + struct UnionEncoderFactory; + + impl EncoderFactory for UnionEncoderFactory { + fn make_default_encoder<'a>( + &self, + _field: &FieldRef, + array: &dyn Array, + _options: &EncoderOptions, + ) -> Result>, ArrowError> { + let data_type = array.data_type(); + let fields = match data_type { + DataType::Union(fields, UnionMode::Sparse) => fields, + _ => return Ok(None), + }; + // check that the fields are supported + let fields = fields.iter().map(|(_, f)| f).collect::>(); + for f in fields.iter() { + match f.data_type() { + DataType::Null => {} + DataType::Int32 => {} + DataType::Utf8 => {} + _ => return Ok(None), + } + } + let (_, type_ids, _, buffers) = array.as_union().clone().into_parts(); + let mut values = Vec::with_capacity(type_ids.len()); + for idx in 0..type_ids.len() { + let type_id = type_ids[idx]; + let field = &fields[type_id as usize]; + let value = match field.data_type() { + DataType::Null => None, + DataType::Int32 => Some(UnionValue::Int32( + buffers[type_id as usize] + .as_primitive::() + .value(idx), + )), + DataType::Utf8 => Some(UnionValue::String( + buffers[type_id as usize] + .as_string::() + .value(idx) + .to_string(), + )), + _ => unreachable!(), + }; + values.push(value); + } + 
Ok(Some(Box::new(UnionEncoder { array: values }))) + } + } + + let int_array = Int32Array::from(vec![Some(1), None, None]); + let string_array = StringArray::from(vec![None, Some("a"), None]); + let null_array = NullArray::new(3); + let type_ids = [0_i8, 1, 2].into_iter().collect::>(); + + let union_fields = [ + (0, Arc::new(Field::new("A", DataType::Int32, false))), + (1, Arc::new(Field::new("B", DataType::Utf8, false))), + (2, Arc::new(Field::new("C", DataType::Null, false))), + ] + .into_iter() + .collect::(); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(string_array), + Arc::new(null_array), + ]; + + let array = UnionArray::try_new(union_fields.clone(), type_ids, None, children).unwrap(); + + let float_array = Float64Array::from(vec![Some(1.0), None, Some(3.4)]); + + let fields = vec![ + Field::new( + "union", + DataType::Union(union_fields, UnionMode::Sparse), + true, + ), + Field::new("float", DataType::Float64, true), + ]; + + let batch = RecordBatch::try_new( + Arc::new(Schema::new(fields)), + vec![ + Arc::new(array) as Arc, + Arc::new(float_array) as Arc, + ], + ) + .unwrap(); + + (batch, Arc::new(UnionEncoderFactory)) + } + + #[test] + fn test_fallback_encoder_factory_line_delimited_implicit_nulls() { + let (batch, encoder_factory) = make_fallback_encoder_test_data(); + + let mut buf = Vec::new(); + { + let mut writer = WriterBuilder::new() + .with_encoder_factory(encoder_factory) + .with_explicit_nulls(false) + .build::<_, LineDelimited>(&mut buf); + writer.write_batches(&[&batch]).unwrap(); + writer.finish().unwrap(); + } + + println!("{}", str::from_utf8(&buf).unwrap()); + + assert_json_eq( + &buf, + r#"{"union":1,"float":1.0} +{"union":"a"} +{"float":3.4} +"#, + ); + } + + #[test] + fn test_fallback_encoder_factory_line_delimited_explicit_nulls() { + let (batch, encoder_factory) = make_fallback_encoder_test_data(); + + let mut buf = Vec::new(); + { + let mut writer = WriterBuilder::new() + .with_encoder_factory(encoder_factory) + 
.with_explicit_nulls(true) + .build::<_, LineDelimited>(&mut buf); + writer.write_batches(&[&batch]).unwrap(); + writer.finish().unwrap(); + } + + assert_json_eq( + &buf, + r#"{"union":1,"float":1.0} +{"union":"a","float":null} +{"union":null,"float":3.4} +"#, + ); + } + + #[test] + fn test_fallback_encoder_factory_array_implicit_nulls() { + let (batch, encoder_factory) = make_fallback_encoder_test_data(); + + let json_value: Value = { + let mut buf = Vec::new(); + let mut writer = WriterBuilder::new() + .with_encoder_factory(encoder_factory) + .build::<_, JsonArray>(&mut buf); + writer.write_batches(&[&batch]).unwrap(); + writer.finish().unwrap(); + serde_json::from_slice(&buf).unwrap() + }; + + let expected = json!([ + {"union":1,"float":1.0}, + {"union":"a"}, + {"float":3.4}, + ]); + + assert_eq!(json_value, expected); + } + + #[test] + fn test_fallback_encoder_factory_array_explicit_nulls() { + let (batch, encoder_factory) = make_fallback_encoder_test_data(); + + let json_value: Value = { + let mut buf = Vec::new(); + let mut writer = WriterBuilder::new() + .with_encoder_factory(encoder_factory) + .with_explicit_nulls(true) + .build::<_, JsonArray>(&mut buf); + writer.write_batches(&[&batch]).unwrap(); + writer.finish().unwrap(); + serde_json::from_slice(&buf).unwrap() + }; + + let expected = json!([ + {"union":1,"float":1.0}, + {"union":"a", "float": null}, + {"union":null,"float":3.4}, + ]); + + assert_eq!(json_value, expected); + } + + #[test] + fn test_default_encoder_byte_array() { + struct IntArrayBinaryEncoder { + array: B, + } + + impl<'a, B> Encoder for IntArrayBinaryEncoder + where + B: ArrayAccessor, + { + fn encode(&mut self, idx: usize, out: &mut Vec) { + out.push(b'['); + let child = self.array.value(idx); + for (idx, byte) in child.iter().enumerate() { + write!(out, "{byte}").unwrap(); + if idx < child.len() - 1 { + out.push(b','); + } + } + out.push(b']'); + } + + fn nulls(&self) -> Option { + self.array.nulls().cloned() + } + } + + 
#[derive(Debug)] + struct IntArayBinaryEncoderFactory; + + impl EncoderFactory for IntArayBinaryEncoderFactory { + fn make_default_encoder<'a>( + &self, + _field: &FieldRef, + array: &'a dyn Array, + _options: &EncoderOptions, + ) -> Result>, ArrowError> { + match array.data_type() { + DataType::Binary => { + let array = array.as_binary::(); + let encoder = IntArrayBinaryEncoder { array }; + Ok(Some(Box::new(encoder))) + } + _ => Ok(None), + } + } + } + + let binary_array = BinaryArray::from_opt_vec(vec![Some(b"a"), None, Some(b"b")]); + let float_array = Float64Array::from(vec![Some(1.0), Some(2.3), None]); + let fields = vec![ + Field::new("bytes", DataType::Binary, true), + Field::new("float", DataType::Float64, true), + ]; + let batch = RecordBatch::try_new( + Arc::new(Schema::new(fields)), + vec![ + Arc::new(binary_array) as Arc, + Arc::new(float_array) as Arc, + ], + ) + .unwrap(); + + let json_value: Value = { + let mut buf = Vec::new(); + let mut writer = WriterBuilder::new() + .with_encoder_factory(Arc::new(IntArayBinaryEncoderFactory)) + .build::<_, JsonArray>(&mut buf); + writer.write_batches(&[&batch]).unwrap(); + writer.finish().unwrap(); + serde_json::from_slice(&buf).unwrap() + }; + + let expected = json!([ + {"bytes": [97], "float": 1.0}, + {"float": 2.3}, + {"bytes": [98]}, + ]); + + assert_eq!(json_value, expected); + } + + #[test] + fn test_encoder_factory_customize_dictionary() { + // Test that we can customize the encoding of T even when it shows up as Dictionary<_, T>. + + // No particular reason to choose this example. + // Just trying to add some variety to the test cases and demonstrate use cases of the encoder factory. 
+ struct PaddedInt32Encoder { + array: Int32Array, + nulls: Option, + } + + impl Encoder for PaddedInt32Encoder { + fn encode(&mut self, idx: usize, out: &mut Vec) { + let value = self.array.value(idx); + write!(out, "\"{value:0>8}\"").unwrap(); + } + + fn nulls(&self) -> Option { + self.nulls.clone() + } + } + + #[derive(Debug)] + struct CustomEncoderFactory; + + impl EncoderFactory for CustomEncoderFactory { + fn make_default_encoder<'a>( + &self, + field: &FieldRef, + array: &'a dyn Array, + _options: &EncoderOptions, + ) -> Result>, ArrowError> { + // The point here is: + // 1. You can use information from Field to determine how to do the encoding. + // 2. For dictionary arrays the Field is always the outer field but the array may be the keys or values array + // and thus the data type of `field` may not match the data type of `array`. + let padded = field + .metadata() + .get("padded") + .map(|v| v == "true") + .unwrap_or_default(); + match (array.data_type(), padded) { + (DataType::Int32, true) => { + let array = array.as_primitive::(); + let nulls = array.nulls().cloned(); + let encoder = PaddedInt32Encoder { + array: array.clone(), + nulls, + }; + Ok(Some(Box::new(encoder))) + } + _ => Ok(None), + } + } + } + + let to_json = |batch| { + let mut buf = Vec::new(); + let mut writer = WriterBuilder::new() + .with_encoder_factory(Arc::new(CustomEncoderFactory)) + .build::<_, JsonArray>(&mut buf); + writer.write_batches(&[batch]).unwrap(); + writer.finish().unwrap(); + serde_json::from_slice::(&buf).unwrap() + }; + + // Control case: no dictionary wrapping works as expected. 
+ let array = Int32Array::from(vec![Some(1), None, Some(2)]); + let field = Arc::new(Field::new("int", DataType::Int32, true).with_metadata( + HashMap::from_iter(vec![("padded".to_string(), "true".to_string())]), + )); + let batch = RecordBatch::try_new( + Arc::new(Schema::new(vec![field.clone()])), + vec![Arc::new(array)], + ) + .unwrap(); + + let json_value = to_json(&batch); + + let expected = json!([ + {"int": "00000001"}, + {}, + {"int": "00000002"}, + ]); + + assert_eq!(json_value, expected); + + // Now make a dictionary batch + let mut array_builder = PrimitiveDictionaryBuilder::::new(); + array_builder.append_value(1); + array_builder.append_null(); + array_builder.append_value(1); + let array = array_builder.finish(); + let field = Field::new( + "int", + DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Int32)), + true, + ) + .with_metadata(HashMap::from_iter(vec![( + "padded".to_string(), + "true".to_string(), + )])); + let batch = RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![Arc::new(array)]) + .unwrap(); + + let json_value = to_json(&batch); + + let expected = json!([ + {"int": "00000001"}, + {}, + {"int": "00000001"}, + ]); + + assert_eq!(json_value, expected); + } } From f3230c373ee6233c96b5c638b5c113133e9a7f12 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 10 Mar 2025 09:41:43 -0500 Subject: [PATCH 02/19] tweak bench --- arrow-json/benches/encode.rs | 49 ++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/arrow-json/benches/encode.rs b/arrow-json/benches/encode.rs index f814c97d2cc3..e5d0b8a488ee 100644 --- a/arrow-json/benches/encode.rs +++ b/arrow-json/benches/encode.rs @@ -141,23 +141,6 @@ fn crete_list_array( RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() } - -/// Runs a benchmark with the default writer (skip nulls) -fn bench_default_writer(record_batch: &RecordBatch) { - let mut buffer = 
std::io::sink(); - let mut writer = LineDelimitedWriter::new(&mut buffer); - writer.write(record_batch).unwrap(); -} - -/// Runs a benchmark with explicit nulls -fn bench_explicit_nulls_writer(record_batch: &RecordBatch) { - let mut buffer = std::io::sink(); - let mut writer = WriterBuilder::new() - .with_explicit_nulls(true) - .build::<_, LineDelimited>(&mut buffer); - writer.write(record_batch).unwrap(); -} - //------------------------------------------------------------------------------ // Benchmark Definition //------------------------------------------------------------------------------ @@ -197,18 +180,36 @@ fn bench_json_encoding(c: &mut Criterion) { // Run benchmarks for each test case for test_case in cases { - - // Set up the benchmark group.throughput(Throughput::Elements(test_case.row_count.row_count() as u64)); group.bench_with_input( BenchmarkId::new(test_case.name(), test_case.explicit_nulls), &test_case.data, |b, batch| { - if test_case.explicit_nulls { - b.iter(|| bench_explicit_nulls_writer(batch)); - } else { - b.iter(|| bench_default_writer(batch)); - } + // Setup: create a new sink and writer for each iteration + // Only the write method call is measured + b.iter_batched_ref( + || { + // Setup phase - create the writer with a new sink + let sink = std::io::sink(); + + if test_case.explicit_nulls { + // Create writer with explicit nulls + let writer = WriterBuilder::new() + .with_explicit_nulls(true) + .build::<_, LineDelimited>(sink); + writer + } else { + // Create default writer + let writer = LineDelimitedWriter::new(sink); + writer + } + }, + |writer| { + // Only this part is measured - the write operation + writer.write(batch) + }, + criterion::BatchSize::SmallInput, + ) }, ); } From eb146f815715fd1bed1eac47104e9855d9ff37c2 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 10 Mar 2025 09:48:53 -0500 Subject: [PATCH 03/19] fmt --- arrow-json/benches/encode.rs | 53 
+++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/arrow-json/benches/encode.rs b/arrow-json/benches/encode.rs index e5d0b8a488ee..109f8de31000 100644 --- a/arrow-json/benches/encode.rs +++ b/arrow-json/benches/encode.rs @@ -19,10 +19,10 @@ use std::sync::Arc; use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder}; use arrow_array::RecordBatch; -use arrow_json::{LineDelimitedWriter, WriterBuilder}; use arrow_json::writer::LineDelimited; +use arrow_json::{LineDelimitedWriter, WriterBuilder}; use arrow_schema::{DataType, Field, Schema}; -use criterion::{criterion_group, criterion_main, Criterion, BenchmarkId, Throughput}; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; use rand::Rng; #[derive(Debug, Clone, Copy)] @@ -86,10 +86,7 @@ impl TestCase { } } -fn create_int32_array( - nulls: TestCaseNulls, - num_rows: TestCaseNumRows, -) -> RecordBatch { +fn create_int32_array(nulls: TestCaseNulls, num_rows: TestCaseNumRows) -> RecordBatch { let mut builder = Int32Builder::new(); for _ in 0..num_rows.row_count() { match nulls { @@ -98,14 +95,15 @@ fn create_int32_array( } } let array = builder.finish(); - let schema = Arc::new(Schema::new(vec![Field::new("int", DataType::Int32, nulls.is_nullable())])); + let schema = Arc::new(Schema::new(vec![Field::new( + "int", + DataType::Int32, + nulls.is_nullable(), + )])); RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() } -fn create_string_array( - nulls: TestCaseNulls, - num_rows: TestCaseNumRows, -) -> RecordBatch { +fn create_string_array(nulls: TestCaseNulls, num_rows: TestCaseNumRows) -> RecordBatch { let mut rng = rand::thread_rng(); let mut builder = StringBuilder::new(); for _ in 0..num_rows.row_count() { @@ -115,17 +113,18 @@ fn create_string_array( } } let array = builder.finish(); - let schema = Arc::new(Schema::new(vec![Field::new("string", DataType::Utf8, nulls.is_nullable())])); + let schema = 
Arc::new(Schema::new(vec![Field::new( + "string", + DataType::Utf8, + nulls.is_nullable(), + )])); RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() } -fn crete_list_array( - nulls: TestCaseNulls, - num_rows: TestCaseNumRows, -) -> RecordBatch { +fn crete_list_array(nulls: TestCaseNulls, num_rows: TestCaseNumRows) -> RecordBatch { // create a list aray with 1-100 elements per row let mut rng = rand::thread_rng(); - let mut builder= ListBuilder::new(Int32Builder::new()); + let mut builder = ListBuilder::new(Int32Builder::new()); for _ in 0..num_rows.row_count() { let len = rng.gen_range(1..100); for _ in 0..len { @@ -137,7 +136,11 @@ fn crete_list_array( builder.append(true); } let array = builder.finish(); - let schema = Arc::new(Schema::new(vec![Field::new_list("list", Field::new("item", DataType::Int32, true), nulls.is_nullable())])); + let schema = Arc::new(Schema::new(vec![Field::new_list( + "list", + Field::new("item", DataType::Int32, true), + nulls.is_nullable(), + )])); RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() } @@ -191,17 +194,17 @@ fn bench_json_encoding(c: &mut Criterion) { || { // Setup phase - create the writer with a new sink let sink = std::io::sink(); - + if test_case.explicit_nulls { // Create writer with explicit nulls - let writer = WriterBuilder::new() + + WriterBuilder::new() .with_explicit_nulls(true) - .build::<_, LineDelimited>(sink); - writer + .build::<_, LineDelimited>(sink) } else { // Create default writer - let writer = LineDelimitedWriter::new(sink); - writer + + LineDelimitedWriter::new(sink) } }, |writer| { @@ -218,4 +221,4 @@ fn bench_json_encoding(c: &mut Criterion) { } criterion_group!(benches, bench_json_encoding); -criterion_main!(benches); \ No newline at end of file +criterion_main!(benches); From 4a523764e565403ddbb10cd23a3720a66d443a6d Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 10 Mar 2025 10:08:15 -0500 Subject: [PATCH 04/19] 
tweak bench --- arrow-json/benches/encode.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/arrow-json/benches/encode.rs b/arrow-json/benches/encode.rs index 109f8de31000..2ac8a733be30 100644 --- a/arrow-json/benches/encode.rs +++ b/arrow-json/benches/encode.rs @@ -144,10 +144,6 @@ fn crete_list_array(nulls: TestCaseNulls, num_rows: TestCaseNumRows) -> RecordBa RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() } -//------------------------------------------------------------------------------ -// Benchmark Definition -//------------------------------------------------------------------------------ - fn bench_json_encoding(c: &mut Criterion) { // Define all test cases let mut cases = vec![]; From 5b2ade7e018e334624a4b68d353f31a5bb6eb823 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 10 Mar 2025 14:09:20 -0500 Subject: [PATCH 05/19] wip --- Cargo.toml | 4 ++++ arrow-json/src/writer/encoder.rs | 9 +-------- arrow-json/src/writer/mod.rs | 5 ++--- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f401b3c6fdd8..a3dd443e1b7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,3 +96,7 @@ arrow-string = { version = "54.3.0", path = "./arrow-string" } parquet = { version = "54.3.0", path = "./parquet", default-features = false } chrono = { version = "0.4.40", default-features = false, features = ["clock"] } + +[profile.profiling] +inherits = "release" +debug = true diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs index 5c8d832ba6d9..fcc2b4248148 100644 --- a/arrow-json/src/writer/encoder.rs +++ b/arrow-json/src/writer/encoder.rs @@ -345,12 +345,9 @@ impl PrimitiveEncode for f16 { } /// Extension trait providing null-related methods to `Option` -pub trait NullBufferExt { +pub(crate) trait NullBufferExt { /// Check if the value at `idx` is null. 
fn is_null(&self, idx: usize) -> bool; - - /// Check if this buffer contains any nulls. - fn has_nulls(&self) -> bool; } impl NullBufferExt for Option { @@ -359,10 +356,6 @@ impl NullBufferExt for Option { .map(|nulls| nulls.is_null(idx)) .unwrap_or_default() } - - fn has_nulls(&self) -> bool { - self.is_some() - } } struct PrimitiveEncoder { diff --git a/arrow-json/src/writer/mod.rs b/arrow-json/src/writer/mod.rs index b1c7391d9329..1754cf7bc65a 100644 --- a/arrow-json/src/writer/mod.rs +++ b/arrow-json/src/writer/mod.rs @@ -112,7 +112,8 @@ use crate::StructMode; use arrow_array::*; use arrow_schema::*; -pub use encoder::{make_encoder, Encoder, EncoderFactory, EncoderOptions, NullBufferExt}; +use encoder::NullBufferExt; +pub use encoder::{make_encoder, Encoder, EncoderFactory, EncoderOptions}; /// This trait defines how to format a sequence of JSON objects to a /// byte stream. @@ -367,8 +368,6 @@ where )); let encoder = make_encoder(&field, &array, &self.options)?; - // This call has dynamic dispatch, but it's only called once per batch - // rather than in the inner loop below let nulls = encoder.nulls(); // Validate that the root is not nullable From 721a4b951492a63db2bfd40e338718490792f40e Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 10 Mar 2025 14:13:02 -0500 Subject: [PATCH 06/19] refactor EncoderOptions into a builder --- arrow-json/src/writer/encoder.rs | 54 +++++++++++++++++++++++++++----- arrow-json/src/writer/mod.rs | 10 +++--- 2 files changed, 52 insertions(+), 12 deletions(-) diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs index fcc2b4248148..3a288d4cb05b 100644 --- a/arrow-json/src/writer/encoder.rs +++ b/arrow-json/src/writer/encoder.rs @@ -32,11 +32,51 @@ use serde::Serializer; #[derive(Debug, Clone, Default)] pub struct EncoderOptions { /// Whether to include nulls in the output or elide them. 
- pub explicit_nulls: bool, + explicit_nulls: bool, /// Whether to encode structs as JSON objects or JSON arrays of their values. - pub struct_mode: StructMode, + struct_mode: StructMode, /// An optional hook for customizing encoding behavior. - pub encoder_factory: Option>, + encoder_factory: Option>, +} + +impl EncoderOptions { + /// Create a new EncoderOptions with default values. + pub fn new() -> Self { + Self::default() + } + + /// Set whether to include nulls in the output or elide them. + pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self { + self.explicit_nulls = explicit_nulls; + self + } + + /// Set whether to encode structs as JSON objects or JSON arrays of their values. + pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self { + self.struct_mode = struct_mode; + self + } + + /// Set an optional hook for customizing encoding behavior. + pub fn with_encoder_factory(mut self, encoder_factory: Arc) -> Self { + self.encoder_factory = Some(encoder_factory); + self + } + + /// Get whether to include nulls in the output or elide them. + pub fn explicit_nulls(&self) -> bool { + self.explicit_nulls + } + + /// Get whether to encode structs as JSON objects or JSON arrays of their values. + pub fn struct_mode(&self) -> StructMode { + self.struct_mode + } + + /// Get the optional hook for customizing encoding behavior. + pub fn encoder_factory(&self) -> Option<&Arc> { + self.encoder_factory.as_ref() + } } /// A trait to create custom encoders for specific data types. @@ -93,7 +133,7 @@ pub fn make_encoder<'a>( }}; } - if let Some(factory) = &options.encoder_factory { + if let Some(factory) = options.encoder_factory() { if let Some(encoder) = factory.make_default_encoder(field, array, options)? 
{ return Ok(encoder); } @@ -172,8 +212,8 @@ pub fn make_encoder<'a>( let encoder = StructArrayEncoder{ encoders, nulls: array.nulls(), - explicit_nulls: options.explicit_nulls, - struct_mode: options.struct_mode, + explicit_nulls: options.explicit_nulls(), + struct_mode: options.struct_mode(), }; Box::new(encoder) as _ } @@ -653,7 +693,7 @@ impl<'a> MapEncoder<'a> { offsets: array.offsets().clone(), keys, values, - explicit_nulls: options.explicit_nulls, + explicit_nulls: options.explicit_nulls(), nulls, }) } diff --git a/arrow-json/src/writer/mod.rs b/arrow-json/src/writer/mod.rs index 1754cf7bc65a..4d29272a4fe6 100644 --- a/arrow-json/src/writer/mod.rs +++ b/arrow-json/src/writer/mod.rs @@ -226,7 +226,7 @@ impl WriterBuilder { /// Returns `true` if this writer is configured to keep keys with null values. pub fn explicit_nulls(&self) -> bool { - self.0.explicit_nulls + self.0.explicit_nulls() } /// Set whether to keep keys with null values, or to omit writing them. @@ -252,13 +252,13 @@ impl WriterBuilder { /// Default is to skip nulls (set to `false`). If `struct_mode == ListOnly`, /// nulls will be written explicitly regardless of this setting. pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self { - self.0.explicit_nulls = explicit_nulls; + self.0 = self.0.with_explicit_nulls(explicit_nulls); self } /// Returns if this writer is configured to write structs as JSON Objects or Arrays. pub fn struct_mode(&self) -> StructMode { - self.0.struct_mode + self.0.struct_mode() } /// Set the [`StructMode`] for the writer, which determines whether structs @@ -267,7 +267,7 @@ impl WriterBuilder { /// `ListOnly`, nulls will be written explicitly regardless of the /// `explicit_nulls` setting. 
pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self { - self.0.struct_mode = struct_mode; + self.0 = self.0.with_struct_mode(struct_mode); self } @@ -276,7 +276,7 @@ impl WriterBuilder { /// This can be used to override how some types are encoded or to provide /// a fallback for types that are not supported by the default encoder. pub fn with_encoder_factory(mut self, factory: Arc) -> Self { - self.0.encoder_factory = Some(factory); + self.0 = self.0.with_encoder_factory(factory); self } From 8ffb84744a860c816291b5185fe54b036862b155 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 10 Mar 2025 14:13:51 -0500 Subject: [PATCH 07/19] remove unused new method --- arrow-json/src/writer/encoder.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs index 3a288d4cb05b..3f3dbb0abd95 100644 --- a/arrow-json/src/writer/encoder.rs +++ b/arrow-json/src/writer/encoder.rs @@ -40,11 +40,6 @@ pub struct EncoderOptions { } impl EncoderOptions { - /// Create a new EncoderOptions with default values. - pub fn new() -> Self { - Self::default() - } - /// Set whether to include nulls in the output or elide them. 
pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self { self.explicit_nulls = explicit_nulls; From 7f9b732b8e19950f2e88dce22b49c4522359a365 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 10 Mar 2025 20:09:57 -0500 Subject: [PATCH 08/19] fmt --- arrow-json/benches/encode.rs | 4 ++-- arrow-json/src/writer/encoder.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/arrow-json/benches/encode.rs b/arrow-json/benches/encode.rs index 2ac8a733be30..62bbe08cfc58 100644 --- a/arrow-json/benches/encode.rs +++ b/arrow-json/benches/encode.rs @@ -193,13 +193,13 @@ fn bench_json_encoding(c: &mut Criterion) { if test_case.explicit_nulls { // Create writer with explicit nulls - + WriterBuilder::new() .with_explicit_nulls(true) .build::<_, LineDelimited>(sink) } else { // Create default writer - + LineDelimitedWriter::new(sink) } }, diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs index 3f3dbb0abd95..eff31dcb787a 100644 --- a/arrow-json/src/writer/encoder.rs +++ b/arrow-json/src/writer/encoder.rs @@ -57,17 +57,17 @@ impl EncoderOptions { self.encoder_factory = Some(encoder_factory); self } - + /// Get whether to include nulls in the output or elide them. pub fn explicit_nulls(&self) -> bool { self.explicit_nulls } - + /// Get whether to encode structs as JSON objects or JSON arrays of their values. pub fn struct_mode(&self) -> StructMode { self.struct_mode } - + /// Get the optional hook for customizing encoding behavior. 
pub fn encoder_factory(&self) -> Option<&Arc> { self.encoder_factory.as_ref() From 006d4d43b10cd8674853546bd5434aa841e49269 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 12 Mar 2025 16:19:21 -0500 Subject: [PATCH 09/19] wip --- arrow-json/src/writer/encoder.rs | 214 +++++++++++-------------------- arrow-json/src/writer/mod.rs | 6 +- 2 files changed, 78 insertions(+), 142 deletions(-) diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs index eff31dcb787a..c49f3378530f 100644 --- a/arrow-json/src/writer/encoder.rs +++ b/arrow-json/src/writer/encoder.rs @@ -91,11 +91,43 @@ pub trait EncoderFactory: std::fmt::Debug + Send + Sync { _field: &'a FieldRef, _array: &'a dyn Array, _options: &'a EncoderOptions, - ) -> Result>, ArrowError> { + ) -> Result, ArrowError> { Ok(None) } } +pub struct EncoderWithNullBuffer<'a> { + encoder: Box, + nulls: Option, +} + +impl EncoderWithNullBuffer<'_> { + pub fn new(encoder: Box, nulls: Option) -> Self { + Self { encoder, nulls } + } + + pub fn encode(&mut self, idx: usize, out: &mut Vec) { + self.encoder.encode(idx, out) + } + + pub fn is_null(&self, idx: usize) -> bool { + self.nulls.as_ref().map_or(false, |nulls| nulls.is_null(idx)) + } + + pub fn has_nulls(&self) -> bool { + match self.nulls { + Some(ref nulls) => nulls.null_count() > 0, + None => false, + } + } +} + +impl Encoder for EncoderWithNullBuffer<'_> { + fn encode(&mut self, idx: usize, out: &mut Vec) { + self.encoder.encode(idx, out) + } +} + /// A trait to format array values as JSON values /// /// Nullability is handled by the caller to allow encoding nulls implicitly, i.e. `{}` instead of `{"a": null}` @@ -104,12 +136,6 @@ pub trait Encoder { /// /// The behaviour is unspecified if `idx` corresponds to a null index. fn encode(&mut self, idx: usize, out: &mut Vec); - - /// Returns the nullability buffer for this encoder, if any. 
- /// - /// This replaces the `is_null` and `has_nulls` methods by returning the underlying - /// buffer state directly, avoiding dynamic dispatch for null checks. - fn nulls(&self) -> Option; } /// Creates an encoder for the given array and field. @@ -119,12 +145,12 @@ pub fn make_encoder<'a>( field: &'a FieldRef, array: &'a dyn Array, options: &'a EncoderOptions, -) -> Result, ArrowError> { +) -> Result, ArrowError> { macro_rules! primitive_helper { ($t:ty) => {{ let array = array.as_primitive::<$t>(); let nulls = array.nulls().cloned(); - Box::new(PrimitiveEncoder::new(array, nulls)) + EncoderWithNullBuffer::new(Box::new(PrimitiveEncoder::new(array)), nulls) }}; } @@ -134,64 +160,67 @@ pub fn make_encoder<'a>( } } - Ok(downcast_integer! { + let nulls = array.nulls().cloned(); + let encoder = downcast_integer! { array.data_type() => (primitive_helper), DataType::Float16 => primitive_helper!(Float16Type), DataType::Float32 => primitive_helper!(Float32Type), DataType::Float64 => primitive_helper!(Float64Type), DataType::Boolean => { let array = array.as_boolean(); - Box::new(BooleanEncoder(array)) + EncoderWithNullBuffer::new(Box::new(BooleanEncoder(array)), array.nulls().cloned()) } - DataType::Null => Box::new(NullEncoder), + DataType::Null => EncoderWithNullBuffer::new(Box::new(NullEncoder), array.logical_nulls()), DataType::Utf8 => { let array = array.as_string::(); - Box::new(StringEncoder(array)) + EncoderWithNullBuffer::new(Box::new(StringEncoder(array)), array.nulls().cloned()) } DataType::LargeUtf8 => { let array = array.as_string::(); - Box::new(StringEncoder(array)) as _ + EncoderWithNullBuffer::new(Box::new(StringEncoder(array)), array.nulls().cloned()) } DataType::Utf8View => { let array = array.as_string_view(); - Box::new(StringViewEncoder(array)) as _ + EncoderWithNullBuffer::new(Box::new(StringViewEncoder(array)), array.nulls().cloned()) } DataType::List(_) => { let array = array.as_list::(); - Box::new(ListEncoder::try_new(field, array, options)?) 
as _ + EncoderWithNullBuffer::new(Box::new(ListEncoder::try_new(field, array, options)?), array.nulls().cloned()) } DataType::LargeList(_) => { let array = array.as_list::(); - Box::new(ListEncoder::try_new(field, array, options)?) as _ + EncoderWithNullBuffer::new(Box::new(ListEncoder::try_new(field, array, options)?), array.nulls().cloned()) } DataType::FixedSizeList(_, _) => { let array = array.as_fixed_size_list(); - Box::new(FixedSizeListEncoder::try_new(field, array, options)?) as _ + EncoderWithNullBuffer::new(Box::new(FixedSizeListEncoder::try_new(field, array, options)?), array.nulls().cloned()) } DataType::Dictionary(_, _) => downcast_dictionary_array! { - array => Box::new(DictionaryEncoder::try_new(field, array, options)?) as _, + array => { + EncoderWithNullBuffer::new(Box::new(DictionaryEncoder::try_new(field, array, options)?), array.nulls().cloned()) + }, _ => unreachable!() } DataType::Map(_, _) => { let array = array.as_map(); - Box::new(MapEncoder::try_new(field, array, options)?) 
as _ + EncoderWithNullBuffer::new(Box::new(MapEncoder::try_new(field, array, options)?), array.nulls().cloned()) } DataType::FixedSizeBinary(_) => { let array = array.as_fixed_size_binary(); - Box::new(BinaryEncoder::new(array)) as _ + EncoderWithNullBuffer::new(Box::new(BinaryEncoder::new(array)) as _, array.nulls().cloned()) } DataType::Binary => { let array: &BinaryArray = array.as_binary(); - Box::new(BinaryEncoder::new(array)) as _ + EncoderWithNullBuffer::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned()) } DataType::LargeBinary => { let array: &LargeBinaryArray = array.as_binary(); - Box::new(BinaryEncoder::new(array)) as _ + EncoderWithNullBuffer::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned()) } DataType::Struct(fields) => { @@ -206,16 +235,16 @@ pub fn make_encoder<'a>( let encoder = StructArrayEncoder{ encoders, - nulls: array.nulls(), explicit_nulls: options.explicit_nulls(), struct_mode: options.struct_mode(), }; - Box::new(encoder) as _ + let nulls = array.nulls().cloned(); + EncoderWithNullBuffer::new(Box::new(encoder), nulls) } DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => { let options = FormatOptions::new().with_display_error(true); let formatter = JsonArrayFormatter::new(ArrayFormatter::try_new(array, &options)?, array.nulls()); - Box::new(RawArrayFormatter(formatter)) + EncoderWithNullBuffer::new(Box::new(RawArrayFormatter(formatter)), nulls) } d => match d.is_temporal() { true => { @@ -226,14 +255,16 @@ pub fn make_encoder<'a>( let options = FormatOptions::new().with_display_error(true); let formatter = ArrayFormatter::try_new(array, &options)?; let formatter = JsonArrayFormatter::new(formatter, array.nulls()); - Box::new(formatter) + EncoderWithNullBuffer::new(Box::new(formatter) as Box, nulls) } false => return Err(ArrowError::JsonError(format!( "Unsupported data type for JSON encoding: {:?}", d ))) } - }) + }; + + Ok(encoder) } fn encode_string(s: &str, out: &mut Vec) { @@ -243,12 +274,17 @@ fn 
encode_string(s: &str, out: &mut Vec) { struct FieldEncoder<'a> { field: FieldRef, - encoder: Box, + encoder: EncoderWithNullBuffer<'a>, +} + +impl FieldEncoder<'_> { + fn is_null(&self, idx: usize) -> bool { + self.encoder.is_null(idx) + } } struct StructArrayEncoder<'a> { encoders: Vec>, - nulls: Option<&'a NullBuffer>, explicit_nulls: bool, struct_mode: StructMode, } @@ -272,16 +308,8 @@ impl Encoder for StructArrayEncoder<'_> { // Nulls can only be dropped in explicit mode let drop_nulls = (self.struct_mode == StructMode::ObjectOnly) && !self.explicit_nulls; - // Collect all the field nulls buffers up front to avoid dynamic dispatch in the loop - // This creates a temporary Vec, but avoids repeated virtual calls which should be a net win - let field_nulls: Vec<_> = self - .encoders - .iter() - .map(|field_encoder| field_encoder.encoder.nulls()) - .collect(); - - for (i, field_encoder) in self.encoders.iter_mut().enumerate() { - let is_null = field_nulls[i].is_null(idx); + for field_encoder in self.encoders.iter_mut() { + let is_null = field_encoder.is_null(idx); if is_null && drop_nulls { continue; } @@ -307,10 +335,6 @@ impl Encoder for StructArrayEncoder<'_> { StructMode::ListOnly => out.push(b']'), } } - - fn nulls(&self) -> Option { - self.nulls.cloned() - } } trait PrimitiveEncode: ArrowNativeType { @@ -379,34 +403,17 @@ impl PrimitiveEncode for f16 { } } -/// Extension trait providing null-related methods to `Option` -pub(crate) trait NullBufferExt { - /// Check if the value at `idx` is null. - fn is_null(&self, idx: usize) -> bool; -} - -impl NullBufferExt for Option { - fn is_null(&self, idx: usize) -> bool { - self.as_ref() - .map(|nulls| nulls.is_null(idx)) - .unwrap_or_default() - } -} - struct PrimitiveEncoder { values: ScalarBuffer, - nulls: Option, buffer: N::Buffer, } impl PrimitiveEncoder { fn new>( array: &PrimitiveArray

, - nulls: Option, ) -> Self { Self { values: array.values().clone(), - nulls, buffer: N::init_buffer(), } } @@ -416,10 +423,6 @@ impl Encoder for PrimitiveEncoder { fn encode(&mut self, idx: usize, out: &mut Vec) { out.extend_from_slice(self.values[idx].encode(&mut self.buffer)); } - - fn nulls(&self) -> Option { - self.nulls.clone() - } } struct BooleanEncoder<'a>(&'a BooleanArray); @@ -431,10 +434,6 @@ impl Encoder for BooleanEncoder<'_> { false => out.extend_from_slice(b"false"), } } - - fn nulls(&self) -> Option { - self.0.nulls().cloned() - } } struct StringEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray); @@ -443,10 +442,6 @@ impl Encoder for StringEncoder<'_, O> { fn encode(&mut self, idx: usize, out: &mut Vec) { encode_string(self.0.value(idx), out); } - - fn nulls(&self) -> Option { - self.0.nulls().cloned() - } } struct StringViewEncoder<'a>(&'a StringViewArray); @@ -455,16 +450,11 @@ impl Encoder for StringViewEncoder<'_> { fn encode(&mut self, idx: usize, out: &mut Vec) { encode_string(self.0.value(idx), out); } - - fn nulls(&self) -> Option { - self.0.nulls().cloned() - } } struct ListEncoder<'a, O: OffsetSizeTrait> { offsets: OffsetBuffer, - encoder: Box, - nulls: Option, + encoder: EncoderWithNullBuffer<'a>, } impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> { @@ -473,12 +463,10 @@ impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> { array: &'a GenericListArray, options: &'a EncoderOptions, ) -> Result { - let nulls = array.logical_nulls(); let encoder = make_encoder(field, array.values().as_ref(), options)?; Ok(Self { offsets: array.offsets().clone(), encoder, - nulls, }) } } @@ -489,14 +477,12 @@ impl Encoder for ListEncoder<'_, O> { let start = self.offsets[idx].as_usize(); out.push(b'['); - let item_nulls = self.encoder.nulls(); - for idx in start..end { if idx != start { out.push(b',') } - if item_nulls.is_null(idx) { + if self.encoder.is_null(idx) { out.extend_from_slice(b"null"); } else { self.encoder.encode(idx, out); @@ -504,16 +490,11 
@@ impl Encoder for ListEncoder<'_, O> { } out.push(b']'); } - - fn nulls(&self) -> Option { - self.nulls.clone() - } } struct FixedSizeListEncoder<'a> { value_length: usize, - encoder: Box, - nulls: Option, + encoder: EncoderWithNullBuffer<'a>, } impl<'a> FixedSizeListEncoder<'a> { @@ -522,12 +503,10 @@ impl<'a> FixedSizeListEncoder<'a> { array: &'a FixedSizeListArray, options: &'a EncoderOptions, ) -> Result { - let nulls = array.logical_nulls(); let encoder = make_encoder(field, array.values().as_ref(), options)?; Ok(Self { encoder, value_length: array.value_length().as_usize(), - nulls, }) } } @@ -538,13 +517,11 @@ impl Encoder for FixedSizeListEncoder<'_> { let end = start + self.value_length; out.push(b'['); - let item_nulls = self.encoder.nulls(); - for idx in start..end { if idx != start { out.push(b','); } - if item_nulls.is_null(idx) { + if self.encoder.is_null(idx) { out.extend_from_slice(b"null"); } else { self.encoder.encode(idx, out); @@ -552,16 +529,11 @@ impl Encoder for FixedSizeListEncoder<'_> { } out.push(b']'); } - - fn nulls(&self) -> Option { - self.nulls.clone() - } } struct DictionaryEncoder<'a, K: ArrowDictionaryKeyType> { keys: ScalarBuffer, - nulls: Option, - encoder: Box, + encoder: EncoderWithNullBuffer<'a>, } impl<'a, K: ArrowDictionaryKeyType> DictionaryEncoder<'a, K> { @@ -570,12 +542,10 @@ impl<'a, K: ArrowDictionaryKeyType> DictionaryEncoder<'a, K> { array: &'a DictionaryArray, options: &'a EncoderOptions, ) -> Result { - let nulls = array.logical_nulls(); let encoder = make_encoder(field, array.values().as_ref(), options)?; Ok(Self { keys: array.keys().values().clone(), - nulls, encoder, }) } @@ -585,10 +555,6 @@ impl Encoder for DictionaryEncoder<'_, K> { fn encode(&mut self, idx: usize, out: &mut Vec) { self.encoder.encode(self.keys[idx].as_usize(), out) } - - fn nulls(&self) -> Option { - self.nulls.clone() - } } /// A newtype wrapper around [`ArrayFormatter`] to keep our usage of it private and not implement `Encoder` for the 
public type @@ -611,10 +577,6 @@ impl Encoder for JsonArrayFormatter<'_> { let _ = write!(out, "{}", self.formatter.value(idx)); out.push(b'"') } - - fn nulls(&self) -> Option { - self.nulls.cloned() - } } /// A newtype wrapper around [`JsonArrayFormatter`] that skips surrounding the value with `"` @@ -624,10 +586,6 @@ impl Encoder for RawArrayFormatter<'_> { fn encode(&mut self, idx: usize, out: &mut Vec) { let _ = write!(out, "{}", self.0.formatter.value(idx)); } - - fn nulls(&self) -> Option { - self.0.nulls.cloned() - } } struct NullEncoder; @@ -636,19 +594,13 @@ impl Encoder for NullEncoder { fn encode(&mut self, _idx: usize, _out: &mut Vec) { unreachable!() } - - fn nulls(&self) -> Option { - // Indicate that all values in this encoder are null - Some(NullBuffer::new_null(64)) // Create a buffer larger than needed to accommodate any index - } } struct MapEncoder<'a> { offsets: OffsetBuffer, - keys: Box, - values: Box, + keys: EncoderWithNullBuffer<'a>, + values: EncoderWithNullBuffer<'a>, explicit_nulls: bool, - nulls: Option, } impl<'a> MapEncoder<'a> { @@ -659,7 +611,6 @@ impl<'a> MapEncoder<'a> { ) -> Result { let values = array.values(); let keys = array.keys(); - let nulls = array.logical_nulls(); if !matches!(keys.data_type(), DataType::Utf8 | DataType::LargeUtf8) { return Err(ArrowError::JsonError(format!( @@ -672,7 +623,7 @@ impl<'a> MapEncoder<'a> { let values = make_encoder(field, values, options)?; // We sanity check nulls as these are currently not enforced by MapArray (#1697) - if keys.nulls().is_some() { + if keys.has_nulls() { return Err(ArrowError::InvalidArgumentError( "Encountered nulls in MapArray keys".to_string(), )); @@ -689,7 +640,6 @@ impl<'a> MapEncoder<'a> { keys, values, explicit_nulls: options.explicit_nulls(), - nulls, }) } } @@ -703,10 +653,8 @@ impl Encoder for MapEncoder<'_> { out.push(b'{'); - let value_nulls = self.values.nulls(); - for idx in start..end { - let is_null = value_nulls.is_null(idx); + let is_null = 
self.values.is_null(idx); if is_null && !self.explicit_nulls { continue; } @@ -727,10 +675,6 @@ impl Encoder for MapEncoder<'_> { } out.push(b'}'); } - - fn nulls(&self) -> Option { - self.nulls.clone() - } } /// New-type wrapper for encoding the binary types in arrow: `Binary`, `LargeBinary` @@ -758,8 +702,4 @@ where } out.push(b'"'); } - - fn nulls(&self) -> Option { - self.0.nulls().cloned() - } } diff --git a/arrow-json/src/writer/mod.rs b/arrow-json/src/writer/mod.rs index 4d29272a4fe6..3512f1f522b4 100644 --- a/arrow-json/src/writer/mod.rs +++ b/arrow-json/src/writer/mod.rs @@ -112,7 +112,6 @@ use crate::StructMode; use arrow_array::*; use arrow_schema::*; -use encoder::NullBufferExt; pub use encoder::{make_encoder, Encoder, EncoderFactory, EncoderOptions}; /// This trait defines how to format a sequence of JSON objects to a @@ -368,12 +367,9 @@ where )); let encoder = make_encoder(&field, &array, &self.options)?; - let nulls = encoder.nulls(); // Validate that the root is not nullable - for idx in 0..array.len() { - assert!(!nulls.is_null(idx), "root cannot be nullable"); - } + assert!(!encoder.has_nulls(), "root cannot be nullable"); let mut encoder = make_encoder(&field, &array, &self.options)?; for idx in 0..batch.num_rows() { From ab94d7d12a9342ab97b6f85cdf87a49619aa8e0c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 12 Mar 2025 16:29:46 -0500 Subject: [PATCH 10/19] remove dynamic dispatch --- arrow-json/src/writer/encoder.rs | 35 ++++++++++-------- arrow-json/src/writer/mod.rs | 62 ++++++++++++-------------------- 2 files changed, 43 insertions(+), 54 deletions(-) diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs index c49f3378530f..760e2bcefcf9 100644 --- a/arrow-json/src/writer/encoder.rs +++ b/arrow-json/src/writer/encoder.rs @@ -91,29 +91,37 @@ pub trait EncoderFactory: std::fmt::Debug + Send + Sync { _field: &'a FieldRef, _array: &'a dyn Array, _options: 
&'a EncoderOptions, - ) -> Result, ArrowError> { + ) -> Result>, ArrowError> { Ok(None) } } +/// An encoder + a null buffer. +/// This is packaged together into a wrapper struct to minimize dynamic dispatch for null checks. pub struct EncoderWithNullBuffer<'a> { encoder: Box, nulls: Option, } -impl EncoderWithNullBuffer<'_> { - pub fn new(encoder: Box, nulls: Option) -> Self { +impl<'a> EncoderWithNullBuffer<'a> { + /// Create a new encoder with a null buffer. + pub fn new(encoder: Box, nulls: Option) -> Self { Self { encoder, nulls } } + /// Encode the value at index `idx` to `out`. pub fn encode(&mut self, idx: usize, out: &mut Vec) { self.encoder.encode(idx, out) } + /// Returns whether the value at index `idx` is null. pub fn is_null(&self, idx: usize) -> bool { - self.nulls.as_ref().map_or(false, |nulls| nulls.is_null(idx)) + self.nulls + .as_ref() + .map_or(false, |nulls| nulls.is_null(idx)) } + /// Returns whether the encoder has any nulls. pub fn has_nulls(&self) -> bool { match self.nulls { Some(ref nulls) => nulls.null_count() > 0, @@ -239,12 +247,12 @@ pub fn make_encoder<'a>( struct_mode: options.struct_mode(), }; let nulls = array.nulls().cloned(); - EncoderWithNullBuffer::new(Box::new(encoder), nulls) + EncoderWithNullBuffer::new(Box::new(encoder) as Box, nulls) } DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => { let options = FormatOptions::new().with_display_error(true); - let formatter = JsonArrayFormatter::new(ArrayFormatter::try_new(array, &options)?, array.nulls()); - EncoderWithNullBuffer::new(Box::new(RawArrayFormatter(formatter)), nulls) + let formatter = JsonArrayFormatter::new(ArrayFormatter::try_new(array, &options)?); + EncoderWithNullBuffer::new(Box::new(RawArrayFormatter(formatter)) as Box, nulls) } d => match d.is_temporal() { true => { @@ -254,7 +262,7 @@ pub fn make_encoder<'a>( // may need to be revisited let options = FormatOptions::new().with_display_error(true); let formatter = ArrayFormatter::try_new(array, 
&options)?; - let formatter = JsonArrayFormatter::new(formatter, array.nulls()); + let formatter = JsonArrayFormatter::new(formatter); EncoderWithNullBuffer::new(Box::new(formatter) as Box, nulls) } false => return Err(ArrowError::JsonError(format!( @@ -263,7 +271,7 @@ pub fn make_encoder<'a>( ))) } }; - + Ok(encoder) } @@ -409,9 +417,7 @@ struct PrimitiveEncoder { } impl PrimitiveEncoder { - fn new>( - array: &PrimitiveArray

, - ) -> Self { + fn new>(array: &PrimitiveArray

) -> Self { Self { values: array.values().clone(), buffer: N::init_buffer(), @@ -560,12 +566,11 @@ impl Encoder for DictionaryEncoder<'_, K> { /// A newtype wrapper around [`ArrayFormatter`] to keep our usage of it private and not implement `Encoder` for the public type struct JsonArrayFormatter<'a> { formatter: ArrayFormatter<'a>, - nulls: Option<&'a NullBuffer>, } impl<'a> JsonArrayFormatter<'a> { - fn new(formatter: ArrayFormatter<'a>, nulls: Option<&'a NullBuffer>) -> Self { - Self { formatter, nulls } + fn new(formatter: ArrayFormatter<'a>) -> Self { + Self { formatter } } } diff --git a/arrow-json/src/writer/mod.rs b/arrow-json/src/writer/mod.rs index 3512f1f522b4..fb4be4ea01e2 100644 --- a/arrow-json/src/writer/mod.rs +++ b/arrow-json/src/writer/mod.rs @@ -112,7 +112,7 @@ use crate::StructMode; use arrow_array::*; use arrow_schema::*; -pub use encoder::{make_encoder, Encoder, EncoderFactory, EncoderOptions}; +pub use encoder::{make_encoder, Encoder, EncoderFactory, EncoderOptions, EncoderWithNullBuffer}; /// This trait defines how to format a sequence of JSON objects to a /// byte stream. 
@@ -1914,7 +1914,7 @@ mod tests { let json_str = str::from_utf8(&json).unwrap(); assert_eq!( json_str, - r#"[{"my_dict":"a"},{"my_dict":null},{"my_dict":null}]"# + r#"[{"my_dict":"a"},{"my_dict":null},{"my_dict":""}]"# ) } @@ -2084,18 +2084,6 @@ mod tests { } } } - - fn nulls(&self) -> Option { - if self.array.iter().any(Option::is_none) { - let mut builder = NullBufferBuilder::new(self.array.len()); - for val in self.array.iter() { - builder.append(val.is_some()); - } - builder.finish() - } else { - None - } - } } #[derive(Debug)] @@ -2104,10 +2092,10 @@ mod tests { impl EncoderFactory for UnionEncoderFactory { fn make_default_encoder<'a>( &self, - _field: &FieldRef, - array: &dyn Array, - _options: &EncoderOptions, - ) -> Result>, ArrowError> { + _field: &'a FieldRef, + array: &'a dyn Array, + _options: &'a EncoderOptions, + ) -> Result>, ArrowError> { let data_type = array.data_type(); let fields = match data_type { DataType::Union(fields, UnionMode::Sparse) => fields, @@ -2145,7 +2133,10 @@ mod tests { }; values.push(value); } - Ok(Some(Box::new(UnionEncoder { array: values }))) + let array_encoder = + Box::new(UnionEncoder { array: values }) as Box; + let nulls = array.nulls().cloned(); + Ok(Some(EncoderWithNullBuffer::new(array_encoder, nulls))) } } @@ -2213,7 +2204,7 @@ mod tests { &buf, r#"{"union":1,"float":1.0} {"union":"a"} -{"float":3.4} +{"union":null,"float":3.4} "#, ); } @@ -2258,7 +2249,7 @@ mod tests { let expected = json!([ {"union":1,"float":1.0}, {"union":"a"}, - {"float":3.4}, + {"float":3.4,"union":null}, ]); assert_eq!(json_value, expected); @@ -2309,10 +2300,6 @@ mod tests { } out.push(b']'); } - - fn nulls(&self) -> Option { - self.array.nulls().cloned() - } } #[derive(Debug)] @@ -2321,15 +2308,17 @@ mod tests { impl EncoderFactory for IntArayBinaryEncoderFactory { fn make_default_encoder<'a>( &self, - _field: &FieldRef, + _field: &'a FieldRef, array: &'a dyn Array, - _options: &EncoderOptions, - ) -> Result>, ArrowError> { + _options: &'a 
EncoderOptions, + ) -> Result>, ArrowError> { match array.data_type() { DataType::Binary => { let array = array.as_binary::(); let encoder = IntArrayBinaryEncoder { array }; - Ok(Some(Box::new(encoder))) + let array_encoder = Box::new(encoder) as Box; + let nulls = array.nulls().cloned(); + Ok(Some(EncoderWithNullBuffer::new(array_encoder, nulls))) } _ => Ok(None), } @@ -2378,7 +2367,6 @@ mod tests { // Just trying to add some variety to the test cases and demonstrate use cases of the encoder factory. struct PaddedInt32Encoder { array: Int32Array, - nulls: Option, } impl Encoder for PaddedInt32Encoder { @@ -2386,10 +2374,6 @@ mod tests { let value = self.array.value(idx); write!(out, "\"{value:0>8}\"").unwrap(); } - - fn nulls(&self) -> Option { - self.nulls.clone() - } } #[derive(Debug)] @@ -2398,10 +2382,10 @@ mod tests { impl EncoderFactory for CustomEncoderFactory { fn make_default_encoder<'a>( &self, - field: &FieldRef, + field: &'a FieldRef, array: &'a dyn Array, - _options: &EncoderOptions, - ) -> Result>, ArrowError> { + _options: &'a EncoderOptions, + ) -> Result>, ArrowError> { // The point here is: // 1. You can use information from Field to determine how to do the encoding. // 2. 
For dictionary arrays the Field is always the outer field but the array may be the keys or values array @@ -2417,9 +2401,9 @@ mod tests { let nulls = array.nulls().cloned(); let encoder = PaddedInt32Encoder { array: array.clone(), - nulls, }; - Ok(Some(Box::new(encoder))) + let array_encoder = Box::new(encoder) as Box; + Ok(Some(EncoderWithNullBuffer::new(array_encoder, nulls))) } _ => Ok(None), } From 08b8dc25377239ff92c765ed48d80821d8767a65 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 12 Mar 2025 16:35:02 -0500 Subject: [PATCH 11/19] clippy --- arrow-json/src/writer/encoder.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs index 760e2bcefcf9..97371f87d9c4 100644 --- a/arrow-json/src/writer/encoder.rs +++ b/arrow-json/src/writer/encoder.rs @@ -116,9 +116,7 @@ impl<'a> EncoderWithNullBuffer<'a> { /// Returns whether the value at index `idx` is null. pub fn is_null(&self, idx: usize) -> bool { - self.nulls - .as_ref() - .map_or(false, |nulls| nulls.is_null(idx)) + self.nulls.as_ref().is_some_and(|nulls| nulls.is_null(idx)) } /// Returns whether the encoder has any nulls. 
From df1d3311e449b07cb1c62b22c461d0758440b6b6 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 12 Mar 2025 16:35:53 -0500 Subject: [PATCH 12/19] remove bench --- Cargo.toml | 4 ---- arrow-json/Cargo.toml | 4 ---- 2 files changed, 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a3dd443e1b7c..f401b3c6fdd8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,7 +96,3 @@ arrow-string = { version = "54.3.0", path = "./arrow-string" } parquet = { version = "54.3.0", path = "./parquet", default-features = false } chrono = { version = "0.4.40", default-features = false, features = ["clock"] } - -[profile.profiling] -inherits = "release" -debug = true diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml index 09d66c23d40c..cbca108cfda2 100644 --- a/arrow-json/Cargo.toml +++ b/arrow-json/Cargo.toml @@ -63,7 +63,3 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng" [[bench]] name = "serde" harness = false - -[[bench]] -name = "encode" -harness = false From 80b86132fcdb9ea57fc2fd2b8234e37e9df4f2bf Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 12 Mar 2025 16:35:58 -0500 Subject: [PATCH 13/19] remove bench --- arrow-json/benches/encode.rs | 220 ----------------------------------- 1 file changed, 220 deletions(-) delete mode 100644 arrow-json/benches/encode.rs diff --git a/arrow-json/benches/encode.rs b/arrow-json/benches/encode.rs deleted file mode 100644 index 62bbe08cfc58..000000000000 --- a/arrow-json/benches/encode.rs +++ /dev/null @@ -1,220 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder}; -use arrow_array::RecordBatch; -use arrow_json::writer::LineDelimited; -use arrow_json::{LineDelimitedWriter, WriterBuilder}; -use arrow_schema::{DataType, Field, Schema}; -use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; -use rand::Rng; - -#[derive(Debug, Clone, Copy)] -enum TestCaseNulls { - None, - All, -} - -impl std::fmt::Display for TestCaseNulls { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - TestCaseNulls::None => write!(f, "none"), - TestCaseNulls::All => write!(f, "all"), - } - } -} - -impl TestCaseNulls { - fn is_nullable(&self) -> bool { - match self { - TestCaseNulls::None => false, - TestCaseNulls::All => true, - } - } -} - -#[derive(Debug, Clone, Copy)] -enum TestCaseNumRows { - Small, - Large, -} - -impl TestCaseNumRows { - fn row_count(&self) -> usize { - match self { - TestCaseNumRows::Small => 10, - TestCaseNumRows::Large => 100_000, - } - } -} - -/// Represents a test case configuration for benchmarking -#[derive(Debug)] -struct TestCase { - name: String, - data: RecordBatch, - explicit_nulls: bool, - null_generation: TestCaseNulls, - row_count: TestCaseNumRows, -} - -impl TestCase { - fn name(&self) -> String { - format!( - "{}_explicit_nulls:{}_nulls:{}_row_count:{}", - self.name, - 
self.explicit_nulls, - self.null_generation, - self.row_count.row_count(), - ) - } -} - -fn create_int32_array(nulls: TestCaseNulls, num_rows: TestCaseNumRows) -> RecordBatch { - let mut builder = Int32Builder::new(); - for _ in 0..num_rows.row_count() { - match nulls { - TestCaseNulls::None => builder.append_value(rand::random::()), - TestCaseNulls::All => builder.append_null(), - } - } - let array = builder.finish(); - let schema = Arc::new(Schema::new(vec![Field::new( - "int", - DataType::Int32, - nulls.is_nullable(), - )])); - RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() -} - -fn create_string_array(nulls: TestCaseNulls, num_rows: TestCaseNumRows) -> RecordBatch { - let mut rng = rand::thread_rng(); - let mut builder = StringBuilder::new(); - for _ in 0..num_rows.row_count() { - match nulls { - TestCaseNulls::None => builder.append_value(rng.gen_range(0..100).to_string()), - TestCaseNulls::All => builder.append_null(), - } - } - let array = builder.finish(); - let schema = Arc::new(Schema::new(vec![Field::new( - "string", - DataType::Utf8, - nulls.is_nullable(), - )])); - RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() -} - -fn crete_list_array(nulls: TestCaseNulls, num_rows: TestCaseNumRows) -> RecordBatch { - // create a list aray with 1-100 elements per row - let mut rng = rand::thread_rng(); - let mut builder = ListBuilder::new(Int32Builder::new()); - for _ in 0..num_rows.row_count() { - let len = rng.gen_range(1..100); - for _ in 0..len { - match nulls { - TestCaseNulls::None => builder.values().append_value(rng.gen_range(0..100)), - TestCaseNulls::All => builder.values().append_null(), - } - } - builder.append(true); - } - let array = builder.finish(); - let schema = Arc::new(Schema::new(vec![Field::new_list( - "list", - Field::new("item", DataType::Int32, true), - nulls.is_nullable(), - )])); - RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap() -} - -fn bench_json_encoding(c: &mut Criterion) { - // Define all test 
cases - let mut cases = vec![]; - for nulls in [TestCaseNulls::None, TestCaseNulls::All] { - for size in [TestCaseNumRows::Large, TestCaseNumRows::Small] { - for explicit_nulls in [false, true] { - cases.push(TestCase { - name: "int32".to_string(), - data: create_int32_array(nulls, size), - explicit_nulls, - null_generation: nulls, - row_count: size, - }); - cases.push(TestCase { - name: "string".to_string(), - data: create_string_array(nulls, size), - explicit_nulls, - null_generation: nulls, - row_count: size, - }); - cases.push(TestCase { - name: "list".to_string(), - data: crete_list_array(nulls, size), - explicit_nulls, - null_generation: nulls, - row_count: size, - }); - } - } - } - - let mut group = c.benchmark_group("JSON Encoding"); - - // Run benchmarks for each test case - for test_case in cases { - group.throughput(Throughput::Elements(test_case.row_count.row_count() as u64)); - group.bench_with_input( - BenchmarkId::new(test_case.name(), test_case.explicit_nulls), - &test_case.data, - |b, batch| { - // Setup: create a new sink and writer for each iteration - // Only the write method call is measured - b.iter_batched_ref( - || { - // Setup phase - create the writer with a new sink - let sink = std::io::sink(); - - if test_case.explicit_nulls { - // Create writer with explicit nulls - - WriterBuilder::new() - .with_explicit_nulls(true) - .build::<_, LineDelimited>(sink) - } else { - // Create default writer - - LineDelimitedWriter::new(sink) - } - }, - |writer| { - // Only this part is measured - the write operation - writer.write(batch) - }, - criterion::BatchSize::SmallInput, - ) - }, - ); - } - - group.finish(); -} - -criterion_group!(benches, bench_json_encoding); -criterion_main!(benches); From 60bcd18a032eff8ae365d0ee3e80a42cd5b26c41 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 17 Mar 2025 18:06:51 -0500 Subject: [PATCH 14/19] address perf --- arrow-json/src/writer/encoder.rs | 44 
++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs index 97371f87d9c4..dba7e2be4daa 100644 --- a/arrow-json/src/writer/encoder.rs +++ b/arrow-json/src/writer/encoder.rs @@ -481,14 +481,22 @@ impl Encoder for ListEncoder<'_, O> { let start = self.offsets[idx].as_usize(); out.push(b'['); - for idx in start..end { - if idx != start { - out.push(b',') + if self.encoder.has_nulls() { + for idx in start..end { + if idx != start { + out.push(b',') + } + if self.encoder.is_null(idx) { + out.extend_from_slice(b"null"); + } else { + self.encoder.encode(idx, out); + } } - - if self.encoder.is_null(idx) { - out.extend_from_slice(b"null"); - } else { + } else { + for idx in start..end { + if idx != start { + out.push(b',') + } self.encoder.encode(idx, out); } } @@ -520,14 +528,22 @@ impl Encoder for FixedSizeListEncoder<'_> { let start = idx * self.value_length; let end = start + self.value_length; out.push(b'['); - - for idx in start..end { - if idx != start { - out.push(b','); + if self.encoder.has_nulls() { + for idx in start..end { + if idx != start { + out.push(b',') + } + if self.encoder.is_null(idx) { + out.extend_from_slice(b"null"); + } else { + self.encoder.encode(idx, out); + } } - if self.encoder.is_null(idx) { - out.extend_from_slice(b"null"); - } else { + } else { + for idx in start..end { + if idx != start { + out.push(b',') + } self.encoder.encode(idx, out); } } From dcfae68356639f3a5db74861308c34b065c17595 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 17 Mar 2025 19:05:56 -0500 Subject: [PATCH 15/19] add doctest --- arrow-json/src/writer/encoder.rs | 94 +++++++++++++++++++++++++++++++- 1 file changed, 93 insertions(+), 1 deletion(-) diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs index dba7e2be4daa..d5bb4b20b13d 100644 --- 
a/arrow-json/src/writer/encoder.rs +++ b/arrow-json/src/writer/encoder.rs @@ -78,8 +78,100 @@ impl EncoderOptions { /// /// This allows overriding the default encoders for specific data types, /// or adding new encoders for custom data types. +/// +/// # Examples +/// +/// ``` +/// use std::io::Write; +/// use arrow_array::{ArrayAccessor, Array, BinaryArray, Float64Array, RecordBatch}; +/// use arrow_array::cast::AsArray; +/// use arrow_schema::{DataType, Field, Schema, FieldRef}; +/// use arrow_json::{writer::{WriterBuilder, JsonArray, EncoderWithNullBuffer}, StructMode}; +/// use arrow_json::{Encoder, EncoderFactory, EncoderOptions}; +/// use arrow_schema::ArrowError; +/// use std::sync::Arc; +/// use serde_json::json; +/// use serde_json::Value; +/// +/// struct IntArrayBinaryEncoder { +/// array: B, +/// } +/// +/// impl<'a, B> Encoder for IntArrayBinaryEncoder +/// where +/// B: ArrayAccessor, +/// { +/// fn encode(&mut self, idx: usize, out: &mut Vec) { +/// out.push(b'['); +/// let child = self.array.value(idx); +/// for (idx, byte) in child.iter().enumerate() { +/// write!(out, "{byte}").unwrap(); +/// if idx < child.len() - 1 { +/// out.push(b','); +/// } +/// } +/// out.push(b']'); +/// } +/// } +/// +/// #[derive(Debug)] +/// struct IntArayBinaryEncoderFactory; +/// +/// impl EncoderFactory for IntArayBinaryEncoderFactory { +/// fn make_default_encoder<'a>( +/// &self, +/// _field: &'a FieldRef, +/// array: &'a dyn Array, +/// _options: &'a EncoderOptions, +/// ) -> Result>, ArrowError> { +/// match array.data_type() { +/// DataType::Binary => { +/// let array = array.as_binary::(); +/// let encoder = IntArrayBinaryEncoder { array }; +/// let array_encoder = Box::new(encoder) as Box; +/// let nulls = array.nulls().cloned(); +/// Ok(Some(EncoderWithNullBuffer::new(array_encoder, nulls))) +/// } +/// _ => Ok(None), +/// } +/// } +/// } +/// +/// let binary_array = BinaryArray::from_iter([Some(b"a".as_slice()), None, Some(b"b".as_slice())]); +/// let 
float_array = Float64Array::from(vec![Some(1.0), Some(2.3), None]); +/// let fields = vec![ +/// Field::new("bytes", DataType::Binary, true), +/// Field::new("float", DataType::Float64, true), +/// ]; +/// let batch = RecordBatch::try_new( +/// Arc::new(Schema::new(fields)), +/// vec![ +/// Arc::new(binary_array) as Arc, +/// Arc::new(float_array) as Arc, +/// ], +/// ) +/// .unwrap(); +/// +/// let json_value: Value = { +/// let mut buf = Vec::new(); +/// let mut writer = WriterBuilder::new() +/// .with_encoder_factory(Arc::new(IntArayBinaryEncoderFactory)) +/// .build::<_, JsonArray>(&mut buf); +/// writer.write_batches(&[&batch]).unwrap(); +/// writer.finish().unwrap(); +/// serde_json::from_slice(&buf).unwrap() +/// }; +/// +/// let expected = json!([ +/// {"bytes": [97], "float": 1.0}, +/// {"float": 2.3}, +/// {"bytes": [98]}, +/// ]); +/// +/// assert_eq!(json_value, expected); +/// ``` pub trait EncoderFactory: std::fmt::Debug + Send + Sync { - /// Make an encoder that if returned runs before all of the default encoders. + /// Make an encoder that overrides the default encoder for a specific field and array or provides an encoder for a custom data type. /// This can be used to override how e.g. binary data is encoded so that it is an encoded string or an array of integers. 
/// /// Note that the type of the field may not match the type of the array: for dictionary arrays unless the top-level dictionary is handled this From cc90e07aacab0b2bb782e45e8d2231780f311d1c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 18 Mar 2025 01:11:35 -0500 Subject: [PATCH 16/19] fmt --- arrow-json/src/writer/encoder.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs index d5bb4b20b13d..01c8655e6926 100644 --- a/arrow-json/src/writer/encoder.rs +++ b/arrow-json/src/writer/encoder.rs @@ -78,9 +78,9 @@ impl EncoderOptions { /// /// This allows overriding the default encoders for specific data types, /// or adding new encoders for custom data types. -/// +/// /// # Examples -/// +/// /// ``` /// use std::io::Write; /// use arrow_array::{ArrayAccessor, Array, BinaryArray, Float64Array, RecordBatch}; From 4cfa5b77718007e7b05b9d8b15a414c8657e9655 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 18 Mar 2025 17:51:16 -0500 Subject: [PATCH 17/19] rename EncoderWithNullBuffer to NullableEncoder --- arrow-json/src/writer/encoder.rs | 62 ++++++++++++++++---------------- arrow-json/src/writer/mod.rs | 14 ++++---- 2 files changed, 38 insertions(+), 38 deletions(-) diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs index 01c8655e6926..ee6af03101f8 100644 --- a/arrow-json/src/writer/encoder.rs +++ b/arrow-json/src/writer/encoder.rs @@ -86,7 +86,7 @@ impl EncoderOptions { /// use arrow_array::{ArrayAccessor, Array, BinaryArray, Float64Array, RecordBatch}; /// use arrow_array::cast::AsArray; /// use arrow_schema::{DataType, Field, Schema, FieldRef}; -/// use arrow_json::{writer::{WriterBuilder, JsonArray, EncoderWithNullBuffer}, StructMode}; +/// use arrow_json::{writer::{WriterBuilder, JsonArray, NullableEncoder}, StructMode}; /// 
use arrow_json::{Encoder, EncoderFactory, EncoderOptions}; /// use arrow_schema::ArrowError; /// use std::sync::Arc; @@ -123,14 +123,14 @@ impl EncoderOptions { /// _field: &'a FieldRef, /// array: &'a dyn Array, /// _options: &'a EncoderOptions, -/// ) -> Result>, ArrowError> { +/// ) -> Result>, ArrowError> { /// match array.data_type() { /// DataType::Binary => { /// let array = array.as_binary::(); /// let encoder = IntArrayBinaryEncoder { array }; /// let array_encoder = Box::new(encoder) as Box; /// let nulls = array.nulls().cloned(); -/// Ok(Some(EncoderWithNullBuffer::new(array_encoder, nulls))) +/// Ok(Some(NullableEncoder::new(array_encoder, nulls))) /// } /// _ => Ok(None), /// } @@ -183,19 +183,19 @@ pub trait EncoderFactory: std::fmt::Debug + Send + Sync { _field: &'a FieldRef, _array: &'a dyn Array, _options: &'a EncoderOptions, - ) -> Result>, ArrowError> { + ) -> Result>, ArrowError> { Ok(None) } } /// An encoder + a null buffer. /// This is packaged together into a wrapper struct to minimize dynamic dispatch for null checks. -pub struct EncoderWithNullBuffer<'a> { +pub struct NullableEncoder<'a> { encoder: Box, nulls: Option, } -impl<'a> EncoderWithNullBuffer<'a> { +impl<'a> NullableEncoder<'a> { /// Create a new encoder with a null buffer. pub fn new(encoder: Box, nulls: Option) -> Self { Self { encoder, nulls } @@ -220,7 +220,7 @@ impl<'a> EncoderWithNullBuffer<'a> { } } -impl Encoder for EncoderWithNullBuffer<'_> { +impl Encoder for NullableEncoder<'_> { fn encode(&mut self, idx: usize, out: &mut Vec) { self.encoder.encode(idx, out) } @@ -243,12 +243,12 @@ pub fn make_encoder<'a>( field: &'a FieldRef, array: &'a dyn Array, options: &'a EncoderOptions, -) -> Result, ArrowError> { +) -> Result, ArrowError> { macro_rules! 
primitive_helper { ($t:ty) => {{ let array = array.as_primitive::<$t>(); let nulls = array.nulls().cloned(); - EncoderWithNullBuffer::new(Box::new(PrimitiveEncoder::new(array)), nulls) + NullableEncoder::new(Box::new(PrimitiveEncoder::new(array)), nulls) }}; } @@ -266,59 +266,59 @@ pub fn make_encoder<'a>( DataType::Float64 => primitive_helper!(Float64Type), DataType::Boolean => { let array = array.as_boolean(); - EncoderWithNullBuffer::new(Box::new(BooleanEncoder(array)), array.nulls().cloned()) + NullableEncoder::new(Box::new(BooleanEncoder(array)), array.nulls().cloned()) } - DataType::Null => EncoderWithNullBuffer::new(Box::new(NullEncoder), array.logical_nulls()), + DataType::Null => NullableEncoder::new(Box::new(NullEncoder), array.logical_nulls()), DataType::Utf8 => { let array = array.as_string::(); - EncoderWithNullBuffer::new(Box::new(StringEncoder(array)), array.nulls().cloned()) + NullableEncoder::new(Box::new(StringEncoder(array)), array.nulls().cloned()) } DataType::LargeUtf8 => { let array = array.as_string::(); - EncoderWithNullBuffer::new(Box::new(StringEncoder(array)), array.nulls().cloned()) + NullableEncoder::new(Box::new(StringEncoder(array)), array.nulls().cloned()) } DataType::Utf8View => { let array = array.as_string_view(); - EncoderWithNullBuffer::new(Box::new(StringViewEncoder(array)), array.nulls().cloned()) + NullableEncoder::new(Box::new(StringViewEncoder(array)), array.nulls().cloned()) } DataType::List(_) => { let array = array.as_list::(); - EncoderWithNullBuffer::new(Box::new(ListEncoder::try_new(field, array, options)?), array.nulls().cloned()) + NullableEncoder::new(Box::new(ListEncoder::try_new(field, array, options)?), array.nulls().cloned()) } DataType::LargeList(_) => { let array = array.as_list::(); - EncoderWithNullBuffer::new(Box::new(ListEncoder::try_new(field, array, options)?), array.nulls().cloned()) + NullableEncoder::new(Box::new(ListEncoder::try_new(field, array, options)?), array.nulls().cloned()) } 
DataType::FixedSizeList(_, _) => { let array = array.as_fixed_size_list(); - EncoderWithNullBuffer::new(Box::new(FixedSizeListEncoder::try_new(field, array, options)?), array.nulls().cloned()) + NullableEncoder::new(Box::new(FixedSizeListEncoder::try_new(field, array, options)?), array.nulls().cloned()) } DataType::Dictionary(_, _) => downcast_dictionary_array! { array => { - EncoderWithNullBuffer::new(Box::new(DictionaryEncoder::try_new(field, array, options)?), array.nulls().cloned()) + NullableEncoder::new(Box::new(DictionaryEncoder::try_new(field, array, options)?), array.nulls().cloned()) }, _ => unreachable!() } DataType::Map(_, _) => { let array = array.as_map(); - EncoderWithNullBuffer::new(Box::new(MapEncoder::try_new(field, array, options)?), array.nulls().cloned()) + NullableEncoder::new(Box::new(MapEncoder::try_new(field, array, options)?), array.nulls().cloned()) } DataType::FixedSizeBinary(_) => { let array = array.as_fixed_size_binary(); - EncoderWithNullBuffer::new(Box::new(BinaryEncoder::new(array)) as _, array.nulls().cloned()) + NullableEncoder::new(Box::new(BinaryEncoder::new(array)) as _, array.nulls().cloned()) } DataType::Binary => { let array: &BinaryArray = array.as_binary(); - EncoderWithNullBuffer::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned()) + NullableEncoder::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned()) } DataType::LargeBinary => { let array: &LargeBinaryArray = array.as_binary(); - EncoderWithNullBuffer::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned()) + NullableEncoder::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned()) } DataType::Struct(fields) => { @@ -337,12 +337,12 @@ pub fn make_encoder<'a>( struct_mode: options.struct_mode(), }; let nulls = array.nulls().cloned(); - EncoderWithNullBuffer::new(Box::new(encoder) as Box, nulls) + NullableEncoder::new(Box::new(encoder) as Box, nulls) } DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => { let options = 
FormatOptions::new().with_display_error(true); let formatter = JsonArrayFormatter::new(ArrayFormatter::try_new(array, &options)?); - EncoderWithNullBuffer::new(Box::new(RawArrayFormatter(formatter)) as Box, nulls) + NullableEncoder::new(Box::new(RawArrayFormatter(formatter)) as Box, nulls) } d => match d.is_temporal() { true => { @@ -353,7 +353,7 @@ pub fn make_encoder<'a>( let options = FormatOptions::new().with_display_error(true); let formatter = ArrayFormatter::try_new(array, &options)?; let formatter = JsonArrayFormatter::new(formatter); - EncoderWithNullBuffer::new(Box::new(formatter) as Box, nulls) + NullableEncoder::new(Box::new(formatter) as Box, nulls) } false => return Err(ArrowError::JsonError(format!( "Unsupported data type for JSON encoding: {:?}", @@ -372,7 +372,7 @@ fn encode_string(s: &str, out: &mut Vec) { struct FieldEncoder<'a> { field: FieldRef, - encoder: EncoderWithNullBuffer<'a>, + encoder: NullableEncoder<'a>, } impl FieldEncoder<'_> { @@ -550,7 +550,7 @@ impl Encoder for StringViewEncoder<'_> { struct ListEncoder<'a, O: OffsetSizeTrait> { offsets: OffsetBuffer, - encoder: EncoderWithNullBuffer<'a>, + encoder: NullableEncoder<'a>, } impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> { @@ -598,7 +598,7 @@ impl Encoder for ListEncoder<'_, O> { struct FixedSizeListEncoder<'a> { value_length: usize, - encoder: EncoderWithNullBuffer<'a>, + encoder: NullableEncoder<'a>, } impl<'a> FixedSizeListEncoder<'a> { @@ -645,7 +645,7 @@ impl Encoder for FixedSizeListEncoder<'_> { struct DictionaryEncoder<'a, K: ArrowDictionaryKeyType> { keys: ScalarBuffer, - encoder: EncoderWithNullBuffer<'a>, + encoder: NullableEncoder<'a>, } impl<'a, K: ArrowDictionaryKeyType> DictionaryEncoder<'a, K> { @@ -709,8 +709,8 @@ impl Encoder for NullEncoder { struct MapEncoder<'a> { offsets: OffsetBuffer, - keys: EncoderWithNullBuffer<'a>, - values: EncoderWithNullBuffer<'a>, + keys: NullableEncoder<'a>, + values: NullableEncoder<'a>, explicit_nulls: bool, } diff --git 
a/arrow-json/src/writer/mod.rs b/arrow-json/src/writer/mod.rs index fb4be4ea01e2..5b4554df7d21 100644 --- a/arrow-json/src/writer/mod.rs +++ b/arrow-json/src/writer/mod.rs @@ -112,7 +112,7 @@ use crate::StructMode; use arrow_array::*; use arrow_schema::*; -pub use encoder::{make_encoder, Encoder, EncoderFactory, EncoderOptions, EncoderWithNullBuffer}; +pub use encoder::{make_encoder, Encoder, EncoderFactory, EncoderOptions, NullableEncoder}; /// This trait defines how to format a sequence of JSON objects to a /// byte stream. @@ -2095,7 +2095,7 @@ mod tests { _field: &'a FieldRef, array: &'a dyn Array, _options: &'a EncoderOptions, - ) -> Result<Option<EncoderWithNullBuffer<'a>>, ArrowError> { + ) -> Result<Option<NullableEncoder<'a>>, ArrowError> { let data_type = array.data_type(); let fields = match data_type { DataType::Union(fields, UnionMode::Sparse) => fields, @@ -2136,7 +2136,7 @@ mod tests { let array_encoder = Box::new(UnionEncoder { array: values }) as Box<dyn Encoder + 'a>; let nulls = array.nulls().cloned(); - Ok(Some(EncoderWithNullBuffer::new(array_encoder, nulls))) + Ok(Some(NullableEncoder::new(array_encoder, nulls))) } } @@ -2311,14 +2311,14 @@ mod tests { _field: &'a FieldRef, array: &'a dyn Array, _options: &'a EncoderOptions, - ) -> Result<Option<EncoderWithNullBuffer<'a>>, ArrowError> { + ) -> Result<Option<NullableEncoder<'a>>, ArrowError> { match array.data_type() { DataType::Binary => { let array = array.as_binary::<i32>(); let encoder = IntArrayBinaryEncoder { array }; let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>; let nulls = array.nulls().cloned(); - Ok(Some(EncoderWithNullBuffer::new(array_encoder, nulls))) + Ok(Some(NullableEncoder::new(array_encoder, nulls))) } _ => Ok(None), } @@ -2385,7 +2385,7 @@ mod tests { field: &'a FieldRef, array: &'a dyn Array, _options: &'a EncoderOptions, - ) -> Result<Option<EncoderWithNullBuffer<'a>>, ArrowError> { + ) -> Result<Option<NullableEncoder<'a>>, ArrowError> { // The point here is: // 1. You can use information from Field to determine how to do the encoding. // 2. 
For dictionary arrays the Field is always the outer field but the array may be the keys or values array @@ -2403,7 +2403,7 @@ mod tests { array: array.clone(), }; let array_encoder = Box::new(encoder) as Box<dyn Encoder + 'a>; - Ok(Some(EncoderWithNullBuffer::new(array_encoder, nulls))) + Ok(Some(NullableEncoder::new(array_encoder, nulls))) } _ => Ok(None), } From fc7f1f0e7fb39dda289791e209e21d62c63b4b6c Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 18 Mar 2025 17:56:37 -0500 Subject: [PATCH 18/19] Update arrow-json/src/writer/mod.rs Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- arrow-json/src/writer/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/arrow-json/src/writer/mod.rs b/arrow-json/src/writer/mod.rs index 5b4554df7d21..37879d938255 100644 --- a/arrow-json/src/writer/mod.rs +++ b/arrow-json/src/writer/mod.rs @@ -366,12 +366,10 @@ where false, )); - let encoder = make_encoder(&field, &array, &self.options)?; + let mut encoder = make_encoder(&field, &array, &self.options)?; // Validate that the root is not nullable assert!(!encoder.has_nulls(), "root cannot be nullable"); - - let mut encoder = make_encoder(&field, &array, &self.options)?; for idx in 0..batch.num_rows() { self.format.start_row(&mut buffer, is_first_row)?; is_first_row = false; From fe8a039dde4f35ed91f72cc89bfa34b2b7b67672 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 18 Mar 2025 18:11:15 -0500 Subject: [PATCH 19/19] Update mod.rs Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- arrow-json/src/writer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-json/src/writer/mod.rs b/arrow-json/src/writer/mod.rs index 37879d938255..ee1b5fabe538 100644 --- a/arrow-json/src/writer/mod.rs +++ b/arrow-json/src/writer/mod.rs @@ -362,7 +362,7 @@ where let array = 
StructArray::from(batch.clone()); let field = Arc::new(Field::new_struct( "", - batch.schema().fields().iter().cloned().collect::<Vec<_>>(), + batch.schema().fields().clone(), false, ));