diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index cfbc97ecbe238..ffe447e79fd70 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -379,7 +379,7 @@ pub(crate) async fn register_object_store_and_config_extensions( let store = get_object_store(&ctx.state(), scheme, url, &table_options).await?; // Register the retrieved object store in the session context's runtime environment - ctx.runtime_env().register_object_store(url, store); + ctx.register_object_store(url, store); Ok(()) } diff --git a/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs b/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs index 8d56c440da368..4d71ed7589121 100644 --- a/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs +++ b/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs @@ -49,8 +49,7 @@ async fn main() -> Result<()> { let path = format!("s3://{bucket_name}"); let s3_url = Url::parse(&path).unwrap(); let arc_s3 = Arc::new(s3); - ctx.runtime_env() - .register_object_store(&s3_url, arc_s3.clone()); + ctx.register_object_store(&s3_url, arc_s3.clone()); let path = format!("s3://{bucket_name}/test_data/"); let file_format = ParquetFormat::default().with_enable_pruning(true); diff --git a/datafusion-examples/examples/external_dependency/query-aws-s3.rs b/datafusion-examples/examples/external_dependency/query-aws-s3.rs index cbb6486b4eece..e32286e30e4fd 100644 --- a/datafusion-examples/examples/external_dependency/query-aws-s3.rs +++ b/datafusion-examples/examples/external_dependency/query-aws-s3.rs @@ -48,8 +48,7 @@ async fn main() -> Result<()> { let path = format!("s3://{bucket_name}"); let s3_url = Url::parse(&path).unwrap(); - ctx.runtime_env() - .register_object_store(&s3_url, Arc::new(s3)); + ctx.register_object_store(&s3_url, Arc::new(s3)); // cannot query the parquet files from this bucket because the path contains a whitespace // and we don't support that yet diff --git 
a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/parquet_sql_multiple_files.rs index 30ca1df73d91c..b0d3922a32789 100644 --- a/datafusion-examples/examples/parquet_sql_multiple_files.rs +++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs @@ -80,7 +80,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let local_fs = Arc::new(LocalFileSystem::default()); let u = url::Url::parse("file://./")?; - ctx.runtime_env().register_object_store(&u, local_fs); + ctx.register_object_store(&u, local_fs); // Register a listing table - this will use all files in the directory as data sources // for the query diff --git a/datafusion-examples/examples/query-http-csv.rs b/datafusion-examples/examples/query-http-csv.rs index 928d702711591..fa3fd2ac068df 100644 --- a/datafusion-examples/examples/query-http-csv.rs +++ b/datafusion-examples/examples/query-http-csv.rs @@ -34,8 +34,7 @@ async fn main() -> Result<()> { .with_url(base_url.clone()) .build() .unwrap(); - ctx.runtime_env() - .register_object_store(&base_url, Arc::new(http_store)); + ctx.register_object_store(&base_url, Arc::new(http_store)); // register csv file with the execution context ctx.register_csv( diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs index 4e5140e82d3ff..a8a29e9bbabe4 100644 --- a/datafusion/core/src/datasource/physical_plan/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -261,9 +261,7 @@ mod tests { let state = session_ctx.state(); let url = Url::parse("file://").unwrap(); - state - .runtime_env() - .register_object_store(&url, store.clone()); + session_ctx.register_object_store(&url, store.clone()); let testdata = crate::test_util::arrow_test_data(); let filename = format!("{testdata}/avro/alltypes_plain.avro"); diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index cc7c837e471e3..a266b9b014e61
100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -895,7 +895,7 @@ mod tests { ) -> Result<()> { let ctx = SessionContext::new(); let url = Url::parse("file://").unwrap(); - ctx.runtime_env().register_object_store(&url, store.clone()); + ctx.register_object_store(&url, store.clone()); let task_ctx = ctx.task_ctx(); @@ -968,9 +968,7 @@ mod tests { store.put(&path, data).await.unwrap(); let url = Url::parse("memory://").unwrap(); - session_ctx - .runtime_env() - .register_object_store(&url, Arc::new(store)); + session_ctx.register_object_store(&url, Arc::new(store)); let df = session_ctx .read_csv("memory:///", CsvReadOptions::new()) @@ -999,7 +997,7 @@ mod tests { let tmp_dir = TempDir::new()?; let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); let local_url = Url::parse("file://local").unwrap(); - ctx.runtime_env().register_object_store(&local_url, local); + ctx.register_object_store(&local_url, local); let options = CsvReadOptions::default() .schema_infer_max_records(2) .has_header(true); @@ -1039,7 +1037,7 @@ mod tests { let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); let local_url = Url::parse("file://local").unwrap(); - ctx.runtime_env().register_object_store(&local_url, local); + ctx.register_object_store(&local_url, local); // execute a simple query and write the results to CSV let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/"; diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs index 0180caa850112..4728069f19db9 100644 --- a/datafusion/core/src/datasource/physical_plan/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -446,7 +446,7 @@ mod tests { ) -> Result<()> { let ctx = SessionContext::new(); let url = Url::parse("file://").unwrap(); - ctx.runtime_env().register_object_store(&url, store.clone()); + ctx.register_object_store(&url, 
store.clone()); let filename = "1.json"; let tmp_dir = TempDir::new()?; let file_groups = partitioned_file_groups( @@ -752,7 +752,7 @@ mod tests { let tmp_dir = TempDir::new()?; let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); let local_url = Url::parse("file://local").unwrap(); - ctx.runtime_env().register_object_store(&local_url, local); + ctx.register_object_store(&local_url, local); // execute a simple query and write the results to CSV let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/"; @@ -845,7 +845,7 @@ mod tests { let tmp_dir = TempDir::new()?; let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); let local_url = Url::parse("file://local").unwrap(); - ctx.runtime_env().register_object_store(&local_url, local); + ctx.register_object_store(&local_url, local); let options = CsvReadOptions::default() .schema_infer_max_records(2) .has_header(true); diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index dd953878df49e..cd2b9d60a96ce 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -991,7 +991,7 @@ mod tests { let tmp_dir = TempDir::new()?; let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); let local_url = Url::parse("file://local").unwrap(); - ctx.runtime_env().register_object_store(&local_url, local); + ctx.register_object_store(&local_url, local); let options = CsvReadOptions::default() .schema_infer_max_records(2) @@ -2047,7 +2047,7 @@ mod tests { // register a local file system object store for /tmp directory let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); let local_url = Url::parse("file://local").unwrap(); - ctx.runtime_env().register_object_store(&local_url, local); + ctx.register_object_store(&local_url, local); // Configure listing options let file_format = 
ParquetFormat::default().with_enable_pruning(true); diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 59238ffc559ae..d23982cfa9522 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -91,6 +91,7 @@ use sqlparser::dialect::dialect_from_str; use async_trait::async_trait; use chrono::{DateTime, Utc}; +use object_store::ObjectStore; use parking_lot::RwLock; use url::Url; use uuid::Uuid; @@ -354,6 +355,29 @@ impl SessionContext { self } + /// Registers an [`ObjectStore`] to be used with a specific URL prefix. + /// + /// See [`RuntimeEnv::register_object_store`] for more details. + /// + /// # Example: register a local object store for the "file://" URL prefix + /// ``` + /// # use std::sync::Arc; + /// # use datafusion::prelude::SessionContext; + /// # use datafusion_execution::object_store::ObjectStoreUrl; + /// let object_store_url = ObjectStoreUrl::parse("file://").unwrap(); + /// let object_store = object_store::local::LocalFileSystem::new(); + /// let mut ctx = SessionContext::new(); + /// // All files with the file:// url prefix will be read from the local file system + /// ctx.register_object_store(object_store_url.as_ref(), Arc::new(object_store)); + /// ``` + pub fn register_object_store( + &self, + url: &Url, + object_store: Arc<dyn ObjectStore>, + ) -> Option<Arc<dyn ObjectStore>> { + self.runtime_env().register_object_store(url, object_store) + } + /// Registers the [`RecordBatch`] as the specified table name pub fn register_batch( &self, diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs index d6f324a7f1f95..bea6f7b9ceb7b 100644 --- a/datafusion/core/src/test/object_store.rs +++ b/datafusion/core/src/test/object_store.rs @@ -27,8 +27,7 @@ use url::Url; /// Returns a test object store with the provided `ctx` pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) { let url = Url::parse("test://").unwrap(); -
ctx.runtime_env() - .register_object_store(&url, make_test_store_and_state(files).0); + ctx.register_object_store(&url, make_test_store_and_state(files).0); } /// Create a test object store with the provided files diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 009f45b28057e..4c18a272bc1ee 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2070,7 +2070,7 @@ async fn write_partitioned_parquet_results() -> Result<()> { let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); let local_url = Url::parse("file://local").unwrap(); - ctx.runtime_env().register_object_store(&local_url, local); + ctx.register_object_store(&local_url, local); // execute a simple query and write the results to parquet let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/"; @@ -2140,7 +2140,7 @@ async fn write_parquet_results() -> Result<()> { // register a local file system object store for /tmp directory let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?); let local_url = Url::parse("file://local").unwrap(); - ctx.runtime_env().register_object_store(&local_url, local); + ctx.register_object_store(&local_url, local); // execute a simple query and write the results to parquet let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/"; diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index ffe0494dae995..ce71c890698e4 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -533,7 +533,7 @@ fn register_partitioned_aggregate_csv( let csv_file_path = format!("{testdata}/csv/aggregate_test_100.csv"); let file_schema = test_util::aggr_test_schema(); let url = Url::parse("mirror://").unwrap(); - ctx.runtime_env().register_object_store( + ctx.register_object_store( &url, MirroringObjectStore::new_arc(csv_file_path, store_paths), ); @@ -566,7 +566,7 @@ async fn 
register_partitioned_alltypes_parquet( let testdata = parquet_test_data(); let parquet_file_path = format!("{testdata}/{source_file}"); let url = Url::parse("mirror://").unwrap(); - ctx.runtime_env().register_object_store( + ctx.register_object_store( &url, MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths), );