From 82d33471c9d95e028373010a55a3ab93d8315b44 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 25 Dec 2020 08:49:28 +0100 Subject: [PATCH 01/10] Use buffer for csv writer to avoid extra allocation --- rust/arrow/src/csv/writer.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/rust/arrow/src/csv/writer.rs b/rust/arrow/src/csv/writer.rs index 1eb0c4db6f6..54188989726 100644 --- a/rust/arrow/src/csv/writer.rs +++ b/rust/arrow/src/csv/writer.rs @@ -124,15 +124,19 @@ impl Writer { } /// Convert a record to a string vector - fn convert(&self, batch: &RecordBatch, row_index: usize) -> Result> { + fn convert( + &self, + batch: &RecordBatch, + row_index: usize, + buffer: &mut [String], + ) -> Result<()> { // TODO: it'd be more efficient if we could create `record: Vec<&[u8]> - let mut record: Vec = Vec::with_capacity(batch.num_columns()); - for col_index in 0..batch.num_columns() { + for (col_index, item) in buffer.iter_mut().enumerate() { let col = batch.column(col_index); if col.is_null(row_index) { // write an empty value - record.push(String::from("")); - continue; + *item = "".to_string(); + continue; } let string = match col.data_type() { DataType::Float64 => write_primitive_value::(col, row_index), @@ -243,10 +247,9 @@ impl Writer { ))); } }; - - record.push(string); + *item = string; } - Ok(record) + Ok(()) } /// Write a vector of record batches to a writable object @@ -265,9 +268,11 @@ impl Writer { self.beginning = false; } + let mut buffer = vec!["".to_string(); batch.num_columns()]; + for row_index in 0..batch.num_rows() { - let record = self.convert(batch, row_index)?; - self.writer.write_record(&record[..])?; + self.convert(batch, row_index, &mut buffer)?; + self.writer.write_record(&buffer[..])?; } self.writer.flush()?; From 1c78dc6db6f9b5dc212ef9cb8a20b4e9f518cf4d Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 25 Dec 2020 08:50:51 +0100 Subject: [PATCH 02/10] Style tweak --- rust/arrow/src/csv/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/arrow/src/csv/writer.rs b/rust/arrow/src/csv/writer.rs index 54188989726..5497cfa0e7b 100644 --- a/rust/arrow/src/csv/writer.rs +++ b/rust/arrow/src/csv/writer.rs @@ -272,7 +272,7 @@ impl Writer { for row_index in 0..batch.num_rows() { self.convert(batch, row_index, &mut buffer)?; - self.writer.write_record(&buffer[..])?; + self.writer.write_record(&buffer)?; } self.writer.flush()?; From 2a8db6788badd478eba39b31db0bc60afdfda2a0 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 25 Dec 2020 09:40:40 +0100 Subject: [PATCH 03/10] Allow configuring batch size --- rust/arrow/src/csv/writer.rs | 2 +- rust/benchmarks/src/bin/tpch.rs | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/rust/arrow/src/csv/writer.rs b/rust/arrow/src/csv/writer.rs index 5497cfa0e7b..b3d57bd49ed 100644 --- a/rust/arrow/src/csv/writer.rs +++ b/rust/arrow/src/csv/writer.rs @@ -136,7 +136,7 @@ impl Writer { if col.is_null(row_index) { // write an empty value *item = "".to_string(); - continue; + continue; } let string = match col.data_type() { DataType::Float64 => write_primitive_value::(col, row_index), diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs index 8a6fca78930..769668c958b 100644 --- a/rust/benchmarks/src/bin/tpch.rs +++ b/rust/benchmarks/src/bin/tpch.rs @@ -89,6 +89,10 @@ struct ConvertOpt { /// Number of partitions to produce #[structopt(short = "p", long = "partitions", default_value = "1")] partitions: usize, + + /// Batch size when reading CSV or Parquet files + #[structopt(short = "s", long = "batch-size", default_value = "4096")] + batch_size: usize, } #[derive(Debug, StructOpt)] @@ -1019,7 +1023,8 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> { .delimiter(b'|') .file_extension(".tbl"); - let mut ctx = ExecutionContext::new(); + let config = ExecutionConfig::new().with_batch_size(opt.batch_size); + let mut ctx = ExecutionContext::with_config(config); // build plan to read the TBL file let mut csv = ctx.read_csv(&input_path, options)?; From 2676383f755c79815edd9314d9cb6fa9eed10fee Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 25 Dec 2020 10:00:36 +0100 Subject: [PATCH 04/10] Use lexical core for serialization --- rust/arrow/src/csv/writer.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/rust/arrow/src/csv/writer.rs b/rust/arrow/src/csv/writer.rs index b3d57bd49ed..ca8f6a6df83 100644 --- a/rust/arrow/src/csv/writer.rs +++ b/rust/arrow/src/csv/writer.rs @@ -73,18 +73,32 @@ use crate::array::*; use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; - +use lexical_core; const DEFAULT_DATE_FORMAT: &str = "%F"; const DEFAULT_TIME_FORMAT: &str = "%T"; const DEFAULT_TIMESTAMP_FORMAT: &str = "%FT%H:%M:%S.%9f"; +unsafe fn vector_as_slice<'a, T>(buf: &'a mut Vec) -> &'a mut [T] { + let first = buf.as_mut_ptr(); + std::slice::from_raw_parts_mut(first, buf.capacity()) +} +pub fn to_string(n: N) -> String { + let mut buf = Vec::::with_capacity(N::FORMATTED_SIZE_DECIMAL); + unsafe { + let len = lexical_core::write(n, vector_as_slice(&mut buf)).len(); + buf.set_len(len); + String::from_utf8_unchecked(buf) + } +} + fn write_primitive_value(array: &ArrayRef, i: usize) -> String where T: ArrowNumericType, T::Native: std::string::ToString, + T::Native: lexical_core::ToLexical, { let c = array.as_any().downcast_ref::>().unwrap(); - c.value(i).to_string() + to_string::(c.value(i)) } /// A CSV writer From e4d5b5dae495b542598fe914b04001feb9bc19fd Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 25 Dec 2020 10:35:27 +0100 Subject: [PATCH 05/10] Clippy --- rust/arrow/src/csv/writer.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/arrow/src/csv/writer.rs b/rust/arrow/src/csv/writer.rs index ca8f6a6df83..a1571aeec1d 100644 --- a/rust/arrow/src/csv/writer.rs +++ b/rust/arrow/src/csv/writer.rs @@ -73,7 +73,6 @@ use crate::array::*; use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; -use lexical_core; const DEFAULT_DATE_FORMAT: &str = "%F"; const DEFAULT_TIME_FORMAT: &str = "%T"; const DEFAULT_TIMESTAMP_FORMAT: &str = "%FT%H:%M:%S.%9f"; From 70fbfcea20bc32222ec7bdbb4de42ebb09959292 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 25 Dec 2020 11:51:30 +0100 Subject: [PATCH 06/10] Simplify implementation a bit --- rust/arrow/src/csv/writer.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/rust/arrow/src/csv/writer.rs b/rust/arrow/src/csv/writer.rs index a1571aeec1d..6440a497434 100644 --- a/rust/arrow/src/csv/writer.rs +++ b/rust/arrow/src/csv/writer.rs @@ -77,14 +77,11 @@ const DEFAULT_DATE_FORMAT: &str = "%F"; const DEFAULT_TIME_FORMAT: &str = "%T"; const DEFAULT_TIMESTAMP_FORMAT: &str = "%FT%H:%M:%S.%9f"; -unsafe fn vector_as_slice<'a, T>(buf: &'a mut Vec) -> &'a mut [T] { - let first = buf.as_mut_ptr(); - std::slice::from_raw_parts_mut(first, buf.capacity()) -} pub fn to_string(n: N) -> String { let mut buf = Vec::::with_capacity(N::FORMATTED_SIZE_DECIMAL); unsafe { - let len = lexical_core::write(n, vector_as_slice(&mut buf)).len(); + let mut slice = std::slice::from_raw_parts_mut(first.as_mut_ptr(), buf.capacity()); + let len = lexical_core::write(n, slice).len(); buf.set_len(len); String::from_utf8_unchecked(buf) } From 4f2a72aa4ce3ee9c503681e1305f9e5e7321dd70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 25 Dec 2020 11:56:14 +0100 Subject: [PATCH 07/10] Fix --- rust/arrow/src/csv/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/arrow/src/csv/writer.rs b/rust/arrow/src/csv/writer.rs index 6440a497434..52eb258cc07 100644 --- a/rust/arrow/src/csv/writer.rs +++ b/rust/arrow/src/csv/writer.rs @@ -80,7 +80,7 @@ const DEFAULT_TIMESTAMP_FORMAT: &str = "%FT%H:%M:%S.%9f"; pub fn to_string(n: N) -> String { let mut buf = Vec::::with_capacity(N::FORMATTED_SIZE_DECIMAL); unsafe { - let mut slice = std::slice::from_raw_parts_mut(first.as_mut_ptr(), buf.capacity()); + let mut slice = std::slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity()); let len = lexical_core::write(n, slice).len(); buf.set_len(len); String::from_utf8_unchecked(buf) From 0fda55aa8bb974f5bcedce8de3b7249f93f9dd2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 25 Dec 2020 12:14:01 +0100 Subject: [PATCH 08/10] Add safety docs --- rust/arrow/src/csv/writer.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/rust/arrow/src/csv/writer.rs b/rust/arrow/src/csv/writer.rs index 52eb258cc07..d03beb7d194 100644 --- a/rust/arrow/src/csv/writer.rs +++ b/rust/arrow/src/csv/writer.rs @@ -80,6 +80,12 @@ const DEFAULT_TIMESTAMP_FORMAT: &str = "%FT%H:%M:%S.%9f"; pub fn to_string(n: N) -> String { let mut buf = Vec::::with_capacity(N::FORMATTED_SIZE_DECIMAL); unsafe { + // JUSTIFICATION + // Benefit + // Allows using the faster serializer lexical core and convert to string + // Soundness + // Length of buf is set as written length afterwards. lexical_core + // creates a valid string, so doesn't need to be checked. let mut slice = std::slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity()); let len = lexical_core::write(n, slice).len(); buf.set_len(len); From 8f3d28c948d09cffbb1f96a25dc09aa9fe96eac3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 25 Dec 2020 12:26:43 +0100 Subject: [PATCH 09/10] Clippy --- rust/arrow/src/csv/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/arrow/src/csv/writer.rs b/rust/arrow/src/csv/writer.rs index d03beb7d194..ab007a75cfe 100644 --- a/rust/arrow/src/csv/writer.rs +++ b/rust/arrow/src/csv/writer.rs @@ -86,7 +86,7 @@ pub fn to_string(n: N) -> String { // Soundness // Length of buf is set as written length afterwards. lexical_core // creates a valid string, so doesn't need to be checked. - let mut slice = std::slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity()); + let slice = std::slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity()); let len = lexical_core::write(n, slice).len(); buf.set_len(len); String::from_utf8_unchecked(buf) From 3e808e1f83031b7c2bc90be0aee9589e0ace0dbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 25 Dec 2020 16:06:33 +0100 Subject: [PATCH 10/10] Simplify --- rust/arrow/src/csv/writer.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rust/arrow/src/csv/writer.rs b/rust/arrow/src/csv/writer.rs index ab007a75cfe..9ef9de8301a 100644 --- a/rust/arrow/src/csv/writer.rs +++ b/rust/arrow/src/csv/writer.rs @@ -96,11 +96,10 @@ pub fn to_string(n: N) -> String { fn write_primitive_value(array: &ArrayRef, i: usize) -> String where T: ArrowNumericType, - T::Native: std::string::ToString, T::Native: lexical_core::ToLexical, { let c = array.as_any().downcast_ref::>().unwrap(); - to_string::(c.value(i)) + to_string(c.value(i)) } /// A CSV writer