From bdf82098f31f7064d082184da3b1a68faaffaebe Mon Sep 17 00:00:00 2001 From: yangzhong Date: Thu, 14 Jul 2022 13:32:58 +0800 Subject: [PATCH 1/4] Introduce ObjectStoreSelfDetector for detector an object store based on the url --- .../core/src/datasource/listing/helpers.rs | 2 +- .../core/src/datasource/object_store.rs | 83 ++++++++++++++++--- datafusion/core/src/execution/context.rs | 11 +-- datafusion/core/src/execution/runtime_env.rs | 14 +++- datafusion/core/src/physical_plan/planner.rs | 4 +- datafusion/core/tests/user_defined_plan.rs | 4 +- 6 files changed, 92 insertions(+), 26 deletions(-) diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 873d005b4bafb..b4b93d5fc17e5 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -162,7 +162,7 @@ pub fn split_files( pub async fn pruned_partition_list<'a>( store: &'a dyn ObjectStore, table_path: &'a ListingTableUrl, - filters: &[Expr], + filters: &'a [Expr], file_extension: &'a str, table_partition_cols: &'a [String], ) -> Result>> { diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs index aca5b0ca412e6..3f848074b2d59 100644 --- a/datafusion/core/src/datasource/object_store.rs +++ b/datafusion/core/src/datasource/object_store.rs @@ -81,10 +81,19 @@ impl std::fmt::Display for ObjectStoreUrl { } } +/// Object store self detector can detector an object store based on the url +pub trait ObjectStoreSelfDetector: Send + Sync + 'static { + /// Detector a suitable object store based on its url if possible + /// Return the key and object store + fn get_by_url(&self, url: &Url) -> Option<(String, Arc)>; +} + /// Object store registry +#[derive(Clone)] pub struct ObjectStoreRegistry { /// A map from scheme to object store that serve list / read operations for the store - object_stores: RwLock>>, + object_stores: Arc>>>, + self_detector: Option>, } impl std::fmt::Debug for ObjectStoreRegistry { @@ -105,13 +114,21 @@ impl Default for ObjectStoreRegistry { } impl ObjectStoreRegistry { + /// By default the self detector is None + pub fn new() -> Self { + ObjectStoreRegistry::new_with_detector(None) + } + /// Create the registry that object stores can registered into. /// ['LocalFileSystem'] store is registered in by default to support read local files natively. - pub fn new() -> Self { + pub fn new_with_detector( + self_detector: Option>, + ) -> Self { let mut map: HashMap> = HashMap::new(); map.insert("file://".to_string(), Arc::new(LocalFileSystem::new())); Self { - object_stores: RwLock::new(map), + object_stores: Arc::new(RwLock::new(map)), + self_detector, } } @@ -132,19 +149,51 @@ impl ObjectStoreRegistry { /// /// - URL with scheme `file:///` or no schema will return the default LocalFS store /// - URL with scheme `s3://bucket/` will return the S3 store if it's registered + /// - URL with scheme `hdfs://hostname:port` will return the hdfs store if it's registered /// pub fn get_by_url(&self, url: impl AsRef) -> Result> { let url = url.as_ref(); - let s = &url[url::Position::BeforeScheme..url::Position::AfterHost]; - let stores = self.object_stores.read(); - let store = stores.get(s).ok_or_else(|| { - DataFusionError::Internal(format!( - "No suitable object store found for {}", - url - )) - })?; - - Ok(store.clone()) + // First check whether can get object store from registry + let store = { + let stores = self.object_stores.read(); + let s = &url[url::Position::BeforeScheme..url::Position::BeforeHost]; + stores + .get(s) + .or_else(|| { + let s = &url[url::Position::BeforeScheme..url::Position::AfterHost]; + stores.get(s) + }) + .or_else(|| { + let s = &url[url::Position::BeforeScheme..url::Position::AfterPort]; + stores.get(s) + }) + .cloned() + }; + + // If not, then try to detector based on its url. + let store = store + .or_else(|| { + if let Some(self_detector) = &self.self_detector { + // If detected, register it + if let Some((key, store)) = self_detector.get_by_url(url) { + let mut stores = self.object_stores.write(); + stores.insert(key, store.clone()); + Some(store) + } else { + None + } + } else { + None + } + }) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "No suitable object store found for {}", + url + )) + })?; + + Ok(store) } } @@ -190,6 +239,14 @@ mod tests { assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo"); } + #[test] + fn test_get_by_url_hdfs() { + let sut = ObjectStoreRegistry::default(); + sut.register_store("hdfs", "localhost:8020", Arc::new(LocalFileSystem::new())); + let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap(); + sut.get_by_url(&url).unwrap(); + } + #[test] fn test_get_by_url_s3() { let sut = ObjectStoreRegistry::default(); diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 5cb45be2f0659..e186e829abf34 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -85,7 +85,7 @@ use crate::config::{ ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE, OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES, }; -use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use crate::execution::runtime_env::RuntimeEnv; use crate::logical_plan::plan::Explain; use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet}; use crate::physical_plan::planner::DefaultPhysicalPlanner; @@ -180,7 +180,7 @@ impl SessionContext { /// Creates a new session context using the provided session configuration. pub fn with_config(config: SessionConfig) -> Self { - let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()); + let runtime = Arc::new(RuntimeEnv::default()); Self::with_config_rt(config, runtime) } @@ -1211,10 +1211,7 @@ impl Debug for SessionState { /// Default session builder using the provided configuration pub fn default_session_builder(config: SessionConfig) -> SessionState { - SessionState::with_config_rt( - config, - Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()), - ) + SessionState::with_config_rt(config, Arc::new(RuntimeEnv::default())) } impl SessionState { @@ -1902,7 +1899,7 @@ mod tests { #[tokio::test] async fn custom_query_planner() -> Result<()> { - let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()); + let runtime = Arc::new(RuntimeEnv::default()); let session_state = SessionState::with_config_rt(SessionConfig::new(), runtime) .with_query_planner(Arc::new(MyQueryPlanner {})); let ctx = SessionContext::with_state(session_state); diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs index d810c882fb097..36159db8e8029 100644 --- a/datafusion/core/src/execution/runtime_env.rs +++ b/datafusion/core/src/execution/runtime_env.rs @@ -57,12 +57,13 @@ impl RuntimeEnv { let RuntimeConfig { memory_manager, disk_manager, + object_store_registry, } = config; Ok(Self { memory_manager: MemoryManager::new(memory_manager), disk_manager: DiskManager::try_new(disk_manager)?, - object_store_registry: Arc::new(ObjectStoreRegistry::new()), + object_store_registry: Arc::new(object_store_registry), }) } @@ -121,6 +122,8 @@ pub struct RuntimeConfig { pub disk_manager: DiskManagerConfig, /// MemoryManager to limit access to memory pub memory_manager: MemoryManagerConfig, + /// ObjectStoreRegistry to get object store based on url + pub object_store_registry: ObjectStoreRegistry, } impl RuntimeConfig { @@ -141,6 +144,15 @@ impl RuntimeConfig { self } + /// Customize object store registry + pub fn with_object_store_registry( + mut self, + object_store_registry: ObjectStoreRegistry, + ) -> Self { + self.object_store_registry = object_store_registry; + self + } + /// Specify the total memory to use while running the DataFusion /// plan to `max_memory * memory_fraction` in bytes. /// diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 90ff7a448f5d6..46e09ff27445e 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1582,7 +1582,7 @@ mod tests { use crate::assert_contains; use crate::execution::context::TaskContext; use crate::execution::options::CsvReadOptions; - use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; + use crate::execution::runtime_env::RuntimeEnv; use crate::logical_plan::plan::Extension; use crate::physical_plan::{ expressions, DisplayFormatType, Partitioning, Statistics, @@ -1604,7 +1604,7 @@ mod tests { use std::{any::Any, fmt}; fn make_session_state() -> SessionState { - let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()); + let runtime = Arc::new(RuntimeEnv::default()); SessionState::with_config_rt(SessionConfig::new(), runtime) } diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs index 7e0a7a600d0de..13ddb1eb8da14 100644 --- a/datafusion/core/tests/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined_plan.rs @@ -87,7 +87,7 @@ use std::{any::Any, collections::BTreeMap, fmt, sync::Arc}; use async_trait::async_trait; use datafusion::execution::context::TaskContext; -use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::logical_plan::plan::{Extension, Sort}; use datafusion::logical_plan::{DFSchemaRef, Limit}; use datafusion::optimizer::optimizer::OptimizerConfig; @@ -247,7 +247,7 @@ async fn topk_plan() -> Result<()> { fn make_topk_context() -> SessionContext { let config = SessionConfig::new().with_target_partitions(48); - let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()); + let runtime = Arc::new(RuntimeEnv::default()); let state = SessionState::with_config_rt(config, runtime) .with_query_planner(Arc::new(TopKQueryPlanner {})) .add_optimizer_rule(Arc::new(TopKOptimizerRule {})); From 89841a9b609e55deab5e457f0222ed347a94decd Mon Sep 17 00:00:00 2001 From: yangzhong Date: Thu, 14 Jul 2022 16:22:02 +0800 Subject: [PATCH 2/4] Fix UT --- datafusion/core/src/catalog/schema.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs index 7634328f323ed..b886966bf6db4 100644 --- a/datafusion/core/src/catalog/schema.rs +++ b/datafusion/core/src/catalog/schema.rs @@ -162,7 +162,17 @@ mod tests { #[tokio::test] async fn test_schema_register_listing_table() { let testdata = crate::test_util::parquet_test_data(); - let filename = format!("file:///{}/{}", testdata, "alltypes_plain.parquet"); + let testdir = if testdata.starts_with('/') { + format!("file://{}", testdata) + } else { + format!("file:///{}", testdata) + }; + let filename = if testdir.ends_with('/') { + format!("{}{}", testdir, "alltypes_plain.parquet") + } else { + format!("{}/{}", testdir, "alltypes_plain.parquet") + }; + let table_path = ListingTableUrl::parse(filename).unwrap(); let catalog = MemoryCatalogProvider::new(); From 7dda307b11315c75d09a2956e7b6b015aefcfeb8 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Sun, 17 Jul 2022 09:56:29 +0800 Subject: [PATCH 3/4] Fix PR review --- .../core/src/datasource/object_store.rs | 24 ++++++------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs index 3f848074b2d59..02fc39f303641 100644 --- a/datafusion/core/src/datasource/object_store.rs +++ b/datafusion/core/src/datasource/object_store.rs @@ -81,8 +81,8 @@ impl std::fmt::Display for ObjectStoreUrl { } } -/// Object store self detector can detector an object store based on the url -pub trait ObjectStoreSelfDetector: Send + Sync + 'static { +/// Object store provider can detector an object store based on the url +pub trait ObjectStoreProvider: Send + Sync + 'static { /// Detector a suitable object store based on its url if possible /// Return the key and object store fn get_by_url(&self, url: &Url) -> Option<(String, Arc)>; @@ -93,7 +93,7 @@ pub trait ObjectStoreSelfDetector: Send + Sync + 'static { pub struct ObjectStoreRegistry { /// A map from scheme to object store that serve list / read operations for the store object_stores: Arc>>>, - self_detector: Option>, + self_detector: Option>, } impl std::fmt::Debug for ObjectStoreRegistry { @@ -122,7 +122,7 @@ impl ObjectStoreRegistry { /// Create the registry that object stores can registered into. /// ['LocalFileSystem'] store is registered in by default to support read local files natively. pub fn new_with_detector( - self_detector: Option>, + self_detector: Option>, ) -> Self { let mut map: HashMap> = HashMap::new(); map.insert("file://".to_string(), Arc::new(LocalFileSystem::new())); @@ -149,25 +149,15 @@ impl ObjectStoreRegistry { /// /// - URL with scheme `file:///` or no schema will return the default LocalFS store /// - URL with scheme `s3://bucket/` will return the S3 store if it's registered - /// - URL with scheme `hdfs://hostname:port` will return the hdfs store if it's registered + /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store if it's registered /// pub fn get_by_url(&self, url: impl AsRef) -> Result> { let url = url.as_ref(); // First check whether can get object store from registry let store = { let stores = self.object_stores.read(); - let s = &url[url::Position::BeforeScheme..url::Position::BeforeHost]; - stores - .get(s) - .or_else(|| { - let s = &url[url::Position::BeforeScheme..url::Position::AfterHost]; - stores.get(s) - }) - .or_else(|| { - let s = &url[url::Position::BeforeScheme..url::Position::AfterPort]; - stores.get(s) - }) - .cloned() + let s = &url[url::Position::BeforeScheme..url::Position::BeforePath]; + stores.get(s).cloned() }; // If not, then try to detector based on its url. From 2b0c6ad5b3a29ec72cbf461b3e201fb0006bcc82 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Mon, 18 Jul 2022 09:37:27 +0800 Subject: [PATCH 4/4] Fix PR review --- .../core/src/datasource/object_store.rs | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs index 02fc39f303641..65f3900091e62 100644 --- a/datafusion/core/src/datasource/object_store.rs +++ b/datafusion/core/src/datasource/object_store.rs @@ -85,7 +85,7 @@ impl std::fmt::Display for ObjectStoreUrl { pub trait ObjectStoreProvider: Send + Sync + 'static { /// Detector a suitable object store based on its url if possible /// Return the key and object store - fn get_by_url(&self, url: &Url) -> Option<(String, Arc)>; + fn get_by_url(&self, url: &Url) -> Option>; } /// Object store registry @@ -93,7 +93,7 @@ pub trait ObjectStoreProvider: Send + Sync + 'static { pub struct ObjectStoreRegistry { /// A map from scheme to object store that serve list / read operations for the store object_stores: Arc>>>, - self_detector: Option>, + provider: Option>, } impl std::fmt::Debug for ObjectStoreRegistry { @@ -116,19 +116,17 @@ impl Default for ObjectStoreRegistry { impl ObjectStoreRegistry { /// By default the self detector is None pub fn new() -> Self { - ObjectStoreRegistry::new_with_detector(None) + ObjectStoreRegistry::new_with_provider(None) } /// Create the registry that object stores can registered into. /// ['LocalFileSystem'] store is registered in by default to support read local files natively. - pub fn new_with_detector( - self_detector: Option>, - ) -> Self { + pub fn new_with_provider(provider: Option>) -> Self { let mut map: HashMap> = HashMap::new(); map.insert("file://".to_string(), Arc::new(LocalFileSystem::new())); Self { object_stores: Arc::new(RwLock::new(map)), - self_detector, + provider, } } @@ -163,11 +161,13 @@ impl ObjectStoreRegistry { // If not, then try to detector based on its url. let store = store .or_else(|| { - if let Some(self_detector) = &self.self_detector { + if let Some(provider) = &self.provider { // If detected, register it - if let Some((key, store)) = self_detector.get_by_url(url) { + if let Some(store) = provider.get_by_url(url) { let mut stores = self.object_stores.write(); - stores.insert(key, store.clone()); + let key = + &url[url::Position::BeforeScheme..url::Position::BeforePath]; + stores.insert(key.to_owned(), store.clone()); Some(store) } else { None