From c43d11b9d95574c69a846e863a5fecf30c3e73ed Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 3 Dec 2020 14:02:26 -0500 Subject: [PATCH 1/9] Extract generating schema EncodedData to a new struct's method --- rust/arrow-flight/src/utils.rs | 8 ++- rust/arrow/src/ipc/writer.rs | 107 +++++++++++++++++++++---------- rust/parquet/src/arrow/schema.rs | 3 +- 3 files changed, 81 insertions(+), 37 deletions(-) diff --git a/rust/arrow-flight/src/utils.rs b/rust/arrow-flight/src/utils.rs index ee19f34a7c5..bc16692baca 100644 --- a/rust/arrow-flight/src/utils.rs +++ b/rust/arrow-flight/src/utils.rs @@ -67,8 +67,11 @@ pub fn flight_schema_from_arrow_schema( schema: &Schema, options: &IpcWriteOptions, ) -> SchemaResult { + let data_gen = writer::IpcDataGenerator::default(); + let schema_bytes = data_gen.schema_to_bytes(schema, &options); + SchemaResult { - schema: writer::schema_to_bytes(schema, &options).ipc_message, + schema: schema_bytes.ipc_message, } } @@ -88,7 +91,8 @@ pub fn flight_data_from_arrow_schema( schema: &Schema, options: &IpcWriteOptions, ) -> FlightData { - let schema = writer::schema_to_bytes(schema, &options); + let data_gen = writer::IpcDataGenerator::default(); + let schema = data_gen.schema_to_bytes(schema, &options); FlightData { flight_descriptor: None, app_metadata: vec![], diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index d6a52a62c5d..b845b3e4aa0 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -98,6 +98,38 @@ impl Default for IpcWriteOptions { } } +#[derive(Debug, Default)] +pub struct IpcDataGenerator {} + +impl IpcDataGenerator { + pub fn schema_to_bytes( + &self, + schema: &Schema, + write_options: &IpcWriteOptions, + ) -> EncodedData { + let mut fbb = FlatBufferBuilder::new(); + let schema = { + let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema); + fb.as_union_value() + }; + + let mut message = ipc::MessageBuilder::new(&mut fbb); + message.add_version(write_options.metadata_version); + message.add_header_type(ipc::MessageHeader::Schema); + message.add_bodyLength(0); + message.add_header(schema); + // TODO: custom metadata + let data = message.finish(); + fbb.finish(data, None); + + let data = fbb.finished_data(); + EncodedData { + ipc_message: data.to_vec(), + arrow_data: vec![], + } + } +} + pub struct FileWriter { /// The object to write to writer: BufWriter, @@ -115,6 +147,8 @@ pub struct FileWriter { finished: bool, /// Keeps track of dictionaries that have been written last_written_dictionaries: HashMap, + + data_gen: IpcDataGenerator, } impl FileWriter { @@ -130,6 +164,7 @@ impl FileWriter { schema: &Schema, write_options: IpcWriteOptions, ) -> Result { + let data_gen = IpcDataGenerator::default(); let mut writer = BufWriter::new(writer); // write magic to header writer.write_all(&super::ARROW_MAGIC[..])?; @@ -137,7 +172,8 @@ impl FileWriter { writer.write_all(&[0, 0])?; // write the schema, set the written bytes to the schema + header let message = Message::Schema(schema, &write_options); - let (meta, data) = write_message(&mut writer, &message, &write_options)?; + let (meta, data) = + write_message(&mut writer, &message, &write_options, &data_gen)?; Ok(Self { writer, write_options, @@ -147,6 +183,7 @@ impl FileWriter { record_blocks: vec![], finished: false, last_written_dictionaries: HashMap::new(), + data_gen, }) } @@ -159,8 +196,12 @@ impl FileWriter { } self.write_dictionaries(&batch)?; let message = Message::RecordBatch(batch, &self.write_options); - let (meta, data) 
= - write_message(&mut self.writer, &message, &self.write_options)?; + let (meta, data) = write_message( + &mut self.writer, + &message, + &self.write_options, + &self.data_gen, + )?; // add a record block for the footer let block = ipc::Block::new( self.block_offsets as i64, @@ -207,8 +248,12 @@ impl FileWriter { let message = Message::DictionaryBatch(dict_id, dict_values, &self.write_options); - let (meta, data) = - write_message(&mut self.writer, &message, &self.write_options)?; + let (meta, data) = write_message( + &mut self.writer, + &message, + &self.write_options, + &self.data_gen, + )?; let block = ipc::Block::new(self.block_offsets as i64, meta as i32, data as i64); @@ -270,6 +315,8 @@ pub struct StreamWriter { finished: bool, /// Keeps track of dictionaries that have been written last_written_dictionaries: HashMap, + + data_gen: IpcDataGenerator, } impl StreamWriter { @@ -284,16 +331,18 @@ impl StreamWriter { schema: &Schema, write_options: IpcWriteOptions, ) -> Result { + let data_gen = IpcDataGenerator::default(); let mut writer = BufWriter::new(writer); // write the schema, set the written bytes to the schema let message = Message::Schema(schema, &write_options); - write_message(&mut writer, &message, &write_options)?; + write_message(&mut writer, &message, &write_options, &data_gen)?; Ok(Self { writer, write_options, schema: schema.clone(), finished: false, last_written_dictionaries: HashMap::new(), + data_gen, }) } @@ -307,7 +356,12 @@ impl StreamWriter { self.write_dictionaries(&batch)?; let message = Message::RecordBatch(batch, &self.write_options); - write_message(&mut self.writer, &message, &self.write_options)?; + write_message( + &mut self.writer, + &message, + &self.write_options, + &self.data_gen, + )?; Ok(()) } @@ -341,7 +395,12 @@ impl StreamWriter { let message = Message::DictionaryBatch(dict_id, dict_values, &self.write_options); - write_message(&mut self.writer, &message, &self.write_options)?; + write_message( + &mut self.writer, + &message, + &self.write_options, + &self.data_gen, + )?; } } Ok(()) @@ -374,29 +433,6 @@ pub struct EncodedData { pub arrow_data: Vec, } -pub fn schema_to_bytes(schema: &Schema, write_options: &IpcWriteOptions) -> EncodedData { - let mut fbb = FlatBufferBuilder::new(); - let schema = { - let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema); - fb.as_union_value() - }; - - let mut message = ipc::MessageBuilder::new(&mut fbb); - message.add_version(write_options.metadata_version); - message.add_header_type(ipc::MessageHeader::Schema); - message.add_bodyLength(0); - message.add_header(schema); - // TODO: custom metadata - let data = message.finish(); - fbb.finish(data, None); - - let data = fbb.finished_data(); - EncodedData { - ipc_message: data.to_vec(), - arrow_data: vec![], - } -} - enum Message<'a> { Schema(&'a Schema, &'a IpcWriteOptions), RecordBatch(&'a RecordBatch, &'a IpcWriteOptions), @@ -405,9 +441,11 @@ enum Message<'a> { impl<'a> Message<'a> { /// Encode message to a ipc::Message and return data as bytes - fn encode(&'a self) -> EncodedData { + fn encode(&'a self, data_gen: &IpcDataGenerator) -> EncodedData { match self { - Message::Schema(schema, options) => schema_to_bytes(*schema, *options), + Message::Schema(schema, options) => { + data_gen.schema_to_bytes(*schema, *options) + } Message::RecordBatch(batch, options) => { record_batch_to_bytes(*batch, *options) } @@ -423,8 +461,9 @@ fn write_message( mut writer: &mut BufWriter, message: &Message, write_options: &IpcWriteOptions, + data_gen: &IpcDataGenerator, ) -> 
Result<(usize, usize)> { - let encoded = message.encode(); + let encoded = message.encode(data_gen); let arrow_data_len = encoded.arrow_data.len(); if arrow_data_len % 8 != 0 { return Err(ArrowError::MemoryError( diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs index c93325b79b1..0c04704ae0f 100644 --- a/rust/parquet/src/arrow/schema.rs +++ b/rust/parquet/src/arrow/schema.rs @@ -205,7 +205,8 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Option { /// Encodes the Arrow schema into the IPC format, and base64 encodes it fn encode_arrow_schema(schema: &Schema) -> String { let options = writer::IpcWriteOptions::default(); - let mut serialized_schema = arrow::ipc::writer::schema_to_bytes(&schema, &options); + let data_gen = arrow::ipc::writer::IpcDataGenerator::default(); + let mut serialized_schema = data_gen.schema_to_bytes(&schema, &options); // manually prepending the length to the schema as arrow uses the legacy IPC format // TODO: change after addressing ARROW-9777 From 3c0ae2b7f01b140a27c8152725af4a9bec7aca4a Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 3 Dec 2020 14:07:06 -0500 Subject: [PATCH 2/9] Move record_batch_to_bytes to the new object --- rust/arrow-flight/src/utils.rs | 3 +- rust/arrow/src/ipc/writer.rs | 108 +++++++++++++++++---------------- 2 files changed, 57 insertions(+), 54 deletions(-) diff --git a/rust/arrow-flight/src/utils.rs b/rust/arrow-flight/src/utils.rs index bc16692baca..77e4092eb64 100644 --- a/rust/arrow-flight/src/utils.rs +++ b/rust/arrow-flight/src/utils.rs @@ -42,7 +42,8 @@ pub fn flight_data_from_arrow_batch( batch: &RecordBatch, options: &IpcWriteOptions, ) -> FlightData { - let data = writer::record_batch_to_bytes(batch, &options); + let data_gen = writer::IpcDataGenerator::default(); + let data = data_gen.record_batch_to_bytes(batch, &options); FlightData { flight_descriptor: None, app_metadata: vec![], diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index b845b3e4aa0..7f0e866122f 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -128,6 +128,60 @@ impl IpcDataGenerator { arrow_data: vec![], } } + + /// Write a `RecordBatch` into two sets of bytes, one for the header (ipc::Message) and the + /// other for the batch's data + pub fn record_batch_to_bytes( + &self, + batch: &RecordBatch, + write_options: &IpcWriteOptions, + ) -> EncodedData { + let mut fbb = FlatBufferBuilder::new(); + + let mut nodes: Vec = vec![]; + let mut buffers: Vec = vec![]; + let mut arrow_data: Vec = vec![]; + let mut offset = 0; + for array in batch.columns() { + let array_data = array.data(); + offset = write_array_data( + &array_data, + &mut buffers, + &mut arrow_data, + &mut nodes, + offset, + array.len(), + array.null_count(), + ); + } + + // write data + let buffers = fbb.create_vector(&buffers); + let nodes = fbb.create_vector(&nodes); + + let root = { + let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb); + batch_builder.add_length(batch.num_rows() as i64); + batch_builder.add_nodes(nodes); + batch_builder.add_buffers(buffers); + let b = batch_builder.finish(); + b.as_union_value() + }; + // create an ipc::Message + let mut message = ipc::MessageBuilder::new(&mut fbb); + message.add_version(write_options.metadata_version); + message.add_header_type(ipc::MessageHeader::RecordBatch); + message.add_bodyLength(arrow_data.len() as i64); + message.add_header(root); + let root = message.finish(); + fbb.finish(root, None); + let finished_data = 
fbb.finished_data(); + + EncodedData { + ipc_message: finished_data.to_vec(), + arrow_data, + } + } } pub struct FileWriter { @@ -447,7 +501,7 @@ impl<'a> Message<'a> { data_gen.schema_to_bytes(*schema, *options) } Message::RecordBatch(batch, options) => { - record_batch_to_bytes(*batch, *options) + data_gen.record_batch_to_bytes(*batch, *options) } Message::DictionaryBatch(dict_id, array_data, options) => { dictionary_batch_to_bytes(*dict_id, *array_data, *options) @@ -520,58 +574,6 @@ fn write_body_buffers(writer: &mut BufWriter, data: &[u8]) -> Resul Ok(total_len as usize) } -/// Write a `RecordBatch` into a tuple of bytes, one for the header (ipc::Message) and the other for the batch's data -pub fn record_batch_to_bytes( - batch: &RecordBatch, - write_options: &IpcWriteOptions, -) -> EncodedData { - let mut fbb = FlatBufferBuilder::new(); - - let mut nodes: Vec = vec![]; - let mut buffers: Vec = vec![]; - let mut arrow_data: Vec = vec![]; - let mut offset = 0; - for array in batch.columns() { - let array_data = array.data(); - offset = write_array_data( - &array_data, - &mut buffers, - &mut arrow_data, - &mut nodes, - offset, - array.len(), - array.null_count(), - ); - } - - // write data - let buffers = fbb.create_vector(&buffers); - let nodes = fbb.create_vector(&nodes); - - let root = { - let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb); - batch_builder.add_length(batch.num_rows() as i64); - batch_builder.add_nodes(nodes); - batch_builder.add_buffers(buffers); - let b = batch_builder.finish(); - b.as_union_value() - }; - // create an ipc::Message - let mut message = ipc::MessageBuilder::new(&mut fbb); - message.add_version(write_options.metadata_version); - message.add_header_type(ipc::MessageHeader::RecordBatch); - message.add_bodyLength(arrow_data.len() as i64); - message.add_header(root); - let root = message.finish(); - fbb.finish(root, None); - let finished_data = fbb.finished_data(); - - EncodedData { - ipc_message: finished_data.to_vec(), - arrow_data, - } -} - /// Write dictionary values into a tuple of bytes, one for the header (ipc::Message) and the other for the data pub fn dictionary_batch_to_bytes( dict_id: i64, From 6493493c175576defc8f163b26b5c98dce7a7703 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 3 Dec 2020 14:11:19 -0500 Subject: [PATCH 3/9] Extract dictionary_batch_to_bytes to the new struct --- rust/arrow/src/ipc/writer.rs | 122 ++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 60 deletions(-) diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 7f0e866122f..41e04c5d43b 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -182,6 +182,67 @@ impl IpcDataGenerator { arrow_data, } } + + /// Write dictionary values into two sets of bytes, one for the header (ipc::Message) and the + /// other for the data + pub fn dictionary_batch_to_bytes( + &self, + dict_id: i64, + array_data: &ArrayDataRef, + write_options: &IpcWriteOptions, + ) -> EncodedData { + let mut fbb = FlatBufferBuilder::new(); + + let mut nodes: Vec = vec![]; + let mut buffers: Vec = vec![]; + let mut arrow_data: Vec = vec![]; + + write_array_data( + &array_data, + &mut buffers, + &mut arrow_data, + &mut nodes, + 0, + array_data.len(), + array_data.null_count(), + ); + + // write data + let buffers = fbb.create_vector(&buffers); + let nodes = fbb.create_vector(&nodes); + + let root = { + let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb); + batch_builder.add_length(array_data.len() 
as i64); + batch_builder.add_nodes(nodes); + batch_builder.add_buffers(buffers); + batch_builder.finish() + }; + + let root = { + let mut batch_builder = ipc::DictionaryBatchBuilder::new(&mut fbb); + batch_builder.add_id(dict_id); + batch_builder.add_data(root); + batch_builder.finish().as_union_value() + }; + + let root = { + let mut message_builder = ipc::MessageBuilder::new(&mut fbb); + message_builder.add_version(write_options.metadata_version); + message_builder.add_header_type(ipc::MessageHeader::DictionaryBatch); + message_builder.add_bodyLength(arrow_data.len() as i64); + message_builder.add_header(root); + message_builder.finish() + }; + + fbb.finish(root, None); + let finished_data = fbb.finished_data(); + + EncodedData { + ipc_message: finished_data.to_vec(), + arrow_data, + } + } } pub struct FileWriter { @@ -504,7 +565,7 @@ impl<'a> Message<'a> { data_gen.record_batch_to_bytes(*batch, *options) } Message::DictionaryBatch(dict_id, array_data, options) => { - dictionary_batch_to_bytes(*dict_id, *array_data, *options) + data_gen.dictionary_batch_to_bytes(*dict_id, *array_data, *options) } } } @@ -574,65 +635,6 @@ fn write_body_buffers(writer: &mut BufWriter, data: &[u8]) -> Resul Ok(total_len as usize) } -/// Write dictionary values into a tuple of bytes, one for the header (ipc::Message) and the other for the data -pub fn dictionary_batch_to_bytes( - dict_id: i64, - array_data: &ArrayDataRef, - write_options: &IpcWriteOptions, -) -> EncodedData { - let mut fbb = FlatBufferBuilder::new(); - - let mut nodes: Vec = vec![]; - let mut buffers: Vec = vec![]; - let mut arrow_data: Vec = vec![]; - - write_array_data( - &array_data, - &mut buffers, - &mut arrow_data, - &mut nodes, - 0, - array_data.len(), - array_data.null_count(), - ); - - // write data - let buffers = fbb.create_vector(&buffers); - let nodes = fbb.create_vector(&nodes); - - let root = { - let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb); - batch_builder.add_length(array_data.len() as i64); - batch_builder.add_nodes(nodes); - batch_builder.add_buffers(buffers); - batch_builder.finish() - }; - - let root = { - let mut batch_builder = ipc::DictionaryBatchBuilder::new(&mut fbb); - batch_builder.add_id(dict_id); - batch_builder.add_data(root); - batch_builder.finish().as_union_value() - }; - - let root = { - let mut message_builder = ipc::MessageBuilder::new(&mut fbb); - message_builder.add_version(write_options.metadata_version); - message_builder.add_header_type(ipc::MessageHeader::DictionaryBatch); - message_builder.add_bodyLength(arrow_data.len() as i64); - message_builder.add_header(root); - message_builder.finish() - }; - - fbb.finish(root, None); - let finished_data = fbb.finished_data(); - - EncodedData { - ipc_message: finished_data.to_vec(), - arrow_data, - } -} - /// Write a record batch to the writer, writing the message size before the message /// if the record batch is being written to a stream fn write_continuation( From 46bdd64302ae6d570f33d982cd1dd523fdab2c8e Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 3 Dec 2020 14:21:36 -0500 Subject: [PATCH 4/9] Move EncodedData generation out of write_message --- rust/arrow/src/ipc/writer.rs | 60 ++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 34 deletions(-) diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 41e04c5d43b..eef7d56234b 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -286,9 +286,8 @@ impl FileWriter { // create an 8-byte boundary 
after the header writer.write_all(&[0, 0])?; // write the schema, set the written bytes to the schema + header - let message = Message::Schema(schema, &write_options); - let (meta, data) = - write_message(&mut writer, &message, &write_options, &data_gen)?; + let encoded_message = data_gen.schema_to_bytes(schema, &write_options); + let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?; Ok(Self { writer, write_options, @@ -310,13 +309,11 @@ impl FileWriter { )); } self.write_dictionaries(&batch)?; - let message = Message::RecordBatch(batch, &self.write_options); - let (meta, data) = write_message( - &mut self.writer, - &message, - &self.write_options, - &self.data_gen, - )?; + let encoded_message = self + .data_gen + .record_batch_to_bytes(batch, &self.write_options); + let (meta, data) = + write_message(&mut self.writer, encoded_message, &self.write_options)?; // add a record block for the footer let block = ipc::Block::new( self.block_offsets as i64, @@ -360,14 +357,16 @@ impl FileWriter { self.last_written_dictionaries .insert(dict_id, column.clone()); - let message = - Message::DictionaryBatch(dict_id, dict_values, &self.write_options); + let encoded_message = self.data_gen.dictionary_batch_to_bytes( + dict_id, + dict_values, + &self.write_options, + ); let (meta, data) = write_message( &mut self.writer, - &message, + encoded_message, &self.write_options, - &self.data_gen, )?; let block = @@ -449,8 +448,8 @@ impl StreamWriter { let data_gen = IpcDataGenerator::default(); let mut writer = BufWriter::new(writer); // write the schema, set the written bytes to the schema - let message = Message::Schema(schema, &write_options); - write_message(&mut writer, &message, &write_options, &data_gen)?; + let encoded_message = data_gen.schema_to_bytes(schema, &write_options); + write_message(&mut writer, encoded_message, &write_options)?; Ok(Self { writer, write_options, @@ -470,13 +469,10 @@ impl StreamWriter { } self.write_dictionaries(&batch)?; - let message = Message::RecordBatch(batch, &self.write_options); - write_message( - &mut self.writer, - &message, - &self.write_options, - &self.data_gen, - )?; + let encoded_message = self + .data_gen + .record_batch_to_bytes(batch, &self.write_options); + write_message(&mut self.writer, encoded_message, &self.write_options)?; Ok(()) } @@ -507,15 +503,13 @@ impl StreamWriter { self.last_written_dictionaries .insert(dict_id, column.clone()); - let message = - Message::DictionaryBatch(dict_id, dict_values, &self.write_options); - - write_message( - &mut self.writer, - &message, + let encoded_message = self.data_gen.dictionary_batch_to_bytes( + dict_id, + dict_values, &self.write_options, - &self.data_gen, - )?; + ); + + write_message(&mut self.writer, encoded_message, &self.write_options)?; } } Ok(()) @@ -574,11 +568,9 @@ impl<'a> Message<'a> { /// Write a message's IPC data and buffers, returning metadata and buffer data lengths written fn write_message( mut writer: &mut BufWriter, - message: &Message, + encoded: EncodedData, write_options: &IpcWriteOptions, - data_gen: &IpcDataGenerator, ) -> Result<(usize, usize)> { - let encoded = message.encode(data_gen); let arrow_data_len = encoded.arrow_data.len(); if arrow_data_len % 8 != 0 { return Err(ArrowError::MemoryError( From b8a989b2d4c3eaa89bc6fe9adbf3159f69f7562d Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 3 Dec 2020 14:32:18 -0500 Subject: [PATCH 5/9] Remove the now-unused intermediate Message enum --- rust/arrow/src/ipc/writer.rs | 23 
----------------------- 1 file changed, 23 deletions(-) diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index eef7d56234b..1408d5a75c8 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -542,29 +542,6 @@ pub struct EncodedData { pub arrow_data: Vec, } -enum Message<'a> { - Schema(&'a Schema, &'a IpcWriteOptions), - RecordBatch(&'a RecordBatch, &'a IpcWriteOptions), - DictionaryBatch(i64, &'a ArrayDataRef, &'a IpcWriteOptions), -} - -impl<'a> Message<'a> { - /// Encode message to a ipc::Message and return data as bytes - fn encode(&'a self, data_gen: &IpcDataGenerator) -> EncodedData { - match self { - Message::Schema(schema, options) => { - data_gen.schema_to_bytes(*schema, *options) - } - Message::RecordBatch(batch, options) => { - data_gen.record_batch_to_bytes(*batch, *options) - } - Message::DictionaryBatch(dict_id, array_data, options) => { - data_gen.dictionary_batch_to_bytes(*dict_id, *array_data, *options) - } - } - } -} - /// Write a message's IPC data and buffers, returning metadata and buffer data lengths written fn write_message( mut writer: &mut BufWriter, From f51c6a50a8daae6a2760709589e1f72fcdf49230 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 3 Dec 2020 15:51:10 -0500 Subject: [PATCH 6/9] Extract a DictionaryTracker for logic of whether replacement is an error --- rust/arrow/src/ipc/writer.rs | 148 ++++++++++++++++++++++------------- 1 file changed, 92 insertions(+), 56 deletions(-) diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 1408d5a75c8..9187fe3ff62 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -245,6 +245,55 @@ impl IpcDataGenerator { } } +/// Keeps track of dictionaries that have been written, to avoid emitting the same dictionary +/// multiple times. Can optionally error if an update to an existing dictionary is attempted, which +/// isn't allowed in the `FileWriter`. +pub struct DictionaryTracker { + written: HashMap, + error_on_replacement: bool, +} + +impl DictionaryTracker { + pub fn new(error_on_replacement: bool) -> Self { + Self { + written: HashMap::new(), + error_on_replacement, + } + } + + /// Keep track of the dictionary with the given ID and values. Behavior: + /// + /// * If this ID has been written already and has the same data, return `Ok(false)` to indicate + /// that the dictionary was not actually inserted (because it's already been seen). + /// * If this ID has been written already but with different data, and this tracker is + /// configured to return an error, return an error. + /// * If the tracker has not been configured to error on replacement or this dictionary + /// has never been seen before, return `Ok(true)` to indicate that the dictionary was just + /// inserted. + pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result { + let dict_data = column.data(); + let dict_values = &dict_data.child_data()[0]; + + // If a dictionary with this id was already emitted, check if it was the same. + if let Some(last) = self.written.get(&dict_id) { + if last.data().child_data()[0] == *dict_values { + // Same dictionary values => no need to emit it again + return Ok(false); + } else if self.error_on_replacement { + return Err(ArrowError::InvalidArgumentError( + "Dictionary replacement detected when writing IPC file format. \ + Arrow IPC files only support a single dictionary for a given field \ + across all batches." 
+ .to_string(), + )); + } + } + + self.written.insert(dict_id, column.clone()); + Ok(true) + } +} + pub struct FileWriter { /// The object to write to writer: BufWriter, @@ -261,7 +310,7 @@ pub struct FileWriter { /// Whether the writer footer has been written, and the writer is finished finished: bool, /// Keeps track of dictionaries that have been written - last_written_dictionaries: HashMap, + dictionary_tracker: DictionaryTracker, data_gen: IpcDataGenerator, } @@ -296,7 +345,7 @@ impl FileWriter { dictionary_blocks: vec![], record_blocks: vec![], finished: false, - last_written_dictionaries: HashMap::new(), + dictionary_tracker: DictionaryTracker::new(true), data_gen, }) } @@ -339,40 +388,29 @@ impl FileWriter { let dict_data = column.data(); let dict_values = &dict_data.child_data()[0]; - // If a dictionary with this id was already emitted, check if it was the same. - if let Some(last_dictionary) = - self.last_written_dictionaries.get(&dict_id) - { - if last_dictionary.data().child_data()[0] == *dict_values { - // Same dictionary values => no need to emit it again - continue; - } else { - return Err(ArrowError::InvalidArgumentError( - "Dictionary replacement detected when writing IPC file format. \ - Arrow IPC files only support a single dictionary for a given field \ - across all batches.".to_string())); - } + let emit = self.dictionary_tracker.insert(dict_id, column)?; + + if emit { + let encoded_message = self.data_gen.dictionary_batch_to_bytes( + dict_id, + dict_values, + &self.write_options, + ); + + let (meta, data) = write_message( + &mut self.writer, + encoded_message, + &self.write_options, + )?; + + let block = ipc::Block::new( + self.block_offsets as i64, + meta as i32, + data as i64, + ); + self.dictionary_blocks.push(block); + self.block_offsets += meta + data; } - - self.last_written_dictionaries - .insert(dict_id, column.clone()); - - let encoded_message = self.data_gen.dictionary_batch_to_bytes( - dict_id, - dict_values, - &self.write_options, - ); - - let (meta, data) = write_message( - &mut self.writer, - encoded_message, - &self.write_options, - )?; - - let block = - ipc::Block::new(self.block_offsets as i64, meta as i32, data as i64); - self.dictionary_blocks.push(block); - self.block_offsets += meta + data; } } Ok(()) @@ -428,7 +466,7 @@ pub struct StreamWriter { /// Whether the writer footer has been written, and the writer is finished finished: bool, /// Keeps track of dictionaries that have been written - last_written_dictionaries: HashMap, + dictionary_tracker: DictionaryTracker, data_gen: IpcDataGenerator, } @@ -455,7 +493,7 @@ impl StreamWriter { write_options, schema: schema.clone(), finished: false, - last_written_dictionaries: HashMap::new(), + dictionary_tracker: DictionaryTracker::new(false), data_gen, }) } @@ -490,26 +528,24 @@ impl StreamWriter { let dict_data = column.data(); let dict_values = &dict_data.child_data()[0]; - // If a dictionary with this id was already emitted, check if it was the same. 
- if let Some(last_dictionary) = - self.last_written_dictionaries.get(&dict_id) - { - if last_dictionary.data().child_data()[0] == *dict_values { - // Same dictionary values => no need to emit it again - continue; - } + let emit = self + .dictionary_tracker + .insert(dict_id, column) + .expect("StreamWriter is configured to not error on replacement"); + + if emit { + let encoded_message = self.data_gen.dictionary_batch_to_bytes( + dict_id, + dict_values, + &self.write_options, + ); + + write_message( + &mut self.writer, + encoded_message, + &self.write_options, + )?; } - - self.last_written_dictionaries - .insert(dict_id, column.clone()); - - let encoded_message = self.data_gen.dictionary_batch_to_bytes( - dict_id, - dict_values, - &self.write_options, - ); - - write_message(&mut self.writer, encoded_message, &self.write_options)?; } } Ok(()) From 5eab7ddf056f3e23beec1fc1fe6b764a9181afa7 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 3 Dec 2020 14:54:04 -0500 Subject: [PATCH 7/9] Extract shared dictionary code to the new struct --- rust/arrow/src/ipc/writer.rs | 146 +++++++++++++++-------------------- 1 file changed, 61 insertions(+), 85 deletions(-) diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 9187fe3ff62..f28b1cf6994 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -129,6 +129,43 @@ impl IpcDataGenerator { } } + pub fn encoded_batch( + &self, + batch: &RecordBatch, + dictionary_tracker: &mut DictionaryTracker, + write_options: &IpcWriteOptions, + ) -> Result<(Vec, EncodedData)> { + // TODO: handle nested dictionaries + let schema = batch.schema(); + let mut encoded_dictionaries = Vec::with_capacity(schema.fields().len()); + + for (i, field) in schema.fields().iter().enumerate() { + let column = batch.column(i); + + if let DataType::Dictionary(_key_type, _value_type) = column.data_type() { + let dict_id = field + .dict_id() + .expect("All Dictionary types have `dict_id`"); + let dict_data = column.data(); + let dict_values = &dict_data.child_data()[0]; + + let emit = dictionary_tracker.insert(dict_id, column)?; + + if emit { + encoded_dictionaries.push(self.dictionary_batch_to_bytes( + dict_id, + dict_values, + write_options, + )); + } + } + } + + let encoded_message = self.record_batch_to_bytes(batch, write_options); + + Ok((encoded_dictionaries, encoded_message)) + } + /// Write a `RecordBatch` into two sets of bytes, one for the header (ipc::Message) and the /// other for the batch's data pub fn record_batch_to_bytes( @@ -357,10 +394,23 @@ impl FileWriter { "Cannot write record batch to file writer as it is closed".to_string(), )); } - self.write_dictionaries(&batch)?; - let encoded_message = self - .data_gen - .record_batch_to_bytes(batch, &self.write_options); + + let (encoded_dictionaries, encoded_message) = self.data_gen.encoded_batch( + batch, + &mut self.dictionary_tracker, + &self.write_options, + )?; + + for encoded_dictionary in encoded_dictionaries { + let (meta, data) = + write_message(&mut self.writer, encoded_dictionary, &self.write_options)?; + + let block = + ipc::Block::new(self.block_offsets as i64, meta as i32, data as i64); + self.dictionary_blocks.push(block); + self.block_offsets += meta + data; + } + let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?; // add a record block for the footer @@ -374,48 +424,6 @@ impl FileWriter { Ok(()) } - fn write_dictionaries(&mut self, batch: &RecordBatch) -> Result<()> { - // TODO: handle nested 
dictionaries - - let schema = batch.schema(); - for (i, field) in schema.fields().iter().enumerate() { - let column = batch.column(i); - - if let DataType::Dictionary(_key_type, _value_type) = column.data_type() { - let dict_id = field - .dict_id() - .expect("All Dictionary types have `dict_id`"); - let dict_data = column.data(); - let dict_values = &dict_data.child_data()[0]; - - let emit = self.dictionary_tracker.insert(dict_id, column)?; - - if emit { - let encoded_message = self.data_gen.dictionary_batch_to_bytes( - dict_id, - dict_values, - &self.write_options, - ); - - let (meta, data) = write_message( - &mut self.writer, - encoded_message, - &self.write_options, - )?; - - let block = ipc::Block::new( - self.block_offsets as i64, - meta as i32, - data as i64, - ); - self.dictionary_blocks.push(block); - self.block_offsets += meta + data; - } - } - } - Ok(()) - } - /// Write footer and closing tag, then mark the writer as done pub fn finish(&mut self) -> Result<()> { // write EOS @@ -505,49 +513,17 @@ impl StreamWriter { "Cannot write record batch to stream writer as it is closed".to_string(), )); } - self.write_dictionaries(&batch)?; - let encoded_message = self + let (encoded_dictionaries, encoded_message) = self .data_gen - .record_batch_to_bytes(batch, &self.write_options); - write_message(&mut self.writer, encoded_message, &self.write_options)?; - Ok(()) - } - - fn write_dictionaries(&mut self, batch: &RecordBatch) -> Result<()> { - // TODO: handle nested dictionaries - - let schema = batch.schema(); - for (i, field) in schema.fields().iter().enumerate() { - let column = batch.column(i); - - if let DataType::Dictionary(_key_type, _value_type) = column.data_type() { - let dict_id = field - .dict_id() - .expect("All Dictionary types have `dict_id`"); - let dict_data = column.data(); - let dict_values = &dict_data.child_data()[0]; - - let emit = self - .dictionary_tracker - .insert(dict_id, column) - .expect("StreamWriter is configured to not error on replacement"); + .encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options) + .expect("StreamWriter is configured to not error on dictionary replacement"); - if emit { - let encoded_message = self.data_gen.dictionary_batch_to_bytes( - dict_id, - dict_values, - &self.write_options, - ); - - write_message( - &mut self.writer, - encoded_message, - &self.write_options, - )?; - } - } + for encoded_dictionary in encoded_dictionaries { + write_message(&mut self.writer, encoded_dictionary, &self.write_options)?; } + + write_message(&mut self.writer, encoded_message, &self.write_options)?; Ok(()) } From afc50dc0a90e9170ca906e41d0e60b602c6eccb1 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 3 Dec 2020 16:32:00 -0500 Subject: [PATCH 8/9] Always create FlightData for dictionaries when given a RecordBatch --- rust/arrow-flight/src/utils.rs | 40 ++++++++++++++--------- rust/arrow/src/ipc/writer.rs | 4 +-- rust/datafusion/examples/flight_server.rs | 6 +++- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/rust/arrow-flight/src/utils.rs b/rust/arrow-flight/src/utils.rs index 77e4092eb64..c2fcfa92884 100644 --- a/rust/arrow-flight/src/utils.rs +++ b/rust/arrow-flight/src/utils.rs @@ -26,30 +26,40 @@ use arrow::error::{ArrowError, Result}; use arrow::ipc::{convert, reader, writer, writer::IpcWriteOptions}; use arrow::record_batch::RecordBatch; -/// Convert a `RecordBatch` to `FlightData` by converting the header and body to bytes +/// Convert a `RecordBatch` to a vector of `FlightData` representing 
the bytes of the dictionaries +/// and values. This can't be a `From` implementation because neither `RecordBatch` nor `Vec` are +/// implemented in this crate. /// /// Note: This implicitly uses the default `IpcWriteOptions`. To configure options, /// use `flight_data_from_arrow_batch()` -impl From<&RecordBatch> for FlightData { - fn from(batch: &RecordBatch) -> Self { - let options = IpcWriteOptions::default(); - flight_data_from_arrow_batch(batch, &options) - } +pub fn convert_to_flight_data(batch: &RecordBatch) -> Vec { + let options = IpcWriteOptions::default(); + flight_data_from_arrow_batch(batch, &options) } -/// Convert a `RecordBatch` to `FlightData` by converting the header and body to bytes +/// Convert a `RecordBatch` to a vector of `FlightData` representing the bytes of the dictionaries +/// and values pub fn flight_data_from_arrow_batch( batch: &RecordBatch, options: &IpcWriteOptions, -) -> FlightData { +) -> Vec { let data_gen = writer::IpcDataGenerator::default(); - let data = data_gen.record_batch_to_bytes(batch, &options); - FlightData { - flight_descriptor: None, - app_metadata: vec![], - data_header: data.ipc_message, - data_body: data.arrow_data, - } + let mut dictionary_tracker = writer::DictionaryTracker::new(false); + + let (encoded_dictionaries, encoded_batch) = data_gen + .encoded_batch(batch, &mut dictionary_tracker, &options) + .expect("DictionaryTracker configured above to not error on replacement"); + + encoded_dictionaries + .into_iter() + .chain(std::iter::once(encoded_batch)) + .map(|data| FlightData { + flight_descriptor: None, + app_metadata: vec![], + data_header: data.ipc_message, + data_body: data.arrow_data, + }) + .collect() } /// Convert a `Schema` to `SchemaResult` by converting to an IPC message diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index f28b1cf6994..361edb8f510 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -168,7 +168,7 @@ impl IpcDataGenerator { /// Write a `RecordBatch` into two sets of bytes, one for the header (ipc::Message) and the /// other for the batch's data - pub fn record_batch_to_bytes( + fn record_batch_to_bytes( &self, batch: &RecordBatch, write_options: &IpcWriteOptions, @@ -222,7 +222,7 @@ impl IpcDataGenerator { /// Write dictionary values into two sets of bytes, one for the header (ipc::Message) and the /// other for the data - pub fn dictionary_batch_to_bytes( + fn dictionary_batch_to_bytes( &self, dict_id: i64, array_data: &ArrayDataRef, diff --git a/rust/datafusion/examples/flight_server.rs b/rust/datafusion/examples/flight_server.rs index a601b7cafdd..a5e4aee6017 100644 --- a/rust/datafusion/examples/flight_server.rs +++ b/rust/datafusion/examples/flight_server.rs @@ -114,7 +114,11 @@ impl FlightService for FlightServiceImpl { let mut batches: Vec> = results .iter() - .map(|batch| Ok(FlightData::from(batch))) + .flat_map(|batch| { + let flight_data = + arrow_flight::utils::convert_to_flight_data(batch); + flight_data.into_iter().map(Ok) + }) .collect(); // append batch vector to schema vector, so that the first message sent is the schema From 78a913a901dfb4936e75700f216cf9a1ec0acab2 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 7 Dec 2020 09:58:52 -0500 Subject: [PATCH 9/9] Remove functions that implicitly use default IpcWriteOptions --- rust/arrow-flight/src/utils.rs | 33 ----------------------- rust/datafusion/examples/flight_server.rs | 21 ++++++++++++--- 2 files changed, 18 insertions(+), 36 deletions(-) diff --git 
a/rust/arrow-flight/src/utils.rs b/rust/arrow-flight/src/utils.rs index c2fcfa92884..995aa18bc43 100644 --- a/rust/arrow-flight/src/utils.rs +++ b/rust/arrow-flight/src/utils.rs @@ -26,17 +26,6 @@ use arrow::error::{ArrowError, Result}; use arrow::ipc::{convert, reader, writer, writer::IpcWriteOptions}; use arrow::record_batch::RecordBatch; -/// Convert a `RecordBatch` to a vector of `FlightData` representing the bytes of the dictionaries -/// and values. This can't be a `From` implementation because neither `RecordBatch` nor `Vec` are -/// implemented in this crate. -/// -/// Note: This implicitly uses the default `IpcWriteOptions`. To configure options, -/// use `flight_data_from_arrow_batch()` -pub fn convert_to_flight_data(batch: &RecordBatch) -> Vec { - let options = IpcWriteOptions::default(); - flight_data_from_arrow_batch(batch, &options) -} - /// Convert a `RecordBatch` to a vector of `FlightData` representing the bytes of the dictionaries /// and values pub fn flight_data_from_arrow_batch( @@ -62,17 +51,6 @@ pub fn flight_data_from_arrow_batch( .collect() } -/// Convert a `Schema` to `SchemaResult` by converting to an IPC message -/// -/// Note: This implicitly uses the default `IpcWriteOptions`. To configure options, -/// use `flight_schema_from_arrow_schema()` -impl From<&Schema> for SchemaResult { - fn from(schema: &Schema) -> Self { - let options = IpcWriteOptions::default(); - flight_schema_from_arrow_schema(schema, &options) - } -} - /// Convert a `Schema` to `SchemaResult` by converting to an IPC message pub fn flight_schema_from_arrow_schema( schema: &Schema, @@ -86,17 +64,6 @@ pub fn flight_schema_from_arrow_schema( } } -/// Convert a `Schema` to `FlightData` by converting to an IPC message -/// -/// Note: This implicitly uses the default `IpcWriteOptions`. To configure options, -/// use `flight_data_from_arrow_schema()` -impl From<&Schema> for FlightData { - fn from(schema: &Schema) -> Self { - let options = writer::IpcWriteOptions::default(); - flight_data_from_arrow_schema(schema, &options) - } -} - /// Convert a `Schema` to `FlightData` by converting to an IPC message pub fn flight_data_from_arrow_schema( schema: &Schema, diff --git a/rust/datafusion/examples/flight_server.rs b/rust/datafusion/examples/flight_server.rs index a5e4aee6017..d835ab050d6 100644 --- a/rust/datafusion/examples/flight_server.rs +++ b/rust/datafusion/examples/flight_server.rs @@ -66,7 +66,13 @@ impl FlightService for FlightServiceImpl { let table = ParquetTable::try_new(&request.path[0]).unwrap(); - Ok(Response::new(SchemaResult::from(table.schema().as_ref()))) + let options = arrow::ipc::writer::IpcWriteOptions::default(); + let schema_result = arrow_flight::utils::flight_schema_from_arrow_schema( + table.schema().as_ref(), + &options, + ); + + Ok(Response::new(schema_result)) } async fn do_get( @@ -108,15 +114,24 @@ impl FlightService for FlightServiceImpl { } // add an initial FlightData message that sends schema + let options = arrow::ipc::writer::IpcWriteOptions::default(); let schema = plan.schema(); + let schema_flight_data = + arrow_flight::utils::flight_data_from_arrow_schema( + schema.as_ref(), + &options, + ); + let mut flights: Vec> = - vec![Ok(FlightData::from(schema.as_ref()))]; + vec![Ok(schema_flight_data)]; let mut batches: Vec> = results .iter() .flat_map(|batch| { let flight_data = - arrow_flight::utils::convert_to_flight_data(batch); + arrow_flight::utils::flight_data_from_arrow_batch( + batch, &options, + ); flight_data.into_iter().map(Ok) }) .collect();
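
As a usage sketch, not itself part of any patch in the series: after patch 9 the encoding surface exposed by rust/arrow/src/ipc/writer.rs is IpcDataGenerator, which produces EncodedData (a flatbuffer ipc::Message header plus body bytes) for schemas, record batches, and dictionary batches, and DictionaryTracker, which decides whether a dictionary with a given id still needs to be emitted and whether replacing one is an error. The sketch below shows how a caller outside FileWriter/StreamWriter might drive that API, assuming the arrow crate layout at the time of this series; the one-column batch and the main() wrapper are illustrative only.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions};
use arrow::record_batch::RecordBatch;

fn main() -> Result<()> {
    // Illustrative one-column batch; any RecordBatch is handled the same way.
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![column])?;

    let options = IpcWriteOptions::default();
    let data_gen = IpcDataGenerator::default();

    // Encode the schema on its own; schema messages carry no body bytes.
    let encoded_schema = data_gen.schema_to_bytes(&schema, &options);
    assert!(encoded_schema.arrow_data.is_empty());

    // Encode the batch. `false` mirrors the StreamWriter: a replaced dictionary is
    // re-emitted instead of being treated as an error (the FileWriter passes `true`).
    let mut dictionary_tracker = DictionaryTracker::new(false);
    let (encoded_dictionaries, encoded_batch) =
        data_gen.encoded_batch(&batch, &mut dictionary_tracker, &options)?;

    // Each EncodedData pairs the message header with its body buffers, ready to be
    // framed by write_message or wrapped into FlightData messages.
    println!(
        "{} dictionary message(s); batch header {} bytes, body {} bytes",
        encoded_dictionaries.len(),
        encoded_batch.ipc_message.len(),
        encoded_batch.arrow_data.len(),
    );
    Ok(())
}

This is the same sequence flight_data_from_arrow_batch follows after patch 8: encode any needed dictionaries first, then the batch, and emit one FlightData per EncodedData.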