diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index 82e47874fae82..d64fac320fcd4 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -885,17 +885,17 @@ mod roundtrip_tests { use crate::serde::{AsLogicalPlan, BallistaCodec}; use async_trait::async_trait; use core::panic; - use datafusion::datafusion_storage::{ - object_store::{ - local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader, - ObjectStore, - }, - SizedFile, - }; - use datafusion::datasource::listing::ListingTable; - use datafusion::error::DataFusionError; use datafusion::{ arrow::datatypes::{DataType, Field, Schema}, + datafusion_storage::{ + self, + object_store::{ + local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader, + ObjectStore, + }, + SizedFile, + }, + datasource::listing::ListingTable, logical_plan::{ col, CreateExternalTable, Expr, LogicalPlan, LogicalPlanBuilder, Repartition, ToDFSchema, @@ -903,6 +903,7 @@ mod roundtrip_tests { prelude::*, sql::parser::FileType, }; + use std::io; use std::sync::Arc; #[derive(Debug)] @@ -913,8 +914,9 @@ mod roundtrip_tests { async fn list_file( &self, _prefix: &str, - ) -> datafusion::error::Result { - Err(DataFusionError::NotImplemented( + ) -> datafusion_storage::Result { + Err(io::Error::new( + io::ErrorKind::Unsupported, "this is only a test object store".to_string(), )) } @@ -923,8 +925,9 @@ mod roundtrip_tests { &self, _prefix: &str, _delimiter: Option, - ) -> datafusion::error::Result { - Err(DataFusionError::NotImplemented( + ) -> datafusion_storage::Result { + Err(io::Error::new( + io::ErrorKind::Unsupported, "this is only a test object store".to_string(), )) } @@ -932,8 +935,9 @@ mod roundtrip_tests { fn file_reader( &self, _file: SizedFile, - ) -> datafusion::error::Result> { - Err(DataFusionError::NotImplemented( + ) -> datafusion_storage::Result> { + Err(io::Error::new( + io::ErrorKind::Unsupported, "this is only a test object store".to_string(), )) } diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 20205d96bab1e..19fdcfa535061 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -27,8 +27,9 @@ use crate::{convert_box_required, convert_required}; use chrono::{TimeZone, Utc}; use datafusion::datafusion_storage::{ - object_store::local::LocalFileSystem, FileMeta, PartitionedFile, SizedFile, + object_store::local::LocalFileSystem, FileMeta, SizedFile, }; +use datafusion::datasource::listing::PartitionedFile; use datafusion::execution::context::ExecutionProps; use datafusion::physical_plan::file_format::FileScanConfig; diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs index e7d803d54dd95..4b91a45f853ca 100644 --- a/ballista/rust/core/src/serde/physical_plan/mod.rs +++ b/ballista/rust/core/src/serde/physical_plan/mod.rs @@ -34,7 +34,7 @@ use crate::{convert_box_required, convert_required, into_physical_plan, into_req use datafusion::arrow::compute::SortOptions; use datafusion::arrow::datatypes::SchemaRef; use datafusion::datafusion_storage::object_store::local::LocalFileSystem; -use datafusion::datafusion_storage::PartitionedFile; +use datafusion::datasource::listing::PartitionedFile; use datafusion::logical_plan::window_frames::WindowFrame; use datafusion::physical_plan::aggregates::create_aggregate_expr; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; @@ -941,33 +941,30 @@ mod roundtrip_tests { use std::sync::Arc; use crate::serde::{AsExecutionPlan, BallistaCodec}; - use datafusion::datafusion_storage::{ - object_store::local::LocalFileSystem, PartitionedFile, - }; - use datafusion::physical_plan::sorts::sort::SortExec; - use datafusion::prelude::SessionContext; use datafusion::{ arrow::{ compute::kernels::sort::SortOptions, datatypes::{DataType, Field, Schema}, }, + datafusion_storage::object_store::local::LocalFileSystem, + datasource::listing::PartitionedFile, logical_plan::{JoinType, Operator}, physical_plan::{ empty::EmptyExec, expressions::{binary, col, lit, InListExpr, NotExpr}, expressions::{Avg, Column, PhysicalSortExpr}, + file_format::{FileScanConfig, ParquetExec}, filter::FilterExec, hash_aggregate::{AggregateMode, HashAggregateExec}, hash_join::{HashJoinExec, PartitionMode}, limit::{GlobalLimitExec, LocalLimitExec}, - AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, + sorts::sort::SortExec, + AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, Statistics, }, + prelude::SessionContext, scalar::ScalarValue, }; - use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec}; - use datafusion::physical_plan::Statistics; - use super::super::super::error::Result; use super::super::protobuf; use crate::execution_plans::ShuffleWriterExec; diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index 06605ec78ee62..9a63762a4d028 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -35,7 +35,7 @@ use datafusion::physical_plan::{ Statistics, }; -use datafusion::datafusion_storage::PartitionedFile; +use datafusion::datasource::listing::PartitionedFile; use datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::expressions::{Count, Literal}; diff --git a/datafusion-storage/Cargo.toml b/datafusion-storage/Cargo.toml index 377a087ec997b..44e9152ba9aa3 100644 --- a/datafusion-storage/Cargo.toml +++ b/datafusion-storage/Cargo.toml @@ -35,7 +35,6 @@ path = "src/lib.rs" [dependencies] async-trait = "0.1.41" chrono = { version = "0.4", default-features = false } -datafusion-common = { path = "../datafusion-common", version = "7.0.0" } futures = "0.3" parking_lot = "0.12" tempfile = "3" diff --git a/datafusion-storage/src/lib.rs b/datafusion-storage/src/lib.rs index ce6e1774ba4f9..5da690ad130bc 100644 --- a/datafusion-storage/src/lib.rs +++ b/datafusion-storage/src/lib.rs @@ -18,7 +18,10 @@ pub mod object_store; use chrono::{DateTime, Utc}; -use datafusion_common::ScalarValue; +use std::{io, result}; + +/// Result type for operations that could result in an io error +pub type Result = result::Result; /// Represents a specific file or a prefix (folder) that may /// require further resolution @@ -72,33 +75,3 @@ impl std::fmt::Display for FileMeta { write!(f, "{} (size: {})", self.path(), self.size()) } } - -#[derive(Debug, Clone)] -/// A single file that should be read, along with its schema, statistics -/// and partition column values that need to be appended to each row. -pub struct PartitionedFile { - /// Path for the file (e.g. URL, filesystem path, etc) - pub file_meta: FileMeta, - /// Values of partition columns to be appended to each row - pub partition_values: Vec, - // We may include row group range here for a more fine-grained parallel execution -} - -impl PartitionedFile { - /// Create a simple file without metadata or partition - pub fn new(path: String, size: u64) -> Self { - Self { - file_meta: FileMeta { - sized_file: SizedFile { path, size }, - last_modified: None, - }, - partition_values: vec![], - } - } -} - -impl std::fmt::Display for PartitionedFile { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.file_meta) - } -} diff --git a/datafusion-storage/src/object_store/local.rs b/datafusion-storage/src/object_store/local.rs index 540996e915fb2..f4872ae174207 100644 --- a/datafusion-storage/src/object_store/local.rs +++ b/datafusion-storage/src/object_store/local.rs @@ -18,15 +18,14 @@ //! Object store that represents the Local File System. use std::fs::{self, File, Metadata}; +use std::io; use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; use async_trait::async_trait; use futures::{stream, AsyncRead, StreamExt}; -use datafusion_common::{DataFusionError, Result}; - -use crate::{FileMeta, PartitionedFile, SizedFile}; +use crate::{FileMeta, Result, SizedFile}; use super::{ FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderStream, ObjectStore, @@ -131,7 +130,10 @@ async fn list_all(prefix: String) -> Result { files.push(get_meta(child_path.to_owned(), metadata)) } } else { - return Err(DataFusionError::Plan("Invalid path".to_string())); + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid path".to_string(), + )); } } Ok(files) @@ -171,22 +173,19 @@ pub fn local_object_reader_stream(files: Vec) -> ObjectReaderStream { /// Helper method to convert a file location to a `LocalFileReader` pub fn local_object_reader(file: String) -> Arc { LocalFileSystem - .file_reader(local_unpartitioned_file(file).file_meta.sized_file) + .file_reader(local_unpartitioned_file(file).sized_file) .expect("File not found") } /// Helper method to fetch the file size and date at given path and create a `FileMeta` -pub fn local_unpartitioned_file(file: String) -> PartitionedFile { +pub fn local_unpartitioned_file(file: String) -> FileMeta { let metadata = fs::metadata(&file).expect("Local file metadata"); - PartitionedFile { - file_meta: FileMeta { - sized_file: SizedFile { - size: metadata.len(), - path: file, - }, - last_modified: metadata.modified().map(chrono::DateTime::from).ok(), + FileMeta { + sized_file: SizedFile { + size: metadata.len(), + path: file, }, - partition_values: vec![], + last_modified: metadata.modified().map(chrono::DateTime::from).ok(), } } diff --git a/datafusion-storage/src/object_store/mod.rs b/datafusion-storage/src/object_store/mod.rs index 047964855cb6d..5d2f76e27931d 100644 --- a/datafusion-storage/src/object_store/mod.rs +++ b/datafusion-storage/src/object_store/mod.rs @@ -27,17 +27,12 @@ use std::sync::Arc; use async_trait::async_trait; use futures::{AsyncRead, Stream, StreamExt}; -use crate::{FileMeta, ListEntry, PartitionedFile, SizedFile}; -use datafusion_common::Result; +use crate::{FileMeta, ListEntry, Result, SizedFile}; /// Stream of files listed from object store pub type FileMetaStream = Pin> + Send + Sync + 'static>>; -/// Stream of files get listed from object store -pub type PartitionedFileStream = - Pin> + Send + Sync + 'static>>; - /// Stream of list entries obtained from object store pub type ListEntryStream = Pin> + Send + Sync + 'static>>; diff --git a/datafusion/src/datasource/file_format/avro.rs b/datafusion/src/datasource/file_format/avro.rs index 0fc9c7a373309..f54650f869885 100644 --- a/datafusion/src/datasource/file_format/avro.rs +++ b/datafusion/src/datasource/file_format/avro.rs @@ -76,13 +76,13 @@ impl FileFormat for AvroFormat { mod tests { use crate::{ datafusion_storage::object_store::local::{ - local_object_reader, local_object_reader_stream, local_unpartitioned_file, - LocalFileSystem, + local_object_reader, local_object_reader_stream, LocalFileSystem, }, physical_plan::collect, }; use super::*; + use crate::datasource::listing::local_unpartitioned_file; use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use crate::prelude::{SessionConfig, SessionContext}; use arrow::array::{ diff --git a/datafusion/src/datasource/file_format/csv.rs b/datafusion/src/datasource/file_format/csv.rs index 41f6df19087a2..955bc0f78eccc 100644 --- a/datafusion/src/datasource/file_format/csv.rs +++ b/datafusion/src/datasource/file_format/csv.rs @@ -138,11 +138,11 @@ mod tests { use arrow::array::StringArray; use super::*; + use crate::datasource::listing::local_unpartitioned_file; use crate::prelude::{SessionConfig, SessionContext}; use crate::{ datafusion_storage::object_store::local::{ - local_object_reader, local_object_reader_stream, local_unpartitioned_file, - LocalFileSystem, + local_object_reader, local_object_reader_stream, LocalFileSystem, }, datasource::file_format::FileScanConfig, physical_plan::collect, @@ -271,7 +271,7 @@ mod tests { let exec = format .create_physical_plan( FileScanConfig { - object_store: Arc::new(LocalFileSystem {}), + object_store: Arc::new(LocalFileSystem), file_schema, file_groups, statistics, diff --git a/datafusion/src/datasource/file_format/json.rs b/datafusion/src/datasource/file_format/json.rs index 01e7982cabe80..0a347fa364baa 100644 --- a/datafusion/src/datasource/file_format/json.rs +++ b/datafusion/src/datasource/file_format/json.rs @@ -103,10 +103,9 @@ mod tests { use crate::prelude::{SessionConfig, SessionContext}; use crate::{ datafusion_storage::object_store::local::{ - local_object_reader, local_object_reader_stream, local_unpartitioned_file, - LocalFileSystem, + local_object_reader, local_object_reader_stream, LocalFileSystem, }, - datasource::file_format::FileScanConfig, + datasource::{file_format::FileScanConfig, listing::local_unpartitioned_file}, physical_plan::collect, }; diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs index 08998d2469b8d..374e360c2a969 100644 --- a/datafusion/src/datasource/file_format/parquet.rs +++ b/datafusion/src/datasource/file_format/parquet.rs @@ -89,6 +89,7 @@ impl FileFormat for ParquetFormat { async fn infer_schema(&self, readers: ObjectReaderStream) -> Result { let merged_schema = readers + .map_err(DataFusionError::IoError) .try_fold(Schema::empty(), |acc, reader| async { let next_schema = fetch_schema(reader); Schema::try_merge([acc, next_schema?]) @@ -351,16 +352,17 @@ impl ChunkReader for ChunkObjectReader { fn get_read(&self, start: u64, length: usize) -> ParquetResult { self.0 .sync_chunk_reader(start, length) + .map_err(DataFusionError::IoError) .map_err(|e| ParquetError::ArrowError(e.to_string())) } } #[cfg(test)] mod tests { + use crate::datasource::listing::local_unpartitioned_file; use crate::physical_plan::collect; use datafusion_storage::object_store::local::{ - local_object_reader, local_object_reader_stream, local_unpartitioned_file, - LocalFileSystem, + local_object_reader, local_object_reader_stream, LocalFileSystem, }; use super::*; diff --git a/datafusion/src/datasource/listing/helpers.rs b/datafusion/src/datasource/listing/helpers.rs index 265747402923c..10b3b488b0d00 100644 --- a/datafusion/src/datasource/listing/helpers.rs +++ b/datafusion/src/datasource/listing/helpers.rs @@ -29,6 +29,7 @@ use arrow::{ record_batch::RecordBatch, }; use chrono::{TimeZone, Utc}; +use datafusion_common::DataFusionError; use futures::{ stream::{self}, StreamExt, TryStreamExt, @@ -44,10 +45,8 @@ use crate::{ scalar::ScalarValue, }; -use datafusion_storage::{ - object_store::{ObjectStore, PartitionedFileStream}, - FileMeta, PartitionedFile, SizedFile, -}; +use super::{PartitionedFile, PartitionedFileStream}; +use datafusion_storage::{object_store::ObjectStore, FileMeta, SizedFile}; const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_"; const FILE_PATH_COLUMN_NAME: &str = "_df_part_file_path_"; @@ -234,7 +233,11 @@ pub async fn pruned_partition_list( // store if we switch to a streaming-stlye pruning of the files. For instance S3 lists // 1000 items at a time so batches of 1000 would be ideal with S3 as store. .chunks(1024) - .map(|v| v.into_iter().collect::>>()) + .map(|v| { + v.into_iter() + .collect::>>() + }) + .map_err(DataFusionError::IoError) .map(move |metas| paths_to_batch(table_partition_cols, &stream_path, &metas?)) .try_collect() .await?; diff --git a/datafusion/src/datasource/listing/mod.rs b/datafusion/src/datasource/listing/mod.rs index 2f1d03459ab07..2a2042efe8424 100644 --- a/datafusion/src/datasource/listing/mod.rs +++ b/datafusion/src/datasource/listing/mod.rs @@ -21,4 +21,51 @@ mod helpers; mod table; +use datafusion_common::ScalarValue; +use datafusion_storage::{object_store::local, FileMeta, Result, SizedFile}; +use futures::Stream; +use std::pin::Pin; + pub use table::{ListingOptions, ListingTable, ListingTableConfig}; + +/// Stream of files get listed from object store +pub type PartitionedFileStream = + Pin> + Send + Sync + 'static>>; + +#[derive(Debug, Clone)] +/// A single file that should be read, along with its schema, statistics +/// and partition column values that need to be appended to each row. +pub struct PartitionedFile { + /// Path for the file (e.g. URL, filesystem path, etc) + pub file_meta: FileMeta, + /// Values of partition columns to be appended to each row + pub partition_values: Vec, + // We may include row group range here for a more fine-grained parallel execution +} + +impl PartitionedFile { + /// Create a simple file without metadata or partition + pub fn new(path: String, size: u64) -> Self { + Self { + file_meta: FileMeta { + sized_file: SizedFile { path, size }, + last_modified: None, + }, + partition_values: vec![], + } + } +} + +impl std::fmt::Display for PartitionedFile { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.file_meta) + } +} + +/// Helper method to fetch the file size and date at given path and create a `FileMeta` +pub fn local_unpartitioned_file(file: String) -> PartitionedFile { + PartitionedFile { + file_meta: local::local_unpartitioned_file(file), + partition_values: vec![], + } +} diff --git a/datafusion/src/datasource/listing/table.rs b/datafusion/src/datasource/listing/table.rs index 440bbbaddd2f2..42b836c361b7a 100644 --- a/datafusion/src/datasource/listing/table.rs +++ b/datafusion/src/datasource/listing/table.rs @@ -42,7 +42,8 @@ use crate::datasource::{ get_statistics_with_limit, TableProvider, }; -use datafusion_storage::{object_store::ObjectStore, PartitionedFile}; +use super::PartitionedFile; +use datafusion_storage::object_store::ObjectStore; use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files}; diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs index 24bc68765ea4f..2a801ff218fe2 100644 --- a/datafusion/src/datasource/mod.rs +++ b/datafusion/src/datasource/mod.rs @@ -28,12 +28,12 @@ pub mod object_store_registry; use futures::Stream; pub use self::datasource::{TableProvider, TableType}; +use self::listing::PartitionedFile; pub use self::memory::MemTable; use crate::arrow::datatypes::{Schema, SchemaRef}; use crate::error::Result; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics}; -use datafusion_storage::PartitionedFile; use futures::StreamExt; /// Get all files as well as the file level summary statistics (no statistic for partition columns). diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index c3556c394e184..f2ab1f40d796b 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -238,7 +238,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use super::*; - use crate::datafusion_storage::PartitionedFile; + use crate::datasource::listing::PartitionedFile; use crate::physical_plan::expressions::{col, PhysicalSortExpr}; use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::filter::FilterExec; diff --git a/datafusion/src/physical_plan/file_format/avro.rs b/datafusion/src/physical_plan/file_format/avro.rs index 3ee23a95af673..3f6b85777a5a0 100644 --- a/datafusion/src/physical_plan/file_format/avro.rs +++ b/datafusion/src/physical_plan/file_format/avro.rs @@ -174,9 +174,12 @@ impl ExecutionPlan for AvroExec { #[cfg(test)] #[cfg(feature = "avro")] mod tests { - use crate::datasource::file_format::{avro::AvroFormat, FileFormat}; use crate::datasource::object_store::local::{ - local_object_reader_stream, local_unpartitioned_file, LocalFileSystem, + local_object_reader_stream, LocalFileSystem, + }; + use crate::datasource::{ + file_format::{avro::AvroFormat, FileFormat}, + listing::local_unpartitioned_file, }; use crate::scalar::ScalarValue; use arrow::datatypes::{DataType, Field, Schema}; diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index 10872a7d15650..4a5e2ed8cf764 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -220,9 +220,8 @@ pub async fn plan_to_csv( #[cfg(test)] mod tests { use super::*; - use crate::datafusion_storage::object_store::local::{ - local_unpartitioned_file, LocalFileSystem, - }; + use crate::datafusion_storage::object_store::local::LocalFileSystem; + use crate::datasource::listing::local_unpartitioned_file; use crate::prelude::*; use crate::test_util::aggr_test_schema_with_missing_col; use crate::{scalar::ScalarValue, test_util::aggr_test_schema}; diff --git a/datafusion/src/physical_plan/file_format/file_stream.rs b/datafusion/src/physical_plan/file_format/file_stream.rs index 6d8fa5fc431e4..78222ee420455 100644 --- a/datafusion/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/src/physical_plan/file_format/file_stream.rs @@ -21,13 +21,14 @@ //! Note: Most traits here need to be marked `Sync + Send` to be //! compliant with the `SendableRecordBatchStream` trait. +use crate::datasource::listing::PartitionedFile; use crate::{physical_plan::RecordBatchStream, scalar::ScalarValue}; use arrow::{ datatypes::SchemaRef, error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; -use datafusion_storage::{object_store::ObjectStore, PartitionedFile}; +use datafusion_storage::object_store::ObjectStore; use futures::Stream; use std::{ io::Read, diff --git a/datafusion/src/physical_plan/file_format/json.rs b/datafusion/src/physical_plan/file_format/json.rs index e96ff8bfc3e2c..4eb2fda578216 100644 --- a/datafusion/src/physical_plan/file_format/json.rs +++ b/datafusion/src/physical_plan/file_format/json.rs @@ -194,9 +194,12 @@ mod tests { use futures::StreamExt; use crate::datafusion_storage::object_store::local::{ - local_object_reader_stream, local_unpartitioned_file, LocalFileSystem, + local_object_reader_stream, LocalFileSystem, + }; + use crate::datasource::{ + file_format::{json::JsonFormat, FileFormat}, + listing::local_unpartitioned_file, }; - use crate::datasource::file_format::{json::JsonFormat, FileFormat}; use crate::prelude::NdJsonReadOptions; use crate::prelude::*; use tempfile::TempDir; diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs index 176f94a451700..bbb6e265e0412 100644 --- a/datafusion/src/physical_plan/file_format/mod.rs +++ b/datafusion/src/physical_plan/file_format/mod.rs @@ -38,12 +38,13 @@ pub use csv::CsvExec; pub(crate) use json::plan_to_json; pub use json::NdJsonExec; +use crate::datasource::listing::PartitionedFile; use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, }; use arrow::array::{new_null_array, UInt16BufferBuilder}; -use datafusion_storage::{object_store::ObjectStore, PartitionedFile}; +use datafusion_storage::object_store::ObjectStore; use lazy_static::lazy_static; use log::info; use std::{ diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 42766993e6f7c..4e1e6eefc827a 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -41,7 +41,7 @@ use crate::{ }; use datafusion_common::Column; use datafusion_expr::Expr; -use datafusion_storage::{object_store::ObjectStore, PartitionedFile}; +use datafusion_storage::object_store::ObjectStore; use arrow::{ array::ArrayRef, @@ -66,6 +66,7 @@ use tokio::{ task, }; +use crate::datasource::listing::PartitionedFile; use crate::physical_plan::file_format::SchemaAdapter; use async_trait::async_trait; @@ -568,12 +569,13 @@ mod tests { use crate::{ assert_batches_sorted_eq, assert_contains, datafusion_storage::{ - object_store::local::{ - local_object_reader_stream, local_unpartitioned_file, LocalFileSystem, - }, + object_store::local::{local_object_reader_stream, LocalFileSystem}, FileMeta, SizedFile, }, - datasource::file_format::{parquet::ParquetFormat, FileFormat}, + datasource::{ + file_format::{parquet::ParquetFormat, FileFormat}, + listing::local_unpartitioned_file, + }, physical_plan::collect, }; diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs index b96d981c1c7e9..4bdf6d666d717 100644 --- a/datafusion/src/test/mod.rs +++ b/datafusion/src/test/mod.rs @@ -18,10 +18,10 @@ //! Common unit test utility methods use crate::arrow::array::UInt32Array; -use crate::datafusion_storage::{ - object_store::local::local_unpartitioned_file, PartitionedFile, +use crate::datasource::{ + listing::{local_unpartitioned_file, PartitionedFile}, + MemTable, TableProvider, }; -use crate::datasource::{MemTable, TableProvider}; use crate::error::Result; use crate::from_slice::FromSlice; use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder}; diff --git a/datafusion/src/test/object_store.rs b/datafusion/src/test/object_store.rs index 9646890b631e9..61b4265c0dc1b 100644 --- a/datafusion/src/test/object_store.rs +++ b/datafusion/src/test/object_store.rs @@ -22,12 +22,9 @@ use std::{ sync::Arc, }; -use crate::{ - datafusion_storage::{ - object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore}, - FileMeta, SizedFile, - }, - error::{DataFusionError, Result}, +use crate::datafusion_storage::{ + object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore}, + FileMeta, Result, SizedFile, }; use async_trait::async_trait; use futures::{stream, AsyncRead, StreamExt}; @@ -84,14 +81,14 @@ impl ObjectStore for TestObjectStore { Some((_, size)) if *size == file.size => { Ok(Arc::new(EmptyObjectReader(*size))) } - Some(_) => Err(DataFusionError::IoError(io::Error::new( + Some(_) => Err(io::Error::new( io::ErrorKind::NotFound, "found in test list but wrong size", - ))), - None => Err(DataFusionError::IoError(io::Error::new( + )), + None => Err(io::Error::new( io::ErrorKind::NotFound, "not in provided test list", - ))), + )), } } } diff --git a/datafusion/tests/path_partition.rs b/datafusion/tests/path_partition.rs index 3e4604906f1ac..8475ea5b43a39 100644 --- a/datafusion/tests/path_partition.rs +++ b/datafusion/tests/path_partition.rs @@ -33,7 +33,7 @@ use datafusion::{ file_format::{csv::CsvFormat, parquet::ParquetFormat}, listing::{ListingOptions, ListingTable, ListingTableConfig}, }, - error::{DataFusionError, Result}, + error::Result, physical_plan::ColumnStatistics, prelude::SessionContext, test_util::{self, arrow_test_data, parquet_test_data}, @@ -352,7 +352,10 @@ impl MirroringObjectStore { #[async_trait] impl ObjectStore for MirroringObjectStore { - async fn list_file(&self, prefix: &str) -> Result { + async fn list_file( + &self, + prefix: &str, + ) -> datafusion_storage::Result { let prefix = prefix.to_owned(); let size = self.file_size; Ok(Box::pin( @@ -375,11 +378,14 @@ impl ObjectStore for MirroringObjectStore { &self, _prefix: &str, _delimiter: Option, - ) -> Result { + ) -> datafusion_storage::Result { unimplemented!() } - fn file_reader(&self, file: SizedFile) -> Result> { + fn file_reader( + &self, + file: SizedFile, + ) -> datafusion_storage::Result> { assert_eq!( self.file_size, file.size, "Requested files should have the same size as the mirrored file" @@ -389,10 +395,10 @@ impl ObjectStore for MirroringObjectStore { path: self.mirrored_file.clone(), size: self.file_size, })?), - None => Err(DataFusionError::IoError(io::Error::new( + None => Err(io::Error::new( io::ErrorKind::NotFound, "not in provided test list", - ))), + )), } } }