diff --git a/Cargo.toml b/Cargo.toml index dcf660cc62af7..9a060707c7ced 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ members = [ "datafusion-cli", "datafusion-examples", "datafusion-proto", + "datafusion-storage", "benchmarks", "ballista/rust/client", "ballista/rust/core", diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs index e1fc0a84aacb9..82e47874fae82 100644 --- a/ballista/rust/core/src/serde/logical_plan/mod.rs +++ b/ballista/rust/core/src/serde/logical_plan/mod.rs @@ -885,14 +885,17 @@ mod roundtrip_tests { use crate::serde::{AsLogicalPlan, BallistaCodec}; use async_trait::async_trait; use core::panic; - use datafusion::datasource::listing::ListingTable; - use datafusion::datasource::object_store::{ - FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile, + use datafusion::datafusion_storage::{ + object_store::{ + local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader, + ObjectStore, + }, + SizedFile, }; + use datafusion::datasource::listing::ListingTable; use datafusion::error::DataFusionError; use datafusion::{ arrow::datatypes::{DataType, Field, Schema}, - datasource::object_store::local::LocalFileSystem, logical_plan::{ col, CreateExternalTable, Expr, LogicalPlan, LogicalPlanBuilder, Repartition, ToDFSchema, diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs index 9e60c1b64b62a..1068d2cd90b11 100644 --- a/ballista/rust/core/src/serde/mod.rs +++ b/ballista/rust/core/src/serde/mod.rs @@ -347,7 +347,7 @@ fn str_to_byte(s: &str) -> Result { mod tests { use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; - use datafusion::datasource::object_store::local::LocalFileSystem; + use datafusion::datafusion_storage::object_store::local::LocalFileSystem; use datafusion::error::DataFusionError; use datafusion::execution::context::{QueryPlanner, SessionState, TaskContext}; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 97c31b9a9d018..20205d96bab1e 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -26,9 +26,9 @@ use crate::serde::{from_proto_binary_op, proto_error, protobuf}; use crate::{convert_box_required, convert_required}; use chrono::{TimeZone, Utc}; -use datafusion::datasource::object_store::local::LocalFileSystem; -use datafusion::datasource::object_store::{FileMeta, SizedFile}; -use datafusion::datasource::PartitionedFile; +use datafusion::datafusion_storage::{ + object_store::local::LocalFileSystem, FileMeta, PartitionedFile, SizedFile, +}; use datafusion::execution::context::ExecutionProps; use datafusion::physical_plan::file_format::FileScanConfig; diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs index ef42b796cda0d..e7d803d54dd95 100644 --- a/ballista/rust/core/src/serde/physical_plan/mod.rs +++ b/ballista/rust/core/src/serde/physical_plan/mod.rs @@ -33,8 +33,8 @@ use crate::serde::{ use crate::{convert_box_required, convert_required, into_physical_plan, into_required}; use datafusion::arrow::compute::SortOptions; use datafusion::arrow::datatypes::SchemaRef; -use datafusion::datasource::object_store::local::LocalFileSystem; -use datafusion::datasource::PartitionedFile; +use 
datafusion::datafusion_storage::object_store::local::LocalFileSystem; +use datafusion::datafusion_storage::PartitionedFile; use datafusion::logical_plan::window_frames::WindowFrame; use datafusion::physical_plan::aggregates::create_aggregate_expr; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; @@ -941,8 +941,9 @@ mod roundtrip_tests { use std::sync::Arc; use crate::serde::{AsExecutionPlan, BallistaCodec}; - use datafusion::datasource::object_store::local::LocalFileSystem; - use datafusion::datasource::PartitionedFile; + use datafusion::datafusion_storage::{ + object_store::local::LocalFileSystem, PartitionedFile, + }; use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::prelude::SessionContext; use datafusion::{ diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index 975e012719674..06605ec78ee62 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -35,7 +35,7 @@ use datafusion::physical_plan::{ Statistics, }; -use datafusion::datasource::PartitionedFile; +use datafusion::datafusion_storage::PartitionedFile; use datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::expressions::{Count, Literal}; diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs index 428af6a433134..a421bbdbb7baf 100644 --- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs @@ -33,9 +33,9 @@ use ballista_core::serde::scheduler::{ ExecutorData, ExecutorDataChange, ExecutorMetadata, }; use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; +use datafusion::datafusion_storage::object_store::{local::LocalFileSystem, ObjectStore}; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; -use datafusion::datasource::object_store::{local::LocalFileSystem, ObjectStore}; use futures::StreamExt; use log::{debug, error, info, trace, warn}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 84fd432b35eea..aa70500d19e3d 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -50,10 +50,8 @@ use datafusion::{ }; use datafusion::{ arrow::util::pretty, - datasource::{ - listing::{ListingOptions, ListingTable, ListingTableConfig}, - object_store::local::LocalFileSystem, - }, + datafusion_storage::object_store::local::LocalFileSystem, + datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}, }; use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION; diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs index 5e77f3a57f9cb..d8b0405f19b99 100644 --- a/datafusion-examples/examples/flight_server.rs +++ b/datafusion-examples/examples/flight_server.rs @@ -19,9 +19,9 @@ use std::pin::Pin; use std::sync::Arc; use arrow_flight::SchemaAsIpc; +use datafusion::datafusion_storage::object_store::local::LocalFileSystem; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; -use datafusion::datasource::object_store::local::LocalFileSystem; use futures::Stream; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; diff --git a/datafusion-storage/Cargo.toml 
b/datafusion-storage/Cargo.toml new file mode 100644 index 0000000000000..377a087ec997b --- /dev/null +++ b/datafusion-storage/Cargo.toml @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-storage" +description = "Storage for DataFusion query engine" +version = "7.0.0" +homepage = "https://github.com/apache/arrow-datafusion" +repository = "https://github.com/apache/arrow-datafusion" +readme = "README.md" +authors = ["Apache Arrow <dev@arrow.apache.org>"] +license = "Apache-2.0" +keywords = [ "arrow", "query", "sql" ] +edition = "2021" +rust-version = "1.59" + +[lib] +name = "datafusion_storage" +path = "src/lib.rs" + +[dependencies] +async-trait = "0.1.41" +chrono = { version = "0.4", default-features = false } +datafusion-common = { path = "../datafusion-common", version = "7.0.0" } +futures = "0.3" +parking_lot = "0.12" +tempfile = "3" +tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } diff --git a/datafusion-storage/README.md b/datafusion-storage/README.md new file mode 100644 index 0000000000000..859223e29242f --- /dev/null +++ b/datafusion-storage/README.md @@ -0,0 +1,24 @@ + + +# DataFusion Storage + +This crate contains an `async` API used by [DataFusion][df] to access data, either remotely or locally. + +[df]: https://crates.io/crates/datafusion diff --git a/datafusion-storage/src/lib.rs b/datafusion-storage/src/lib.rs new file mode 100644 index 0000000000000..ce6e1774ba4f9 --- /dev/null +++ b/datafusion-storage/src/lib.rs @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod object_store; + +use chrono::{DateTime, Utc}; +use datafusion_common::ScalarValue; + +/// Represents a specific file or a prefix (folder) that may +/// require further resolution +#[derive(Debug)] +pub enum ListEntry { + /// Specific file with metadata + FileMeta(FileMeta), + /// Prefix to be further resolved during partition discovery + Prefix(String), +} + +/// The path and size of the file.
+#[derive(Debug, Clone, PartialEq)] +pub struct SizedFile { + /// Path of the file. It is relative to the current object + /// store (it does not specify the `xx://` scheme). + pub path: String, + /// File size in total + pub size: u64, +} + +/// Description of a file as returned by the listing command of a +/// given object store. The resulting path is relative to the +/// object store that generated it. +#[derive(Debug, Clone, PartialEq)] +pub struct FileMeta { + /// The path and size of the file. + pub sized_file: SizedFile, + /// The last modification time of the file according to the + /// object store metadata. This information might be used by + /// catalog systems like Delta Lake for time travel (see + /// <https://github.com/delta-io/delta/issues/192>) + pub last_modified: Option<DateTime<Utc>>, +} + +impl FileMeta { + /// The path that describes this file. It is relative to the + /// associated object store. + pub fn path(&self) -> &str { + &self.sized_file.path + } + + /// The size of the file. + pub fn size(&self) -> u64 { + self.sized_file.size + } +} + +impl std::fmt::Display for FileMeta { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{} (size: {})", self.path(), self.size()) + } +} + +#[derive(Debug, Clone)] +/// A single file that should be read, along with its schema, statistics +/// and partition column values that need to be appended to each row. +pub struct PartitionedFile { + /// Path for the file (e.g. URL, filesystem path, etc) + pub file_meta: FileMeta, + /// Values of partition columns to be appended to each row + pub partition_values: Vec<ScalarValue>, + // We may include row group range here for a more fine-grained parallel execution +} + +impl PartitionedFile { + /// Create a simple file without metadata or partition + pub fn new(path: String, size: u64) -> Self { + Self { + file_meta: FileMeta { + sized_file: SizedFile { path, size }, + last_modified: None, + }, + partition_values: vec![], + } + } +} + +impl std::fmt::Display for PartitionedFile { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.file_meta) + } +} diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion-storage/src/object_store/local.rs similarity index 96% rename from datafusion/src/datasource/object_store/local.rs rename to datafusion-storage/src/object_store/local.rs index edfe5e2cecd6a..540996e915fb2 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion-storage/src/object_store/local.rs @@ -24,13 +24,15 @@ use std::sync::Arc; use async_trait::async_trait; use futures::{stream, AsyncRead, StreamExt}; -use crate::datasource::object_store::{ - FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, +use datafusion_common::{DataFusionError, Result}; + +use crate::{FileMeta, PartitionedFile, SizedFile}; + +use super::{ + FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderStream, ObjectStore, }; -use crate::datasource::PartitionedFile; -use crate::error::{DataFusionError, Result}; -use super::{ObjectReaderStream, SizedFile}; +pub static LOCAL_SCHEME: &str = "file"; #[derive(Debug)] /// Local File System as Object Store. diff --git a/datafusion-storage/src/object_store/mod.rs b/datafusion-storage/src/object_store/mod.rs new file mode 100644 index 0000000000000..047964855cb6d --- /dev/null +++ b/datafusion-storage/src/object_store/mod.rs @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Object Store abstracts access to an underlying file/object storage. + +pub mod local; + +use std::fmt::Debug; +use std::io::Read; +use std::pin::Pin; +use std::sync::Arc; + +use async_trait::async_trait; +use futures::{AsyncRead, Stream, StreamExt}; + +use crate::{FileMeta, ListEntry, PartitionedFile, SizedFile}; +use datafusion_common::Result; + +/// Stream of files listed from object store +pub type FileMetaStream = + Pin<Box<dyn Stream<Item = Result<FileMeta>> + Send + Sync + 'static>>; + +/// Stream of partitioned files listed from object store +pub type PartitionedFileStream = + Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>; + +/// Stream of list entries obtained from object store +pub type ListEntryStream = + Pin<Box<dyn Stream<Item = Result<ListEntry>> + Send + Sync + 'static>>; + +/// Stream of readers opened on a given object store +pub type ObjectReaderStream = + Pin<Box<dyn Stream<Item = Result<Arc<dyn ObjectReader>>> + Send + Sync>>; + +/// Object Reader for one file in an object store. +/// +/// Note that the dynamic dispatch on the reader might +/// have some performance impacts. +#[async_trait] +pub trait ObjectReader: Send + Sync { + /// Get reader for a part [start, start + length] in the file asynchronously + async fn chunk_reader(&self, start: u64, length: usize) + -> Result<Box<dyn AsyncRead>>; + + /// Get reader for a part [start, start + length] in the file + fn sync_chunk_reader( + &self, + start: u64, + length: usize, + ) -> Result<Box<dyn Read + Send + Sync>>; + + /// Get reader for the entire file + fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> { + self.sync_chunk_reader(0, self.length() as usize) + } + + /// Get the size of the file + fn length(&self) -> u64; +} + +/// An `ObjectStore` abstracts access to an underlying file/object storage. +/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes +#[async_trait] +pub trait ObjectStore: Sync + Send + Debug { + /// Returns all the files in path `prefix` + async fn list_file(&self, prefix: &str) -> Result<FileMetaStream>; + + /// Calls `list_file` with a suffix filter + async fn list_file_with_suffix( + &self, + prefix: &str, + suffix: &str, + ) -> Result<FileMetaStream> { + let file_stream = self.list_file(prefix).await?; + let suffix = suffix.to_owned(); + Ok(Box::pin(file_stream.filter(move |fr| { + let has_suffix = match fr { + Ok(f) => f.path().ends_with(&suffix), + Err(_) => true, + }; + async move { has_suffix } + }))) + } + + /// Returns all the files in `prefix` if the `prefix` is already a leaf dir, + /// or all paths between the `prefix` and the first occurrence of the `delimiter` if it is provided.
+ async fn list_dir( + &self, + prefix: &str, + delimiter: Option<String>, + ) -> Result<ListEntryStream>; + + /// Get object reader for one file + fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>; +} diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 2bab2b3028448..d8d4a115f30e5 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -63,6 +63,7 @@ datafusion-common = { path = "../datafusion-common", version = "7.0.0", features datafusion-expr = { path = "../datafusion-expr", version = "7.0.0" } datafusion-jit = { path = "../datafusion-jit", version = "7.0.0", optional = true } datafusion-physical-expr = { path = "../datafusion-physical-expr", version = "7.0.0" } +datafusion-storage = { path = "../datafusion-storage", version = "7.0.0" } futures = "0.3" hashbrown = { version = "0.12", features = ["raw"] } lazy_static = { version = "^1.4.0" } diff --git a/datafusion/benches/sort_limit_query_sql.rs b/datafusion/benches/sort_limit_query_sql.rs index 7860ebc450be8..e3806f22d0d52 100644 --- a/datafusion/benches/sort_limit_query_sql.rs +++ b/datafusion/benches/sort_limit_query_sql.rs @@ -18,9 +18,9 @@ #[macro_use] extern crate criterion; use criterion::Criterion; +use datafusion::datafusion_storage::object_store::local::LocalFileSystem; use datafusion::datasource::file_format::csv::CsvFormat; use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; -use datafusion::datasource::object_store::local::LocalFileSystem; use parking_lot::Mutex; use std::sync::Arc; diff --git a/datafusion/src/catalog/schema.rs b/datafusion/src/catalog/schema.rs index 24830a8d60b7e..73e23bc5984ad 100644 --- a/datafusion/src/catalog/schema.rs +++ b/datafusion/src/catalog/schema.rs @@ -24,9 +24,10 @@ use std::collections::HashMap; use std::sync::Arc; use crate::datasource::listing::{ListingTable, ListingTableConfig}; -use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry}; +use crate::datasource::object_store_registry::ObjectStoreRegistry; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; +use datafusion_storage::object_store::ObjectStore; /// Represents a schema, comprising a number of named tables.
pub trait SchemaProvider: Sync + Send { @@ -250,8 +251,8 @@ mod tests { use crate::catalog::schema::{ MemorySchemaProvider, ObjectStoreSchemaProvider, SchemaProvider, }; + use crate::datafusion_storage::object_store::local::LocalFileSystem; use crate::datasource::empty::EmptyTable; - use crate::datasource::object_store::local::LocalFileSystem; use crate::execution::context::SessionContext; use futures::StreamExt; diff --git a/datafusion/src/datasource/file_format/avro.rs b/datafusion/src/datasource/file_format/avro.rs index ed77faaf564e2..0fc9c7a373309 100644 --- a/datafusion/src/datasource/file_format/avro.rs +++ b/datafusion/src/datasource/file_format/avro.rs @@ -27,12 +27,12 @@ use futures::StreamExt; use super::FileFormat; use crate::avro_to_arrow::read_avro_schema_from_reader; -use crate::datasource::object_store::{ObjectReader, ObjectReaderStream}; use crate::error::Result; use crate::logical_plan::Expr; use crate::physical_plan::file_format::{AvroExec, FileScanConfig}; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::Statistics; +use datafusion_storage::object_store::{ObjectReader, ObjectReaderStream}; /// The default file extension of avro files pub const DEFAULT_AVRO_EXTENSION: &str = ".avro"; @@ -75,7 +75,7 @@ impl FileFormat for AvroFormat { #[cfg(feature = "avro")] mod tests { use crate::{ - datasource::object_store::local::{ + datafusion_storage::object_store::local::{ local_object_reader, local_object_reader_stream, local_unpartitioned_file, LocalFileSystem, }, @@ -394,7 +394,7 @@ mod tests { mod tests { use super::*; - use crate::datasource::object_store::local::local_object_reader_stream; + use crate::datafusion_storage::object_store::local::local_object_reader_stream; use crate::error::DataFusionError; #[tokio::test] diff --git a/datafusion/src/datasource/file_format/csv.rs b/datafusion/src/datasource/file_format/csv.rs index 3abe9e6482cd0..41f6df19087a2 100644 --- a/datafusion/src/datasource/file_format/csv.rs +++ b/datafusion/src/datasource/file_format/csv.rs @@ -26,12 +26,12 @@ use async_trait::async_trait; use futures::StreamExt; use super::FileFormat; -use crate::datasource::object_store::{ObjectReader, ObjectReaderStream}; use crate::error::Result; use crate::logical_plan::Expr; use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::Statistics; +use datafusion_storage::object_store::{ObjectReader, ObjectReaderStream}; /// The default file extension of csv files pub const DEFAULT_CSV_EXTENSION: &str = ".csv"; @@ -140,13 +140,11 @@ mod tests { use super::*; use crate::prelude::{SessionConfig, SessionContext}; use crate::{ - datasource::{ - file_format::FileScanConfig, - object_store::local::{ - local_object_reader, local_object_reader_stream, - local_unpartitioned_file, LocalFileSystem, - }, + datafusion_storage::object_store::local::{ + local_object_reader, local_object_reader_stream, local_unpartitioned_file, + LocalFileSystem, }, + datasource::file_format::FileScanConfig, physical_plan::collect, }; diff --git a/datafusion/src/datasource/file_format/json.rs b/datafusion/src/datasource/file_format/json.rs index 2f5f631a5a30e..01e7982cabe80 100644 --- a/datafusion/src/datasource/file_format/json.rs +++ b/datafusion/src/datasource/file_format/json.rs @@ -30,12 +30,12 @@ use futures::StreamExt; use super::FileFormat; use super::FileScanConfig; -use crate::datasource::object_store::{ObjectReader, ObjectReaderStream}; use crate::error::Result; use crate::logical_plan::Expr; use 
crate::physical_plan::file_format::NdJsonExec; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::Statistics; +use datafusion_storage::object_store::{ObjectReader, ObjectReaderStream}; /// The default file extension of json files pub const DEFAULT_JSON_EXTENSION: &str = ".json"; @@ -102,13 +102,11 @@ mod tests { use super::*; use crate::prelude::{SessionConfig, SessionContext}; use crate::{ - datasource::{ - file_format::FileScanConfig, - object_store::local::{ - local_object_reader, local_object_reader_stream, - local_unpartitioned_file, LocalFileSystem, - }, + datafusion_storage::object_store::local::{ + local_object_reader, local_object_reader_stream, local_unpartitioned_file, + LocalFileSystem, }, + datasource::file_format::FileScanConfig, physical_plan::collect, }; diff --git a/datafusion/src/datasource/file_format/mod.rs b/datafusion/src/datasource/file_format/mod.rs index 21da2e1e6a276..611da32ee46ec 100644 --- a/datafusion/src/datasource/file_format/mod.rs +++ b/datafusion/src/datasource/file_format/mod.rs @@ -34,10 +34,10 @@ use crate::physical_plan::{ExecutionPlan, Statistics}; use async_trait::async_trait; -use super::object_store::{ObjectReader, ObjectReaderStream}; +use datafusion_storage::object_store::{ObjectReader, ObjectReaderStream}; /// This trait abstracts all the file format specific implementations -/// from the `TableProvider`. This helps code re-utilization accross +/// from the `TableProvider`. This helps code re-utilization across /// providers that support the same file formats. #[async_trait] pub trait FileFormat: Send + Sync + fmt::Debug { diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs index 1dd6b02b325ff..08998d2469b8d 100644 --- a/datafusion/src/datasource/file_format/parquet.rs +++ b/datafusion/src/datasource/file_format/parquet.rs @@ -40,7 +40,6 @@ use crate::arrow::array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, }; use crate::arrow::datatypes::{DataType, Field}; -use crate::datasource::object_store::{ObjectReader, ObjectReaderStream}; use crate::datasource::{create_max_min_accs, get_col_stats}; use crate::error::DataFusionError; use crate::error::Result; @@ -50,6 +49,7 @@ use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::file_format::ParquetExec; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::{Accumulator, Statistics}; +use datafusion_storage::object_store::{ObjectReader, ObjectReaderStream}; /// The default file extension of parquet files pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet"; @@ -357,12 +357,10 @@ impl ChunkReader for ChunkObjectReader { #[cfg(test)] mod tests { - use crate::{ - datasource::object_store::local::{ - local_object_reader, local_object_reader_stream, local_unpartitioned_file, - LocalFileSystem, - }, - physical_plan::collect, + use crate::physical_plan::collect; + use datafusion_storage::object_store::local::{ + local_object_reader, local_object_reader_stream, local_unpartitioned_file, + LocalFileSystem, }; use super::*; diff --git a/datafusion/src/datasource/listing/helpers.rs b/datafusion/src/datasource/listing/helpers.rs index 92f4511845ad3..a52e497b31313 100644 --- a/datafusion/src/datasource/listing/helpers.rs +++ b/datafusion/src/datasource/listing/helpers.rs @@ -36,6 +36,7 @@ use futures::{ use log::debug; use crate::{ + datasource::MemTable, error::Result, execution::context::SessionContext, logical_plan::{self, Expr, ExprVisitable,
ExpressionVisitor, Recursion}, @@ -43,9 +44,9 @@ use crate::{ scalar::ScalarValue, }; -use crate::datasource::{ - object_store::{FileMeta, ObjectStore, SizedFile}, - MemTable, PartitionedFile, PartitionedFileStream, +use datafusion_storage::{ + object_store::{ObjectStore, PartitionedFileStream}, + FileMeta, PartitionedFile, SizedFile, }; const FILE_SIZE_COLUMN_NAME: &str = "_df_part_file_size_"; diff --git a/datafusion/src/datasource/listing/table.rs b/datafusion/src/datasource/listing/table.rs index 3fbd6c12397df..440bbbaddd2f2 100644 --- a/datafusion/src/datasource/listing/table.rs +++ b/datafusion/src/datasource/listing/table.rs @@ -24,10 +24,6 @@ use async_trait::async_trait; use futures::StreamExt; use crate::{ - datasource::file_format::avro::AvroFormat, - datasource::file_format::csv::CsvFormat, - datasource::file_format::json::JsonFormat, - datasource::file_format::parquet::ParquetFormat, error::{DataFusionError, Result}, logical_plan::Expr, physical_plan::{ @@ -38,10 +34,16 @@ use crate::{ }; use crate::datasource::{ - datasource::TableProviderFilterPushDown, file_format::FileFormat, - get_statistics_with_limit, object_store::ObjectStore, PartitionedFile, TableProvider, + datasource::TableProviderFilterPushDown, + file_format::{ + avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat, + FileFormat, + }, + get_statistics_with_limit, TableProvider, }; +use datafusion_storage::{object_store::ObjectStore, PartitionedFile}; + use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files}; /// Configuration for creating a 'ListingTable' @@ -395,10 +397,8 @@ impl ListingTable { mod tests { use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION; use crate::{ - datasource::{ - file_format::{avro::AvroFormat, parquet::ParquetFormat}, - object_store::local::LocalFileSystem, - }, + datafusion_storage::object_store::local::LocalFileSystem, + datasource::file_format::{avro::AvroFormat, parquet::ParquetFormat}, logical_plan::{col, lit}, test::{columns, object_store::TestObjectStore}, }; diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs index 9a7b17d1a867b..24bc68765ea4f 100644 --- a/datafusion/src/datasource/mod.rs +++ b/datafusion/src/datasource/mod.rs @@ -23,20 +23,18 @@ pub mod empty; pub mod file_format; pub mod listing; pub mod memory; -pub mod object_store; +pub mod object_store_registry; use futures::Stream; pub use self::datasource::{TableProvider, TableType}; pub use self::memory::MemTable; -use self::object_store::{FileMeta, SizedFile}; use crate::arrow::datatypes::{Schema, SchemaRef}; use crate::error::Result; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics}; -use crate::scalar::ScalarValue; +use datafusion_storage::PartitionedFile; use futures::StreamExt; -use std::pin::Pin; /// Get all files as well as the file level summary statistics (no statistic for partition columns). /// If the optional `limit` is provided, includes only sufficient files. @@ -124,40 +122,6 @@ pub async fn get_statistics_with_limit( Ok((result_files, statistics)) } -#[derive(Debug, Clone)] -/// A single file that should be read, along with its schema, statistics -/// and partition column values that need to be appended to each row. -pub struct PartitionedFile { - /// Path for the file (e.g. 
URL, filesystem path, etc) - pub file_meta: FileMeta, - /// Values of partition columns to be appended to each row - pub partition_values: Vec<ScalarValue>, - // We may include row group range here for a more fine-grained parallel execution -} - -impl PartitionedFile { - /// Create a simple file without metadata or partition - pub fn new(path: String, size: u64) -> Self { - Self { - file_meta: FileMeta { - sized_file: SizedFile { path, size }, - last_modified: None, - }, - partition_values: vec![], - } - } -} - -/// Stream of partitioned files listed from object store -pub type PartitionedFileStream = - Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>; - -impl std::fmt::Display for PartitionedFile { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.file_meta) - } -} - fn create_max_min_accs( schema: &Schema, ) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) { diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs deleted file mode 100644 index 3a9da67017001..0000000000000 --- a/datafusion/src/datasource/object_store/mod.rs +++ /dev/null @@ -1,244 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Object Store abstracts access to an underlying file/object storage. - -pub mod local; - -use parking_lot::RwLock; -use std::collections::HashMap; -use std::fmt::{self, Debug}; -use std::io::Read; -use std::pin::Pin; -use std::sync::Arc; - -use async_trait::async_trait; -use chrono::{DateTime, Utc}; -use futures::{AsyncRead, Stream, StreamExt}; - -use local::LocalFileSystem; - -use crate::error::{DataFusionError, Result}; - -/// Object Reader for one file in an object store. -/// -/// Note that the dynamic dispatch on the reader might -/// have some performance impacts. -#[async_trait] -pub trait ObjectReader: Send + Sync { - /// Get reader for a part [start, start + length] in the file asynchronously - async fn chunk_reader(&self, start: u64, length: usize) - -> Result<Box<dyn AsyncRead>>; - - /// Get reader for a part [start, start + length] in the file - fn sync_chunk_reader( - &self, - start: u64, - length: usize, - ) -> Result<Box<dyn Read + Send + Sync>>; - - /// Get reader for the entire file - fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> { - self.sync_chunk_reader(0, self.length() as usize) - } - - /// Get the size of the file - fn length(&self) -> u64; -} - -/// Represents a specific file or a prefix (folder) that may -/// require further resolution -#[derive(Debug)] -pub enum ListEntry { - /// Specific file with metadata - FileMeta(FileMeta), - /// Prefix to be further resolved during partition discovery - Prefix(String), -} - -/// The path and size of the file. -#[derive(Debug, Clone, PartialEq)] -pub struct SizedFile { - /// Path of the file. It is relative to the current object - /// store (it does not specify the `xx://` scheme).
- pub path: String, - /// File size in total - pub size: u64, -} - -/// Description of a file as returned by the listing command of a -/// given object store. The resulting path is relative to the -/// object store that generated it. -#[derive(Debug, Clone, PartialEq)] -pub struct FileMeta { - /// The path and size of the file. - pub sized_file: SizedFile, - /// The last modification time of the file according to the - /// object store metadata. This information might be used by - /// catalog systems like Delta Lake for time travel (see - /// <https://github.com/delta-io/delta/issues/192>) - pub last_modified: Option<DateTime<Utc>>, -} - -impl FileMeta { - /// The path that describes this file. It is relative to the - /// associated object store. - pub fn path(&self) -> &str { - &self.sized_file.path - } - - /// The size of the file. - pub fn size(&self) -> u64 { - self.sized_file.size - } -} - -impl std::fmt::Display for FileMeta { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{} (size: {})", self.path(), self.size()) - } -} - -/// Stream of files listed from object store -pub type FileMetaStream = - Pin<Box<dyn Stream<Item = Result<FileMeta>> + Send + Sync + 'static>>; - -/// Stream of list entries obtained from object store -pub type ListEntryStream = - Pin<Box<dyn Stream<Item = Result<ListEntry>> + Send + Sync + 'static>>; - -/// Stream of readers opened on a given object store -pub type ObjectReaderStream = - Pin<Box<dyn Stream<Item = Result<Arc<dyn ObjectReader>>> + Send + Sync>>; - -/// An `ObjectStore` abstracts access to an underlying file/object storage. -/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes -#[async_trait] -pub trait ObjectStore: Sync + Send + Debug { - /// Returns all the files in path `prefix` - async fn list_file(&self, prefix: &str) -> Result<FileMetaStream>; - - /// Calls `list_file` with a suffix filter - async fn list_file_with_suffix( - &self, - prefix: &str, - suffix: &str, - ) -> Result<FileMetaStream> { - let file_stream = self.list_file(prefix).await?; - let suffix = suffix.to_owned(); - Ok(Box::pin(file_stream.filter(move |fr| { - let has_suffix = match fr { - Ok(f) => f.path().ends_with(&suffix), - Err(_) => true, - }; - async move { has_suffix } - }))) - } - - /// Returns all the files in `prefix` if the `prefix` is already a leaf dir, - /// or all paths between the `prefix` and the first occurrence of the `delimiter` if it is provided. - async fn list_dir( - &self, - prefix: &str, - delimiter: Option<String>, - ) -> Result<ListEntryStream>; - - /// Get object reader for one file - fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>; -} - -static LOCAL_SCHEME: &str = "file"; - -/// A Registry holds all the object stores at runtime with a scheme for each store. -/// This allows the user to extend DataFusion with different storage systems such as S3 or HDFS -/// and query data inside these systems. -pub struct ObjectStoreRegistry { - /// A map from scheme to object store that serves list / read operations for the store - pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>, -} - -impl fmt::Debug for ObjectStoreRegistry { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("ObjectStoreRegistry") - .field( - "schemes", - &self.object_stores.read().keys().collect::<Vec<_>>(), - ) - .finish() - } -} - -impl Default for ObjectStoreRegistry { - fn default() -> Self { - Self::new() - } -} - -impl ObjectStoreRegistry { - /// Create the registry that object stores can be registered into. - /// The [`LocalFileSystem`] store is registered by default to support reading local files natively.
- pub fn new() -> Self { - let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new(); - map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem)); - - Self { - object_stores: RwLock::new(map), - } - } - - /// Adds a new store to this registry. - /// If a store of the same prefix existed before, it is replaced in the registry and returned. - pub fn register_store( - &self, - scheme: String, - store: Arc<dyn ObjectStore>, - ) -> Option<Arc<dyn ObjectStore>> { - let mut stores = self.object_stores.write(); - stores.insert(scheme, store) - } - - /// Get the store registered for scheme - pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> { - let stores = self.object_stores.read(); - stores.get(scheme).cloned() - } - - /// Get a suitable store for the URI based on its scheme. For example: - /// - URI with scheme `file://` or no scheme will return the default LocalFS store - /// - URI with scheme `s3://` will return the S3 store if it's registered - /// Returns a tuple with the store and the self-described uri of the file in that store - pub fn get_by_uri<'a>( - &self, - uri: &'a str, - ) -> Result<(Arc<dyn ObjectStore>, &'a str)> { - if let Some((scheme, _path)) = uri.split_once("://") { - let stores = self.object_stores.read(); - let store = stores - .get(&*scheme.to_lowercase()) - .map(Clone::clone) - .ok_or_else(|| { - DataFusionError::Internal(format!( - "No suitable object store found for {}", - scheme - )) - })?; - Ok((store, uri)) - } else { - Ok((Arc::new(LocalFileSystem), uri)) - } - } -} diff --git a/datafusion/src/datasource/object_store_registry.rs b/datafusion/src/datasource/object_store_registry.rs new file mode 100644 index 0000000000000..e0504c0035480 --- /dev/null +++ b/datafusion/src/datasource/object_store_registry.rs @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! ObjectStoreRegistry holds all the object stores at runtime with a scheme for each store. +//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS +//! and query data inside these systems.
+ +use datafusion_common::{DataFusionError, Result}; +use datafusion_storage::object_store::local::{LocalFileSystem, LOCAL_SCHEME}; +use datafusion_storage::object_store::ObjectStore; +use parking_lot::RwLock; +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; + +/// Object store registry +pub struct ObjectStoreRegistry { + /// A map from scheme to object store that serves list / read operations for the store + pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>, +} + +impl fmt::Debug for ObjectStoreRegistry { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ObjectStoreRegistry") + .field( + "schemes", + &self.object_stores.read().keys().collect::<Vec<_>>(), + ) + .finish() + } +} + +impl Default for ObjectStoreRegistry { + fn default() -> Self { + Self::new() + } +} + +impl ObjectStoreRegistry { + /// Create the registry that object stores can be registered into. + /// The [`LocalFileSystem`] store is registered by default to support reading local files natively. + pub fn new() -> Self { + let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new(); + map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem)); + + Self { + object_stores: RwLock::new(map), + } + } + + /// Adds a new store to this registry. + /// If a store of the same prefix existed before, it is replaced in the registry and returned. + pub fn register_store( + &self, + scheme: String, + store: Arc<dyn ObjectStore>, + ) -> Option<Arc<dyn ObjectStore>> { + let mut stores = self.object_stores.write(); + stores.insert(scheme, store) + } + + /// Get the store registered for scheme + pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> { + let stores = self.object_stores.read(); + stores.get(scheme).cloned() + } + + /// Get a suitable store for the URI based on its scheme. For example: + /// - URI with scheme `file://` or no scheme will return the default LocalFS store + /// - URI with scheme `s3://` will return the S3 store if it's registered + /// Returns a tuple with the store and the self-described uri of the file in that store + pub fn get_by_uri<'a>( + &self, + uri: &'a str, + ) -> Result<(Arc<dyn ObjectStore>, &'a str)> { + if let Some((scheme, _path)) = uri.split_once("://") { + let stores = self.object_stores.read(); + let store = stores + .get(&*scheme.to_lowercase()) + .map(Clone::clone) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "No suitable object store found for {}", + scheme + )) + })?; + Ok((store, uri)) + } else { + Ok((Arc::new(LocalFileSystem), uri)) + } + } +} diff --git a/datafusion/src/execution/runtime_env.rs b/datafusion/src/execution/runtime_env.rs index b5925d24f8833..70df02248beef 100644 --- a/datafusion/src/execution/runtime_env.rs +++ b/datafusion/src/execution/runtime_env.rs @@ -26,8 +26,9 @@ use crate::{ }, }; -use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry}; +use crate::datasource::object_store_registry::ObjectStoreRegistry; use datafusion_common::DataFusionError; +use datafusion_storage::object_store::ObjectStore; use std::fmt::{Debug, Formatter}; use std::path::PathBuf; use std::sync::Arc; diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs index 2d396a61b325e..9f3915c63185e 100644 --- a/datafusion/src/lib.rs +++ b/datafusion/src/lib.rs @@ -225,6 +225,9 @@ pub mod variable; pub use arrow; pub use parquet; +// re-export object store dependencies +pub use datafusion_storage; + #[cfg(feature = "row")] pub mod row; diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index f2798b96a0d5d..a2bead08916a5 100644 ---
b/datafusion/src/logical_plan/builder.rs @@ -21,7 +21,6 @@ use crate::datasource::{ empty::EmptyTable, file_format::parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION}, listing::{ListingOptions, ListingTable, ListingTableConfig}, - object_store::ObjectStore, MemTable, TableProvider, }; use crate::error::{DataFusionError, Result}; @@ -37,6 +36,7 @@ use arrow::{ datatypes::{DataType, Schema, SchemaRef}, record_batch::RecordBatch, }; +use datafusion_storage::object_store::ObjectStore; use std::convert::TryFrom; use std::iter; use std::{ diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index 00a9b9e610797..c3556c394e184 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -238,7 +238,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use super::*; - use crate::datasource::PartitionedFile; + use crate::datafusion_storage::PartitionedFile; use crate::physical_plan::expressions::{col, PhysicalSortExpr}; use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::filter::FilterExec; diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs b/datafusion/src/physical_plan/coalesce_partitions.rs index 0e550f5ef7d68..d0a776a57d9d0 100644 --- a/datafusion/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/src/physical_plan/coalesce_partitions.rs @@ -221,7 +221,7 @@ mod tests { use futures::FutureExt; use super::*; - use crate::datasource::object_store::local::LocalFileSystem; + use crate::datafusion_storage::object_store::local::LocalFileSystem; use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; use crate::physical_plan::{collect, common}; use crate::prelude::SessionContext; diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index 37418b0e976e2..10872a7d15650 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -220,13 +220,12 @@ pub async fn plan_to_csv( #[cfg(test)] mod tests { use super::*; + use crate::datafusion_storage::object_store::local::{ + local_unpartitioned_file, LocalFileSystem, + }; use crate::prelude::*; use crate::test_util::aggr_test_schema_with_missing_col; - use crate::{ - datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem}, - scalar::ScalarValue, - test_util::aggr_test_schema, - }; + use crate::{scalar::ScalarValue, test_util::aggr_test_schema}; use arrow::datatypes::*; use futures::StreamExt; use std::fs::File; diff --git a/datafusion/src/physical_plan/file_format/file_stream.rs b/datafusion/src/physical_plan/file_format/file_stream.rs index 958b1721bb392..6d8fa5fc431e4 100644 --- a/datafusion/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/src/physical_plan/file_format/file_stream.rs @@ -21,16 +21,13 @@ //! Note: Most traits here need to be marked `Sync + Send` to be //! compliant with the `SendableRecordBatchStream` trait. 
-use crate::{ - datasource::{object_store::ObjectStore, PartitionedFile}, - physical_plan::RecordBatchStream, - scalar::ScalarValue, -}; +use crate::{physical_plan::RecordBatchStream, scalar::ScalarValue}; use arrow::{ datatypes::SchemaRef, error::{ArrowError, Result as ArrowResult}, record_batch::RecordBatch, }; +use datafusion_storage::{object_store::ObjectStore, PartitionedFile}; use futures::Stream; use std::{ io::Read, diff --git a/datafusion/src/physical_plan/file_format/json.rs b/datafusion/src/physical_plan/file_format/json.rs index 14dfcf52509af..e96ff8bfc3e2c 100644 --- a/datafusion/src/physical_plan/file_format/json.rs +++ b/datafusion/src/physical_plan/file_format/json.rs @@ -193,12 +193,10 @@ mod tests { use arrow::datatypes::{Field, Schema}; use futures::StreamExt; - use crate::datasource::{ - file_format::{json::JsonFormat, FileFormat}, - object_store::local::{ - local_object_reader_stream, local_unpartitioned_file, LocalFileSystem, - }, + use crate::datafusion_storage::object_store::local::{ + local_object_reader_stream, local_unpartitioned_file, LocalFileSystem, }; + use crate::datasource::file_format::{json::JsonFormat, FileFormat}; use crate::prelude::NdJsonReadOptions; use crate::prelude::*; use tempfile::TempDir; diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs index 1e359ab0d85c3..176f94a451700 100644 --- a/datafusion/src/physical_plan/file_format/mod.rs +++ b/datafusion/src/physical_plan/file_format/mod.rs @@ -38,13 +38,12 @@ pub use csv::CsvExec; pub(crate) use json::plan_to_json; pub use json::NdJsonExec; -use crate::error::DataFusionError; use crate::{ - datasource::{object_store::ObjectStore, PartitionedFile}, - error::Result, + error::{DataFusionError, Result}, scalar::ScalarValue, }; use arrow::array::{new_null_array, UInt16BufferBuilder}; +use datafusion_storage::{object_store::ObjectStore, PartitionedFile}; use lazy_static::lazy_static; use log::info; use std::{ diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 1a58928f23a46..42766993e6f7c 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -25,8 +25,6 @@ use std::sync::Arc; use std::{any::Any, convert::TryInto}; use crate::datasource::file_format::parquet::ChunkObjectReader; -use crate::datasource::object_store::ObjectStore; -use crate::datasource::PartitionedFile; use crate::execution::context::{SessionState, TaskContext}; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::{ @@ -43,6 +41,7 @@ use crate::{ }; use datafusion_common::Column; use datafusion_expr::Expr; +use datafusion_storage::{object_store::ObjectStore, PartitionedFile}; use arrow::{ array::ArrayRef, @@ -568,15 +567,13 @@ pub async fn plan_to_parquet( mod tests { use crate::{ assert_batches_sorted_eq, assert_contains, - datasource::{ - file_format::{parquet::ParquetFormat, FileFormat}, - object_store::{ - local::{ - local_object_reader_stream, local_unpartitioned_file, LocalFileSystem, - }, - FileMeta, SizedFile, + datafusion_storage::{ + object_store::local::{ + local_object_reader_stream, local_unpartitioned_file, LocalFileSystem, }, + FileMeta, SizedFile, }, + datasource::file_format::{parquet::ParquetFormat, FileFormat}, physical_plan::collect, }; diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs index 689cbe244eec7..218fea5c5b0b7 100644 --- 
a/datafusion/src/physical_plan/filter.rs +++ b/datafusion/src/physical_plan/filter.rs @@ -241,7 +241,7 @@ impl RecordBatchStream for FilterExecStream { mod tests { use super::*; - use crate::datasource::object_store::local::LocalFileSystem; + use crate::datafusion_storage::object_store::local::LocalFileSystem; use crate::physical_plan::collect; use crate::physical_plan::expressions::*; use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs index f5de703eab105..6b37a67c7806c 100644 --- a/datafusion/src/physical_plan/limit.rs +++ b/datafusion/src/physical_plan/limit.rs @@ -428,7 +428,7 @@ mod tests { use common::collect; use super::*; - use crate::datasource::object_store::local::LocalFileSystem; + use crate::datafusion_storage::object_store::local::LocalFileSystem; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::common; use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index ee4494cf58bf4..803414f2cd8cf 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -1441,7 +1441,7 @@ fn tuple_err(value: (Result, Result)) -> Result<(T, R)> { #[cfg(test)] mod tests { use super::*; - use crate::datasource::object_store::local::LocalFileSystem; + use crate::datafusion_storage::object_store::local::LocalFileSystem; use crate::execution::context::TaskContext; use crate::execution::options::CsvReadOptions; use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs index a9bb8481a9d1b..425b730277618 100644 --- a/datafusion/src/physical_plan/projection.rs +++ b/datafusion/src/physical_plan/projection.rs @@ -300,7 +300,7 @@ impl RecordBatchStream for ProjectionStream { mod tests { use super::*; - use crate::datasource::object_store::local::LocalFileSystem; + use crate::datafusion_storage::object_store::local::LocalFileSystem; use crate::physical_plan::expressions::{self, col}; use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; use crate::prelude::SessionContext; diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs index 62a3f84ca9401..6ac4a47c86ae5 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ b/datafusion/src/physical_plan/sorts/sort.rs @@ -595,7 +595,7 @@ async fn do_sort( #[cfg(test)] mod tests { use super::*; - use crate::datasource::object_store::local::LocalFileSystem; + use crate::datafusion_storage::object_store::local::LocalFileSystem; use crate::execution::context::SessionConfig; use crate::execution::runtime_env::RuntimeConfig; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs index 5082b9f7ffc15..71e51571161da 100644 --- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs @@ -605,7 +605,7 @@ impl RecordBatchStream for SortPreservingMergeStream { #[cfg(test)] mod tests { - use crate::datasource::object_store::local::LocalFileSystem; + use crate::datafusion_storage::object_store::local::LocalFileSystem; use crate::from_slice::FromSlice; use crate::physical_plan::metrics::MetricValue; use 
crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; diff --git a/datafusion/src/physical_plan/union.rs b/datafusion/src/physical_plan/union.rs index bd040df2f1b93..fb25cf30e8dc2 100644 --- a/datafusion/src/physical_plan/union.rs +++ b/datafusion/src/physical_plan/union.rs @@ -234,7 +234,7 @@ fn stats_union(mut left: Statistics, right: Statistics) -> Statistics { #[cfg(test)] mod tests { use super::*; - use crate::datasource::object_store::{local::LocalFileSystem, ObjectStore}; + use crate::datafusion_storage::object_store::{local::LocalFileSystem, ObjectStore}; use crate::{test, test_util}; use crate::prelude::SessionContext; diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs index 03e7342e938bc..f45b0560a6e67 100644 --- a/datafusion/src/physical_plan/windows/mod.rs +++ b/datafusion/src/physical_plan/windows/mod.rs @@ -152,7 +152,7 @@ fn create_built_in_window_expr( #[cfg(test)] mod tests { use super::*; - use crate::datasource::object_store::local::LocalFileSystem; + use crate::datafusion_storage::object_store::local::LocalFileSystem; use crate::physical_plan::aggregates::AggregateFunction; use crate::physical_plan::expressions::col; use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs index cebd9ee02d1c7..b96d981c1c7e9 100644 --- a/datafusion/src/test/mod.rs +++ b/datafusion/src/test/mod.rs @@ -18,8 +18,10 @@ //! Common unit test utility methods use crate::arrow::array::UInt32Array; -use crate::datasource::object_store::local::local_unpartitioned_file; -use crate::datasource::{MemTable, PartitionedFile, TableProvider}; +use crate::datafusion_storage::{ + object_store::local::local_unpartitioned_file, PartitionedFile, +}; +use crate::datasource::{MemTable, TableProvider}; use crate::error::Result; use crate::from_slice::FromSlice; use crate::logical_plan::{LogicalPlan, LogicalPlanBuilder}; diff --git a/datafusion/src/test/object_store.rs b/datafusion/src/test/object_store.rs index e93b4cd2d410d..9646890b631e9 100644 --- a/datafusion/src/test/object_store.rs +++ b/datafusion/src/test/object_store.rs @@ -23,8 +23,9 @@ use std::{ }; use crate::{ - datasource::object_store::{ - FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, SizedFile, + datafusion_storage::{ + object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore}, + FileMeta, SizedFile, }, error::{DataFusionError, Result}, }; diff --git a/datafusion/tests/path_partition.rs b/datafusion/tests/path_partition.rs index 64f6462c082b7..3e4604906f1ac 100644 --- a/datafusion/tests/path_partition.rs +++ b/datafusion/tests/path_partition.rs @@ -22,13 +22,16 @@ use std::{fs, io, sync::Arc}; use async_trait::async_trait; use datafusion::{ assert_batches_sorted_eq, + datafusion_storage::{ + object_store::{ + local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader, + ObjectStore, + }, + FileMeta, SizedFile, + }, datasource::{ file_format::{csv::CsvFormat, parquet::ParquetFormat}, listing::{ListingOptions, ListingTable, ListingTableConfig}, - object_store::{ - local::LocalFileSystem, FileMeta, FileMetaStream, ListEntryStream, - ObjectReader, ObjectStore, SizedFile, - }, }, error::{DataFusionError, Result}, physical_plan::ColumnStatistics,
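Reviewer note: a minimal sketch of how the relocated pieces compose after this change. It is not part of the patch; `local_unpartitioned_file` is the test helper this diff re-exports from `datafusion_storage::object_store::local`, assumed here to take a path `String` and return a `FileMeta` by stat-ing the file, and the `"localfs"` scheme is purely hypothetical.

```rust
use std::io::Read;
use std::sync::Arc;

use datafusion::datafusion_storage::object_store::local::{
    local_unpartitioned_file, LocalFileSystem,
};
use datafusion::datasource::object_store_registry::ObjectStoreRegistry;
use datafusion::error::Result;

fn read_whole_file(uri: &str) -> Result<Vec<u8>> {
    // `new()` pre-registers `LocalFileSystem` under LOCAL_SCHEME ("file").
    let registry = ObjectStoreRegistry::new();

    // A custom `ObjectStore` impl (e.g. for S3 or HDFS) would be registered
    // the same way, keyed by its URI scheme; here we alias the local store
    // under a hypothetical scheme purely for illustration.
    registry.register_store("localfs".to_owned(), Arc::new(LocalFileSystem));

    // URIs without a scheme fall back to `LocalFileSystem`.
    let (store, path) = registry.get_by_uri(uri)?;

    // Build the `SizedFile` that `file_reader` expects; the helper stats
    // the local file to learn its size.
    let sized_file = local_unpartitioned_file(path.to_owned()).sized_file;

    // `sync_reader` yields a blocking `Read` over the whole object, while
    // `chunk_reader`/`sync_chunk_reader` serve byte ranges instead.
    let mut reader = store.file_reader(sized_file)?.sync_reader()?;
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf)?;
    Ok(buf)
}
```

Behavior is unchanged by the move: the traits and types simply migrate from `datafusion::datasource::object_store` to the new `datafusion-storage` crate (re-exported as `datafusion::datafusion_storage`), while the scheme-to-store registry stays behind in `datafusion::datasource::object_store_registry`.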