From 4060b146a53b47e425380f965ccfd87e39d1d0b9 Mon Sep 17 00:00:00 2001 From: Blake Orth Date: Thu, 9 Oct 2025 17:32:06 -0600 Subject: [PATCH 1/2] Adds Object Store Profiling options/commands to CLI - Adds a CLI option and command to datafusion-cli to enable or disabled object store profiling - Integrates the command with the instrumented object stores to allow the user input to change the mode of the instrumented stores - Adds tests to exercise the expected behavior of the commands - Adds user docs for the commands/CLI options - Updates visibility of `InstrumentedObjectStore` now that it needs to be interacted with outside of its module --- .../examples/cli-session-context.rs | 17 +++- datafusion-cli/src/command.rs | 84 ++++++++++++++++++- datafusion-cli/src/main.rs | 10 ++- .../src/object_storage/instrumented.rs | 52 +++++++++--- datafusion-cli/src/print_options.rs | 83 ++++++++++++++++-- docs/source/user-guide/cli/usage.md | 10 +++ 6 files changed, 235 insertions(+), 21 deletions(-) diff --git a/datafusion-cli/examples/cli-session-context.rs b/datafusion-cli/examples/cli-session-context.rs index 1a8f15c8731b2..89860d5b2b342 100644 --- a/datafusion-cli/examples/cli-session-context.rs +++ b/datafusion-cli/examples/cli-session-context.rs @@ -23,12 +23,19 @@ use std::sync::Arc; use datafusion::{ dataframe::DataFrame, error::DataFusionError, - execution::{context::SessionState, TaskContext}, + execution::{ + context::SessionState, object_store::DefaultObjectStoreRegistry, TaskContext, + }, logical_expr::{LogicalPlan, LogicalPlanBuilder}, prelude::SessionContext, }; use datafusion_cli::{ - cli_context::CliSessionContext, exec::exec_from_repl, print_options::PrintOptions, + cli_context::CliSessionContext, + exec::exec_from_repl, + object_storage::instrumented::{ + InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, + }, + print_options::PrintOptions, }; use object_store::ObjectStore; @@ -84,11 +91,17 @@ impl CliSessionContext for MyUnionerContext { pub async fn main() { let my_ctx = MyUnionerContext::default(); + let profile_mode = InstrumentedObjectStoreMode::default(); + let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new( + Arc::new(DefaultObjectStoreRegistry::new()), + profile_mode, + )); let mut print_options = PrintOptions { format: datafusion_cli::print_format::PrintFormat::Automatic, quiet: false, maxrows: datafusion_cli::print_options::MaxRows::Unlimited, color: true, + instrumented_registry, }; exec_from_repl(&my_ctx, &mut print_options).await.unwrap(); diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs index 58c19160d12bb..4e4d4269e5548 100644 --- a/datafusion-cli/src/command.rs +++ b/datafusion-cli/src/command.rs @@ -46,6 +46,7 @@ pub enum Command { SearchFunctions(String), QuietMode(Option), OutputFormat(Option), + ObjectStoreProfileMode(Option), } pub enum OutputFormat { @@ -122,6 +123,29 @@ impl Command { Self::OutputFormat(_) => exec_err!( "Unexpected change output format, this should be handled outside" ), + Self::ObjectStoreProfileMode(mode) => { + if let Some(mode) = mode { + let profile_mode = mode + .parse() + .map_err(|_| + exec_datafusion_err!("Failed to parse input: {mode}. Valid options are disabled, enabled") + )?; + print_options + .instrumented_registry + .set_instrument_mode(profile_mode); + println!( + "ObjectStore Profile mode set to {}", + print_options.instrumented_registry.mode() + ); + } else { + println!( + "ObjectStore Profile mode is {}", + print_options.instrumented_registry.mode() + ); + } + + Ok(()) + } } } @@ -140,11 +164,15 @@ impl Command { Self::OutputFormat(_) => { ("\\pset [NAME [VALUE]]", "set table output option\n(format)") } + Self::ObjectStoreProfileMode(_) => ( + "\\object_store_profiling (disabled|enabled)", + "print or set object store profile mode", + ), } } } -const ALL_COMMANDS: [Command; 9] = [ +const ALL_COMMANDS: [Command; 10] = [ Command::ListTables, Command::DescribeTableStmt(String::new()), Command::Quit, @@ -154,6 +182,7 @@ const ALL_COMMANDS: [Command; 9] = [ Command::SearchFunctions(String::new()), Command::QuietMode(None), Command::OutputFormat(None), + Command::ObjectStoreProfileMode(None), ]; fn all_commands_info() -> RecordBatch { @@ -204,6 +233,10 @@ impl FromStr for Command { Self::OutputFormat(Some(subcommand.to_string())) } ("pset", None) => Self::OutputFormat(None), + ("object_store_profiling", Some(mode)) => { + Self::ObjectStoreProfileMode(Some(mode.to_string())) + } + ("object_store_profiling", None) => Self::ObjectStoreProfileMode(None), _ => return Err(()), }) } @@ -244,3 +277,52 @@ impl OutputFormat { } } } + +#[cfg(test)] +mod tests { + use datafusion::{ + execution::object_store::DefaultObjectStoreRegistry, prelude::SessionContext, + }; + + use crate::{ + object_storage::instrumented::{ + InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, + }, + print_options::MaxRows, + }; + + use super::*; + + #[tokio::test] + async fn command_execute_profile_mode() { + let ctx = SessionContext::new(); + + let profile_mode = InstrumentedObjectStoreMode::default(); + let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new( + Arc::new(DefaultObjectStoreRegistry::new()), + profile_mode, + )); + let mut print_options = PrintOptions { + format: PrintFormat::Automatic, + quiet: false, + maxrows: MaxRows::Unlimited, + color: true, + instrumented_registry: Arc::clone(&instrumented_registry), + }; + + let mut cmd: Command = "object_store_profiling" + .parse() + .expect("expected parse to succeed"); + assert!(cmd.execute(&ctx, &mut print_options).await.is_ok()); + + cmd = "object_store_profiling enabled" + .parse() + .expect("expected parse to succeed"); + assert!(cmd.execute(&ctx, &mut print_options).await.is_ok()); + + cmd = "object_store_profiling does_not_exist" + .parse() + .expect("expected parse to succeed"); + assert!(cmd.execute(&ctx, &mut print_options).await.is_err()); + } +} diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 04a8c2a0f1d84..aa67a47bba67e 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -149,6 +149,13 @@ struct Args { value_parser(extract_disk_limit) )] disk_limit: Option, + + #[clap( + long, + help = "Specify the default object_store_profiling mode, defaults to 'disabled'.\n[possible values: disabled, enabled]", + default_value_t = InstrumentedObjectStoreMode::Disabled + )] + object_store_profiling: InstrumentedObjectStoreMode, } #[tokio::main] @@ -212,7 +219,7 @@ async fn main_inner() -> Result<()> { let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new( Arc::new(DefaultObjectStoreRegistry::new()), - InstrumentedObjectStoreMode::default(), + args.object_store_profiling, )); rt_builder = rt_builder.with_object_store_registry(instrumented_registry.clone()); @@ -243,6 +250,7 @@ async fn main_inner() -> Result<()> { quiet: args.quiet, maxrows: args.maxrows, color: args.color, + instrumented_registry: Arc::clone(&instrumented_registry), }; let commands = args.command; diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs index f0313da3a3795..124c196b7e34d 100644 --- a/datafusion-cli/src/object_storage/instrumented.rs +++ b/datafusion-cli/src/object_storage/instrumented.rs @@ -31,11 +31,11 @@ use object_store::{ path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, }; +use parking_lot::RwLock; use url::Url; -/// The profiling mode to use for an [`ObjectStore`] instance that has been instrumented to collect -/// profiling data. Collecting profiling data will have a small negative impact on both CPU and -/// memory usage. Default is `Disabled` +/// The profiling mode to use for an [`InstrumentedObjectStore`] instance. Collecting profiling +/// data will have a small negative impact on both CPU and memory usage. Default is `Disabled` #[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] pub enum InstrumentedObjectStoreMode { /// Disable collection of profiling data @@ -75,7 +75,7 @@ impl From for InstrumentedObjectStoreMode { /// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the /// inner [`ObjectStore`] #[derive(Debug)] -struct InstrumentedObjectStore { +pub struct InstrumentedObjectStore { inner: Arc, instrument_mode: AtomicU8, } @@ -88,6 +88,10 @@ impl InstrumentedObjectStore { instrument_mode, } } + + fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) { + self.instrument_mode.store(mode as u8, Ordering::Relaxed) + } } impl fmt::Display for InstrumentedObjectStore { @@ -150,11 +154,12 @@ impl ObjectStore for InstrumentedObjectStore { } } -/// Provides access to [`ObjectStore`] instances that record requests for reporting +/// Provides access to [`InstrumentedObjectStore`] instances that record requests for reporting #[derive(Debug)] pub struct InstrumentedObjectStoreRegistry { inner: Arc, - instrument_mode: InstrumentedObjectStoreMode, + instrument_mode: AtomicU8, + stores: RwLock>>, } impl InstrumentedObjectStoreRegistry { @@ -166,7 +171,28 @@ impl InstrumentedObjectStoreRegistry { ) -> Self { Self { inner: registry, - instrument_mode: default_mode, + instrument_mode: AtomicU8::new(default_mode as u8), + stores: RwLock::new(Vec::new()), + } + } + + /// Provides access to all of the [`InstrumentedObjectStore`]s managed by this + /// [`InstrumentedObjectStoreRegistry`] + pub fn stores(&self) -> Vec> { + self.stores.read().clone() + } + + /// Returns the current [`InstrumentedObjectStoreMode`] for this + /// [`InstrumentedObjectStoreRegistry`] + pub fn mode(&self) -> InstrumentedObjectStoreMode { + self.instrument_mode.load(Ordering::Relaxed).into() + } + + /// Sets the [`InstrumentedObjectStoreMode`] for this [`InstrumentedObjectStoreRegistry`] + pub fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) { + self.instrument_mode.store(mode as u8, Ordering::Relaxed); + for s in self.stores.read().iter() { + s.set_instrument_mode(mode) } } } @@ -177,8 +203,10 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry { url: &Url, store: Arc, ) -> Option> { - let mode = AtomicU8::new(self.instrument_mode as u8); - let instrumented = Arc::new(InstrumentedObjectStore::new(store, mode)); + let mode = self.instrument_mode.load(Ordering::Relaxed); + let instrumented = + Arc::new(InstrumentedObjectStore::new(store, AtomicU8::new(mode))); + self.stores.write().push(Arc::clone(&instrumented)); self.inner.register_store(url, instrumented) } @@ -223,13 +251,15 @@ mod tests { Arc::new(DefaultObjectStoreRegistry::new()), InstrumentedObjectStoreMode::default(), )); - let store = object_store::memory::InMemory::new(); + assert!(reg.stores().is_empty()); + let store = object_store::memory::InMemory::new(); let url = "mem://test".parse().unwrap(); let registered = reg.register_store(&url, Arc::new(store)); assert!(registered.is_none()); let fetched = reg.get_store(&url); - assert!(fetched.is_ok()) + assert!(fetched.is_ok()); + assert_eq!(reg.stores().len(), 1); } } diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 56d787b0fe087..4addf08e2c284 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -16,10 +16,14 @@ // under the License. use std::fmt::{Display, Formatter}; -use std::io::Write; +use std::io; use std::pin::Pin; use std::str::FromStr; +use std::sync::Arc; +use crate::object_storage::instrumented::{ + InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, +}; use crate::print_format::PrintFormat; use arrow::datatypes::SchemaRef; @@ -67,12 +71,15 @@ impl Display for MaxRows { } } +const OBJECT_STORE_PROFILING_HEADER: &str = "Object Store Profiling"; + #[derive(Debug, Clone)] pub struct PrintOptions { pub format: PrintFormat, pub quiet: bool, pub maxrows: MaxRows, pub color: bool, + pub instrumented_registry: Arc, } // Returns the query execution details formatted @@ -128,11 +135,7 @@ impl PrintOptions { query_start_time, ); - if !self.quiet { - writeln!(writer, "{formatted_exec_details}")?; - } - - Ok(()) + self.write_output(&mut writer, formatted_exec_details) } /// Print the stream to stdout using the specified format @@ -174,10 +177,78 @@ impl PrintOptions { query_start_time, ); + self.write_output(&mut writer, formatted_exec_details) + } + + fn write_output( + &self, + writer: &mut W, + formatted_exec_details: String, + ) -> Result<()> { if !self.quiet { writeln!(writer, "{formatted_exec_details}")?; + + if self.instrumented_registry.mode() != InstrumentedObjectStoreMode::Disabled + { + writeln!(writer, "{OBJECT_STORE_PROFILING_HEADER}")?; + for store in self.instrumented_registry.stores() { + writeln!(writer, "{store}")?; + } + } } Ok(()) } } + +#[cfg(test)] +mod tests { + use datafusion::error::Result; + use datafusion::execution::object_store::DefaultObjectStoreRegistry; + + use super::*; + + #[test] + fn write_output() -> Result<()> { + let profile_mode = InstrumentedObjectStoreMode::default(); + let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new( + Arc::new(DefaultObjectStoreRegistry::new()), + profile_mode, + )); + let mut print_options = PrintOptions { + format: PrintFormat::Automatic, + quiet: true, + maxrows: MaxRows::Unlimited, + color: true, + instrumented_registry: Arc::clone(&instrumented_registry), + }; + + let mut print_output: Vec = Vec::new(); + let exec_out = String::from("Formatted Exec Output"); + print_options.write_output(&mut print_output, exec_out.clone())?; + assert!(print_output.is_empty()); + + print_options.quiet = false; + print_options.write_output(&mut print_output, exec_out.clone())?; + let out_str: String = print_output + .clone() + .try_into() + .expect("Expected successful String conversion"); + assert!(out_str.contains(&exec_out)); + + // clear the previous data from the output so it doesn't pollute the next test + print_output.clear(); + print_options + .instrumented_registry + .set_instrument_mode(InstrumentedObjectStoreMode::Enabled); + print_options.write_output(&mut print_output, exec_out.clone())?; + let out_str: String = print_output + .clone() + .try_into() + .expect("Expected successful String conversion"); + assert!(out_str.contains(&exec_out)); + assert!(out_str.contains(OBJECT_STORE_PROFILING_HEADER)); + + Ok(()) + } +} diff --git a/docs/source/user-guide/cli/usage.md b/docs/source/user-guide/cli/usage.md index 263728f5b04d4..57a96c5d79003 100644 --- a/docs/source/user-guide/cli/usage.md +++ b/docs/source/user-guide/cli/usage.md @@ -63,6 +63,10 @@ OPTIONS: -d, --disk-limit Available disk space for spilling queries (e.g. '10g'), default to None (uses DataFusion's default value of '100g') + --object-store-profiling + Specify the default object_store_profiling mode, defaults to 'disabled'. + [possible values: disabled, enabled] [default: Disabled] + -p, --data-path Path to your data, default to current directory @@ -122,6 +126,12 @@ Available commands inside DataFusion CLI are: > \h function ``` +- Object Store Profiling Mode + +```bash +> \object_store_profiling [disabled|enabled] +``` + ## Supported SQL In addition to the normal [SQL supported in DataFusion], `datafusion-cli` also From e1a95afb409dfeba80eee9ceaa3dc8366a001486 Mon Sep 17 00:00:00 2001 From: Blake Orth Date: Fri, 10 Oct 2025 12:20:50 -0600 Subject: [PATCH 2/2] Improves InstrumentedObjectStoreRegistry ergonomics - Adds better methods to build an InstrumentedObjectStoreRegistry to reduce code duplication in common usage - Enhances test success criteria - Normalizes method names --- .../examples/cli-session-context.rs | 18 ++------ datafusion-cli/src/command.rs | 23 ++++++----- datafusion-cli/src/main.rs | 9 ++-- .../src/object_storage/instrumented.rs | 41 ++++++++++++------- datafusion-cli/src/print_options.rs | 10 ++--- 5 files changed, 50 insertions(+), 51 deletions(-) diff --git a/datafusion-cli/examples/cli-session-context.rs b/datafusion-cli/examples/cli-session-context.rs index 89860d5b2b342..bd2dbb736781f 100644 --- a/datafusion-cli/examples/cli-session-context.rs +++ b/datafusion-cli/examples/cli-session-context.rs @@ -23,18 +23,13 @@ use std::sync::Arc; use datafusion::{ dataframe::DataFrame, error::DataFusionError, - execution::{ - context::SessionState, object_store::DefaultObjectStoreRegistry, TaskContext, - }, + execution::{context::SessionState, TaskContext}, logical_expr::{LogicalPlan, LogicalPlanBuilder}, prelude::SessionContext, }; use datafusion_cli::{ - cli_context::CliSessionContext, - exec::exec_from_repl, - object_storage::instrumented::{ - InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, - }, + cli_context::CliSessionContext, exec::exec_from_repl, + object_storage::instrumented::InstrumentedObjectStoreRegistry, print_options::PrintOptions, }; use object_store::ObjectStore; @@ -91,17 +86,12 @@ impl CliSessionContext for MyUnionerContext { pub async fn main() { let my_ctx = MyUnionerContext::default(); - let profile_mode = InstrumentedObjectStoreMode::default(); - let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new( - Arc::new(DefaultObjectStoreRegistry::new()), - profile_mode, - )); let mut print_options = PrintOptions { format: datafusion_cli::print_format::PrintFormat::Automatic, quiet: false, maxrows: datafusion_cli::print_options::MaxRows::Unlimited, color: true, - instrumented_registry, + instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()), }; exec_from_repl(&my_ctx, &mut print_options).await.unwrap(); diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs index 4e4d4269e5548..48fb37e8a8880 100644 --- a/datafusion-cli/src/command.rs +++ b/datafusion-cli/src/command.rs @@ -135,12 +135,12 @@ impl Command { .set_instrument_mode(profile_mode); println!( "ObjectStore Profile mode set to {}", - print_options.instrumented_registry.mode() + print_options.instrumented_registry.instrument_mode() ); } else { println!( "ObjectStore Profile mode is {}", - print_options.instrumented_registry.mode() + print_options.instrumented_registry.instrument_mode() ); } @@ -280,9 +280,7 @@ impl OutputFormat { #[cfg(test)] mod tests { - use datafusion::{ - execution::object_store::DefaultObjectStoreRegistry, prelude::SessionContext, - }; + use datafusion::prelude::SessionContext; use crate::{ object_storage::instrumented::{ @@ -297,28 +295,31 @@ mod tests { async fn command_execute_profile_mode() { let ctx = SessionContext::new(); - let profile_mode = InstrumentedObjectStoreMode::default(); - let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new( - Arc::new(DefaultObjectStoreRegistry::new()), - profile_mode, - )); let mut print_options = PrintOptions { format: PrintFormat::Automatic, quiet: false, maxrows: MaxRows::Unlimited, color: true, - instrumented_registry: Arc::clone(&instrumented_registry), + instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()), }; let mut cmd: Command = "object_store_profiling" .parse() .expect("expected parse to succeed"); assert!(cmd.execute(&ctx, &mut print_options).await.is_ok()); + assert_eq!( + print_options.instrumented_registry.instrument_mode(), + InstrumentedObjectStoreMode::default() + ); cmd = "object_store_profiling enabled" .parse() .expect("expected parse to succeed"); assert!(cmd.execute(&ctx, &mut print_options).await.is_ok()); + assert_eq!( + print_options.instrumented_registry.instrument_mode(), + InstrumentedObjectStoreMode::Enabled + ); cmd = "object_store_profiling does_not_exist" .parse() diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index aa67a47bba67e..3dbe839d3c9b3 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -27,7 +27,6 @@ use datafusion::execution::context::SessionConfig; use datafusion::execution::memory_pool::{ FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, }; -use datafusion::execution::object_store::DefaultObjectStoreRegistry; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::logical_expr::ExplainFormat; use datafusion::prelude::SessionContext; @@ -217,10 +216,10 @@ async fn main_inner() -> Result<()> { rt_builder = rt_builder.with_disk_manager_builder(builder); } - let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new( - Arc::new(DefaultObjectStoreRegistry::new()), - args.object_store_profiling, - )); + let instrumented_registry = Arc::new( + InstrumentedObjectStoreRegistry::new() + .with_profile_mode(args.object_store_profiling), + ); rt_builder = rt_builder.with_object_store_registry(instrumented_registry.clone()); let runtime_env = rt_builder.build_arc()?; diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs index 124c196b7e34d..49f174799cde1 100644 --- a/datafusion-cli/src/object_storage/instrumented.rs +++ b/datafusion-cli/src/object_storage/instrumented.rs @@ -25,7 +25,10 @@ use std::{ }; use async_trait::async_trait; -use datafusion::{error::DataFusionError, execution::object_store::ObjectStoreRegistry}; +use datafusion::{ + error::DataFusionError, + execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}, +}; use futures::stream::BoxStream; use object_store::{ path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, @@ -162,20 +165,28 @@ pub struct InstrumentedObjectStoreRegistry { stores: RwLock>>, } +impl Default for InstrumentedObjectStoreRegistry { + fn default() -> Self { + Self::new() + } +} + impl InstrumentedObjectStoreRegistry { /// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided /// [`ObjectStoreRegistry`] - pub fn new( - registry: Arc, - default_mode: InstrumentedObjectStoreMode, - ) -> Self { + pub fn new() -> Self { Self { - inner: registry, - instrument_mode: AtomicU8::new(default_mode as u8), + inner: Arc::new(DefaultObjectStoreRegistry::new()), + instrument_mode: AtomicU8::new(InstrumentedObjectStoreMode::default() as u8), stores: RwLock::new(Vec::new()), } } + pub fn with_profile_mode(self, mode: InstrumentedObjectStoreMode) -> Self { + self.instrument_mode.store(mode as u8, Ordering::Relaxed); + self + } + /// Provides access to all of the [`InstrumentedObjectStore`]s managed by this /// [`InstrumentedObjectStoreRegistry`] pub fn stores(&self) -> Vec> { @@ -184,7 +195,7 @@ impl InstrumentedObjectStoreRegistry { /// Returns the current [`InstrumentedObjectStoreMode`] for this /// [`InstrumentedObjectStoreRegistry`] - pub fn mode(&self) -> InstrumentedObjectStoreMode { + pub fn instrument_mode(&self) -> InstrumentedObjectStoreMode { self.instrument_mode.load(Ordering::Relaxed).into() } @@ -217,8 +228,6 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry { #[cfg(test)] mod tests { - use datafusion::execution::object_store::DefaultObjectStoreRegistry; - use super::*; #[test] @@ -247,11 +256,15 @@ mod tests { #[test] fn instrumented_registry() { - let reg = Arc::new(InstrumentedObjectStoreRegistry::new( - Arc::new(DefaultObjectStoreRegistry::new()), - InstrumentedObjectStoreMode::default(), - )); + let mut reg = InstrumentedObjectStoreRegistry::new(); assert!(reg.stores().is_empty()); + assert_eq!( + reg.instrument_mode(), + InstrumentedObjectStoreMode::default() + ); + + reg = reg.with_profile_mode(InstrumentedObjectStoreMode::Enabled); + assert_eq!(reg.instrument_mode(), InstrumentedObjectStoreMode::Enabled); let store = object_store::memory::InMemory::new(); let url = "mem://test".parse().unwrap(); diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 4addf08e2c284..0df0106fb12af 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -188,7 +188,8 @@ impl PrintOptions { if !self.quiet { writeln!(writer, "{formatted_exec_details}")?; - if self.instrumented_registry.mode() != InstrumentedObjectStoreMode::Disabled + if self.instrumented_registry.instrument_mode() + != InstrumentedObjectStoreMode::Disabled { writeln!(writer, "{OBJECT_STORE_PROFILING_HEADER}")?; for store in self.instrumented_registry.stores() { @@ -204,17 +205,12 @@ impl PrintOptions { #[cfg(test)] mod tests { use datafusion::error::Result; - use datafusion::execution::object_store::DefaultObjectStoreRegistry; use super::*; #[test] fn write_output() -> Result<()> { - let profile_mode = InstrumentedObjectStoreMode::default(); - let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new( - Arc::new(DefaultObjectStoreRegistry::new()), - profile_mode, - )); + let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new()); let mut print_options = PrintOptions { format: PrintFormat::Automatic, quiet: true,