From cb82160db1b22bca55db86b99f29d2d40d4b362a Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sat, 25 Jan 2020 23:59:30 +0200 Subject: [PATCH 1/3] remove redundant header_bytes --- rust/arrow/src/ipc/writer.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 0800bd60639..9b8ad5553a7 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -36,8 +36,6 @@ pub struct FileWriter { writer: BufWriter, /// A reference to the schema, used in validating record batches schema: Schema, - /// The number of bytes written for the header (up to schema) - header_bytes: usize, /// The number of bytes between each block of bytes, as an offset for random access block_offsets: usize, /// Dictionary blocks that will be written as part of the IPC footer @@ -61,7 +59,6 @@ impl FileWriter { Ok(Self { writer, schema: schema.clone(), - header_bytes: written, block_offsets: written, dictionary_blocks: vec![], record_blocks: vec![], From 0f90b3e57f524236caf642498c21e040941cf205 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sun, 26 Jan 2020 01:40:25 +0200 Subject: [PATCH 2/3] ARROW-7475: [Rust] Arrow IPC Stream writer This adds a stream writer on top of the file writer. The file writer should be merged first. --- rust/arrow/src/ipc/gen/File.rs | 31 +++++ rust/arrow/src/ipc/gen/Message.rs | 3 +- rust/arrow/src/ipc/gen/Schema.rs | 80 ++++++++++++- rust/arrow/src/ipc/gen/SparseTensor.rs | 154 +++++++++++++++++++------ rust/arrow/src/ipc/writer.rs | 109 ++++++++++++++++- 5 files changed, 334 insertions(+), 43 deletions(-) diff --git a/rust/arrow/src/ipc/gen/File.rs b/rust/arrow/src/ipc/gen/File.rs index e68c8e54816..a805100baf7 100644 --- a/rust/arrow/src/ipc/gen/File.rs +++ b/rust/arrow/src/ipc/gen/File.rs @@ -125,6 +125,9 @@ impl<'a> Footer<'a> { args: &'args FooterArgs<'args>, ) -> flatbuffers::WIPOffset> { let mut builder = FooterBuilder::new(_fbb); + if let Some(x) = args.custom_metadata { + builder.add_custom_metadata(x); + } if let Some(x) = args.recordBatches { builder.add_recordBatches(x); } @@ -142,6 +145,7 @@ impl<'a> Footer<'a> { pub const VT_SCHEMA: flatbuffers::VOffsetT = 6; pub const VT_DICTIONARIES: flatbuffers::VOffsetT = 8; pub const VT_RECORDBATCHES: flatbuffers::VOffsetT = 10; + pub const VT_CUSTOM_METADATA: flatbuffers::VOffsetT = 12; #[inline] pub fn version(&self) -> MetadataVersion { @@ -172,6 +176,15 @@ impl<'a> Footer<'a> { ) .map(|v| v.safe_slice()) } + /// User-defined metadata + #[inline] + pub fn custom_metadata( + &self, + ) -> Option>>> { + self._tab.get::>>, + >>(Footer::VT_CUSTOM_METADATA, None) + } } pub struct FooterArgs<'a> { @@ -179,6 +192,11 @@ pub struct FooterArgs<'a> { pub schema: Option>>, pub dictionaries: Option>>, pub recordBatches: Option>>, + pub custom_metadata: Option< + flatbuffers::WIPOffset< + flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset>>, + >, + >, } impl<'a> Default for FooterArgs<'a> { #[inline] @@ -188,6 +206,7 @@ impl<'a> Default for FooterArgs<'a> { schema: None, dictionaries: None, recordBatches: None, + custom_metadata: None, } } } @@ -233,6 +252,18 @@ impl<'a: 'b, 'b> FooterBuilder<'a, 'b> { ); } #[inline] + pub fn add_custom_metadata( + &mut self, + custom_metadata: flatbuffers::WIPOffset< + flatbuffers::Vector<'b, flatbuffers::ForwardsUOffset>>, + >, + ) { + self.fbb_.push_slot_always::>( + Footer::VT_CUSTOM_METADATA, + custom_metadata, + ); + } + #[inline] pub fn new( _fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>, ) -> FooterBuilder<'a, 'b> { 
diff --git a/rust/arrow/src/ipc/gen/Message.rs b/rust/arrow/src/ipc/gen/Message.rs index 5e403b08413..0907ea84fb9 100644 --- a/rust/arrow/src/ipc/gen/Message.rs +++ b/rust/arrow/src/ipc/gen/Message.rs @@ -385,7 +385,8 @@ impl<'a> DictionaryBatch<'a> { ) } /// If isDelta is true the values in the dictionary are to be appended to a - /// dictionary with the indicated id + /// dictionary with the indicated id. If isDelta is false this dictionary + /// should replace the existing dictionary. #[inline] pub fn isDelta(&self) -> bool { self._tab diff --git a/rust/arrow/src/ipc/gen/Schema.rs b/rust/arrow/src/ipc/gen/Schema.rs index 30113bdd7d7..24136adef6a 100644 --- a/rust/arrow/src/ipc/gen/Schema.rs +++ b/rust/arrow/src/ipc/gen/Schema.rs @@ -485,6 +485,63 @@ pub fn enum_name_type(e: Type) -> &'static str { } pub struct TypeUnionTableOffset {} +/// ---------------------------------------------------------------------- +/// Dictionary encoding metadata +/// Maintained for forwards compatibility, in the future +/// Dictionaries might be explicit maps between integers and values +/// allowing for non-contiguous index values +#[allow(non_camel_case_types)] +#[repr(i16)] +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] +pub enum DictionaryKind { + DenseArray = 0, +} + +const ENUM_MIN_DICTIONARY_KIND: i16 = 0; +const ENUM_MAX_DICTIONARY_KIND: i16 = 0; + +impl<'a> flatbuffers::Follow<'a> for DictionaryKind { + type Inner = Self; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + flatbuffers::read_scalar_at::(buf, loc) + } +} + +impl flatbuffers::EndianScalar for DictionaryKind { + #[inline] + fn to_little_endian(self) -> Self { + let n = i16::to_le(self as i16); + let p = &n as *const i16 as *const DictionaryKind; + unsafe { *p } + } + #[inline] + fn from_little_endian(self) -> Self { + let n = i16::from_le(self as i16); + let p = &n as *const i16 as *const DictionaryKind; + unsafe { *p } + } +} + +impl flatbuffers::Push for DictionaryKind { + type Output = DictionaryKind; + #[inline] + fn push(&self, dst: &mut [u8], _rest: &[u8]) { + flatbuffers::emplace_scalar::(dst, *self); + } +} + +#[allow(non_camel_case_types)] +const ENUM_VALUES_DICTIONARY_KIND: [DictionaryKind; 1] = [DictionaryKind::DenseArray]; + +#[allow(non_camel_case_types)] +const ENUM_NAMES_DICTIONARY_KIND: [&'static str; 1] = ["DenseArray"]; + +pub fn enum_name_dictionary_kind(e: DictionaryKind) -> &'static str { + let index = e as i16; + ENUM_NAMES_DICTIONARY_KIND[index as usize] +} + /// ---------------------------------------------------------------------- /// Endianness of the platform producing the data #[allow(non_camel_case_types)] @@ -2358,8 +2415,6 @@ impl<'a: 'b, 'b> KeyValueBuilder<'a, 'b> { pub enum DictionaryEncodingOffset {} #[derive(Copy, Clone, Debug, PartialEq)] -/// ---------------------------------------------------------------------- -/// Dictionary encoding metadata pub struct DictionaryEncoding<'a> { pub _tab: flatbuffers::Table<'a>, } @@ -2389,6 +2444,7 @@ impl<'a> DictionaryEncoding<'a> { if let Some(x) = args.indexType { builder.add_indexType(x); } + builder.add_dictionaryKind(args.dictionaryKind); builder.add_isOrdered(args.isOrdered); builder.finish() } @@ -2396,6 +2452,7 @@ impl<'a> DictionaryEncoding<'a> { pub const VT_ID: flatbuffers::VOffsetT = 4; pub const VT_INDEXTYPE: flatbuffers::VOffsetT = 6; pub const VT_ISORDERED: flatbuffers::VOffsetT = 8; + pub const VT_DICTIONARYKIND: flatbuffers::VOffsetT = 10; /// The known dictionary id in the application where this data is 
used. In /// the file or streaming formats, the dictionary ids are found in the @@ -2425,12 +2482,22 @@ impl<'a> DictionaryEncoding<'a> { .get::(DictionaryEncoding::VT_ISORDERED, Some(false)) .unwrap() } + #[inline] + pub fn dictionaryKind(&self) -> DictionaryKind { + self._tab + .get::( + DictionaryEncoding::VT_DICTIONARYKIND, + Some(DictionaryKind::DenseArray), + ) + .unwrap() + } } pub struct DictionaryEncodingArgs<'a> { pub id: i64, pub indexType: Option>>, pub isOrdered: bool, + pub dictionaryKind: DictionaryKind, } impl<'a> Default for DictionaryEncodingArgs<'a> { #[inline] @@ -2439,6 +2506,7 @@ impl<'a> Default for DictionaryEncodingArgs<'a> { id: 0, indexType: None, isOrdered: false, + dictionaryKind: DictionaryKind::DenseArray, } } } @@ -2464,6 +2532,14 @@ impl<'a: 'b, 'b> DictionaryEncodingBuilder<'a, 'b> { .push_slot::(DictionaryEncoding::VT_ISORDERED, isOrdered, false); } #[inline] + pub fn add_dictionaryKind(&mut self, dictionaryKind: DictionaryKind) { + self.fbb_.push_slot::( + DictionaryEncoding::VT_DICTIONARYKIND, + dictionaryKind, + DictionaryKind::DenseArray, + ); + } + #[inline] pub fn new( _fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>, ) -> DictionaryEncodingBuilder<'a, 'b> { diff --git a/rust/arrow/src/ipc/gen/SparseTensor.rs b/rust/arrow/src/ipc/gen/SparseTensor.rs index 8c04c6c2910..c7168973ebf 100644 --- a/rust/arrow/src/ipc/gen/SparseTensor.rs +++ b/rust/arrow/src/ipc/gen/SparseTensor.rs @@ -24,13 +24,71 @@ use flatbuffers::EndianScalar; use std::{cmp::Ordering, mem}; // automatically generated by the FlatBuffers compiler, do not modify +#[allow(non_camel_case_types)] +#[repr(i16)] +#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] +pub enum SparseMatrixCompressedAxis { + Row = 0, + Column = 1, +} + +const ENUM_MIN_SPARSE_MATRIX_COMPRESSED_AXIS: i16 = 0; +const ENUM_MAX_SPARSE_MATRIX_COMPRESSED_AXIS: i16 = 1; + +impl<'a> flatbuffers::Follow<'a> for SparseMatrixCompressedAxis { + type Inner = Self; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + flatbuffers::read_scalar_at::(buf, loc) + } +} + +impl flatbuffers::EndianScalar for SparseMatrixCompressedAxis { + #[inline] + fn to_little_endian(self) -> Self { + let n = i16::to_le(self as i16); + let p = &n as *const i16 as *const SparseMatrixCompressedAxis; + unsafe { *p } + } + #[inline] + fn from_little_endian(self) -> Self { + let n = i16::from_le(self as i16); + let p = &n as *const i16 as *const SparseMatrixCompressedAxis; + unsafe { *p } + } +} + +impl flatbuffers::Push for SparseMatrixCompressedAxis { + type Output = SparseMatrixCompressedAxis; + #[inline] + fn push(&self, dst: &mut [u8], _rest: &[u8]) { + flatbuffers::emplace_scalar::(dst, *self); + } +} + +#[allow(non_camel_case_types)] +const ENUM_VALUES_SPARSE_MATRIX_COMPRESSED_AXIS: [SparseMatrixCompressedAxis; 2] = [ + SparseMatrixCompressedAxis::Row, + SparseMatrixCompressedAxis::Column, +]; + +#[allow(non_camel_case_types)] +const ENUM_NAMES_SPARSE_MATRIX_COMPRESSED_AXIS: [&'static str; 2] = ["Row", "Column"]; + +pub fn enum_name_sparse_matrix_compressed_axis( + e: SparseMatrixCompressedAxis, +) -> &'static str { + let index = e as i16; + ENUM_NAMES_SPARSE_MATRIX_COMPRESSED_AXIS[index as usize] +} + #[allow(non_camel_case_types)] #[repr(u8)] #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] pub enum SparseTensorIndex { NONE = 0, SparseTensorIndexCOO = 1, - SparseMatrixIndexCSR = 2, + SparseMatrixIndexCSX = 2, } const ENUM_MIN_SPARSE_TENSOR_INDEX: u8 = 0; @@ -71,12 +129,12 @@ impl 
flatbuffers::Push for SparseTensorIndex { const ENUM_VALUES_SPARSE_TENSOR_INDEX: [SparseTensorIndex; 3] = [ SparseTensorIndex::NONE, SparseTensorIndex::SparseTensorIndexCOO, - SparseTensorIndex::SparseMatrixIndexCSR, + SparseTensorIndex::SparseMatrixIndexCSX, ]; #[allow(non_camel_case_types)] const ENUM_NAMES_SPARSE_TENSOR_INDEX: [&'static str; 3] = - ["NONE", "SparseTensorIndexCOO", "SparseMatrixIndexCSR"]; + ["NONE", "SparseTensorIndexCOO", "SparseMatrixIndexCSX"]; pub fn enum_name_sparse_tensor_index(e: SparseTensorIndex) -> &'static str { let index = e as u8; @@ -244,16 +302,16 @@ impl<'a: 'b, 'b> SparseTensorIndexCOOBuilder<'a, 'b> { } } -pub enum SparseMatrixIndexCSROffset {} +pub enum SparseMatrixIndexCSXOffset {} #[derive(Copy, Clone, Debug, PartialEq)] -/// Compressed Sparse Row format, that is matrix-specific. -pub struct SparseMatrixIndexCSR<'a> { +/// Compressed Sparse format, that is matrix-specific. +pub struct SparseMatrixIndexCSX<'a> { pub _tab: flatbuffers::Table<'a>, } -impl<'a> flatbuffers::Follow<'a> for SparseMatrixIndexCSR<'a> { - type Inner = SparseMatrixIndexCSR<'a>; +impl<'a> flatbuffers::Follow<'a> for SparseMatrixIndexCSX<'a> { + type Inner = SparseMatrixIndexCSX<'a>; #[inline] fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { Self { @@ -262,17 +320,17 @@ impl<'a> flatbuffers::Follow<'a> for SparseMatrixIndexCSR<'a> { } } -impl<'a> SparseMatrixIndexCSR<'a> { +impl<'a> SparseMatrixIndexCSX<'a> { #[inline] pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { - SparseMatrixIndexCSR { _tab: table } + SparseMatrixIndexCSX { _tab: table } } #[allow(unused_mut)] pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, - args: &'args SparseMatrixIndexCSRArgs<'args>, - ) -> flatbuffers::WIPOffset> { - let mut builder = SparseMatrixIndexCSRBuilder::new(_fbb); + args: &'args SparseMatrixIndexCSXArgs<'args>, + ) -> flatbuffers::WIPOffset> { + let mut builder = SparseMatrixIndexCSXBuilder::new(_fbb); if let Some(x) = args.indicesBuffer { builder.add_indicesBuffer(x); } @@ -285,19 +343,31 @@ impl<'a> SparseMatrixIndexCSR<'a> { if let Some(x) = args.indptrType { builder.add_indptrType(x); } + builder.add_compressedAxis(args.compressedAxis); builder.finish() } - pub const VT_INDPTRTYPE: flatbuffers::VOffsetT = 4; - pub const VT_INDPTRBUFFER: flatbuffers::VOffsetT = 6; - pub const VT_INDICESTYPE: flatbuffers::VOffsetT = 8; - pub const VT_INDICESBUFFER: flatbuffers::VOffsetT = 10; + pub const VT_COMPRESSEDAXIS: flatbuffers::VOffsetT = 4; + pub const VT_INDPTRTYPE: flatbuffers::VOffsetT = 6; + pub const VT_INDPTRBUFFER: flatbuffers::VOffsetT = 8; + pub const VT_INDICESTYPE: flatbuffers::VOffsetT = 10; + pub const VT_INDICESBUFFER: flatbuffers::VOffsetT = 12; + /// Which axis, row or column, is compressed + #[inline] + pub fn compressedAxis(&self) -> SparseMatrixCompressedAxis { + self._tab + .get::( + SparseMatrixIndexCSX::VT_COMPRESSEDAXIS, + Some(SparseMatrixCompressedAxis::Row), + ) + .unwrap() + } /// The type of values in indptrBuffer #[inline] pub fn indptrType(&self) -> Option> { self._tab.get::>>( - SparseMatrixIndexCSR::VT_INDPTRTYPE, + SparseMatrixIndexCSX::VT_INDPTRTYPE, None, ) } @@ -326,13 +396,13 @@ impl<'a> SparseMatrixIndexCSR<'a> { #[inline] pub fn indptrBuffer(&self) -> Option<&'a Buffer> { self._tab - .get::(SparseMatrixIndexCSR::VT_INDPTRBUFFER, None) + .get::(SparseMatrixIndexCSX::VT_INDPTRBUFFER, None) } /// The type of values in indicesBuffer #[inline] pub fn indicesType(&self) -> 
Option> { self._tab.get::>>( - SparseMatrixIndexCSR::VT_INDICESTYPE, + SparseMatrixIndexCSX::VT_INDICESTYPE, None, ) } @@ -348,20 +418,22 @@ impl<'a> SparseMatrixIndexCSR<'a> { #[inline] pub fn indicesBuffer(&self) -> Option<&'a Buffer> { self._tab - .get::(SparseMatrixIndexCSR::VT_INDICESBUFFER, None) + .get::(SparseMatrixIndexCSX::VT_INDICESBUFFER, None) } } -pub struct SparseMatrixIndexCSRArgs<'a> { +pub struct SparseMatrixIndexCSXArgs<'a> { + pub compressedAxis: SparseMatrixCompressedAxis, pub indptrType: Option>>, pub indptrBuffer: Option<&'a Buffer>, pub indicesType: Option>>, pub indicesBuffer: Option<&'a Buffer>, } -impl<'a> Default for SparseMatrixIndexCSRArgs<'a> { +impl<'a> Default for SparseMatrixIndexCSXArgs<'a> { #[inline] fn default() -> Self { - SparseMatrixIndexCSRArgs { + SparseMatrixIndexCSXArgs { + compressedAxis: SparseMatrixCompressedAxis::Row, indptrType: None, indptrBuffer: None, indicesType: None, @@ -369,51 +441,59 @@ impl<'a> Default for SparseMatrixIndexCSRArgs<'a> { } } } -pub struct SparseMatrixIndexCSRBuilder<'a: 'b, 'b> { +pub struct SparseMatrixIndexCSXBuilder<'a: 'b, 'b> { fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, start_: flatbuffers::WIPOffset, } -impl<'a: 'b, 'b> SparseMatrixIndexCSRBuilder<'a, 'b> { +impl<'a: 'b, 'b> SparseMatrixIndexCSXBuilder<'a, 'b> { + #[inline] + pub fn add_compressedAxis(&mut self, compressedAxis: SparseMatrixCompressedAxis) { + self.fbb_.push_slot::( + SparseMatrixIndexCSX::VT_COMPRESSEDAXIS, + compressedAxis, + SparseMatrixCompressedAxis::Row, + ); + } #[inline] pub fn add_indptrType(&mut self, indptrType: flatbuffers::WIPOffset>) { self.fbb_.push_slot_always::>( - SparseMatrixIndexCSR::VT_INDPTRTYPE, + SparseMatrixIndexCSX::VT_INDPTRTYPE, indptrType, ); } #[inline] pub fn add_indptrBuffer(&mut self, indptrBuffer: &'b Buffer) { self.fbb_.push_slot_always::<&Buffer>( - SparseMatrixIndexCSR::VT_INDPTRBUFFER, + SparseMatrixIndexCSX::VT_INDPTRBUFFER, indptrBuffer, ); } #[inline] pub fn add_indicesType(&mut self, indicesType: flatbuffers::WIPOffset>) { self.fbb_.push_slot_always::>( - SparseMatrixIndexCSR::VT_INDICESTYPE, + SparseMatrixIndexCSX::VT_INDICESTYPE, indicesType, ); } #[inline] pub fn add_indicesBuffer(&mut self, indicesBuffer: &'b Buffer) { self.fbb_.push_slot_always::<&Buffer>( - SparseMatrixIndexCSR::VT_INDICESBUFFER, + SparseMatrixIndexCSX::VT_INDICESBUFFER, indicesBuffer, ); } #[inline] pub fn new( _fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>, - ) -> SparseMatrixIndexCSRBuilder<'a, 'b> { + ) -> SparseMatrixIndexCSXBuilder<'a, 'b> { let start = _fbb.start_table(); - SparseMatrixIndexCSRBuilder { + SparseMatrixIndexCSXBuilder { fbb_: _fbb, start_: start, } } #[inline] - pub fn finish(self) -> flatbuffers::WIPOffset> { + pub fn finish(self) -> flatbuffers::WIPOffset> { let o = self.fbb_.end_table(self.start_); flatbuffers::WIPOffset::new(o.value()) } @@ -755,12 +835,12 @@ impl<'a> SparseTensor<'a> { #[inline] #[allow(non_snake_case)] - pub fn sparseIndex_as_sparse_matrix_index_csr( + pub fn sparseIndex_as_sparse_matrix_index_csx( &self, - ) -> Option> { - if self.sparseIndex_type() == SparseTensorIndex::SparseMatrixIndexCSR { + ) -> Option> { + if self.sparseIndex_type() == SparseTensorIndex::SparseMatrixIndexCSX { self.sparseIndex() - .map(|u| SparseMatrixIndexCSR::init_from_table(u)) + .map(|u| SparseMatrixIndexCSX::init_from_table(u)) } else { None } diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 9b8ad5553a7..aaeec1becf9 100644 --- a/rust/arrow/src/ipc/writer.rs +++ 
b/rust/arrow/src/ipc/writer.rs @@ -73,7 +73,7 @@ impl FileWriter { "Cannot write record batch to file writer as it is closed".to_string(), )); } - let (meta, data) = write_record_batch(&mut self.writer, batch)?; + let (meta, data) = write_record_batch(&mut self.writer, batch, false)?; // add a record block for the footer self.record_blocks.push(ipc::Block::new( self.block_offsets as i64, @@ -84,7 +84,7 @@ impl FileWriter { Ok(()) } - /// write footer and closing tag, then mark the writer as done + /// Write footer and closing tag, then mark the writer as done pub fn finish(&mut self) -> Result<()> { let mut fbb = FlatBufferBuilder::new(); let dictionaries = fbb.create_vector(&self.dictionary_blocks); @@ -157,6 +157,58 @@ impl Drop for FileWriter { } } +pub struct StreamWriter { + /// The object to write to + writer: BufWriter, + /// A reference to the schema, used in validating record batches + schema: Schema, + /// Whether the end-of-stream marker has been written, and the writer is finished + finished: bool, +} + +impl StreamWriter { + /// Try to create a new writer, with the schema written as part of the header + pub fn try_new(writer: W, schema: &Schema) -> Result { + let mut writer = BufWriter::new(writer); + // write the schema as the first message in the stream + write_schema(&mut writer, schema)?; + Ok(Self { + writer, + schema: schema.clone(), + finished: false, + }) + } + + /// Write a record batch to the stream + pub fn write(&mut self, batch: &RecordBatch) -> Result<()> { + if self.finished { + return Err(ArrowError::IoError( + "Cannot write record batch to stream writer as it is closed".to_string(), + )); + } + write_record_batch(&mut self.writer, batch, true)?; + Ok(()) + } + + /// Write continuation bytes, and mark the stream as done + pub fn finish(&mut self) -> Result<()> { + self.writer.write(&[0u8, 0, 0, 0])?; + self.writer.write(&[255u8, 255, 255, 255])?; + self.finished = true; + + Ok(()) + } +} + +/// Finish the stream if it is not 'finished' when it goes out of scope +impl Drop for StreamWriter { + fn drop(&mut self) { + if !self.finished { + self.finish().unwrap(); + } + } +} + /// Convert the schema to its IPC representation, and write it to the `writer` fn write_schema(writer: &mut BufWriter, schema: &Schema) -> Result { let mut fbb = FlatBufferBuilder::new(); @@ -214,10 +266,12 @@ fn write_padded_data( Ok(total_len as usize) } -/// Write a record batch to the writer +/// Write a record batch to the writer, writing the message size before the message +/// if the record batch is being written to a stream fn write_record_batch( writer: &mut BufWriter, batch: &RecordBatch, + is_stream: bool, ) -> Result<(usize, usize)> { let mut fbb = FlatBufferBuilder::new(); @@ -258,6 +312,12 @@ fn write_record_batch( message.add_header(root); let root = message.finish(); fbb.finish(root, None); + let finished_data = fbb.finished_data(); + // write the length of data if writing to stream + if is_stream { + let total_len: u32 = finished_data.len() as u32; + writer.write(&total_len.to_le_bytes()[..])?; + } let meta_written = write_padded_data(writer, fbb.finished_data(), WriteDataType::Body)?; let arrow_data_written = @@ -440,6 +500,49 @@ mod tests { assert!(arrow_json.equals_reader(&mut reader)); }); } + + #[test] + fn read_and_rewrite_generated_streams() { + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + // the test is repetitive, so we read all supported files at once + let paths = vec![ + "generated_interval", + "generated_datetime", 
"generated_nested", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc/integration/0.14.1/{}.stream", + testdata, path + )) + .unwrap(); + + let mut reader = StreamReader::try_new(file).unwrap(); + + // read and rewrite the stream to a temp location + { + let file = File::create(format!("target/debug/testdata/{}.stream", path)) + .unwrap(); + let mut writer = StreamWriter::try_new(file, &reader.schema()).unwrap(); + while let Ok(Some(batch)) = reader.next() { + writer.write(&batch).unwrap(); + } + writer.finish().unwrap(); + } + + let file = + File::open(format!("target/debug/testdata/{}.stream", path)).unwrap(); + let mut reader = StreamReader::try_new(file).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(path); + assert!(arrow_json.equals_reader(&mut reader)); + }); + } + /// Read gzipped JSON file fn read_gzip_json(path: &str) -> ArrowJson { let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); From 11ed168b7fea5a2d6b14a8dadb1959c878afa509 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sun, 26 Jan 2020 21:58:46 +0200 Subject: [PATCH 3/3] fix IPC test file location --- rust/arrow/src/ipc/writer.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index aaeec1becf9..36c89c7a666 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -472,7 +472,7 @@ mod tests { ]; paths.iter().for_each(|path| { let file = File::open(format!( - "{}/arrow-ipc/integration/0.14.1/{}.arrow_file", + "{}/arrow-ipc-stream/integration/0.14.1/{}.arrow_file", testdata, path )) .unwrap(); @@ -515,7 +515,7 @@ mod tests { ]; paths.iter().for_each(|path| { let file = File::open(format!( - "{}/arrow-ipc/integration/0.14.1/{}.stream", + "{}/arrow-ipc-stream/integration/0.14.1/{}.stream", testdata, path )) .unwrap(); @@ -547,7 +547,7 @@ mod tests { fn read_gzip_json(path: &str) -> ArrowJson { let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); let file = File::open(format!( - "{}/arrow-ipc/integration/0.14.1/{}.json.gz", + "{}/arrow-ipc-stream/integration/0.14.1/{}.json.gz", testdata, path )) .unwrap();