From f9861a869385c3294cd3d0705cb3f39db0a6d541 Mon Sep 17 00:00:00 2001
From: Ximo Guanter
Date: Mon, 26 Apr 2021 08:29:39 +0200
Subject: [PATCH 1/2] Make external hostname in executor optional

---
 ballista/rust/core/proto/ballista.proto       |  12 +-
 ballista/rust/core/src/client.rs              |   1 -
 .../src/execution_plans/unresolved_shuffle.rs |   1 -
 .../rust/executor/executor_config_spec.toml   |   3 +-
 ballista/rust/executor/src/execution_loop.rs  |   5 +-
 ballista/rust/executor/src/flight_service.rs  |  14 +-
 ballista/rust/executor/src/lib.rs             |  31 ---
 ballista/rust/executor/src/main.rs            |  48 +++--
 ballista/rust/scheduler/src/lib.rs            |  59 ++++--
 ballista/rust/scheduler/src/main.rs           |  13 +-
 ballista/rust/scheduler/src/planner.rs        | 177 +-----------------
 benchmarks/docker-compose.yaml                |  19 +-
 12 files changed, 113 insertions(+), 270 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 5733921bc92fb..b6bc5d09c3925 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -514,6 +514,16 @@ message ExecutorMetadata {
   uint32 port = 3;
 }
 
+message ExecutorRegistration {
+  string id = 1;
+  // "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see https://github.com/danburkert/prost/issues/430)
+  // this syntax is ugly but is binary compatible with the "optional" keyword (see https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3)
+  oneof optional_host {
+    string host = 2;
+  }
+  uint32 port = 3;
+}
+
 message GetExecutorMetadataParams {}
 
 message GetExecutorMetadataResult {
@@ -542,7 +552,7 @@ message TaskStatus {
 }
 
 message PollWorkParams {
-  ExecutorMetadata metadata = 1;
+  ExecutorRegistration metadata = 1;
   bool can_accept_task = 2;
   // All tasks must be reported until they reach the failed or completed state
   repeated TaskStatus task_status = 3;
diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs
index f64f95f7cfe25..1d0fedca7b4ef 100644
--- a/ballista/rust/core/src/client.rs
+++ b/ballista/rust/core/src/client.rs
@@ -58,7 +58,6 @@ pub struct BallistaClient {
 impl BallistaClient {
     /// Create a new BallistaClient to connect to the executor listening on the specified
     /// host and port
-
     pub async fn try_new(host: &str, port: u16) -> Result<Self> {
         let addr = format!("http://{}:{}", host, port);
         debug!("BallistaClient connecting to {}", addr);
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index a62a2513ff4a9..7d147d53537c4 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -18,7 +18,6 @@
 use std::sync::Arc;
 use std::{any::Any, pin::Pin};
 
-use crate::client::BallistaClient;
 use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::PartitionLocation;
 
diff --git a/ballista/rust/executor/executor_config_spec.toml b/ballista/rust/executor/executor_config_spec.toml
index 2a7c96bde3aff..8d817fee9cc5c 100644
--- a/ballista/rust/executor/executor_config_spec.toml
+++ b/ballista/rust/executor/executor_config_spec.toml
@@ -49,8 +49,7 @@ doc = "Local IP address to bind to."
 [[param]]
 name = "external_host"
 type = "String"
-default = "std::string::String::from(\"localhost\")"
-doc = "Host name or IP address to register with scheduler so that other executors can connect to this executor."
+doc = "Host name or IP address to register with the scheduler so that other executors can connect to this executor. If none is provided, the scheduler will use the connecting IP address to communicate with the executor."
 
 [[param]]
 abbr = "p"
diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index cf641ddcc5c5e..5574a14a0915a 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -24,7 +24,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use log::{debug, error, info, warn};
 use tonic::transport::Channel;
 
-use ballista_core::serde::scheduler::ExecutorMeta;
+use ballista_core::serde::protobuf::ExecutorRegistration;
 use ballista_core::{
     client::BallistaClient,
     serde::protobuf::{
@@ -37,10 +37,9 @@ use protobuf::CompletedTask;
 pub async fn poll_loop(
     mut scheduler: SchedulerGrpcClient<Channel>,
     executor_client: BallistaClient,
-    executor_meta: ExecutorMeta,
+    executor_meta: ExecutorRegistration,
     concurrent_tasks: usize,
 ) {
-    let executor_meta: protobuf::ExecutorMetadata = executor_meta.into();
     let available_tasks_slots = Arc::new(AtomicUsize::new(concurrent_tasks));
     let (task_status_sender, mut task_status_receiver) =
         std::sync::mpsc::channel::<TaskStatus>();
diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs
index 8fff3dbcade77..115e1ab0d800e 100644
--- a/ballista/rust/executor/src/flight_service.rs
+++ b/ballista/rust/executor/src/flight_service.rs
@@ -23,7 +23,6 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Instant;
 
-use crate::BallistaExecutor;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
 use ballista_core::serde::scheduler::{Action as BallistaAction, PartitionStats};
@@ -59,12 +58,12 @@ type FlightDataReceiver = Receiver<Result<FlightData, Status>>;
 /// Service implementing the Apache Arrow Flight Protocol
 #[derive(Clone)]
 pub struct BallistaFlightService {
-    executor: Arc<BallistaExecutor>,
+    work_dir: String,
 }
 
 impl BallistaFlightService {
-    pub fn new(executor: Arc<BallistaExecutor>) -> Self {
-        Self { executor }
+    pub fn new(work_dir: String) -> Self {
+        Self { work_dir }
     }
 }
 
@@ -103,11 +102,10 @@ impl FlightService for BallistaFlightService {
                 );
 
                 let mut tasks: Vec>> = vec![];
-                for part in partition.partition_id.clone() {
-                    let work_dir = self.executor.config.work_dir.clone();
+                for &part in &partition.partition_id {
+                    let mut path = PathBuf::from(&self.work_dir);
                     let partition = partition.clone();
                     tasks.push(tokio::spawn(async move {
-                        let mut path = PathBuf::from(&work_dir);
                         path.push(partition.job_id);
                         path.push(&format!("{}", partition.stage_id));
                         path.push(&format!("{}", part));
@@ -208,7 +206,7 @@ impl FlightService for BallistaFlightService {
             // fetch a partition that was previously executed by this executor
             info!("FetchPartition {:?}", partition_id);
 
-            let mut path = PathBuf::from(&self.executor.config.work_dir);
+            let mut path = PathBuf::from(&self.work_dir);
             path.push(&partition_id.job_id);
             path.push(&format!("{}", partition_id.stage_id));
             path.push(&format!("{}", partition_id.partition_id));
diff --git a/ballista/rust/executor/src/lib.rs b/ballista/rust/executor/src/lib.rs
index 3d7bbaca3f1f0..08646ebda6b7f 100644
--- a/ballista/rust/executor/src/lib.rs
+++ b/ballista/rust/executor/src/lib.rs
@@ -19,34 +19,3 @@
 
 pub mod collect;
 pub mod flight_service;
-
-#[derive(Debug, Clone)]
-pub struct ExecutorConfig {
-    pub(crate) host: String,
-    pub(crate) port: u16,
-    /// Directory for temporary files, such as IPC files
-    pub(crate) work_dir: String,
-    pub(crate) concurrent_tasks: usize,
-}
-
-impl ExecutorConfig {
-    pub fn new(host: &str, port: u16, work_dir: &str, concurrent_tasks: usize) -> Self {
-        Self {
-            host: host.to_owned(),
-            port,
-            work_dir: work_dir.to_owned(),
-            concurrent_tasks,
-        }
-    }
-}
-
-#[allow(dead_code)]
-pub struct BallistaExecutor {
-    pub(crate) config: ExecutorConfig,
-}
-
-impl BallistaExecutor {
-    pub fn new(config: ExecutorConfig) -> Self {
-        Self { config }
-    }
-}
diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs
index 9c8d466add4f4..4a3ae4ef9d48f 100644
--- a/ballista/rust/executor/src/main.rs
+++ b/ballista/rust/executor/src/main.rs
@@ -17,7 +17,10 @@
 
 //! Ballista Rust executor binary.
 
-use std::sync::Arc;
+use std::{
+    net::{IpAddr, Ipv4Addr},
+    sync::Arc,
+};
 
 use anyhow::{Context, Result};
 use arrow_flight::flight_service_server::FlightServiceServer;
@@ -28,15 +31,17 @@ use tonic::transport::Server;
 use uuid::Uuid;
 
 use ballista_core::{
-    client::BallistaClient, serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient,
+    client::BallistaClient,
+    serde::protobuf::{
+        executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
+        ExecutorRegistration,
+    },
 };
 
 use ballista_core::{
     print_version, serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
-    serde::scheduler::ExecutorMeta, BALLISTA_VERSION,
-};
-use ballista_executor::{
-    flight_service::BallistaFlightService, BallistaExecutor, ExecutorConfig,
+    BALLISTA_VERSION,
 };
+use ballista_executor::flight_service::BallistaFlightService;
 use ballista_scheduler::{state::StandaloneClient, SchedulerServer};
 use config::prelude::*;
@@ -80,7 +85,7 @@ async fn main() -> Result<()> {
         .with_context(|| format!("Could not parse address: {}", addr))?;
 
     let scheduler_host = if opt.local {
-        external_host.to_owned()
+        "localhost".to_string()
     } else {
         opt.scheduler_host
     };
@@ -94,14 +99,16 @@
             .into_string()
             .unwrap(),
     );
-    let config =
-        ExecutorConfig::new(&external_host, port, &work_dir, opt.concurrent_tasks);
-    info!("Running with config: {:?}", config);
+    info!("Running with config:");
+    info!("work_dir: {}", work_dir);
+    info!("concurrent_tasks: {}", opt.concurrent_tasks);
 
-    let executor_meta = ExecutorMeta {
+    let executor_meta = ExecutorRegistration {
         id: Uuid::new_v4().to_string(), // assign this executor a unique ID
-        host: external_host.clone(),
-        port,
+        optional_host: external_host
+            .clone()
+            .map(executor_registration::OptionalHost::Host),
+        port: port as u32,
     };
 
     if opt.local {
@@ -117,8 +124,9 @@
         let server = SchedulerGrpcServer::new(SchedulerServer::new(
             Arc::new(client),
             "ballista".to_string(),
+            IpAddr::V4(Ipv4Addr::LOCALHOST),
         ));
-        let addr = format!("{}:{}", bind_host, scheduler_port);
+        let addr = format!("localhost:{}", scheduler_port);
         let addr = addr
             .parse()
             .with_context(|| format!("Could not parse {}", addr))?;
@@ -158,8 +166,7 @@
     let scheduler = SchedulerGrpcClient::connect(scheduler_url)
         .await
         .context("Could not connect to scheduler")?;
-    let executor = Arc::new(BallistaExecutor::new(config));
-    let service = BallistaFlightService::new(executor);
+    let service = BallistaFlightService::new(work_dir);
     let server = FlightServiceServer::new(service);
 
     info!(
@@ -167,7 +174,14 @@
         BALLISTA_VERSION, addr
     );
     let server_future = tokio::spawn(Server::builder().add_service(server).serve(addr));
-    let client = BallistaClient::try_new(&external_host, port).await?;
+    let client_host = external_host.as_deref().unwrap_or_else(|| {
+        if bind_host == "0.0.0.0" {
+            "localhost"
+        } else {
+            &bind_host
+        }
+    });
+    let client = BallistaClient::try_new(client_host, port).await?;
     tokio::spawn(execution_loop::poll_loop(
         scheduler,
         client,
diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs
index a675153897be3..3dc8df29bd038 100644
--- a/ballista/rust/scheduler/src/lib.rs
+++ b/ballista/rust/scheduler/src/lib.rs
@@ -24,16 +24,16 @@ pub mod state;
 #[cfg(test)]
 pub mod test_utils;
 
-use std::fmt;
 use std::{convert::TryInto, sync::Arc};
+use std::{fmt, net::IpAddr};
 
 use ballista_core::serde::protobuf::{
-    execute_query_params::Query, job_status, scheduler_grpc_server::SchedulerGrpc,
-    ExecuteQueryParams, ExecuteQueryResult, FailedJob, FilePartitionMetadata, FileType,
-    GetExecutorMetadataParams, GetExecutorMetadataResult, GetFileMetadataParams,
-    GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, JobStatus,
-    PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, TaskDefinition,
-    TaskStatus,
+    execute_query_params::Query, executor_registration::OptionalHost, job_status,
+    scheduler_grpc_server::SchedulerGrpc, ExecuteQueryParams, ExecuteQueryResult,
+    FailedJob, FilePartitionMetadata, FileType, GetExecutorMetadataParams,
+    GetExecutorMetadataResult, GetFileMetadataParams, GetFileMetadataResult,
+    GetJobStatusParams, GetJobStatusResult, JobStatus, PartitionId, PollWorkParams,
+    PollWorkResult, QueuedJob, RunningJob, TaskDefinition, TaskStatus,
 };
 
 use ballista_core::serde::scheduler::ExecutorMeta;
@@ -71,13 +71,18 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH};
 
 #[derive(Clone)]
 pub struct SchedulerServer {
+    caller_ip: IpAddr,
     state: Arc<SchedulerState>,
     start_time: u128,
     version: String,
 }
 
 impl SchedulerServer {
-    pub fn new(config: Arc<dyn ConfigBackendClient>, namespace: String) -> Self {
+    pub fn new(
+        config: Arc<dyn ConfigBackendClient>,
+        namespace: String,
+        caller_ip: IpAddr,
+    ) -> Self {
         const VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
         let state = Arc::new(SchedulerState::new(config, namespace));
         let state_clone = state.clone();
@@ -86,6 +91,7 @@
         tokio::spawn(async move { state_clone.synchronize_job_status_loop().await });
 
         Self {
+            caller_ip,
            state,
            start_time: SystemTime::now()
                .duration_since(UNIX_EPOCH)
@@ -131,7 +137,16 @@
         } = request.into_inner()
         {
             debug!("Received poll_work request for {:?}", metadata);
-            let metadata: ExecutorMeta = metadata.into();
+            let metadata: ExecutorMeta = ExecutorMeta {
+                id: metadata.id,
+                host: metadata
+                    .optional_host
+                    .map(|h| match h {
+                        OptionalHost::Host(host) => host,
+                    })
+                    .unwrap_or_else(|| self.caller_ip.to_string()),
+                port: metadata.port as u16,
+            };
             let mut lock = self.state.lock().await.map_err(|e| {
                 let msg = format!("Could not lock the state: {}", e);
                 error!("{}", msg);
@@ -359,12 +374,7 @@
                         job_id_spawn, e
                     );
                 }
-                let mut planner = fail_job!(DistributedPlanner::try_new(executors)
-                    .map_err(|e| {
-                        let msg = format!("Could not create distributed planner: {}", e);
-                        error!("{}", msg);
-                        tonic::Status::internal(msg)
-                    }));
+                let mut planner = DistributedPlanner::new();
                 let stages = fail_job!(planner
                     .plan_query_stages(&job_id_spawn, plan)
                     .map_err(|e| {
@@ -433,12 +443,17 @@
 
 #[cfg(all(test, feature = "sled"))]
 mod test {
-    use std::sync::Arc;
+    use std::{
+        net::{IpAddr, Ipv4Addr},
+        sync::Arc,
+    };
 
     use tonic::Request;
 
     use ballista_core::error::BallistaError;
-    use ballista_core::serde::protobuf::{ExecutorMetadata, PollWorkParams};
+    use ballista_core::serde::protobuf::{
+        executor_registration::OptionalHost, ExecutorRegistration, PollWorkParams,
+    };
 
     use super::{
         state::{SchedulerState, StandaloneClient},
@@ -449,11 +464,15 @@
     async fn test_poll_work() -> Result<(), BallistaError> {
         let state = Arc::new(StandaloneClient::try_new_temporary()?);
         let namespace = "default";
-        let scheduler = SchedulerServer::new(state.clone(), namespace.to_owned());
+        let scheduler = SchedulerServer::new(
+            state.clone(),
+            namespace.to_owned(),
+            IpAddr::V4(Ipv4Addr::LOCALHOST),
+        );
         let state = SchedulerState::new(state, namespace.to_string());
-        let exec_meta = ExecutorMetadata {
+        let exec_meta = ExecutorRegistration {
             id: "abc".to_owned(),
-            host: "".to_owned(),
+            optional_host: Some(OptionalHost::Host("".to_owned())),
             port: 0,
         };
         let request: Request<PollWorkParams> = Request::new(PollWorkParams {
diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs
index 205023a4c34c6..713103fcf0439 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -19,7 +19,7 @@
 
 use anyhow::{Context, Result};
 use futures::future::{self, Either, TryFutureExt};
-use hyper::{service::make_service_fn, Server};
+use hyper::{server::conn::AddrStream, service::make_service_fn, Server};
 use std::convert::Infallible;
 use std::{net::SocketAddr, sync::Arc};
 use tonic::transport::Server as TonicServer;
@@ -62,17 +62,20 @@
         BALLISTA_VERSION, addr
     );
-    let scheduler_server =
-        SchedulerServer::new(config_backend.clone(), namespace.clone());
     Ok(Server::bind(&addr)
-        .serve(make_service_fn(move |_| {
+        .serve(make_service_fn(move |request: &AddrStream| {
+            let scheduler_server = SchedulerServer::new(
+                config_backend.clone(),
+                namespace.clone(),
+                request.remote_addr().ip(),
+            );
             let scheduler_grpc_server =
                 SchedulerGrpcServer::new(scheduler_server.clone());
 
             let mut tonic = TonicServer::builder()
                 .add_service(scheduler_grpc_server)
                 .into_service();
 
-            let mut warp = warp::service(get_routes(scheduler_server.clone()));
+            let mut warp = warp::service(get_routes(scheduler_server));
 
             future::ok::<_, Infallible>(tower::service_fn(
                 move |req: hyper::Request<hyper::Body>| {
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index e791fa8b54597..20dd0d36d9ab9 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -19,17 +19,11 @@
 //!
 //! This code is EXPERIMENTAL and still under development
 
-use std::pin::Pin;
+use std::collections::HashMap;
 use std::sync::Arc;
-use std::time::Instant;
-use std::{collections::HashMap, future::Future};
 
-use ballista_core::client::BallistaClient;
 use ballista_core::datasource::DfTableAdapter;
 use ballista_core::error::{BallistaError, Result};
-use ballista_core::serde::scheduler::ExecutorMeta;
-use ballista_core::serde::scheduler::PartitionId;
-use ballista_core::utils::format_plan;
 use ballista_core::{
     execution_plans::{QueryStageExec, ShuffleReaderExec, UnresolvedShuffleExec},
     serde::scheduler::PartitionLocation,
@@ -42,59 +36,27 @@ use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
 use datafusion::physical_plan::hash_join::HashJoinExec;
 use datafusion::physical_plan::merge::MergeExec;
 use datafusion::physical_plan::ExecutionPlan;
-use log::{debug, info};
-use tokio::task::JoinHandle;
+use log::info;
 
-type SendableExecutionPlan =
-    Pin<Box<dyn Future<Output = Result<Arc<dyn ExecutionPlan>>> + Send>>;
 type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<dyn ExecutionPlan>>);
 
 pub struct DistributedPlanner {
-    executors: Vec<ExecutorMeta>,
     next_stage_id: usize,
 }
 
 impl DistributedPlanner {
-    pub fn try_new(executors: Vec<ExecutorMeta>) -> Result<Self> {
-        if executors.is_empty() {
-            Err(BallistaError::General(
-                "DistributedPlanner requires at least one executor".to_owned(),
-            ))
-        } else {
-            Ok(Self {
-                executors,
-                next_stage_id: 0,
-            })
-        }
+    pub fn new() -> Self {
+        Self { next_stage_id: 0 }
     }
 }
 
-impl DistributedPlanner {
-    /// Execute a distributed query against a cluster, leaving the final results on the
-    /// executors. The [ExecutionPlan] returned by this method is guaranteed to be a
-    /// [ShuffleReaderExec] that can be used to fetch the final results from the executors
-    /// in parallel.
-    pub async fn execute_distributed_query(
-        &mut self,
-        job_id: String,
-        execution_plan: Arc<dyn ExecutionPlan>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        let now = Instant::now();
-        let execution_plans = self.plan_query_stages(&job_id, execution_plan)?;
-
-        info!(
-            "DistributedPlanner created {} execution plans in {} seconds:",
-            execution_plans.len(),
-            now.elapsed().as_secs()
-        );
-
-        for plan in &execution_plans {
-            info!("{}", format_plan(plan.as_ref(), 0)?);
-        }
-
-        execute(execution_plans, self.executors.clone()).await
+impl Default for DistributedPlanner {
+    fn default() -> Self {
+        Self::new()
     }
+}
 
+impl DistributedPlanner {
     /// Returns a vector of ExecutionPlans, where the root node is a [QueryStageExec].
     /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
     /// A [QueryStageExec] is created whenever the partitioning changes.
@@ -221,38 +183,6 @@ impl DistributedPlanner {
     }
 }
 
-fn execute(
-    stages: Vec<Arc<dyn ExecutionPlan>>,
-    executors: Vec<ExecutorMeta>,
-) -> SendableExecutionPlan {
-    Box::pin(async move {
-        let mut partition_locations: HashMap<usize, Vec<PartitionLocation>> =
-            HashMap::new();
-        let mut result_partition_locations = vec![];
-        for stage in &stages {
-            debug!("execute() {}", &format!("{:?}", stage)[0..60]);
-            let stage = remove_unresolved_shuffles(stage.as_ref(), &partition_locations)?;
-            let stage = stage.as_any().downcast_ref::<QueryStageExec>().unwrap();
-            result_partition_locations = execute_query_stage(
-                &stage.job_id.clone(),
-                stage.stage_id,
-                stage.children()[0].clone(),
-                executors.clone(),
-            )
-            .await?;
-            partition_locations
-                .insert(stage.stage_id, result_partition_locations.clone());
-        }
-
-        let shuffle_reader: Arc<dyn ExecutionPlan> =
-            Arc::new(ShuffleReaderExec::try_new(
-                result_partition_locations,
-                stages.last().unwrap().schema(),
-            )?);
-        Ok(shuffle_reader)
-    })
-}
-
 pub fn remove_unresolved_shuffles(
     stage: &dyn ExecutionPlan,
     partition_locations: &HashMap<usize, Vec<PartitionLocation>>,
@@ -298,88 +228,6 @@ fn create_query_stage(
     Ok(Arc::new(QueryStageExec::try_new(job_id, stage_id, plan)?))
 }
 
-/// Execute a query stage by sending each partition to an executor
-async fn execute_query_stage(
-    job_id: &str,
-    stage_id: usize,
-    plan: Arc<dyn ExecutionPlan>,
-    executors: Vec<ExecutorMeta>,
-) -> Result<Vec<PartitionLocation>> {
-    info!(
-        "execute_query_stage() stage_id={}\n{}",
-        stage_id,
-        format_plan(plan.as_ref(), 0)?
-    );
-
-    let partition_count = plan.output_partitioning().partition_count();
-
-    let num_chunks = partition_count / executors.len();
-    let num_chunks = num_chunks.max(1);
-    let partition_chunks: Vec<Vec<usize>> = (0..partition_count)
-        .collect::<Vec<usize>>()
-        .chunks(num_chunks)
-        .map(|r| r.to_vec())
-        .collect();
-
-    info!(
-        "Executing query stage with {} chunks of partition ranges",
-        partition_chunks.len()
-    );
-
-    let mut executions: Vec<JoinHandle<Result<Vec<PartitionLocation>>>> =
-        Vec::with_capacity(partition_count);
-    for i in 0..partition_chunks.len() {
-        let plan = plan.clone();
-        let executor_meta = executors[i % executors.len()].clone();
-        let partition_ids = partition_chunks[i].to_vec();
-        let job_id = job_id.to_owned();
-        executions.push(tokio::spawn(async move {
-            let mut client =
-                BallistaClient::try_new(&executor_meta.host, executor_meta.port).await?;
-            let stats = client
-                .execute_partition(job_id.clone(), stage_id, partition_ids.clone(), plan)
-                .await?;
-
-            Ok(partition_ids
-                .iter()
-                .map(|part| PartitionLocation {
-                    partition_id: PartitionId::new(&job_id, stage_id, *part),
-                    executor_meta: executor_meta.clone(),
-                    partition_stats: *stats[*part].statistics(),
-                })
-                .collect())
-        }));
-    }
-
-    // wait for all partitions to complete
-    let results = futures::future::join_all(executions).await;
-
-    // check for errors
-    let mut meta = Vec::with_capacity(partition_count);
-    for result in results {
-        match result {
-            Ok(partition_result) => {
-                let final_result = partition_result?;
-                debug!("Query stage partition result: {:?}", final_result);
-                meta.extend(final_result);
-            }
-            Err(e) => {
-                return Err(BallistaError::General(format!(
-                    "Query stage {} failed: {:?}",
-                    stage_id, e
-                )))
-            }
-        }
-    }
-
-    debug!(
-        "execute_query_stage() stage_id={} produced {:?}",
-        stage_id, meta
-    );
-
-    Ok(meta)
-}
-
 #[cfg(test)]
 mod test {
     use crate::planner::DistributedPlanner;
@@ -387,7 +235,6 @@ mod test {
     use ballista_core::error::BallistaError;
     use ballista_core::execution_plans::UnresolvedShuffleExec;
     use ballista_core::serde::protobuf;
-    use ballista_core::serde::scheduler::ExecutorMeta;
     use ballista_core::utils::format_plan;
     use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
     use datafusion::physical_plan::merge::MergeExec;
@@ -420,11 +267,7 @@ mod test {
         let plan = ctx.optimize(&plan)?;
         let plan = ctx.create_physical_plan(&plan)?;
 
-        let mut planner = DistributedPlanner::try_new(vec![ExecutorMeta {
-            id: "".to_string(),
-            host: "".to_string(),
-            port: 0,
-        }])?;
+        let mut planner = DistributedPlanner::new();
         let job_uuid = Uuid::new_v4();
         let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
         for stage in &stages {
diff --git a/benchmarks/docker-compose.yaml b/benchmarks/docker-compose.yaml
index 6015dbac2cc25..bbb31078cf0a5 100644
--- a/benchmarks/docker-compose.yaml
+++ b/benchmarks/docker-compose.yaml
@@ -14,7 +14,7 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-version: '2.0'
+version: '2.2'
 services:
   etcd:
     image: quay.io/coreos/etcd:v3.4.9
@@ -28,18 +28,10 @@ services:
       - ./data:/data
     depends_on:
       - etcd
-  ballista-executor-1:
+  ballista-executor:
     image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT
-    command: "/executor --bind-host 0.0.0.0 --port 50051 --external-host ballista-executor-1 --scheduler-host ballista-scheduler"
-    environment:
-      - RUST_LOG=info
-    volumes:
-      - ./data:/data
-    depends_on:
-      - ballista-scheduler
-  ballista-executor-2:
-    image: ballistacompute/ballista-rust:0.5.0-SNAPSHOT
-    command: "/executor --bind-host 0.0.0.0 --port 50052 --external-host ballista-executor-2 --scheduler-host ballista-scheduler"
+    command: "/executor --bind-host 0.0.0.0 --port 50051 --scheduler-host ballista-scheduler"
+    scale: 2
     environment:
       - RUST_LOG=info
     volumes:
@@ -57,6 +49,5 @@
       - ../..:/ballista
     depends_on:
       - ballista-scheduler
-      - ballista-executor-1
-      - ballista-executor-2
+      - ballista-executor

From 1898ade4d052b16afd67bcff76f3c0315f7c4604 Mon Sep 17 00:00:00 2001
From: Ximo Guanter
Date: Sat, 1 May 2021 21:50:44 +0200
Subject: [PATCH 2/2] add comment

---
 ballista/rust/executor/src/main.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs
index 4a3ae4ef9d48f..ad7c001e654af 100644
--- a/ballista/rust/executor/src/main.rs
+++ b/ballista/rust/executor/src/main.rs
@@ -176,6 +176,8 @@
     let server_future = tokio::spawn(Server::builder().add_service(server).serve(addr));
     let client_host = external_host.as_deref().unwrap_or_else(|| {
         if bind_host == "0.0.0.0" {
+            // If the executor is bound to "0.0.0.0" (i.e. listening on all network interfaces)
+            // then use "localhost" to connect to itself through the BallistaClient
            "localhost"
        } else {
            &bind_host