From 21a0945b6030a38957301fc6b19d83eec9aa1742 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Mon, 14 Mar 2022 13:53:31 +0800 Subject: [PATCH 1/4] Fix concurrently updating ExecutorData --- ballista/rust/core/src/serde/scheduler/mod.rs | 5 +++ .../src/scheduler_server/event_loop.rs | 33 +++++++++++++++---- .../scheduler/src/scheduler_server/grpc.rs | 13 +++++--- .../scheduler/src/state/in_memory_state.rs | 30 ++++++++++++++++- ballista/rust/scheduler/src/state/mod.rs | 7 +++- 5 files changed, 75 insertions(+), 13 deletions(-) diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs index c304382a9b630..4c76cf8549eac 100644 --- a/ballista/rust/core/src/serde/scheduler/mod.rs +++ b/ballista/rust/core/src/serde/scheduler/mod.rs @@ -147,6 +147,11 @@ pub struct ExecutorData { pub available_task_slots: u32, } +pub struct ExecutorDeltaData { + pub executor_id: String, + pub task_slots: i32, +} + struct ExecutorResourcePair { total: protobuf::executor_resource::Resource, available: protobuf::executor_resource::Resource, diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs index 46d05f1a8eb1f..c5a1800f35be4 100644 --- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs +++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs @@ -24,7 +24,7 @@ use log::{debug, warn}; use ballista_core::error::{BallistaError, Result}; use ballista_core::event_loop::EventAction; use ballista_core::serde::protobuf::{LaunchTaskParams, TaskDefinition}; -use ballista_core::serde::scheduler::ExecutorData; +use ballista_core::serde::scheduler::ExecutorDeltaData; use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; use crate::scheduler_server::ExecutorsClient; @@ -70,12 +70,28 @@ impl return Ok(Some(SchedulerServerEvent::JobSubmitted(job_id))); } + let mut executors_delta_data: Vec = available_executors + .iter() + .map(|executor_data| ExecutorDeltaData { + executor_id: executor_data.executor_id.clone(), + task_slots: executor_data.available_task_slots as i32, + }) + .collect(); + let (tasks_assigment, num_tasks) = self .state .fetch_tasks(&mut available_executors, &job_id) .await?; + for (delta_data, data) in executors_delta_data + .iter_mut() + .zip(available_executors.iter()) + { + delta_data.task_slots = + data.available_task_slots as i32 - delta_data.task_slots; + } + if num_tasks > 0 { - self.launch_tasks(&available_executors, tasks_assigment) + self.launch_tasks(&executors_delta_data, tasks_assigment) .await?; } @@ -84,12 +100,12 @@ impl async fn launch_tasks( &self, - executors: &[ExecutorData], + executors: &[ExecutorDeltaData], tasks_assigment: Vec>, ) -> Result<()> { for (idx_executor, tasks) in tasks_assigment.into_iter().enumerate() { if !tasks.is_empty() { - let executor_data = &executors[idx_executor]; + let executor_delta_data = &executors[idx_executor]; debug!( "Start to launch tasks {:?} to executor {:?}", tasks @@ -107,14 +123,17 @@ impl } }) .collect::>(), - executor_data.executor_id + executor_delta_data.executor_id ); let mut client = { let clients = self.executors_client.read().await; - clients.get(&executor_data.executor_id).unwrap().clone() + clients + .get(&executor_delta_data.executor_id) + .unwrap() + .clone() }; // Update the resources first - self.state.save_executor_data(executor_data.clone()); + self.state.update_executor_data(executor_delta_data); // TODO check whether launching task is successful or not client.launch_task(LaunchTaskParams { task: tasks }).await?; } else { diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs index fe4eb36a00738..990a253b4aa64 100644 --- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs @@ -30,7 +30,9 @@ use ballista_core::serde::protobuf::{ TaskDefinition, UpdateTaskStatusParams, UpdateTaskStatusResult, }; use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; -use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata}; +use ballista_core::serde::scheduler::{ + ExecutorData, ExecutorDeltaData, ExecutorMetadata, +}; use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; @@ -290,9 +292,12 @@ impl SchedulerGrpc jobs.insert(task_id.job_id.clone()); } } - if let Some(mut executor_data) = self.state.get_executor_data(&executor_id) { - executor_data.available_task_slots += num_tasks as u32; - self.state.save_executor_data(executor_data); + + if let Some(executor_data) = self.state.get_executor_data(&executor_id) { + self.state.update_executor_data(&ExecutorDeltaData { + executor_id: executor_data.executor_id, + task_slots: num_tasks as i32, + }); } else { error!("Fail to get executor data for {:?}", &executor_id); } diff --git a/ballista/rust/scheduler/src/state/in_memory_state.rs b/ballista/rust/scheduler/src/state/in_memory_state.rs index 4eac923717801..f535d154400e2 100644 --- a/ballista/rust/scheduler/src/state/in_memory_state.rs +++ b/ballista/rust/scheduler/src/state/in_memory_state.rs @@ -16,7 +16,8 @@ // under the License. use ballista_core::serde::protobuf::{ExecutorHeartbeat, TaskStatus}; -use ballista_core::serde::scheduler::ExecutorData; +use ballista_core::serde::scheduler::{ExecutorData, ExecutorDeltaData}; +use log::{error, info, warn}; use parking_lot::RwLock; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -85,6 +86,33 @@ impl InMemorySchedulerState { executors_data.insert(executor_data.executor_id.clone(), executor_data); } + pub(crate) fn update_executor_data(&self, executor_delta_data: &ExecutorDeltaData) { + let mut executors_data = self.executors_data.write(); + if let Some(executor_data) = + executors_data.get_mut(&executor_delta_data.executor_id) + { + let available_task_slots = executor_data.available_task_slots as i32 + + executor_delta_data.task_slots; + if available_task_slots < 0 { + error!( + "Available task slots {} for executor {} is less than 0", + available_task_slots, executor_data.executor_id + ); + } else { + info!( + "available_task_slots for executor {} becomes {}", + executor_data.executor_id, available_task_slots + ); + executor_data.available_task_slots = available_task_slots as u32; + } + } else { + warn!( + "Could not find executor data for {}", + executor_delta_data.executor_id + ); + } + } + pub(crate) fn get_executor_data(&self, executor_id: &str) -> Option { let executors_data = self.executors_data.read(); executors_data.get(executor_id).cloned() diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index e015e5a0da411..f21ab008bc8a0 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -32,7 +32,7 @@ use ballista_core::serde::protobuf::{ FailedTask, JobStatus, RunningJob, RunningTask, TaskStatus, }; use ballista_core::serde::scheduler::{ - ExecutorData, ExecutorMetadata, PartitionId, PartitionStats, + ExecutorData, ExecutorDeltaData, ExecutorMetadata, PartitionId, PartitionStats, }; use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec}; use datafusion::prelude::ExecutionContext; @@ -227,6 +227,11 @@ impl SchedulerState Vec { self.in_memory_state.get_available_executors_data() } From 4e70fe0b56d2e1620c1186d6fc6b5897a2e6c89e Mon Sep 17 00:00:00 2001 From: yangzhong Date: Mon, 14 Mar 2022 13:55:20 +0800 Subject: [PATCH 2/4] Fix get_available_executors_data() to avoid losing event of SchedulerServerEvent::JobSubmitted in offer_resources() --- ballista/rust/scheduler/src/state/in_memory_state.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ballista/rust/scheduler/src/state/in_memory_state.rs b/ballista/rust/scheduler/src/state/in_memory_state.rs index f535d154400e2..ae05fc1fb072d 100644 --- a/ballista/rust/scheduler/src/state/in_memory_state.rs +++ b/ballista/rust/scheduler/src/state/in_memory_state.rs @@ -128,7 +128,8 @@ impl InMemorySchedulerState { executors_data .iter() .filter_map(|(exec, data)| { - alive_executors.contains(exec).then(|| data.clone()) + (data.available_task_slots > 0 && alive_executors.contains(exec)) + .then(|| data.clone()) }) .collect::>() }; From e5aeb53bc974d835bcd1ae61a04a15bac511a571 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Mon, 14 Mar 2022 13:57:02 +0800 Subject: [PATCH 3/4] Fix missing update JobStatus from Queued to Running --- .../scheduler_server/query_stage_scheduler.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs index 5a2800c904d65..31a0f9d3127e8 100644 --- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs +++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs @@ -19,12 +19,14 @@ use std::sync::Arc; use std::time::Instant; use async_trait::async_trait; -use log::{debug, error, info}; +use log::{debug, error, info, warn}; use tokio::sync::RwLock; use ballista_core::error::{BallistaError, Result}; use ballista_core::event_loop::{EventAction, EventSender}; -use ballista_core::serde::protobuf::{PartitionId, TaskStatus}; +use ballista_core::serde::protobuf::{ + job_status, JobStatus, PartitionId, RunningJob, TaskStatus, +}; use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::ExecutionPlan; @@ -160,7 +162,20 @@ impl ) -> Result> { match event { QueryStageSchedulerEvent::JobSubmitted(job_id, plan) => { + info!("Job {} submitted", job_id); let plan = self.create_physical_plan(plan).await?; + if let Err(e) = self + .state + .save_job_metadata( + &job_id, + &JobStatus { + status: Some(job_status::Status::Running(RunningJob {})), + }, + ) + .await + { + warn!("Could not update job {} status to running: {}", job_id, e); + } self.generate_stages(&job_id, plan).await?; if let Some(event_sender) = self.event_sender.as_ref() { From 1d462154e58bbfcd653c9ba154365ce4cb26ba17 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Wed, 16 Mar 2022 12:49:03 +0800 Subject: [PATCH 4/4] Fix for PR review --- ballista/rust/core/src/serde/scheduler/mod.rs | 2 +- .../src/scheduler_server/event_loop.rs | 24 +++++++++---------- .../scheduler/src/scheduler_server/grpc.rs | 4 ++-- .../scheduler/src/state/in_memory_state.rs | 10 ++++---- ballista/rust/scheduler/src/state/mod.rs | 6 ++--- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs index 4c76cf8549eac..369a87d2255b4 100644 --- a/ballista/rust/core/src/serde/scheduler/mod.rs +++ b/ballista/rust/core/src/serde/scheduler/mod.rs @@ -147,7 +147,7 @@ pub struct ExecutorData { pub available_task_slots: u32, } -pub struct ExecutorDeltaData { +pub struct ExecutorDataChange { pub executor_id: String, pub task_slots: i32, } diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs index c5a1800f35be4..a7d656c94618d 100644 --- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs +++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs @@ -24,7 +24,7 @@ use log::{debug, warn}; use ballista_core::error::{BallistaError, Result}; use ballista_core::event_loop::EventAction; use ballista_core::serde::protobuf::{LaunchTaskParams, TaskDefinition}; -use ballista_core::serde::scheduler::ExecutorDeltaData; +use ballista_core::serde::scheduler::ExecutorDataChange; use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; use crate::scheduler_server::ExecutorsClient; @@ -70,9 +70,9 @@ impl return Ok(Some(SchedulerServerEvent::JobSubmitted(job_id))); } - let mut executors_delta_data: Vec = available_executors + let mut executors_data_change: Vec = available_executors .iter() - .map(|executor_data| ExecutorDeltaData { + .map(|executor_data| ExecutorDataChange { executor_id: executor_data.executor_id.clone(), task_slots: executor_data.available_task_slots as i32, }) @@ -82,16 +82,16 @@ impl .state .fetch_tasks(&mut available_executors, &job_id) .await?; - for (delta_data, data) in executors_delta_data + for (data_change, data) in executors_data_change .iter_mut() .zip(available_executors.iter()) { - delta_data.task_slots = - data.available_task_slots as i32 - delta_data.task_slots; + data_change.task_slots = + data.available_task_slots as i32 - data_change.task_slots; } if num_tasks > 0 { - self.launch_tasks(&executors_delta_data, tasks_assigment) + self.launch_tasks(&executors_data_change, tasks_assigment) .await?; } @@ -100,12 +100,12 @@ impl async fn launch_tasks( &self, - executors: &[ExecutorDeltaData], + executors: &[ExecutorDataChange], tasks_assigment: Vec>, ) -> Result<()> { for (idx_executor, tasks) in tasks_assigment.into_iter().enumerate() { if !tasks.is_empty() { - let executor_delta_data = &executors[idx_executor]; + let executor_data_change = &executors[idx_executor]; debug!( "Start to launch tasks {:?} to executor {:?}", tasks @@ -123,17 +123,17 @@ impl } }) .collect::>(), - executor_delta_data.executor_id + executor_data_change.executor_id ); let mut client = { let clients = self.executors_client.read().await; clients - .get(&executor_delta_data.executor_id) + .get(&executor_data_change.executor_id) .unwrap() .clone() }; // Update the resources first - self.state.update_executor_data(executor_delta_data); + self.state.update_executor_data(executor_data_change); // TODO check whether launching task is successful or not client.launch_task(LaunchTaskParams { task: tasks }).await?; } else { diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs index 990a253b4aa64..6b96d41e990c0 100644 --- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs @@ -31,7 +31,7 @@ use ballista_core::serde::protobuf::{ }; use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; use ballista_core::serde::scheduler::{ - ExecutorData, ExecutorDeltaData, ExecutorMetadata, + ExecutorData, ExecutorDataChange, ExecutorMetadata, }; use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; use datafusion::datasource::file_format::parquet::ParquetFormat; @@ -294,7 +294,7 @@ impl SchedulerGrpc } if let Some(executor_data) = self.state.get_executor_data(&executor_id) { - self.state.update_executor_data(&ExecutorDeltaData { + self.state.update_executor_data(&ExecutorDataChange { executor_id: executor_data.executor_id, task_slots: num_tasks as i32, }); diff --git a/ballista/rust/scheduler/src/state/in_memory_state.rs b/ballista/rust/scheduler/src/state/in_memory_state.rs index ae05fc1fb072d..666d2294011f8 100644 --- a/ballista/rust/scheduler/src/state/in_memory_state.rs +++ b/ballista/rust/scheduler/src/state/in_memory_state.rs @@ -16,7 +16,7 @@ // under the License. use ballista_core::serde::protobuf::{ExecutorHeartbeat, TaskStatus}; -use ballista_core::serde::scheduler::{ExecutorData, ExecutorDeltaData}; +use ballista_core::serde::scheduler::{ExecutorData, ExecutorDataChange}; use log::{error, info, warn}; use parking_lot::RwLock; use std::collections::{HashMap, HashSet}; @@ -86,13 +86,13 @@ impl InMemorySchedulerState { executors_data.insert(executor_data.executor_id.clone(), executor_data); } - pub(crate) fn update_executor_data(&self, executor_delta_data: &ExecutorDeltaData) { + pub(crate) fn update_executor_data(&self, executor_data_change: &ExecutorDataChange) { let mut executors_data = self.executors_data.write(); if let Some(executor_data) = - executors_data.get_mut(&executor_delta_data.executor_id) + executors_data.get_mut(&executor_data_change.executor_id) { let available_task_slots = executor_data.available_task_slots as i32 - + executor_delta_data.task_slots; + + executor_data_change.task_slots; if available_task_slots < 0 { error!( "Available task slots {} for executor {} is less than 0", @@ -108,7 +108,7 @@ impl InMemorySchedulerState { } else { warn!( "Could not find executor data for {}", - executor_delta_data.executor_id + executor_data_change.executor_id ); } } diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index f21ab008bc8a0..cc7252f680005 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -32,7 +32,7 @@ use ballista_core::serde::protobuf::{ FailedTask, JobStatus, RunningJob, RunningTask, TaskStatus, }; use ballista_core::serde::scheduler::{ - ExecutorData, ExecutorDeltaData, ExecutorMetadata, PartitionId, PartitionStats, + ExecutorData, ExecutorDataChange, ExecutorMetadata, PartitionId, PartitionStats, }; use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec}; use datafusion::prelude::ExecutionContext; @@ -227,9 +227,9 @@ impl SchedulerState Vec {