diff --git a/datafusion-cli/examples/cli-session-context.rs b/datafusion-cli/examples/cli-session-context.rs index 1a8f15c8731b..bd2dbb736781 100644 --- a/datafusion-cli/examples/cli-session-context.rs +++ b/datafusion-cli/examples/cli-session-context.rs @@ -28,7 +28,9 @@ use datafusion::{ prelude::SessionContext, }; use datafusion_cli::{ - cli_context::CliSessionContext, exec::exec_from_repl, print_options::PrintOptions, + cli_context::CliSessionContext, exec::exec_from_repl, + object_storage::instrumented::InstrumentedObjectStoreRegistry, + print_options::PrintOptions, }; use object_store::ObjectStore; @@ -89,6 +91,7 @@ pub async fn main() { quiet: false, maxrows: datafusion_cli::print_options::MaxRows::Unlimited, color: true, + instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()), }; exec_from_repl(&my_ctx, &mut print_options).await.unwrap(); diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs index 58c19160d12b..48fb37e8a888 100644 --- a/datafusion-cli/src/command.rs +++ b/datafusion-cli/src/command.rs @@ -46,6 +46,7 @@ pub enum Command { SearchFunctions(String), QuietMode(Option), OutputFormat(Option), + ObjectStoreProfileMode(Option), } pub enum OutputFormat { @@ -122,6 +123,29 @@ impl Command { Self::OutputFormat(_) => exec_err!( "Unexpected change output format, this should be handled outside" ), + Self::ObjectStoreProfileMode(mode) => { + if let Some(mode) = mode { + let profile_mode = mode + .parse() + .map_err(|_| + exec_datafusion_err!("Failed to parse input: {mode}. Valid options are disabled, enabled") + )?; + print_options + .instrumented_registry + .set_instrument_mode(profile_mode); + println!( + "ObjectStore Profile mode set to {}", + print_options.instrumented_registry.instrument_mode() + ); + } else { + println!( + "ObjectStore Profile mode is {}", + print_options.instrumented_registry.instrument_mode() + ); + } + + Ok(()) + } } } @@ -140,11 +164,15 @@ impl Command { Self::OutputFormat(_) => { ("\\pset [NAME [VALUE]]", "set table output option\n(format)") } + Self::ObjectStoreProfileMode(_) => ( + "\\object_store_profiling (disabled|enabled)", + "print or set object store profile mode", + ), } } } -const ALL_COMMANDS: [Command; 9] = [ +const ALL_COMMANDS: [Command; 10] = [ Command::ListTables, Command::DescribeTableStmt(String::new()), Command::Quit, @@ -154,6 +182,7 @@ const ALL_COMMANDS: [Command; 9] = [ Command::SearchFunctions(String::new()), Command::QuietMode(None), Command::OutputFormat(None), + Command::ObjectStoreProfileMode(None), ]; fn all_commands_info() -> RecordBatch { @@ -204,6 +233,10 @@ impl FromStr for Command { Self::OutputFormat(Some(subcommand.to_string())) } ("pset", None) => Self::OutputFormat(None), + ("object_store_profiling", Some(mode)) => { + Self::ObjectStoreProfileMode(Some(mode.to_string())) + } + ("object_store_profiling", None) => Self::ObjectStoreProfileMode(None), _ => return Err(()), }) } @@ -244,3 +277,53 @@ impl OutputFormat { } } } + +#[cfg(test)] +mod tests { + use datafusion::prelude::SessionContext; + + use crate::{ + object_storage::instrumented::{ + InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, + }, + print_options::MaxRows, + }; + + use super::*; + + #[tokio::test] + async fn command_execute_profile_mode() { + let ctx = SessionContext::new(); + + let mut print_options = PrintOptions { + format: PrintFormat::Automatic, + quiet: false, + maxrows: MaxRows::Unlimited, + color: true, + instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()), + }; + + let mut cmd: Command = "object_store_profiling" + .parse() + .expect("expected parse to succeed"); + assert!(cmd.execute(&ctx, &mut print_options).await.is_ok()); + assert_eq!( + print_options.instrumented_registry.instrument_mode(), + InstrumentedObjectStoreMode::default() + ); + + cmd = "object_store_profiling enabled" + .parse() + .expect("expected parse to succeed"); + assert!(cmd.execute(&ctx, &mut print_options).await.is_ok()); + assert_eq!( + print_options.instrumented_registry.instrument_mode(), + InstrumentedObjectStoreMode::Enabled + ); + + cmd = "object_store_profiling does_not_exist" + .parse() + .expect("expected parse to succeed"); + assert!(cmd.execute(&ctx, &mut print_options).await.is_err()); + } +} diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 04a8c2a0f1d8..3dbe839d3c9b 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -27,7 +27,6 @@ use datafusion::execution::context::SessionConfig; use datafusion::execution::memory_pool::{ FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool, }; -use datafusion::execution::object_store::DefaultObjectStoreRegistry; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::logical_expr::ExplainFormat; use datafusion::prelude::SessionContext; @@ -149,6 +148,13 @@ struct Args { value_parser(extract_disk_limit) )] disk_limit: Option, + + #[clap( + long, + help = "Specify the default object_store_profiling mode, defaults to 'disabled'.\n[possible values: disabled, enabled]", + default_value_t = InstrumentedObjectStoreMode::Disabled + )] + object_store_profiling: InstrumentedObjectStoreMode, } #[tokio::main] @@ -210,10 +216,10 @@ async fn main_inner() -> Result<()> { rt_builder = rt_builder.with_disk_manager_builder(builder); } - let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new( - Arc::new(DefaultObjectStoreRegistry::new()), - InstrumentedObjectStoreMode::default(), - )); + let instrumented_registry = Arc::new( + InstrumentedObjectStoreRegistry::new() + .with_profile_mode(args.object_store_profiling), + ); rt_builder = rt_builder.with_object_store_registry(instrumented_registry.clone()); let runtime_env = rt_builder.build_arc()?; @@ -243,6 +249,7 @@ async fn main_inner() -> Result<()> { quiet: args.quiet, maxrows: args.maxrows, color: args.color, + instrumented_registry: Arc::clone(&instrumented_registry), }; let commands = args.command; diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs index f0313da3a379..49f174799cde 100644 --- a/datafusion-cli/src/object_storage/instrumented.rs +++ b/datafusion-cli/src/object_storage/instrumented.rs @@ -25,17 +25,20 @@ use std::{ }; use async_trait::async_trait; -use datafusion::{error::DataFusionError, execution::object_store::ObjectStoreRegistry}; +use datafusion::{ + error::DataFusionError, + execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}, +}; use futures::stream::BoxStream; use object_store::{ path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, }; +use parking_lot::RwLock; use url::Url; -/// The profiling mode to use for an [`ObjectStore`] instance that has been instrumented to collect -/// profiling data. Collecting profiling data will have a small negative impact on both CPU and -/// memory usage. Default is `Disabled` +/// The profiling mode to use for an [`InstrumentedObjectStore`] instance. Collecting profiling +/// data will have a small negative impact on both CPU and memory usage. Default is `Disabled` #[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] pub enum InstrumentedObjectStoreMode { /// Disable collection of profiling data @@ -75,7 +78,7 @@ impl From for InstrumentedObjectStoreMode { /// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the /// inner [`ObjectStore`] #[derive(Debug)] -struct InstrumentedObjectStore { +pub struct InstrumentedObjectStore { inner: Arc, instrument_mode: AtomicU8, } @@ -88,6 +91,10 @@ impl InstrumentedObjectStore { instrument_mode, } } + + fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) { + self.instrument_mode.store(mode as u8, Ordering::Relaxed) + } } impl fmt::Display for InstrumentedObjectStore { @@ -150,23 +157,53 @@ impl ObjectStore for InstrumentedObjectStore { } } -/// Provides access to [`ObjectStore`] instances that record requests for reporting +/// Provides access to [`InstrumentedObjectStore`] instances that record requests for reporting #[derive(Debug)] pub struct InstrumentedObjectStoreRegistry { inner: Arc, - instrument_mode: InstrumentedObjectStoreMode, + instrument_mode: AtomicU8, + stores: RwLock>>, +} + +impl Default for InstrumentedObjectStoreRegistry { + fn default() -> Self { + Self::new() + } } impl InstrumentedObjectStoreRegistry { /// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided /// [`ObjectStoreRegistry`] - pub fn new( - registry: Arc, - default_mode: InstrumentedObjectStoreMode, - ) -> Self { + pub fn new() -> Self { Self { - inner: registry, - instrument_mode: default_mode, + inner: Arc::new(DefaultObjectStoreRegistry::new()), + instrument_mode: AtomicU8::new(InstrumentedObjectStoreMode::default() as u8), + stores: RwLock::new(Vec::new()), + } + } + + pub fn with_profile_mode(self, mode: InstrumentedObjectStoreMode) -> Self { + self.instrument_mode.store(mode as u8, Ordering::Relaxed); + self + } + + /// Provides access to all of the [`InstrumentedObjectStore`]s managed by this + /// [`InstrumentedObjectStoreRegistry`] + pub fn stores(&self) -> Vec> { + self.stores.read().clone() + } + + /// Returns the current [`InstrumentedObjectStoreMode`] for this + /// [`InstrumentedObjectStoreRegistry`] + pub fn instrument_mode(&self) -> InstrumentedObjectStoreMode { + self.instrument_mode.load(Ordering::Relaxed).into() + } + + /// Sets the [`InstrumentedObjectStoreMode`] for this [`InstrumentedObjectStoreRegistry`] + pub fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) { + self.instrument_mode.store(mode as u8, Ordering::Relaxed); + for s in self.stores.read().iter() { + s.set_instrument_mode(mode) } } } @@ -177,8 +214,10 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry { url: &Url, store: Arc, ) -> Option> { - let mode = AtomicU8::new(self.instrument_mode as u8); - let instrumented = Arc::new(InstrumentedObjectStore::new(store, mode)); + let mode = self.instrument_mode.load(Ordering::Relaxed); + let instrumented = + Arc::new(InstrumentedObjectStore::new(store, AtomicU8::new(mode))); + self.stores.write().push(Arc::clone(&instrumented)); self.inner.register_store(url, instrumented) } @@ -189,8 +228,6 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry { #[cfg(test)] mod tests { - use datafusion::execution::object_store::DefaultObjectStoreRegistry; - use super::*; #[test] @@ -219,17 +256,23 @@ mod tests { #[test] fn instrumented_registry() { - let reg = Arc::new(InstrumentedObjectStoreRegistry::new( - Arc::new(DefaultObjectStoreRegistry::new()), - InstrumentedObjectStoreMode::default(), - )); - let store = object_store::memory::InMemory::new(); + let mut reg = InstrumentedObjectStoreRegistry::new(); + assert!(reg.stores().is_empty()); + assert_eq!( + reg.instrument_mode(), + InstrumentedObjectStoreMode::default() + ); + reg = reg.with_profile_mode(InstrumentedObjectStoreMode::Enabled); + assert_eq!(reg.instrument_mode(), InstrumentedObjectStoreMode::Enabled); + + let store = object_store::memory::InMemory::new(); let url = "mem://test".parse().unwrap(); let registered = reg.register_store(&url, Arc::new(store)); assert!(registered.is_none()); let fetched = reg.get_store(&url); - assert!(fetched.is_ok()) + assert!(fetched.is_ok()); + assert_eq!(reg.stores().len(), 1); } } diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index 56d787b0fe08..0df0106fb12a 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -16,10 +16,14 @@ // under the License. use std::fmt::{Display, Formatter}; -use std::io::Write; +use std::io; use std::pin::Pin; use std::str::FromStr; +use std::sync::Arc; +use crate::object_storage::instrumented::{ + InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, +}; use crate::print_format::PrintFormat; use arrow::datatypes::SchemaRef; @@ -67,12 +71,15 @@ impl Display for MaxRows { } } +const OBJECT_STORE_PROFILING_HEADER: &str = "Object Store Profiling"; + #[derive(Debug, Clone)] pub struct PrintOptions { pub format: PrintFormat, pub quiet: bool, pub maxrows: MaxRows, pub color: bool, + pub instrumented_registry: Arc, } // Returns the query execution details formatted @@ -128,11 +135,7 @@ impl PrintOptions { query_start_time, ); - if !self.quiet { - writeln!(writer, "{formatted_exec_details}")?; - } - - Ok(()) + self.write_output(&mut writer, formatted_exec_details) } /// Print the stream to stdout using the specified format @@ -174,10 +177,74 @@ impl PrintOptions { query_start_time, ); + self.write_output(&mut writer, formatted_exec_details) + } + + fn write_output( + &self, + writer: &mut W, + formatted_exec_details: String, + ) -> Result<()> { if !self.quiet { writeln!(writer, "{formatted_exec_details}")?; + + if self.instrumented_registry.instrument_mode() + != InstrumentedObjectStoreMode::Disabled + { + writeln!(writer, "{OBJECT_STORE_PROFILING_HEADER}")?; + for store in self.instrumented_registry.stores() { + writeln!(writer, "{store}")?; + } + } } Ok(()) } } + +#[cfg(test)] +mod tests { + use datafusion::error::Result; + + use super::*; + + #[test] + fn write_output() -> Result<()> { + let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new()); + let mut print_options = PrintOptions { + format: PrintFormat::Automatic, + quiet: true, + maxrows: MaxRows::Unlimited, + color: true, + instrumented_registry: Arc::clone(&instrumented_registry), + }; + + let mut print_output: Vec = Vec::new(); + let exec_out = String::from("Formatted Exec Output"); + print_options.write_output(&mut print_output, exec_out.clone())?; + assert!(print_output.is_empty()); + + print_options.quiet = false; + print_options.write_output(&mut print_output, exec_out.clone())?; + let out_str: String = print_output + .clone() + .try_into() + .expect("Expected successful String conversion"); + assert!(out_str.contains(&exec_out)); + + // clear the previous data from the output so it doesn't pollute the next test + print_output.clear(); + print_options + .instrumented_registry + .set_instrument_mode(InstrumentedObjectStoreMode::Enabled); + print_options.write_output(&mut print_output, exec_out.clone())?; + let out_str: String = print_output + .clone() + .try_into() + .expect("Expected successful String conversion"); + assert!(out_str.contains(&exec_out)); + assert!(out_str.contains(OBJECT_STORE_PROFILING_HEADER)); + + Ok(()) + } +} diff --git a/docs/source/user-guide/cli/usage.md b/docs/source/user-guide/cli/usage.md index 263728f5b04d..57a96c5d7900 100644 --- a/docs/source/user-guide/cli/usage.md +++ b/docs/source/user-guide/cli/usage.md @@ -63,6 +63,10 @@ OPTIONS: -d, --disk-limit Available disk space for spilling queries (e.g. '10g'), default to None (uses DataFusion's default value of '100g') + --object-store-profiling + Specify the default object_store_profiling mode, defaults to 'disabled'. + [possible values: disabled, enabled] [default: Disabled] + -p, --data-path Path to your data, default to current directory @@ -122,6 +126,12 @@ Available commands inside DataFusion CLI are: > \h function ``` +- Object Store Profiling Mode + +```bash +> \object_store_profiling [disabled|enabled] +``` + ## Supported SQL In addition to the normal [SQL supported in DataFusion], `datafusion-cli` also