diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml
index 028444bd113..e34d061198b 100644
--- a/rust/arrow/Cargo.toml
+++ b/rust/arrow/Cargo.toml
@@ -51,6 +51,7 @@ flatbuffers = "^0.8"
 hex = "0.4"
 prettytable-rs = { version = "0.8.0", optional = true }
 lexical-core = "^0.7"
+lz4 = "1.23"
 
 [features]
 default = []
diff --git a/rust/arrow/src/array/array_struct.rs b/rust/arrow/src/array/array_struct.rs
index 50e7eea3db6..eeae29987f9 100644
--- a/rust/arrow/src/array/array_struct.rs
+++ b/rust/arrow/src/array/array_struct.rs
@@ -69,6 +69,9 @@ impl StructArray {
     }
 
     /// Return child array whose field name equals to column_name
+    ///
+    /// Note: The Arrow specification allows for duplicate field names, and in such
+    /// a case, this function will return the first column with the specified name.
     pub fn column_by_name(&self, column_name: &str) -> Option<&ArrayRef> {
         self.column_names()
             .iter()
diff --git a/rust/arrow/src/ipc/compression.rs b/rust/arrow/src/ipc/compression.rs
new file mode 100644
index 00000000000..f17d9eac3fa
--- /dev/null
+++ b/rust/arrow/src/ipc/compression.rs
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! IPC Compression Utilities
+
+use std::io::{Read, Write};
+
+use crate::error::{ArrowError, Result};
+use crate::ipc::gen::Message::CompressionType;
+
+pub trait IpcCompressionCodec {
+    fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
+    fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize>;
+    fn get_compression_type(&self) -> CompressionType;
+}
+
+pub type Codec = Box<dyn IpcCompressionCodec>;
+
+#[inline]
+pub(crate) fn get_codec(
+    compression_type: Option<CompressionType>,
+) -> Result<Option<Codec>> {
+    match compression_type {
+        Some(CompressionType::LZ4_FRAME) => Ok(Some(Box::new(Lz4CompressionCodec {}))),
+        Some(ctype) => Err(ArrowError::InvalidArgumentError(format!(
+            "IPC CompressionType {:?} not yet supported",
+            ctype
+        ))),
+        None => Ok(None),
+    }
+}
+
+pub struct Lz4CompressionCodec {}
+
+const LZ4_BUFFER_SIZE: usize = 4096;
+
+impl IpcCompressionCodec for Lz4CompressionCodec {
+    fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
+        if input_buf.is_empty() {
+            output_buf.write_all(&8i64.to_le_bytes())?;
+            return Ok(());
+        }
+        // write out the uncompressed length as a LE i64 value
+        output_buf.write_all(&(input_buf.len() as i64).to_le_bytes())?;
+        let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
+        let mut from = 0;
+        loop {
+            let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len());
+            encoder.write_all(&input_buf[from..to])?;
+            from += LZ4_BUFFER_SIZE;
+            if from >= input_buf.len() {
+                break;
+            }
+        }
+        Ok(encoder.finish().1?)
+    }
+
+    fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize> {
+        if input_buf.is_empty() {
+            return Ok(0);
+        }
+        let mut decoder = lz4::Decoder::new(&input_buf[8..])?;
+        let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
+        let mut total_len = 0;
+        loop {
+            let len = decoder.read(&mut buffer)?;
+            if len == 0 {
+                break;
+            }
+            total_len += len;
+            output_buf.write_all(&buffer[0..len])?;
+        }
+        decoder.finish().1?;
+        Ok(total_len)
+    }
+
+    fn get_compression_type(&self) -> CompressionType {
+        CompressionType::LZ4_FRAME
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use rand::Rng;
+
+    use crate::util::test_util::seedable_rng;
+
+    const INPUT_BUFFER_LEN: usize = 256;
+
+    #[test]
+    fn test_lz4_roundtrip() {
+        let mut rng = seedable_rng();
+        let mut bytes: Vec<u8> = Vec::with_capacity(INPUT_BUFFER_LEN);
+
+        (0..INPUT_BUFFER_LEN).for_each(|_| {
+            bytes.push(rng.gen::<u8>());
+        });
+
+        let codec = Lz4CompressionCodec {};
+        let mut compressed = Vec::new();
+        codec.compress(&bytes, &mut compressed).unwrap();
+
+        let mut decompressed = Vec::new();
+        let _ = codec.decompress(&compressed, &mut decompressed).unwrap();
+        assert_eq!(decompressed, bytes);
+    }
+}
diff --git a/rust/arrow/src/ipc/mod.rs b/rust/arrow/src/ipc/mod.rs
index a2d7103aacf..7b7ad070294 100644
--- a/rust/arrow/src/ipc/mod.rs
+++ b/rust/arrow/src/ipc/mod.rs
@@ -18,6 +18,7 @@
 // TODO: (vcq): Protobuf codegen is not generating Debug impls.
 #![allow(missing_debug_implementations)]
 
+pub mod compression;
 pub mod convert;
 pub mod reader;
 pub mod writer;
diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs
index d3f282961d1..a344e558b44 100644
--- a/rust/arrow/src/ipc/reader.rs
+++ b/rust/arrow/src/ipc/reader.rs
@@ -32,15 +32,24 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchReader};
 
+use ipc::compression::{get_codec, Codec};
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+fn read_buffer(buf: &ipc::Buffer, a_data: &[u8], codec: Option<&Codec>) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // decompress the buffer if it is compressed
+    match codec {
+        Some(codec) => {
+            let mut buf = Vec::new();
+            codec.decompress(buf_data, &mut buf).unwrap();
+            Buffer::from(buf)
+        }
+        None => Buffer::from(&buf_data),
+    }
 }
 
 /// Coordinates reading arrays based on data types.
@@ -52,6 +61,7 @@ fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer { /// - check if the bit width of non-64-bit numbers is 64, and /// - read the buffer as 64-bit (signed integer or float), and /// - cast the 64-bit array to the appropriate data type +#[allow(clippy::clippy::too_many_arguments)] fn create_array( nodes: &[ipc::FieldNode], data_type: &DataType, @@ -60,6 +70,7 @@ fn create_array( dictionaries: &[Option], mut node_index: usize, mut buffer_index: usize, + codec: Option<&Codec>, ) -> (ArrayRef, usize, usize) { use DataType::*; let array = match data_type { @@ -69,7 +80,7 @@ fn create_array( data_type, buffers[buffer_index..buffer_index + 3] .iter() - .map(|buf| read_buffer(buf, data)) + .map(|buf| read_buffer(buf, data, codec)) .collect(), ); node_index += 1; @@ -82,7 +93,7 @@ fn create_array( data_type, buffers[buffer_index..buffer_index + 2] .iter() - .map(|buf| read_buffer(buf, data)) + .map(|buf| read_buffer(buf, data, codec)) .collect(), ); node_index += 1; @@ -93,7 +104,7 @@ fn create_array( let list_node = &nodes[node_index]; let list_buffers: Vec = buffers[buffer_index..buffer_index + 2] .iter() - .map(|buf| read_buffer(buf, data)) + .map(|buf| read_buffer(buf, data, codec)) .collect(); node_index += 1; buffer_index += 2; @@ -105,6 +116,7 @@ fn create_array( dictionaries, node_index, buffer_index, + codec, ); node_index = triple.1; buffer_index = triple.2; @@ -115,7 +127,7 @@ fn create_array( let list_node = &nodes[node_index]; let list_buffers: Vec = buffers[buffer_index..=buffer_index] .iter() - .map(|buf| read_buffer(buf, data)) + .map(|buf| read_buffer(buf, data, codec)) .collect(); node_index += 1; buffer_index += 1; @@ -127,6 +139,7 @@ fn create_array( dictionaries, node_index, buffer_index, + codec, ); node_index = triple.1; buffer_index = triple.2; @@ -135,7 +148,7 @@ fn create_array( } Struct(struct_fields) => { let struct_node = &nodes[node_index]; - let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data); + let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data, codec); node_index += 1; buffer_index += 1; @@ -152,6 +165,7 @@ fn create_array( dictionaries, node_index, buffer_index, + codec, ); node_index = triple.1; buffer_index = triple.2; @@ -171,7 +185,7 @@ fn create_array( let index_node = &nodes[node_index]; let index_buffers: Vec = buffers[buffer_index..buffer_index + 2] .iter() - .map(|buf| read_buffer(buf, data)) + .map(|buf| read_buffer(buf, data, codec)) .collect(); let value_array = dictionaries[node_index].clone().unwrap(); node_index += 1; @@ -200,7 +214,7 @@ fn create_array( data_type, buffers[buffer_index..buffer_index + 2] .iter() - .map(|buf| read_buffer(buf, data)) + .map(|buf| read_buffer(buf, data, codec)) .collect(), ); node_index += 1; @@ -424,6 +438,20 @@ pub fn read_record_batch( let mut node_index = 0; let mut arrays = vec![]; + let codec: Option = match batch.compression() { + Some(body_compression) => { + let method = body_compression.method(); + if ipc::BodyCompressionMethod::BUFFER != method { + return Err(ArrowError::IoError(format!( + "Encountered unsupported body compression method: {:?}", + method + ))); + } + get_codec(Some(body_compression.codec()))? 
+ } + None => None, + }; + // keep track of index as lists require more than one node for field in schema.fields() { let triple = create_array( @@ -434,6 +462,7 @@ pub fn read_record_batch( dictionaries, node_index, buffer_index, + codec.as_ref(), ); node_index = triple.1; buffer_index = triple.2; @@ -919,14 +948,16 @@ impl RecordBatchReader for StreamReader { mod tests { use super::*; + use std::fs::File; + use flate2::read::GzDecoder; use crate::util::integration_util::*; - use std::fs::File; #[test] - fn read_generated_files() { + fn read_generated_files_014() { let testdata = crate::util::test_util::arrow_test_data(); + let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ "generated_interval", @@ -940,15 +971,15 @@ mod tests { ]; paths.iter().for_each(|path| { let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.arrow_file", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path )) .unwrap(); let mut reader = FileReader::try_new(file).unwrap(); // read expected JSON output - let arrow_json = read_gzip_json(path); + let arrow_json = read_gzip_json(version, path); assert!(arrow_json.equals_reader(&mut reader)); }); } @@ -974,6 +1005,8 @@ mod tests { "generated_datetime", "generated_dictionary", "generated_nested", + "generated_null_trivial", + "generated_null", "generated_primitive_no_batches", "generated_primitive_zerolength", "generated_primitive", @@ -990,8 +1023,9 @@ mod tests { } #[test] - fn read_generated_streams() { + fn read_generated_streams_014() { let testdata = crate::util::test_util::arrow_test_data(); + let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ "generated_interval", @@ -1005,15 +1039,81 @@ mod tests { ]; paths.iter().for_each(|path| { let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.stream", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let mut reader = StreamReader::try_new(file).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + // the next batch must be empty + assert!(reader.next().is_none()); + // the stream must indicate that it's finished + assert!(reader.is_finished()); + }); + } + + #[test] + fn read_generated_files_100() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "1.0.0-littleendian"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_interval", + "generated_datetime", + "generated_dictionary", + "generated_nested", + "generated_null_trivial", + "generated_null", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + }); + } + + #[test] + fn read_generated_streams_100() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "1.0.0-littleendian"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_interval", + 
"generated_datetime", + "generated_dictionary", + "generated_nested", + "generated_null_trivial", + "generated_null", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path )) .unwrap(); let mut reader = StreamReader::try_new(file).unwrap(); // read expected JSON output - let arrow_json = read_gzip_json(path); + let arrow_json = read_gzip_json(version, path); assert!(arrow_json.equals_reader(&mut reader)); // the next batch must be empty assert!(reader.next().is_none()); @@ -1072,11 +1172,11 @@ mod tests { } /// Read gzipped JSON file - fn read_gzip_json(path: &str) -> ArrowJson { + fn read_gzip_json(version: &str, path: &str) -> ArrowJson { let testdata = crate::util::test_util::arrow_test_data(); let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.json.gz", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.json.gz", + testdata, version, path )) .unwrap(); let mut gz = GzDecoder::new(&file); diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index fdec26c1b79..2af71fabb11 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -33,6 +33,7 @@ use crate::ipc; use crate::record_batch::RecordBatch; use crate::util::bit_util; +use ipc::compression::{get_codec, Codec}; use ipc::CONTINUATION_MARKER; /// IPC write options used to control the behaviour of the writer @@ -44,7 +45,21 @@ pub struct IpcWriteOptions { /// The legacy format is for releases before 0.15.0, and uses metadata V4 write_legacy_ipc_format: bool, /// The metadata version to write. The Rust IPC writer supports V4+ + /// + /// *Default versions per crate* + /// + /// When creating the default IpcWriteOptions, the following metadata versions are used: + /// + /// version 2.0.0: V4, with legacy format enabled + /// version 3.0.0: V5 metadata_version: ipc::MetadataVersion, + /// Compression type to use. + /// + /// Compression is currently only supported when writing metadata version V5, though + /// it might be possible to read non-legacy V4 buffers that are compressed. 
+ /// The compression formats supported as of v3.0.0 of the library are: + /// - LZ4_FRAME + compression_type: Option, } impl IpcWriteOptions { @@ -53,6 +68,7 @@ impl IpcWriteOptions { alignment: usize, write_legacy_ipc_format: bool, metadata_version: ipc::MetadataVersion, + compression_type: Option, ) -> Result { if alignment == 0 || alignment % 8 != 0 { return Err(ArrowError::InvalidArgumentError( @@ -65,24 +81,32 @@ impl IpcWriteOptions { | ipc::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError( "Writing IPC metadata version 3 and lower not supported".to_string(), )), - ipc::MetadataVersion::V4 => Ok(Self { - alignment, - write_legacy_ipc_format, - metadata_version, - }), + ipc::MetadataVersion::V4 => { + if compression_type.is_some() { + return Err(ArrowError::InvalidArgumentError( + "Writing IPC metadata version 4 with compressed buffers is currently not supported".to_string() + )); + } + Ok(Self { + alignment, + write_legacy_ipc_format, + metadata_version, + compression_type, + }) + } ipc::MetadataVersion::V5 => { if write_legacy_ipc_format { - Err(ArrowError::InvalidArgumentError( + return Err(ArrowError::InvalidArgumentError( "Legacy IPC format only supported on metadata version 4" .to_string(), - )) - } else { - Ok(Self { - alignment, - write_legacy_ipc_format, - metadata_version, - }) + )); } + Ok(Self { + alignment, + write_legacy_ipc_format, + metadata_version, + compression_type, + }) } z => panic!("Unsupported ipc::MetadataVersion {:?}", z), } @@ -93,8 +117,9 @@ impl Default for IpcWriteOptions { fn default() -> Self { Self { alignment: 8, - write_legacy_ipc_format: true, - metadata_version: ipc::MetadataVersion::V4, + write_legacy_ipc_format: false, + metadata_version: ipc::MetadataVersion::V5, + compression_type: None, } } } @@ -140,6 +165,9 @@ impl IpcDataGenerator { let schema = batch.schema(); let mut encoded_dictionaries = Vec::with_capacity(schema.fields().len()); + let codec = get_codec(write_options.compression_type)?; + let codec_ref = codec.as_ref(); + for (i, field) in schema.fields().iter().enumerate() { let column = batch.column(i); @@ -157,12 +185,13 @@ impl IpcDataGenerator { dict_id, dict_values, write_options, - )); + codec_ref, + )?); } } } - let encoded_message = self.record_batch_to_bytes(batch, write_options); + let encoded_message = self.record_batch_to_bytes(batch, write_options)?; Ok((encoded_dictionaries, encoded_message)) } @@ -173,13 +202,14 @@ impl IpcDataGenerator { &self, batch: &RecordBatch, write_options: &IpcWriteOptions, - ) -> EncodedData { + ) -> Result { let mut fbb = FlatBufferBuilder::new(); let mut nodes: Vec = vec![]; let mut buffers: Vec = vec![]; let mut arrow_data: Vec = vec![]; let mut offset = 0; + let codec = get_codec(write_options.compression_type)?; for array in batch.columns() { let array_data = array.data(); offset = write_array_data( @@ -190,18 +220,32 @@ impl IpcDataGenerator { offset, array.len(), array.null_count(), - ); + codec.as_ref(), + )?; } // write data let buffers = fbb.create_vector(&buffers); let nodes = fbb.create_vector(&nodes); + let compression = { + if let Some(compression_type) = &write_options.compression_type { + let mut c = ipc::BodyCompressionBuilder::new(&mut fbb); + c.add_method(ipc::BodyCompressionMethod::BUFFER); + c.add_codec(*compression_type); + Some(c.finish()) + } else { + None + } + }; let root = { let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb); batch_builder.add_length(batch.num_rows() as i64); batch_builder.add_nodes(nodes); batch_builder.add_buffers(buffers); + 
if let Some(c) = compression { + batch_builder.add_compression(c); + } let b = batch_builder.finish(); b.as_union_value() }; @@ -215,10 +259,10 @@ impl IpcDataGenerator { fbb.finish(root, None); let finished_data = fbb.finished_data(); - EncodedData { + Ok(EncodedData { ipc_message: finished_data.to_vec(), arrow_data, - } + }) } /// Write dictionary values into two sets of bytes, one for the header (ipc::Message) and the @@ -228,7 +272,8 @@ impl IpcDataGenerator { dict_id: i64, array_data: &ArrayDataRef, write_options: &IpcWriteOptions, - ) -> EncodedData { + codec: Option<&Codec>, + ) -> Result { let mut fbb = FlatBufferBuilder::new(); let mut nodes: Vec = vec![]; @@ -243,7 +288,8 @@ impl IpcDataGenerator { 0, array_data.len(), array_data.null_count(), - ); + codec, + )?; // write data let buffers = fbb.create_vector(&buffers); @@ -276,10 +322,10 @@ impl IpcDataGenerator { fbb.finish(root, None); let finished_data = fbb.finished_data(); - EncodedData { + Ok(EncodedData { ipc_message: finished_data.to_vec(), arrow_data, - } + }) } } @@ -655,6 +701,7 @@ fn write_continuation( } /// Write array data to a vector of bytes +#[allow(clippy::clippy::too_many_arguments)] fn write_array_data( array_data: &ArrayDataRef, mut buffers: &mut Vec, @@ -663,7 +710,8 @@ fn write_array_data( offset: i64, num_rows: usize, null_count: usize, -) -> i64 { + codec: Option<&Codec>, +) -> Result { let mut offset = offset; nodes.push(ipc::FieldNode::new(num_rows as i64, null_count as i64)); // NullArray does not have any buffers, thus the null buffer is not generated @@ -680,16 +728,17 @@ fn write_array_data( Some(buffer) => buffer.clone(), }; - offset = write_buffer(&null_buffer, &mut buffers, &mut arrow_data, offset); + offset = + write_buffer(&null_buffer, &mut buffers, &mut arrow_data, offset, codec)?; } - array_data.buffers().iter().for_each(|buffer| { - offset = write_buffer(buffer, &mut buffers, &mut arrow_data, offset); - }); + for buffer in array_data.buffers() { + offset = write_buffer(buffer, &mut buffers, &mut arrow_data, offset, codec)?; + } if !matches!(array_data.data_type(), DataType::Dictionary(_, _)) { // recursively write out nested structures - array_data.child_data().iter().for_each(|data_ref| { + for data_ref in array_data.child_data() { // write the nested data (e.g list data) offset = write_array_data( data_ref, @@ -699,11 +748,12 @@ fn write_array_data( offset, data_ref.len(), data_ref.null_count(), - ); - }); + codec, + )?; + } } - offset + Ok(offset) } /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector @@ -712,15 +762,24 @@ fn write_buffer( buffers: &mut Vec, arrow_data: &mut Vec, offset: i64, -) -> i64 { - let len = buffer.len(); + codec: Option<&Codec>, +) -> Result { + let mut compressed = Vec::new(); + let buf_slice = match codec { + Some(codec) => { + codec.compress(buffer.as_slice(), &mut compressed)?; + compressed.as_slice() + } + None => buffer.as_slice(), + }; + let len = buf_slice.len(); let pad_len = pad_to_8(len as u32); let total_len: i64 = (len + pad_len) as i64; // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes"); buffers.push(ipc::Buffer::new(offset, total_len)); - arrow_data.extend_from_slice(buffer.as_slice()); + arrow_data.extend_from_slice(buf_slice); arrow_data.extend_from_slice(&vec![0u8; pad_len][..]); - offset + total_len + Ok(offset + total_len) } /// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes @@ -733,15 +792,17 @@ fn pad_to_8(len: u32) -> usize { mod tests { use super::*; + use 
std::fs::File; + use std::io::Read; + use std::sync::Arc; + use flate2::read::GzDecoder; + use ipc::MetadataVersion; use crate::array::*; use crate::datatypes::Field; use crate::ipc::reader::*; use crate::util::integration_util::*; - use std::fs::File; - use std::io::Read; - use std::sync::Arc; #[test] fn test_write_file() { @@ -790,8 +851,7 @@ mod tests { } } - #[test] - fn test_write_null_file() { + fn write_null_file(options: IpcWriteOptions, suffix: &str) { let schema = Schema::new(vec![ Field::new("nulls", DataType::Null, true), Field::new("int32s", DataType::Int32, false), @@ -812,16 +872,18 @@ mod tests { ], ) .unwrap(); + let file_name = format!("target/debug/testdata/nulls_{}.arrow_file", suffix); { - let file = File::create("target/debug/testdata/nulls.arrow_file").unwrap(); - let mut writer = FileWriter::try_new(file, &schema).unwrap(); + let file = File::create(&file_name).unwrap(); + let mut writer = + FileWriter::try_new_with_options(file, &schema, options).unwrap(); writer.write(&batch).unwrap(); // this is inside a block to test the implicit finishing of the file on `Drop` } { - let file = File::open("target/debug/testdata/nulls.arrow_file").unwrap(); + let file = File::open(&file_name).unwrap(); let reader = FileReader::try_new(file).unwrap(); reader.for_each(|maybe_batch| { maybe_batch @@ -837,10 +899,87 @@ mod tests { }); } } + #[test] + fn test_write_null_file_v4() { + write_null_file( + IpcWriteOptions::try_new(8, false, MetadataVersion::V4, None).unwrap(), + "v4_a8", + ); + write_null_file( + IpcWriteOptions::try_new(8, true, MetadataVersion::V4, None).unwrap(), + "v4_a8l", + ); + write_null_file( + IpcWriteOptions::try_new(64, false, MetadataVersion::V4, None).unwrap(), + "v4_a64", + ); + write_null_file( + IpcWriteOptions::try_new(64, true, MetadataVersion::V4, None).unwrap(), + "v4_a64l", + ); + } + + #[test] + fn test_write_null_file_v5() { + write_null_file( + IpcWriteOptions::try_new(8, false, MetadataVersion::V5, None).unwrap(), + "v5_a8", + ); + write_null_file( + IpcWriteOptions::try_new(64, false, MetadataVersion::V5, None).unwrap(), + "v5_a64", + ); + } #[test] - fn read_and_rewrite_generated_files() { + fn test_write_file_v5_compressed() { let testdata = crate::util::test_util::arrow_test_data(); + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/1.0.0-littleendian/generated_primitive.arrow_file", + testdata + )).unwrap(); + let options = IpcWriteOptions::try_new( + 64, + false, + MetadataVersion::V5, + Some(ipc::CompressionType::LZ4_FRAME), + ) + .unwrap(); + let reader = FileReader::try_new(file).unwrap(); + let filepath = "target/debug/testdata/primitive_lz4.arrow_file"; + let file = File::create(&filepath).unwrap(); + let mut writer = + FileWriter::try_new_with_options(file, &reader.schema(), options).unwrap(); + let batches = reader.collect::>>().unwrap(); + for b in &batches { + writer.write(b).unwrap(); + } + // close writer + writer.finish().unwrap(); + + // read written + let file = File::open(&filepath).unwrap(); + let reader = FileReader::try_new(file).unwrap(); + let decompressed_batches = reader.collect::>>().unwrap(); + assert_eq!(batches.len(), decompressed_batches.len()); + for (a, b) in batches.iter().zip(decompressed_batches) { + assert_eq!(a.schema(), b.schema()); + assert_eq!(a.num_columns(), b.num_columns()); + assert_eq!(a.num_rows(), b.num_rows()); + a.columns() + .iter() + .zip(b.columns()) + .for_each(|(a_col, b_col)| { + println!("A: {:?}, \nB: {:?}", a_col, b_col); + assert_eq!(a_col, b_col); + }); + } + } + + 
#[test] + fn read_and_rewrite_generated_files_014() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ "generated_interval", @@ -854,8 +993,8 @@ mod tests { ]; paths.iter().for_each(|path| { let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.arrow_file", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path )) .unwrap(); @@ -863,9 +1002,11 @@ mod tests { // read and rewrite the file to a temp location { - let file = - File::create(format!("target/debug/testdata/{}.arrow_file", path)) - .unwrap(); + let file = File::create(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); let mut writer = FileWriter::try_new(file, &reader.schema()).unwrap(); while let Some(Ok(batch)) = reader.next() { writer.write(&batch).unwrap(); @@ -873,19 +1014,23 @@ mod tests { writer.finish().unwrap(); } - let file = - File::open(format!("target/debug/testdata/{}.arrow_file", path)).unwrap(); + let file = File::open(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); let mut reader = FileReader::try_new(file).unwrap(); // read expected JSON output - let arrow_json = read_gzip_json(path); + let arrow_json = read_gzip_json(version, path); assert!(arrow_json.equals_reader(&mut reader)); }); } #[test] - fn read_and_rewrite_generated_streams() { + fn read_and_rewrite_generated_streams_014() { let testdata = crate::util::test_util::arrow_test_data(); + let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ "generated_interval", @@ -899,8 +1044,8 @@ mod tests { ]; paths.iter().for_each(|path| { let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.stream", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path )) .unwrap(); @@ -908,8 +1053,11 @@ mod tests { // read and rewrite the stream to a temp location { - let file = File::create(format!("target/debug/testdata/{}.stream", path)) - .unwrap(); + let file = File::create(format!( + "target/debug/testdata/{}-{}.stream", + version, path + )) + .unwrap(); let mut writer = StreamWriter::try_new(file, &reader.schema()).unwrap(); reader.for_each(|batch| { writer.write(&batch.unwrap()).unwrap(); @@ -918,21 +1066,149 @@ mod tests { } let file = - File::open(format!("target/debug/testdata/{}.stream", path)).unwrap(); + File::open(format!("target/debug/testdata/{}-{}.stream", version, path)) + .unwrap(); + let mut reader = StreamReader::try_new(file).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + }); + } + + #[test] + fn read_and_rewrite_generated_files_100() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "1.0.0-littleendian"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_custom_metadata", + "generated_datetime", + "generated_dictionary_unsigned", + "generated_dictionary", + // "generated_duplicate_fieldnames", + "generated_interval", + "generated_large_batch", + "generated_nested", + // "generated_nested_large_offsets", + "generated_null_trivial", + "generated_null", + "generated_primitive_large_offsets", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + // 
"generated_recursive_nested", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file).unwrap(); + + // read and rewrite the file to a temp location + { + let file = File::create(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); + // write IPC version 5 + let options = + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5, None) + .unwrap(); + let mut writer = + FileWriter::try_new_with_options(file, &reader.schema(), options) + .unwrap(); + while let Some(Ok(batch)) = reader.next() { + writer.write(&batch).unwrap(); + } + writer.finish().unwrap(); + } + + let file = File::open(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); + let mut reader = FileReader::try_new(file).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + }); + } + + #[test] + fn read_and_rewrite_generated_streams_100() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "1.0.0-littleendian"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_custom_metadata", + "generated_datetime", + "generated_dictionary_unsigned", + "generated_dictionary", + // "generated_duplicate_fieldnames", + "generated_interval", + "generated_large_batch", + "generated_nested", + // "generated_nested_large_offsets", + "generated_null_trivial", + "generated_null", + "generated_primitive_large_offsets", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + // "generated_recursive_nested", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let reader = StreamReader::try_new(file).unwrap(); + + // read and rewrite the stream to a temp location + { + let file = File::create(format!( + "target/debug/testdata/{}-{}.stream", + version, path + )) + .unwrap(); + let options = + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5, None) + .unwrap(); + let mut writer = + StreamWriter::try_new_with_options(file, &reader.schema(), options) + .unwrap(); + reader.for_each(|batch| { + writer.write(&batch.unwrap()).unwrap(); + }); + writer.finish().unwrap(); + } + + let file = + File::open(format!("target/debug/testdata/{}-{}.stream", version, path)) + .unwrap(); let mut reader = StreamReader::try_new(file).unwrap(); // read expected JSON output - let arrow_json = read_gzip_json(path); + let arrow_json = read_gzip_json(version, path); assert!(arrow_json.equals_reader(&mut reader)); }); } /// Read gzipped JSON file - fn read_gzip_json(path: &str) -> ArrowJson { + fn read_gzip_json(version: &str, path: &str) -> ArrowJson { let testdata = crate::util::test_util::arrow_test_data(); let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.json.gz", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.json.gz", + testdata, version, path )) .unwrap(); let mut gz = GzDecoder::new(&file); diff --git a/rust/arrow/src/util/integration_util.rs b/rust/arrow/src/util/integration_util.rs index 6a6e7ea77de..71498aeea78 100644 --- a/rust/arrow/src/util/integration_util.rs +++ b/rust/arrow/src/util/integration_util.rs @@ -203,8 +203,10 @@ impl ArrowJsonBatch { let json_array: Vec = 
json_from_col(&col, field.data_type());
             match field.data_type() {
                 DataType::Null => {
-                    let arr = arr.as_any().downcast_ref::<NullArray>().unwrap();
-                    arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
+                    let arr: &NullArray =
+                        arr.as_any().downcast_ref::<NullArray>().unwrap();
+                    // NullArrays should have the same length, json_array is empty
+                    arr.len() == col.count
                 }
                 DataType::Boolean => {
                     let arr = arr.as_any().downcast_ref::<BooleanArray>().unwrap();
@@ -504,6 +506,7 @@ fn json_from_col(col: &ArrowJsonColumn, data_type: &DataType) -> Vec<Value> {
                 converted_col.as_slice(),
             )
         }
+        DataType::Null => vec![],
         _ => merge_json_array(
             col.validity.as_ref().unwrap().as_slice(),
             &col.data.clone().unwrap(),
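
For reviewers, a minimal usage sketch of the new compression option, distilled from the `test_write_file_v5_compressed` test in this diff. The file paths and the `rewrite_with_lz4` helper name are illustrative assumptions; the `IpcWriteOptions::try_new`, `FileWriter::try_new_with_options`, and `FileReader::try_new` calls are the ones added or exercised by this patch, and the `arrow::ipc::MetadataVersion` / `arrow::ipc::CompressionType` paths assume the `ipc` module re-exports the generated flatbuffers types as the in-crate tests use them. A file written this way is read back with the unchanged `FileReader` API, since `read_record_batch` now inspects the message's `BodyCompression` field.

use std::fs::File;

use arrow::error::Result;
use arrow::ipc;
use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};

fn rewrite_with_lz4() -> Result<()> {
    // Read an existing, uncompressed IPC file (path is illustrative).
    let input = File::open("input.arrow_file").expect("input file");
    let reader = FileReader::try_new(input)?;
    let schema = reader.schema();

    // Metadata V5, 64-byte alignment, LZ4_FRAME buffer compression.
    // The legacy IPC format (2nd argument) is only valid for metadata V4.
    let options = IpcWriteOptions::try_new(
        64,
        false,
        ipc::MetadataVersion::V5,
        Some(ipc::CompressionType::LZ4_FRAME),
    )?;

    // Rewrite the same batches with compressed body buffers.
    let output = File::create("output_lz4.arrow_file").expect("output file");
    let mut writer = FileWriter::try_new_with_options(output, &schema, options)?;
    for batch in reader {
        writer.write(&batch?)?;
    }
    writer.finish()
}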