diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 6181d731a0718..9dd9d63509c9d 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -31,7 +31,7 @@ simd = ["datafusion/simd"]
 
 [dependencies]
 ahash = { version = "0.7", default-features = false }
-async-trait = "0.1.36"
+async-trait = "0.1.41"
 futures = "0.3"
 hashbrown = "0.12"
 log = "0.4"
diff --git a/ballista/rust/core/src/event_loop.rs b/ballista/rust/core/src/event_loop.rs
new file mode 100644
index 0000000000000..225bfce532f95
--- /dev/null
+++ b/ballista/rust/core/src/event_loop.rs
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use log::{error, info};
+use tokio::sync::mpsc;
+
+use crate::error::{BallistaError, Result};
+
+#[async_trait]
+pub trait EventAction<E>: Send + Sync {
+    fn on_start(&self);
+
+    fn on_stop(&self);
+
+    async fn on_receive(&self, event: E) -> Result<Option<E>>;
+
+    fn on_error(&self, error: BallistaError);
+}
+
+#[derive(Clone)]
+pub struct EventLoop<E> {
+    name: String,
+    stopped: Arc<AtomicBool>,
+    buffer_size: usize,
+    action: Arc<dyn EventAction<E>>,
+    tx_event: Option<mpsc::Sender<E>>,
+}
+
+impl<E: Clone + Send + 'static> EventLoop<E> {
+    pub fn new(
+        name: String,
+        buffer_size: usize,
+        action: Arc<dyn EventAction<E>>,
+    ) -> Self {
+        Self {
+            name,
+            stopped: Arc::new(AtomicBool::new(false)),
+            buffer_size,
+            action,
+            tx_event: None,
+        }
+    }
+
+    fn run(&self, mut rx_event: mpsc::Receiver<E>) {
+        assert!(
+            self.tx_event.is_some(),
+            "The event sender should be initialized first!"
+        );
+        let tx_event = self.tx_event.as_ref().unwrap().clone();
+        let name = self.name.clone();
+        let stopped = self.stopped.clone();
+        let action = self.action.clone();
+        tokio::spawn(async move {
+            info!("Starting the event loop {}", name);
+            while !stopped.load(Ordering::SeqCst) {
+                if let Some(event) = rx_event.recv().await {
+                    match action.on_receive(event).await {
+                        Ok(Some(event)) => {
+                            if let Err(e) = tx_event.send(event).await {
+                                let msg = format!("Fail to send event due to {}", e);
+                                error!("{}", msg);
+                                action.on_error(BallistaError::General(msg));
+                            }
+                        }
+                        Err(e) => {
+                            error!("Fail to process event due to {}", e);
+                            action.on_error(e);
+                        }
+                        _ => {}
+                    }
+                } else {
+                    info!("Event Channel closed, shutting down");
+                    break;
+                }
+            }
+            info!("The event loop {} has been stopped", name);
+        });
+    }
+
+    pub fn start(&mut self) -> Result<()> {
+        if self.stopped.load(Ordering::SeqCst) {
+            return Err(BallistaError::General(format!(
+                "{} has already been stopped",
+                self.name
+            )));
+        }
+        self.action.on_start();
+
+        let (tx_event, rx_event) = mpsc::channel::<E>(self.buffer_size);
+        self.tx_event = Some(tx_event);
+        self.run(rx_event);
+
+        Ok(())
+    }
+
+    pub fn stop(&self) {
+        if !self.stopped.swap(true, Ordering::SeqCst) {
+            self.action.on_stop();
+        } else {
+            // Keep quiet to allow calling `stop` multiple times.
+        }
+    }
+
+    pub fn get_sender(&self) -> Result<EventSender<E>> {
+        Ok(EventSender {
+            tx_event: self.tx_event.as_ref().cloned().ok_or_else(|| {
+                BallistaError::General("Event sender not exist!!!".to_string())
+            })?,
+        })
+    }
+}
+
+pub struct EventSender<E> {
+    tx_event: mpsc::Sender<E>,
+}
+
+impl<E: Clone + Send + 'static> EventSender<E> {
+    pub async fn post_event(&self, event: E) -> Result<()> {
+        Ok(self.tx_event.send(event).await.map_err(|e| {
+            BallistaError::General(format!("Fail to send event due to {}", e))
+        })?)
+ } +} diff --git a/ballista/rust/core/src/lib.rs b/ballista/rust/core/src/lib.rs index 8329f63a7ba39..c452a45b10877 100644 --- a/ballista/rust/core/src/lib.rs +++ b/ballista/rust/core/src/lib.rs @@ -25,6 +25,7 @@ pub fn print_version() { pub mod client; pub mod config; pub mod error; +pub mod event_loop; pub mod execution_plans; pub mod utils; diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index 27ba56eded96f..29af4838288b8 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -32,7 +32,7 @@ snmalloc = ["snmalloc-rs"] arrow = { version = "9.1" } arrow-flight = { version = "9.1" } anyhow = "1" -async-trait = "0.1.36" +async-trait = "0.1.41" ballista-core = { path = "../core", version = "0.6.0" } configure_me = "0.4.0" datafusion = { path = "../../../datafusion", version = "7.0.0" } diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml index e726beb257a2c..2ff0073756a17 100644 --- a/ballista/rust/scheduler/Cargo.toml +++ b/ballista/rust/scheduler/Cargo.toml @@ -54,6 +54,7 @@ tonic = "0.6" tower = { version = "0.4" } warp = "0.3" parking_lot = "0.12" +async-trait = "0.1.41" [dev-dependencies] ballista-core = { path = "../core", version = "0.6.0" } diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs index 316fe45ff9c1d..6646ce32428ad 100644 --- a/ballista/rust/scheduler/src/main.rs +++ b/ballista/rust/scheduler/src/main.rs @@ -40,15 +40,13 @@ use ballista_scheduler::state::EtcdClient; #[cfg(feature = "sled")] use ballista_scheduler::state::StandaloneClient; -use ballista_scheduler::scheduler_server::{ - SchedulerEnv, SchedulerServer, TaskScheduler, -}; +use ballista_scheduler::scheduler_server::SchedulerServer; use ballista_scheduler::state::{ConfigBackend, ConfigBackendClient}; use ballista_core::config::TaskSchedulingPolicy; use ballista_core::serde::BallistaCodec; use log::info; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::RwLock; #[macro_use] extern crate configure_me; @@ -81,24 +79,15 @@ async fn start_server( "Starting Scheduler grpc server with task scheduling policy of {:?}", policy ); - let scheduler_server: SchedulerServer = + let mut scheduler_server: SchedulerServer = match policy { - TaskSchedulingPolicy::PushStaged => { - // TODO make the buffer size configurable - let (tx_job, rx_job) = mpsc::channel::(10000); - let scheduler_server = SchedulerServer::new_with_policy( - config_backend.clone(), - namespace.clone(), - policy, - Some(SchedulerEnv { tx_job }), - Arc::new(RwLock::new(ExecutionContext::new())), - BallistaCodec::default(), - ); - let task_scheduler = - TaskScheduler::new(Arc::new(scheduler_server.clone())); - task_scheduler.start(rx_job); - scheduler_server - } + TaskSchedulingPolicy::PushStaged => SchedulerServer::new_with_policy( + config_backend.clone(), + namespace.clone(), + policy, + Arc::new(RwLock::new(ExecutionContext::new())), + BallistaCodec::default(), + ), _ => SchedulerServer::new( config_backend.clone(), namespace.clone(), @@ -107,6 +96,8 @@ async fn start_server( ), }; + scheduler_server.init().await?; + Server::bind(&addr) .serve(make_service_fn(move |request: &AddrStream| { let scheduler_grpc_server = diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs new file mode 100644 index 0000000000000..140cc5992f911 --- /dev/null +++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs @@ -0,0 +1,154 @@ +// Licensed to the Apache 
Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use log::{debug, warn}; + +use ballista_core::error::{BallistaError, Result}; +use ballista_core::event_loop::EventAction; +use ballista_core::serde::protobuf::{LaunchTaskParams, TaskDefinition}; +use ballista_core::serde::scheduler::ExecutorData; +use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; + +use crate::scheduler_server::task_scheduler::TaskScheduler; +use crate::scheduler_server::ExecutorsClient; +use crate::state::SchedulerState; + +#[derive(Clone)] +pub(crate) enum SchedulerServerEvent { + JobSubmitted(String), +} + +pub(crate) struct SchedulerServerEventAction< + T: 'static + AsLogicalPlan, + U: 'static + AsExecutionPlan, +> { + state: Arc>, + executors_client: ExecutorsClient, +} + +impl + SchedulerServerEventAction +{ + pub fn new( + state: Arc>, + executors_client: ExecutorsClient, + ) -> Self { + Self { + state, + executors_client, + } + } + + async fn offer_resources( + &self, + job_id: String, + ) -> Result> { + let mut available_executors = self.state.get_available_executors_data(); + // In case of there's no enough resources, reschedule the tasks of the job + if available_executors.is_empty() { + // TODO Maybe it's better to use an exclusive runtime for this kind task scheduling + warn!("Not enough available executors for task running"); + tokio::time::sleep(Duration::from_millis(100)).await; + return Ok(Some(SchedulerServerEvent::JobSubmitted(job_id))); + } + + let (tasks_assigment, num_tasks) = self + .state + .fetch_tasks(&mut available_executors, &job_id) + .await?; + if num_tasks > 0 { + self.launch_tasks(&available_executors, tasks_assigment) + .await?; + } + + Ok(None) + } + + async fn launch_tasks( + &self, + executors: &[ExecutorData], + tasks_assigment: Vec>, + ) -> Result<()> { + for (idx_executor, tasks) in tasks_assigment.into_iter().enumerate() { + if !tasks.is_empty() { + let executor_data = &executors[idx_executor]; + debug!( + "Start to launch tasks {:?} to executor {:?}", + tasks + .iter() + .map(|task| { + if let Some(task_id) = task.task_id.as_ref() { + format!( + "{}/{}/{}", + task_id.job_id, + task_id.stage_id, + task_id.partition_id + ) + } else { + "".to_string() + } + }) + .collect::>(), + executor_data.executor_id + ); + let mut client = { + let clients = self.executors_client.read().await; + clients.get(&executor_data.executor_id).unwrap().clone() + }; + // Update the resources first + self.state.save_executor_data(executor_data.clone()); + // TODO check whether launching task is successful or not + client.launch_task(LaunchTaskParams { task: tasks }).await?; + } else { + // Since the task assignment policy is round robin, + // if find tasks for one executor is empty, just break fast 
+ break; + } + } + + Ok(()) + } +} + +#[async_trait] +impl + EventAction for SchedulerServerEventAction +{ + // TODO + fn on_start(&self) {} + + // TODO + fn on_stop(&self) {} + + async fn on_receive( + &self, + event: SchedulerServerEvent, + ) -> Result> { + match event { + SchedulerServerEvent::JobSubmitted(job_id) => { + self.offer_resources(job_id).await + } + } + } + + // TODO + fn on_error(&self, _error: BallistaError) {} +} diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs index ab9de01391428..7f7764cbe5c28 100644 --- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs @@ -17,6 +17,7 @@ use anyhow::Context; use ballista_core::config::TaskSchedulingPolicy; +use ballista_core::error::BallistaError; use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::protobuf::execute_query_params::Query; use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient; @@ -36,6 +37,7 @@ use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::object_store::{local::LocalFileSystem, ObjectStore}; +use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::ExecutionPlan; use futures::StreamExt; use log::{debug, error, info, trace, warn}; @@ -44,10 +46,10 @@ use std::collections::HashSet; use std::convert::TryInto; use std::sync::Arc; use std::time::{Instant, SystemTime, UNIX_EPOCH}; -use tokio::sync::mpsc; use tonic::{Request, Response, Status}; use crate::planner::DistributedPlanner; +use crate::scheduler_server::event_loop::SchedulerServerEvent; use crate::scheduler_server::SchedulerServer; #[tonic::async_trait] @@ -299,17 +301,21 @@ impl SchedulerGrpc error!("Fail to get executor data for {:?}", &executor_id); } } - if let Some(scheduler_env) = self.scheduler_env.as_ref() { - let tx_job = scheduler_env.tx_job.clone(); + if let Some(event_loop) = self.event_loop.as_ref() { for job_id in jobs { - tx_job.send(job_id.clone()).await.map_err(|e| { - let msg = format!( - "Could not send job {} to the channel due to {:?}", - &job_id, e - ); - error!("{}", msg); - tonic::Status::internal(msg) - })?; + event_loop + .get_sender() + .map_err(|e| tonic::Status::internal(format!("{}", e)))? + .post_event(SchedulerServerEvent::JobSubmitted(job_id.clone())) + .await + .map_err(|e| { + let msg = format!( + "Could not send job {} to the channel due to {:?}", + &job_id, e + ); + error!("{}", msg); + tonic::Status::internal(msg) + })?; } } Ok(Response::new(UpdateTaskStatusResult { success: true })) @@ -398,14 +404,10 @@ impl SchedulerGrpc } }; debug!("Received plan for execution: {:?}", plan); - let job_id: String = { - let mut rng = thread_rng(); - std::iter::repeat(()) - .map(|()| rng.sample(Alphanumeric)) - .map(char::from) - .take(7) - .collect() - }; + + // Generate job id. 
+ // TODO Maybe the format will be changed in the future + let job_id = generate_job_id(); // Save placeholder job metadata self.state @@ -420,135 +422,27 @@ impl SchedulerGrpc tonic::Status::internal(format!("Could not save job metadata: {}", e)) })?; - let state = self.state.clone(); - let job_id_spawn = job_id.clone(); - let tx_job: Option> = match self.policy { - TaskSchedulingPolicy::PullStaged => None, - TaskSchedulingPolicy::PushStaged => { - Some(self.scheduler_env.as_ref().unwrap().tx_job.clone()) - } - }; - let datafusion_ctx = self.ctx.read().await.clone(); - tokio::spawn(async move { - // create physical plan using DataFusion - macro_rules! fail_job { - ($code :expr) => {{ - match $code { - Err(error) => { - warn!("Job {} failed with {}", job_id_spawn, error); - state - .save_job_metadata( - &job_id_spawn, - &JobStatus { - status: Some(job_status::Status::Failed( - FailedJob { - error: format!("{}", error), - }, - )), - }, - ) - .await - .unwrap(); - return; - } - Ok(value) => value, - } - }}; - } - - let start = Instant::now(); - - let optimized_plan = - fail_job!(datafusion_ctx.optimize(&plan).map_err(|e| { - let msg = - format!("Could not create optimized logical plan: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - })); - - debug!("Calculated optimized plan: {:?}", optimized_plan); - - let plan = fail_job!(datafusion_ctx - .create_physical_plan(&optimized_plan) - .await - .map_err(|e| { - let msg = format!("Could not create physical plan: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - })); - - info!( - "DataFusion created physical plan in {} milliseconds", - start.elapsed().as_millis(), - ); - - // create distributed physical plan using Ballista - if let Err(e) = state - .save_job_metadata( - &job_id_spawn, - &JobStatus { - status: Some(job_status::Status::Running(RunningJob {})), - }, - ) - .await - { - warn!( - "Could not update job {} status to running: {}", - job_id_spawn, e - ); - } - let mut planner = DistributedPlanner::new(); - let stages = fail_job!(planner - .plan_query_stages(&job_id_spawn, plan) - .await - .map_err(|e| { - let msg = format!("Could not plan query stages: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - })); - - // save stages into state - for shuffle_writer in stages { - fail_job!(state - .save_stage_plan( - &job_id_spawn, - shuffle_writer.stage_id(), - shuffle_writer.clone() + // Create job details for the plan, like stages, tasks, etc + // TODO To achieve more throughput, maybe change it to be event-based processing in the future + match create_job(self, job_id.clone(), plan).await { + Err(error) => { + let msg = format!("Job {} failed due to {}", job_id, error); + warn!("{}", msg); + self.state + .save_job_metadata( + &job_id, + &JobStatus { + status: Some(job_status::Status::Failed(FailedJob { + error: msg.to_string(), + })), + }, ) .await - .map_err(|e| { - let msg = format!("Could not save stage plan: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - })); - let num_partitions = - shuffle_writer.output_partitioning().partition_count(); - for partition_id in 0..num_partitions { - let pending_status = TaskStatus { - task_id: Some(PartitionId { - job_id: job_id_spawn.clone(), - stage_id: shuffle_writer.stage_id() as u32, - partition_id: partition_id as u32, - }), - status: None, - }; - fail_job!(state.save_task_status(&pending_status).await.map_err( - |e| { - let msg = format!("Could not save task status: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - } - )); - } - } - - 
if let Some(tx_job) = tx_job { - // Send job_id to the scheduler channel - tx_job.send(job_id_spawn).await.unwrap(); + .unwrap(); + return Err(tonic::Status::internal(msg)); } - }); - - Ok(Response::new(ExecuteQueryResult { job_id })) + Ok(_) => Ok(Response::new(ExecuteQueryResult { job_id })), + } } else { Err(tonic::Status::internal("Error parsing request")) } @@ -567,6 +461,121 @@ impl SchedulerGrpc } } +fn generate_job_id() -> String { + let mut rng = thread_rng(); + std::iter::repeat(()) + .map(|()| rng.sample(Alphanumeric)) + .map(char::from) + .take(7) + .collect() +} + +async fn create_job( + scheduler_server: &SchedulerServer, + job_id: String, + plan: LogicalPlan, +) -> Result<(), BallistaError> { + // create physical plan using DataFusion + let plan = async move { + let start = Instant::now(); + + let ctx = scheduler_server.ctx.read().await.clone(); + let optimized_plan = ctx.optimize(&plan).map_err(|e| { + let msg = format!("Could not create optimized logical plan: {}", e); + error!("{}", msg); + BallistaError::General(msg) + })?; + + debug!("Calculated optimized plan: {:?}", optimized_plan); + + let plan = ctx + .create_physical_plan(&optimized_plan) + .await + .map_err(|e| { + let msg = format!("Could not create physical plan: {}", e); + error!("{}", msg); + BallistaError::General(msg) + }); + + info!( + "DataFusion created physical plan in {} milliseconds", + start.elapsed().as_millis() + ); + + plan + } + .await?; + + scheduler_server + .state + .save_job_metadata( + &job_id, + &JobStatus { + status: Some(job_status::Status::Running(RunningJob {})), + }, + ) + .await + .map_err(|e| { + warn!("Could not update job {} status to running: {}", job_id, e); + e + })?; + + // create distributed physical plan using Ballista + let mut planner = DistributedPlanner::new(); + let stages = planner + .plan_query_stages(&job_id, plan) + .await + .map_err(|e| { + let msg = format!("Could not plan query stages: {}", e); + error!("{}", msg); + BallistaError::General(msg) + })?; + + // save stages into state + for shuffle_writer in stages { + scheduler_server + .state + .save_stage_plan(&job_id, shuffle_writer.stage_id(), shuffle_writer.clone()) + .await + .map_err(|e| { + let msg = format!("Could not save stage plan: {}", e); + error!("{}", msg); + BallistaError::General(msg) + })?; + let num_partitions = shuffle_writer.output_partitioning().partition_count(); + for partition_id in 0..num_partitions { + let pending_status = TaskStatus { + task_id: Some(PartitionId { + job_id: job_id.clone(), + stage_id: shuffle_writer.stage_id() as u32, + partition_id: partition_id as u32, + }), + status: None, + }; + scheduler_server + .state + .save_task_status(&pending_status) + .await + .map_err(|e| { + let msg = format!("Could not save task status: {}", e); + error!("{}", msg); + BallistaError::General(msg) + })?; + } + } + + if let Some(event_loop) = scheduler_server.event_loop.as_ref() { + // Send job_id to the scheduler channel + event_loop + .get_sender()? + .post_event(SchedulerServerEvent::JobSubmitted(job_id)) + .await + .unwrap(); + }; + + Ok(()) +} + #[cfg(all(test, feature = "sled"))] mod test { use std::sync::Arc; diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs index 29fb0c954861c..029b80246538b 100644 --- a/ballista/rust/scheduler/src/scheduler_server/mod.rs +++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs @@ -16,25 +16,24 @@ // under the License. 
use std::collections::HashMap; -use std::marker::PhantomData; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; -use log::{debug, error, info, warn}; -use tonic::Status; +use tokio::sync::RwLock; +use tonic::transport::Channel; -use crate::state::{ConfigBackendClient, SchedulerState}; use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy}; -use ballista_core::error::BallistaError; -use ballista_core::execution_plans::ShuffleWriterExec; +use ballista_core::error::Result; +use ballista_core::event_loop::EventLoop; use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient; -use ballista_core::serde::protobuf::{LaunchTaskParams, TaskDefinition}; -use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; -use ballista_core::serde::scheduler::ExecutorData; + use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec}; use datafusion::prelude::{ExecutionConfig, ExecutionContext}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use tokio::sync::{mpsc, RwLock}; -use tonic::transport::Channel; + +use crate::scheduler_server::event_loop::{ + SchedulerServerEvent, SchedulerServerEventAction, +}; +use crate::state::{ConfigBackendClient, SchedulerState}; // include the generated protobuf source as a submodule #[allow(clippy::all)] @@ -42,8 +41,10 @@ pub mod externalscaler { include!(concat!(env!("OUT_DIR"), "/externalscaler.rs")); } +mod event_loop; mod external_scaler; mod grpc; +mod task_scheduler; type ExecutorsClient = Arc>>>; @@ -52,17 +53,12 @@ pub struct SchedulerServer>, pub start_time: u128, policy: TaskSchedulingPolicy, - scheduler_env: Option, executors_client: Option, + event_loop: Option>, ctx: Arc>, codec: BallistaCodec, } -#[derive(Clone)] -pub struct SchedulerEnv { - pub tx_job: mpsc::Sender, -} - impl SchedulerServer { pub fn new( config: Arc, @@ -74,7 +70,6 @@ impl SchedulerServer SchedulerServer, namespace: String, policy: TaskSchedulingPolicy, - scheduler_env: Option, ctx: Arc>, codec: BallistaCodec, ) -> Self { let state = Arc::new(SchedulerState::new(config, namespace, codec.clone())); - let executors_client = if matches!(policy, TaskSchedulingPolicy::PushStaged) { - Some(Arc::new(RwLock::new(HashMap::new()))) - } else { - None - }; + let (executors_client, event_loop) = + if matches!(policy, TaskSchedulingPolicy::PushStaged) { + let executors_client = Arc::new(RwLock::new(HashMap::new())); + let event_action: Arc> = + Arc::new(SchedulerServerEventAction::new( + state.clone(), + executors_client.clone(), + )); + let event_loop = + EventLoop::new("scheduler".to_owned(), 10000, event_action); + (Some(executors_client), Some(event_loop)) + } else { + (None, None) + }; Self { state, start_time: SystemTime::now() @@ -102,188 +105,28 @@ impl SchedulerServer Result<(), BallistaError> { - let ctx = self.ctx.read().await; - self.state.init(&ctx).await?; - - Ok(()) - } - - async fn schedule_job(&self, job_id: String) -> Result<(), BallistaError> { - let mut available_executors = self.state.get_available_executors_data(); - - // In case of there's no enough resources, reschedule the tasks of the job - if available_executors.is_empty() { - let tx_job = self.scheduler_env.as_ref().unwrap().tx_job.clone(); - // TODO Maybe it's better to use an exclusive runtime for this kind task scheduling - warn!("Not enough available executors for task running"); - tokio::time::sleep(Duration::from_millis(100)).await; - tx_job.send(job_id).await.unwrap(); - return Ok(()); + pub async fn init(&mut self) -> Result<()> { + { + // 
initialize state + let ctx = self.ctx.read().await; + self.state.init(&ctx).await?; } - let (tasks_assigment, num_tasks) = - self.fetch_tasks(&mut available_executors, &job_id).await?; - if num_tasks > 0 { - for (idx_executor, tasks) in tasks_assigment.into_iter().enumerate() { - if !tasks.is_empty() { - let executor_data = &available_executors[idx_executor]; - debug!( - "Start to launch tasks {:?} to executor {:?}", - tasks, executor_data.executor_id - ); - let mut client = { - let clients = - self.executors_client.as_ref().unwrap().read().await; - info!("Size of executor clients: {:?}", clients.len()); - clients.get(&executor_data.executor_id).unwrap().clone() - }; - // Update the resources first - self.state.save_executor_data(executor_data.clone()); - // TODO check whether launching task is successful or not - client.launch_task(LaunchTaskParams { task: tasks }).await?; - } else { - // Since the task assignment policy is round robin, - // if find tasks for one executor is empty, just break fast - break; - } + { + if let Some(event_loop) = self.event_loop.as_mut() { + event_loop.start()?; } - return Ok(()); } Ok(()) } - - async fn fetch_tasks( - &self, - available_executors: &mut [ExecutorData], - job_id: &str, - ) -> Result<(Vec>, usize), BallistaError> { - let mut ret: Vec> = - Vec::with_capacity(available_executors.len()); - for _idx in 0..available_executors.len() { - ret.push(Vec::new()); - } - let mut num_tasks = 0; - loop { - info!("Go inside fetching task loop"); - let mut has_tasks = true; - for (idx, executor) in available_executors.iter_mut().enumerate() { - if executor.available_task_slots == 0 { - break; - } - let plan = self - .state - .assign_next_schedulable_job_task(&executor.executor_id, job_id) - .await - .map_err(|e| { - let msg = format!("Error finding next assignable task: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - })?; - if let Some((task, _plan)) = &plan { - let task_id = task.task_id.as_ref().unwrap(); - info!( - "Sending new task to {}: {}/{}/{}", - executor.executor_id, - task_id.job_id, - task_id.stage_id, - task_id.partition_id - ); - } - match plan { - Some((status, plan)) => { - let plan_clone = plan.clone(); - let output_partitioning = if let Some(shuffle_writer) = - plan_clone.as_any().downcast_ref::() - { - shuffle_writer.shuffle_output_partitioning() - } else { - return Err(BallistaError::General(format!( - "Task root plan was not a ShuffleWriterExec: {:?}", - plan_clone - ))); - }; - - let mut buf: Vec = vec![]; - U::try_from_physical_plan( - plan, - self.codec.physical_extension_codec(), - ) - .and_then(|m| m.try_encode(&mut buf)) - .map_err(|e| { - Status::internal(format!( - "error serializing execution plan: {:?}", - e - )) - })?; - - ret[idx].push(TaskDefinition { - plan: buf, - task_id: status.task_id, - output_partitioning: hash_partitioning_to_proto( - output_partitioning, - ) - .map_err(|_| Status::internal("TBD".to_string()))?, - }); - executor.available_task_slots -= 1; - num_tasks += 1; - } - _ => { - // Indicate there's no more tasks to be scheduled - has_tasks = false; - break; - } - } - } - if !has_tasks { - break; - } - let has_executors = - available_executors.get(0).unwrap().available_task_slots > 0; - if !has_executors { - break; - } - } - Ok((ret, num_tasks)) - } -} - -pub struct TaskScheduler { - scheduler_server: Arc>, - plan_repr: PhantomData, - exec_repr: PhantomData, -} - -impl TaskScheduler { - pub fn new(scheduler_server: Arc>) -> Self { - Self { - scheduler_server, - plan_repr: PhantomData, - exec_repr: 
PhantomData, - } - } - - pub fn start(&self, mut rx_job: mpsc::Receiver) { - let scheduler_server = self.scheduler_server.clone(); - tokio::spawn(async move { - info!("Starting the task scheduler"); - loop { - let job_id = rx_job.recv().await.unwrap(); - info!("Fetch job {:?} to be scheduled", job_id.clone()); - - let server = scheduler_server.clone(); - server.schedule_job(job_id).await.unwrap(); - } - }); - } } /// Create a DataFusion context that is compatible with Ballista diff --git a/ballista/rust/scheduler/src/scheduler_server/task_scheduler.rs b/ballista/rust/scheduler/src/scheduler_server/task_scheduler.rs new file mode 100644 index 0000000000000..3cbf783beb61e --- /dev/null +++ b/ballista/rust/scheduler/src/scheduler_server/task_scheduler.rs @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::state::SchedulerState; +use async_trait::async_trait; +use ballista_core::error::BallistaError; +use ballista_core::execution_plans::ShuffleWriterExec; +use ballista_core::serde::protobuf::TaskDefinition; +use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; +use ballista_core::serde::scheduler::ExecutorData; +use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; +use log::{error, info}; +use tonic::Status; + +#[async_trait] +pub trait TaskScheduler { + async fn fetch_tasks( + &self, + available_executors: &mut [ExecutorData], + job_id: &str, + ) -> Result<(Vec>, usize), BallistaError>; +} + +#[async_trait] +impl TaskScheduler + for SchedulerState +{ + async fn fetch_tasks( + &self, + available_executors: &mut [ExecutorData], + job_id: &str, + ) -> Result<(Vec>, usize), BallistaError> { + let mut ret: Vec> = + Vec::with_capacity(available_executors.len()); + for _idx in 0..available_executors.len() { + ret.push(Vec::new()); + } + let mut num_tasks = 0; + loop { + info!("Go inside fetching task loop"); + let mut has_tasks = true; + for (idx, executor) in available_executors.iter_mut().enumerate() { + if executor.available_task_slots == 0 { + break; + } + let plan = self + .assign_next_schedulable_job_task(&executor.executor_id, job_id) + .await + .map_err(|e| { + let msg = format!("Error finding next assignable task: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; + if let Some((task, _plan)) = &plan { + let task_id = task.task_id.as_ref().unwrap(); + info!( + "Sending new task to {}: {}/{}/{}", + executor.executor_id, + task_id.job_id, + task_id.stage_id, + task_id.partition_id + ); + } + match plan { + Some((status, plan)) => { + let plan_clone = plan.clone(); + let output_partitioning = if let Some(shuffle_writer) = + plan_clone.as_any().downcast_ref::() + { + shuffle_writer.shuffle_output_partitioning() + } else { + return Err(BallistaError::General(format!( + "Task 
root plan was not a ShuffleWriterExec: {:?}", + plan_clone + ))); + }; + + let mut buf: Vec = vec![]; + U::try_from_physical_plan( + plan, + self.get_codec().physical_extension_codec(), + ) + .and_then(|m| m.try_encode(&mut buf)) + .map_err(|e| { + Status::internal(format!( + "error serializing execution plan: {:?}", + e + )) + })?; + + ret[idx].push(TaskDefinition { + plan: buf, + task_id: status.task_id, + output_partitioning: hash_partitioning_to_proto( + output_partitioning, + ) + .map_err(|_| Status::internal("TBD".to_string()))?, + }); + executor.available_task_slots -= 1; + num_tasks += 1; + } + _ => { + // Indicate there's no more tasks to be scheduled + has_tasks = false; + break; + } + } + } + if !has_tasks { + break; + } + let has_executors = + available_executors.get(0).unwrap().available_task_slots > 0; + if !has_executors { + break; + } + } + Ok((ret, num_tasks)) + } +} diff --git a/ballista/rust/scheduler/src/standalone.rs b/ballista/rust/scheduler/src/standalone.rs index c75902f44e677..52e30cd096f33 100644 --- a/ballista/rust/scheduler/src/standalone.rs +++ b/ballista/rust/scheduler/src/standalone.rs @@ -33,7 +33,7 @@ use crate::{scheduler_server::SchedulerServer, state::StandaloneClient}; pub async fn new_standalone_scheduler() -> Result { let client = StandaloneClient::try_new_temporary()?; - let scheduler_server: SchedulerServer = + let mut scheduler_server: SchedulerServer = SchedulerServer::new( Arc::new(client), "ballista".to_string(), @@ -41,7 +41,7 @@ pub async fn new_standalone_scheduler() -> Result { BallistaCodec::default(), ); scheduler_server.init().await?; - let server = SchedulerGrpcServer::new(scheduler_server); + let server = SchedulerGrpcServer::new(scheduler_server.clone()); // Let the OS assign a random, free port let listener = TcpListener::bind("localhost:0").await?; let addr = listener.local_addr()?; diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index c125a9ae9fff0..e30075e827eb2 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -620,6 +620,10 @@ impl SchedulerState &BallistaCodec { + &self.stable_state.codec + } + pub async fn get_executors_metadata( &self, ) -> Result> {
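
The sketch below (not part of the diff) illustrates how the new EventAction / EventLoop API added to ballista_core::event_loop might be wired together by a caller. DemoEvent, LoggingAction, and the tokio main function are hypothetical names introduced here for illustration; the real scheduler uses SchedulerServerEvent and SchedulerServerEventAction in these roles, and the example assumes tokio with the runtime and macro features enabled.

use std::sync::Arc;

use async_trait::async_trait;
use ballista_core::error::{BallistaError, Result};
use ballista_core::event_loop::{EventAction, EventLoop};

// Hypothetical event type, standing in for SchedulerServerEvent.
#[derive(Clone)]
enum DemoEvent {
    JobSubmitted(String),
}

// Hypothetical action that just logs each event it receives.
struct LoggingAction;

#[async_trait]
impl EventAction<DemoEvent> for LoggingAction {
    fn on_start(&self) {}

    fn on_stop(&self) {}

    async fn on_receive(&self, event: DemoEvent) -> Result<Option<DemoEvent>> {
        match event {
            DemoEvent::JobSubmitted(job_id) => {
                println!("handling job {}", job_id);
                // Returning Ok(Some(next_event)) would post another event back
                // onto the loop, which is how the scheduler re-queues a job when
                // no executor slots are available; Ok(None) ends the chain.
                Ok(None)
            }
        }
    }

    fn on_error(&self, error: BallistaError) {
        eprintln!("event loop error: {}", error);
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let action: Arc<dyn EventAction<DemoEvent>> = Arc::new(LoggingAction);
    // 10000 mirrors the buffer size the scheduler uses for its own loop.
    let mut event_loop = EventLoop::new("demo".to_owned(), 10000, action);
    event_loop.start()?;

    // Post an event through a cloneable sender, as grpc.rs does for JobSubmitted.
    event_loop
        .get_sender()?
        .post_event(DemoEvent::JobSubmitted("demo-job".to_owned()))
        .await?;

    event_loop.stop();
    Ok(())
}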