From ac82e109620feedb827126b0529e5e4fa4d409c8 Mon Sep 17 00:00:00 2001 From: yangzhong Date: Tue, 22 Feb 2022 18:24:59 +0800 Subject: [PATCH] Refactor scheduler state mod --- .../rust/scheduler/scheduler_config_spec.toml | 6 +- ballista/rust/scheduler/src/lib.rs | 3 - ballista/rust/scheduler/src/main.rs | 18 +- .../src/scheduler_server/event_loop.rs | 2 +- .../scheduler/src/scheduler_server/grpc.rs | 2 +- .../scheduler/src/scheduler_server/mod.rs | 8 +- ballista/rust/scheduler/src/standalone.rs | 4 +- .../scheduler/src/state/{ => backend}/etcd.rs | 7 +- .../rust/scheduler/src/state/backend/mod.rs | 94 +++ .../src/state/{ => backend}/standalone.rs | 18 +- .../scheduler/src/state/in_memory_state.rs | 179 ++++++ ballista/rust/scheduler/src/state/mod.rs | 586 ++---------------- .../scheduler/src/state/persistent_state.rs | 312 ++++++++++ .../task_scheduler.rs | 0 14 files changed, 666 insertions(+), 573 deletions(-) rename ballista/rust/scheduler/src/state/{ => backend}/etcd.rs (96%) create mode 100644 ballista/rust/scheduler/src/state/backend/mod.rs rename ballista/rust/scheduler/src/state/{ => backend}/standalone.rs (92%) create mode 100644 ballista/rust/scheduler/src/state/in_memory_state.rs create mode 100644 ballista/rust/scheduler/src/state/persistent_state.rs rename ballista/rust/scheduler/src/{scheduler_server => state}/task_scheduler.rs (100%) diff --git a/ballista/rust/scheduler/scheduler_config_spec.toml b/ballista/rust/scheduler/scheduler_config_spec.toml index 0a8cd819594db..000d74e7d32d8 100644 --- a/ballista/rust/scheduler/scheduler_config_spec.toml +++ b/ballista/rust/scheduler/scheduler_config_spec.toml @@ -27,9 +27,9 @@ doc = "Print version of this executable" [[param]] abbr = "b" name = "config_backend" -type = "ballista_scheduler::state::ConfigBackend" -doc = "The configuration backend for the scheduler, see ConfigBackend::variants() for options. Default: Standalone" -default = "ballista_scheduler::state::ConfigBackend::Standalone" +type = "ballista_scheduler::state::backend::StateBackend" +doc = "The configuration backend for the scheduler, see StateBackend::variants() for options. 
Default: Standalone" +default = "ballista_scheduler::state::backend::StateBackend::Standalone" [[param]] abbr = "n" diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index 8a4b7cd5da970..ea39ef02efd4f 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -26,6 +26,3 @@ pub mod state; #[cfg(test)] pub mod test_utils; - -#[cfg(feature = "sled")] -extern crate sled_package as sled; diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs index 6646ce32428ad..4b74573cc3ae9 100644 --- a/ballista/rust/scheduler/src/main.rs +++ b/ballista/rust/scheduler/src/main.rs @@ -36,12 +36,12 @@ use ballista_core::{ }; use ballista_scheduler::api::{get_routes, EitherBody, Error}; #[cfg(feature = "etcd")] -use ballista_scheduler::state::EtcdClient; +use ballista_scheduler::state::backend::etcd::EtcdClient; #[cfg(feature = "sled")] -use ballista_scheduler::state::StandaloneClient; +use ballista_scheduler::state::backend::standalone::StandaloneClient; use ballista_scheduler::scheduler_server::SchedulerServer; -use ballista_scheduler::state::{ConfigBackend, ConfigBackendClient}; +use ballista_scheduler::state::backend::{StateBackend, StateBackendClient}; use ballista_core::config::TaskSchedulingPolicy; use ballista_core::serde::BallistaCodec; @@ -65,7 +65,7 @@ use config::prelude::*; use datafusion::prelude::ExecutionContext; async fn start_server( - config_backend: Arc, + config_backend: Arc, namespace: String, addr: SocketAddr, policy: TaskSchedulingPolicy, @@ -161,26 +161,26 @@ async fn main() -> Result<()> { let addr = format!("{}:{}", bind_host, port); let addr = addr.parse()?; - let client: Arc = match opt.config_backend { + let client: Arc = match opt.config_backend { #[cfg(not(any(feature = "sled", feature = "etcd")))] _ => std::compile_error!( "To build the scheduler enable at least one config backend feature (`etcd` or `sled`)" ), #[cfg(feature = "etcd")] - ConfigBackend::Etcd => { + StateBackend::Etcd => { let etcd = etcd_client::Client::connect(&[opt.etcd_urls], None) .await .context("Could not connect to etcd")?; Arc::new(EtcdClient::new(etcd)) } #[cfg(not(feature = "etcd"))] - ConfigBackend::Etcd => { + StateBackend::Etcd => { unimplemented!( "build the scheduler with the `etcd` feature to use the etcd config backend" ) } #[cfg(feature = "sled")] - ConfigBackend::Standalone => { + StateBackend::Standalone => { // TODO: Use a real file and make path is configurable Arc::new( StandaloneClient::try_new_temporary() @@ -188,7 +188,7 @@ async fn main() -> Result<()> { ) } #[cfg(not(feature = "sled"))] - ConfigBackend::Standalone => { + StateBackend::Standalone => { unimplemented!( "build the scheduler with the `sled` feature to use the standalone config backend" ) diff --git a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs index 140cc5992f911..46d05f1a8eb1f 100644 --- a/ballista/rust/scheduler/src/scheduler_server/event_loop.rs +++ b/ballista/rust/scheduler/src/scheduler_server/event_loop.rs @@ -27,8 +27,8 @@ use ballista_core::serde::protobuf::{LaunchTaskParams, TaskDefinition}; use ballista_core::serde::scheduler::ExecutorData; use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan}; -use crate::scheduler_server::task_scheduler::TaskScheduler; use crate::scheduler_server::ExecutorsClient; +use crate::state::task_scheduler::TaskScheduler; use crate::state::SchedulerState; #[derive(Clone)] diff --git 
a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs index 7f7764cbe5c28..5dd2a8a0ba4c7 100644 --- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs @@ -583,7 +583,7 @@ mod test { use tonic::Request; - use crate::state::{SchedulerState, StandaloneClient}; + use crate::state::{backend::standalone::StandaloneClient, SchedulerState}; use ballista_core::error::BallistaError; use ballista_core::serde::protobuf::{ executor_registration::OptionalHost, ExecutorRegistration, LogicalPlanNode, diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs index 029b80246538b..fdcd375fbb334 100644 --- a/ballista/rust/scheduler/src/scheduler_server/mod.rs +++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs @@ -33,7 +33,8 @@ use datafusion::prelude::{ExecutionConfig, ExecutionContext}; use crate::scheduler_server::event_loop::{ SchedulerServerEvent, SchedulerServerEventAction, }; -use crate::state::{ConfigBackendClient, SchedulerState}; +use crate::state::backend::StateBackendClient; +use crate::state::SchedulerState; // include the generated protobuf source as a submodule #[allow(clippy::all)] @@ -44,7 +45,6 @@ pub mod externalscaler { mod event_loop; mod external_scaler; mod grpc; -mod task_scheduler; type ExecutorsClient = Arc>>>; @@ -61,7 +61,7 @@ pub struct SchedulerServer SchedulerServer { pub fn new( - config: Arc, + config: Arc, namespace: String, ctx: Arc>, codec: BallistaCodec, @@ -76,7 +76,7 @@ impl SchedulerServer, + config: Arc, namespace: String, policy: TaskSchedulingPolicy, ctx: Arc>, diff --git a/ballista/rust/scheduler/src/standalone.rs b/ballista/rust/scheduler/src/standalone.rs index 52e30cd096f33..45984b6b54766 100644 --- a/ballista/rust/scheduler/src/standalone.rs +++ b/ballista/rust/scheduler/src/standalone.rs @@ -28,7 +28,9 @@ use tokio::net::TcpListener; use tokio::sync::RwLock; use tonic::transport::Server; -use crate::{scheduler_server::SchedulerServer, state::StandaloneClient}; +use crate::{ + scheduler_server::SchedulerServer, state::backend::standalone::StandaloneClient, +}; pub async fn new_standalone_scheduler() -> Result { let client = StandaloneClient::try_new_temporary()?; diff --git a/ballista/rust/scheduler/src/state/etcd.rs b/ballista/rust/scheduler/src/state/backend/etcd.rs similarity index 96% rename from ballista/rust/scheduler/src/state/etcd.rs rename to ballista/rust/scheduler/src/state/backend/etcd.rs index d6741a7d83dcc..fa85e54d5f85d 100644 --- a/ballista/rust/scheduler/src/state/etcd.rs +++ b/ballista/rust/scheduler/src/state/backend/etcd.rs @@ -19,16 +19,15 @@ use std::task::Poll; -use crate::state::ConfigBackendClient; use ballista_core::error::{ballista_error, Result}; use etcd_client::{GetOptions, LockResponse, WatchOptions, WatchStream, Watcher}; use futures::{Stream, StreamExt}; use log::warn; -use super::{Lock, Watch, WatchEvent}; +use crate::state::backend::{Lock, StateBackendClient, Watch, WatchEvent}; -/// A [`ConfigBackendClient`] implementation that uses etcd to save cluster configuration. +/// A [`StateBackendClient`] implementation that uses etcd to save cluster configuration. 
 #[derive(Clone)]
 pub struct EtcdClient {
     etcd: etcd_client::Client,
 }
@@ -41,7 +40,7 @@ impl EtcdClient {
 }
 
 #[tonic::async_trait]
-impl ConfigBackendClient for EtcdClient {
+impl StateBackendClient for EtcdClient {
     async fn get(&self, key: &str) -> Result<Vec<u8>> {
         Ok(self
             .etcd
diff --git a/ballista/rust/scheduler/src/state/backend/mod.rs b/ballista/rust/scheduler/src/state/backend/mod.rs
new file mode 100644
index 0000000000000..15f244b693a6e
--- /dev/null
+++ b/ballista/rust/scheduler/src/state/backend/mod.rs
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ballista_core::error::Result;
+use clap::ArgEnum;
+use futures::Stream;
+use std::fmt;
+use tokio::sync::OwnedMutexGuard;
+
+#[cfg(feature = "etcd")]
+pub mod etcd;
+#[cfg(feature = "sled")]
+pub mod standalone;
+
+// an enum used to configure the backend
+// needs to be visible to code generated by configure_me
+#[derive(Debug, Clone, ArgEnum, serde::Deserialize)]
+pub enum StateBackend {
+    Etcd,
+    Standalone,
+}
+
+impl std::str::FromStr for StateBackend {
+    type Err = String;
+
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        ArgEnum::from_str(s, true)
+    }
+}
+
+impl parse_arg::ParseArgFromStr for StateBackend {
+    fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+        write!(writer, "The configuration backend for the scheduler")
+    }
+}
+
+/// A trait that contains the necessary methods to save and retrieve the state and configuration of a cluster.
+#[tonic::async_trait]
+pub trait StateBackendClient: Send + Sync {
+    /// Retrieve the data associated with a specific key.
+    ///
+    /// An empty vec is returned if the key does not exist.
+    async fn get(&self, key: &str) -> Result<Vec<u8>>;
+
+    /// Retrieve all data associated with a specific key.
+    async fn get_from_prefix(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>>;
+
+    /// Saves the value into the provided key, overriding any previous data that might have been associated to that key.
+    async fn put(&self, key: String, value: Vec<u8>) -> Result<()>;
+
+    async fn lock(&self) -> Result<Box<dyn Lock>>;
+
+    /// Watch all events that happen on a specific prefix.
+    async fn watch(&self, prefix: String) -> Result<Box<dyn Watch>>;
+}
+
+/// A Watch is a cancelable stream of put or delete events in the [StateBackendClient]
+#[tonic::async_trait]
+pub trait Watch: Stream<Item = WatchEvent> + Send + Unpin {
+    async fn cancel(&mut self) -> Result<()>;
+}
+
+#[derive(Debug, PartialEq)]
+pub enum WatchEvent {
+    /// Contains the inserted or updated key and the new value
+    Put(String, Vec<u8>),
+
+    /// Contains the deleted key
+    Delete(String),
+}
+
+#[tonic::async_trait]
+pub trait Lock: Send + Sync {
+    async fn unlock(&mut self);
+}
+
+#[tonic::async_trait]
+impl<T: Send + Sync> Lock for OwnedMutexGuard<T> {
+    async fn unlock(&mut self) {}
+}
diff --git a/ballista/rust/scheduler/src/state/standalone.rs b/ballista/rust/scheduler/src/state/backend/standalone.rs
similarity index 92%
rename from ballista/rust/scheduler/src/state/standalone.rs
rename to ballista/rust/scheduler/src/state/backend/standalone.rs
index 8514d4cf3e64c..5bb4e384132f7 100644
--- a/ballista/rust/scheduler/src/state/standalone.rs
+++ b/ballista/rust/scheduler/src/state/backend/standalone.rs
@@ -17,17 +17,16 @@
 
 use std::{sync::Arc, task::Poll};
 
-use crate::state::ConfigBackendClient;
 use ballista_core::error::{ballista_error, BallistaError, Result};
 use futures::{FutureExt, Stream};
 use log::warn;
-use sled::{Event, Subscriber};
+use sled_package as sled;
 use tokio::sync::Mutex;
 
-use super::{Lock, Watch, WatchEvent};
+use crate::state::backend::{Lock, StateBackendClient, Watch, WatchEvent};
 
-/// A [`ConfigBackendClient`] implementation that uses file-based storage to save cluster configuration.
+/// A [`StateBackendClient`] implementation that uses file-based storage to save cluster configuration.
 #[derive(Clone)]
 pub struct StandaloneClient {
     db: sled::Db,
@@ -63,7 +62,7 @@ fn sled_to_ballista_error(e: sled::Error) -> BallistaError {
 }
 
 #[tonic::async_trait]
-impl ConfigBackendClient for StandaloneClient {
+impl StateBackendClient for StandaloneClient {
     async fn get(&self, key: &str) -> Result<Vec<u8>> {
         Ok(self
             .db
@@ -111,7 +110,7 @@ impl ConfigBackendClient for StandaloneClient {
 }
 
 struct SledWatch {
-    subscriber: Subscriber,
+    subscriber: sled::Subscriber,
 }
 
 #[tonic::async_trait]
@@ -131,11 +130,11 @@ impl Stream for SledWatch {
         match self.get_mut().subscriber.poll_unpin(cx) {
             Poll::Pending => Poll::Pending,
             Poll::Ready(None) => Poll::Ready(None),
-            Poll::Ready(Some(Event::Insert { key, value })) => {
+            Poll::Ready(Some(sled::Event::Insert { key, value })) => {
                 let key = std::str::from_utf8(&key).unwrap().to_owned();
                 Poll::Ready(Some(WatchEvent::Put(key, value.to_vec())))
             }
-            Poll::Ready(Some(Event::Remove { key })) => {
+            Poll::Ready(Some(sled::Event::Remove { key })) => {
                 let key = std::str::from_utf8(&key).unwrap().to_owned();
                 Poll::Ready(Some(WatchEvent::Delete(key)))
             }
@@ -149,9 +148,8 @@ impl Stream for SledWatch {
 
 #[cfg(test)]
 mod tests {
-    use crate::state::{ConfigBackendClient, Watch, WatchEvent};
+    use super::{StandaloneClient, StateBackendClient, Watch, WatchEvent};
 
-    use super::StandaloneClient;
     use futures::StreamExt;
     use std::result::Result;
diff --git a/ballista/rust/scheduler/src/state/in_memory_state.rs b/ballista/rust/scheduler/src/state/in_memory_state.rs
new file mode 100644
index 0000000000000..4eac923717801
--- /dev/null
+++ b/ballista/rust/scheduler/src/state/in_memory_state.rs
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
+// The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ballista_core::serde::protobuf::{ExecutorHeartbeat, TaskStatus};
+use ballista_core::serde::scheduler::ExecutorData;
+use parking_lot::RwLock;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+type JobTasks = HashMap<u32, HashMap<u32, TaskStatus>>;
+
+#[derive(Clone)]
+pub(crate) struct InMemorySchedulerState {
+    executors_heartbeat: Arc<RwLock<HashMap<String, ExecutorHeartbeat>>>,
+    executors_data: Arc<RwLock<HashMap<String, ExecutorData>>>,
+
+    // job -> stage -> partition
+    tasks: Arc<RwLock<HashMap<String, JobTasks>>>,
+}
+
+/// Since this state is kept purely in memory, its services are provided synchronously (no async needed)
+impl InMemorySchedulerState {
+    pub(crate) fn new() -> Self {
+        Self {
+            executors_heartbeat: Arc::new(RwLock::new(HashMap::new())),
+            executors_data: Arc::new(RwLock::new(HashMap::new())),
+            tasks: Arc::new(RwLock::new(HashMap::new())),
+        }
+    }
+
+    pub(crate) fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) {
+        let mut executors_heartbeat = self.executors_heartbeat.write();
+        executors_heartbeat.insert(heartbeat.executor_id.clone(), heartbeat);
+    }
+
+    pub(crate) fn get_executors_heartbeat(&self) -> Vec<ExecutorHeartbeat> {
+        let executors_heartbeat = self.executors_heartbeat.read();
+        executors_heartbeat
+            .iter()
+            .map(|(_exec, heartbeat)| heartbeat.clone())
+            .collect()
+    }
+
+    /// last_seen_ts_threshold is in seconds
+    pub(crate) fn get_alive_executors(
+        &self,
+        last_seen_ts_threshold: u64,
+    ) -> HashSet<String> {
+        let executors_heartbeat = self.executors_heartbeat.read();
+        executors_heartbeat
+            .iter()
+            .filter_map(|(exec, heartbeat)| {
+                (heartbeat.timestamp > last_seen_ts_threshold).then(|| exec.clone())
+            })
+            .collect()
+    }
+
+    fn get_alive_executors_within_one_minute(&self) -> HashSet<String> {
+        let now_epoch_ts = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards");
+        let last_seen_threshold = now_epoch_ts
+            .checked_sub(Duration::from_secs(60))
+            .unwrap_or_else(|| Duration::from_secs(0));
+        self.get_alive_executors(last_seen_threshold.as_secs())
+    }
+
+    pub(crate) fn save_executor_data(&self, executor_data: ExecutorData) {
+        let mut executors_data = self.executors_data.write();
+        executors_data.insert(executor_data.executor_id.clone(), executor_data);
+    }
+
+    pub(crate) fn get_executor_data(&self, executor_id: &str) -> Option<ExecutorData> {
+        let executors_data = self.executors_data.read();
+        executors_data.get(executor_id).cloned()
+    }
+
+    /// There are two checks:
+    /// 1. the executor is alive
+    /// 2. it has available task slots (> 0)
+    pub(crate) fn get_available_executors_data(&self) -> Vec<ExecutorData> {
+        let mut res = {
+            let alive_executors = self.get_alive_executors_within_one_minute();
+            let executors_data = self.executors_data.read();
+            executors_data
+                .iter()
+                .filter_map(|(exec, data)| {
+                    alive_executors.contains(exec).then(|| data.clone())
+                })
+                .collect::<Vec<ExecutorData>>()
+        };
+        res.sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots));
+        res
+    }
+
+    pub(crate) fn save_task_status(&self, status: &TaskStatus) {
+        let task_id = status.task_id.as_ref().unwrap();
+        let mut tasks = self.tasks.write();
+        let job_tasks = tasks
+            .entry(task_id.job_id.clone())
+            .or_insert_with(HashMap::new);
+        let stage_tasks = job_tasks
+            .entry(task_id.stage_id)
+            .or_insert_with(HashMap::new);
+        stage_tasks.insert(task_id.partition_id, status.clone());
+    }
+
+    pub(crate) fn _get_task(
+        &self,
+        job_id: &str,
+        stage_id: usize,
+        partition_id: usize,
+    ) -> Option<TaskStatus> {
+        let tasks = self.tasks.read();
+        let job_tasks = tasks.get(job_id);
+        if let Some(job_tasks) = job_tasks {
+            let stage_id = stage_id as u32;
+            let stage_tasks = job_tasks.get(&stage_id);
+            if let Some(stage_tasks) = stage_tasks {
+                let partition_id = partition_id as u32;
+                stage_tasks.get(&partition_id).cloned()
+            } else {
+                None
+            }
+        } else {
+            None
+        }
+    }
+
+    pub(crate) fn get_job_tasks(&self, job_id: &str) -> Option<Vec<TaskStatus>> {
+        let tasks = self.tasks.read();
+        let job_tasks = tasks.get(job_id);
+
+        if let Some(job_tasks) = job_tasks {
+            let mut res = vec![];
+            fill_job_tasks(&mut res, job_tasks);
+            Some(res)
+        } else {
+            None
+        }
+    }
+
+    pub(crate) fn get_tasks(&self) -> Vec<TaskStatus> {
+        let mut res = vec![];
+
+        let tasks = self.tasks.read();
+        for (_job_id, job_tasks) in tasks.iter() {
+            fill_job_tasks(&mut res, job_tasks);
+        }
+
+        res
+    }
+}
+
+fn fill_job_tasks(
+    res: &mut Vec<TaskStatus>,
+    job_tasks: &HashMap<u32, HashMap<u32, TaskStatus>>,
+) {
+    for stage_tasks in job_tasks.values() {
+        for task_status in stage_tasks.values() {
+            res.push(task_status.clone());
+        }
+    }
+}
diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs
index e30075e827eb2..e015e5a0da411 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -15,522 +15,38 @@
 // specific language governing permissions and limitations
 // under the License.
-use std::collections::HashSet; -use std::time::{SystemTime, UNIX_EPOCH}; -use std::{any::type_name, collections::HashMap, sync::Arc, time::Duration}; - -use parking_lot::RwLock; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use datafusion::physical_plan::ExecutionPlan; -use futures::Stream; + use log::{debug, error, info, warn}; -use prost::Message; -use tokio::sync::{mpsc, OwnedMutexGuard}; +use tokio::sync::mpsc; use ballista_core::error::{BallistaError, Result}; use ballista_core::execution_plans::UnresolvedShuffleExec; + use ballista_core::serde::protobuf::{ - self, job_status, task_status, CompletedJob, CompletedTask, ExecutorHeartbeat, - FailedJob, FailedTask, JobStatus, RunningJob, RunningTask, TaskStatus, + job_status, task_status, CompletedJob, CompletedTask, ExecutorHeartbeat, FailedJob, + FailedTask, JobStatus, RunningJob, RunningTask, TaskStatus, }; use ballista_core::serde::scheduler::{ ExecutorData, ExecutorMetadata, PartitionId, PartitionStats, }; -use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec}; +use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec}; use datafusion::prelude::ExecutionContext; use super::planner::remove_unresolved_shuffles; -#[cfg(feature = "etcd")] -mod etcd; -#[cfg(feature = "sled")] -mod standalone; - -use clap::ArgEnum; -#[cfg(feature = "etcd")] -pub use etcd::EtcdClient; -#[cfg(feature = "sled")] -pub use standalone::StandaloneClient; -use std::fmt; - -// an enum used to configure the backend -// needs to be visible to code generated by configure_me -#[derive(Debug, Clone, ArgEnum, serde::Deserialize)] -pub enum ConfigBackend { - Etcd, - Standalone, -} - -impl std::str::FromStr for ConfigBackend { - type Err = String; - - fn from_str(s: &str) -> std::result::Result { - ArgEnum::from_str(s, true) - } -} - -impl parse_arg::ParseArgFromStr for ConfigBackend { - fn describe_type(mut writer: W) -> fmt::Result { - write!(writer, "The configuration backend for the scheduler") - } -} - -/// A trait that contains the necessary methods to save and retrieve the state and configuration of a cluster. -#[tonic::async_trait] -pub trait ConfigBackendClient: Send + Sync { - /// Retrieve the data associated with a specific key. - /// - /// An empty vec is returned if the key does not exist. - async fn get(&self, key: &str) -> Result>; - - /// Retrieve all data associated with a specific key. - async fn get_from_prefix(&self, prefix: &str) -> Result)>>; - - /// Saves the value into the provided key, overriding any previous data that might have been associated to that key. - async fn put(&self, key: String, value: Vec) -> Result<()>; - - async fn lock(&self) -> Result>; - - /// Watch all events that happen on a specific prefix. 
- async fn watch(&self, prefix: String) -> Result>; -} - -/// A Watch is a cancelable stream of put or delete events in the [ConfigBackendClient] -#[tonic::async_trait] -pub trait Watch: Stream + Send + Unpin { - async fn cancel(&mut self) -> Result<()>; -} - -#[derive(Debug, PartialEq)] -pub enum WatchEvent { - /// Contains the inserted or updated key and the new value - Put(String, Vec), - - /// Contains the deleted key - Delete(String), -} - -type JobTasks = HashMap>; - -#[derive(Clone)] -struct VolatileSchedulerState { - executors_heartbeat: Arc>>, - executors_data: Arc>>, - - // job -> stage -> partition - tasks: Arc>>, -} - -/// For in-memory state, we don't use async to provide related services -impl VolatileSchedulerState { - fn new() -> Self { - Self { - executors_heartbeat: Arc::new(RwLock::new(HashMap::new())), - executors_data: Arc::new(RwLock::new(HashMap::new())), - tasks: Arc::new(RwLock::new(HashMap::new())), - } - } - - fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) { - let mut executors_heartbeat = self.executors_heartbeat.write(); - executors_heartbeat.insert(heartbeat.executor_id.clone(), heartbeat); - } - - fn get_executors_heartbeat(&self) -> Vec { - let executors_heartbeat = self.executors_heartbeat.read(); - executors_heartbeat - .iter() - .map(|(_exec, heartbeat)| heartbeat.clone()) - .collect() - } - - /// last_seen_ts_threshold is in seconds - fn get_alive_executors(&self, last_seen_ts_threshold: u64) -> HashSet { - let executors_heartbeat = self.executors_heartbeat.read(); - executors_heartbeat - .iter() - .filter_map(|(exec, heartbeat)| { - (heartbeat.timestamp > last_seen_ts_threshold).then(|| exec.clone()) - }) - .collect() - } - - fn get_alive_executors_within_one_minute(&self) -> HashSet { - let now_epoch_ts = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time went backwards"); - let last_seen_threshold = now_epoch_ts - .checked_sub(Duration::from_secs(60)) - .unwrap_or_else(|| Duration::from_secs(0)); - self.get_alive_executors(last_seen_threshold.as_secs()) - } - - fn save_executor_data(&self, executor_data: ExecutorData) { - let mut executors_data = self.executors_data.write(); - executors_data.insert(executor_data.executor_id.clone(), executor_data); - } - - fn get_executor_data(&self, executor_id: &str) -> Option { - let executors_data = self.executors_data.read(); - executors_data.get(executor_id).cloned() - } - - /// There are two checks: - /// 1. firstly alive - /// 2. 
secondly available task slots > 0 - fn get_available_executors_data(&self) -> Vec { - let mut res = { - let alive_executors = self.get_alive_executors_within_one_minute(); - let executors_data = self.executors_data.read(); - executors_data - .iter() - .filter_map(|(exec, data)| { - alive_executors.contains(exec).then(|| data.clone()) - }) - .collect::>() - }; - res.sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots)); - res - } - - fn save_task_status(&self, status: &TaskStatus) { - let task_id = status.task_id.as_ref().unwrap(); - let mut tasks = self.tasks.write(); - let job_tasks = tasks - .entry(task_id.job_id.clone()) - .or_insert_with(HashMap::new); - let stage_tasks = job_tasks - .entry(task_id.stage_id) - .or_insert_with(HashMap::new); - stage_tasks.insert(task_id.partition_id, status.clone()); - } - - fn _get_task( - &self, - job_id: &str, - stage_id: usize, - partition_id: usize, - ) -> Option { - let tasks = self.tasks.read(); - let job_tasks = tasks.get(job_id); - if let Some(job_tasks) = job_tasks { - let stage_id = stage_id as u32; - let stage_tasks = job_tasks.get(&stage_id); - if let Some(stage_tasks) = stage_tasks { - let partition_id = partition_id as u32; - stage_tasks.get(&partition_id).cloned() - } else { - None - } - } else { - None - } - } - - fn get_job_tasks(&self, job_id: &str) -> Option> { - let tasks = self.tasks.read(); - let job_tasks = tasks.get(job_id); - - if let Some(job_tasks) = job_tasks { - let mut res = vec![]; - VolatileSchedulerState::fill_job_tasks(&mut res, job_tasks); - Some(res) - } else { - None - } - } - - fn get_tasks(&self) -> Vec { - let mut res = vec![]; - - let tasks = self.tasks.read(); - for (_job_id, job_tasks) in tasks.iter() { - VolatileSchedulerState::fill_job_tasks(&mut res, job_tasks); - } - - res - } - - fn fill_job_tasks( - res: &mut Vec, - job_tasks: &HashMap>, - ) { - for stage_tasks in job_tasks.values() { - for task_status in stage_tasks.values() { - res.push(task_status.clone()); - } - } - } -} - -type StageKey = (String, u32); - -#[derive(Clone)] -struct StableSchedulerState { - // for db - config_client: Arc, - namespace: String, - codec: BallistaCodec, - - // for in-memory cache - executors_metadata: Arc>>, - - jobs: Arc>>, - stages: Arc>>>, -} - -impl - StableSchedulerState -{ - fn new( - config_client: Arc, - namespace: String, - codec: BallistaCodec, - ) -> Self { - Self { - config_client, - namespace, - codec, - executors_metadata: Arc::new(RwLock::new(HashMap::new())), - jobs: Arc::new(RwLock::new(HashMap::new())), - stages: Arc::new(RwLock::new(HashMap::new())), - } - } - - /// Load the state stored in storage into memory - async fn init(&self, ctx: &ExecutionContext) -> Result<()> { - self.init_executors_metadata_from_storage().await?; - self.init_jobs_from_storage().await?; - self.init_stages_from_storage(ctx).await?; - - Ok(()) - } - - async fn init_executors_metadata_from_storage(&self) -> Result<()> { - let entries = self - .config_client - .get_from_prefix(&get_executors_metadata_prefix(&self.namespace)) - .await?; - - let mut executors_metadata = self.executors_metadata.write(); - for (_key, entry) in entries { - let meta: protobuf::ExecutorMetadata = decode_protobuf(&entry)?; - executors_metadata.insert(meta.id.clone(), meta.into()); - } - - Ok(()) - } - - async fn init_jobs_from_storage(&self) -> Result<()> { - let entries = self - .config_client - .get_from_prefix(&get_job_prefix(&self.namespace)) - .await?; - - let mut jobs = self.jobs.write(); - for (key, entry) in entries { - let 
job: JobStatus = decode_protobuf(&entry)?; - let job_id = extract_job_id_from_job_key(&key) - .map(|job_id| job_id.to_string()) - .unwrap(); - jobs.insert(job_id, job); - } - - Ok(()) - } - - async fn init_stages_from_storage(&self, ctx: &ExecutionContext) -> Result<()> { - let entries = self - .config_client - .get_from_prefix(&get_stage_prefix(&self.namespace)) - .await?; - - let mut stages = self.stages.write(); - for (key, entry) in entries { - let (job_id, stage_id) = extract_stage_id_from_stage_key(&key).unwrap(); - let value = U::try_decode(&entry)?; - let plan = value - .try_into_physical_plan(ctx, self.codec.physical_extension_codec())?; - - stages.insert((job_id, stage_id), plan); - } - - Ok(()) - } - - pub async fn save_executor_metadata( - &self, - executor_meta: ExecutorMetadata, - ) -> Result<()> { - { - // Save in db - let key = get_executor_metadata_key(&self.namespace, &executor_meta.id); - let value = { - let executor_meta: protobuf::ExecutorMetadata = - executor_meta.clone().into(); - encode_protobuf(&executor_meta)? - }; - self.synchronize_save(key, value).await?; - } - - { - // Save in memory - let mut executors_metadata = self.executors_metadata.write(); - executors_metadata.insert(executor_meta.id.clone(), executor_meta); - } - - Ok(()) - } - - fn get_executor_metadata(&self, executor_id: &str) -> Option { - let executors_metadata = self.executors_metadata.read(); - executors_metadata.get(executor_id).cloned() - } - - fn get_executors_metadata(&self) -> Vec { - let executors_metadata = self.executors_metadata.read(); - executors_metadata.values().cloned().collect() - } - - async fn save_job_metadata(&self, job_id: &str, status: &JobStatus) -> Result<()> { - debug!("Saving job metadata: {:?}", status); - { - // Save in db - let key = get_job_key(&self.namespace, job_id); - let value = encode_protobuf(status)?; - self.synchronize_save(key, value).await?; - } - - { - // Save in memory - let mut jobs = self.jobs.write(); - jobs.insert(job_id.to_string(), status.clone()); - } - - Ok(()) - } - - fn get_job_metadata(&self, job_id: &str) -> Option { - let jobs = self.jobs.read(); - jobs.get(job_id).cloned() - } - - async fn save_stage_plan( - &self, - job_id: &str, - stage_id: usize, - plan: Arc, - ) -> Result<()> { - { - // Save in db - let key = get_stage_plan_key(&self.namespace, job_id, stage_id as u32); - let value = { - let mut buf: Vec = vec![]; - let proto = U::try_from_physical_plan( - plan.clone(), - self.codec.physical_extension_codec(), - )?; - proto.try_encode(&mut buf)?; - - buf - }; - self.synchronize_save(key, value).await?; - } - - { - // Save in memory - let mut stages = self.stages.write(); - stages.insert((job_id.to_string(), stage_id as u32), plan); - } - - Ok(()) - } - - fn get_stage_plan( - &self, - job_id: &str, - stage_id: usize, - ) -> Option> { - let stages = self.stages.read(); - let key = (job_id.to_string(), stage_id as u32); - stages.get(&key).cloned() - } - - async fn synchronize_save(&self, key: String, value: Vec) -> Result<()> { - let mut lock = self.config_client.lock().await?; - self.config_client.put(key, value).await?; - lock.unlock().await; - - Ok(()) - } -} - -fn get_executors_metadata_prefix(namespace: &str) -> String { - format!("/ballista/{}/executor_metadata", namespace) -} +use crate::state::backend::StateBackendClient; +use crate::state::in_memory_state::InMemorySchedulerState; +use crate::state::persistent_state::PersistentSchedulerState; -fn get_executor_metadata_key(namespace: &str, id: &str) -> String { - format!("{}/{}", 
get_executors_metadata_prefix(namespace), id) -} - -fn get_job_prefix(namespace: &str) -> String { - format!("/ballista/{}/jobs", namespace) -} - -fn get_job_key(namespace: &str, id: &str) -> String { - format!("{}/{}", get_job_prefix(namespace), id) -} - -fn get_stage_prefix(namespace: &str) -> String { - format!("/ballista/{}/stages", namespace,) -} - -fn get_stage_plan_key(namespace: &str, job_id: &str, stage_id: u32) -> String { - format!("{}/{}/{}", get_stage_prefix(namespace), job_id, stage_id,) -} - -fn extract_job_id_from_job_key(job_key: &str) -> Result<&str> { - job_key.split('/').nth(2).ok_or_else(|| { - BallistaError::Internal(format!("Unexpected task key: {}", job_key)) - }) -} - -fn extract_stage_id_from_stage_key(stage_key: &str) -> Result { - let splits: Vec<&str> = stage_key.split('/').collect(); - if splits.len() < 4 { - Err(BallistaError::Internal(format!( - "Unexpected stage key: {}", - stage_key - ))) - } else { - Ok(( - splits.get(2).unwrap().to_string(), - splits.get(3).unwrap().parse::().unwrap(), - )) - } -} - -fn decode_protobuf(bytes: &[u8]) -> Result { - T::decode(bytes).map_err(|e| { - BallistaError::Internal(format!( - "Could not deserialize {}: {}", - type_name::(), - e - )) - }) -} - -fn encode_protobuf(msg: &T) -> Result> { - let mut value: Vec = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut value).map_err(|e| { - BallistaError::Internal(format!( - "Could not serialize {}: {}", - type_name::(), - e - )) - })?; - Ok(value) -} +pub mod backend; +mod in_memory_state; +mod persistent_state; +pub mod task_scheduler; #[derive(Clone)] struct SchedulerStateWatcher { @@ -590,22 +106,26 @@ impl SchedulerStateWatcher { #[derive(Clone)] pub(super) struct SchedulerState { - stable_state: StableSchedulerState, - volatile_state: VolatileSchedulerState, + persistent_state: PersistentSchedulerState, + in_memory_state: InMemorySchedulerState, listener: SchedulerStateWatcher, } impl SchedulerState { pub fn new( - config_client: Arc, + config_client: Arc, namespace: String, codec: BallistaCodec, ) -> Self { // TODO Make the buffer size configurable let (tx_task, rx_task) = mpsc::channel::(1000); let ret = Self { - stable_state: StableSchedulerState::new(config_client, namespace, codec), - volatile_state: VolatileSchedulerState::new(), + persistent_state: PersistentSchedulerState::new( + config_client, + namespace, + codec, + ), + in_memory_state: InMemorySchedulerState::new(), listener: SchedulerStateWatcher { tx_task }, }; ret.listener @@ -615,13 +135,13 @@ impl SchedulerState Result<()> { - self.stable_state.init(ctx).await?; + self.persistent_state.init(ctx).await?; Ok(()) } pub fn get_codec(&self) -> &BallistaCodec { - &self.stable_state.codec + &self.persistent_state.codec } pub async fn get_executors_metadata( @@ -630,13 +150,13 @@ impl SchedulerState>(); - let executors_metadata = self.stable_state.get_executors_metadata(); + let executors_metadata = self.persistent_state.get_executors_metadata(); let now_epoch_ts = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -670,7 +190,7 @@ impl SchedulerState SchedulerState Option { - self.stable_state.get_executor_metadata(executor_id) + self.persistent_state.get_executor_metadata(executor_id) } pub async fn save_executor_metadata( &self, executor_meta: ExecutorMetadata, ) -> Result<()> { - self.stable_state + self.persistent_state .save_executor_metadata(executor_meta) .await } pub fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) { - self.volatile_state.save_executor_heartbeat(heartbeat); + 
self.in_memory_state.save_executor_heartbeat(heartbeat); } pub fn save_executor_data(&self, executor_data: ExecutorData) { - self.volatile_state.save_executor_data(executor_data); + self.in_memory_state.save_executor_data(executor_data); } pub fn get_available_executors_data(&self) -> Vec { - self.volatile_state.get_available_executors_data() + self.in_memory_state.get_available_executors_data() } pub fn get_executor_data(&self, executor_id: &str) -> Option { - self.volatile_state.get_executor_data(executor_id) + self.in_memory_state.get_executor_data(executor_id) } pub async fn save_job_metadata( @@ -720,11 +240,13 @@ impl SchedulerState Result<()> { - self.stable_state.save_job_metadata(job_id, status).await + self.persistent_state + .save_job_metadata(job_id, status) + .await } pub fn get_job_metadata(&self, job_id: &str) -> Option { - self.stable_state.get_job_metadata(job_id) + self.persistent_state.get_job_metadata(job_id) } pub async fn save_stage_plan( @@ -733,7 +255,7 @@ impl SchedulerState, ) -> Result<()> { - self.stable_state + self.persistent_state .save_stage_plan(job_id, stage_id, plan) .await } @@ -743,11 +265,11 @@ impl SchedulerState Option> { - self.stable_state.get_stage_plan(job_id, stage_id) + self.persistent_state.get_stage_plan(job_id, stage_id) } pub async fn save_task_status(&self, status: &TaskStatus) -> Result<()> { - self.volatile_state.save_task_status(status); + self.in_memory_state.save_task_status(status); self.listener.watch(status.clone()).await?; Ok(()) @@ -759,16 +281,16 @@ impl SchedulerState Option { - self.volatile_state + self.in_memory_state ._get_task(job_id, stage_id, partition_id) } pub fn get_job_tasks(&self, job_id: &str) -> Option> { - self.volatile_state.get_job_tasks(job_id) + self.in_memory_state.get_job_tasks(job_id) } pub fn get_all_tasks(&self) -> Vec { - self.volatile_state.get_tasks() + self.in_memory_state.get_tasks() } /// This function ensures that the task wasn't assigned to an executor that died. @@ -995,7 +517,7 @@ impl SchedulerState SchedulerState, ) -> Result> { - let statuses = self.volatile_state.get_job_tasks(job_id); + let statuses = self.in_memory_state.get_job_tasks(job_id); if statuses.is_none() { return Ok(None); } @@ -1096,16 +618,6 @@ impl SchedulerState Lock for OwnedMutexGuard { - async fn unlock(&mut self) {} -} - /// Returns the unresolved shuffles in the execution plan fn find_unresolved_shuffles( plan: &Arc, @@ -1138,7 +650,7 @@ mod test { use ballista_core::serde::scheduler::{ExecutorMetadata, ExecutorSpecification}; use ballista_core::serde::BallistaCodec; - use super::{SchedulerState, StandaloneClient}; + use super::{backend::standalone::StandaloneClient, SchedulerState}; #[tokio::test] async fn executor_metadata() -> Result<(), BallistaError> { diff --git a/ballista/rust/scheduler/src/state/persistent_state.rs b/ballista/rust/scheduler/src/state/persistent_state.rs new file mode 100644 index 0000000000000..5c3417464996c --- /dev/null +++ b/ballista/rust/scheduler/src/state/persistent_state.rs @@ -0,0 +1,312 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use log::debug; +use parking_lot::RwLock; +use prost::Message; +use std::any::type_name; +use std::collections::HashMap; +use std::sync::Arc; + +use ballista_core::error::{BallistaError, Result}; + +use ballista_core::serde::protobuf::JobStatus; + +use crate::state::backend::StateBackendClient; +use ballista_core::serde::scheduler::ExecutorMetadata; +use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec}; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::ExecutionContext; + +type StageKey = (String, u32); + +#[derive(Clone)] +pub(crate) struct PersistentSchedulerState< + T: 'static + AsLogicalPlan, + U: 'static + AsExecutionPlan, +> { + // for db + config_client: Arc, + namespace: String, + pub(crate) codec: BallistaCodec, + + // for in-memory cache + executors_metadata: Arc>>, + + jobs: Arc>>, + stages: Arc>>>, +} + +impl + PersistentSchedulerState +{ + pub(crate) fn new( + config_client: Arc, + namespace: String, + codec: BallistaCodec, + ) -> Self { + Self { + config_client, + namespace, + codec, + executors_metadata: Arc::new(RwLock::new(HashMap::new())), + jobs: Arc::new(RwLock::new(HashMap::new())), + stages: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Load the state stored in storage into memory + pub(crate) async fn init(&self, ctx: &ExecutionContext) -> Result<()> { + self.init_executors_metadata_from_storage().await?; + self.init_jobs_from_storage().await?; + self.init_stages_from_storage(ctx).await?; + + Ok(()) + } + + async fn init_executors_metadata_from_storage(&self) -> Result<()> { + let entries = self + .config_client + .get_from_prefix(&get_executors_metadata_prefix(&self.namespace)) + .await?; + + let mut executors_metadata = self.executors_metadata.write(); + for (_key, entry) in entries { + let meta: protobuf::ExecutorMetadata = decode_protobuf(&entry)?; + executors_metadata.insert(meta.id.clone(), meta.into()); + } + + Ok(()) + } + + async fn init_jobs_from_storage(&self) -> Result<()> { + let entries = self + .config_client + .get_from_prefix(&get_job_prefix(&self.namespace)) + .await?; + + let mut jobs = self.jobs.write(); + for (key, entry) in entries { + let job: JobStatus = decode_protobuf(&entry)?; + let job_id = extract_job_id_from_job_key(&key) + .map(|job_id| job_id.to_string()) + .unwrap(); + jobs.insert(job_id, job); + } + + Ok(()) + } + + async fn init_stages_from_storage(&self, ctx: &ExecutionContext) -> Result<()> { + let entries = self + .config_client + .get_from_prefix(&get_stage_prefix(&self.namespace)) + .await?; + + let mut stages = self.stages.write(); + for (key, entry) in entries { + let (job_id, stage_id) = extract_stage_id_from_stage_key(&key).unwrap(); + let value = U::try_decode(&entry)?; + let plan = value + .try_into_physical_plan(ctx, self.codec.physical_extension_codec())?; + + stages.insert((job_id, stage_id), plan); + } + + Ok(()) + } + + pub(crate) async fn save_executor_metadata( + &self, + executor_meta: ExecutorMetadata, + ) -> Result<()> { + { + // Save in db + let key = get_executor_metadata_key(&self.namespace, &executor_meta.id); + let value = { 
+                let executor_meta: protobuf::ExecutorMetadata =
+                    executor_meta.clone().into();
+                encode_protobuf(&executor_meta)?
+            };
+            self.synchronize_save(key, value).await?;
+        }
+
+        {
+            // Save in memory
+            let mut executors_metadata = self.executors_metadata.write();
+            executors_metadata.insert(executor_meta.id.clone(), executor_meta);
+        }
+
+        Ok(())
+    }
+
+    pub(crate) fn get_executor_metadata(
+        &self,
+        executor_id: &str,
+    ) -> Option<ExecutorMetadata> {
+        let executors_metadata = self.executors_metadata.read();
+        executors_metadata.get(executor_id).cloned()
+    }
+
+    pub(crate) fn get_executors_metadata(&self) -> Vec<ExecutorMetadata> {
+        let executors_metadata = self.executors_metadata.read();
+        executors_metadata.values().cloned().collect()
+    }
+
+    pub(crate) async fn save_job_metadata(
+        &self,
+        job_id: &str,
+        status: &JobStatus,
+    ) -> Result<()> {
+        debug!("Saving job metadata: {:?}", status);
+        {
+            // Save in db
+            let key = get_job_key(&self.namespace, job_id);
+            let value = encode_protobuf(status)?;
+            self.synchronize_save(key, value).await?;
+        }
+
+        {
+            // Save in memory
+            let mut jobs = self.jobs.write();
+            jobs.insert(job_id.to_string(), status.clone());
+        }
+
+        Ok(())
+    }
+
+    pub(crate) fn get_job_metadata(&self, job_id: &str) -> Option<JobStatus> {
+        let jobs = self.jobs.read();
+        jobs.get(job_id).cloned()
+    }
+
+    pub(crate) async fn save_stage_plan(
+        &self,
+        job_id: &str,
+        stage_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+    ) -> Result<()> {
+        {
+            // Save in db
+            let key = get_stage_plan_key(&self.namespace, job_id, stage_id as u32);
+            let value = {
+                let mut buf: Vec<u8> = vec![];
+                let proto = U::try_from_physical_plan(
+                    plan.clone(),
+                    self.codec.physical_extension_codec(),
+                )?;
+                proto.try_encode(&mut buf)?;
+
+                buf
+            };
+            self.synchronize_save(key, value).await?;
+        }
+
+        {
+            // Save in memory
+            let mut stages = self.stages.write();
+            stages.insert((job_id.to_string(), stage_id as u32), plan);
+        }
+
+        Ok(())
+    }
+
+    pub(crate) fn get_stage_plan(
+        &self,
+        job_id: &str,
+        stage_id: usize,
+    ) -> Option<Arc<dyn ExecutionPlan>> {
+        let stages = self.stages.read();
+        let key = (job_id.to_string(), stage_id as u32);
+        stages.get(&key).cloned()
+    }
+
+    async fn synchronize_save(&self, key: String, value: Vec<u8>) -> Result<()> {
+        let mut lock = self.config_client.lock().await?;
+        self.config_client.put(key, value).await?;
+        lock.unlock().await;
+
+        Ok(())
+    }
+}
+
+fn get_executors_metadata_prefix(namespace: &str) -> String {
+    format!("/ballista/{}/executor_metadata", namespace)
+}
+
+fn get_executor_metadata_key(namespace: &str, id: &str) -> String {
+    format!("{}/{}", get_executors_metadata_prefix(namespace), id)
+}
+
+fn get_job_prefix(namespace: &str) -> String {
+    format!("/ballista/{}/jobs", namespace)
+}
+
+fn get_job_key(namespace: &str, id: &str) -> String {
+    format!("{}/{}", get_job_prefix(namespace), id)
+}
+
+fn get_stage_prefix(namespace: &str) -> String {
+    format!("/ballista/{}/stages", namespace,)
+}
+
+fn get_stage_plan_key(namespace: &str, job_id: &str, stage_id: u32) -> String {
+    format!("{}/{}/{}", get_stage_prefix(namespace), job_id, stage_id,)
+}
+
+fn extract_job_id_from_job_key(job_key: &str) -> Result<&str> {
+    job_key.split('/').nth(2).ok_or_else(|| {
+        BallistaError::Internal(format!("Unexpected task key: {}", job_key))
+    })
+}
+
+fn extract_stage_id_from_stage_key(stage_key: &str) -> Result<StageKey> {
+    let splits: Vec<&str> = stage_key.split('/').collect();
+    if splits.len() < 4 {
+        Err(BallistaError::Internal(format!(
+            "Unexpected stage key: {}",
+            stage_key
+        )))
+    } else {
+        Ok((
+            splits.get(2).unwrap().to_string(),
+            splits.get(3).unwrap().parse::<u32>().unwrap(),
+        ))
+    }
+}
+
+fn decode_protobuf<T: Message + Default>(bytes: &[u8]) -> Result<T> {
+    T::decode(bytes).map_err(|e| {
+        BallistaError::Internal(format!(
+            "Could not deserialize {}: {}",
+            type_name::<T>(),
+            e
+        ))
+    })
+}
+
+fn encode_protobuf<T: Message + Default>(msg: &T) -> Result<Vec<u8>> {
+    let mut value: Vec<u8> = Vec::with_capacity(msg.encoded_len());
+    msg.encode(&mut value).map_err(|e| {
+        BallistaError::Internal(format!(
+            "Could not serialize {}: {}",
+            type_name::<T>(),
+            e
+        ))
+    })?;
+    Ok(value)
+}
diff --git a/ballista/rust/scheduler/src/scheduler_server/task_scheduler.rs b/ballista/rust/scheduler/src/state/task_scheduler.rs
similarity index 100%
rename from ballista/rust/scheduler/src/scheduler_server/task_scheduler.rs
rename to ballista/rust/scheduler/src/state/task_scheduler.rs
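
Note (not part of the patch): a minimal usage sketch of the relocated StateBackendClient trait, exercised through the sled-backed StandaloneClient in the same way the module's own tests do. It assumes the `sled` feature is enabled; the "/jobs" keys and "running" payload are illustrative only.

    use ballista_scheduler::state::backend::{
        standalone::StandaloneClient, StateBackendClient, Watch, WatchEvent,
    };
    use futures::StreamExt;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Temporary sled database (the TODO in main.rs notes that a real,
        // configurable path is still pending).
        let client = StandaloneClient::try_new_temporary()?;

        // Watch a prefix, then write under it: the watcher observes the put.
        let mut watch: Box<dyn Watch> = client.watch("/jobs".to_owned()).await?;
        client
            .put("/jobs/job-1".to_owned(), b"running".to_vec())
            .await?;
        assert_eq!(
            watch.next().await,
            Some(WatchEvent::Put("/jobs/job-1".to_owned(), b"running".to_vec()))
        );
        watch.cancel().await?;

        // Point read and prefix scan.
        assert_eq!(client.get("/jobs/job-1").await?, b"running".to_vec());
        assert_eq!(client.get_from_prefix("/jobs").await?.len(), 1);
        Ok(())
    }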
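
Note (not part of the patch): InMemorySchedulerState is crate-private, so the self-contained sketch below re-implements its executor-liveness filter in isolation to show the epoch-seconds heartbeat check; the 60-second window and the saturating subtraction match get_alive_executors_within_one_minute. The Heartbeat struct is a hypothetical stand-in for protobuf::ExecutorHeartbeat carrying only the fields the filter needs.

    use std::collections::HashMap;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    struct Heartbeat {
        executor_id: String,
        timestamp: u64, // epoch seconds of the last heartbeat
    }

    // Same shape as InMemorySchedulerState::get_alive_executors: keep only
    // executors whose last heartbeat is newer than the threshold.
    fn alive_executors(heartbeats: &HashMap<String, Heartbeat>, threshold: u64) -> Vec<String> {
        heartbeats
            .values()
            .filter(|hb| hb.timestamp > threshold)
            .map(|hb| hb.executor_id.clone())
            .collect()
    }

    fn main() {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");
        // Saturating 60s window, as in get_alive_executors_within_one_minute.
        let threshold = now
            .checked_sub(Duration::from_secs(60))
            .unwrap_or_else(|| Duration::from_secs(0))
            .as_secs();

        let mut hbs = HashMap::new();
        hbs.insert(
            "exec-1".to_owned(),
            Heartbeat { executor_id: "exec-1".to_owned(), timestamp: now.as_secs() },
        );
        hbs.insert(
            "exec-2".to_owned(),
            Heartbeat { executor_id: "exec-2".to_owned(), timestamp: now.as_secs() - 120 },
        );
        assert_eq!(alive_executors(&hbs, threshold), vec!["exec-1".to_owned()]);
    }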
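
Note (not part of the patch): every durable write in PersistentSchedulerState goes through synchronize_save, which takes the backend's single global lock before the put, so concurrent schedulers serialize all state updates. That helper is private to persistent_state.rs; the sketch below mirrors the same lock-put-unlock pattern against only the public trait, and works with either backend (etcd or sled standalone). The function name synchronized_put is hypothetical.

    use ballista_core::error::Result;
    use ballista_scheduler::state::backend::StateBackendClient;

    // Mirrors PersistentSchedulerState::synchronize_save: acquire the global
    // lock, write the key, then release the lock.
    async fn synchronized_put(
        client: &dyn StateBackendClient,
        key: String,
        value: Vec<u8>,
    ) -> Result<()> {
        let mut lock = client.lock().await?;
        client.put(key, value).await?;
        lock.unlock().await;
        Ok(())
    }

One coarse lock is simple but contends under many writers; a per-prefix lock would be the natural refinement if that ever shows up in profiles.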