Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ballista/rust/scheduler/scheduler_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ doc = "Print version of this executable"
[[param]]
abbr = "b"
name = "config_backend"
type = "ballista_scheduler::state::ConfigBackend"
doc = "The configuration backend for the scheduler, see ConfigBackend::variants() for options. Default: Standalone"
default = "ballista_scheduler::state::ConfigBackend::Standalone"
type = "ballista_scheduler::state::backend::StateBackend"
doc = "The configuration backend for the scheduler, see StateBackend::variants() for options. Default: Standalone"
default = "ballista_scheduler::state::backend::StateBackend::Standalone"

[[param]]
abbr = "n"
Expand Down
3 changes: 0 additions & 3 deletions ballista/rust/scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,3 @@ pub mod state;

#[cfg(test)]
pub mod test_utils;

#[cfg(feature = "sled")]
extern crate sled_package as sled;
18 changes: 9 additions & 9 deletions ballista/rust/scheduler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ use ballista_core::{
};
use ballista_scheduler::api::{get_routes, EitherBody, Error};
#[cfg(feature = "etcd")]
use ballista_scheduler::state::EtcdClient;
use ballista_scheduler::state::backend::etcd::EtcdClient;
#[cfg(feature = "sled")]
use ballista_scheduler::state::StandaloneClient;
use ballista_scheduler::state::backend::standalone::StandaloneClient;

use ballista_scheduler::scheduler_server::SchedulerServer;
use ballista_scheduler::state::{ConfigBackend, ConfigBackendClient};
use ballista_scheduler::state::backend::{StateBackend, StateBackendClient};

use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::serde::BallistaCodec;
Expand All @@ -65,7 +65,7 @@ use config::prelude::*;
use datafusion::prelude::ExecutionContext;

async fn start_server(
config_backend: Arc<dyn ConfigBackendClient>,
config_backend: Arc<dyn StateBackendClient>,
namespace: String,
addr: SocketAddr,
policy: TaskSchedulingPolicy,
Expand Down Expand Up @@ -161,34 +161,34 @@ async fn main() -> Result<()> {
let addr = format!("{}:{}", bind_host, port);
let addr = addr.parse()?;

let client: Arc<dyn ConfigBackendClient> = match opt.config_backend {
let client: Arc<dyn StateBackendClient> = match opt.config_backend {
#[cfg(not(any(feature = "sled", feature = "etcd")))]
_ => std::compile_error!(
"To build the scheduler enable at least one config backend feature (`etcd` or `sled`)"
),
#[cfg(feature = "etcd")]
ConfigBackend::Etcd => {
StateBackend::Etcd => {
let etcd = etcd_client::Client::connect(&[opt.etcd_urls], None)
.await
.context("Could not connect to etcd")?;
Arc::new(EtcdClient::new(etcd))
}
#[cfg(not(feature = "etcd"))]
ConfigBackend::Etcd => {
StateBackend::Etcd => {
unimplemented!(
"build the scheduler with the `etcd` feature to use the etcd config backend"
)
}
#[cfg(feature = "sled")]
ConfigBackend::Standalone => {
StateBackend::Standalone => {
// TODO: Use a real file and make the path configurable
Arc::new(
StandaloneClient::try_new_temporary()
.context("Could not create standalone config backend")?,
)
}
#[cfg(not(feature = "sled"))]
ConfigBackend::Standalone => {
StateBackend::Standalone => {
unimplemented!(
"build the scheduler with the `sled` feature to use the standalone config backend"
)
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/scheduler/src/scheduler_server/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use ballista_core::serde::protobuf::{LaunchTaskParams, TaskDefinition};
use ballista_core::serde::scheduler::ExecutorData;
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};

use crate::scheduler_server::task_scheduler::TaskScheduler;
use crate::scheduler_server::ExecutorsClient;
use crate::state::task_scheduler::TaskScheduler;
use crate::state::SchedulerState;

#[derive(Clone)]
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/scheduler/src/scheduler_server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ mod test {

use tonic::Request;

use crate::state::{SchedulerState, StandaloneClient};
use crate::state::{backend::standalone::StandaloneClient, SchedulerState};
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::{
executor_registration::OptionalHost, ExecutorRegistration, LogicalPlanNode,
Expand Down
8 changes: 4 additions & 4 deletions ballista/rust/scheduler/src/scheduler_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use datafusion::prelude::{ExecutionConfig, ExecutionContext};
use crate::scheduler_server::event_loop::{
SchedulerServerEvent, SchedulerServerEventAction,
};
use crate::state::{ConfigBackendClient, SchedulerState};
use crate::state::backend::StateBackendClient;
use crate::state::SchedulerState;

// include the generated protobuf source as a submodule
#[allow(clippy::all)]
Expand All @@ -44,7 +45,6 @@ pub mod externalscaler {
mod event_loop;
mod external_scaler;
mod grpc;
mod task_scheduler;

type ExecutorsClient = Arc<RwLock<HashMap<String, ExecutorGrpcClient<Channel>>>>;

Expand All @@ -61,7 +61,7 @@ pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP

impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T, U> {
pub fn new(
config: Arc<dyn ConfigBackendClient>,
config: Arc<dyn StateBackendClient>,
namespace: String,
ctx: Arc<RwLock<ExecutionContext>>,
codec: BallistaCodec<T, U>,
Expand All @@ -76,7 +76,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
}

pub fn new_with_policy(
config: Arc<dyn ConfigBackendClient>,
config: Arc<dyn StateBackendClient>,
namespace: String,
policy: TaskSchedulingPolicy,
ctx: Arc<RwLock<ExecutionContext>>,
Expand Down
4 changes: 3 additions & 1 deletion ballista/rust/scheduler/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use tokio::net::TcpListener;
use tokio::sync::RwLock;
use tonic::transport::Server;

use crate::{scheduler_server::SchedulerServer, state::StandaloneClient};
use crate::{
scheduler_server::SchedulerServer, state::backend::standalone::StandaloneClient,
};

pub async fn new_standalone_scheduler() -> Result<SocketAddr> {
let client = StandaloneClient::try_new_temporary()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@

use std::task::Poll;

use crate::state::ConfigBackendClient;
use ballista_core::error::{ballista_error, Result};

use etcd_client::{GetOptions, LockResponse, WatchOptions, WatchStream, Watcher};
use futures::{Stream, StreamExt};
use log::warn;

use super::{Lock, Watch, WatchEvent};
use crate::state::backend::{Lock, StateBackendClient, Watch, WatchEvent};

/// A [`ConfigBackendClient`] implementation that uses etcd to save cluster configuration.
/// A [`StateBackendClient`] implementation that uses etcd to save cluster configuration.
#[derive(Clone)]
pub struct EtcdClient {
etcd: etcd_client::Client,
Expand All @@ -41,7 +40,7 @@ impl EtcdClient {
}

#[tonic::async_trait]
impl ConfigBackendClient for EtcdClient {
impl StateBackendClient for EtcdClient {
async fn get(&self, key: &str) -> Result<Vec<u8>> {
Ok(self
.etcd
Expand Down
94 changes: 94 additions & 0 deletions ballista/rust/scheduler/src/state/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use ballista_core::error::Result;
use clap::ArgEnum;
use futures::Stream;
use std::fmt;
use tokio::sync::OwnedMutexGuard;

#[cfg(feature = "etcd")]
pub mod etcd;
#[cfg(feature = "sled")]
pub mod standalone;

/// Selects which storage backend the scheduler uses.
///
/// The enum is used to configure the backend and needs to stay visible to the
/// code generated by `configure_me`, which refers to it by its full path.
#[derive(Debug, Clone, ArgEnum, serde::Deserialize)]
pub enum StateBackend {
    // Served by the `etcd` submodule, which is only compiled with the `etcd` feature.
    Etcd,
    // Served by the `standalone` submodule, which is only compiled with the `sled` feature.
    Standalone,
}

impl std::str::FromStr for StateBackend {
type Err = String;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
ArgEnum::from_str(s, true)
}
}

/// Supplies the human-readable description of this argument type that
/// `parse_arg` asks for.
impl parse_arg::ParseArgFromStr for StateBackend {
    fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
        // The description is a fixed string, so it is written directly
        // without going through the formatting machinery.
        writer.write_str("The configuration backend for the scheduler")
    }
}

/// A trait that contains the necessary methods to save and retrieve the state and configuration of a cluster.
///
/// Implementors must be `Send + Sync`; every method is async and returns the
/// crate-wide `ballista_core::error::Result`.
#[tonic::async_trait]
pub trait StateBackendClient: Send + Sync {
    /// Retrieve the data associated with a specific key.
    ///
    /// An empty vec is returned if the key does not exist.
    async fn get(&self, key: &str) -> Result<Vec<u8>>;

    /// Retrieve all entries stored under the given key prefix.
    ///
    /// Each element of the returned vec is a `(String, Vec<u8>)` pair
    /// (presumably the full key and its value; confirm against the implementations).
    async fn get_from_prefix(&self, prefix: &str) -> Result<Vec<(String, Vec<u8>)>>;

    /// Saves the value into the provided key, overriding any previous data that might have been associated to that key.
    async fn put(&self, key: String, value: Vec<u8>) -> Result<()>;

    /// Acquire a lock from the backend, returned as a boxed [`Lock`].
    ///
    /// NOTE(review): the method takes no key, so this looks like a single
    /// backend-wide lock; confirm against the implementations.
    async fn lock(&self) -> Result<Box<dyn Lock>>;

    /// Watch all events that happen on a specific prefix.
    async fn watch(&self, prefix: String) -> Result<Box<dyn Watch>>;
}

/// A Watch is a cancelable stream of put or delete events in the [StateBackendClient]
///
/// It is a `Stream` yielding [`WatchEvent`] items and must be `Send + Unpin`.
#[tonic::async_trait]
pub trait Watch: Stream<Item = WatchEvent> + Send + Unpin {
    /// Cancel this watch.
    ///
    /// NOTE(review): presumably no further events are delivered afterwards;
    /// the exact behaviour is up to each implementation.
    async fn cancel(&mut self) -> Result<()>;
}

/// A single change notification yielded by a [`Watch`] stream.
#[derive(Debug, PartialEq)]
pub enum WatchEvent {
    /// Contains the inserted or updated key and the new value
    Put(String, Vec<u8>),

    /// Contains the deleted key
    Delete(String),
}

/// A held lock, as handed out (boxed) by `StateBackendClient::lock`.
#[tonic::async_trait]
pub trait Lock: Send + Sync {
    /// Release the lock. The operation is async and reports no error.
    async fn unlock(&mut self);
}

/// Lets a tokio `OwnedMutexGuard` be used wherever a [`Lock`] is expected.
#[tonic::async_trait]
impl<T: Send + Sync> Lock for OwnedMutexGuard<T> {
    // Intentionally a no-op: nothing is released here. Per the tokio
    // documentation the mutex is released when the guard itself is dropped.
    async fn unlock(&mut self) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@

use std::{sync::Arc, task::Poll};

use crate::state::ConfigBackendClient;
use ballista_core::error::{ballista_error, BallistaError, Result};

use futures::{FutureExt, Stream};
use log::warn;
use sled::{Event, Subscriber};
use sled_package as sled;
use tokio::sync::Mutex;

use super::{Lock, Watch, WatchEvent};
use crate::state::backend::{Lock, StateBackendClient, Watch, WatchEvent};

/// A [`ConfigBackendClient`] implementation that uses file-based storage to save cluster configuration.
/// A [`StateBackendClient`] implementation that uses file-based storage to save cluster configuration.
#[derive(Clone)]
pub struct StandaloneClient {
db: sled::Db,
Expand Down Expand Up @@ -63,7 +62,7 @@ fn sled_to_ballista_error(e: sled::Error) -> BallistaError {
}

#[tonic::async_trait]
impl ConfigBackendClient for StandaloneClient {
impl StateBackendClient for StandaloneClient {
async fn get(&self, key: &str) -> Result<Vec<u8>> {
Ok(self
.db
Expand Down Expand Up @@ -111,7 +110,7 @@ impl ConfigBackendClient for StandaloneClient {
}

struct SledWatch {
subscriber: Subscriber,
subscriber: sled::Subscriber,
}

#[tonic::async_trait]
Expand All @@ -131,11 +130,11 @@ impl Stream for SledWatch {
match self.get_mut().subscriber.poll_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Event::Insert { key, value })) => {
Poll::Ready(Some(sled::Event::Insert { key, value })) => {
let key = std::str::from_utf8(&key).unwrap().to_owned();
Poll::Ready(Some(WatchEvent::Put(key, value.to_vec())))
}
Poll::Ready(Some(Event::Remove { key })) => {
Poll::Ready(Some(sled::Event::Remove { key })) => {
let key = std::str::from_utf8(&key).unwrap().to_owned();
Poll::Ready(Some(WatchEvent::Delete(key)))
}
Expand All @@ -149,9 +148,8 @@ impl Stream for SledWatch {

#[cfg(test)]
mod tests {
use crate::state::{ConfigBackendClient, Watch, WatchEvent};
use super::{StandaloneClient, StateBackendClient, Watch, WatchEvent};

use super::StandaloneClient;
use futures::StreamExt;
use std::result::Result;

Expand Down
Loading