From e6cd6fb62f93229e334ae1e2e22d4774c42c289d Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Thu, 3 Dec 2020 07:47:31 +0200 Subject: [PATCH 1/5] ARROW-10299: [Rust] Add IPC V5 tests, set V5 as default --- rust/arrow/src/ipc/reader.rs | 94 ++++++++++++++--- rust/arrow/src/ipc/writer.rs | 190 ++++++++++++++++++++++++++++++----- 2 files changed, 247 insertions(+), 37 deletions(-) diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs index d3f282961d1..1ce83b3e93c 100644 --- a/rust/arrow/src/ipc/reader.rs +++ b/rust/arrow/src/ipc/reader.rs @@ -562,6 +562,7 @@ impl FileReader { let mut footer_data = vec![0; footer_len as usize]; reader.seek(SeekFrom::End(-10 - footer_len as i64))?; reader.read_exact(&mut footer_data)?; + dbg!(&footer_data); let footer = ipc::root_as_footer(&footer_data[..]).map_err(|err| { ArrowError::IoError(format!("Unable to get root as footer: {:?}", err)) @@ -923,10 +924,12 @@ mod tests { use crate::util::integration_util::*; use std::fs::File; + use std::env; #[test] - fn read_generated_files() { - let testdata = crate::util::test_util::arrow_test_data(); + fn read_generated_files_014() { + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ "generated_interval", @@ -940,15 +943,15 @@ mod tests { ]; paths.iter().for_each(|path| { let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.arrow_file", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path )) .unwrap(); let mut reader = FileReader::try_new(file).unwrap(); // read expected JSON output - let arrow_json = read_gzip_json(path); + let arrow_json = read_gzip_json(version, path); assert!(arrow_json.equals_reader(&mut reader)); }); } @@ -990,8 +993,9 @@ mod tests { } #[test] - fn read_generated_streams() { - let testdata = crate::util::test_util::arrow_test_data(); + fn read_generated_streams_014() { + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ "generated_interval", @@ -1005,15 +1009,77 @@ mod tests { ]; paths.iter().for_each(|path| { let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.stream", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path )) .unwrap(); let mut reader = StreamReader::try_new(file).unwrap(); // read expected JSON output - let arrow_json = read_gzip_json(path); + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + // the next batch must be empty + assert!(reader.next().is_none()); + // the stream must indicate that it's finished + assert!(reader.is_finished()); + }); + } + + #[test] + fn read_generated_files_100() { + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let version = "1.0.0-littleendian"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_interval", + "generated_datetime", + "generated_dictionary", + "generated_nested", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = 
FileReader::try_new(file).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + }); + } + + #[test] + fn read_generated_streams_100() { + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let version = "1.0.0-littleendian"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_interval", + "generated_datetime", + "generated_dictionary", + "generated_nested", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let mut reader = StreamReader::try_new(file).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); assert!(arrow_json.equals_reader(&mut reader)); // the next batch must be empty assert!(reader.next().is_none()); @@ -1072,11 +1138,11 @@ mod tests { } /// Read gzipped JSON file - fn read_gzip_json(path: &str) -> ArrowJson { - let testdata = crate::util::test_util::arrow_test_data(); + fn read_gzip_json(version: &str, path: &str) -> ArrowJson { + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.json.gz", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.json.gz", + testdata, version, path )) .unwrap(); let mut gz = GzDecoder::new(&file); diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index fdec26c1b79..cc89abfab6d 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -44,6 +44,13 @@ pub struct IpcWriteOptions { /// The legacy format is for releases before 0.15.0, and uses metadata V4 write_legacy_ipc_format: bool, /// The metadata version to write. 
The Rust IPC writer supports V4+ + /// + /// *Default versions per crate* + /// + /// When creating the default IpcWriteOptions, the following metadata versions are used: + /// + /// version 2.0.0: V4 + /// version 3.0.0: V4 metadata_version: ipc::MetadataVersion, } @@ -94,7 +101,7 @@ impl Default for IpcWriteOptions { Self { alignment: 8, write_legacy_ipc_format: true, - metadata_version: ipc::MetadataVersion::V4, + metadata_version: ipc::MetadataVersion::V5, } } } @@ -740,6 +747,7 @@ mod tests { use crate::ipc::reader::*; use crate::util::integration_util::*; use std::fs::File; + use std::env; use std::io::Read; use std::sync::Arc; @@ -839,8 +847,9 @@ mod tests { } #[test] - fn read_and_rewrite_generated_files() { - let testdata = crate::util::test_util::arrow_test_data(); + fn read_and_rewrite_generated_files_014() { + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ "generated_interval", @@ -854,8 +863,8 @@ mod tests { ]; paths.iter().for_each(|path| { let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.arrow_file", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path )) .unwrap(); @@ -863,9 +872,11 @@ mod tests { // read and rewrite the file to a temp location { - let file = - File::create(format!("target/debug/testdata/{}.arrow_file", path)) - .unwrap(); + let file = File::create(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); let mut writer = FileWriter::try_new(file, &reader.schema()).unwrap(); while let Some(Ok(batch)) = reader.next() { writer.write(&batch).unwrap(); @@ -873,19 +884,23 @@ mod tests { writer.finish().unwrap(); } - let file = - File::open(format!("target/debug/testdata/{}.arrow_file", path)).unwrap(); + let file = File::open(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); let mut reader = FileReader::try_new(file).unwrap(); // read expected JSON output - let arrow_json = read_gzip_json(path); + let arrow_json = read_gzip_json(version, path); assert!(arrow_json.equals_reader(&mut reader)); }); } #[test] - fn read_and_rewrite_generated_streams() { - let testdata = crate::util::test_util::arrow_test_data(); + fn read_and_rewrite_generated_streams_014() { + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ "generated_interval", @@ -899,8 +914,8 @@ mod tests { ]; paths.iter().for_each(|path| { let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.stream", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path )) .unwrap(); @@ -908,8 +923,11 @@ mod tests { // read and rewrite the stream to a temp location { - let file = File::create(format!("target/debug/testdata/{}.stream", path)) - .unwrap(); + let file = File::create(format!( + "target/debug/testdata/{}-{}.stream", + version, path + )) + .unwrap(); let mut writer = StreamWriter::try_new(file, &reader.schema()).unwrap(); reader.for_each(|batch| { writer.write(&batch.unwrap()).unwrap(); @@ -918,21 +936,147 @@ mod tests { } let file = - File::open(format!("target/debug/testdata/{}.stream", path)).unwrap(); + File::open(format!("target/debug/testdata/{}-{}.stream", version, path)) + .unwrap(); + let mut reader = 
StreamReader::try_new(file).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + }); + } + + #[test] + fn read_and_rewrite_generated_files_100() { + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let version = "1.0.0-littleendian"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_custom_metadata", + "generated_datetime", + "generated_dictionary_unsigned", + "generated_dictionary", + // "generated_duplicate_fieldnames", + "generated_interval", + "generated_large_batch", + "generated_nested", + // "generated_nested_large_offsets", + // "generated_null_trivial", + // "generated_null", + "generated_primitive_large_offsets", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + // "generated_recursive_nested", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file).unwrap(); + + // read and rewrite the file to a temp location + { + let file = File::create(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); + // write IPC version 5 + let options = + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap(); + let mut writer = + FileWriter::try_new_with_options(file, &reader.schema(), options) + .unwrap(); + while let Some(Ok(batch)) = reader.next() { + writer.write(&batch).unwrap(); + } + writer.finish().unwrap(); + } + + let file = File::open(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); + let mut reader = FileReader::try_new(file).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + }); + } + + #[test] + fn read_and_rewrite_generated_streams_100() { + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let version = "1.0.0-littleendian"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_custom_metadata", + "generated_datetime", + "generated_dictionary_unsigned", + "generated_dictionary", + // "generated_duplicate_fieldnames", + "generated_interval", + "generated_large_batch", + "generated_nested", + // "generated_nested_large_offsets", + // "generated_null_trivial", + // "generated_null", + "generated_primitive_large_offsets", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + // "generated_recursive_nested", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let reader = StreamReader::try_new(file).unwrap(); + + // read and rewrite the stream to a temp location + { + let file = File::create(format!( + "target/debug/testdata/{}-{}.stream", + version, path + )) + .unwrap(); + let options = + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap(); + let mut writer = + StreamWriter::try_new_with_options(file, &reader.schema(), options) + .unwrap(); + reader.for_each(|batch| { + writer.write(&batch.unwrap()).unwrap(); + }); + writer.finish().unwrap(); + } + + let file = + File::open(format!("target/debug/testdata/{}-{}.stream", version, path)) + .unwrap(); let mut reader 
= StreamReader::try_new(file).unwrap(); // read expected JSON output - let arrow_json = read_gzip_json(path); + let arrow_json = read_gzip_json(version, path); assert!(arrow_json.equals_reader(&mut reader)); }); } /// Read gzipped JSON file - fn read_gzip_json(path: &str) -> ArrowJson { - let testdata = crate::util::test_util::arrow_test_data(); + fn read_gzip_json(version: &str, path: &str) -> ArrowJson { + let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.json.gz", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.json.gz", + testdata, version, path )) .unwrap(); let mut gz = GzDecoder::new(&file); From ccee6a36f958619093a2a841ac11d3eb13c9c479 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Thu, 7 Jan 2021 10:30:09 +0200 Subject: [PATCH 2/5] Add tests for different padding lengths --- rust/arrow/src/ipc/reader.rs | 15 ++++----- rust/arrow/src/ipc/writer.rs | 63 +++++++++++++++++++++++++++--------- 2 files changed, 55 insertions(+), 23 deletions(-) diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs index 1ce83b3e93c..26b4d0c30bc 100644 --- a/rust/arrow/src/ipc/reader.rs +++ b/rust/arrow/src/ipc/reader.rs @@ -562,7 +562,6 @@ impl FileReader { let mut footer_data = vec![0; footer_len as usize]; reader.seek(SeekFrom::End(-10 - footer_len as i64))?; reader.read_exact(&mut footer_data)?; - dbg!(&footer_data); let footer = ipc::root_as_footer(&footer_data[..]).map_err(|err| { ArrowError::IoError(format!("Unable to get root as footer: {:?}", err)) @@ -920,15 +919,15 @@ impl RecordBatchReader for StreamReader { mod tests { use super::*; + use std::fs::File; + use flate2::read::GzDecoder; use crate::util::integration_util::*; - use std::fs::File; - use std::env; #[test] fn read_generated_files_014() { - let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let testdata = crate::util::test_util::arrow_test_data(); let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ @@ -994,7 +993,7 @@ mod tests { #[test] fn read_generated_streams_014() { - let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let testdata = crate::util::test_util::arrow_test_data(); let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ @@ -1028,7 +1027,7 @@ mod tests { #[test] fn read_generated_files_100() { - let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let testdata = crate::util::test_util::arrow_test_data(); let version = "1.0.0-littleendian"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ @@ -1057,7 +1056,7 @@ mod tests { #[test] fn read_generated_streams_100() { - let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let testdata = crate::util::test_util::arrow_test_data(); let version = "1.0.0-littleendian"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ @@ -1139,7 +1138,7 @@ mod tests { /// Read gzipped JSON file fn read_gzip_json(version: &str, path: &str) -> ArrowJson { - let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let testdata = crate::util::test_util::arrow_test_data(); let file = File::open(format!( "{}/arrow-ipc-stream/integration/{}/{}.json.gz", testdata, version, path diff --git 
a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index cc89abfab6d..4f7ec77c9d7 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -100,7 +100,7 @@ impl Default for IpcWriteOptions { fn default() -> Self { Self { alignment: 8, - write_legacy_ipc_format: true, + write_legacy_ipc_format: false, metadata_version: ipc::MetadataVersion::V5, } } @@ -740,16 +740,17 @@ fn pad_to_8(len: u32) -> usize { mod tests { use super::*; + use std::fs::File; + use std::io::Read; + use std::sync::Arc; + use flate2::read::GzDecoder; + use ipc::MetadataVersion; use crate::array::*; use crate::datatypes::Field; use crate::ipc::reader::*; use crate::util::integration_util::*; - use std::fs::File; - use std::env; - use std::io::Read; - use std::sync::Arc; #[test] fn test_write_file() { @@ -798,8 +799,7 @@ mod tests { } } - #[test] - fn test_write_null_file() { + fn write_null_file(options: IpcWriteOptions, suffix: &str) { let schema = Schema::new(vec![ Field::new("nulls", DataType::Null, true), Field::new("int32s", DataType::Int32, false), @@ -820,16 +820,18 @@ mod tests { ], ) .unwrap(); + let file_name = format!("target/debug/testdata/nulls_{}.arrow_file", suffix); { - let file = File::create("target/debug/testdata/nulls.arrow_file").unwrap(); - let mut writer = FileWriter::try_new(file, &schema).unwrap(); + let file = File::create(&file_name).unwrap(); + let mut writer = + FileWriter::try_new_with_options(file, &schema, options).unwrap(); writer.write(&batch).unwrap(); // this is inside a block to test the implicit finishing of the file on `Drop` } { - let file = File::open("target/debug/testdata/nulls.arrow_file").unwrap(); + let file = File::open(&file_name).unwrap(); let reader = FileReader::try_new(file).unwrap(); reader.for_each(|maybe_batch| { maybe_batch @@ -845,10 +847,41 @@ mod tests { }); } } + #[test] + fn test_write_null_file_v4() { + write_null_file( + IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap(), + "v4_a8", + ); + write_null_file( + IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap(), + "v4_a8l", + ); + write_null_file( + IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap(), + "v4_a64", + ); + write_null_file( + IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap(), + "v4_a64l", + ); + } + + #[test] + fn test_write_null_file_v5() { + write_null_file( + IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap(), + "v5_a8", + ); + write_null_file( + IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap(), + "v5_a64", + ); + } #[test] fn read_and_rewrite_generated_files_014() { - let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let testdata = crate::util::test_util::arrow_test_data(); let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ @@ -899,7 +932,7 @@ mod tests { #[test] fn read_and_rewrite_generated_streams_014() { - let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let testdata = crate::util::test_util::arrow_test_data(); let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ @@ -948,7 +981,7 @@ mod tests { #[test] fn read_and_rewrite_generated_files_100() { - let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let testdata = crate::util::test_util::arrow_test_data(); let version = "1.0.0-littleendian"; // the test is repetitive, thus we can read all supported 
files at once let paths = vec![ @@ -1012,7 +1045,7 @@ mod tests { #[test] fn read_and_rewrite_generated_streams_100() { - let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let testdata = crate::util::test_util::arrow_test_data(); let version = "1.0.0-littleendian"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ @@ -1073,7 +1106,7 @@ mod tests { /// Read gzipped JSON file fn read_gzip_json(version: &str, path: &str) -> ArrowJson { - let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); + let testdata = crate::util::test_util::arrow_test_data(); let file = File::open(format!( "{}/arrow-ipc-stream/integration/{}/{}.json.gz", testdata, version, path From 2af63f319dfdce4ccc7d67a80f25fa8c48063595 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Thu, 7 Jan 2021 10:33:08 +0200 Subject: [PATCH 3/5] update doc --- rust/arrow/src/ipc/writer.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 4f7ec77c9d7..68ef6592d2f 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -49,8 +49,8 @@ pub struct IpcWriteOptions { /// /// When creating the default IpcWriteOptions, the following metadata versions are used: /// - /// version 2.0.0: V4 - /// version 3.0.0: V4 + /// version 2.0.0: V4, with legacy format enabled + /// version 3.0.0: V5 metadata_version: ipc::MetadataVersion, } From bd2e13ebf8d09c7e25275d7fc65908f42f1bcae3 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Thu, 7 Jan 2021 12:33:46 +0200 Subject: [PATCH 4/5] add null tests --- rust/arrow/src/array/array_struct.rs | 3 +++ rust/arrow/src/ipc/reader.rs | 6 ++++++ rust/arrow/src/ipc/writer.rs | 8 ++++---- rust/arrow/src/util/integration_util.rs | 7 +++++-- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/rust/arrow/src/array/array_struct.rs b/rust/arrow/src/array/array_struct.rs index 50e7eea3db6..eeae29987f9 100644 --- a/rust/arrow/src/array/array_struct.rs +++ b/rust/arrow/src/array/array_struct.rs @@ -69,6 +69,9 @@ impl StructArray { } /// Return child array whose field name equals to column_name + /// + /// Note: The Arrow specification allows for duplicate field names, and in such + /// case, this function will return the first column with the specified name. 
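+ ///
+ /// A minimal doc sketch of the lookup behaviour (the variable and field
+ /// names here are illustrative, not taken from this crate's tests):
+ ///
+ /// ```ignore
+ /// // `struct_array` has two fields that both happen to be named "a";
+ /// // the first one wins, and a missing name yields `None`.
+ /// let first_a: Option<&ArrayRef> = struct_array.column_by_name("a");
+ /// assert!(struct_array.column_by_name("no_such_field").is_none());
+ /// ```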
 pub fn column_by_name(&self, column_name: &str) -> Option<&ArrayRef> {
 self.column_names()
 .iter()
diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs
index 26b4d0c30bc..efe1d72f85b 100644
--- a/rust/arrow/src/ipc/reader.rs
+++ b/rust/arrow/src/ipc/reader.rs
@@ -976,6 +976,8 @@ mod tests {
 "generated_datetime",
 "generated_dictionary",
 "generated_nested",
+ "generated_null_trivial",
+ "generated_null",
 "generated_primitive_no_batches",
 "generated_primitive_zerolength",
 "generated_primitive",
@@ -1035,6 +1037,8 @@ mod tests {
 "generated_datetime",
 "generated_dictionary",
 "generated_nested",
+ "generated_null_trivial",
+ "generated_null",
 "generated_primitive_no_batches",
 "generated_primitive_zerolength",
 "generated_primitive",
@@ -1064,6 +1068,8 @@ mod tests {
 "generated_datetime",
 "generated_dictionary",
 "generated_nested",
+ "generated_null_trivial",
+ "generated_null",
 "generated_primitive_no_batches",
 "generated_primitive_zerolength",
 "generated_primitive",
diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs
index 68ef6592d2f..277c7abe328 100644
--- a/rust/arrow/src/ipc/writer.rs
+++ b/rust/arrow/src/ipc/writer.rs
@@ -994,8 +994,8 @@ mod tests {
 "generated_large_batch",
 "generated_nested",
 // "generated_nested_large_offsets",
- // "generated_null_trivial",
- // "generated_null",
+ "generated_null_trivial",
+ "generated_null",
 "generated_primitive_large_offsets",
 "generated_primitive_no_batches",
 "generated_primitive_zerolength",
@@ -1058,8 +1058,8 @@ mod tests {
 "generated_large_batch",
 "generated_nested",
 // "generated_nested_large_offsets",
- // "generated_null_trivial",
- // "generated_null",
+ "generated_null_trivial",
+ "generated_null",
 "generated_primitive_large_offsets",
 "generated_primitive_no_batches",
 "generated_primitive_zerolength",
diff --git a/rust/arrow/src/util/integration_util.rs b/rust/arrow/src/util/integration_util.rs
index 6a6e7ea77de..71498aeea78 100644
--- a/rust/arrow/src/util/integration_util.rs
+++ b/rust/arrow/src/util/integration_util.rs
@@ -203,8 +203,10 @@ impl ArrowJsonBatch {
 let json_array: Vec<Value> = json_from_col(&col, field.data_type());
 match field.data_type() {
 DataType::Null => {
- let arr = arr.as_any().downcast_ref::<NullArray>().unwrap();
- arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
+ let arr: &NullArray =
+ arr.as_any().downcast_ref::<NullArray>().unwrap();
+ // NullArrays should have the same length, json_array is empty
+ arr.len() == col.count
 }
 DataType::Boolean => {
 let arr = arr.as_any().downcast_ref::<BooleanArray>().unwrap();
@@ -504,6 +506,7 @@ fn json_from_col(col: &ArrowJsonColumn, data_type: &DataType) -> Vec<Value> {
 converted_col.as_slice(),
 )
 }
+ DataType::Null => vec![],
 _ => merge_json_array(
 col.validity.as_ref().unwrap().as_slice(),
 &col.data.clone().unwrap(),

From 0bdeb97b02f0abd770ceff5295fedd102cf4aa6f Mon Sep 17 00:00:00 2001
From: Neville Dipale
Date: Fri, 8 Jan 2021 17:18:10 +0200
Subject: [PATCH 5/5] ARROW-8676: [Rust] IPC RecordBatch body compression

Creates body compression, with LZ4_FRAME supported.

This depends on ARROW-10299 being merged first.
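On the wire, each body buffer is written as an 8-byte little-endian i64
holding the uncompressed length, followed by an LZ4 frame containing the
compressed bytes. A minimal standalone sketch of that layout, using the
same lz4 crate API as the codec below (encode_buffer is an illustrative
name, not part of this patch):

    use std::io::Write;

    // [ uncompressed length: i64 LE ][ LZ4 frame of the raw bytes ]
    fn encode_buffer(raw: &[u8]) -> std::io::Result<Vec<u8>> {
        let mut out = Vec::new();
        // the length prefix lets a reader size its output buffer up front
        out.write_all(&(raw.len() as i64).to_le_bytes())?;
        let mut encoder = lz4::EncoderBuilder::new().build(out)?;
        encoder.write_all(raw)?;
        let (out, result) = encoder.finish();
        result?;
        Ok(out)
    }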
---
 rust/arrow/Cargo.toml | 1 +
 rust/arrow/src/ipc/compression.rs | 122 +++++++++++++++++++
 rust/arrow/src/ipc/mod.rs | 1 +
 rust/arrow/src/ipc/reader.rs | 47 ++++++--
 rust/arrow/src/ipc/writer.rs | 187 +++++++++++++++++++++++------
 5 files changed, 305 insertions(+), 53 deletions(-)
 create mode 100644 rust/arrow/src/ipc/compression.rs

diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml
index 028444bd113..e34d061198b 100644
--- a/rust/arrow/Cargo.toml
+++ b/rust/arrow/Cargo.toml
@@ -51,6 +51,7 @@ flatbuffers = "^0.8"
 hex = "0.4"
 prettytable-rs = { version = "0.8.0", optional = true }
 lexical-core = "^0.7"
+lz4 = "1.23"

 [features]
 default = []
diff --git a/rust/arrow/src/ipc/compression.rs b/rust/arrow/src/ipc/compression.rs
new file mode 100644
index 00000000000..f17d9eac3fa
--- /dev/null
+++ b/rust/arrow/src/ipc/compression.rs
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! IPC Compression Utilities
+
+use std::io::{Read, Write};
+
+use crate::error::{ArrowError, Result};
+use crate::ipc::gen::Message::CompressionType;
+
+pub trait IpcCompressionCodec {
+ fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()>;
+ fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize>;
+ fn get_compression_type(&self) -> CompressionType;
+}
+
+pub type Codec = Box<dyn IpcCompressionCodec>;
+
+#[inline]
+pub(crate) fn get_codec(
+ compression_type: Option<CompressionType>,
+) -> Result<Option<Codec>> {
+ match compression_type {
+ Some(CompressionType::LZ4_FRAME) => Ok(Some(Box::new(Lz4CompressionCodec {}))),
+ Some(ctype) => Err(ArrowError::InvalidArgumentError(format!(
+ "IPC CompressionType {:?} not yet supported",
+ ctype
+ ))),
+ None => Ok(None),
+ }
+}
+
+pub struct Lz4CompressionCodec {}
+const LZ4_BUFFER_SIZE: usize = 4096;
+
+impl IpcCompressionCodec for Lz4CompressionCodec {
+ fn compress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
+ if input_buf.is_empty() {
+ output_buf.write_all(&8i64.to_le_bytes())?;
+ return Ok(());
+ }
+ // write out the uncompressed length as a LE i64 value
+ output_buf.write_all(&(input_buf.len() as i64).to_le_bytes())?;
+ let mut encoder = lz4::EncoderBuilder::new().build(output_buf)?;
+ let mut from = 0;
+ loop {
+ let to = std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len());
+ encoder.write_all(&input_buf[from..to])?;
+ from += LZ4_BUFFER_SIZE;
+ if from >= input_buf.len() {
+ break;
+ }
+ }
+ Ok(encoder.finish().1?)
+ }
+
+ fn decompress(&self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize> {
+ if input_buf.is_empty() {
+ return Ok(0);
+ }
+ let mut decoder = lz4::Decoder::new(&input_buf[8..])?;
+ let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
+ let mut total_len = 0;
+ loop {
+ let len = decoder.read(&mut buffer)?;
+ if len == 0 {
+ break;
+ }
+ total_len += len;
+ output_buf.write_all(&buffer[0..len])?;
+ }
+ decoder.finish().1?;
+ Ok(total_len)
+ }
+
+ fn get_compression_type(&self) -> CompressionType {
+ CompressionType::LZ4_FRAME
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use rand::Rng;
+
+ use crate::util::test_util::seedable_rng;
+
+ const INPUT_BUFFER_LEN: usize = 256;
+
+ #[test]
+ fn test_lz4_roundtrip() {
+ let mut rng = seedable_rng();
+ let mut bytes: Vec<u8> = Vec::with_capacity(INPUT_BUFFER_LEN);
+
+ (0..INPUT_BUFFER_LEN).for_each(|_| {
+ bytes.push(rng.gen::<u8>());
+ });
+
+ let codec = Lz4CompressionCodec {};
+ let mut compressed = Vec::new();
+ codec.compress(&bytes, &mut compressed).unwrap();
+
+ let mut decompressed = Vec::new();
+ let _ = codec.decompress(&compressed, &mut decompressed).unwrap();
+ assert_eq!(decompressed, bytes);
+ }
+}
diff --git a/rust/arrow/src/ipc/mod.rs b/rust/arrow/src/ipc/mod.rs
index a2d7103aacf..7b7ad070294 100644
--- a/rust/arrow/src/ipc/mod.rs
+++ b/rust/arrow/src/ipc/mod.rs
@@ -18,6 +18,7 @@
 // TODO: (vcq): Protobuf codegen is not generating Debug impls.
 #![allow(missing_debug_implementations)]

+pub mod compression;
 pub mod convert;
 pub mod reader;
 pub mod writer;
diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs
index efe1d72f85b..a344e558b44 100644
--- a/rust/arrow/src/ipc/reader.rs
+++ b/rust/arrow/src/ipc/reader.rs
@@ -32,15 +32,24 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchReader};

+use ipc::compression::{get_codec, Codec};
 use ipc::CONTINUATION_MARKER;
 use DataType::*;

 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+fn read_buffer(buf: &ipc::Buffer, a_data: &[u8], codec: Option<&Codec>) -> Buffer {
 let start_offset = buf.offset() as usize;
 let end_offset = start_offset + buf.length() as usize;
 let buf_data = &a_data[start_offset..end_offset];
- Buffer::from(&buf_data)
+ // decompress the buffer if it is compressed
+ match codec {
+ Some(codec) => {
+ let mut buf = Vec::new();
+ codec.decompress(buf_data, &mut buf).unwrap();
+ Buffer::from(buf)
+ }
+ None => Buffer::from(&buf_data),
+ }
 }

 /// Coordinates reading arrays based on data types.
@@ -52,6 +61,7 @@ fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
 /// - check if the bit width of non-64-bit numbers is 64, and
 /// - read the buffer as 64-bit (signed integer or float), and
 /// - cast the 64-bit array to the appropriate data type
+#[allow(clippy::too_many_arguments)]
 fn create_array(
 nodes: &[ipc::FieldNode],
 data_type: &DataType,
@@ -60,6 +70,7 @@ fn create_array(
 dictionaries: &[Option<ArrayRef>],
 mut node_index: usize,
 mut buffer_index: usize,
+ codec: Option<&Codec>,
 ) -> (ArrayRef, usize, usize) {
 use DataType::*;
 let array = match data_type {
@@ -69,7 +80,7 @@ fn create_array(
 data_type,
 buffers[buffer_index..buffer_index + 3]
 .iter()
- .map(|buf| read_buffer(buf, data))
+ .map(|buf| read_buffer(buf, data, codec))
 .collect(),
 );
 node_index += 1;
@@ -82,7 +93,7 @@ fn create_array(
 data_type,
 buffers[buffer_index..buffer_index + 2]
 .iter()
- .map(|buf| read_buffer(buf, data))
+ .map(|buf| read_buffer(buf, data, codec))
 .collect(),
 );
 node_index += 1;
@@ -93,7 +104,7 @@ fn create_array(
 let list_node = &nodes[node_index];
 let list_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
 .iter()
- .map(|buf| read_buffer(buf, data))
+ .map(|buf| read_buffer(buf, data, codec))
 .collect();
 node_index += 1;
 buffer_index += 2;
@@ -105,6 +116,7 @@ fn create_array(
 dictionaries,
 node_index,
 buffer_index,
+ codec,
 );
 node_index = triple.1;
 buffer_index = triple.2;
@@ -115,7 +127,7 @@ fn create_array(
 let list_node = &nodes[node_index];
 let list_buffers: Vec<Buffer> = buffers[buffer_index..=buffer_index]
 .iter()
- .map(|buf| read_buffer(buf, data))
+ .map(|buf| read_buffer(buf, data, codec))
 .collect();
 node_index += 1;
 buffer_index += 1;
@@ -127,6 +139,7 @@ fn create_array(
 dictionaries,
 node_index,
 buffer_index,
+ codec,
 );
 node_index = triple.1;
 buffer_index = triple.2;
@@ -135,7 +148,7 @@ fn create_array(
 }
 Struct(struct_fields) => {
 let struct_node = &nodes[node_index];
- let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data);
+ let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data, codec);
 node_index += 1;
 buffer_index += 1;
@@ -152,6 +165,7 @@ fn create_array(
 dictionaries,
 node_index,
 buffer_index,
+ codec,
 );
 node_index = triple.1;
 buffer_index = triple.2;
@@ -171,7 +185,7 @@ fn create_array(
 let index_node = &nodes[node_index];
 let index_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
 .iter()
- .map(|buf| read_buffer(buf, data))
+ .map(|buf| read_buffer(buf, data, codec))
 .collect();
 let value_array = dictionaries[node_index].clone().unwrap();
 node_index += 1;
@@ -200,7 +214,7 @@ fn create_array(
 data_type,
 buffers[buffer_index..buffer_index + 2]
 .iter()
- .map(|buf| read_buffer(buf, data))
+ .map(|buf| read_buffer(buf, data, codec))
 .collect(),
 );
 node_index += 1;
@@ -424,6 +438,20 @@ pub fn read_record_batch(
 let mut node_index = 0;
 let mut arrays = vec![];

+ let codec: Option<Codec> = match batch.compression() {
+ Some(body_compression) => {
+ let method = body_compression.method();
+ if ipc::BodyCompressionMethod::BUFFER != method {
+ return Err(ArrowError::IoError(format!(
+ "Encountered unsupported body compression method: {:?}",
+ method
+ )));
+ }
+ get_codec(Some(body_compression.codec()))?
+ }
+ None => None,
+ };
+
 // keep track of index as lists require more than one node
 for field in schema.fields() {
 let triple = create_array(
@@ -434,6 +462,7 @@ pub fn read_record_batch(
 dictionaries,
 node_index,
 buffer_index,
+ codec.as_ref(),
 );
 node_index = triple.1;
 buffer_index = triple.2;
diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs
index 277c7abe328..2af71fabb11 100644
--- a/rust/arrow/src/ipc/writer.rs
+++ b/rust/arrow/src/ipc/writer.rs
@@ -33,6 +33,7 @@ use crate::ipc;
 use crate::record_batch::RecordBatch;
 use crate::util::bit_util;

+use ipc::compression::{get_codec, Codec};
 use ipc::CONTINUATION_MARKER;

 /// IPC write options used to control the behaviour of the writer
@@ -52,6 +53,13 @@ pub struct IpcWriteOptions {
 /// version 2.0.0: V4, with legacy format enabled
 /// version 3.0.0: V5
 metadata_version: ipc::MetadataVersion,
+ /// Compression type to use.
+ ///
+ /// Compression is currently only supported when writing metadata version V5, though
+ /// it might be possible to read non-legacy V4 buffers that are compressed.
+ /// The compression formats supported as of v3.0.0 of the library are:
+ /// - LZ4_FRAME
+ compression_type: Option<ipc::CompressionType>,
 }

 impl IpcWriteOptions {
@@ -60,6 +68,7 @@ impl IpcWriteOptions {
 alignment: usize,
 write_legacy_ipc_format: bool,
 metadata_version: ipc::MetadataVersion,
+ compression_type: Option<ipc::CompressionType>,
 ) -> Result<Self> {
 if alignment == 0 || alignment % 8 != 0 {
 return Err(ArrowError::InvalidArgumentError(
@@ -72,24 +81,32 @@ impl IpcWriteOptions {
 | ipc::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
 "Writing IPC metadata version 3 and lower not supported".to_string(),
 )),
- ipc::MetadataVersion::V4 => Ok(Self {
- alignment,
- write_legacy_ipc_format,
- metadata_version,
- }),
+ ipc::MetadataVersion::V4 => {
+ if compression_type.is_some() {
+ return Err(ArrowError::InvalidArgumentError(
+ "Writing IPC metadata version 4 with compressed buffers is currently not supported".to_string()
+ ));
+ }
+ Ok(Self {
+ alignment,
+ write_legacy_ipc_format,
+ metadata_version,
+ compression_type,
+ })
+ }
 ipc::MetadataVersion::V5 => {
 if write_legacy_ipc_format {
- Err(ArrowError::InvalidArgumentError(
+ return Err(ArrowError::InvalidArgumentError(
 "Legacy IPC format only supported on metadata version 4"
 .to_string(),
- ))
- } else {
- Ok(Self {
- alignment,
- write_legacy_ipc_format,
- metadata_version,
- })
+ ));
 }
+ Ok(Self {
+ alignment,
+ write_legacy_ipc_format,
+ metadata_version,
+ compression_type,
+ })
 }
 z => panic!("Unsupported ipc::MetadataVersion {:?}", z),
 }
@@ -102,6 +119,7 @@ impl Default for IpcWriteOptions {
 alignment: 8,
 write_legacy_ipc_format: false,
 metadata_version: ipc::MetadataVersion::V5,
+ compression_type: None,
 }
 }
 }
@@ -147,6 +165,9 @@ impl IpcDataGenerator {
 let schema = batch.schema();
 let mut encoded_dictionaries = Vec::with_capacity(schema.fields().len());

+ let codec = get_codec(write_options.compression_type)?;
+ let codec_ref = codec.as_ref();
+
 for (i, field) in schema.fields().iter().enumerate() {
 let column = batch.column(i);

@@ -164,12 +185,13 @@ impl IpcDataGenerator {
 dict_id,
 dict_values,
 write_options,
- ));
+ codec_ref,
+ )?);
 }
 }
 }

- let encoded_message = self.record_batch_to_bytes(batch, write_options);
+ let encoded_message = self.record_batch_to_bytes(batch, write_options)?;
 Ok((encoded_dictionaries, encoded_message))
 }

@@ -180,13 +202,14 @@ impl IpcDataGenerator {
 &self,
 batch: &RecordBatch,
 write_options: &IpcWriteOptions,
- ) -> EncodedData {
+ ) -> Result<EncodedData> {
 let mut fbb = 
FlatBufferBuilder::new();
 let mut nodes: Vec<ipc::FieldNode> = vec![];
 let mut buffers: Vec<ipc::Buffer> = vec![];
 let mut arrow_data: Vec<u8> = vec![];
 let mut offset = 0;
+ let codec = get_codec(write_options.compression_type)?;
 for array in batch.columns() {
 let array_data = array.data();
 offset = write_array_data(
@@ -197,18 +220,32 @@ impl IpcDataGenerator {
 offset,
 array.len(),
 array.null_count(),
- );
+ codec.as_ref(),
+ )?;
 }

 // write data
 let buffers = fbb.create_vector(&buffers);
 let nodes = fbb.create_vector(&nodes);

+ let compression = {
+ if let Some(compression_type) = &write_options.compression_type {
+ let mut c = ipc::BodyCompressionBuilder::new(&mut fbb);
+ c.add_method(ipc::BodyCompressionMethod::BUFFER);
+ c.add_codec(*compression_type);
+ Some(c.finish())
+ } else {
+ None
+ }
+ };
 let root = {
 let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
 batch_builder.add_length(batch.num_rows() as i64);
 batch_builder.add_nodes(nodes);
 batch_builder.add_buffers(buffers);
+ if let Some(c) = compression {
+ batch_builder.add_compression(c);
+ }
 let b = batch_builder.finish();
 b.as_union_value()
 };
@@ -222,10 +259,10 @@ impl IpcDataGenerator {
 fbb.finish(root, None);
 let finished_data = fbb.finished_data();

- EncodedData {
+ Ok(EncodedData {
 ipc_message: finished_data.to_vec(),
 arrow_data,
- }
+ })
 }

 /// Write dictionary values into two sets of bytes, one for the header (ipc::Message) and the
@@ -235,7 +272,8 @@ impl IpcDataGenerator {
 dict_id: i64,
 array_data: &ArrayDataRef,
 write_options: &IpcWriteOptions,
- ) -> EncodedData {
+ codec: Option<&Codec>,
+ ) -> Result<EncodedData> {
 let mut fbb = FlatBufferBuilder::new();

 let mut nodes: Vec<ipc::FieldNode> = vec![];
@@ -250,7 +288,8 @@ impl IpcDataGenerator {
 0,
 array_data.len(),
 array_data.null_count(),
- );
+ codec,
+ )?;

 // write data
 let buffers = fbb.create_vector(&buffers);
@@ -283,10 +322,10 @@ impl IpcDataGenerator {
 fbb.finish(root, None);
 let finished_data = fbb.finished_data();

- EncodedData {
+ Ok(EncodedData {
 ipc_message: finished_data.to_vec(),
 arrow_data,
- }
+ })
 }
 }

@@ -662,6 +701,7 @@ fn write_continuation(
 }

 /// Write array data to a vector of bytes
+#[allow(clippy::too_many_arguments)]
 fn write_array_data(
 array_data: &ArrayDataRef,
 mut buffers: &mut Vec<ipc::Buffer>,
@@ -670,7 +710,8 @@ fn write_array_data(
 offset: i64,
 num_rows: usize,
 null_count: usize,
-) -> i64 {
+ codec: Option<&Codec>,
+) -> Result<i64> {
 let mut offset = offset;
 nodes.push(ipc::FieldNode::new(num_rows as i64, null_count as i64));
 // NullArray does not have any buffers, thus the null buffer is not generated
@@ -687,16 +728,17 @@ fn write_array_data(
 Some(buffer) => buffer.clone(),
 };

- offset = write_buffer(&null_buffer, &mut buffers, &mut arrow_data, offset);
+ offset =
+ write_buffer(&null_buffer, &mut buffers, &mut arrow_data, offset, codec)?;
 }

- array_data.buffers().iter().for_each(|buffer| {
- offset = write_buffer(buffer, &mut buffers, &mut arrow_data, offset);
- });
+ for buffer in array_data.buffers() {
+ offset = write_buffer(buffer, &mut buffers, &mut arrow_data, offset, codec)?;
+ }

 if !matches!(array_data.data_type(), DataType::Dictionary(_, _)) {
 // recursively write out nested structures
- array_data.child_data().iter().for_each(|data_ref| {
+ for data_ref in array_data.child_data() {
 // write the nested data (e.g list data)
 offset = write_array_data(
 data_ref,
@@ -706,11 +748,12 @@ fn write_array_data(
 offset,
 data_ref.len(),
 data_ref.null_count(),
- );
- });
+ codec,
+ )?;
+ }
 }

- offset
+ Ok(offset)
 }

 /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
@@ -719,15 +762,24 @@ fn write_buffer(
 buffers: &mut Vec<ipc::Buffer>,
 arrow_data: &mut Vec<u8>,
 offset: i64,
-) -> i64 {
- let len = buffer.len();
+ codec: Option<&Codec>,
+) -> Result<i64> {
+ let mut compressed = Vec::new();
+ let buf_slice = match codec {
+ Some(codec) => {
+ codec.compress(buffer.as_slice(), &mut compressed)?;
+ compressed.as_slice()
+ }
+ None => buffer.as_slice(),
+ };
+ let len = buf_slice.len();
 let pad_len = pad_to_8(len as u32);
 let total_len: i64 = (len + pad_len) as i64;
 // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
 buffers.push(ipc::Buffer::new(offset, total_len));
- arrow_data.extend_from_slice(buffer.as_slice());
+ arrow_data.extend_from_slice(buf_slice);
 arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
- offset + total_len
+ Ok(offset + total_len)
 }

 /// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes
@@ -850,19 +902,19 @@ mod tests {
 #[test]
 fn test_write_null_file_v4() {
 write_null_file(
- IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap(),
+ IpcWriteOptions::try_new(8, false, MetadataVersion::V4, None).unwrap(),
 "v4_a8",
 );
 write_null_file(
- IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap(),
+ IpcWriteOptions::try_new(8, true, MetadataVersion::V4, None).unwrap(),
 "v4_a8l",
 );
 write_null_file(
- IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap(),
+ IpcWriteOptions::try_new(64, false, MetadataVersion::V4, None).unwrap(),
 "v4_a64",
 );
 write_null_file(
- IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap(),
+ IpcWriteOptions::try_new(64, true, MetadataVersion::V4, None).unwrap(),
 "v4_a64l",
 );
 }
@@ -870,15 +922,60 @@ mod tests {
 #[test]
 fn test_write_null_file_v5() {
 write_null_file(
- IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap(),
+ IpcWriteOptions::try_new(8, false, MetadataVersion::V5, None).unwrap(),
 "v5_a8",
 );
 write_null_file(
- IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap(),
+ IpcWriteOptions::try_new(64, false, MetadataVersion::V5, None).unwrap(),
 "v5_a64",
 );
 }

+ #[test]
+ fn test_write_file_v5_compressed() {
+ let testdata = crate::util::test_util::arrow_test_data();
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/1.0.0-littleendian/generated_primitive.arrow_file",
+ testdata
+ )).unwrap();
+ let options = IpcWriteOptions::try_new(
+ 64,
+ false,
+ MetadataVersion::V5,
+ Some(ipc::CompressionType::LZ4_FRAME),
+ )
+ .unwrap();
+ let reader = FileReader::try_new(file).unwrap();
+ let filepath = "target/debug/testdata/primitive_lz4.arrow_file";
+ let file = File::create(&filepath).unwrap();
+ let mut writer =
+ FileWriter::try_new_with_options(file, &reader.schema(), options).unwrap();
+ let batches = reader.collect::<Result<Vec<_>>>().unwrap();
+ for b in &batches {
+ writer.write(b).unwrap();
+ }
+ // close writer
+ writer.finish().unwrap();
+
+ // read written
+ let file = File::open(&filepath).unwrap();
+ let reader = FileReader::try_new(file).unwrap();
+ let decompressed_batches = reader.collect::<Result<Vec<_>>>().unwrap();
+ assert_eq!(batches.len(), decompressed_batches.len());
+ for (a, b) in batches.iter().zip(decompressed_batches) {
+ assert_eq!(a.schema(), b.schema());
+ assert_eq!(a.num_columns(), b.num_columns());
+ assert_eq!(a.num_rows(), b.num_rows());
+ a.columns()
+ .iter()
+ .zip(b.columns())
+ .for_each(|(a_col, b_col)| {
+ println!("A: {:?}, \nB: {:?}", a_col, b_col);
+ assert_eq!(a_col, b_col);
+ });
+ }
+ }
+
 #[test]
 fn read_and_rewrite_generated_files_014() {
 let testdata = 
crate::util::test_util::arrow_test_data(); @@ -1020,7 +1117,8 @@ mod tests { .unwrap(); // write IPC version 5 let options = - IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap(); + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5, None) + .unwrap(); let mut writer = FileWriter::try_new_with_options(file, &reader.schema(), options) .unwrap(); @@ -1083,7 +1181,8 @@ mod tests { )) .unwrap(); let options = - IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap(); + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5, None) + .unwrap(); let mut writer = StreamWriter::try_new_with_options(file, &reader.schema(), options) .unwrap();
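Usage note: with this series applied, opting in to compressed output follows
the same pattern as test_write_file_v5_compressed above. A minimal sketch
(assumes a RecordBatch `batch` is in scope; the output path is illustrative):

    let options = IpcWriteOptions::try_new(
        8,
        false,
        ipc::MetadataVersion::V5,
        Some(ipc::CompressionType::LZ4_FRAME),
    )
    .unwrap();
    let file = File::create("target/debug/testdata/example_lz4.arrow_file").unwrap();
    let mut writer =
        FileWriter::try_new_with_options(file, &batch.schema(), options).unwrap();
    writer.write(&batch).unwrap();
    writer.finish().unwrap();

Reading the file back requires no options: FileReader picks up the codec from
the record batch's BodyCompression metadata, as implemented in
read_record_batch above.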