From f10bd742c7c2155a824e076d4d42c1cc65281f4c Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 26 Nov 2021 17:05:45 +0100 Subject: [PATCH 1/7] Use `BufRead` to improve performance --- datafusion/src/datasource/file_format/parquet.rs | 3 ++- datafusion/src/datasource/listing/table.rs | 2 +- datafusion/src/datasource/object_store/local.rs | 9 ++++++--- datafusion/src/datasource/object_store/mod.rs | 6 +++--- datafusion/src/execution/context.rs | 2 +- datafusion/src/logical_plan/builder.rs | 2 +- .../src/physical_plan/file_format/file_stream.rs | 12 +++--------- 7 files changed, 17 insertions(+), 19 deletions(-) diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs index 819f374486369..01770effb288c 100644 --- a/datafusion/src/datasource/file_format/parquet.rs +++ b/datafusion/src/datasource/file_format/parquet.rs @@ -18,6 +18,7 @@ //! Parquet format abstractions use std::any::Any; +use std::io::BufRead; use std::io::Read; use std::sync::Arc; @@ -311,7 +312,7 @@ impl Length for ChunkObjectReader { } impl ChunkReader for ChunkObjectReader { - type T = Box; + type T = Box; fn get_read(&self, start: u64, length: usize) -> ParquetResult { self.0 diff --git a/datafusion/src/datasource/listing/table.rs b/datafusion/src/datasource/listing/table.rs index 22e3f750370ce..8ca599f4d6442 100644 --- a/datafusion/src/datasource/listing/table.rs +++ b/datafusion/src/datasource/listing/table.rs @@ -76,7 +76,7 @@ impl ListingOptions { file_extension: String::new(), format, table_partition_cols: vec![], - collect_stat: true, + collect_stat: false, target_partitions: 1, } } diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index b2a2ddfa950b9..5b87bee1a7512 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -18,7 +18,7 @@ //! Object store that represents the Local File System. use std::fs::{self, File, Metadata}; -use std::io::{Read, Seek, SeekFrom}; +use std::io::{BufRead, BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; use async_trait::async_trait; @@ -82,12 +82,15 @@ impl ObjectReader for LocalFileReader { &self, start: u64, length: usize, - ) -> Result> { + ) -> Result> { // A new file descriptor is opened for each chunk reader. // This okay because chunks are usually fairly large. let mut file = File::open(&self.file.path)?; file.seek(SeekFrom::Start(start))?; - Ok(Box::new(file.take(length as u64))) + + let file = BufReader::new(file.take(length as u64)); + + Ok(Box::new(file)) } fn length(&self) -> u64 { diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index 59e184103d2a3..97085c5df36c6 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -21,7 +21,7 @@ pub mod local; use std::collections::HashMap; use std::fmt::{self, Debug}; -use std::io::Read; +use std::io::{BufRead, Read}; use std::pin::Pin; use std::sync::{Arc, RwLock}; @@ -48,10 +48,10 @@ pub trait ObjectReader: Send + Sync { &self, start: u64, length: usize, - ) -> Result>; + ) -> Result>; /// Get reader for the entire file - fn sync_reader(&self) -> Result> { + fn sync_reader(&self) -> Result> { self.sync_chunk_reader(0, self.length() as usize) } diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 27116c0a4a952..66ddcf14f5487 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -481,7 +481,7 @@ impl ExecutionContext { let listing_options = ListingOptions { format: Arc::new(file_format), - collect_stat: true, + collect_stat: false, file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(), target_partitions, table_partition_cols: vec![], diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index fc7df595586fb..0d24dc44ae239 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -278,7 +278,7 @@ impl LogicalPlanBuilder { let listing_options = ListingOptions { format: Arc::new(file_format), - collect_stat: true, + collect_stat: false, file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(), target_partitions, table_partition_cols: vec![], diff --git a/datafusion/src/physical_plan/file_format/file_stream.rs b/datafusion/src/physical_plan/file_format/file_stream.rs index 958b1721bb392..ca3a69a6eac80 100644 --- a/datafusion/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/src/physical_plan/file_format/file_stream.rs @@ -32,13 +32,7 @@ use arrow::{ record_batch::RecordBatch, }; use futures::Stream; -use std::{ - io::Read, - iter, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; +use std::{io::{BufRead, Read}, iter, pin::Pin, sync::Arc, task::{Context, Poll}}; use super::PartitionColumnProjector; @@ -48,12 +42,12 @@ pub type BatchIter = Box> + Send + /// A closure that creates a file format reader (iterator over `RecordBatch`) from a `Read` object /// and an optional number of required records. pub trait FormatReaderOpener: - FnMut(Box, &Option) -> BatchIter + Send + Unpin + 'static + FnMut(Box, &Option) -> BatchIter + Send + Unpin + 'static { } impl FormatReaderOpener for T where - T: FnMut(Box, &Option) -> BatchIter + T: FnMut(Box, &Option) -> BatchIter + Send + Unpin + 'static From 70322f098f0237124ca65ac9650667ed27c1527b Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 26 Nov 2021 17:08:03 +0100 Subject: [PATCH 2/7] Undo stat change --- datafusion/src/execution/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 66ddcf14f5487..27116c0a4a952 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -481,7 +481,7 @@ impl ExecutionContext { let listing_options = ListingOptions { format: Arc::new(file_format), - collect_stat: false, + collect_stat: true, file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(), target_partitions, table_partition_cols: vec![], From fbb14101ccc315664114080d87ea3f234249dc77 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 26 Nov 2021 17:13:37 +0100 Subject: [PATCH 3/7] Format --- datafusion/src/datasource/object_store/local.rs | 4 ++-- .../src/physical_plan/file_format/file_stream.rs | 13 +++++++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index 5b87bee1a7512..0caf73c51b3e0 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -87,9 +87,9 @@ impl ObjectReader for LocalFileReader { // This okay because chunks are usually fairly large. let mut file = File::open(&self.file.path)?; file.seek(SeekFrom::Start(start))?; - + let file = BufReader::new(file.take(length as u64)); - + Ok(Box::new(file)) } diff --git a/datafusion/src/physical_plan/file_format/file_stream.rs b/datafusion/src/physical_plan/file_format/file_stream.rs index ca3a69a6eac80..f4076ff758a99 100644 --- a/datafusion/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/src/physical_plan/file_format/file_stream.rs @@ -32,7 +32,13 @@ use arrow::{ record_batch::RecordBatch, }; use futures::Stream; -use std::{io::{BufRead, Read}, iter, pin::Pin, sync::Arc, task::{Context, Poll}}; +use std::{ + io::{BufRead, Read}, + iter, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; use super::PartitionColumnProjector; @@ -42,7 +48,10 @@ pub type BatchIter = Box> + Send + /// A closure that creates a file format reader (iterator over `RecordBatch`) from a `Read` object /// and an optional number of required records. pub trait FormatReaderOpener: - FnMut(Box, &Option) -> BatchIter + Send + Unpin + 'static + FnMut(Box, &Option) -> BatchIter + + Send + + Unpin + + 'static { } From 9cbaf670271b34ba62bfffcb25c4c7686c00aeec Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 26 Nov 2021 17:14:56 +0100 Subject: [PATCH 4/7] Undo stat change --- datafusion/src/datasource/listing/table.rs | 2 +- datafusion/src/logical_plan/builder.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/src/datasource/listing/table.rs b/datafusion/src/datasource/listing/table.rs index 8ca599f4d6442..22e3f750370ce 100644 --- a/datafusion/src/datasource/listing/table.rs +++ b/datafusion/src/datasource/listing/table.rs @@ -76,7 +76,7 @@ impl ListingOptions { file_extension: String::new(), format, table_partition_cols: vec![], - collect_stat: false, + collect_stat: true, target_partitions: 1, } } diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 0d24dc44ae239..fc7df595586fb 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -278,7 +278,7 @@ impl LogicalPlanBuilder { let listing_options = ListingOptions { format: Arc::new(file_format), - collect_stat: false, + collect_stat: true, file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(), target_partitions, table_partition_cols: vec![], From 10c2a8c5da120f2652f8ecd9c53116c087eeb833 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 26 Nov 2021 17:33:05 +0100 Subject: [PATCH 5/7] Unneeded imports --- datafusion/src/datasource/file_format/parquet.rs | 1 - datafusion/src/datasource/object_store/mod.rs | 2 +- datafusion/src/physical_plan/file_format/file_stream.rs | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs index 01770effb288c..43022527d0e66 100644 --- a/datafusion/src/datasource/file_format/parquet.rs +++ b/datafusion/src/datasource/file_format/parquet.rs @@ -19,7 +19,6 @@ use std::any::Any; use std::io::BufRead; -use std::io::Read; use std::sync::Arc; use arrow::datatypes::Schema; diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index 97085c5df36c6..4e6f347932ad1 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -21,7 +21,7 @@ pub mod local; use std::collections::HashMap; use std::fmt::{self, Debug}; -use std::io::{BufRead, Read}; +use std::io::BufRead; use std::pin::Pin; use std::sync::{Arc, RwLock}; diff --git a/datafusion/src/physical_plan/file_format/file_stream.rs b/datafusion/src/physical_plan/file_format/file_stream.rs index f4076ff758a99..0f260122e2b71 100644 --- a/datafusion/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/src/physical_plan/file_format/file_stream.rs @@ -33,7 +33,7 @@ use arrow::{ }; use futures::Stream; use std::{ - io::{BufRead, Read}, + io::BufRead, iter, pin::Pin, sync::Arc, From 0926029f81112f5e66a4b4a4da2be8a82e922a8f Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Fri, 26 Nov 2021 18:25:34 +0100 Subject: [PATCH 6/7] Use BufRead in test code --- datafusion/src/test/object_store.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/src/test/object_store.rs b/datafusion/src/test/object_store.rs index e93b4cd2d410d..0260c1672aa8d 100644 --- a/datafusion/src/test/object_store.rs +++ b/datafusion/src/test/object_store.rs @@ -18,7 +18,7 @@ use std::{ io, - io::{Cursor, Read}, + io::{BufRead, Cursor}, sync::Arc, }; @@ -111,7 +111,7 @@ impl ObjectReader for EmptyObjectReader { &self, _start: u64, _length: usize, - ) -> Result> { + ) -> Result> { Ok(Box::new(Cursor::new(vec![0; self.0 as usize]))) } From 0843b61d751e1d966f0dff3bc01ef0c8578b65f4 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 27 Nov 2021 14:57:57 +0100 Subject: [PATCH 7/7] Revert trait change --- datafusion/src/datasource/file_format/parquet.rs | 4 ++-- datafusion/src/datasource/object_store/local.rs | 4 ++-- datafusion/src/datasource/object_store/mod.rs | 6 +++--- datafusion/src/physical_plan/file_format/file_stream.rs | 9 +++------ datafusion/src/test/object_store.rs | 4 ++-- 5 files changed, 12 insertions(+), 15 deletions(-) diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs index 43022527d0e66..819f374486369 100644 --- a/datafusion/src/datasource/file_format/parquet.rs +++ b/datafusion/src/datasource/file_format/parquet.rs @@ -18,7 +18,7 @@ //! Parquet format abstractions use std::any::Any; -use std::io::BufRead; +use std::io::Read; use std::sync::Arc; use arrow::datatypes::Schema; @@ -311,7 +311,7 @@ impl Length for ChunkObjectReader { } impl ChunkReader for ChunkObjectReader { - type T = Box; + type T = Box; fn get_read(&self, start: u64, length: usize) -> ParquetResult { self.0 diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index 0caf73c51b3e0..0e857c8485828 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -18,7 +18,7 @@ //! Object store that represents the Local File System. use std::fs::{self, File, Metadata}; -use std::io::{BufRead, BufReader, Read, Seek, SeekFrom}; +use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; use async_trait::async_trait; @@ -82,7 +82,7 @@ impl ObjectReader for LocalFileReader { &self, start: u64, length: usize, - ) -> Result> { + ) -> Result> { // A new file descriptor is opened for each chunk reader. // This okay because chunks are usually fairly large. let mut file = File::open(&self.file.path)?; diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index 4e6f347932ad1..59e184103d2a3 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -21,7 +21,7 @@ pub mod local; use std::collections::HashMap; use std::fmt::{self, Debug}; -use std::io::BufRead; +use std::io::Read; use std::pin::Pin; use std::sync::{Arc, RwLock}; @@ -48,10 +48,10 @@ pub trait ObjectReader: Send + Sync { &self, start: u64, length: usize, - ) -> Result>; + ) -> Result>; /// Get reader for the entire file - fn sync_reader(&self) -> Result> { + fn sync_reader(&self) -> Result> { self.sync_chunk_reader(0, self.length() as usize) } diff --git a/datafusion/src/physical_plan/file_format/file_stream.rs b/datafusion/src/physical_plan/file_format/file_stream.rs index 0f260122e2b71..958b1721bb392 100644 --- a/datafusion/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/src/physical_plan/file_format/file_stream.rs @@ -33,7 +33,7 @@ use arrow::{ }; use futures::Stream; use std::{ - io::BufRead, + io::Read, iter, pin::Pin, sync::Arc, @@ -48,15 +48,12 @@ pub type BatchIter = Box> + Send + /// A closure that creates a file format reader (iterator over `RecordBatch`) from a `Read` object /// and an optional number of required records. pub trait FormatReaderOpener: - FnMut(Box, &Option) -> BatchIter - + Send - + Unpin - + 'static + FnMut(Box, &Option) -> BatchIter + Send + Unpin + 'static { } impl FormatReaderOpener for T where - T: FnMut(Box, &Option) -> BatchIter + T: FnMut(Box, &Option) -> BatchIter + Send + Unpin + 'static diff --git a/datafusion/src/test/object_store.rs b/datafusion/src/test/object_store.rs index 0260c1672aa8d..e93b4cd2d410d 100644 --- a/datafusion/src/test/object_store.rs +++ b/datafusion/src/test/object_store.rs @@ -18,7 +18,7 @@ use std::{ io, - io::{BufRead, Cursor}, + io::{Cursor, Read}, sync::Arc, }; @@ -111,7 +111,7 @@ impl ObjectReader for EmptyObjectReader { &self, _start: u64, _length: usize, - ) -> Result> { + ) -> Result> { Ok(Box::new(Cursor::new(vec![0; self.0 as usize]))) }