From 219b8d64792ac279678f37e7e90e642d401a467e Mon Sep 17 00:00:00 2001 From: Sathis Kumar Date: Mon, 12 Apr 2021 00:12:37 +0530 Subject: [PATCH 1/3] Add simple api server in scheduler. Implements GET /executors. --- rust/ballista/rust/core/Cargo.toml | 1 + .../rust/core/src/serde/scheduler/mod.rs | 3 +- rust/ballista/rust/scheduler/Cargo.toml | 7 +- .../rust/scheduler/src/api/handlers.rs | 40 +++++++++ rust/ballista/rust/scheduler/src/api/mod.rs | 85 +++++++++++++++++++ rust/ballista/rust/scheduler/src/lib.rs | 2 + rust/ballista/rust/scheduler/src/main.rs | 43 ++++++++-- 7 files changed, 172 insertions(+), 9 deletions(-) create mode 100644 rust/ballista/rust/scheduler/src/api/handlers.rs create mode 100644 rust/ballista/rust/scheduler/src/api/mod.rs diff --git a/rust/ballista/rust/core/Cargo.toml b/rust/ballista/rust/core/Cargo.toml index b6301918a1f..f5f6f8574b3 100644 --- a/rust/ballista/rust/core/Cargo.toml +++ b/rust/ballista/rust/core/Cargo.toml @@ -34,6 +34,7 @@ async-trait = "0.1.36" futures = "0.3" log = "0.4" prost = "0.7" +serde = {version = "1", features = ["derive"]} sqlparser = "0.8" tokio = "1.0" tonic = "0.4" diff --git a/rust/ballista/rust/core/src/serde/scheduler/mod.rs b/rust/ballista/rust/core/src/serde/scheduler/mod.rs index efee82dbdf3..81d8722d7f4 100644 --- a/rust/ballista/rust/core/src/serde/scheduler/mod.rs +++ b/rust/ballista/rust/core/src/serde/scheduler/mod.rs @@ -23,6 +23,7 @@ use arrow::array::{ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::ExecutionPlan; +use serde::Serialize; use uuid::Uuid; use super::protobuf; @@ -67,7 +68,7 @@ pub struct PartitionLocation { } /// Meta-data for an executor, used when fetching shuffle partitions from other executors -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub struct ExecutorMeta { pub id: String, pub host: String, diff --git 
a/rust/ballista/rust/scheduler/Cargo.toml b/rust/ballista/rust/scheduler/Cargo.toml index 525e28a63cc..b0213d37bda 100644 --- a/rust/ballista/rust/scheduler/Cargo.toml +++ b/rust/ballista/rust/scheduler/Cargo.toml @@ -38,14 +38,19 @@ configure_me = "0.4.0" env_logger = "0.8" etcd-client = { version = "0.6", optional = true } futures = "0.3" +http = "0.2" +http-body = "0.4" +hyper = "0.14.4" log = "0.4" parse_arg = "0.1.3" prost = "0.7" rand = "0.8" serde = {version = "1", features = ["derive"]} sled_package = { package = "sled", version = "0.34", optional = true } -tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] } +tokio = { version = "1.0", features = ["full"] } tonic = "0.4" +tower = { version = "0.4" } +warp = "0.3" arrow = { git = "https://github.com/apache/arrow", rev="46161d2" } datafusion = { git = "https://github.com/apache/arrow", rev="46161d2" } diff --git a/rust/ballista/rust/scheduler/src/api/handlers.rs b/rust/ballista/rust/scheduler/src/api/handlers.rs new file mode 100644 index 00000000000..2a992b62169 --- /dev/null +++ b/rust/ballista/rust/scheduler/src/api/handlers.rs @@ -0,0 +1,40 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use crate::SchedulerServer; +use ballista_core::serde::protobuf::{ + scheduler_grpc_server::SchedulerGrpc, ExecutorMetadata, GetExecutorMetadataParams, + GetExecutorMetadataResult, +}; +use ballista_core::serde::scheduler::ExecutorMeta; +use tonic::{Request, Response}; +use warp::Rejection; + +pub(crate) async fn list_executors_data( + data_server: SchedulerServer, +) -> Result { + let data: Result, tonic::Status> = data_server + .get_executors_metadata(Request::new(GetExecutorMetadataParams {})) + .await; + let result = data.unwrap(); + let res: &GetExecutorMetadataResult = result.get_ref(); + let vec: &Vec = &res.metadata; + let metadata: Vec = vec + .iter() + .map(|v: &ExecutorMetadata| ExecutorMeta { + host: v.host.clone(), + port: v.port as u16, + id: v.id.clone(), + }) + .collect(); + Ok(warp::reply::json(&metadata)) +} \ No newline at end of file diff --git a/rust/ballista/rust/scheduler/src/api/mod.rs b/rust/ballista/rust/scheduler/src/api/mod.rs new file mode 100644 index 00000000000..34789e6240a --- /dev/null +++ b/rust/ballista/rust/scheduler/src/api/mod.rs @@ -0,0 +1,85 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +mod handlers; + +use crate::SchedulerServer; +use anyhow::Result; +use std::{ + pin::Pin, + task::{Context as TaskContext, Poll}, +}; +use warp::filters::BoxedFilter; +use warp::{Buf, Filter, Reply}; + +pub enum EitherBody { + Left(A), + Right(B), +} + +pub type Error = Box; +pub type HttpBody = dyn http_body::Body + 'static; + +impl http_body::Body for EitherBody + where + A: http_body::Body + Send + Unpin, + B: http_body::Body + Send + Unpin, + A::Error: Into, + B::Error: Into, +{ + type Data = A::Data; + type Error = Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut TaskContext<'_>, + ) -> Poll>> { + match self.get_mut() { + EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err), + EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err), + } + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut TaskContext<'_>, + ) -> Poll, Self::Error>> { + match self.get_mut() { + EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into), + EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into), + } + } + + fn is_end_stream(&self) -> bool { + match self { + EitherBody::Left(b) => b.is_end_stream(), + EitherBody::Right(b) => b.is_end_stream(), + } + } +} + +fn map_option_err>(err: Option>) -> Option> { + err.map(|e| e.map_err(Into::into)) +} + +fn with_data_server( + db: SchedulerServer, +) -> impl Filter + Clone { + warp::any().map(move || db.clone()) +} + +pub fn get_routes(scheduler_server: SchedulerServer) -> BoxedFilter<(impl Reply,)> { + let routes = warp::path("executors") + .and(with_data_server(scheduler_server)) + .and_then(handlers::list_executors_data); + routes.boxed() +} \ No newline at end of file diff --git a/rust/ballista/rust/scheduler/src/lib.rs b/rust/ballista/rust/scheduler/src/lib.rs index 8ad2cc7a448..6df6c9ac57c 100644 --- a/rust/ballista/rust/scheduler/src/lib.rs +++ b/rust/ballista/rust/scheduler/src/lib.rs @@ -17,6 +17,7 @@ //! 
Support for distributed schedulers, such as Kubernetes +pub mod api; pub mod planner; pub mod state; @@ -68,6 +69,7 @@ use self::state::{ConfigBackendClient, SchedulerState}; use datafusion::physical_plan::parquet::ParquetExec; use std::time::Instant; +#[derive(Clone)] pub struct SchedulerServer { state: SchedulerState, namespace: String, diff --git a/rust/ballista/rust/scheduler/src/main.rs b/rust/ballista/rust/scheduler/src/main.rs index 785ffb47b17..c166fdc388d 100644 --- a/rust/ballista/rust/scheduler/src/main.rs +++ b/rust/ballista/rust/scheduler/src/main.rs @@ -17,9 +17,14 @@ //! Ballista Rust scheduler binary. +use anyhow::{Context, Result}; +use futures::future::{self, Either, TryFutureExt}; +use hyper::{service::make_service_fn, Server}; +use std::convert::Infallible; use std::{net::SocketAddr, sync::Arc}; +use tonic::transport::Server as TonicServer; +use tower::Service; -use anyhow::{Context, Result}; use ballista_core::BALLISTA_VERSION; use ballista_core::{ print_version, serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer, @@ -29,9 +34,9 @@ use ballista_scheduler::state::EtcdClient; #[cfg(feature = "sled")] use ballista_scheduler::state::StandaloneClient; use ballista_scheduler::{state::ConfigBackendClient, ConfigBackend, SchedulerServer}; +use ballista_scheduler::api::{get_routes, EitherBody, Error}; use log::info; -use tonic::transport::Server; #[macro_use] extern crate configure_me; @@ -56,11 +61,35 @@ async fn start_server( "Ballista v{} Scheduler listening on {:?}", BALLISTA_VERSION, addr ); - let server = - SchedulerGrpcServer::new(SchedulerServer::new(config_backend, namespace)); - Ok(Server::builder() - .add_service(server) - .serve(addr) + Ok(Server::bind(&addr) + .serve(make_service_fn(move |_| { + let scheduler_server = SchedulerServer::new(config_backend.clone(), namespace.clone()); + let scheduler_grpc_server = SchedulerGrpcServer::new(scheduler_server.clone()); + + let mut tonic = TonicServer::builder() + 
.add_service(scheduler_grpc_server) + .into_service(); + let mut warp = warp::service(get_routes(scheduler_server)); + + future::ok::<_, Infallible>(tower::service_fn( + move |req: hyper::Request| { + let header = req.headers().get(hyper::header::ACCEPT); + if header.is_some() && header.unwrap().eq("application/json") { + return Either::Left( + warp.call(req) + .map_ok(|res| res.map(EitherBody::Left)) + .map_err(Error::from), + ); + } + Either::Right( + tonic + .call(req) + .map_ok(|res| res.map(EitherBody::Right)) + .map_err(Error::from), + ) + }, + )) + })) .await .context("Could not start grpc server")?) } From 28edd5b715958ce706b21630752ec1937ce53504 Mon Sep 17 00:00:00 2001 From: Sathis Kumar Date: Mon, 12 Apr 2021 09:23:56 +0530 Subject: [PATCH 2/3] Add small documentation explaining REST api --- rust/ballista/rust/scheduler/README.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/rust/ballista/rust/scheduler/README.md b/rust/ballista/rust/scheduler/README.md index facc6d17698..c2cc090bd67 100644 --- a/rust/ballista/rust/scheduler/README.md +++ b/rust/ballista/rust/scheduler/README.md @@ -30,3 +30,14 @@ $ RUST_LOG=info cargo run --release ``` By default, the scheduler will bind to `localhost` and listen on port `50051`. + +## Connecting to Scheduler +Scheduler supports REST model also using content negotiation. 
+For example, if you want to get the list of executors connected to the scheduler, +you can run the following (assuming you use the default config): + +```bash +curl --request GET \ + --url http://localhost:50050/executors \ + --header 'Accept: application/json' +``` From f9de3ca13b0f76856b1e3fb0d292562d6989d4c5 Mon Sep 17 00:00:00 2001 From: Sathis Kumar Date: Mon, 12 Apr 2021 16:42:39 +0530 Subject: [PATCH 3/3] Add new line at the end of the file --- rust/ballista/rust/scheduler/src/api/handlers.rs | 2 +- rust/ballista/rust/scheduler/src/api/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/ballista/rust/scheduler/src/api/handlers.rs b/rust/ballista/rust/scheduler/src/api/handlers.rs index 2a992b62169..c3450215007 100644 --- a/rust/ballista/rust/scheduler/src/api/handlers.rs +++ b/rust/ballista/rust/scheduler/src/api/handlers.rs @@ -37,4 +37,4 @@ pub(crate) async fn list_executors_data( }) .collect(); Ok(warp::reply::json(&metadata)) -} \ No newline at end of file +} diff --git a/rust/ballista/rust/scheduler/src/api/mod.rs b/rust/ballista/rust/scheduler/src/api/mod.rs index 34789e6240a..29c5cb1af67 100644 --- a/rust/ballista/rust/scheduler/src/api/mod.rs +++ b/rust/ballista/rust/scheduler/src/api/mod.rs @@ -82,4 +82,4 @@ pub fn get_routes(scheduler_server: SchedulerServer) -> BoxedFilter<(impl Reply, .and(with_data_server(scheduler_server)) .and_then(handlers::list_executors_data); routes.boxed() -} \ No newline at end of file +}