diff --git a/rust/arrow-flight/src/utils.rs b/rust/arrow-flight/src/utils.rs index ee19f34a7c5..995aa18bc43 100644 --- a/rust/arrow-flight/src/utils.rs +++ b/rust/arrow-flight/src/utils.rs @@ -26,40 +26,29 @@ use arrow::error::{ArrowError, Result}; use arrow::ipc::{convert, reader, writer, writer::IpcWriteOptions}; use arrow::record_batch::RecordBatch; -/// Convert a `RecordBatch` to `FlightData` by converting the header and body to bytes -/// -/// Note: This implicitly uses the default `IpcWriteOptions`. To configure options, -/// use `flight_data_from_arrow_batch()` -impl From<&RecordBatch> for FlightData { - fn from(batch: &RecordBatch) -> Self { - let options = IpcWriteOptions::default(); - flight_data_from_arrow_batch(batch, &options) - } -} - -/// Convert a `RecordBatch` to `FlightData` by converting the header and body to bytes +/// Convert a `RecordBatch` to a vector of `FlightData` representing the bytes of the dictionaries +/// and values pub fn flight_data_from_arrow_batch( batch: &RecordBatch, options: &IpcWriteOptions, -) -> FlightData { - let data = writer::record_batch_to_bytes(batch, &options); - FlightData { - flight_descriptor: None, - app_metadata: vec![], - data_header: data.ipc_message, - data_body: data.arrow_data, - } -} +) -> Vec { + let data_gen = writer::IpcDataGenerator::default(); + let mut dictionary_tracker = writer::DictionaryTracker::new(false); -/// Convert a `Schema` to `SchemaResult` by converting to an IPC message -/// -/// Note: This implicitly uses the default `IpcWriteOptions`. To configure options, -/// use `flight_schema_from_arrow_schema()` -impl From<&Schema> for SchemaResult { - fn from(schema: &Schema) -> Self { - let options = IpcWriteOptions::default(); - flight_schema_from_arrow_schema(schema, &options) - } + let (encoded_dictionaries, encoded_batch) = data_gen + .encoded_batch(batch, &mut dictionary_tracker, &options) + .expect("DictionaryTracker configured above to not error on replacement"); + + encoded_dictionaries + .into_iter() + .chain(std::iter::once(encoded_batch)) + .map(|data| FlightData { + flight_descriptor: None, + app_metadata: vec![], + data_header: data.ipc_message, + data_body: data.arrow_data, + }) + .collect() } /// Convert a `Schema` to `SchemaResult` by converting to an IPC message @@ -67,19 +56,11 @@ pub fn flight_schema_from_arrow_schema( schema: &Schema, options: &IpcWriteOptions, ) -> SchemaResult { - SchemaResult { - schema: writer::schema_to_bytes(schema, &options).ipc_message, - } -} + let data_gen = writer::IpcDataGenerator::default(); + let schema_bytes = data_gen.schema_to_bytes(schema, &options); -/// Convert a `Schema` to `FlightData` by converting to an IPC message -/// -/// Note: This implicitly uses the default `IpcWriteOptions`. To configure options, -/// use `flight_data_from_arrow_schema()` -impl From<&Schema> for FlightData { - fn from(schema: &Schema) -> Self { - let options = writer::IpcWriteOptions::default(); - flight_data_from_arrow_schema(schema, &options) + SchemaResult { + schema: schema_bytes.ipc_message, } } @@ -88,7 +69,8 @@ pub fn flight_data_from_arrow_schema( schema: &Schema, options: &IpcWriteOptions, ) -> FlightData { - let schema = writer::schema_to_bytes(schema, &options); + let data_gen = writer::IpcDataGenerator::default(); + let schema = data_gen.schema_to_bytes(schema, &options); FlightData { flight_descriptor: None, app_metadata: vec![], diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index d6a52a62c5d..361edb8f510 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -98,6 +98,239 @@ impl Default for IpcWriteOptions { } } +#[derive(Debug, Default)] +pub struct IpcDataGenerator {} + +impl IpcDataGenerator { + pub fn schema_to_bytes( + &self, + schema: &Schema, + write_options: &IpcWriteOptions, + ) -> EncodedData { + let mut fbb = FlatBufferBuilder::new(); + let schema = { + let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema); + fb.as_union_value() + }; + + let mut message = ipc::MessageBuilder::new(&mut fbb); + message.add_version(write_options.metadata_version); + message.add_header_type(ipc::MessageHeader::Schema); + message.add_bodyLength(0); + message.add_header(schema); + // TODO: custom metadata + let data = message.finish(); + fbb.finish(data, None); + + let data = fbb.finished_data(); + EncodedData { + ipc_message: data.to_vec(), + arrow_data: vec![], + } + } + + pub fn encoded_batch( + &self, + batch: &RecordBatch, + dictionary_tracker: &mut DictionaryTracker, + write_options: &IpcWriteOptions, + ) -> Result<(Vec, EncodedData)> { + // TODO: handle nested dictionaries + let schema = batch.schema(); + let mut encoded_dictionaries = Vec::with_capacity(schema.fields().len()); + + for (i, field) in schema.fields().iter().enumerate() { + let column = batch.column(i); + + if let DataType::Dictionary(_key_type, _value_type) = column.data_type() { + let dict_id = field + .dict_id() + .expect("All Dictionary types have `dict_id`"); + let dict_data = column.data(); + let dict_values = &dict_data.child_data()[0]; + + let emit = dictionary_tracker.insert(dict_id, column)?; + + if emit { + encoded_dictionaries.push(self.dictionary_batch_to_bytes( + dict_id, + dict_values, + write_options, + )); + } + } + } + + let encoded_message = self.record_batch_to_bytes(batch, write_options); + + Ok((encoded_dictionaries, encoded_message)) + } + + /// Write a `RecordBatch` into two sets of bytes, one for the header (ipc::Message) and the + /// other for the batch's data + fn record_batch_to_bytes( + &self, + batch: &RecordBatch, + write_options: &IpcWriteOptions, + ) -> EncodedData { + let mut fbb = FlatBufferBuilder::new(); + + let mut nodes: Vec = vec![]; + let mut buffers: Vec = vec![]; + let mut arrow_data: Vec = vec![]; + let mut offset = 0; + for array in batch.columns() { + let array_data = array.data(); + offset = write_array_data( + &array_data, + &mut buffers, + &mut arrow_data, + &mut nodes, + offset, + array.len(), + array.null_count(), + ); + } + + // write data + let buffers = fbb.create_vector(&buffers); + let nodes = fbb.create_vector(&nodes); + + let root = { + let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb); + batch_builder.add_length(batch.num_rows() as i64); + batch_builder.add_nodes(nodes); + batch_builder.add_buffers(buffers); + let b = batch_builder.finish(); + b.as_union_value() + }; + // create an ipc::Message + let mut message = ipc::MessageBuilder::new(&mut fbb); + message.add_version(write_options.metadata_version); + message.add_header_type(ipc::MessageHeader::RecordBatch); + message.add_bodyLength(arrow_data.len() as i64); + message.add_header(root); + let root = message.finish(); + fbb.finish(root, None); + let finished_data = fbb.finished_data(); + + EncodedData { + ipc_message: finished_data.to_vec(), + arrow_data, + } + } + + /// Write dictionary values into two sets of bytes, one for the header (ipc::Message) and the + /// other for the data + fn dictionary_batch_to_bytes( + &self, + dict_id: i64, + array_data: &ArrayDataRef, + write_options: &IpcWriteOptions, + ) -> EncodedData { + let mut fbb = FlatBufferBuilder::new(); + + let mut nodes: Vec = vec![]; + let mut buffers: Vec = vec![]; + let mut arrow_data: Vec = vec![]; + + write_array_data( + &array_data, + &mut buffers, + &mut arrow_data, + &mut nodes, + 0, + array_data.len(), + array_data.null_count(), + ); + + // write data + let buffers = fbb.create_vector(&buffers); + let nodes = fbb.create_vector(&nodes); + + let root = { + let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb); + batch_builder.add_length(array_data.len() as i64); + batch_builder.add_nodes(nodes); + batch_builder.add_buffers(buffers); + batch_builder.finish() + }; + + let root = { + let mut batch_builder = ipc::DictionaryBatchBuilder::new(&mut fbb); + batch_builder.add_id(dict_id); + batch_builder.add_data(root); + batch_builder.finish().as_union_value() + }; + + let root = { + let mut message_builder = ipc::MessageBuilder::new(&mut fbb); + message_builder.add_version(write_options.metadata_version); + message_builder.add_header_type(ipc::MessageHeader::DictionaryBatch); + message_builder.add_bodyLength(arrow_data.len() as i64); + message_builder.add_header(root); + message_builder.finish() + }; + + fbb.finish(root, None); + let finished_data = fbb.finished_data(); + + EncodedData { + ipc_message: finished_data.to_vec(), + arrow_data, + } + } +} + +/// Keeps track of dictionaries that have been written, to avoid emitting the same dictionary +/// multiple times. Can optionally error if an update to an existing dictionary is attempted, which +/// isn't allowed in the `FileWriter`. +pub struct DictionaryTracker { + written: HashMap, + error_on_replacement: bool, +} + +impl DictionaryTracker { + pub fn new(error_on_replacement: bool) -> Self { + Self { + written: HashMap::new(), + error_on_replacement, + } + } + + /// Keep track of the dictionary with the given ID and values. Behavior: + /// + /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate + /// that the dictionary was not actually inserted (because it's already been seen). + /// * If this ID has been written already but with different data, and this tracker is + /// configured to return an error, return an error. + /// * If the tracker has not been configured to error on replacement or this dictionary + /// has never been seen before, return `Ok(true)` to indicate that the dictionary was just + /// inserted. + pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result { + let dict_data = column.data(); + let dict_values = &dict_data.child_data()[0]; + + // If a dictionary with this id was already emitted, check if it was the same. + if let Some(last) = self.written.get(&dict_id) { + if last.data().child_data()[0] == *dict_values { + // Same dictionary values => no need to emit it again + return Ok(false); + } else if self.error_on_replacement { + return Err(ArrowError::InvalidArgumentError( + "Dictionary replacement detected when writing IPC file format. \ + Arrow IPC files only support a single dictionary for a given field \ + across all batches." + .to_string(), + )); + } + } + + self.written.insert(dict_id, column.clone()); + Ok(true) + } +} + pub struct FileWriter { /// The object to write to writer: BufWriter, @@ -114,7 +347,9 @@ pub struct FileWriter { /// Whether the writer footer has been written, and the writer is finished finished: bool, /// Keeps track of dictionaries that have been written - last_written_dictionaries: HashMap, + dictionary_tracker: DictionaryTracker, + + data_gen: IpcDataGenerator, } impl FileWriter { @@ -130,14 +365,15 @@ impl FileWriter { schema: &Schema, write_options: IpcWriteOptions, ) -> Result { + let data_gen = IpcDataGenerator::default(); let mut writer = BufWriter::new(writer); // write magic to header writer.write_all(&super::ARROW_MAGIC[..])?; // create an 8-byte boundary after the header writer.write_all(&[0, 0])?; // write the schema, set the written bytes to the schema + header - let message = Message::Schema(schema, &write_options); - let (meta, data) = write_message(&mut writer, &message, &write_options)?; + let encoded_message = data_gen.schema_to_bytes(schema, &write_options); + let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?; Ok(Self { writer, write_options, @@ -146,7 +382,8 @@ impl FileWriter { dictionary_blocks: vec![], record_blocks: vec![], finished: false, - last_written_dictionaries: HashMap::new(), + dictionary_tracker: DictionaryTracker::new(true), + data_gen, }) } @@ -157,10 +394,25 @@ impl FileWriter { "Cannot write record batch to file writer as it is closed".to_string(), )); } - self.write_dictionaries(&batch)?; - let message = Message::RecordBatch(batch, &self.write_options); + + let (encoded_dictionaries, encoded_message) = self.data_gen.encoded_batch( + batch, + &mut self.dictionary_tracker, + &self.write_options, + )?; + + for encoded_dictionary in encoded_dictionaries { + let (meta, data) = + write_message(&mut self.writer, encoded_dictionary, &self.write_options)?; + + let block = + ipc::Block::new(self.block_offsets as i64, meta as i32, data as i64); + self.dictionary_blocks.push(block); + self.block_offsets += meta + data; + } + let (meta, data) = - write_message(&mut self.writer, &message, &self.write_options)?; + write_message(&mut self.writer, encoded_message, &self.write_options)?; // add a record block for the footer let block = ipc::Block::new( self.block_offsets as i64, @@ -172,53 +424,6 @@ impl FileWriter { Ok(()) } - fn write_dictionaries(&mut self, batch: &RecordBatch) -> Result<()> { - // TODO: handle nested dictionaries - - let schema = batch.schema(); - for (i, field) in schema.fields().iter().enumerate() { - let column = batch.column(i); - - if let DataType::Dictionary(_key_type, _value_type) = column.data_type() { - let dict_id = field - .dict_id() - .expect("All Dictionary types have `dict_id`"); - let dict_data = column.data(); - let dict_values = &dict_data.child_data()[0]; - - // If a dictionary with this id was already emitted, check if it was the same. - if let Some(last_dictionary) = - self.last_written_dictionaries.get(&dict_id) - { - if last_dictionary.data().child_data()[0] == *dict_values { - // Same dictionary values => no need to emit it again - continue; - } else { - return Err(ArrowError::InvalidArgumentError( - "Dictionary replacement detected when writing IPC file format. \ - Arrow IPC files only support a single dictionary for a given field \ - across all batches.".to_string())); - } - } - - self.last_written_dictionaries - .insert(dict_id, column.clone()); - - let message = - Message::DictionaryBatch(dict_id, dict_values, &self.write_options); - - let (meta, data) = - write_message(&mut self.writer, &message, &self.write_options)?; - - let block = - ipc::Block::new(self.block_offsets as i64, meta as i32, data as i64); - self.dictionary_blocks.push(block); - self.block_offsets += meta + data; - } - } - Ok(()) - } - /// Write footer and closing tag, then mark the writer as done pub fn finish(&mut self) -> Result<()> { // write EOS @@ -269,7 +474,9 @@ pub struct StreamWriter { /// Whether the writer footer has been written, and the writer is finished finished: bool, /// Keeps track of dictionaries that have been written - last_written_dictionaries: HashMap, + dictionary_tracker: DictionaryTracker, + + data_gen: IpcDataGenerator, } impl StreamWriter { @@ -284,16 +491,18 @@ impl StreamWriter { schema: &Schema, write_options: IpcWriteOptions, ) -> Result { + let data_gen = IpcDataGenerator::default(); let mut writer = BufWriter::new(writer); // write the schema, set the written bytes to the schema - let message = Message::Schema(schema, &write_options); - write_message(&mut writer, &message, &write_options)?; + let encoded_message = data_gen.schema_to_bytes(schema, &write_options); + write_message(&mut writer, encoded_message, &write_options)?; Ok(Self { writer, write_options, schema: schema.clone(), finished: false, - last_written_dictionaries: HashMap::new(), + dictionary_tracker: DictionaryTracker::new(false), + data_gen, }) } @@ -304,46 +513,17 @@ impl StreamWriter { "Cannot write record batch to stream writer as it is closed".to_string(), )); } - self.write_dictionaries(&batch)?; - - let message = Message::RecordBatch(batch, &self.write_options); - write_message(&mut self.writer, &message, &self.write_options)?; - Ok(()) - } - - fn write_dictionaries(&mut self, batch: &RecordBatch) -> Result<()> { - // TODO: handle nested dictionaries - - let schema = batch.schema(); - for (i, field) in schema.fields().iter().enumerate() { - let column = batch.column(i); - - if let DataType::Dictionary(_key_type, _value_type) = column.data_type() { - let dict_id = field - .dict_id() - .expect("All Dictionary types have `dict_id`"); - let dict_data = column.data(); - let dict_values = &dict_data.child_data()[0]; - - // If a dictionary with this id was already emitted, check if it was the same. - if let Some(last_dictionary) = - self.last_written_dictionaries.get(&dict_id) - { - if last_dictionary.data().child_data()[0] == *dict_values { - // Same dictionary values => no need to emit it again - continue; - } - } - self.last_written_dictionaries - .insert(dict_id, column.clone()); + let (encoded_dictionaries, encoded_message) = self + .data_gen + .encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options) + .expect("StreamWriter is configured to not error on dictionary replacement"); - let message = - Message::DictionaryBatch(dict_id, dict_values, &self.write_options); - - write_message(&mut self.writer, &message, &self.write_options)?; - } + for encoded_dictionary in encoded_dictionaries { + write_message(&mut self.writer, encoded_dictionary, &self.write_options)?; } + + write_message(&mut self.writer, encoded_message, &self.write_options)?; Ok(()) } @@ -374,57 +554,12 @@ pub struct EncodedData { pub arrow_data: Vec, } -pub fn schema_to_bytes(schema: &Schema, write_options: &IpcWriteOptions) -> EncodedData { - let mut fbb = FlatBufferBuilder::new(); - let schema = { - let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema); - fb.as_union_value() - }; - - let mut message = ipc::MessageBuilder::new(&mut fbb); - message.add_version(write_options.metadata_version); - message.add_header_type(ipc::MessageHeader::Schema); - message.add_bodyLength(0); - message.add_header(schema); - // TODO: custom metadata - let data = message.finish(); - fbb.finish(data, None); - - let data = fbb.finished_data(); - EncodedData { - ipc_message: data.to_vec(), - arrow_data: vec![], - } -} - -enum Message<'a> { - Schema(&'a Schema, &'a IpcWriteOptions), - RecordBatch(&'a RecordBatch, &'a IpcWriteOptions), - DictionaryBatch(i64, &'a ArrayDataRef, &'a IpcWriteOptions), -} - -impl<'a> Message<'a> { - /// Encode message to a ipc::Message and return data as bytes - fn encode(&'a self) -> EncodedData { - match self { - Message::Schema(schema, options) => schema_to_bytes(*schema, *options), - Message::RecordBatch(batch, options) => { - record_batch_to_bytes(*batch, *options) - } - Message::DictionaryBatch(dict_id, array_data, options) => { - dictionary_batch_to_bytes(*dict_id, *array_data, *options) - } - } - } -} - /// Write a message's IPC data and buffers, returning metadata and buffer data lengths written fn write_message( mut writer: &mut BufWriter, - message: &Message, + encoded: EncodedData, write_options: &IpcWriteOptions, ) -> Result<(usize, usize)> { - let encoded = message.encode(); let arrow_data_len = encoded.arrow_data.len(); if arrow_data_len % 8 != 0 { return Err(ArrowError::MemoryError( @@ -481,117 +616,6 @@ fn write_body_buffers(writer: &mut BufWriter, data: &[u8]) -> Resul Ok(total_len as usize) } -/// Write a `RecordBatch` into a tuple of bytes, one for the header (ipc::Message) and the other for the batch's data -pub fn record_batch_to_bytes( - batch: &RecordBatch, - write_options: &IpcWriteOptions, -) -> EncodedData { - let mut fbb = FlatBufferBuilder::new(); - - let mut nodes: Vec = vec![]; - let mut buffers: Vec = vec![]; - let mut arrow_data: Vec = vec![]; - let mut offset = 0; - for array in batch.columns() { - let array_data = array.data(); - offset = write_array_data( - &array_data, - &mut buffers, - &mut arrow_data, - &mut nodes, - offset, - array.len(), - array.null_count(), - ); - } - - // write data - let buffers = fbb.create_vector(&buffers); - let nodes = fbb.create_vector(&nodes); - - let root = { - let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb); - batch_builder.add_length(batch.num_rows() as i64); - batch_builder.add_nodes(nodes); - batch_builder.add_buffers(buffers); - let b = batch_builder.finish(); - b.as_union_value() - }; - // create an ipc::Message - let mut message = ipc::MessageBuilder::new(&mut fbb); - message.add_version(write_options.metadata_version); - message.add_header_type(ipc::MessageHeader::RecordBatch); - message.add_bodyLength(arrow_data.len() as i64); - message.add_header(root); - let root = message.finish(); - fbb.finish(root, None); - let finished_data = fbb.finished_data(); - - EncodedData { - ipc_message: finished_data.to_vec(), - arrow_data, - } -} - -/// Write dictionary values into a tuple of bytes, one for the header (ipc::Message) and the other for the data -pub fn dictionary_batch_to_bytes( - dict_id: i64, - array_data: &ArrayDataRef, - write_options: &IpcWriteOptions, -) -> EncodedData { - let mut fbb = FlatBufferBuilder::new(); - - let mut nodes: Vec = vec![]; - let mut buffers: Vec = vec![]; - let mut arrow_data: Vec = vec![]; - - write_array_data( - &array_data, - &mut buffers, - &mut arrow_data, - &mut nodes, - 0, - array_data.len(), - array_data.null_count(), - ); - - // write data - let buffers = fbb.create_vector(&buffers); - let nodes = fbb.create_vector(&nodes); - - let root = { - let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb); - batch_builder.add_length(array_data.len() as i64); - batch_builder.add_nodes(nodes); - batch_builder.add_buffers(buffers); - batch_builder.finish() - }; - - let root = { - let mut batch_builder = ipc::DictionaryBatchBuilder::new(&mut fbb); - batch_builder.add_id(dict_id); - batch_builder.add_data(root); - batch_builder.finish().as_union_value() - }; - - let root = { - let mut message_builder = ipc::MessageBuilder::new(&mut fbb); - message_builder.add_version(write_options.metadata_version); - message_builder.add_header_type(ipc::MessageHeader::DictionaryBatch); - message_builder.add_bodyLength(arrow_data.len() as i64); - message_builder.add_header(root); - message_builder.finish() - }; - - fbb.finish(root, None); - let finished_data = fbb.finished_data(); - - EncodedData { - ipc_message: finished_data.to_vec(), - arrow_data, - } -} - /// Write a record batch to the writer, writing the message size before the message /// if the record batch is being written to a stream fn write_continuation( diff --git a/rust/datafusion/examples/flight_server.rs b/rust/datafusion/examples/flight_server.rs index a601b7cafdd..d835ab050d6 100644 --- a/rust/datafusion/examples/flight_server.rs +++ b/rust/datafusion/examples/flight_server.rs @@ -66,7 +66,13 @@ impl FlightService for FlightServiceImpl { let table = ParquetTable::try_new(&request.path[0]).unwrap(); - Ok(Response::new(SchemaResult::from(table.schema().as_ref()))) + let options = arrow::ipc::writer::IpcWriteOptions::default(); + let schema_result = arrow_flight::utils::flight_schema_from_arrow_schema( + table.schema().as_ref(), + &options, + ); + + Ok(Response::new(schema_result)) } async fn do_get( @@ -108,13 +114,26 @@ impl FlightService for FlightServiceImpl { } // add an initial FlightData message that sends schema + let options = arrow::ipc::writer::IpcWriteOptions::default(); let schema = plan.schema(); + let schema_flight_data = + arrow_flight::utils::flight_data_from_arrow_schema( + schema.as_ref(), + &options, + ); + let mut flights: Vec> = - vec![Ok(FlightData::from(schema.as_ref()))]; + vec![Ok(schema_flight_data)]; let mut batches: Vec> = results .iter() - .map(|batch| Ok(FlightData::from(batch))) + .flat_map(|batch| { + let flight_data = + arrow_flight::utils::flight_data_from_arrow_batch( + batch, &options, + ); + flight_data.into_iter().map(Ok) + }) .collect(); // append batch vector to schema vector, so that the first message sent is the schema diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index c93325b79b1..0c04704ae0f 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -205,7 +205,8 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Option { /// Encodes the Arrow schema into the IPC format, and base64 encodes it fn encode_arrow_schema(schema: &Schema) -> String { let options = writer::IpcWriteOptions::default(); - let mut serialized_schema = arrow::ipc::writer::schema_to_bytes(&schema, &options); + let data_gen = arrow::ipc::writer::IpcDataGenerator::default(); + let mut serialized_schema = data_gen.schema_to_bytes(&schema, &options); // manually prepending the length to the schema as arrow uses the legacy IPC format // TODO: change after addressing ARROW-9777