Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
let store = get_object_store(&ctx.state(), scheme, url, &table_options).await?;

// Register the retrieved object store in the session context's runtime environment
ctx.runtime_env().register_object_store(url, store);
ctx.register_object_store(url, store);

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ async fn main() -> Result<()> {
let path = format!("s3://{bucket_name}");
let s3_url = Url::parse(&path).unwrap();
let arc_s3 = Arc::new(s3);
ctx.runtime_env()
.register_object_store(&s3_url, arc_s3.clone());
ctx.register_object_store(&s3_url, arc_s3.clone());

let path = format!("s3://{bucket_name}/test_data/");
let file_format = ParquetFormat::default().with_enable_pruning(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ async fn main() -> Result<()> {

let path = format!("s3://{bucket_name}");
let s3_url = Url::parse(&path).unwrap();
ctx.runtime_env()
.register_object_store(&s3_url, Arc::new(s3));
ctx.register_object_store(&s3_url, Arc::new(s3));

// cannot query the parquet files from this bucket because the path contains a whitespace
// and we don't support that yet
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/parquet_sql_multiple_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let local_fs = Arc::new(LocalFileSystem::default());

let u = url::Url::parse("file://./")?;
ctx.runtime_env().register_object_store(&u, local_fs);
ctx.register_object_store(&u, local_fs);

// Register a listing table - this will use all files in the directory as data sources
// for the query
Expand Down
3 changes: 1 addition & 2 deletions datafusion-examples/examples/query-http-csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ async fn main() -> Result<()> {
.with_url(base_url.clone())
.build()
.unwrap();
ctx.runtime_env()
.register_object_store(&base_url, Arc::new(http_store));
ctx.register_object_store(&base_url, Arc::new(http_store));

// register csv file with the execution context
ctx.register_csv(
Expand Down
4 changes: 1 addition & 3 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,7 @@ mod tests {
let state = session_ctx.state();

let url = Url::parse("file://").unwrap();
state
.runtime_env()
.register_object_store(&url, store.clone());
session_ctx.register_object_store(&url, store.clone());

let testdata = crate::test_util::arrow_test_data();
let filename = format!("{testdata}/avro/alltypes_plain.avro");
Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ mod tests {
) -> Result<()> {
let ctx = SessionContext::new();
let url = Url::parse("file://").unwrap();
ctx.runtime_env().register_object_store(&url, store.clone());
ctx.register_object_store(&url, store.clone());

let task_ctx = ctx.task_ctx();

Expand Down Expand Up @@ -968,9 +968,7 @@ mod tests {
store.put(&path, data).await.unwrap();

let url = Url::parse("memory://").unwrap();
session_ctx
.runtime_env()
.register_object_store(&url, Arc::new(store));
session_ctx.register_object_store(&url, Arc::new(store));

let df = session_ctx
.read_csv("memory:///", CsvReadOptions::new())
Expand Down Expand Up @@ -999,7 +997,7 @@ mod tests {
let tmp_dir = TempDir::new()?;
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
ctx.runtime_env().register_object_store(&local_url, local);
ctx.register_object_store(&local_url, local);
let options = CsvReadOptions::default()
.schema_infer_max_records(2)
.has_header(true);
Expand Down Expand Up @@ -1039,7 +1037,7 @@ mod tests {
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();

ctx.runtime_env().register_object_store(&local_url, local);
ctx.register_object_store(&local_url, local);

// execute a simple query and write the results to CSV
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ mod tests {
) -> Result<()> {
let ctx = SessionContext::new();
let url = Url::parse("file://").unwrap();
ctx.runtime_env().register_object_store(&url, store.clone());
ctx.register_object_store(&url, store.clone());
let filename = "1.json";
let tmp_dir = TempDir::new()?;
let file_groups = partitioned_file_groups(
Expand Down Expand Up @@ -752,7 +752,7 @@ mod tests {
let tmp_dir = TempDir::new()?;
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
ctx.runtime_env().register_object_store(&local_url, local);
ctx.register_object_store(&local_url, local);

// execute a simple query and write the results to CSV
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
Expand Down Expand Up @@ -845,7 +845,7 @@ mod tests {
let tmp_dir = TempDir::new()?;
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
ctx.runtime_env().register_object_store(&local_url, local);
ctx.register_object_store(&local_url, local);
let options = CsvReadOptions::default()
.schema_infer_max_records(2)
.has_header(true);
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,7 @@ mod tests {
let tmp_dir = TempDir::new()?;
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
ctx.runtime_env().register_object_store(&local_url, local);
ctx.register_object_store(&local_url, local);

let options = CsvReadOptions::default()
.schema_infer_max_records(2)
Expand Down Expand Up @@ -2047,7 +2047,7 @@ mod tests {
// register a local file system object store for /tmp directory
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
ctx.runtime_env().register_object_store(&local_url, local);
ctx.register_object_store(&local_url, local);

// Configure listing options
let file_format = ParquetFormat::default().with_enable_pruning(true);
Expand Down
24 changes: 24 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ use sqlparser::dialect::dialect_from_str;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use object_store::ObjectStore;
use parking_lot::RwLock;
use url::Url;
use uuid::Uuid;
Expand Down Expand Up @@ -354,6 +355,29 @@ impl SessionContext {
self
}

/// Registers an [`ObjectStore`] to be used with a specific URL prefix.
///
/// See [`RuntimeEnv::register_object_store`] for more details.
///
/// # Example: register a local object store for the "file://" URL prefix
/// ```
/// # use std::sync::Arc;
/// # use datafusion::prelude::SessionContext;
/// # use datafusion_execution::object_store::ObjectStoreUrl;
/// let object_store_url = ObjectStoreUrl::parse("file://").unwrap();
/// let object_store = object_store::local::LocalFileSystem::new();
/// let mut ctx = SessionContext::new();
/// // All files with the file:// url prefix will be read from the local file system
/// ctx.register_object_store(object_store_url.as_ref(), Arc::new(object_store));
/// ```
pub fn register_object_store(
&self,
url: &Url,
object_store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
self.runtime_env().register_object_store(url, object_store)
}

/// Registers the [`RecordBatch`] as the specified table name
pub fn register_batch(
&self,
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/test/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ use url::Url;
/// Returns a test object store with the provided `ctx`
pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
let url = Url::parse("test://").unwrap();
ctx.runtime_env()
.register_object_store(&url, make_test_store_and_state(files).0);
ctx.register_object_store(&url, make_test_store_and_state(files).0);
}

/// Create a test object store with the provided files
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2070,7 +2070,7 @@ async fn write_partitioned_parquet_results() -> Result<()> {

let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
ctx.runtime_env().register_object_store(&local_url, local);
ctx.register_object_store(&local_url, local);

// execute a simple query and write the results to parquet
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
Expand Down Expand Up @@ -2140,7 +2140,7 @@ async fn write_parquet_results() -> Result<()> {
// register a local file system object store for /tmp directory
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
ctx.runtime_env().register_object_store(&local_url, local);
ctx.register_object_store(&local_url, local);

// execute a simple query and write the results to parquet
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/path_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ fn register_partitioned_aggregate_csv(
let csv_file_path = format!("{testdata}/csv/aggregate_test_100.csv");
let file_schema = test_util::aggr_test_schema();
let url = Url::parse("mirror://").unwrap();
ctx.runtime_env().register_object_store(
ctx.register_object_store(
&url,
MirroringObjectStore::new_arc(csv_file_path, store_paths),
);
Expand Down Expand Up @@ -566,7 +566,7 @@ async fn register_partitioned_alltypes_parquet(
let testdata = parquet_test_data();
let parquet_file_path = format!("{testdata}/{source_file}");
let url = Url::parse("mirror://").unwrap();
ctx.runtime_env().register_object_store(
ctx.register_object_store(
&url,
MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths),
);
Expand Down