From 9d8e09848c64322c4e5d0ae6cf85f079be84cac0 Mon Sep 17 00:00:00 2001 From: mqy Date: Fri, 25 Dec 2020 20:18:11 +0800 Subject: [PATCH 1/4] IPC: support write custom metadata --- rust/arrow-flight/src/utils.rs | 4 +- rust/arrow/src/datatypes.rs | 6 + rust/arrow/src/ipc/reader.rs | 6 +- rust/arrow/src/ipc/writer.rs | 193 +++++++++++++----- .../src/bin/arrow-file-to-stream.rs | 6 +- .../src/bin/arrow-json-integration-test.rs | 5 +- .../src/bin/arrow-stream-to-file.rs | 5 +- rust/parquet/src/arrow/schema.rs | 2 +- 8 files changed, 167 insertions(+), 60 deletions(-) diff --git a/rust/arrow-flight/src/utils.rs b/rust/arrow-flight/src/utils.rs index c2e01fb6ccc..1909a29b020 100644 --- a/rust/arrow-flight/src/utils.rs +++ b/rust/arrow-flight/src/utils.rs @@ -57,7 +57,7 @@ pub fn flight_schema_from_arrow_schema( options: &IpcWriteOptions, ) -> SchemaResult { let data_gen = writer::IpcDataGenerator::default(); - let schema_bytes = data_gen.schema_to_bytes(schema, &options); + let schema_bytes = data_gen.schema_to_bytes(schema, &options, &None); SchemaResult { schema: schema_bytes.ipc_message, @@ -70,7 +70,7 @@ pub fn flight_data_from_arrow_schema( options: &IpcWriteOptions, ) -> FlightData { let data_gen = writer::IpcDataGenerator::default(); - let schema = data_gen.schema_to_bytes(schema, &options); + let schema = data_gen.schema_to_bytes(schema, &options, &None); FlightData { flight_descriptor: None, app_metadata: vec![], diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs index d2cf47e80a1..c80fa39f16e 100644 --- a/rust/arrow/src/datatypes.rs +++ b/rust/arrow/src/datatypes.rs @@ -22,6 +22,7 @@ //! * [`Field`](crate::datatypes::Field) to describe one field within a schema. //! * [`DataType`](crate::datatypes::DataType) to describe the type of a field. +use std::collections::BTreeMap; use std::collections::HashMap; use std::default::Default; use std::fmt; @@ -183,6 +184,11 @@ pub enum IntervalUnit { DayTime, } +/// The `CustomMetaData` is an alias to `BTreeMap` which implements traits: Hash, PartialOrd, Ord. +/// It contains custom meta data (key-value pairs), defined by flatbuffers structs: +/// `Field`, `Message`, or file `Footer`. +pub type CustomMetaData = BTreeMap; + /// Contains the meta-data for a single relative type. /// /// The `Schema` object is an ordered collection of `Field` objects. diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs index 29e0e23fd35..70875bdcf1b 100644 --- a/rust/arrow/src/ipc/reader.rs +++ b/rust/arrow/src/ipc/reader.rs @@ -1068,9 +1068,9 @@ mod tests { let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap(); // create stream writer let file = File::create("target/debug/testdata/float.stream").unwrap(); - let mut stream_writer = - crate::ipc::writer::StreamWriter::try_new(file, &schema).unwrap(); - stream_writer.write(&batch).unwrap(); + let mut stream_writer = crate::ipc::writer::StreamWriter::new(file, &schema); + stream_writer.write_schema().unwrap(); + stream_writer.write_record_batch(&batch).unwrap(); stream_writer.finish().unwrap(); // read stream back diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 694375a4faf..8d4f44c02e2 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -107,6 +107,7 @@ impl IpcDataGenerator { &self, schema: &Schema, write_options: &IpcWriteOptions, + custom_metadata: &Option, ) -> EncodedData { let mut fbb = FlatBufferBuilder::new(); let schema = { @@ -114,13 +115,36 @@ impl IpcDataGenerator { fb.as_union_value() }; - let mut message = ipc::MessageBuilder::new(&mut fbb); - message.add_version(write_options.metadata_version); - message.add_header_type(ipc::MessageHeader::Schema); - message.add_bodyLength(0); - message.add_header(schema); - // TODO: custom metadata - let data = message.finish(); + // Optional custom metadata. + let mut fb_metadata = None; + if let Some(md) = custom_metadata { + if !md.is_empty() { + let mut kv_vec = vec![]; + for (k, v) in md { + let kv_args = ipc::KeyValueArgs { + key: Some(fbb.create_string(k.as_str())), + value: Some(fbb.create_string(v.as_str())), + }; + let kv_offset = ipc::KeyValue::create(&mut fbb, &kv_args); + kv_vec.push(kv_offset); + } + + fb_metadata = Some(fbb.create_vector(&kv_vec)); + } + } + + let message_args = ipc::MessageArgs { + version: write_options.metadata_version, + header_type: ipc::MessageHeader::Schema, + header: Some(schema), + bodyLength: 0, + custom_metadata: fb_metadata, + }; + + // NOTE: + // As of crate `flatbuffers` 0.8.0, with `Message::new()`, almost no way to fix + // compilation error caused by "multiple mutable reference to fbb". + let data = ipc::Message::create(&mut fbb, &message_args); fbb.finish(data, None); let data = fbb.finished_data(); @@ -339,6 +363,8 @@ pub struct FileWriter { write_options: IpcWriteOptions, /// A reference to the schema, used in validating record batches schema: Schema, + /// Optional custom metadata. + custom_metadata: Option, /// The number of bytes between each block of bytes, as an offset for random access block_offsets: usize, /// Dictionary blocks that will be written as part of the IPC footer @@ -355,41 +381,78 @@ pub struct FileWriter { impl FileWriter { /// Try create a new writer, with the schema written as part of the header + #[deprecated( + since = "2.0.0", + note = "This method is deprecated in favour of `new(...)[.with_write_options(...).with_custom_schema()].build()`" + )] pub fn try_new(writer: W, schema: &Schema) -> Result { - let write_options = IpcWriteOptions::default(); - Self::try_new_with_options(writer, schema, write_options) + let mut me = FileWriter::new(writer, schema); + me.write_header_schema()?; + Ok(me) } /// Try create a new writer with IpcWriteOptions + #[deprecated( + since = "2.0.0", + note = "This method is deprecated in favour of `new(...).with_write_options(...)`" + )] pub fn try_new_with_options( writer: W, schema: &Schema, write_options: IpcWriteOptions, ) -> Result { - let data_gen = IpcDataGenerator::default(); - let mut writer = BufWriter::new(writer); - // write magic to header - writer.write_all(&super::ARROW_MAGIC[..])?; - // create an 8-byte boundary after the header - writer.write_all(&[0, 0])?; - // write the schema, set the written bytes to the schema + header - let encoded_message = data_gen.schema_to_bytes(schema, &write_options); - let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?; - Ok(Self { - writer, - write_options, + let mut me = FileWriter::new(writer, schema).with_write_options(write_options); + me.write_header_schema()?; + Ok(me) + } + + pub fn new(writer: W, schema: &Schema) -> Self { + FileWriter { + writer: BufWriter::new(writer), + write_options: IpcWriteOptions::default(), schema: schema.clone(), - block_offsets: meta + data + 8, + custom_metadata: None, + block_offsets: 0, dictionary_blocks: vec![], record_blocks: vec![], finished: false, dictionary_tracker: DictionaryTracker::new(true), - data_gen, - }) + data_gen: IpcDataGenerator::default(), + } + } + + pub fn with_write_options(mut self, write_options: IpcWriteOptions) -> Self { + self.write_options = write_options; + self + } + + pub fn with_custom_metadata(mut self, metadata: CustomMetaData) -> Self { + self.custom_metadata = Some(metadata); + self + } + + /// Write header and schema. + pub fn write_header_schema(&mut self) -> Result<()> { + // write magic to header + self.writer.write_all(&super::ARROW_MAGIC[..])?; + // create an 8-byte boundary after the header + self.writer.write_all(&[0, 0])?; + // write the schema, set the written bytes to the schema + header + let encoded_message = self.data_gen.schema_to_bytes( + &self.schema, + &self.write_options, + &self.custom_metadata, + ); + let (meta, data) = + write_message(&mut self.writer, encoded_message, &self.write_options)?; + + self.block_offsets = meta + data + 8; + + Ok(()) } /// Write a record batch to the file - pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { + pub fn write_record_batch(&mut self, batch: &RecordBatch) -> Result<()> { if self.finished { return Err(ArrowError::IoError( "Cannot write record batch to file writer as it is closed".to_string(), @@ -472,6 +535,8 @@ pub struct StreamWriter { write_options: IpcWriteOptions, /// A reference to the schema, used in validating record batches schema: Schema, + /// Optional custom metadata. + custom_metadata: Option, /// Whether the writer footer has been written, and the writer is finished finished: bool, /// Keeps track of dictionaries that have been written @@ -482,33 +547,64 @@ pub struct StreamWriter { impl StreamWriter { /// Try create a new writer, with the schema written as part of the header + #[deprecated( + since = "2.0.0", + note = "This method is deprecated in favour of `new(...)[.with_options(...).with_schema()].build()`" + )] pub fn try_new(writer: W, schema: &Schema) -> Result { - let write_options = IpcWriteOptions::default(); - Self::try_new_with_options(writer, schema, write_options) + let mut me = StreamWriter::new(writer, schema); + me.write_schema()?; + Ok(me) } + #[deprecated( + since = "2.0.0", + note = "This method is deprecated in favour of `new(...)[.with_options(...).with_schema()].build()`" + )] pub fn try_new_with_options( writer: W, schema: &Schema, write_options: IpcWriteOptions, ) -> Result { - let data_gen = IpcDataGenerator::default(); - let mut writer = BufWriter::new(writer); - // write the schema, set the written bytes to the schema - let encoded_message = data_gen.schema_to_bytes(schema, &write_options); - write_message(&mut writer, encoded_message, &write_options)?; - Ok(Self { - writer, - write_options, + let mut me = StreamWriter::new(writer, schema).with_write_options(write_options); + me.write_schema()?; + Ok(me) + } + + pub fn new(writer: W, schema: &Schema) -> Self { + StreamWriter { + writer: BufWriter::new(writer), + write_options: IpcWriteOptions::default(), schema: schema.clone(), + custom_metadata: None, finished: false, dictionary_tracker: DictionaryTracker::new(false), - data_gen, - }) + data_gen: IpcDataGenerator::default(), + } + } + + pub fn with_write_options(mut self, write_options: IpcWriteOptions) -> Self { + self.write_options = write_options; + self + } + + pub fn with_custom_metadata(mut self, metadata: CustomMetaData) -> Self { + self.custom_metadata = Some(metadata); + self + } + + pub fn write_schema(&mut self) -> Result<()> { + let encoded_message = self.data_gen.schema_to_bytes( + &self.schema, + &self.write_options, + &self.custom_metadata, + ); + write_message(&mut self.writer, encoded_message, &self.write_options)?; + Ok(()) } /// Write a record batch to the stream - pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { + pub fn write_record_batch(&mut self, batch: &RecordBatch) -> Result<()> { if self.finished { return Err(ArrowError::IoError( "Cannot write record batch to stream writer as it is closed".to_string(), @@ -766,9 +862,10 @@ mod tests { .unwrap(); { let file = File::create("target/debug/testdata/arrow.arrow_file").unwrap(); - let mut writer = FileWriter::try_new(file, &schema).unwrap(); + let mut writer = FileWriter::new(file, &schema); + writer.write_header_schema().unwrap(); - writer.write(&batch).unwrap(); + writer.write_record_batch(&batch).unwrap(); // this is inside a block to test the implicit finishing of the file on `Drop` } @@ -815,9 +912,9 @@ mod tests { .unwrap(); { let file = File::create("target/debug/testdata/nulls.arrow_file").unwrap(); - let mut writer = FileWriter::try_new(file, &schema).unwrap(); - - writer.write(&batch).unwrap(); + let mut writer = FileWriter::new(file, &schema); + writer.write_header_schema().unwrap(); + writer.write_record_batch(&batch).unwrap(); // this is inside a block to test the implicit finishing of the file on `Drop` } @@ -867,9 +964,10 @@ mod tests { let file = File::create(format!("target/debug/testdata/{}.arrow_file", path)) .unwrap(); - let mut writer = FileWriter::try_new(file, &reader.schema()).unwrap(); + let mut writer = FileWriter::new(file, &reader.schema()); + writer.write_header_schema().unwrap(); while let Some(Ok(batch)) = reader.next() { - writer.write(&batch).unwrap(); + writer.write_record_batch(&batch).unwrap(); } writer.finish().unwrap(); } @@ -911,9 +1009,10 @@ mod tests { { let file = File::create(format!("target/debug/testdata/{}.stream", path)) .unwrap(); - let mut writer = StreamWriter::try_new(file, &reader.schema()).unwrap(); + let mut writer = StreamWriter::new(file, &reader.schema()); + writer.write_schema().unwrap(); reader.for_each(|batch| { - writer.write(&batch.unwrap()).unwrap(); + writer.write_record_batch(&batch.unwrap()).unwrap(); }); writer.finish().unwrap(); } diff --git a/rust/integration-testing/src/bin/arrow-file-to-stream.rs b/rust/integration-testing/src/bin/arrow-file-to-stream.rs index d6bb0428c0f..0be9d6bdec2 100644 --- a/rust/integration-testing/src/bin/arrow-file-to-stream.rs +++ b/rust/integration-testing/src/bin/arrow-file-to-stream.rs @@ -31,11 +31,11 @@ fn main() -> Result<()> { let mut reader = FileReader::try_new(reader)?; let schema = reader.schema(); - let mut writer = StreamWriter::try_new(io::stdout(), &schema)?; - + let mut writer = StreamWriter::new(io::stdout(), &schema); + writer.write_schema()?; reader.try_for_each(|batch| { let batch = batch?; - writer.write(&batch) + writer.write_record_batch(&batch) })?; writer.finish()?; diff --git a/rust/integration-testing/src/bin/arrow-json-integration-test.rs b/rust/integration-testing/src/bin/arrow-json-integration-test.rs index b1bec677cf1..08a9740ce8a 100644 --- a/rust/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/rust/integration-testing/src/bin/arrow-json-integration-test.rs @@ -84,10 +84,11 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()> let json_file = read_json_file(json_name)?; let arrow_file = File::create(arrow_name)?; - let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?; + let mut writer = FileWriter::new(arrow_file, &json_file.schema); + writer.write_header_schema()?; for b in json_file.batches { - writer.write(&b)?; + writer.write_record_batch(&b)?; } Ok(()) diff --git a/rust/integration-testing/src/bin/arrow-stream-to-file.rs b/rust/integration-testing/src/bin/arrow-stream-to-file.rs index f81d42e6eda..0a4d3752e77 100644 --- a/rust/integration-testing/src/bin/arrow-stream-to-file.rs +++ b/rust/integration-testing/src/bin/arrow-stream-to-file.rs @@ -25,9 +25,10 @@ fn main() -> Result<()> { let mut arrow_stream_reader = StreamReader::try_new(io::stdin())?; let schema = arrow_stream_reader.schema(); - let mut writer = FileWriter::try_new(io::stdout(), &schema)?; + let mut writer = FileWriter::new(io::stdout(), &schema); + writer.write_header_schema()?; - arrow_stream_reader.try_for_each(|batch| writer.write(&batch?))?; + arrow_stream_reader.try_for_each(|batch| writer.write_record_batch(&batch?))?; writer.finish()?; Ok(()) diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index 22213d4f0db..d9c39aef796 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -217,7 +217,7 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Option { fn encode_arrow_schema(schema: &Schema) -> String { let options = writer::IpcWriteOptions::default(); let data_gen = arrow::ipc::writer::IpcDataGenerator::default(); - let mut serialized_schema = data_gen.schema_to_bytes(&schema, &options); + let mut serialized_schema = data_gen.schema_to_bytes(&schema, &options, &None); // manually prepending the length to the schema as arrow uses the legacy IPC format // TODO: change after addressing ARROW-9777 From 8dfa2f5b374a8304b856427502a4b048d89121ba Mon Sep 17 00:00:00 2001 From: mqy Date: Fri, 25 Dec 2020 20:54:04 +0800 Subject: [PATCH 2/4] Update deprecated notes --- rust/arrow/src/ipc/writer.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 8d4f44c02e2..7991d3c34ac 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -383,7 +383,7 @@ impl FileWriter { /// Try create a new writer, with the schema written as part of the header #[deprecated( since = "2.0.0", - note = "This method is deprecated in favour of `new(...)[.with_write_options(...).with_custom_schema()].build()`" + note = "This method is deprecated in favour of `let writer = FileWriter::new(writer, schema); writer.write_header_schema().unwrap();`" )] pub fn try_new(writer: W, schema: &Schema) -> Result { let mut me = FileWriter::new(writer, schema); @@ -394,7 +394,7 @@ impl FileWriter { /// Try create a new writer with IpcWriteOptions #[deprecated( since = "2.0.0", - note = "This method is deprecated in favour of `new(...).with_write_options(...)`" + note = "This method is deprecated in favour of `let writer = FileWriter::new(writer, schema).with_write_options(opt); w.write_header_schema().unwrap();`" )] pub fn try_new_with_options( writer: W, @@ -549,7 +549,7 @@ impl StreamWriter { /// Try create a new writer, with the schema written as part of the header #[deprecated( since = "2.0.0", - note = "This method is deprecated in favour of `new(...)[.with_options(...).with_schema()].build()`" + note = "This method is deprecated in favour of `let writer = StreamWriter::new(writer, schema); writer.write_schema().unwrap();`" )] pub fn try_new(writer: W, schema: &Schema) -> Result { let mut me = StreamWriter::new(writer, schema); @@ -559,7 +559,7 @@ impl StreamWriter { #[deprecated( since = "2.0.0", - note = "This method is deprecated in favour of `new(...)[.with_options(...).with_schema()].build()`" + note = "This method is deprecated in favour of `let writer = StreamWriter::new(writer, schema).with_write_options(opt); w.write_schema().unwrap();`" )] pub fn try_new_with_options( writer: W, From 7cfd12e7e4fd7a0a5ffdbe71eed3196699792763 Mon Sep 17 00:00:00 2001 From: mqy Date: Fri, 25 Dec 2020 23:11:47 +0800 Subject: [PATCH 3/4] update doc for CustomMetaData --- rust/arrow/src/datatypes.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs index c80fa39f16e..81484904db5 100644 --- a/rust/arrow/src/datatypes.rs +++ b/rust/arrow/src/datatypes.rs @@ -185,8 +185,8 @@ pub enum IntervalUnit { } /// The `CustomMetaData` is an alias to `BTreeMap` which implements traits: Hash, PartialOrd, Ord. -/// It contains custom meta data (key-value pairs), defined by flatbuffers structs: -/// `Field`, `Message`, or file `Footer`. +/// It contains custom meta data (key-value pairs) defined by Arrow format for: field, file +/// footer, message and schema. pub type CustomMetaData = BTreeMap; /// Contains the meta-data for a single relative type. From f13c00bc32871b172acfc2678b5e6a136cd36c95 Mon Sep 17 00:00:00 2001 From: mqy Date: Sat, 26 Dec 2020 08:04:02 +0800 Subject: [PATCH 4/4] Make with_xxx() return &Self; Addd/fix docs --- rust/arrow/src/ipc/writer.rs | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 7991d3c34ac..5009aaf24ef 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -382,7 +382,7 @@ pub struct FileWriter { impl FileWriter { /// Try create a new writer, with the schema written as part of the header #[deprecated( - since = "2.0.0", + since = "3.0.0", note = "This method is deprecated in favour of `let writer = FileWriter::new(writer, schema); writer.write_header_schema().unwrap();`" )] pub fn try_new(writer: W, schema: &Schema) -> Result { @@ -393,7 +393,7 @@ impl FileWriter { /// Try create a new writer with IpcWriteOptions #[deprecated( - since = "2.0.0", + since = "3.0.0", note = "This method is deprecated in favour of `let writer = FileWriter::new(writer, schema).with_write_options(opt); w.write_header_schema().unwrap();`" )] pub fn try_new_with_options( @@ -401,11 +401,13 @@ impl FileWriter { schema: &Schema, write_options: IpcWriteOptions, ) -> Result { - let mut me = FileWriter::new(writer, schema).with_write_options(write_options); + let mut me = FileWriter::new(writer, schema); + me.write_options = write_options; me.write_header_schema()?; Ok(me) } + /// Creates the FileWriter. pub fn new(writer: W, schema: &Schema) -> Self { FileWriter { writer: BufWriter::new(writer), @@ -421,17 +423,20 @@ impl FileWriter { } } - pub fn with_write_options(mut self, write_options: IpcWriteOptions) -> Self { + /// Set optional write options and return `&Self`. + pub fn with_write_options(&mut self, write_options: IpcWriteOptions) -> &mut Self { self.write_options = write_options; self } - pub fn with_custom_metadata(mut self, metadata: CustomMetaData) -> Self { + /// Set optional custom metadata and return `&Self`. + pub fn with_custom_metadata(&mut self, metadata: CustomMetaData) -> &mut Self { self.custom_metadata = Some(metadata); self } - /// Write header and schema. + /// Write header and schema to the file. + /// Must be called before `write_record_batch`. pub fn write_header_schema(&mut self) -> Result<()> { // write magic to header self.writer.write_all(&super::ARROW_MAGIC[..])?; @@ -548,7 +553,7 @@ pub struct StreamWriter { impl StreamWriter { /// Try create a new writer, with the schema written as part of the header #[deprecated( - since = "2.0.0", + since = "3.0.0", note = "This method is deprecated in favour of `let writer = StreamWriter::new(writer, schema); writer.write_schema().unwrap();`" )] pub fn try_new(writer: W, schema: &Schema) -> Result { @@ -558,7 +563,7 @@ impl StreamWriter { } #[deprecated( - since = "2.0.0", + since = "3.0.0", note = "This method is deprecated in favour of `let writer = StreamWriter::new(writer, schema).with_write_options(opt); w.write_schema().unwrap();`" )] pub fn try_new_with_options( @@ -566,7 +571,8 @@ impl StreamWriter { schema: &Schema, write_options: IpcWriteOptions, ) -> Result { - let mut me = StreamWriter::new(writer, schema).with_write_options(write_options); + let mut me = StreamWriter::new(writer, schema); + me.write_options = write_options; me.write_schema()?; Ok(me) } @@ -583,16 +589,19 @@ impl StreamWriter { } } - pub fn with_write_options(mut self, write_options: IpcWriteOptions) -> Self { + /// Set optional write options and return `&Self`. + pub fn with_write_options(&mut self, write_options: IpcWriteOptions) -> &Self { self.write_options = write_options; self } - pub fn with_custom_metadata(mut self, metadata: CustomMetaData) -> Self { + /// Set optional custom metadata and return `&Self`. + pub fn with_custom_metadata(&mut self, metadata: CustomMetaData) -> &Self { self.custom_metadata = Some(metadata); self } + /// Write schema to the stream. pub fn write_schema(&mut self) -> Result<()> { let encoded_message = self.data_gen.schema_to_bytes( &self.schema,