From 6a7d07de394e3704da6e313482cf6d831d931984 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 10 Jul 2021 09:26:46 -0600 Subject: [PATCH 1/5] Roughing out configuration for Ballista --- ballista/rust/client/src/context.rs | 31 ++++++++++---- ballista/rust/client/src/prelude.rs | 2 + ballista/rust/core/proto/ballista.proto | 4 +- ballista/rust/core/src/config.rs | 56 +++++++++++++++++++++++++ ballista/rust/core/src/lib.rs | 1 + ballista/rust/core/src/utils.rs | 6 ++- ballista/rust/scheduler/src/lib.rs | 19 +++++++-- benchmarks/src/bin/tpch.rs | 18 +++++--- 8 files changed, 119 insertions(+), 18 deletions(-) create mode 100644 ballista/rust/core/src/config.rs diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index aca712e1d8782..210bb1337cf66 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -23,6 +23,7 @@ use std::sync::{Arc, Mutex}; use std::{collections::HashMap, convert::TryInto}; use std::{fs, time::Duration}; +use ballista_core::config::BallistaConfig; use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient; use ballista_core::serde::protobuf::PartitionLocation; use ballista_core::serde::protobuf::{ @@ -45,6 +46,8 @@ use futures::StreamExt; use log::{error, info}; struct BallistaContextState { + /// Ballista configuration + config: BallistaConfig, /// Scheduler host scheduler_host: String, /// Scheduler port @@ -54,8 +57,13 @@ struct BallistaContextState { } impl BallistaContextState { - pub fn new(scheduler_host: String, scheduler_port: u16) -> Self { + pub fn new( + scheduler_host: String, + scheduler_port: u16, + config: &BallistaConfig, + ) -> Self { Self { + config: config.clone(), scheduler_host, scheduler_port, tables: HashMap::new(), @@ -64,6 +72,7 @@ impl BallistaContextState { #[cfg(feature = "standalone")] pub async fn new_standalone( + config: &BallistaConfig, concurrent_tasks: usize, ) -> ballista_core::error::Result { info!("Running in local mode. Scheduler will be run in-proc"); @@ -87,11 +96,16 @@ impl BallistaContextState { ballista_executor::new_standalone_executor(scheduler, concurrent_tasks).await?; Ok(Self { + config: config.clone(), scheduler_host: "localhost".to_string(), scheduler_port: addr.port(), tables: HashMap::new(), }) } + + pub fn config(&self) -> &BallistaConfig { + &self.config + } } pub struct BallistaContext { @@ -100,8 +114,8 @@ pub struct BallistaContext { impl BallistaContext { /// Create a context for executing queries against a remote Ballista scheduler instance - pub fn remote(host: &str, port: u16) -> Self { - let state = BallistaContextState::new(host.to_owned(), port); + pub fn remote(host: &str, port: u16, config: &BallistaConfig) -> Self { + let state = BallistaContextState::new(host.to_owned(), port, config); Self { state: Arc::new(Mutex::new(state)), @@ -110,9 +124,11 @@ impl BallistaContext { #[cfg(feature = "standalone")] pub async fn standalone( + config: &BallistaConfig, concurrent_tasks: usize, ) -> ballista_core::error::Result { - let state = BallistaContextState::new_standalone(concurrent_tasks).await?; + let state = + BallistaContextState::new_standalone(config, concurrent_tasks).await?; Ok(Self { state: Arc::new(Mutex::new(state)), @@ -127,7 +143,7 @@ impl BallistaContext { let path = fs::canonicalize(&path)?; // use local DataFusion context for now but later this might call the scheduler - let mut ctx = create_datafusion_context(); + let mut ctx = create_datafusion_context(&self.state.lock().unwrap().config()); let df = ctx.read_parquet(path.to_str().unwrap())?; Ok(df) } @@ -144,7 +160,7 @@ impl BallistaContext { let path = fs::canonicalize(&path)?; // use local DataFusion context for now but later this might call the scheduler - let mut ctx = create_datafusion_context(); + let mut ctx = create_datafusion_context(&self.state.lock().unwrap().config()); let df = ctx.read_csv(path.to_str().unwrap(), options)?; Ok(df) } @@ -176,9 +192,9 @@ impl BallistaContext { /// Create a DataFrame from a SQL statement pub fn sql(&self, sql: &str) -> Result> { // use local DataFusion context for now but later this might call the scheduler - let mut ctx = create_datafusion_context(); // register tables let state = self.state.lock().unwrap(); + let mut ctx = create_datafusion_context(&state.config()); for (name, plan) in &state.tables { let plan = ctx.optimize(plan)?; let execution_plan = ctx.create_physical_plan(&plan)?; @@ -238,6 +254,7 @@ impl BallistaContext { .try_into() .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?, )), + settings: vec![], //TODO: serde for &self.state.lock().unwrap().config().clone(), }) .await .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))? diff --git a/ballista/rust/client/src/prelude.rs b/ballista/rust/client/src/prelude.rs index 2f940aef4c976..d162d0c017bd4 100644 --- a/ballista/rust/client/src/prelude.rs +++ b/ballista/rust/client/src/prelude.rs @@ -18,6 +18,8 @@ //! Ballista Prelude (common imports) pub use crate::context::BallistaContext; +pub use ballista_core::config::BallistaConfig; +pub use ballista_core::config::BALLISTA_DEFAULT_SHUFFLE_PARTITIONS; pub use ballista_core::error::{BallistaError, Result}; pub use futures::StreamExt; diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 4696d21852fc2..b1c153de64c24 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -803,7 +803,9 @@ message ExecuteQueryParams { oneof query { LogicalPlanNode logical_plan = 1; string sql = 2; - }} + } + repeated KeyValuePair settings = 3; +} message ExecuteSqlParams { string sql = 1; diff --git a/ballista/rust/core/src/config.rs b/ballista/rust/core/src/config.rs new file mode 100644 index 0000000000000..e95487096f6ca --- /dev/null +++ b/ballista/rust/core/src/config.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +//! Ballista configuration + +use std::collections::HashMap; + +use log::warn; + +pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str = "ballista.shuffle.partitions"; + +/// Ballista configuration +#[derive(Debug, Clone)] +pub struct BallistaConfig { + /// Settings stored in map for easy serde + settings: HashMap, +} + +impl BallistaConfig { + /// Create a new configuration based on key-value pairs + pub fn new(settings: HashMap) -> Self { + Self { settings } + } + + pub fn settings(&self) -> &HashMap { + &self.settings + } + + pub fn default_shuffle_partitions(&self) -> usize { + self.get_usize_setting(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, 2) + } + + fn get_usize_setting(&self, key: &str, default_value: usize) -> usize { + if let Some(v) = self.settings.get(key) { + //TODO error handling + v.parse().unwrap() + } else { + default_value + } + } +} diff --git a/ballista/rust/core/src/lib.rs b/ballista/rust/core/src/lib.rs index 425dbab34c132..2a8486945ad0a 100644 --- a/ballista/rust/core/src/lib.rs +++ b/ballista/rust/core/src/lib.rs @@ -24,6 +24,7 @@ pub fn print_version() { } pub mod client; +pub mod config; pub mod datasource; pub mod error; pub mod execution_plans; diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index f7d884d502985..7e9a55af1a777 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -27,6 +27,7 @@ use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec}; use crate::memory_stream::MemoryStream; use crate::serde::scheduler::PartitionStats; +use crate::config::BallistaConfig; use datafusion::arrow::error::Result as ArrowResult; use datafusion::arrow::{ array::{ @@ -233,8 +234,9 @@ fn build_exec_plan_diagram( } /// Create a DataFusion context that is compatible with Ballista -pub fn create_datafusion_context() -> ExecutionContext { - let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is hack to enable partitioned joins +pub fn create_datafusion_context(config: &BallistaConfig) -> ExecutionContext { + let config = + ExecutionConfig::new().with_concurrency(config.default_shuffle_partitions()); ExecutionContext::with_config(config) } diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index 3bd4c03aa9c33..811935b7587e7 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -79,8 +79,10 @@ use rand::{distributions::Alphanumeric, thread_rng, Rng}; use tonic::{Request, Response}; use self::state::{ConfigBackendClient, SchedulerState}; +use ballista_core::config::BallistaConfig; use ballista_core::utils::create_datafusion_context; use datafusion::physical_plan::parquet::ParquetExec; +use std::collections::HashMap; use std::time::{Instant, SystemTime, UNIX_EPOCH}; #[derive(Clone)] @@ -290,7 +292,18 @@ impl SchedulerGrpc for SchedulerServer { &self, request: Request, ) -> std::result::Result, tonic::Status> { - if let ExecuteQueryParams { query: Some(query) } = request.into_inner() { + if let ExecuteQueryParams { + query: Some(query), + settings, + } = request.into_inner() + { + // parse config + let mut config = HashMap::new(); + for kv_pair in &settings { + config.insert(kv_pair.key.clone(), kv_pair.value.clone()); + } + let config = BallistaConfig::new(config); + let plan = match query { Query::LogicalPlan(logical_plan) => { // parse protobuf @@ -303,7 +316,7 @@ impl SchedulerGrpc for SchedulerServer { Query::Sql(sql) => { //TODO we can't just create a new context because we need a context that has // tables registered from previous SQL statements that have been executed - let mut ctx = create_datafusion_context(); + let mut ctx = create_datafusion_context(&config); let df = ctx.sql(&sql).map_err(|e| { let msg = format!("Error parsing SQL: {}", e); error!("{}", msg); @@ -339,7 +352,7 @@ impl SchedulerGrpc for SchedulerServer { let job_id_spawn = job_id.clone(); tokio::spawn(async move { // create physical plan using DataFusion - let datafusion_ctx = create_datafusion_context(); + let datafusion_ctx = create_datafusion_context(&config); macro_rules! fail_job { ($code :expr) => {{ match $code { diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index a52b6d208cff4..e903fc5572838 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -18,6 +18,7 @@ //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark. use std::{ + collections::HashMap, fs, iter::Iterator, path::{Path, PathBuf}, @@ -28,21 +29,21 @@ use std::{ use futures::StreamExt; use ballista::context::BallistaContext; +use ballista::prelude::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS}; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::arrow::util::pretty; - use datafusion::datasource::parquet::ParquetTable; use datafusion::datasource::{CsvFile, MemTable, TableProvider}; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_plan::LogicalPlan; -use datafusion::physical_plan::{collect, displayable}; -use datafusion::prelude::*; - use datafusion::parquet::basic::Compression; use datafusion::parquet::file::properties::WriterProperties; use datafusion::physical_plan::display::DisplayableExecutionPlan; +use datafusion::physical_plan::{collect, displayable}; +use datafusion::prelude::*; + use structopt::StructOpt; #[cfg(feature = "snmalloc")] @@ -252,7 +253,14 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result Result<()> { println!("Running benchmarks with the following options: {:?}", opt); - let ctx = BallistaContext::remote(opt.host.unwrap().as_str(), opt.port.unwrap()); + let mut config = HashMap::new(); + config.insert( + BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_owned(), + "4".to_owned(), + ); + let config = BallistaConfig::new(config); + let ctx = + BallistaContext::remote(opt.host.unwrap().as_str(), opt.port.unwrap(), &config); // register tables with Ballista context let path = opt.path.to_str().unwrap(); From b09e96e99acead6b478c21a4e546c5358dd18f08 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 10 Jul 2021 09:36:39 -0600 Subject: [PATCH 2/5] serde --- ballista/rust/client/src/context.rs | 25 +++++++++++++++-------- ballista/rust/scheduler/src/test_utils.rs | 3 ++- benchmarks/src/bin/tpch.rs | 6 +++++- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index 210bb1337cf66..b8210cbc26266 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -24,15 +24,14 @@ use std::{collections::HashMap, convert::TryInto}; use std::{fs, time::Duration}; use ballista_core::config::BallistaConfig; -use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient; -use ballista_core::serde::protobuf::PartitionLocation; use ballista_core::serde::protobuf::{ - execute_query_params::Query, job_status, ExecuteQueryParams, GetJobStatusParams, - GetJobStatusResult, + execute_query_params::Query, job_status, scheduler_grpc_client::SchedulerGrpcClient, + ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair, + PartitionLocation, }; -use ballista_core::utils::WrappedStream; use ballista_core::{ client::BallistaClient, datasource::DfTableAdapter, utils::create_datafusion_context, + utils::WrappedStream, }; use datafusion::arrow::datatypes::Schema; @@ -233,10 +232,11 @@ impl BallistaContext { &self, plan: &LogicalPlan, ) -> Result>> { - let scheduler_url = { + let (scheduler_url, config) = { let state = self.state.lock().unwrap(); - - format!("http://{}:{}", state.scheduler_host, state.scheduler_port) + let scheduler_url = + format!("http://{}:{}", state.scheduler_host, state.scheduler_port); + (scheduler_url, state.config.clone()) }; info!("Connecting to Ballista scheduler at {}", scheduler_url); @@ -254,7 +254,14 @@ impl BallistaContext { .try_into() .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?, )), - settings: vec![], //TODO: serde for &self.state.lock().unwrap().config().clone(), + settings: config + .settings() + .iter() + .map(|(k, v)| KeyValuePair { + key: k.to_owned(), + value: v.to_owned(), + }) + .collect::>(), }) .await .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))? diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs index aa1e2b2575aa9..5b7b685d7be9e 100644 --- a/ballista/rust/scheduler/src/test_utils.rs +++ b/ballista/rust/scheduler/src/test_utils.rs @@ -26,7 +26,8 @@ pub const TPCH_TABLES: &[&str] = &[ ]; pub fn datafusion_test_context(path: &str) -> Result { - let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is hack to enable partitioned joins + let default_shuffle_partitions = 2; + let config = ExecutionConfig::new().with_concurrency(default_shuffle_partitions); let mut ctx = ExecutionContext::with_config(config); for table in TPCH_TABLES { let schema = get_tpch_schema(table); diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index e903fc5572838..e63ccb09be976 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -95,6 +95,10 @@ struct BallistaBenchmarkOpt { /// Ballista executor port #[structopt(long = "port")] port: Option, + + /// Number of shuffle partitions + #[structopt(short, long, default_value = "2")] + shuffle_partitions: usize, } #[derive(Debug, StructOpt, Clone)] @@ -256,7 +260,7 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> { let mut config = HashMap::new(); config.insert( BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_owned(), - "4".to_owned(), + format!("{}", opt.shuffle_partitions), ); let config = BallistaConfig::new(config); let ctx = From 9b0df1448fd383cca523ccdcb2e942c758887d66 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 10 Jul 2021 14:12:41 -0600 Subject: [PATCH 3/5] error handling --- ballista/rust/core/src/config.rs | 108 +++++++++++++++++++++++++++-- ballista/rust/scheduler/src/lib.rs | 6 +- benchmarks/src/bin/tpch.rs | 3 +- 3 files changed, 109 insertions(+), 8 deletions(-) diff --git a/ballista/rust/core/src/config.rs b/ballista/rust/core/src/config.rs index e95487096f6ca..1788fccd99472 100644 --- a/ballista/rust/core/src/config.rs +++ b/ballista/rust/core/src/config.rs @@ -20,10 +20,38 @@ use std::collections::HashMap; +use crate::error::{BallistaError, Result}; + +use datafusion::arrow::datatypes::DataType; use log::warn; pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str = "ballista.shuffle.partitions"; +/// Configuration option meta-data +#[derive(Debug, Clone)] +pub struct ConfigEntry { + name: String, + description: String, + data_type: DataType, + default_value: Option, +} + +impl ConfigEntry { + fn new( + name: String, + description: String, + data_type: DataType, + default_value: Option, + ) -> Self { + Self { + name, + description, + data_type, + default_value, + } + } +} + /// Ballista configuration #[derive(Debug, Clone)] pub struct BallistaConfig { @@ -33,8 +61,36 @@ pub struct BallistaConfig { impl BallistaConfig { /// Create a new configuration based on key-value pairs - pub fn new(settings: HashMap) -> Self { - Self { settings } + pub fn new(settings: HashMap) -> Result { + let supported_entries = BallistaConfig::valid_entries(); + for (name, entry) in &supported_entries { + if let Some(v) = settings.get(name) { + // validate that we can parse the user-supplied value + let _ = v.parse::().map_err(|e| BallistaError::General(format!("Failed to parse user-supplied value '{}' for configuration setting '{}': {:?}", name, v, e)))?; + } else if let Some(v) = entry.default_value.clone() { + let _ = v.parse::().map_err(|e| BallistaError::General(format!("Failed to parse default value '{}' for configuration setting '{}': {:?}", name, v, e)))?; + } else { + return Err(BallistaError::General(format!( + "No value specified for mandatory configuration setting '{}'", + name + ))); + } + } + + Ok(Self { settings }) + } + + /// All available configuration options + pub fn valid_entries() -> HashMap { + let entries = vec![ + ConfigEntry::new(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_string(), + "Sets the default number of partitions to create when repartitioning query stages".to_string(), + DataType::UInt16, Some("2".to_string())), + ]; + entries + .iter() + .map(|e| (e.name.clone(), e.clone())) + .collect::>() } pub fn settings(&self) -> &HashMap { @@ -42,15 +98,55 @@ impl BallistaConfig { } pub fn default_shuffle_partitions(&self) -> usize { - self.get_usize_setting(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, 2) + self.get_usize_setting(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS) } - fn get_usize_setting(&self, key: &str, default_value: usize) -> usize { + fn get_usize_setting(&self, key: &str) -> usize { if let Some(v) = self.settings.get(key) { - //TODO error handling + // infallible because we validate all configs in the constructor v.parse().unwrap() } else { - default_value + let entries = Self::valid_entries(); + // infallible because we validate all configs in the constructor + let v = entries.get(key).unwrap().default_value.as_ref().unwrap(); + v.parse().unwrap() } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn default_config() -> Result<()> { + let config = BallistaConfig::new(HashMap::new())?; + assert_eq!(2, config.default_shuffle_partitions()); + Ok(()) + } + + #[test] + fn custom_config() -> Result<()> { + let mut settings = HashMap::new(); + settings.insert( + BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_string(), + "123".to_string(), + ); + let config = BallistaConfig::new(settings)?; + assert_eq!(123, config.default_shuffle_partitions()); + Ok(()) + } + + #[test] + fn custom_config_invalid() -> Result<()> { + let mut settings = HashMap::new(); + settings.insert( + BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_string(), + "true".to_string(), + ); + let config = BallistaConfig::new(settings); + assert!(config.is_err()); + assert_eq!("General(\"Failed to parse user-supplied value 'ballista.shuffle.partitions' for configuration setting 'true': ParseIntError { kind: InvalidDigit }\")", format!("{:?}", config.unwrap_err())); + Ok(()) + } +} diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index 811935b7587e7..aabc32c0df593 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -302,7 +302,11 @@ impl SchedulerGrpc for SchedulerServer { for kv_pair in &settings { config.insert(kv_pair.key.clone(), kv_pair.value.clone()); } - let config = BallistaConfig::new(config); + let config = BallistaConfig::new(config).map_err(|e| { + let msg = format!("Could not parse configs: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; let plan = match query { Query::LogicalPlan(logical_plan) => { diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index e63ccb09be976..66541906ab4d4 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -262,7 +262,8 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> { BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_owned(), format!("{}", opt.shuffle_partitions), ); - let config = BallistaConfig::new(config); + let config = BallistaConfig::new(config) + .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; let ctx = BallistaContext::remote(opt.host.unwrap().as_str(), opt.port.unwrap(), &config); From 2f0a202795444bbc418aed5fcb4897e416ebd56b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 10 Jul 2021 14:48:36 -0600 Subject: [PATCH 4/5] use config builder for better UX --- ballista/rust/core/src/config.rs | 58 ++++++++++++++++++++++++-------- benchmarks/src/bin/tpch.rs | 14 ++++---- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/ballista/rust/core/src/config.rs b/ballista/rust/core/src/config.rs index 1788fccd99472..3cf5fb8bbe446 100644 --- a/ballista/rust/core/src/config.rs +++ b/ballista/rust/core/src/config.rs @@ -52,6 +52,32 @@ impl ConfigEntry { } } +/// Ballista configuration builder +pub struct BallistaConfigBuilder { + settings: HashMap, +} + +impl Default for BallistaConfigBuilder { + /// Create a new config builder + fn default() -> Self { + Self { settings: HashMap::new() } + } +} + +impl BallistaConfigBuilder { + + /// Create a new config with an additional setting + pub fn set(&self, k: &str, v: &str) -> Self { + let mut settings = self.settings.clone(); + settings.insert(k.to_owned(), v.to_owned()); + Self { settings } + } + + pub fn build(&self) -> Result { + BallistaConfig::with_settings(self.settings.clone()) + } +} + /// Ballista configuration #[derive(Debug, Clone)] pub struct BallistaConfig { @@ -60,8 +86,19 @@ pub struct BallistaConfig { } impl BallistaConfig { + + /// Create a default configuration + pub fn new() -> Result { + Self::with_settings(HashMap::new()) + } + + /// Create a configuration builder + pub fn builder() -> BallistaConfigBuilder { + BallistaConfigBuilder::default() + } + /// Create a new configuration based on key-value pairs - pub fn new(settings: HashMap) -> Result { + pub fn with_settings(settings: HashMap) -> Result { let supported_entries = BallistaConfig::valid_entries(); for (name, entry) in &supported_entries { if let Some(v) = settings.get(name) { @@ -120,31 +157,24 @@ mod tests { #[test] fn default_config() -> Result<()> { - let config = BallistaConfig::new(HashMap::new())?; + let config = BallistaConfig::new()?; assert_eq!(2, config.default_shuffle_partitions()); Ok(()) } #[test] fn custom_config() -> Result<()> { - let mut settings = HashMap::new(); - settings.insert( - BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_string(), - "123".to_string(), - ); - let config = BallistaConfig::new(settings)?; + let config = BallistaConfig::builder() + .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "123") + .build()?; assert_eq!(123, config.default_shuffle_partitions()); Ok(()) } #[test] fn custom_config_invalid() -> Result<()> { - let mut settings = HashMap::new(); - settings.insert( - BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_string(), - "true".to_string(), - ); - let config = BallistaConfig::new(settings); + let config = BallistaConfig::builder() + .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "true").build(); assert!(config.is_err()); assert_eq!("General(\"Failed to parse user-supplied value 'ballista.shuffle.partitions' for configuration setting 'true': ParseIntError { kind: InvalidDigit }\")", format!("{:?}", config.unwrap_err())); Ok(()) diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 66541906ab4d4..169319d30beef 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -18,7 +18,6 @@ //! Benchmark derived from TPC-H. This is not an official TPC-H benchmark. use std::{ - collections::HashMap, fs, iter::Iterator, path::{Path, PathBuf}, @@ -257,13 +256,14 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result Result<()> { println!("Running benchmarks with the following options: {:?}", opt); - let mut config = HashMap::new(); - config.insert( - BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_owned(), - format!("{}", opt.shuffle_partitions), - ); - let config = BallistaConfig::new(config) + let config = BallistaConfig::builder() + .set( + BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, + &format!("{}", opt.shuffle_partitions), + ) + .build() .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; + let ctx = BallistaContext::remote(opt.host.unwrap().as_str(), opt.port.unwrap(), &config); From 1e9e0189989b28cfa641eb443c6c84374183d996 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 11 Jul 2021 08:28:47 -0600 Subject: [PATCH 5/5] Fix issues --- ballista/rust/core/src/config.rs | 9 +++++---- ballista/rust/scheduler/src/lib.rs | 7 +++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ballista/rust/core/src/config.rs b/ballista/rust/core/src/config.rs index 3cf5fb8bbe446..dcc0bdb06cded 100644 --- a/ballista/rust/core/src/config.rs +++ b/ballista/rust/core/src/config.rs @@ -60,12 +60,13 @@ pub struct BallistaConfigBuilder { impl Default for BallistaConfigBuilder { /// Create a new config builder fn default() -> Self { - Self { settings: HashMap::new() } + Self { + settings: HashMap::new(), + } } } impl BallistaConfigBuilder { - /// Create a new config with an additional setting pub fn set(&self, k: &str, v: &str) -> Self { let mut settings = self.settings.clone(); @@ -86,7 +87,6 @@ pub struct BallistaConfig { } impl BallistaConfig { - /// Create a default configuration pub fn new() -> Result { Self::with_settings(HashMap::new()) @@ -174,7 +174,8 @@ mod tests { #[test] fn custom_config_invalid() -> Result<()> { let config = BallistaConfig::builder() - .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "true").build(); + .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "true") + .build(); assert!(config.is_err()); assert_eq!("General(\"Failed to parse user-supplied value 'ballista.shuffle.partitions' for configuration setting 'true': ParseIntError { kind: InvalidDigit }\")", format!("{:?}", config.unwrap_err())); Ok(()) diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index aabc32c0df593..905437d4d980f 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -82,7 +82,6 @@ use self::state::{ConfigBackendClient, SchedulerState}; use ballista_core::config::BallistaConfig; use ballista_core::utils::create_datafusion_context; use datafusion::physical_plan::parquet::ParquetExec; -use std::collections::HashMap; use std::time::{Instant, SystemTime, UNIX_EPOCH}; #[derive(Clone)] @@ -298,11 +297,11 @@ impl SchedulerGrpc for SchedulerServer { } = request.into_inner() { // parse config - let mut config = HashMap::new(); + let mut config_builder = BallistaConfig::builder(); for kv_pair in &settings { - config.insert(kv_pair.key.clone(), kv_pair.value.clone()); + config_builder = config_builder.set(&kv_pair.key, &kv_pair.value); } - let config = BallistaConfig::new(config).map_err(|e| { + let config = config_builder.build().map_err(|e| { let msg = format!("Could not parse configs: {}", e); error!("{}", msg); tonic::Status::internal(msg)