diff --git a/rust/arrow/src/csv/writer.rs b/rust/arrow/src/csv/writer.rs index 1eb0c4db6f6..9ef9de8301a 100644 --- a/rust/arrow/src/csv/writer.rs +++ b/rust/arrow/src/csv/writer.rs @@ -73,18 +73,33 @@ use crate::array::*; use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; - const DEFAULT_DATE_FORMAT: &str = "%F"; const DEFAULT_TIME_FORMAT: &str = "%T"; const DEFAULT_TIMESTAMP_FORMAT: &str = "%FT%H:%M:%S.%9f"; +pub fn to_string(n: N) -> String { + let mut buf = Vec::::with_capacity(N::FORMATTED_SIZE_DECIMAL); + unsafe { + // JUSTIFICATION + // Benefit + // Allows using the faster serializer lexical core and convert to string + // Soundness + // Length of buf is set as written length afterwards. lexical_core + // creates a valid string, so doesn't need to be checked. + let slice = std::slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity()); + let len = lexical_core::write(n, slice).len(); + buf.set_len(len); + String::from_utf8_unchecked(buf) + } +} + fn write_primitive_value(array: &ArrayRef, i: usize) -> String where T: ArrowNumericType, - T::Native: std::string::ToString, + T::Native: lexical_core::ToLexical, { let c = array.as_any().downcast_ref::>().unwrap(); - c.value(i).to_string() + to_string(c.value(i)) } /// A CSV writer @@ -124,14 +139,18 @@ impl Writer { } /// Convert a record to a string vector - fn convert(&self, batch: &RecordBatch, row_index: usize) -> Result> { + fn convert( + &self, + batch: &RecordBatch, + row_index: usize, + buffer: &mut [String], + ) -> Result<()> { // TODO: it'd be more efficient if we could create `record: Vec<&[u8]> - let mut record: Vec = Vec::with_capacity(batch.num_columns()); - for col_index in 0..batch.num_columns() { + for (col_index, item) in buffer.iter_mut().enumerate() { let col = batch.column(col_index); if col.is_null(row_index) { // write an empty value - record.push(String::from("")); + *item = "".to_string(); continue; } let string = match col.data_type() { @@ -243,10 +262,9 @@ impl Writer { ))); } }; - - record.push(string); + *item = string; } - Ok(record) + Ok(()) } /// Write a vector of record batches to a writable object @@ -265,9 +283,11 @@ impl Writer { self.beginning = false; } + let mut buffer = vec!["".to_string(); batch.num_columns()]; + for row_index in 0..batch.num_rows() { - let record = self.convert(batch, row_index)?; - self.writer.write_record(&record[..])?; + self.convert(batch, row_index, &mut buffer)?; + self.writer.write_record(&buffer)?; } self.writer.flush()?; diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs index 8a6fca78930..769668c958b 100644 --- a/rust/benchmarks/src/bin/tpch.rs +++ b/rust/benchmarks/src/bin/tpch.rs @@ -89,6 +89,10 @@ struct ConvertOpt { /// Number of partitions to produce #[structopt(short = "p", long = "partitions", default_value = "1")] partitions: usize, + + /// Batch size when reading CSV or Parquet files + #[structopt(short = "s", long = "batch-size", default_value = "4096")] + batch_size: usize, } #[derive(Debug, StructOpt)] @@ -1019,7 +1023,8 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> { .delimiter(b'|') .file_extension(".tbl"); - let mut ctx = ExecutionContext::new(); + let config = ExecutionConfig::new().with_batch_size(opt.batch_size); + let mut ctx = ExecutionContext::with_config(config); // build plan to read the TBL file let mut csv = ctx.read_csv(&input_path, options)?;