From 2e41fe5efb83bb7a590d265d0161af9fb47a5d54 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sun, 29 Dec 2019 23:14:03 +0200 Subject: [PATCH 1/2] ARROW-5182: [Rust] Arrow IPC file writer --- rust/arrow/src/ipc/convert.rs | 218 +++++++++++++--- rust/arrow/src/ipc/mod.rs | 3 + rust/arrow/src/ipc/reader.rs | 6 +- rust/arrow/src/ipc/writer.rs | 461 ++++++++++++++++++++++++++++++++++ 4 files changed, 654 insertions(+), 34 deletions(-) create mode 100644 rust/arrow/src/ipc/writer.rs diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index 9620bc40605..ed22eb4f5a9 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -26,8 +26,10 @@ use flatbuffers::{ use std::collections::HashMap; use std::sync::Arc; +use DataType::*; + /// Serialize a schema in IPC format -fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { +pub(crate) fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { let mut fbb = FlatBufferBuilder::new(); let mut fields = vec![]; @@ -73,6 +75,47 @@ fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder { fbb } +pub(crate) fn schema_to_fb_offset<'a: 'b, 'b>( + mut fbb: &'a mut FlatBufferBuilder, + schema: &Schema, +) -> WIPOffset> { + let mut fields = vec![]; + for field in schema.fields() { + let fb_field_name = fbb.create_string(field.name().as_str()); + let (ipc_type_type, ipc_type, ipc_children) = + get_fb_field_type(field.data_type(), &mut fbb); + let mut field_builder = ipc::FieldBuilder::new(&mut fbb); + field_builder.add_name(fb_field_name); + field_builder.add_type_type(ipc_type_type); + field_builder.add_nullable(field.is_nullable()); + match ipc_children { + None => {} + Some(children) => field_builder.add_children(children), + }; + field_builder.add_type_(ipc_type); + fields.push(field_builder.finish()); + } + + let mut custom_metadata = vec![]; + for (k, v) in schema.metadata() { + let fb_key_name = fbb.create_string(k.as_str()); + let fb_val_name = fbb.create_string(v.as_str()); + + let mut kv_builder = ipc::KeyValueBuilder::new(&mut fbb); + kv_builder.add_key(fb_key_name); + kv_builder.add_value(fb_val_name); + custom_metadata.push(kv_builder.finish()); + } + + let fb_field_list = fbb.create_vector(&fields); + let fb_metadata_list = fbb.create_vector(&custom_metadata); + + let mut builder = ipc::SchemaBuilder::new(&mut fbb); + builder.add_fields(fb_field_list); + builder.add_custom_metadata(fb_metadata_list); + builder.finish() +} + /// Convert an IPC Field to Arrow Field impl<'a> From> for Field { fn from(field: ipc::Field) -> Field { @@ -85,7 +128,7 @@ impl<'a> From> for Field { } /// Deserialize a Schema table from IPC format to Schema data type -pub fn fb_to_schema(fb: ipc::Schema) -> Schema { +pub(crate) fn fb_to_schema(fb: ipc::Schema) -> Schema { let mut fields: Vec = vec![]; let c_fields = fb.fields().unwrap(); let len = c_fields.len(); @@ -110,7 +153,7 @@ pub fn fb_to_schema(fb: ipc::Schema) -> Schema { } /// Get the Arrow data type from the flatbuffer Field table -fn get_data_type(field: ipc::Field) -> DataType { +pub(crate) fn get_data_type(field: ipc::Field) -> DataType { match field.type_type() { ipc::Type::Bool => DataType::Boolean, ipc::Type::Int => { @@ -233,7 +276,7 @@ fn get_data_type(field: ipc::Field) -> DataType { } /// Get the IPC type of a data type -fn get_fb_field_type<'a: 'b, 'b>( +pub(crate) fn get_fb_field_type<'a: 'b, 'b>( data_type: &DataType, mut fbb: &mut FlatBufferBuilder<'a>, ) -> ( @@ -241,14 +284,20 @@ fn get_fb_field_type<'a: 'b, 'b>( WIPOffset, Option>>>>, ) { - use 
DataType::*; + // some IPC implementations expect an empty list for child data, instead of a null value. + // An empty field list is thus returned for primitive types + let empty_fields: Vec> = vec![]; match data_type { - Boolean => ( - ipc::Type::Bool, - ipc::BoolBuilder::new(&mut fbb).finish().as_union_value(), - None, - ), + Boolean => { + let children = fbb.create_vector(&empty_fields[..]); + ( + ipc::Type::Bool, + ipc::BoolBuilder::new(&mut fbb).finish().as_union_value(), + Some(children), + ) + } UInt8 | UInt16 | UInt32 | UInt64 => { + let children = fbb.create_vector(&empty_fields[..]); let mut builder = ipc::IntBuilder::new(&mut fbb); builder.add_is_signed(false); match data_type { @@ -258,9 +307,14 @@ fn get_fb_field_type<'a: 'b, 'b>( UInt64 => builder.add_bitWidth(64), _ => {} }; - (ipc::Type::Int, builder.finish().as_union_value(), None) + ( + ipc::Type::Int, + builder.finish().as_union_value(), + Some(children), + ) } Int8 | Int16 | Int32 | Int64 => { + let children = fbb.create_vector(&empty_fields[..]); let mut builder = ipc::IntBuilder::new(&mut fbb); builder.add_is_signed(true); match data_type { @@ -270,9 +324,14 @@ fn get_fb_field_type<'a: 'b, 'b>( Int64 => builder.add_bitWidth(64), _ => {} }; - (ipc::Type::Int, builder.finish().as_union_value(), None) + ( + ipc::Type::Int, + builder.finish().as_union_value(), + Some(children), + ) } Float16 | Float32 | Float64 => { + let children = fbb.create_vector(&empty_fields[..]); let mut builder = ipc::FloatingPointBuilder::new(&mut fbb); match data_type { Float16 => builder.add_precision(ipc::Precision::HALF), @@ -283,30 +342,57 @@ fn get_fb_field_type<'a: 'b, 'b>( ( ipc::Type::FloatingPoint, builder.finish().as_union_value(), - None, + Some(children), + ) + } + Binary => { + let children = fbb.create_vector(&empty_fields[..]); + ( + ipc::Type::Binary, + ipc::BinaryBuilder::new(&mut fbb).finish().as_union_value(), + Some(children), + ) + } + Utf8 => { + let children = fbb.create_vector(&empty_fields[..]); + ( + ipc::Type::Utf8, + ipc::Utf8Builder::new(&mut fbb).finish().as_union_value(), + Some(children), + ) + } + FixedSizeBinary(len) => { + let children = fbb.create_vector(&empty_fields[..]); + let mut builder = ipc::FixedSizeBinaryBuilder::new(&mut fbb); + builder.add_byteWidth(*len as i32); + ( + ipc::Type::FixedSizeBinary, + builder.finish().as_union_value(), + Some(children), ) } - Binary => ( - ipc::Type::Binary, - ipc::BinaryBuilder::new(&mut fbb).finish().as_union_value(), - None, - ), - Utf8 => ( - ipc::Type::Utf8, - ipc::Utf8Builder::new(&mut fbb).finish().as_union_value(), - None, - ), Date32(_) => { + let children = fbb.create_vector(&empty_fields[..]); let mut builder = ipc::DateBuilder::new(&mut fbb); builder.add_unit(ipc::DateUnit::DAY); - (ipc::Type::Date, builder.finish().as_union_value(), None) + ( + ipc::Type::Date, + builder.finish().as_union_value(), + Some(children), + ) } Date64(_) => { + let children = fbb.create_vector(&empty_fields[..]); let mut builder = ipc::DateBuilder::new(&mut fbb); builder.add_unit(ipc::DateUnit::MILLISECOND); - (ipc::Type::Date, builder.finish().as_union_value(), None) + ( + ipc::Type::Date, + builder.finish().as_union_value(), + Some(children), + ) } Time32(unit) | Time64(unit) => { + let children = fbb.create_vector(&empty_fields[..]); let mut builder = ipc::TimeBuilder::new(&mut fbb); match unit { TimeUnit::Second => { @@ -326,9 +412,14 @@ fn get_fb_field_type<'a: 'b, 'b>( builder.add_unit(ipc::TimeUnit::NANOSECOND); } } - (ipc::Type::Time, builder.finish().as_union_value(), 
None) + ( + ipc::Type::Time, + builder.finish().as_union_value(), + Some(children), + ) } Timestamp(unit, tz) => { + let children = fbb.create_vector(&empty_fields[..]); let tz = tz.clone().unwrap_or(Arc::new(String::new())); let tz_str = fbb.create_string(tz.as_str()); let mut builder = ipc::TimestampBuilder::new(&mut fbb); @@ -345,19 +436,25 @@ fn get_fb_field_type<'a: 'b, 'b>( ( ipc::Type::Timestamp, builder.finish().as_union_value(), - None, + Some(children), ) } Interval(unit) => { + let children = fbb.create_vector(&empty_fields[..]); let mut builder = ipc::IntervalBuilder::new(&mut fbb); let interval_unit = match unit { IntervalUnit::YearMonth => ipc::IntervalUnit::YEAR_MONTH, IntervalUnit::DayTime => ipc::IntervalUnit::DAY_TIME, }; builder.add_unit(interval_unit); - (ipc::Type::Interval, builder.finish().as_union_value(), None) + ( + ipc::Type::Interval, + builder.finish().as_union_value(), + Some(children), + ) } Duration(unit) => { + let children = fbb.create_vector(&empty_fields[..]); let mut builder = ipc::DurationBuilder::new(&mut fbb); let time_unit = match unit { TimeUnit::Second => ipc::TimeUnit::SECOND, @@ -366,7 +463,11 @@ fn get_fb_field_type<'a: 'b, 'b>( TimeUnit::Nanosecond => ipc::TimeUnit::NANOSECOND, }; builder.add_unit(time_unit); - (ipc::Type::Duration, builder.finish().as_union_value(), None) + ( + ipc::Type::Duration, + builder.finish().as_union_value(), + Some(children), + ) } List(ref list_type) => { let inner_types = get_fb_field_type(list_type, &mut fbb); @@ -389,6 +490,29 @@ fn get_fb_field_type<'a: 'b, 'b>( Some(children), ) } + FixedSizeList((ref list_type, len)) => { + let inner_types = get_fb_field_type(list_type, &mut fbb); + let child = ipc::Field::create( + &mut fbb, + &ipc::FieldArgs { + name: None, + nullable: false, + type_type: inner_types.0, + type_: Some(inner_types.1), + dictionary: None, + children: inner_types.2, + custom_metadata: None, + }, + ); + let children = fbb.create_vector(&[child]); + let mut builder = ipc::FixedSizeListBuilder::new(&mut fbb); + builder.add_listSize(*len as i32); + ( + ipc::Type::FixedSizeList, + builder.finish().as_union_value(), + Some(children), + ) + } Struct(fields) => { // struct's fields are children let mut children = vec![]; @@ -415,7 +539,6 @@ fn get_fb_field_type<'a: 'b, 'b>( Some(children), ) } - t @ _ => panic!("Unsupported Arrow Data Type {:?}", t), } } @@ -530,4 +653,39 @@ mod tests { let schema2 = fb_to_schema(ipc); assert_eq!(schema, schema2); } + + #[test] + fn schema_from_bytes() { + // bytes of a schema generated from python (0.14.0), saved as an `ipc::Message`. 
+        // the schema is: Field("field1", DataType::UInt32, false)
+        let bytes: Vec<u8> = vec![
+            16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 3, 0,
+            12, 0, 0, 0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0, 20,
+            0, 0, 0, 16, 0, 20, 0, 8, 0, 0, 0, 7, 0, 12, 0, 0, 0, 16, 0, 16, 0, 0, 0, 0,
+            0, 0, 2, 32, 0, 0, 0, 20, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 0, 8, 0,
+            4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0,
+            0, 0, 0, 0,
+        ];
+        let ipc = ipc::get_root_as_message(&bytes[..]);
+        let schema = ipc.header_as_schema().unwrap();
+
+        // a message generated from Rust, same as the Python one
+        let bytes: Vec<u8> = vec![
+            16, 0, 0, 0, 0, 0, 10, 0, 14, 0, 12, 0, 11, 0, 4, 0, 10, 0, 0, 0, 20, 0, 0,
+            0, 0, 0, 0, 1, 3, 0, 10, 0, 12, 0, 0, 0, 8, 0, 4, 0, 10, 0, 0, 0, 8, 0, 0, 0,
+            8, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 16, 0, 0, 0, 12, 0, 18, 0, 12, 0, 0, 0,
+            11, 0, 4, 0, 12, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 2, 20, 0, 0, 0, 0, 0, 6, 0,
+            8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49,
+            0, 0,
+        ];
+        let ipc2 = ipc::get_root_as_message(&bytes[..]);
+        let schema2 = ipc.header_as_schema().unwrap();
+
+        assert_eq!(schema, schema2);
+        assert_eq!(ipc.version(), ipc2.version());
+        assert_eq!(ipc.header_type(), ipc2.header_type());
+        assert_eq!(ipc.bodyLength(), ipc2.bodyLength());
+        assert!(ipc.custom_metadata().is_none());
+        assert!(ipc2.custom_metadata().is_none());
+    }
 }
diff --git a/rust/arrow/src/ipc/mod.rs b/rust/arrow/src/ipc/mod.rs
index 8c3bc08aec8..1ef653f00f8 100644
--- a/rust/arrow/src/ipc/mod.rs
+++ b/rust/arrow/src/ipc/mod.rs
@@ -17,6 +17,7 @@
 
 pub mod convert;
 pub mod reader;
+pub mod writer;
 
 pub mod gen;
 
@@ -25,3 +26,5 @@ pub use self::gen::Message::*;
 pub use self::gen::Schema::*;
 pub use self::gen::SparseTensor::*;
 pub use self::gen::Tensor::*;
+
+static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs
index a7b89052184..b0e747f789e 100644
--- a/rust/arrow/src/ipc/reader.rs
+++ b/rust/arrow/src/ipc/reader.rs
@@ -32,8 +32,6 @@ use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchReader};
 use DataType::*;
 
-static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
-
 /// Read a buffer based on offset and length
 fn read_buffer(buf: &ipc::Buffer, a_data: &Vec<u8>) -> Buffer {
     let start_offset = buf.offset() as usize;
@@ -410,14 +408,14 @@ impl FileReader {
         // check if header and footer contain correct magic bytes
         let mut magic_buffer: [u8; 6] = [0; 6];
         reader.read_exact(&mut magic_buffer)?;
-        if magic_buffer != ARROW_MAGIC {
+        if magic_buffer != super::ARROW_MAGIC {
             return Err(ArrowError::IoError(
                 "Arrow file does not contain correct header".to_string(),
             ));
         }
         reader.seek(SeekFrom::End(-6))?;
         reader.read_exact(&mut magic_buffer)?;
-        if magic_buffer != ARROW_MAGIC {
+        if magic_buffer != super::ARROW_MAGIC {
             return Err(ArrowError::IoError(
                 "Arrow file does not contain correct footer".to_string(),
             ));
diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs
new file mode 100644
index 00000000000..0800bd60639
--- /dev/null
+++ b/rust/arrow/src/ipc/writer.rs
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Arrow IPC File and Stream Writers
+//!
+//! The `FileWriter` and `StreamWriter` have similar interfaces,
+//! however the `FileWriter` expects a reader that supports `Seek`ing
+
+use std::io::{BufWriter, Write};
+
+use flatbuffers::FlatBufferBuilder;
+
+use crate::array::ArrayDataRef;
+use crate::buffer::{Buffer, MutableBuffer};
+use crate::datatypes::*;
+use crate::error::{ArrowError, Result};
+use crate::ipc;
+use crate::record_batch::RecordBatch;
+
+pub struct FileWriter<W: Write> {
+    /// The object to write to
+    writer: BufWriter<W>,
+    /// A reference to the schema, used in validating record batches
+    schema: Schema,
+    /// The number of bytes written for the header (up to schema)
+    header_bytes: usize,
+    /// The number of bytes between each block of bytes, as an offset for random access
+    block_offsets: usize,
+    /// Dictionary blocks that will be written as part of the IPC footer
+    dictionary_blocks: Vec<ipc::Block>,
+    /// Record blocks that will be written as part of the IPC footer
+    record_blocks: Vec<ipc::Block>,
+    /// Whether the writer footer has been written, and the writer is finished
+    finished: bool,
+}
+
+impl<W: Write> FileWriter<W> {
+    /// Try to create a new writer, with the schema written as part of the header
+    pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
+        let mut writer = BufWriter::new(writer);
+        // write magic to header
+        writer.write(&super::ARROW_MAGIC[..])?;
+        // create an 8-byte boundary after the header
+        writer.write(&[0, 0])?;
+        // write the schema, set the written bytes to the schema + header
+        let written = write_schema(&mut writer, schema)? + 8;
+        Ok(Self {
+            writer,
+            schema: schema.clone(),
+            header_bytes: written,
+            block_offsets: written,
+            dictionary_blocks: vec![],
+            record_blocks: vec![],
+            finished: false,
+        })
+    }
+
+    /// Write a record batch to the file
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        if self.finished {
+            return Err(ArrowError::IoError(
+                "Cannot write record batch to file writer as it is closed".to_string(),
+            ));
+        }
+        let (meta, data) = write_record_batch(&mut self.writer, batch)?;
+        // add a record block for the footer
+        self.record_blocks.push(ipc::Block::new(
+            self.block_offsets as i64,
+            (meta as i32) + 4,
+            data as i64,
+        ));
+        self.block_offsets += meta + data;
+        Ok(())
+    }
+
+    /// write footer and closing tag, then mark the writer as done
+    pub fn finish(&mut self) -> Result<()> {
+        let mut fbb = FlatBufferBuilder::new();
+        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
+        let record_batches = fbb.create_vector(&self.record_blocks);
+        // TODO: this is duplicated as we otherwise mutably borrow twice
+        let schema = {
+            let mut fields = vec![];
+            for field in self.schema.fields() {
+                let fb_field_name = fbb.create_string(field.name().as_str());
+                let (ipc_type_type, ipc_type, ipc_children) =
+                    ipc::convert::get_fb_field_type(field.data_type(), &mut fbb);
+                let mut field_builder = ipc::FieldBuilder::new(&mut fbb);
+                field_builder.add_name(fb_field_name);
+                field_builder.add_type_type(ipc_type_type);
+                field_builder.add_nullable(field.is_nullable());
+                match ipc_children {
+                    None => {}
+                    Some(children) => field_builder.add_children(children),
+                };
+                field_builder.add_type_(ipc_type);
+                fields.push(field_builder.finish());
+            }
+
+            let mut custom_metadata = vec![];
+            for (k, v) in self.schema.metadata() {
+                let fb_key_name = fbb.create_string(k.as_str());
+                let fb_val_name = fbb.create_string(v.as_str());
+
+                let mut kv_builder = ipc::KeyValueBuilder::new(&mut fbb);
+                kv_builder.add_key(fb_key_name);
+                kv_builder.add_value(fb_val_name);
+                custom_metadata.push(kv_builder.finish());
+            }
+
+            let fb_field_list = fbb.create_vector(&fields);
+            let fb_metadata_list = fbb.create_vector(&custom_metadata);
+
+            let root = {
+                let mut builder = ipc::SchemaBuilder::new(&mut fbb);
+                builder.add_fields(fb_field_list);
+                builder.add_custom_metadata(fb_metadata_list);
+                builder.finish()
+            };
+            root
+        };
+        let root = {
+            let mut footer_builder = ipc::FooterBuilder::new(&mut fbb);
+            footer_builder.add_version(ipc::MetadataVersion::V4);
+            footer_builder.add_schema(schema);
+            footer_builder.add_dictionaries(dictionaries);
+            footer_builder.add_recordBatches(record_batches);
+            footer_builder.finish()
+        };
+        fbb.finish(root, None);
+        write_padded_data(&mut self.writer, fbb.finished_data(), WriteDataType::Footer)?;
+        self.writer.write(&super::ARROW_MAGIC)?;
+        self.writer.flush()?;
+        self.finished = true;
+
+        Ok(())
+    }
+}
+
+/// Finish the file if it is not 'finished' when it goes out of scope
+impl<W: Write> Drop for FileWriter<W> {
+    fn drop(&mut self) {
+        if !self.finished {
+            self.finish().unwrap();
+        }
+    }
+}
+
+/// Convert the schema to its IPC representation, and write it to the `writer`
+fn write_schema<W: Write>(writer: &mut BufWriter<W>, schema: &Schema) -> Result<usize> {
+    let mut fbb = FlatBufferBuilder::new();
+    let schema = {
+        let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema);
+        fb.as_union_value()
+    };
+
+    let mut message = ipc::MessageBuilder::new(&mut fbb);
+    message.add_version(ipc::MetadataVersion::V4);
+    message.add_header_type(ipc::MessageHeader::Schema);
+    message.add_bodyLength(0);
+    message.add_header(schema);
+    // TODO: custom metadata
+    let data = message.finish();
+    fbb.finish(data, None);
+
+    let data = fbb.finished_data();
+    let written = write_padded_data(writer, data, WriteDataType::Header);
+
+    written
+}
+
+/// The message type being written. This determines whether to write the data length or not.
+/// Data length is written before the header, after the footer, and never for the body.
+#[derive(PartialEq)]
+enum WriteDataType {
+    Header,
+    Body,
+    Footer,
+}
+
+/// Write a slice of data to the writer, ensuring that it is padded to 8 bytes
+fn write_padded_data<W: Write>(
+    writer: &mut BufWriter<W>,
+    data: &[u8],
+    data_type: WriteDataType,
+) -> Result<usize> {
+    let len = data.len() as u32;
+    let pad_len = pad_to_8(len) as u32;
+    let total_len = len + pad_len;
+    // write data length
+    if data_type == WriteDataType::Header {
+        writer.write(&total_len.to_le_bytes()[..])?;
+    }
+    // write flatbuffer data
+    writer.write(data)?;
+    if pad_len > 0 {
+        writer.write(&vec![0u8; pad_len as usize][..])?;
+    }
+    if data_type == WriteDataType::Footer {
+        writer.write(&total_len.to_le_bytes()[..])?;
+    }
+    writer.flush()?;
+    Ok(total_len as usize)
+}
+
+/// Write a record batch to the writer
+fn write_record_batch<W: Write>(
+    writer: &mut BufWriter<W>,
+    batch: &RecordBatch,
+) -> Result<(usize, usize)> {
+    let mut fbb = FlatBufferBuilder::new();
+
+    let mut nodes: Vec<ipc::FieldNode> = vec![];
+    let mut buffers: Vec<ipc::Buffer> = vec![];
+    let mut arrow_data: Vec<u8> = vec![];
+    let mut offset = 0;
+    for array in batch.columns() {
+        let array_data = array.data();
+        offset = write_array_data(
+            &array_data,
+            &mut buffers,
+            &mut arrow_data,
+            &mut nodes,
+            offset,
+            array.len(),
+            array.null_count(),
+        );
+    }
+
+    // write data
+    let buffers = fbb.create_vector(&buffers);
+    let nodes = fbb.create_vector(&nodes);
+
+    let root = {
+        let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
+        batch_builder.add_length(batch.num_rows() as i64);
+        batch_builder.add_nodes(nodes);
+        batch_builder.add_buffers(buffers);
+        let b = batch_builder.finish();
+        b.as_union_value()
+    };
+    // create an ipc::Message
+    let mut message = ipc::MessageBuilder::new(&mut fbb);
+    message.add_version(ipc::MetadataVersion::V4);
+    message.add_header_type(ipc::MessageHeader::RecordBatch);
+    message.add_bodyLength(arrow_data.len() as i64);
+    message.add_header(root);
+    let root = message.finish();
+    fbb.finish(root, None);
+    let meta_written =
+        write_padded_data(writer, fbb.finished_data(), WriteDataType::Body)?;
+    let arrow_data_written =
+        write_padded_data(writer, &arrow_data[..], WriteDataType::Body)?;
+    Ok((meta_written, arrow_data_written))
+}
+
+/// Write array data to a vector of bytes
+fn write_array_data(
+    array_data: &ArrayDataRef,
+    mut buffers: &mut Vec<ipc::Buffer>,
+    mut arrow_data: &mut Vec<u8>,
+    mut nodes: &mut Vec<ipc::FieldNode>,
+    offset: i64,
+    num_rows: usize,
+    null_count: usize,
+) -> i64 {
+    let mut offset = offset;
+    nodes.push(ipc::FieldNode::new(num_rows as i64, null_count as i64));
+    // write null buffer if exists
+    let null_buffer = match array_data.null_buffer() {
+        None => {
+            // create a buffer and fill it with valid bits
+            let buffer = MutableBuffer::new(num_rows);
+            let buffer = buffer.with_bitset(num_rows, true);
+            let buffer = buffer.freeze();
+            buffer
+        }
+        Some(buffer) => buffer.clone(),
+    };
+    offset = write_buffer(&null_buffer, &mut buffers, &mut arrow_data, offset);
+
+    array_data.buffers().iter().for_each(|buffer| {
+        offset = write_buffer(buffer, &mut buffers, &mut arrow_data, offset);
+    });
+
+    // recursively write out nested structures
+    array_data.child_data().iter().for_each(|data_ref| {
+        // write the nested data (e.g list data)
+        offset = write_array_data(
+            data_ref,
+            &mut buffers,
+            &mut arrow_data,
+            &mut nodes,
+            offset,
+            data_ref.len(),
+            data_ref.null_count(),
+        );
+    });
+    offset
+}
+
+/// Write a buffer to a vector of bytes, and add its ipc Buffer to a vector
+fn write_buffer(
+    buffer: &Buffer,
+    buffers: &mut Vec<ipc::Buffer>,
+    arrow_data: &mut Vec<u8>,
+    offset: i64,
+) -> i64 {
+    let len = buffer.len();
+    let pad_len = pad_to_8(len as u32);
+    let total_len: i64 = (len + pad_len) as i64;
+    // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
+    buffers.push(ipc::Buffer::new(offset, total_len));
+    arrow_data.extend_from_slice(buffer.data());
+    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
+    offset + total_len
+}
+
+/// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes
+fn pad_to_8<'a>(len: u32) -> usize {
+    match len % 8 {
+        0 => 0 as usize,
+        v => 8 - v as usize,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use flate2::read::GzDecoder;
+
+    use crate::array::*;
+    use crate::datatypes::Field;
+    use crate::ipc::reader::*;
+    use crate::util::integration_util::*;
+    use std::env;
+    use std::fs::File;
+    use std::io::Read;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_write_file() {
+        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
+        let values: Vec<Option<u32>> = vec![
+            Some(999),
+            None,
+            Some(235),
+            Some(123),
+            None,
+            None,
+            None,
+            None,
+            None,
+        ];
+        let array1 = UInt32Array::from(values);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(array1) as ArrayRef],
+        )
+        .unwrap();
+        {
+            let file = File::create("target/debug/testdata/arrow.arrow_file").unwrap();
+            let mut writer = FileWriter::try_new(file, &schema).unwrap();
+
+            writer.write(&batch).unwrap();
+            // this is inside a block to test the implicit finishing of the file on `Drop`
+        }
+
+        {
+            let file =
+                File::open(format!("target/debug/testdata/{}.arrow_file", "arrow"))
+                    .unwrap();
+            let mut reader = FileReader::try_new(file).unwrap();
+            while let Ok(Some(read_batch)) = reader.next() {
+                read_batch
+                    .columns()
+                    .iter()
+                    .zip(batch.columns())
+                    .for_each(|(a, b)| {
+                        assert_eq!(a.data_type(), b.data_type());
+                        assert_eq!(a.len(), b.len());
+                        assert_eq!(a.null_count(), b.null_count());
+                    });
+            }
+        }
+        // panic!("intentional failure");
+    }
+
+    #[test]
+    fn read_and_rewrite_generated_files() {
+        let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+        // the test is repetitive, thus we can read all supported files at once
+        let paths = vec![
+            "generated_interval",
+            "generated_datetime",
+            "generated_nested",
+            "generated_primitive_no_batches",
+            "generated_primitive_zerolength",
+            "generated_primitive",
+        ];
+        paths.iter().for_each(|path| {
+            let file = File::open(format!(
+                "{}/arrow-ipc/integration/0.14.1/{}.arrow_file",
+                testdata, path
+            ))
+            .unwrap();
+
+            let mut reader = FileReader::try_new(file).unwrap();
+
+            // read and rewrite the file to a temp location
+            {
+                let file =
+                    File::create(format!("target/debug/testdata/{}.arrow_file", path))
+                        .unwrap();
+                let mut writer = FileWriter::try_new(file, &reader.schema()).unwrap();
+                while let Ok(Some(batch)) = reader.next() {
+                    writer.write(&batch).unwrap();
+                }
+                writer.finish().unwrap();
+            }
+
+            let file =
+                File::open(format!("target/debug/testdata/{}.arrow_file", path)).unwrap();
+            let mut reader = FileReader::try_new(file).unwrap();
+
+            // read expected JSON output
+            let arrow_json = read_gzip_json(path);
+            assert!(arrow_json.equals_reader(&mut reader));
+        });
+    }
+
+    /// Read gzipped JSON file
+    fn read_gzip_json(path: &str) -> ArrowJson {
+        let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+        let file = File::open(format!(
+            "{}/arrow-ipc/integration/0.14.1/{}.json.gz",
+            testdata, path
+        ))
+        .unwrap();
+        let mut gz = GzDecoder::new(&file);
+        let mut s = String::new();
+        gz.read_to_string(&mut s).unwrap();
+        // convert to Arrow JSON
+        let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
+        arrow_json
+    }
+}

From a23ecdf5d08793f49583c05f9221359ca8f81d20 Mon Sep 17 00:00:00 2001
From: Neville Dipale
Date: Sat, 18 Jan 2020 12:36:47 +0200
Subject: [PATCH 2/2] fix FixedSizeList after rebase

---
 rust/arrow/src/ipc/convert.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs
index ed22eb4f5a9..a38975fbf1c 100644
--- a/rust/arrow/src/ipc/convert.rs
+++ b/rust/arrow/src/ipc/convert.rs
@@ -490,7 +490,7 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>(
                 Some(children),
             )
         }
-        FixedSizeList((ref list_type, len)) => {
+        FixedSizeList(ref list_type, len) => {
            let inner_types = get_fb_field_type(list_type, &mut fbb);
            let child = ipc::Field::create(
                &mut fbb,