Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 39 additions & 15 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ use std::sync::{Arc, Mutex};
use std::{collections::HashMap, convert::TryInto};
use std::{fs, time::Duration};

use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
use ballista_core::serde::protobuf::PartitionLocation;
use ballista_core::config::BallistaConfig;
use ballista_core::serde::protobuf::{
execute_query_params::Query, job_status, ExecuteQueryParams, GetJobStatusParams,
GetJobStatusResult,
execute_query_params::Query, job_status, scheduler_grpc_client::SchedulerGrpcClient,
ExecuteQueryParams, GetJobStatusParams, GetJobStatusResult, KeyValuePair,
PartitionLocation,
};
use ballista_core::utils::WrappedStream;
use ballista_core::{
client::BallistaClient, datasource::DfTableAdapter, utils::create_datafusion_context,
utils::WrappedStream,
};

use datafusion::arrow::datatypes::Schema;
Expand All @@ -45,6 +45,8 @@ use futures::StreamExt;
use log::{error, info};

struct BallistaContextState {
/// Ballista configuration
config: BallistaConfig,
/// Scheduler host
scheduler_host: String,
/// Scheduler port
Expand All @@ -54,8 +56,13 @@ struct BallistaContextState {
}

impl BallistaContextState {
pub fn new(scheduler_host: String, scheduler_port: u16) -> Self {
pub fn new(
scheduler_host: String,
scheduler_port: u16,
config: &BallistaConfig,
) -> Self {
Self {
config: config.clone(),
scheduler_host,
scheduler_port,
tables: HashMap::new(),
Expand All @@ -64,6 +71,7 @@ impl BallistaContextState {

#[cfg(feature = "standalone")]
pub async fn new_standalone(
config: &BallistaConfig,
concurrent_tasks: usize,
) -> ballista_core::error::Result<Self> {
info!("Running in local mode. Scheduler will be run in-proc");
Expand All @@ -87,11 +95,16 @@ impl BallistaContextState {

ballista_executor::new_standalone_executor(scheduler, concurrent_tasks).await?;
Ok(Self {
config: config.clone(),
scheduler_host: "localhost".to_string(),
scheduler_port: addr.port(),
tables: HashMap::new(),
})
}

pub fn config(&self) -> &BallistaConfig {
&self.config
}
}

pub struct BallistaContext {
Expand All @@ -100,8 +113,8 @@ pub struct BallistaContext {

impl BallistaContext {
/// Create a context for executing queries against a remote Ballista scheduler instance
pub fn remote(host: &str, port: u16) -> Self {
let state = BallistaContextState::new(host.to_owned(), port);
pub fn remote(host: &str, port: u16, config: &BallistaConfig) -> Self {
let state = BallistaContextState::new(host.to_owned(), port, config);

Self {
state: Arc::new(Mutex::new(state)),
Expand All @@ -110,9 +123,11 @@ impl BallistaContext {

#[cfg(feature = "standalone")]
pub async fn standalone(
config: &BallistaConfig,
concurrent_tasks: usize,
) -> ballista_core::error::Result<Self> {
let state = BallistaContextState::new_standalone(concurrent_tasks).await?;
let state =
BallistaContextState::new_standalone(config, concurrent_tasks).await?;

Ok(Self {
state: Arc::new(Mutex::new(state)),
Expand All @@ -127,7 +142,7 @@ impl BallistaContext {
let path = fs::canonicalize(&path)?;

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = create_datafusion_context();
let mut ctx = create_datafusion_context(&self.state.lock().unwrap().config());
let df = ctx.read_parquet(path.to_str().unwrap())?;
Ok(df)
}
Expand All @@ -144,7 +159,7 @@ impl BallistaContext {
let path = fs::canonicalize(&path)?;

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = create_datafusion_context();
let mut ctx = create_datafusion_context(&self.state.lock().unwrap().config());
let df = ctx.read_csv(path.to_str().unwrap(), options)?;
Ok(df)
}
Expand Down Expand Up @@ -176,9 +191,9 @@ impl BallistaContext {
/// Create a DataFrame from a SQL statement
pub fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
// use local DataFusion context for now but later this might call the scheduler
let mut ctx = create_datafusion_context();
// register tables
let state = self.state.lock().unwrap();
let mut ctx = create_datafusion_context(&state.config());
for (name, plan) in &state.tables {
let plan = ctx.optimize(plan)?;
let execution_plan = ctx.create_physical_plan(&plan)?;
Expand Down Expand Up @@ -217,10 +232,11 @@ impl BallistaContext {
&self,
plan: &LogicalPlan,
) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
let scheduler_url = {
let (scheduler_url, config) = {
let state = self.state.lock().unwrap();

format!("http://{}:{}", state.scheduler_host, state.scheduler_port)
let scheduler_url =
format!("http://{}:{}", state.scheduler_host, state.scheduler_port);
(scheduler_url, state.config.clone())
};

info!("Connecting to Ballista scheduler at {}", scheduler_url);
Expand All @@ -238,6 +254,14 @@ impl BallistaContext {
.try_into()
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?,
)),
settings: config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
})
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
Expand Down
2 changes: 2 additions & 0 deletions ballista/rust/client/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
//! Ballista Prelude (common imports)

pub use crate::context::BallistaContext;
pub use ballista_core::config::BallistaConfig;
pub use ballista_core::config::BALLISTA_DEFAULT_SHUFFLE_PARTITIONS;
pub use ballista_core::error::{BallistaError, Result};

pub use futures::StreamExt;
4 changes: 3 additions & 1 deletion ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,9 @@ message ExecuteQueryParams {
oneof query {
LogicalPlanNode logical_plan = 1;
string sql = 2;
}}
}
repeated KeyValuePair settings = 3;
}

message ExecuteSqlParams {
string sql = 1;
Expand Down
183 changes: 183 additions & 0 deletions ballista/rust/core/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

//! Ballista configuration

use std::collections::HashMap;

use crate::error::{BallistaError, Result};

use datafusion::arrow::datatypes::DataType;
use log::warn;

pub const BALLISTA_DEFAULT_SHUFFLE_PARTITIONS: &str = "ballista.shuffle.partitions";

/// Configuration option meta-data
#[derive(Debug, Clone)]
pub struct ConfigEntry {
    /// Unique key identifying the setting, e.g. "ballista.shuffle.partitions"
    name: String,
    /// Human-readable explanation of what the setting controls
    description: String,
    /// Declared type of the value
    /// NOTE(review): not consulted during validation in this file — values are
    /// stored and parsed as strings; confirm intended use before relying on it
    data_type: DataType,
    /// Value applied when the user supplies none; `None` makes the setting mandatory
    default_value: Option<String>,
}

impl ConfigEntry {
    /// Build the meta-data record describing one supported configuration option.
    fn new(
        name: String,
        description: String,
        data_type: DataType,
        default_value: Option<String>,
    ) -> Self {
        Self { name, description, data_type, default_value }
    }
}

/// Ballista configuration builder
pub struct BallistaConfigBuilder {
    /// Settings accumulated so far, keyed by option name
    settings: HashMap<String, String>,
}

impl Default for BallistaConfigBuilder {
    /// Create a new config builder with no settings applied
    fn default() -> Self {
        Self {
            settings: HashMap::default(),
        }
    }
}

impl BallistaConfigBuilder {
    /// Return a new builder identical to this one except that `k` is set to `v`.
    ///
    /// The builder is treated as immutable: each call clones the current
    /// settings map, so an intermediate builder can be reused to branch
    /// several configurations from a common base.
    pub fn set(&self, k: &str, v: &str) -> Self {
        let mut next = self.settings.clone();
        next.insert(k.to_owned(), v.to_owned());
        Self { settings: next }
    }

    /// Validate the accumulated settings and produce a `BallistaConfig`.
    pub fn build(&self) -> Result<BallistaConfig> {
        BallistaConfig::with_settings(self.settings.clone())
    }
}

/// Ballista configuration
#[derive(Debug, Clone)]
pub struct BallistaConfig {
    /// Settings stored in map for easy serde
    settings: HashMap<String, String>,
}

impl BallistaConfig {
    /// Create a default configuration
    pub fn new() -> Result<Self> {
        Self::with_settings(HashMap::new())
    }

    /// Create a configuration builder
    pub fn builder() -> BallistaConfigBuilder {
        BallistaConfigBuilder::default()
    }

    /// Create a new configuration based on key-value pairs.
    ///
    /// For every supported setting, the user-supplied value (or, failing that,
    /// the entry's default) must parse successfully, otherwise a
    /// `BallistaError::General` is returned. A setting with neither a value
    /// nor a default is mandatory and causes an error.
    ///
    /// NOTE: all values are currently validated as `usize` regardless of the
    /// `DataType` declared on the entry.
    pub fn with_settings(settings: HashMap<String, String>) -> Result<Self> {
        let supported_entries = BallistaConfig::valid_entries();
        for (name, entry) in &supported_entries {
            if let Some(v) = settings.get(name) {
                // validate that we can parse the user-supplied value
                // (value and setting name in the correct positions in the message)
                let _ = v.parse::<usize>().map_err(|e| BallistaError::General(format!("Failed to parse user-supplied value '{}' for configuration setting '{}': {:?}", v, name, e)))?;
            } else if let Some(v) = entry.default_value.clone() {
                // defaults are also validated so a bad built-in default fails fast
                let _ = v.parse::<usize>().map_err(|e| BallistaError::General(format!("Failed to parse default value '{}' for configuration setting '{}': {:?}", v, name, e)))?;
            } else {
                return Err(BallistaError::General(format!(
                    "No value specified for mandatory configuration setting '{}'",
                    name
                )));
            }
        }

        Ok(Self { settings })
    }

    /// All available configuration options
    pub fn valid_entries() -> HashMap<String, ConfigEntry> {
        let entries = vec![
            ConfigEntry::new(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_string(),
                "Sets the default number of partitions to create when repartitioning query stages".to_string(),
                DataType::UInt16, Some("2".to_string())),
        ];
        // consume the Vec directly instead of iterating by reference and cloning
        entries
            .into_iter()
            .map(|e| (e.name.clone(), e))
            .collect::<HashMap<_, _>>()
    }

    /// Raw access to the underlying key-value settings map
    pub fn settings(&self) -> &HashMap<String, String> {
        &self.settings
    }

    /// Number of partitions to create when repartitioning query stages
    pub fn default_shuffle_partitions(&self) -> usize {
        self.get_usize_setting(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS)
    }

    /// Look up a setting as `usize`, falling back to the entry's default.
    fn get_usize_setting(&self, key: &str) -> usize {
        if let Some(v) = self.settings.get(key) {
            // infallible because we validate all configs in the constructor
            v.parse().unwrap()
        } else {
            let entries = Self::valid_entries();
            // infallible because we validate all configs in the constructor
            let v = entries.get(key).unwrap().default_value.as_ref().unwrap();
            v.parse().unwrap()
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config() -> Result<()> {
        let config = BallistaConfig::new()?;
        assert_eq!(2, config.default_shuffle_partitions());
        Ok(())
    }

    #[test]
    fn custom_config() -> Result<()> {
        let config = BallistaConfig::builder()
            .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "123")
            .build()?;
        assert_eq!(123, config.default_shuffle_partitions());
        Ok(())
    }

    #[test]
    fn custom_config_invalid() -> Result<()> {
        let config = BallistaConfig::builder()
            .set(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS, "true")
            .build();
        assert!(config.is_err());
        // expected message has the invalid value first and the setting name second
        assert_eq!("General(\"Failed to parse user-supplied value 'true' for configuration setting 'ballista.shuffle.partitions': ParseIntError { kind: InvalidDigit }\")", format!("{:?}", config.unwrap_err()));
        Ok(())
    }
}
1 change: 1 addition & 0 deletions ballista/rust/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub fn print_version() {
}

pub mod client;
pub mod config;
pub mod datasource;
pub mod error;
pub mod execution_plans;
Expand Down
6 changes: 4 additions & 2 deletions ballista/rust/core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
use crate::memory_stream::MemoryStream;
use crate::serde::scheduler::PartitionStats;

use crate::config::BallistaConfig;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::{
array::{
Expand Down Expand Up @@ -233,8 +234,9 @@ fn build_exec_plan_diagram(
}

/// Create a DataFusion context that is compatible with Ballista
pub fn create_datafusion_context() -> ExecutionContext {
let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is hack to enable partitioned joins
pub fn create_datafusion_context(config: &BallistaConfig) -> ExecutionContext {
let config =
ExecutionConfig::new().with_concurrency(config.default_shuffle_partitions());
ExecutionContext::with_config(config)
}

Expand Down
Loading