diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index 69f463b1145..32ecbb6430f 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1493,23 +1493,18 @@ def _temp_path(): file_objs = [ generate_primitive_case([], name='primitive_no_batches'), - generate_primitive_case([17, 20], name='primitive') - .skip_category('Rust'), - generate_primitive_case([0, 0, 0], name='primitive_zerolength') - .skip_category('Rust'), + generate_primitive_case([17, 20], name='primitive'), + generate_primitive_case([0, 0, 0], name='primitive_zerolength'), generate_primitive_large_offsets_case([17, 20]) .skip_category('Go') - .skip_category('JS') - .skip_category('Rust'), + .skip_category('JS'), generate_null_case([10, 0]) - .skip_category('Rust') .skip_category('JS') # TODO(ARROW-7900) .skip_category('Go'), # TODO(ARROW-7901) generate_null_trivial_case([0, 0]) - .skip_category('Rust') .skip_category('JS') # TODO(ARROW-7900) .skip_category('Go'), # TODO(ARROW-7901) @@ -1517,8 +1512,7 @@ def _temp_path(): .skip_category('Go') # TODO(ARROW-7948): Decimal + Go .skip_category('Rust'), - generate_datetime_case() - .skip_category('Rust'), + generate_datetime_case(), generate_interval_case() .skip_category('JS') # TODO(ARROW-5239): Intervals + JS diff --git a/rust/arrow-flight/src/utils.rs b/rust/arrow-flight/src/utils.rs index aa93cbfdc6e..c28e39b8842 100644 --- a/rust/arrow-flight/src/utils.rs +++ b/rust/arrow-flight/src/utils.rs @@ -29,12 +29,13 @@ use arrow::record_batch::RecordBatch; /// Convert a `RecordBatch` to `FlightData` by getting the header and body as bytes impl From<&RecordBatch> for FlightData { fn from(batch: &RecordBatch) -> Self { - let (header, body) = writer::record_batch_to_bytes(batch); + let options = writer::IpcWriteOptions::default(); + let data = writer::record_batch_to_bytes(batch, &options); Self { flight_descriptor: None, app_metadata: vec![], - data_header: header, - data_body: body, + data_header: data.ipc_message, + data_body: data.arrow_data, } } } @@ -42,8 +43,9 @@ impl From<&RecordBatch> for FlightData { /// Convert a `Schema` to `SchemaResult` by converting to an IPC message impl From<&Schema> for SchemaResult { fn from(schema: &Schema) -> Self { + let options = writer::IpcWriteOptions::default(); Self { - schema: writer::schema_to_bytes(schema), + schema: writer::schema_to_bytes(schema, &options).ipc_message, } } } @@ -51,11 +53,12 @@ impl From<&Schema> for SchemaResult { /// Convert a `Schema` to `FlightData` by converting to an IPC message impl From<&Schema> for FlightData { fn from(schema: &Schema) -> Self { - let schema = writer::schema_to_bytes(schema); + let options = writer::IpcWriteOptions::default(); + let schema = writer::schema_to_bytes(schema, &options); Self { flight_descriptor: None, app_metadata: vec![], - data_header: schema, + data_header: schema.ipc_message, data_body: vec![], } } diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index 1788a24f40d..7a5795de91c 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -345,7 +345,7 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>( Null => FBFieldType { type_type: ipc::Type::Null, type_: ipc::NullBuilder::new(fbb).finish().as_union_value(), - children: None, + children: Some(fbb.create_vector(&empty_fields[..])), }, Boolean => FBFieldType { type_type: ipc::Type::Bool, diff --git a/rust/arrow/src/ipc/mod.rs b/rust/arrow/src/ipc/mod.rs index 5f7442d03a7..cba8fb269a4 100644 --- a/rust/arrow/src/ipc/mod.rs +++ b/rust/arrow/src/ipc/mod.rs @@ -36,3 +36,4 @@ pub use self::gen::SparseTensor::*; pub use self::gen::Tensor::*; static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1']; +static CONTINUATION_MARKER: [u8; 4] = [0xff; 4]; diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs index c39c73716e3..af0b4e66a39 100644 --- a/rust/arrow/src/ipc/reader.rs +++ b/rust/arrow/src/ipc/reader.rs @@ -31,9 +31,9 @@ use crate::datatypes::{DataType, Field, IntervalUnit, Schema, SchemaRef}; use crate::error::{ArrowError, Result}; use crate::ipc; use crate::record_batch::{RecordBatch, RecordBatchReader}; -use DataType::*; -const CONTINUATION_MARKER: u32 = 0xffff_ffff; +use ipc::CONTINUATION_MARKER; +use DataType::*; /// Read a buffer based on offset and length fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer { @@ -482,6 +482,9 @@ pub struct FileReader { /// /// Dictionaries may be appended to in the streaming format. dictionaries_by_field: Vec>, + + /// Metadata version + metadata_version: ipc::MetadataVersion, } impl FileReader { @@ -506,12 +509,11 @@ impl FileReader { "Arrow file does not contain correct footer".to_string(), )); } - - // what does the footer contain? + // read footer length let mut footer_size: [u8; 4] = [0; 4]; reader.seek(SeekFrom::End(-10))?; reader.read_exact(&mut footer_size)?; - let footer_len = u32::from_le_bytes(footer_size); + let footer_len = i32::from_le_bytes(footer_size); // read footer let mut footer_data = vec![0; footer_len as usize]; @@ -534,6 +536,7 @@ impl FileReader { let mut dictionaries_by_field = vec![None; schema.fields().len()]; for block in footer.dictionaries().unwrap() { // read length from end of offset + // TODO: ARROW-9848: dictionary metadata has not been tested let meta_len = block.metaDataLength() - 4; let mut block_data = vec![0; meta_len as usize]; @@ -554,15 +557,21 @@ impl FileReader { reader.read_exact(&mut buf)?; if batch.isDelta() { - panic!("delta dictionary batches not supported"); + return Err(ArrowError::IoError( + "delta dictionary batches not supported".to_string(), + )); } let id = batch.id(); // As the dictionary batch does not contain the type of the // values array, we need to retieve this from the schema. - let first_field = find_dictionary_field(&ipc_schema, id) - .expect("dictionary id not found in shchema"); + let first_field = + find_dictionary_field(&ipc_schema, id).ok_or_else(|| { + ArrowError::InvalidArgumentError( + "dictionary id not found in schema".to_string(), + ) + })?; // Get an array representing this dictionary's values. let dictionary_values: ArrayRef = @@ -589,7 +598,11 @@ impl FileReader { } _ => None, } - .expect("dictionary id not found in schema"); + .ok_or_else(|| { + ArrowError::InvalidArgumentError( + "dictionary id not found in schema".to_string(), + ) + })?; // for all fields with this dictionary id, update the dictionaries vector // in the reader. Note that a dictionary batch may be shared between many fields. @@ -606,7 +619,11 @@ impl FileReader { } } } - _ => panic!("Expecting DictionaryBatch in dictionary blocks."), + _ => { + return Err(ArrowError::IoError( + "Expecting DictionaryBatch in dictionary blocks.".to_string(), + )) + } }; } @@ -617,6 +634,7 @@ impl FileReader { current_block: 0, total_blocks, dictionaries_by_field, + metadata_version: footer.version(), }) } @@ -657,16 +675,31 @@ impl RecordBatchReader for FileReader { let block = self.blocks[self.current_block]; self.current_block += 1; - // read length from end of offset - let meta_len = block.metaDataLength() - 4; + // read length + self.reader.seek(SeekFrom::Start(block.offset() as u64))?; + let mut meta_buf = [0; 4]; + self.reader.read_exact(&mut meta_buf)?; + if meta_buf == CONTINUATION_MARKER { + // continuation marker encountered, read message next + self.reader.read_exact(&mut meta_buf)?; + } + let meta_len = i32::from_le_bytes(meta_buf); let mut block_data = vec![0; meta_len as usize]; - self.reader - .seek(SeekFrom::Start(block.offset() as u64 + 4))?; self.reader.read_exact(&mut block_data)?; let message = ipc::get_root_as_message(&block_data[..]); + // some old test data's footer metadata is not set, so we account for that + if self.metadata_version != ipc::MetadataVersion::V1 + && message.version() != self.metadata_version + { + return Err(ArrowError::IoError( + "Could not read IPC message as metadata versions mismatch" + .to_string(), + )); + } + match message.header_type() { ipc::MessageHeader::Schema => Err(ArrowError::IoError( "Not expecting a schema when messages are read".to_string(), @@ -733,16 +766,12 @@ impl StreamReader { let mut meta_size: [u8; 4] = [0; 4]; reader.read_exact(&mut meta_size)?; let meta_len = { - let meta_len = u32::from_le_bytes(meta_size); - // If a continuation marker is encountered, skip over it and read // the size from the next four bytes. - if meta_len == CONTINUATION_MARKER { + if meta_size == CONTINUATION_MARKER { reader.read_exact(&mut meta_size)?; - u32::from_le_bytes(meta_size) - } else { - meta_len } + i32::from_le_bytes(meta_size) }; let mut meta_buffer = vec![0; meta_len as usize]; @@ -806,16 +835,12 @@ impl RecordBatchReader for StreamReader { } let meta_len = { - let meta_len = u32::from_le_bytes(meta_size); - // If a continuation marker is encountered, skip over it and read // the size from the next four bytes. - if meta_len == CONTINUATION_MARKER { + if meta_size == CONTINUATION_MARKER { self.reader.read_exact(&mut meta_size)?; - u32::from_le_bytes(meta_size) - } else { - meta_len } + i32::from_le_bytes(meta_size) }; if meta_len == 0 { diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 22121437cd7..effbc7168e5 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -32,9 +32,76 @@ use crate::ipc; use crate::record_batch::RecordBatch; use crate::util::bit_util; +use ipc::CONTINUATION_MARKER; + +/// IPC write options used to control the behaviour of the writer +#[derive(Debug)] +pub struct IpcWriteOptions { + /// Write padding after memory buffers to this multiple of bytes. + /// Generally 8 or 64, defaults to 8 + alignment: usize, + /// The legacy format is for releases before 0.15.0, and uses metadata V4 + write_legacy_ipc_format: bool, + /// The metadata version to write. The Rust IPC writer supports V4+ + metadata_version: ipc::MetadataVersion, +} + +impl IpcWriteOptions { + /// Try create IpcWriteOptions, checking for incompatible settings + pub fn try_new( + alignment: usize, + write_legacy_ipc_format: bool, + metadata_version: ipc::MetadataVersion, + ) -> Result { + if alignment == 0 || alignment % 8 != 0 { + return Err(ArrowError::InvalidArgumentError( + "Alignment should be greater than 0 and be a multiple of 8".to_string(), + )); + } + match metadata_version { + ipc::MetadataVersion::V1 + | ipc::MetadataVersion::V2 + | ipc::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError( + "Writing IPC metadata version 3 and lower not supported".to_string(), + )), + ipc::MetadataVersion::V4 => Ok(Self { + alignment, + write_legacy_ipc_format, + metadata_version, + }), + ipc::MetadataVersion::V5 => { + if write_legacy_ipc_format { + Err(ArrowError::InvalidArgumentError( + "Legacy IPC format only supported on metadata version 4" + .to_string(), + )) + } else { + Ok(Self { + alignment, + write_legacy_ipc_format, + metadata_version, + }) + } + } + } + } +} + +impl Default for IpcWriteOptions { + fn default() -> Self { + Self { + alignment: 8, + write_legacy_ipc_format: true, + metadata_version: ipc::MetadataVersion::V4, + } + } +} + pub struct FileWriter { /// The object to write to writer: BufWriter, + /// IPC write options + write_options: IpcWriteOptions, /// A reference to the schema, used in validating record batches schema: Schema, /// The number of bytes between each block of bytes, as an offset for random access @@ -50,17 +117,29 @@ pub struct FileWriter { impl FileWriter { /// Try create a new writer, with the schema written as part of the header pub fn try_new(writer: W, schema: &Schema) -> Result { + let write_options = IpcWriteOptions::default(); + Self::try_new_with_options(writer, schema, write_options) + } + + /// Try create a new writer with IpcWriteOptions + pub fn try_new_with_options( + writer: W, + schema: &Schema, + write_options: IpcWriteOptions, + ) -> Result { let mut writer = BufWriter::new(writer); // write magic to header writer.write_all(&super::ARROW_MAGIC[..])?; // create an 8-byte boundary after the header writer.write_all(&[0, 0])?; // write the schema, set the written bytes to the schema + header - let written = write_schema(&mut writer, schema)? + 8; + let message = Message::Schema(schema, &write_options); + let (meta, data) = write_message(&mut writer, &message, &write_options)?; Ok(Self { writer, + write_options, schema: schema.clone(), - block_offsets: written, + block_offsets: meta + data + 8, dictionary_blocks: vec![], record_blocks: vec![], finished: false, @@ -74,19 +153,25 @@ impl FileWriter { "Cannot write record batch to file writer as it is closed".to_string(), )); } - let (meta, data) = write_record_batch(&mut self.writer, batch, false)?; + let message = Message::RecordBatch(batch, &self.write_options); + let (meta, data) = + write_message(&mut self.writer, &message, &self.write_options)?; // add a record block for the footer - self.record_blocks.push(ipc::Block::new( + let block = ipc::Block::new( self.block_offsets as i64, - (meta as i32) + 4, + meta as i32, // TODO: is this still applicable? data as i64, - )); + ); + self.record_blocks.push(block); self.block_offsets += meta + data; Ok(()) } /// Write footer and closing tag, then mark the writer as done pub fn finish(&mut self) -> Result<()> { + // write EOS + write_continuation(&mut self.writer, &self.write_options, 0)?; + let mut fbb = FlatBufferBuilder::new(); let dictionaries = fbb.create_vector(&self.dictionary_blocks); let record_batches = fbb.create_vector(&self.record_blocks); @@ -130,14 +215,17 @@ impl FileWriter { }; let root = { let mut footer_builder = ipc::FooterBuilder::new(&mut fbb); - footer_builder.add_version(ipc::MetadataVersion::V4); + footer_builder.add_version(self.write_options.metadata_version); footer_builder.add_schema(schema); footer_builder.add_dictionaries(dictionaries); footer_builder.add_recordBatches(record_batches); footer_builder.finish() }; fbb.finish(root, None); - write_padded_data(&mut self.writer, fbb.finished_data(), WriteDataType::Footer)?; + let footer_data = fbb.finished_data(); + self.writer.write_all(footer_data)?; + self.writer + .write_all(&(footer_data.len() as i32).to_le_bytes())?; self.writer.write_all(&super::ARROW_MAGIC)?; self.writer.flush()?; self.finished = true; @@ -158,6 +246,8 @@ impl Drop for FileWriter { pub struct StreamWriter { /// The object to write to writer: BufWriter, + /// IPC write options + write_options: IpcWriteOptions, /// A reference to the schema, used in validating record batches schema: Schema, /// Whether the writer footer has been written, and the writer is finished @@ -167,11 +257,22 @@ pub struct StreamWriter { impl StreamWriter { /// Try create a new writer, with the schema written as part of the header pub fn try_new(writer: W, schema: &Schema) -> Result { + let write_options = IpcWriteOptions::default(); + Self::try_new_with_options(writer, schema, write_options) + } + + pub fn try_new_with_options( + writer: W, + schema: &Schema, + write_options: IpcWriteOptions, + ) -> Result { let mut writer = BufWriter::new(writer); // write the schema, set the written bytes to the schema - write_schema(&mut writer, schema)?; + let message = Message::Schema(schema, &write_options); + write_message(&mut writer, &message, &write_options)?; Ok(Self { writer, + write_options, schema: schema.clone(), finished: false, }) @@ -184,15 +285,14 @@ impl StreamWriter { "Cannot write record batch to stream writer as it is closed".to_string(), )); } - write_record_batch(&mut self.writer, batch, true)?; + let message = Message::RecordBatch(batch, &self.write_options); + write_message(&mut self.writer, &message, &self.write_options)?; Ok(()) } /// Write continuation bytes, and mark the stream as done pub fn finish(&mut self) -> Result<()> { - self.writer.write_all(&[255u8, 255, 255, 255])?; - self.writer.write_all(&[0u8, 0, 0, 0])?; - self.writer.flush()?; + write_continuation(&mut self.writer, &self.write_options, 0)?; self.finished = true; @@ -209,7 +309,15 @@ impl Drop for StreamWriter { } } -pub fn schema_to_bytes(schema: &Schema) -> Vec { +/// Stores the encoded data, which is an ipc::Message, and optional Arrow data +pub struct EncodedData { + /// An encoded ipc::Message + pub ipc_message: Vec, + /// Arrow buffers to be written, should be an empty vec for schema messages + pub arrow_data: Vec, +} + +pub fn schema_to_bytes(schema: &Schema, write_options: &IpcWriteOptions) -> EncodedData { let mut fbb = FlatBufferBuilder::new(); let schema = { let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema); @@ -217,7 +325,7 @@ pub fn schema_to_bytes(schema: &Schema) -> Vec { }; let mut message = ipc::MessageBuilder::new(&mut fbb); - message.add_version(ipc::MetadataVersion::V4); + message.add_version(write_options.metadata_version); message.add_header_type(ipc::MessageHeader::Schema); message.add_bodyLength(0); message.add_header(schema); @@ -226,51 +334,101 @@ pub fn schema_to_bytes(schema: &Schema) -> Vec { fbb.finish(data, None); let data = fbb.finished_data(); - data.to_vec() + EncodedData { + ipc_message: data.to_vec(), + arrow_data: vec![], + } } -/// Convert the schema to its IPC representation, and write it to the `writer` -fn write_schema(writer: &mut BufWriter, schema: &Schema) -> Result { - let data = schema_to_bytes(schema); - write_padded_data(writer, &data[..], WriteDataType::Header) +enum Message<'a> { + Schema(&'a Schema, &'a IpcWriteOptions), + RecordBatch(&'a RecordBatch, &'a IpcWriteOptions), + DictionaryBatch(&'a IpcWriteOptions), } -/// The message type being written. This determines whether to write the data length or not. -/// Data length is written before the header, after the footer, and never for the body. -#[derive(PartialEq)] -enum WriteDataType { - Header, - Body, - Footer, +impl<'a> Message<'a> { + /// Encode message to a ipc::Message and return data as bytes + fn encode(&'a self) -> EncodedData { + match self { + Message::Schema(schema, options) => schema_to_bytes(*schema, *options), + Message::RecordBatch(batch, options) => { + record_batch_to_bytes(*batch, *options) + } + Message::DictionaryBatch(_) => { + unimplemented!("Writing dictionary batches not implemented") + } + } + } } -/// Write a slice of data to the writer, ensuring that it is padded to 8 bytes -fn write_padded_data( - writer: &mut BufWriter, - data: &[u8], - data_type: WriteDataType, -) -> Result { +/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written +fn write_message( + mut writer: &mut BufWriter, + message: &Message, + write_options: &IpcWriteOptions, +) -> Result<(usize, usize)> { + let encoded = message.encode(); + let arrow_data_len = encoded.arrow_data.len(); + if arrow_data_len % 8 != 0 { + return Err(ArrowError::MemoryError( + "Arrow data not aligned".to_string(), + )); + } + + let a = write_options.alignment - 1; + let buffer = encoded.ipc_message; + let flatbuf_size = buffer.len(); + let prefix_size = if write_options.write_legacy_ipc_format { + 4 + } else { + 8 + }; + let aligned_size = (flatbuf_size + prefix_size + a) & !a; + let padding_bytes = aligned_size - flatbuf_size - prefix_size; + + write_continuation( + &mut writer, + &write_options, + (aligned_size - prefix_size) as i32, + )?; + + // write the flatbuf + if flatbuf_size > 0 { + writer.write_all(&buffer)?; + } + // write padding + writer.write_all(&vec![0; padding_bytes])?; + + // write arrow data + let body_len = if arrow_data_len > 0 { + write_body_buffers(&mut writer, &encoded.arrow_data)? + } else { + 0 + }; + + Ok((aligned_size, body_len)) +} + +fn write_body_buffers(writer: &mut BufWriter, data: &[u8]) -> Result { let len = data.len() as u32; let pad_len = pad_to_8(len) as u32; let total_len = len + pad_len; - // write data length - if data_type == WriteDataType::Header { - writer.write_all(&total_len.to_le_bytes()[..])?; - } - // write flatbuffer data + + // write body buffer writer.write_all(data)?; if pad_len > 0 { writer.write_all(&vec![0u8; pad_len as usize][..])?; } - if data_type == WriteDataType::Footer { - writer.write_all(&total_len.to_le_bytes()[..])?; - } + writer.flush()?; Ok(total_len as usize) } /// Write a `RecordBatch` into a tuple of bytes, one for the header (ipc::Message) and the other for the batch's data -pub fn record_batch_to_bytes(batch: &RecordBatch) -> (Vec, Vec) { +pub fn record_batch_to_bytes( + batch: &RecordBatch, + write_options: &IpcWriteOptions, +) -> EncodedData { let mut fbb = FlatBufferBuilder::new(); let mut nodes: Vec = vec![]; @@ -304,7 +462,7 @@ pub fn record_batch_to_bytes(batch: &RecordBatch) -> (Vec, Vec) { }; // create an ipc::Message let mut message = ipc::MessageBuilder::new(&mut fbb); - message.add_version(ipc::MetadataVersion::V4); + message.add_version(write_options.metadata_version); message.add_header_type(ipc::MessageHeader::RecordBatch); message.add_bodyLength(arrow_data.len() as i64); message.add_header(root); @@ -312,26 +470,46 @@ pub fn record_batch_to_bytes(batch: &RecordBatch) -> (Vec, Vec) { fbb.finish(root, None); let finished_data = fbb.finished_data(); - (finished_data.to_vec(), arrow_data) + EncodedData { + ipc_message: finished_data.to_vec(), + arrow_data, + } } /// Write a record batch to the writer, writing the message size before the message /// if the record batch is being written to a stream -fn write_record_batch( - writer: &mut BufWriter, - batch: &RecordBatch, - is_stream: bool, -) -> Result<(usize, usize)> { - let (meta_data, arrow_data) = record_batch_to_bytes(batch); - // write the length of data if writing to stream - if is_stream { - let total_len: u32 = meta_data.len() as u32; - writer.write_all(&total_len.to_le_bytes()[..])?; - } - let meta_written = write_padded_data(writer, &meta_data[..], WriteDataType::Body)?; - let arrow_data_written = - write_padded_data(writer, &arrow_data[..], WriteDataType::Body)?; - Ok((meta_written, arrow_data_written)) +fn write_continuation( + writer: &mut BufWriter, + write_options: &IpcWriteOptions, + total_len: i32, +) -> Result { + let mut written = 8; + + // the version of the writer determines whether continuation markers should be added + match write_options.metadata_version { + ipc::MetadataVersion::V1 + | ipc::MetadataVersion::V2 + | ipc::MetadataVersion::V3 => { + unreachable!("Options with the metadata version cannot be created") + } + ipc::MetadataVersion::V4 => { + if !write_options.write_legacy_ipc_format { + // v0.15.0 format + writer.write_all(&CONTINUATION_MARKER)?; + written = 4; + } + writer.write_all(&total_len.to_le_bytes()[..])?; + } + ipc::MetadataVersion::V5 => { + // write continuation marker and message length + writer.write_all(&CONTINUATION_MARKER)?; + writer.write_all(&total_len.to_le_bytes()[..])?; + } + }; + + writer.flush()?; + + Ok(written) } /// Write array data to a vector of bytes @@ -383,7 +561,7 @@ fn write_array_data( offset } -/// Write a buffer to a vector of bytes, and add its ipc Buffer to a vector +/// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector fn write_buffer( buffer: &Buffer, buffers: &mut Vec, @@ -401,11 +579,9 @@ fn write_buffer( } /// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes +#[inline] fn pad_to_8(len: u32) -> usize { - match len % 8 { - 0 => 0 as usize, - v => 8 - v as usize, - } + (((len + 7) & !7) - len) as usize } #[cfg(test)]