From 96cb0292a0d18c3daa72c2dfa38d5c4a17eb756d Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 23 Dec 2022 08:04:59 -0600
Subject: [PATCH] Minor: refactor streaming CSV inference code

---
 .../core/src/datasource/file_format/csv.rs | 174 +++++++++++-------
 1 file changed, 105 insertions(+), 69 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index ee4e262fa449b..358189627d277 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -25,11 +25,11 @@ use std::sync::Arc;
 use arrow::datatypes::{DataType, Field, Schema};
 use arrow::{self, datatypes::SchemaRef};
 use async_trait::async_trait;
-use bytes::Buf;
+use bytes::{Buf, Bytes};
 
 use datafusion_common::DataFusionError;
 
-use futures::{pin_mut, StreamExt, TryStreamExt};
+use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{ObjectMeta, ObjectStore};
 
 use super::FileFormat;
@@ -125,75 +125,16 @@ impl FileFormat for CsvFormat {
 
         let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
 
-        'iterating_objects: for object in objects {
+        for object in objects {
             // stream to only read as many rows as needed into memory
-            let stream = store
-                .get(&object.location)
-                .await?
-                .into_stream()
-                .map_err(|e| DataFusionError::External(Box::new(e)));
-            let stream = newline_delimited_stream(stream);
-            pin_mut!(stream);
-
-            let mut column_names = vec![];
-            let mut column_type_possibilities = vec![];
-            let mut first_chunk = true;
-
-            'reading_object: while let Some(data) = stream.next().await.transpose()? {
-                let (Schema { fields, .. }, records_read) =
-                    arrow::csv::reader::infer_reader_schema(
-                        self.file_compression_type.convert_read(data.reader())?,
-                        self.delimiter,
-                        Some(records_to_read),
-                        // only consider header for first chunk
-                        self.has_header && first_chunk,
-                    )?;
-                records_to_read -= records_read;
-
-                if first_chunk {
-                    // set up initial structures for recording inferred schema across chunks
-                    (column_names, column_type_possibilities) = fields
-                        .into_iter()
-                        .map(|field| {
-                            let mut possibilities = HashSet::new();
-                            if records_read > 0 {
-                                // at least 1 data row read, record the inferred datatype
-                                possibilities.insert(field.data_type().clone());
-                            }
-                            (field.name().clone(), possibilities)
-                        })
-                        .unzip();
-                    first_chunk = false;
-                } else {
-                    if fields.len() != column_type_possibilities.len() {
-                        return Err(DataFusionError::Execution(
-                            format!(
-                                "Encountered unequal lengths between records on CSV file whilst inferring schema. \
-                                 Expected {} records, found {} records",
-                                column_type_possibilities.len(),
-                                fields.len()
-                            )
-                        ));
-                    }
-
-                    column_type_possibilities.iter_mut().zip(fields).for_each(
-                        |(possibilities, field)| {
-                            possibilities.insert(field.data_type().clone());
-                        },
-                    );
-                }
-
-                if records_to_read == 0 {
-                    break 'reading_object;
-                }
-            }
-
-            schemas.push(build_schema_helper(
-                column_names,
-                &column_type_possibilities,
-            ));
+            let stream = read_to_delimited_chunks(store, object).await;
+            let (schema, records_read) = self
+                .infer_schema_from_stream(records_to_read, stream)
+                .await?;
+            records_to_read -= records_read;
+            schemas.push(schema);
             if records_to_read == 0 {
-                break 'iterating_objects;
+                break;
             }
         }
 
@@ -227,6 +168,101 @@ impl FileFormat for CsvFormat {
     }
 }
 
+/// Return a newline delimited stream from the specified file on
+/// object store
+///
+/// Each returned `Bytes` has a whole number of newline delimited rows
+async fn read_to_delimited_chunks(
+    store: &Arc<dyn ObjectStore>,
+    object: &ObjectMeta,
+) -> impl Stream<Item = Result<Bytes>> {
+    // stream to only read as many rows as needed into memory
+    let stream = store
+        .get(&object.location)
+        .await
+        .map_err(DataFusionError::ObjectStore);
+
+    match stream {
+        Ok(s) => newline_delimited_stream(
+            s.into_stream()
+                .map_err(|e| DataFusionError::External(Box::new(e))),
+        )
+        .left_stream(),
+        Err(e) => futures::stream::iter(vec![Err(e)]).right_stream(),
+    }
+}
+
+impl CsvFormat {
+    /// Return the inferred schema reading up to records_to_read from a
+    /// stream of delimited chunks returning the inferred schema and the
+    /// number of lines that were read
+    async fn infer_schema_from_stream(
+        &self,
+        mut records_to_read: usize,
+        stream: impl Stream<Item = Result<Bytes>>,
+    ) -> Result<(Schema, usize)> {
+        let mut total_records_read = 0;
+        let mut column_names = vec![];
+        let mut column_type_possibilities = vec![];
+        let mut first_chunk = true;
+
+        pin_mut!(stream);
+
+        while let Some(chunk) = stream.next().await.transpose()? {
+            let (Schema { fields, .. }, records_read) =
+                arrow::csv::reader::infer_reader_schema(
+                    self.file_compression_type.convert_read(chunk.reader())?,
+                    self.delimiter,
+                    Some(records_to_read),
+                    // only consider header for first chunk
+                    self.has_header && first_chunk,
+                )?;
+            records_to_read -= records_read;
+            total_records_read += records_read;
+
+            if first_chunk {
+                // set up initial structures for recording inferred schema across chunks
+                (column_names, column_type_possibilities) = fields
+                    .into_iter()
+                    .map(|field| {
+                        let mut possibilities = HashSet::new();
+                        if records_read > 0 {
+                            // at least 1 data row read, record the inferred datatype
+                            possibilities.insert(field.data_type().clone());
+                        }
+                        (field.name().clone(), possibilities)
+                    })
+                    .unzip();
+                first_chunk = false;
+            } else {
+                if fields.len() != column_type_possibilities.len() {
+                    return Err(DataFusionError::Execution(
+                        format!(
+                            "Encountered unequal lengths between records on CSV file whilst inferring schema. \
+                             Expected {} records, found {} records",
+                            column_type_possibilities.len(),
+                            fields.len()
+                        )
+                    ));
+                }
+
+                column_type_possibilities.iter_mut().zip(fields).for_each(
+                    |(possibilities, field)| {
+                        possibilities.insert(field.data_type().clone());
+                    },
+                );
+            }
+
+            if records_to_read == 0 {
+                break;
+            }
+        }
+
+        let schema = build_schema_helper(column_names, &column_type_possibilities);
+        Ok((schema, total_records_read))
+    }
+}
+
 fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> Schema {
    let fields = names
        .into_iter()
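
Note on the error handling in read_to_delimited_chunks above: the function keeps the signature impl Stream<Item = Result<Bytes>> rather than Result<impl Stream<...>>, so a failure of the initial store.get() call has to be reported through the stream itself. Because the two match arms build streams of different concrete types, they are wrapped into a common futures::future::Either via StreamExt::left_stream() and right_stream(). The standalone sketch below is not part of the patch; the function name lines_or_error and the String error type are made up for illustration, and only the futures crate is assumed:

use futures::{Stream, StreamExt};

// Success: stream every line; failure: a one-item stream carrying the error.
// `left_stream`/`right_stream` wrap the two differently typed streams into
// `futures::future::Either`, which itself implements `Stream`, so both match
// arms unify to a single return type.
fn lines_or_error(
    input: Result<Vec<String>, String>,
) -> impl Stream<Item = Result<String, String>> {
    match input {
        Ok(lines) => futures::stream::iter(lines.into_iter().map(Ok)).left_stream(),
        Err(e) => futures::stream::iter(vec![Err(e)]).right_stream(),
    }
}

fn main() {
    futures::executor::block_on(async {
        let ok = lines_or_error(Ok(vec!["a".to_string(), "b".to_string()]));
        let failed = lines_or_error(Err("connection refused".to_string()));
        // prints [Ok("a"), Ok("b")]
        println!("{:?}", ok.collect::<Vec<_>>().await);
        // prints [Err("connection refused")]
        println!("{:?}", failed.collect::<Vec<_>>().await);
    });
}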
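
As a rough usage sketch, infer_schema_from_stream can also be driven from an in-memory stream of Bytes chunks, which is handy in a unit test inside this module (the method is private). The test below is illustrative only and not part of the patch: it assumes CsvFormat::default() and with_has_header() as they exist elsewhere in this file, and it relies on each chunk holding whole newline-terminated rows, as read_to_delimited_chunks guarantees.

#[tokio::test]
async fn infer_schema_from_stream_sketch() -> Result<()> {
    // two chunks, each holding complete rows; the header appears only in the
    // first chunk, matching `self.has_header && first_chunk` above
    let chunks: Vec<Result<Bytes>> = vec![
        Ok(Bytes::from("c1,c2\n1,hello\n")),
        Ok(Bytes::from("2,world\n")),
    ];

    let format = CsvFormat::default().with_has_header(true);
    let (schema, records_read) = format
        .infer_schema_from_stream(usize::MAX, futures::stream::iter(chunks))
        .await?;

    // column names come from the header row in the first chunk
    let names: Vec<String> = schema.fields().iter().map(|f| f.name().clone()).collect();
    assert_eq!(names, vec!["c1", "c2"]);
    assert!(records_read > 0);
    Ok(())
}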