diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index aca712e1d8782..b8210cbc26266 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -23,15 +23,15 @@ use std::sync::{Arc, Mutex}; use std::{collections::HashMap, convert::TryInto}; use std::{fs, time::Duration}; -use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient; -use ballista_core::serde::protobuf::PartitionLocation; +use ballista_core::config::BallistaConfig; use ballista_core::serde::protobuf::{ - execute_query_params::Query, job_status, ExecuteQueryParams, GetJobStatusParams, - GetJobStatusResult, + execute_query_params::Query, job_status, scheduler_grpc_client::SchedulerGrpcClient, + ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair, + PartitionLocation, }; -use ballista_core::utils::WrappedStream; use ballista_core::{ client::BallistaClient, datasource::DfTableAdapter, utils::create_datafusion_context, + utils::WrappedStream, }; use datafusion::arrow::datatypes::Schema; @@ -45,6 +45,8 @@ use futures::StreamExt; use log::{error, info}; struct BallistaContextState { + /// Ballista configuration + config: BallistaConfig, /// Scheduler host scheduler_host: String, /// Scheduler port @@ -54,8 +56,13 @@ struct BallistaContextState { } impl BallistaContextState { - pub fn new(scheduler_host: String, scheduler_port: u16) -> Self { + pub fn new( + scheduler_host: String, + scheduler_port: u16, + config: &BallistaConfig, + ) -> Self { Self { + config: config.clone(), scheduler_host, scheduler_port, tables: HashMap::new(), @@ -64,6 +71,7 @@ impl BallistaContextState { #[cfg(feature = "standalone")] pub async fn new_standalone( + config: &BallistaConfig, concurrent_tasks: usize, ) -> ballista_core::error::Result { info!("Running in local mode. 
Scheduler will be run in-proc"); @@ -87,11 +95,16 @@ impl BallistaContextState { ballista_executor::new_standalone_executor(scheduler, concurrent_tasks).await?; Ok(Self { + config: config.clone(), scheduler_host: "localhost".to_string(), scheduler_port: addr.port(), tables: HashMap::new(), }) } + + pub fn config(&self) -> &BallistaConfig { + &self.config + } } pub struct BallistaContext { @@ -100,8 +113,8 @@ pub struct BallistaContext { impl BallistaContext { /// Create a context for executing queries against a remote Ballista scheduler instance - pub fn remote(host: &str, port: u16) -> Self { - let state = BallistaContextState::new(host.to_owned(), port); + pub fn remote(host: &str, port: u16, config: &BallistaConfig) -> Self { + let state = BallistaContextState::new(host.to_owned(), port, config); Self { state: Arc::new(Mutex::new(state)), @@ -110,9 +123,11 @@ impl BallistaContext { #[cfg(feature = "standalone")] pub async fn standalone( + config: &BallistaConfig, concurrent_tasks: usize, ) -> ballista_core::error::Result { - let state = BallistaContextState::new_standalone(concurrent_tasks).await?; + let state = + BallistaContextState::new_standalone(config, concurrent_tasks).await?; Ok(Self { state: Arc::new(Mutex::new(state)), @@ -127,7 +142,7 @@ impl BallistaContext { let path = fs::canonicalize(&path)?; // use local DataFusion context for now but later this might call the scheduler - let mut ctx = create_datafusion_context(); + let mut ctx = create_datafusion_context(&self.state.lock().unwrap().config()); let df = ctx.read_parquet(path.to_str().unwrap())?; Ok(df) } @@ -144,7 +159,7 @@ impl BallistaContext { let path = fs::canonicalize(&path)?; // use local DataFusion context for now but later this might call the scheduler - let mut ctx = create_datafusion_context(); + let mut ctx = create_datafusion_context(&self.state.lock().unwrap().config()); let df = ctx.read_csv(path.to_str().unwrap(), options)?; Ok(df) } @@ -176,9 +191,9 @@ impl BallistaContext { /// 
Create a DataFrame from a SQL statement pub fn sql(&self, sql: &str) -> Result> { // use local DataFusion context for now but later this might call the scheduler - let mut ctx = create_datafusion_context(); // register tables let state = self.state.lock().unwrap(); + let mut ctx = create_datafusion_context(&state.config()); for (name, plan) in &state.tables { let plan = ctx.optimize(plan)?; let execution_plan = ctx.create_physical_plan(&plan)?; @@ -217,10 +232,11 @@ impl BallistaContext { &self, plan: &LogicalPlan, ) -> Result>> { - let scheduler_url = { + let (scheduler_url, config) = { let state = self.state.lock().unwrap(); - - format!("http://{}:{}", state.scheduler_host, state.scheduler_port) + let scheduler_url = + format!("http://{}:{}", state.scheduler_host, state.scheduler_port); + (scheduler_url, state.config.clone()) }; info!("Connecting to Ballista scheduler at {}", scheduler_url); @@ -238,6 +254,14 @@ impl BallistaContext { .try_into() .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?, )), + settings: config + .settings() + .iter() + .map(|(k, v)| KeyValuePair { + key: k.to_owned(), + value: v.to_owned(), + }) + .collect::>(), }) .await .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))? diff --git a/ballista/rust/client/src/prelude.rs b/ballista/rust/client/src/prelude.rs index 2f940aef4c976..d162d0c017bd4 100644 --- a/ballista/rust/client/src/prelude.rs +++ b/ballista/rust/client/src/prelude.rs @@ -18,6 +18,8 @@ //! 
Ballista Prelude (common imports) pub use crate::context::BallistaContext; +pub use ballista_core::config::BallistaConfig; +pub use ballista_core::config::BALLISTA_DEFAULT_SHUFFLE_PARTITIONS; pub use ballista_core::error::{BallistaError, Result}; pub use futures::StreamExt; diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 4696d21852fc2..b1c153de64c24 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -803,7 +803,9 @@ message ExecuteQueryParams { oneof query { LogicalPlanNode logical_plan = 1; string sql = 2; - }} + } + repeated KeyValuePair settings = 3; +} message ExecuteSqlParams { string sql = 1; diff --git a/ballista/rust/core/src/config.rs b/ballista/rust/core/src/config.rs new file mode 100644 index 0000000000000..dcc0bdb06cded --- /dev/null +++ b/ballista/rust/core/src/config.rs @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +//! 
Ballista configuration + +use std::collections::HashMap; + +use crate::error::{BallistaError, Result}; + +use datafusion::arrow::datatypes::DataType; +use log::warn; + +pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str = "ballista.shuffle.partitions"; + +/// Configuration option meta-data +#[derive(Debug, Clone)] +pub struct ConfigEntry { + name: String, + description: String, + data_type: DataType, + default_value: Option, +} + +impl ConfigEntry { + fn new( + name: String, + description: String, + data_type: DataType, + default_value: Option, + ) -> Self { + Self { + name, + description, + data_type, + default_value, + } + } +} + +/// Ballista configuration builder +pub struct BallistaConfigBuilder { + settings: HashMap, +} + +impl Default for BallistaConfigBuilder { + /// Create a new config builder + fn default() -> Self { + Self { + settings: HashMap::new(), + } + } +} + +impl BallistaConfigBuilder { + /// Create a new config with an additional setting + pub fn set(&self, k: &str, v: &str) -> Self { + let mut settings = self.settings.clone(); + settings.insert(k.to_owned(), v.to_owned()); + Self { settings } + } + + pub fn build(&self) -> Result { + BallistaConfig::with_settings(self.settings.clone()) + } +} + +/// Ballista configuration +#[derive(Debug, Clone)] +pub struct BallistaConfig { + /// Settings stored in map for easy serde + settings: HashMap, +} + +impl BallistaConfig { + /// Create a default configuration + pub fn new() -> Result { + Self::with_settings(HashMap::new()) + } + + /// Create a configuration builder + pub fn builder() -> BallistaConfigBuilder { + BallistaConfigBuilder::default() + } + + /// Create a new configuration based on key-value pairs + pub fn with_settings(settings: HashMap) -> Result { + let supported_entries = BallistaConfig::valid_entries(); + for (name, entry) in &supported_entries { + if let Some(v) = settings.get(name) { + // validate that we can parse the user-supplied value + let _ = v.parse::().map_err(|e| 
BallistaError::General(format!("Failed to parse user-supplied value '{}' for configuration setting '{}': {:?}", v, name, e)))?; + } else if let Some(v) = entry.default_value.clone() { + let _ = v.parse::().map_err(|e| BallistaError::General(format!("Failed to parse default value '{}' for configuration setting '{}': {:?}", v, name, e)))?; + } else { + return Err(BallistaError::General(format!( + "No value specified for mandatory configuration setting '{}'", + name + ))); + } + } + + Ok(Self { settings }) + } + + /// All available configuration options + pub fn valid_entries() -> HashMap { + let entries = vec![ + ConfigEntry::new(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_string(), + "Sets the default number of partitions to create when repartitioning query stages".to_string(), + DataType::UInt16, Some("2".to_string())), + ]; + entries + .iter() + .map(|e| (e.name.clone(), e.clone())) + .collect::>() + } + + pub fn settings(&self) -> &HashMap { + &self.settings + } + + pub fn default_shuffle_partitions(&self) -> usize { + self.get_usize_setting(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS) + } + + fn get_usize_setting(&self, key: &str) -> usize { + if let Some(v) = self.settings.get(key) { + // infallible because we validate all configs in the constructor + v.parse().unwrap() + } else { + let entries = Self::valid_entries(); + // infallible because we validate all configs in the constructor + let v = entries.get(key).unwrap().default_value.as_ref().unwrap(); + v.parse().unwrap() + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn default_config() -> Result<()> { + let config = BallistaConfig::new()?; + assert_eq!(2, config.default_shuffle_partitions()); + Ok(()) + } + + #[test] + fn custom_config() -> Result<()> { + let config = BallistaConfig::builder() + .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "123") + .build()?; + assert_eq!(123, config.default_shuffle_partitions()); + Ok(()) + } + + #[test] + fn custom_config_invalid() -> Result<()> { + let config = 
BallistaConfig::builder() + .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "true") + .build(); + assert!(config.is_err()); + assert_eq!("General(\"Failed to parse user-supplied value 'true' for configuration setting 'ballista.shuffle.partitions': ParseIntError { kind: InvalidDigit }\")", format!("{:?}", config.unwrap_err())); + Ok(()) + } +} diff --git a/ballista/rust/core/src/lib.rs b/ballista/rust/core/src/lib.rs index 425dbab34c132..2a8486945ad0a 100644 --- a/ballista/rust/core/src/lib.rs +++ b/ballista/rust/core/src/lib.rs @@ -24,6 +24,7 @@ pub fn print_version() { } pub mod client; +pub mod config; pub mod datasource; pub mod error; pub mod execution_plans; diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index f7d884d502985..7e9a55af1a777 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -27,6 +27,7 @@ use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec}; use crate::memory_stream::MemoryStream; use crate::serde::scheduler::PartitionStats; +use crate::config::BallistaConfig; use datafusion::arrow::error::Result as ArrowResult; use datafusion::arrow::{ array::{ @@ -233,8 +234,9 @@ fn build_exec_plan_diagram( } /// Create a DataFusion context that is compatible with Ballista -pub fn create_datafusion_context() -> ExecutionContext { - let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is hack to enable partitioned joins +pub fn create_datafusion_context(config: &BallistaConfig) -> ExecutionContext { + let config = + ExecutionConfig::new().with_concurrency(config.default_shuffle_partitions()); ExecutionContext::with_config(config) } diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index 3bd4c03aa9c33..905437d4d980f 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -79,6 +79,7 @@ use rand::{distributions::Alphanumeric, thread_rng, Rng}; use tonic::{Request, Response}; use 
self::state::{ConfigBackendClient, SchedulerState}; +use ballista_core::config::BallistaConfig; use ballista_core::utils::create_datafusion_context; use datafusion::physical_plan::parquet::ParquetExec; use std::time::{Instant, SystemTime, UNIX_EPOCH}; @@ -290,7 +291,22 @@ impl SchedulerGrpc for SchedulerServer { &self, request: Request, ) -> std::result::Result, tonic::Status> { - if let ExecuteQueryParams { query: Some(query) } = request.into_inner() { + if let ExecuteQueryParams { + query: Some(query), + settings, + } = request.into_inner() + { + // parse config + let mut config_builder = BallistaConfig::builder(); + for kv_pair in &settings { + config_builder = config_builder.set(&kv_pair.key, &kv_pair.value); + } + let config = config_builder.build().map_err(|e| { + let msg = format!("Could not parse configs: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; + let plan = match query { Query::LogicalPlan(logical_plan) => { // parse protobuf @@ -303,7 +319,7 @@ impl SchedulerGrpc for SchedulerServer { Query::Sql(sql) => { //TODO we can't just create a new context because we need a context that has // tables registered from previous SQL statements that have been executed - let mut ctx = create_datafusion_context(); + let mut ctx = create_datafusion_context(&config); let df = ctx.sql(&sql).map_err(|e| { let msg = format!("Error parsing SQL: {}", e); error!("{}", msg); @@ -339,7 +355,7 @@ impl SchedulerGrpc for SchedulerServer { let job_id_spawn = job_id.clone(); tokio::spawn(async move { // create physical plan using DataFusion - let datafusion_ctx = create_datafusion_context(); + let datafusion_ctx = create_datafusion_context(&config); macro_rules! 
fail_job { ($code :expr) => {{ match $code { diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs index aa1e2b2575aa9..5b7b685d7be9e 100644 --- a/ballista/rust/scheduler/src/test_utils.rs +++ b/ballista/rust/scheduler/src/test_utils.rs @@ -26,7 +26,8 @@ pub const TPCH_TABLES: &[&str] = &[ ]; pub fn datafusion_test_context(path: &str) -> Result { - let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is hack to enable partitioned joins + let default_shuffle_partitions = 2; + let config = ExecutionConfig::new().with_concurrency(default_shuffle_partitions); let mut ctx = ExecutionContext::with_config(config); for table in TPCH_TABLES { let schema = get_tpch_schema(table); diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index a52b6d208cff4..169319d30beef 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -28,21 +28,21 @@ use std::{ use futures::StreamExt; use ballista::context::BallistaContext; +use ballista::prelude::{BallistaConfig, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS}; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::arrow::util::pretty; - use datafusion::datasource::parquet::ParquetTable; use datafusion::datasource::{CsvFile, MemTable, TableProvider}; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_plan::LogicalPlan; -use datafusion::physical_plan::{collect, displayable}; -use datafusion::prelude::*; - use datafusion::parquet::basic::Compression; use datafusion::parquet::file::properties::WriterProperties; use datafusion::physical_plan::display::DisplayableExecutionPlan; +use datafusion::physical_plan::{collect, displayable}; +use datafusion::prelude::*; + use structopt::StructOpt; #[cfg(feature = "snmalloc")] @@ -94,6 +94,10 @@ struct BallistaBenchmarkOpt { /// Ballista executor port #[structopt(long = "port")] port: Option, + + /// Number of shuffle 
partitions + #[structopt(short, long, default_value = "2")] + shuffle_partitions: usize, } #[derive(Debug, StructOpt, Clone)] @@ -252,7 +256,16 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result Result<()> { println!("Running benchmarks with the following options: {:?}", opt); - let ctx = BallistaContext::remote(opt.host.unwrap().as_str(), opt.port.unwrap()); + let config = BallistaConfig::builder() + .set( + BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, + &format!("{}", opt.shuffle_partitions), + ) + .build() + .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; + + let ctx = + BallistaContext::remote(opt.host.unwrap().as_str(), opt.port.unwrap(), &config); // register tables with Ballista context let path = opt.path.to_str().unwrap();