diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs index c304382a9b630..369a87d2255b4 100644 --- a/ballista/rust/core/src/serde/scheduler/mod.rs +++ b/ballista/rust/core/src/serde/scheduler/mod.rs @@ -147,6 +147,11 @@ pub struct ExecutorData { pub available_task_slots: u32, } +pub struct ExecutorDataChange { + pub executor_id: String, + pub task_slots: i32, +} + struct ExecutorResourcePair { total: protobuf::executor_resource::Resource, available: protobuf::executor_resource::Resource, diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs index 46d05f1a8eb1f..a7d656c94618d 100644 --- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs +++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs @@ -24,7 +24,7 @@ use log::{debug, warn}; use ballista_core::error::{BallistaError, Result}; use ballista_core::event_loop::EventAction; use ballista_core::serde::protobuf::{LaunchTaskParams, TaskDefinition}; -use ballista_core::serde::scheduler::ExecutorData; +use ballista_core::serde::scheduler::ExecutorDataChange; use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; use crate::scheduler_server::ExecutorsClient; @@ -70,12 +70,28 @@ impl return Ok(Some(SchedulerServerEvent::JobSubmitted(job_id))); } + let mut executors_data_change: Vec = available_executors + .iter() + .map(|executor_data| ExecutorDataChange { + executor_id: executor_data.executor_id.clone(), + task_slots: executor_data.available_task_slots as i32, + }) + .collect(); + let (tasks_assigment, num_tasks) = self .state .fetch_tasks(&mut available_executors, &job_id) .await?; + for (data_change, data) in executors_data_change + .iter_mut() + .zip(available_executors.iter()) + { + data_change.task_slots = + data.available_task_slots as i32 - data_change.task_slots; + } + if num_tasks > 0 { - self.launch_tasks(&available_executors, tasks_assigment) + self.launch_tasks(&executors_data_change, tasks_assigment) .await?; } @@ -84,12 +100,12 @@ impl async fn launch_tasks( &self, - executors: &[ExecutorData], + executors: &[ExecutorDataChange], tasks_assigment: Vec>, ) -> Result<()> { for (idx_executor, tasks) in tasks_assigment.into_iter().enumerate() { if !tasks.is_empty() { - let executor_data = &executors[idx_executor]; + let executor_data_change = &executors[idx_executor]; debug!( "Start to launch tasks {:?} to executor {:?}", tasks @@ -107,14 +123,17 @@ impl } }) .collect::>(), - executor_data.executor_id + executor_data_change.executor_id ); let mut client = { let clients = self.executors_client.read().await; - clients.get(&executor_data.executor_id).unwrap().clone() + clients + .get(&executor_data_change.executor_id) + .unwrap() + .clone() }; // Update the resources first - self.state.save_executor_data(executor_data.clone()); + self.state.update_executor_data(executor_data_change); // TODO check whether launching task is successful or not client.launch_task(LaunchTaskParams { task: tasks }).await?; } else { diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs index fe4eb36a00738..6b96d41e990c0 100644 --- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs @@ -30,7 +30,9 @@ use ballista_core::serde::protobuf::{ TaskDefinition, UpdateTaskStatusParams, UpdateTaskStatusResult, }; use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; -use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata}; +use ballista_core::serde::scheduler::{ + ExecutorData, ExecutorDataChange, ExecutorMetadata, +}; use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; @@ -290,9 +292,12 @@ impl SchedulerGrpc jobs.insert(task_id.job_id.clone()); } } - if let Some(mut executor_data) = self.state.get_executor_data(&executor_id) { - executor_data.available_task_slots += num_tasks as u32; - self.state.save_executor_data(executor_data); + + if let Some(executor_data) = self.state.get_executor_data(&executor_id) { + self.state.update_executor_data(&ExecutorDataChange { + executor_id: executor_data.executor_id, + task_slots: num_tasks as i32, + }); } else { error!("Fail to get executor data for {:?}", &executor_id); } diff --git a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs index 5a2800c904d65..31a0f9d3127e8 100644 --- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs +++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs @@ -19,12 +19,14 @@ use std::sync::Arc; use std::time::Instant; use async_trait::async_trait; -use log::{debug, error, info}; +use log::{debug, error, info, warn}; use tokio::sync::RwLock; use ballista_core::error::{BallistaError, Result}; use ballista_core::event_loop::{EventAction, EventSender}; -use ballista_core::serde::protobuf::{PartitionId, TaskStatus}; +use ballista_core::serde::protobuf::{ + job_status, JobStatus, PartitionId, RunningJob, TaskStatus, +}; use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::ExecutionPlan; @@ -160,7 +162,20 @@ impl ) -> Result> { match event { QueryStageSchedulerEvent::JobSubmitted(job_id, plan) => { + info!("Job {} submitted", job_id); let plan = self.create_physical_plan(plan).await?; + if let Err(e) = self + .state + .save_job_metadata( + &job_id, + &JobStatus { + status: Some(job_status::Status::Running(RunningJob {})), + }, + ) + .await + { + warn!("Could not update job {} status to running: {}", job_id, e); + } self.generate_stages(&job_id, plan).await?; if let Some(event_sender) = self.event_sender.as_ref() { diff --git a/ballista/rust/scheduler/src/state/in_memory_state.rs b/ballista/rust/scheduler/src/state/in_memory_state.rs index 4eac923717801..666d2294011f8 100644 --- a/ballista/rust/scheduler/src/state/in_memory_state.rs +++ b/ballista/rust/scheduler/src/state/in_memory_state.rs @@ -16,7 +16,8 @@ // under the License. use ballista_core::serde::protobuf::{ExecutorHeartbeat, TaskStatus}; -use ballista_core::serde::scheduler::ExecutorData; +use ballista_core::serde::scheduler::{ExecutorData, ExecutorDataChange}; +use log::{error, info, warn}; use parking_lot::RwLock; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -85,6 +86,33 @@ impl InMemorySchedulerState { executors_data.insert(executor_data.executor_id.clone(), executor_data); } + pub(crate) fn update_executor_data(&self, executor_data_change: &ExecutorDataChange) { + let mut executors_data = self.executors_data.write(); + if let Some(executor_data) = + executors_data.get_mut(&executor_data_change.executor_id) + { + let available_task_slots = executor_data.available_task_slots as i32 + + executor_data_change.task_slots; + if available_task_slots < 0 { + error!( + "Available task slots {} for executor {} is less than 0", + available_task_slots, executor_data.executor_id + ); + } else { + info!( + "available_task_slots for executor {} becomes {}", + executor_data.executor_id, available_task_slots + ); + executor_data.available_task_slots = available_task_slots as u32; + } + } else { + warn!( + "Could not find executor data for {}", + executor_data_change.executor_id + ); + } + } + pub(crate) fn get_executor_data(&self, executor_id: &str) -> Option { let executors_data = self.executors_data.read(); executors_data.get(executor_id).cloned() @@ -100,7 +128,8 @@ impl InMemorySchedulerState { executors_data .iter() .filter_map(|(exec, data)| { - alive_executors.contains(exec).then(|| data.clone()) + (data.available_task_slots > 0 && alive_executors.contains(exec)) + .then(|| data.clone()) }) .collect::>() }; diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index e015e5a0da411..cc7252f680005 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -32,7 +32,7 @@ use ballista_core::serde::protobuf::{ FailedTask, JobStatus, RunningJob, RunningTask, TaskStatus, }; use ballista_core::serde::scheduler::{ - ExecutorData, ExecutorMetadata, PartitionId, PartitionStats, + ExecutorData, ExecutorDataChange, ExecutorMetadata, PartitionId, PartitionStats, }; use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec}; use datafusion::prelude::ExecutionContext; @@ -227,6 +227,11 @@ impl SchedulerState Vec { self.in_memory_state.get_available_executors_data() }