Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,16 @@ message ExecutorMetadata {
uint32 port = 3;
}

// Registration details an executor sends to the scheduler when polling for work.
// Unlike ExecutorMetadata, the host is optional here: when it is not set, the
// scheduler substitutes the caller IP address it was configured with.
message ExecutorRegistration {
// Unique identifier of the executor.
string id = 1;
// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see https://github.com/danburkert/prost/issues/430)
// this syntax is ugly but is binary compatible with the "optional" keyword (see https://stackoverflow.com/questions/42622015/how-to-define-an-optional-field-in-protobuf-3)
oneof optional_host {
// Host name or IP address that other executors should use to reach this executor.
string host = 2;
}
// Port the executor listens on.
uint32 port = 3;
}

message GetExecutorMetadataParams {}

message GetExecutorMetadataResult {
Expand Down Expand Up @@ -542,7 +552,7 @@ message TaskStatus {
}

message PollWorkParams {
ExecutorMetadata metadata = 1;
ExecutorRegistration metadata = 1;
bool can_accept_task = 2;
// All tasks must be reported until they reach the failed or completed state
repeated TaskStatus task_status = 3;
Expand Down
1 change: 0 additions & 1 deletion ballista/rust/core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ pub struct BallistaClient {
impl BallistaClient {
/// Create a new BallistaClient to connect to the executor listening on the specified
/// host and port

pub async fn try_new(host: &str, port: u16) -> Result<Self> {
let addr = format!("http://{}:{}", host, port);
debug!("BallistaClient connecting to {}", addr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use std::sync::Arc;
use std::{any::Any, pin::Pin};

use crate::client::BallistaClient;
use crate::memory_stream::MemoryStream;
use crate::serde::scheduler::PartitionLocation;

Expand Down
3 changes: 1 addition & 2 deletions ballista/rust/executor/executor_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ doc = "Local IP address to bind to."
[[param]]
name = "external_host"
type = "String"
default = "std::string::String::from(\"localhost\")"
doc = "Host name or IP address to register with scheduler so that other executors can connect to this executor."
doc = "Host name or IP address to register with scheduler so that other executors can connect to this executor. If none is provided, the scheduler will use the connecting IP address to communicate with the executor."

[[param]]
abbr = "p"
Expand Down
5 changes: 2 additions & 3 deletions ballista/rust/executor/src/execution_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use datafusion::physical_plan::ExecutionPlan;
use log::{debug, error, info, warn};
use tonic::transport::Channel;

use ballista_core::serde::scheduler::ExecutorMeta;
use ballista_core::serde::protobuf::ExecutorRegistration;
use ballista_core::{
client::BallistaClient,
serde::protobuf::{
Expand All @@ -37,10 +37,9 @@ use protobuf::CompletedTask;
pub async fn poll_loop(
mut scheduler: SchedulerGrpcClient<Channel>,
executor_client: BallistaClient,
executor_meta: ExecutorMeta,
executor_meta: ExecutorRegistration,
concurrent_tasks: usize,
) {
let executor_meta: protobuf::ExecutorMetadata = executor_meta.into();
let available_tasks_slots = Arc::new(AtomicUsize::new(concurrent_tasks));
let (task_status_sender, mut task_status_receiver) =
std::sync::mpsc::channel::<TaskStatus>();
Expand Down
14 changes: 6 additions & 8 deletions ballista/rust/executor/src/flight_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use crate::BallistaExecutor;
use ballista_core::error::BallistaError;
use ballista_core::serde::decode_protobuf;
use ballista_core::serde::scheduler::{Action as BallistaAction, PartitionStats};
Expand Down Expand Up @@ -59,12 +58,12 @@ type FlightDataReceiver = Receiver<Result<FlightData, Status>>;
/// Service implementing the Apache Arrow Flight Protocol
#[derive(Clone)]
pub struct BallistaFlightService {
executor: Arc<BallistaExecutor>,
work_dir: String,
}

impl BallistaFlightService {
pub fn new(executor: Arc<BallistaExecutor>) -> Self {
Self { executor }
pub fn new(work_dir: String) -> Self {
Self { work_dir }
}
}

Expand Down Expand Up @@ -103,11 +102,10 @@ impl FlightService for BallistaFlightService {
);

let mut tasks: Vec<JoinHandle<Result<_, BallistaError>>> = vec![];
for part in partition.partition_id.clone() {
let work_dir = self.executor.config.work_dir.clone();
for &part in &partition.partition_id {
let mut path = PathBuf::from(&self.work_dir);
let partition = partition.clone();
tasks.push(tokio::spawn(async move {
let mut path = PathBuf::from(&work_dir);
path.push(partition.job_id);
path.push(&format!("{}", partition.stage_id));
path.push(&format!("{}", part));
Expand Down Expand Up @@ -208,7 +206,7 @@ impl FlightService for BallistaFlightService {
// fetch a partition that was previously executed by this executor
info!("FetchPartition {:?}", partition_id);

let mut path = PathBuf::from(&self.executor.config.work_dir);
let mut path = PathBuf::from(&self.work_dir);
path.push(&partition_id.job_id);
path.push(&format!("{}", partition_id.stage_id));
path.push(&format!("{}", partition_id.partition_id));
Expand Down
31 changes: 0 additions & 31 deletions ballista/rust/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,3 @@

pub mod collect;
pub mod flight_service;

/// Settings an executor process is started with.
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// Host name or IP address handed to the constructor.
    pub(crate) host: String,
    /// Port number handed to the constructor.
    pub(crate) port: u16,
    /// Directory for temporary files, such as IPC files
    pub(crate) work_dir: String,
    /// Number of task slots (presumably the maximum number of tasks run at
    /// the same time; confirm against the execution loop).
    pub(crate) concurrent_tasks: usize,
}

impl ExecutorConfig {
    /// Builds a configuration, copying the borrowed `host` and `work_dir`
    /// strings into owned values.
    pub fn new(host: &str, port: u16, work_dir: &str, concurrent_tasks: usize) -> Self {
        let host = String::from(host);
        let work_dir = String::from(work_dir);
        ExecutorConfig {
            host,
            port,
            work_dir,
            concurrent_tasks,
        }
    }
}

/// Executor handle that carries the configuration it was created with.
#[allow(dead_code)]
pub struct BallistaExecutor {
    /// Settings supplied when the executor was constructed.
    pub(crate) config: ExecutorConfig,
}

impl BallistaExecutor {
    /// Wraps `config` in a new [`BallistaExecutor`].
    pub fn new(config: ExecutorConfig) -> Self {
        BallistaExecutor { config }
    }
}
50 changes: 33 additions & 17 deletions ballista/rust/executor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

//! Ballista Rust executor binary.

use std::sync::Arc;
use std::{
net::{IpAddr, Ipv4Addr},
sync::Arc,
};

use anyhow::{Context, Result};
use arrow_flight::flight_service_server::FlightServiceServer;
Expand All @@ -28,15 +31,17 @@ use tonic::transport::Server;
use uuid::Uuid;

use ballista_core::{
client::BallistaClient, serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient,
client::BallistaClient,
serde::protobuf::{
executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
ExecutorRegistration,
},
};
use ballista_core::{
print_version, serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
serde::scheduler::ExecutorMeta, BALLISTA_VERSION,
};
use ballista_executor::{
flight_service::BallistaFlightService, BallistaExecutor, ExecutorConfig,
BALLISTA_VERSION,
};
use ballista_executor::flight_service::BallistaFlightService;
use ballista_scheduler::{state::StandaloneClient, SchedulerServer};
use config::prelude::*;

Expand Down Expand Up @@ -80,7 +85,7 @@ async fn main() -> Result<()> {
.with_context(|| format!("Could not parse address: {}", addr))?;

let scheduler_host = if opt.local {
external_host.to_owned()
"localhost".to_string()
} else {
opt.scheduler_host
};
Expand All @@ -94,14 +99,16 @@ async fn main() -> Result<()> {
.into_string()
.unwrap(),
);
let config =
ExecutorConfig::new(&external_host, port, &work_dir, opt.concurrent_tasks);
info!("Running with config: {:?}", config);
info!("Running with config:");
info!("work_dir: {}", work_dir);
info!("concurrent_tasks: {}", opt.concurrent_tasks);

let executor_meta = ExecutorMeta {
let executor_meta = ExecutorRegistration {
id: Uuid::new_v4().to_string(), // assign this executor a unique ID
host: external_host.clone(),
port,
optional_host: external_host
.clone()
.map(executor_registration::OptionalHost::Host),
port: port as u32,
};

if opt.local {
Expand All @@ -117,8 +124,9 @@ async fn main() -> Result<()> {
let server = SchedulerGrpcServer::new(SchedulerServer::new(
Arc::new(client),
"ballista".to_string(),
IpAddr::V4(Ipv4Addr::LOCALHOST),
));
let addr = format!("{}:{}", bind_host, scheduler_port);
let addr = format!("localhost:{}", scheduler_port);
let addr = addr
.parse()
.with_context(|| format!("Could not parse {}", addr))?;
Expand Down Expand Up @@ -158,16 +166,24 @@ async fn main() -> Result<()> {
let scheduler = SchedulerGrpcClient::connect(scheduler_url)
.await
.context("Could not connect to scheduler")?;
let executor = Arc::new(BallistaExecutor::new(config));
let service = BallistaFlightService::new(executor);
let service = BallistaFlightService::new(work_dir);

let server = FlightServiceServer::new(service);
info!(
"Ballista v{} Rust Executor listening on {:?}",
BALLISTA_VERSION, addr
);
let server_future = tokio::spawn(Server::builder().add_service(server).serve(addr));
let client = BallistaClient::try_new(&external_host, port).await?;
let client_host = external_host.as_deref().unwrap_or_else(|| {
if bind_host == "0.0.0.0" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what the intent is here. According to https://en.wikipedia.org/wiki/0.0.0.0, binding to 0.0.0.0 means binding to "any IPv4 address at all". This change seems to alter that behavior and prevent the user from doing that. Perhaps you could add some documentation here to explain this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a comment to clarify. Right now the executor does a really weird thing which has a big TODO here:

https://github.com/apache/arrow-datafusion/blob/70afe4c459af33b8cb190383c923fcee09cde252/ballista/rust/executor/src/execution_loop.rs#L100-L101

Basically, the executor needs to connect to itself through a BallistaClient in order to work. If an external host is defined, it is clear how the executor should connect to itself. If not, we fall back to the bind address; but since 0.0.0.0 is a meta-address, in that case we can just use localhost to connect to ourselves. Does that make sense?

I started working on the TODO to get rid of this ugliness, but the PR would have become too big, so I am planning to tackle that separately.

// If the executor is bound to "0.0.0.0" (the meta-address meaning all IPv4 addresses on all network interfaces),
// use "localhost" so that it can connect to itself through the BallistaClient
"localhost"
} else {
&bind_host
}
});
let client = BallistaClient::try_new(client_host, port).await?;
tokio::spawn(execution_loop::poll_loop(
scheduler,
client,
Expand Down
59 changes: 39 additions & 20 deletions ballista/rust/scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ pub mod state;
#[cfg(test)]
pub mod test_utils;

use std::fmt;
use std::{convert::TryInto, sync::Arc};
use std::{fmt, net::IpAddr};

use ballista_core::serde::protobuf::{
execute_query_params::Query, job_status, scheduler_grpc_server::SchedulerGrpc,
ExecuteQueryParams, ExecuteQueryResult, FailedJob, FilePartitionMetadata, FileType,
GetExecutorMetadataParams, GetExecutorMetadataResult, GetFileMetadataParams,
GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, JobStatus,
PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, TaskDefinition,
TaskStatus,
execute_query_params::Query, executor_registration::OptionalHost, job_status,
scheduler_grpc_server::SchedulerGrpc, ExecuteQueryParams, ExecuteQueryResult,
FailedJob, FilePartitionMetadata, FileType, GetExecutorMetadataParams,
GetExecutorMetadataResult, GetFileMetadataParams, GetFileMetadataResult,
GetJobStatusParams, GetJobStatusResult, JobStatus, PartitionId, PollWorkParams,
PollWorkResult, QueuedJob, RunningJob, TaskDefinition, TaskStatus,
};
use ballista_core::serde::scheduler::ExecutorMeta;

Expand Down Expand Up @@ -71,13 +71,18 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH};

#[derive(Clone)]
pub struct SchedulerServer {
caller_ip: IpAddr,
state: Arc<SchedulerState>,
start_time: u128,
version: String,
}

impl SchedulerServer {
pub fn new(config: Arc<dyn ConfigBackendClient>, namespace: String) -> Self {
pub fn new(
config: Arc<dyn ConfigBackendClient>,
namespace: String,
caller_ip: IpAddr,
) -> Self {
const VERSION: Option<&'static str> = option_env!("CARGO_PKG_VERSION");
let state = Arc::new(SchedulerState::new(config, namespace));
let state_clone = state.clone();
Expand All @@ -86,6 +91,7 @@ impl SchedulerServer {
tokio::spawn(async move { state_clone.synchronize_job_status_loop().await });

Self {
caller_ip,
state,
start_time: SystemTime::now()
.duration_since(UNIX_EPOCH)
Expand Down Expand Up @@ -131,7 +137,16 @@ impl SchedulerGrpc for SchedulerServer {
} = request.into_inner()
{
debug!("Received poll_work request for {:?}", metadata);
let metadata: ExecutorMeta = metadata.into();
let metadata: ExecutorMeta = ExecutorMeta {
id: metadata.id,
host: metadata
.optional_host
.map(|h| match h {
OptionalHost::Host(host) => host,
})
.unwrap_or_else(|| self.caller_ip.to_string()),
port: metadata.port as u16,
};
let mut lock = self.state.lock().await.map_err(|e| {
let msg = format!("Could not lock the state: {}", e);
error!("{}", msg);
Expand Down Expand Up @@ -359,12 +374,7 @@ impl SchedulerGrpc for SchedulerServer {
job_id_spawn, e
);
}
let mut planner = fail_job!(DistributedPlanner::try_new(executors)
.map_err(|e| {
let msg = format!("Could not create distributed planner: {}", e);
error!("{}", msg);
tonic::Status::internal(msg)
}));
let mut planner = DistributedPlanner::new();
let stages = fail_job!(planner
.plan_query_stages(&job_id_spawn, plan)
.map_err(|e| {
Expand Down Expand Up @@ -433,12 +443,17 @@ impl SchedulerGrpc for SchedulerServer {

#[cfg(all(test, feature = "sled"))]
mod test {
use std::sync::Arc;
use std::{
net::{IpAddr, Ipv4Addr},
sync::Arc,
};

use tonic::Request;

use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::{ExecutorMetadata, PollWorkParams};
use ballista_core::serde::protobuf::{
executor_registration::OptionalHost, ExecutorRegistration, PollWorkParams,
};

use super::{
state::{SchedulerState, StandaloneClient},
Expand All @@ -449,11 +464,15 @@ mod test {
async fn test_poll_work() -> Result<(), BallistaError> {
let state = Arc::new(StandaloneClient::try_new_temporary()?);
let namespace = "default";
let scheduler = SchedulerServer::new(state.clone(), namespace.to_owned());
let scheduler = SchedulerServer::new(
state.clone(),
namespace.to_owned(),
IpAddr::V4(Ipv4Addr::LOCALHOST),
);
let state = SchedulerState::new(state, namespace.to_string());
let exec_meta = ExecutorMetadata {
let exec_meta = ExecutorRegistration {
id: "abc".to_owned(),
host: "".to_owned(),
optional_host: Some(OptionalHost::Host("".to_owned())),
port: 0,
};
let request: Request<PollWorkParams> = Request::new(PollWorkParams {
Expand Down
Loading