diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 32e363d7c012a..1c6275a163c75 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -495,6 +495,7 @@ message PhysicalPlanNode { CrossJoinExecNode cross_join = 19; AvroScanExecNode avro_scan = 20; PhysicalExtensionNode extension = 21; + ShuffleStreamReaderExecNode shuffle_stream_reader = 22; } } @@ -731,6 +732,14 @@ message ShuffleWriterExecNode { uint32 stage_id = 2; PhysicalPlanNode input = 3; PhysicalHashRepartition output_partitioning = 4; + bool push_shuffle = 5; + repeated ExecutorMetadata execs = 6; +} + +message ShuffleStreamReaderExecNode { + uint32 stage_id = 1; + uint64 partition_count = 2; + Schema schema = 3; } message ShuffleReaderExecNode { @@ -795,6 +804,7 @@ message Action { oneof ActionType { // Fetch a partition from an executor FetchPartition fetch_partition = 3; + PushPartition push_partition = 4; } // configuration settings @@ -819,6 +829,12 @@ message FetchPartition { string path = 4; } +message PushPartition { + string job_id = 1; + uint32 stage_id = 2; + uint32 partition_id = 3; +} + // Mapping from partition id to executor id message PartitionLocation { PartitionId partition_id = 1; diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs index aae4b2bb1bb26..074e86fa3b998 100644 --- a/ballista/rust/core/src/client.rs +++ b/ballista/rust/core/src/client.rs @@ -32,8 +32,9 @@ use crate::serde::scheduler::{ }; use arrow_flight::utils::flight_data_to_arrow_batch; -use arrow_flight::Ticket; use arrow_flight::{flight_service_client::FlightServiceClient, FlightData}; +use arrow_flight::{FlightDescriptor, SchemaAsIpc, Ticket}; +use datafusion::arrow::ipc::writer::IpcWriteOptions; use datafusion::arrow::{ array::{StringArray, StructArray}, datatypes::{Schema, SchemaRef}, @@ -44,7 +45,7 @@ use datafusion::physical_plan::common::collect; use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; use datafusion::{logical_plan::LogicalPlan, physical_plan::RecordBatchStream}; use futures::{Stream, StreamExt}; -use log::debug; +use log::{debug, warn}; use prost::Message; use tonic::Streaming; use uuid::Uuid; @@ -92,6 +93,58 @@ impl BallistaClient { self.execute_action(&action).await } + /// Push a partition to an executor + pub async fn push_partition( + &mut self, + job_id: String, + stage_id: usize, + partition_id: usize, + input: SendableRecordBatchStream, + ) -> Result { + let action = Action::PushPartition { + job_id: job_id.to_string(), + stage_id, + partition_id, + }; + + let serialized_action: protobuf::Action = action.try_into()?; + let mut cmd: Vec = Vec::with_capacity(serialized_action.encoded_len()); + serialized_action + .encode(&mut cmd) + .map_err(|e| BallistaError::General(format!("{:?}", e)))?; + + // add an initial FlightData message that sends schema and cmd + let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default(); + let schema_flight_data = + SchemaAsIpc::new(input.schema().as_ref(), &options).into(); + + let fd = Some(FlightDescriptor::new_cmd(cmd)); + let cmd_data = FlightData { + flight_descriptor: fd, + ..schema_flight_data + }; + + let flight_stream = Arc::new(Mutex::new(RecordBatchToFlightDataStream::new( + input, cmd_data, options, + ))); + let flight_stream_ref = RecordBatchToFlightDataStreamRef { + batch_to_stream: flight_stream.clone(), + }; + + self.flight_client + .do_put(flight_stream_ref) + .await + .map_err(|e| 
BallistaError::General(format!("{:?}", e)))? + .into_inner(); + + let metrics = flight_stream.lock(); + Ok(PartitionStats::new( + Some(metrics.num_rows), + Some(metrics.num_batches), + Some(metrics.num_bytes), + )) + } + /// Execute an action and retrieve the results pub async fn execute_action( &mut self, @@ -179,3 +232,80 @@ impl RecordBatchStream for FlightDataStream { self.schema.clone() } } + +struct RecordBatchToFlightDataStreamRef { + batch_to_stream: Arc>, +} + +struct RecordBatchToFlightDataStream { + num_batches: u64, + num_rows: u64, + num_bytes: u64, + batch: SendableRecordBatchStream, + buffered_data: Vec, + options: IpcWriteOptions, +} + +impl RecordBatchToFlightDataStream { + pub fn new( + batch: SendableRecordBatchStream, + schema: FlightData, + options: IpcWriteOptions, + ) -> Self { + Self { + num_batches: 0, + num_rows: 0, + num_bytes: 0, + batch, + buffered_data: vec![schema], + options, + } + } +} + +impl Stream for RecordBatchToFlightDataStreamRef { + type Item = FlightData; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let mut self_mut = self.get_mut().batch_to_stream.lock(); + if let Some(event) = self_mut.buffered_data.pop() { + Poll::Ready(Some(event)) + } else { + loop { + match self_mut.batch.poll_next_unpin(cx) { + Poll::Ready(Some(Err(e))) => { + warn!("Error when poll input batches : {}", e); + continue; + } + Poll::Ready(Some(Ok(record_batch))) => { + self_mut.num_batches += 1; + self_mut.num_rows += record_batch.num_rows() as u64; + let num_bytes: usize = record_batch + .columns() + .iter() + .map(|array| array.get_array_memory_size()) + .sum(); + self_mut.num_bytes += num_bytes as u64; + let converted_chunk = + arrow_flight::utils::flight_data_from_arrow_batch( + &record_batch, + &self_mut.options, + ); + self_mut.buffered_data.extend(converted_chunk.0.into_iter()); + self_mut.buffered_data.push(converted_chunk.1); + if let Some(event) = self_mut.buffered_data.pop() { + return Poll::Ready(Some(event)); + } else { + continue; + } + } + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + } + } + } + } +} diff --git a/ballista/rust/core/src/config.rs b/ballista/rust/core/src/config.rs index fa60231ea8d1c..aee800e529770 100644 --- a/ballista/rust/core/src/config.rs +++ b/ballista/rust/core/src/config.rs @@ -201,8 +201,8 @@ impl BallistaConfig { // needs to be visible to code generated by configure_me #[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)] pub enum TaskSchedulingPolicy { - PullStaged, - PushStaged, + Pull, + Push, } impl std::str::FromStr for TaskSchedulingPolicy { diff --git a/ballista/rust/core/src/execution_plans/mod.rs b/ballista/rust/core/src/execution_plans/mod.rs index b10ff341e9032..75798614d2e84 100644 --- a/ballista/rust/core/src/execution_plans/mod.rs +++ b/ballista/rust/core/src/execution_plans/mod.rs @@ -20,10 +20,13 @@ mod distributed_query; mod shuffle_reader; +mod shuffle_stream_reader; mod shuffle_writer; mod unresolved_shuffle; pub use distributed_query::DistributedQueryExec; pub use shuffle_reader::ShuffleReaderExec; +pub use shuffle_stream_reader::ShuffleStreamReaderExec; +pub use shuffle_writer::OutputLocation; pub use shuffle_writer::ShuffleWriterExec; pub use unresolved_shuffle::UnresolvedShuffleExec; diff --git a/ballista/rust/core/src/execution_plans/shuffle_stream_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_stream_reader.rs new file mode 100644 index 0000000000000..986c000ffe768 --- /dev/null +++ 
b/ballista/rust/core/src/execution_plans/shuffle_stream_reader.rs @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cell::Cell; +use std::fmt::Formatter; +use std::sync::{Arc, Mutex}; +use std::{any::Any, pin::Pin}; + +use async_trait::async_trait; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::error::Result as ArrowResult; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::physical_plan::metrics::{ + ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, +}; +use datafusion::physical_plan::stream::RecordBatchReceiverStream; +use datafusion::physical_plan::{ + DisplayFormatType, ExecutionPlan, Metric, Partitioning, SendableRecordBatchStream, + Statistics, +}; +use datafusion::{ + error::{DataFusionError, Result}, + physical_plan::RecordBatchStream, +}; +use futures::{future, Stream, StreamExt}; +use log::info; +use std::time::Instant; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::task; +use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion::physical_plan::expressions::PhysicalSortExpr; + +/// ShuffleStreamReaderExec reads partitions streams that are pushed by the multiple ShuffleWriterExec +/// being executed by an executor +#[derive(Debug, Clone)] +pub struct ShuffleStreamReaderExec { + /// The query stage id which the shuffle reader depends on + pub stage_id: usize, + + /// Schema + pub(crate) schema: SchemaRef, + + /// Partition count + pub partition_count: usize, + + /// Record Batch input receiver + batch_input: Arc>>>>, + + /// Execution metrics + metrics: ExecutionPlanMetricsSet, +} + +impl ShuffleStreamReaderExec { + /// Create a new ShuffleStreamReaderExec + pub fn new(stage_id: usize, schema: SchemaRef, partition_count: usize) -> Self { + Self { + stage_id, + schema, + partition_count, + batch_input: Arc::new(Mutex::new(Vec::new())), + metrics: ExecutionPlanMetricsSet::new(), + } + } + + pub fn create_record_batch_channel(&self) -> Sender> { + let (response_tx, response_rx): ( + Sender>, + Receiver>, + ) = channel(10); + self.batch_input.lock().unwrap().push(response_rx); + response_tx + } + + /// Returns the the streaming shuffle readers in the execution plan + pub fn find_stream_shuffle_readers( + plan: Arc, + ) -> Vec { + if let Some(shuffle_reader) = + plan.as_any().downcast_ref::() + { + vec![shuffle_reader.clone()] + } else { + let readers = plan + .children() + .into_iter() + .map(|child| ShuffleStreamReaderExec::find_stream_shuffle_readers(child)) + .collect::>() + .into_iter() + .flatten() + .collect(); + readers + } + } +} + +#[async_trait] +impl ExecutionPlan for ShuffleStreamReaderExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn 
output_partitioning(&self) -> Partitioning { + // TODO partitioning may be known and could be populated here + // see https://github.com/apache/arrow-datafusion/issues/758 + Partitioning::UnknownPartitioning(self.partition_count) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn relies_on_input_order(&self) -> bool { + false + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + &self, + _children: Vec>, + ) -> Result> { + Err(DataFusionError::Plan( + "Ballista ShuffleStreamReaderExec does not support with_new_children()" + .to_owned(), + )) + } + + async fn execute( + &self, + partition: usize, + _runtime: Arc, + ) -> Result { + info!("ShuffleStreamReaderExec::execute({})", partition); + let output_rows = MetricBuilder::new(&self.metrics).output_rows(partition); + let (sender, receiver): ( + Sender>, + Receiver>, + ) = channel(2); + + let schema = &self.schema; + let mut rx = self.batch_input.lock().unwrap().pop().unwrap(); + let join_handle = task::spawn(async move { + while let Some(batch) = rx.recv().await { + output_rows.add(batch.as_ref().unwrap().num_rows()); + sender.send(batch).await.ok(); + } + }); + Ok(RecordBatchReceiverStream::create( + schema, + receiver, + Some(join_handle), + )) + + // let schema = &self.schema; + // let rx = self.batch_receiver.lock().unwrap().pop().unwrap(); + // let join_handle = tokio::task::spawn(async move {}); + // Ok(RecordBatchReceiverStream::create(schema, rx, join_handle)) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "ShuffleStreamReaderExec") + } + } + } + + fn statistics(&self) -> Statistics { + // TODD need to implement + Statistics::default() + } +} diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index fbce0653a9cc0..00cab329279aa 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -20,8 +20,6 @@ //! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query //! will use the ShuffleReaderExec to read these results. 
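// A minimal sketch of the two constructors added later in this file:
// `try_new_pull_shuffle` keeps the original disk-based output (OutputLocation::LocalDir),
// while `try_new_push_shuffle` streams partitions to the given executors over Arrow
// Flight (OutputLocation::Executors). Type names and import paths are assumed from the
// test module at the bottom of this file; the values are placeholders only.
use std::sync::Arc;

use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::scheduler::ExecutorMeta;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::ExecutionPlan;

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(false, schema));

    // Pull-based shuffle: partitions are written under a local work directory and
    // later fetched by ShuffleReaderExec.
    let pull = ShuffleWriterExec::try_new_pull_shuffle(
        "job123".to_string(),
        1,
        input.clone(),
        "/tmp/ballista".to_string(),
        None,
    )?;

    // Push-based shuffle: partitions are streamed to the listed executors while the
    // stage is still running.
    let executor = ExecutorMeta {
        id: "exec-1".to_string(),
        host: "localhost".to_string(),
        port: 50051,
        grpc_port: 50052,
    };
    let push = ShuffleWriterExec::try_new_push_shuffle(
        "job123".to_string(),
        1,
        input,
        vec![executor],
        None,
    )?;

    println!("pull is push-based: {}", pull.is_push_shuffle());
    println!("push is push-based: {}", push.is_push_shuffle());
    Ok(())
}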
-use datafusion::physical_plan::expressions::PhysicalSortExpr; -use parking_lot::Mutex; use std::fs::File; use std::iter::Iterator; use std::path::PathBuf; @@ -32,8 +30,9 @@ use std::{any::Any, pin::Pin}; use crate::error::BallistaError; use crate::utils; +use crate::client::BallistaClient; use crate::serde::protobuf::ShuffleWritePartition; -use crate::serde::scheduler::{PartitionLocation, PartitionStats}; +use crate::serde::scheduler::{ExecutorMeta, PartitionLocation, PartitionStats}; use async_trait::async_trait; use datafusion::arrow::array::{ Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder, @@ -41,18 +40,21 @@ use datafusion::arrow::array::{ }; use datafusion::arrow::compute::take; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow::error::Result as ArrowResult; use datafusion::arrow::ipc::reader::FileReader; use datafusion::arrow::ipc::writer::FileWriter; use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::physical_plan::common::IPCWriter; +use datafusion::physical_plan::expressions::PhysicalSortExpr; use datafusion::physical_plan::hash_utils::create_hashes; use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::metrics::{ self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, }; use datafusion::physical_plan::repartition::RepartitionExec; +use datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::Partitioning::RoundRobinBatch; use datafusion::physical_plan::{ DisplayFormatType, ExecutionPlan, Metric, Partitioning, RecordBatchStream, @@ -61,6 +63,7 @@ use datafusion::physical_plan::{ use futures::StreamExt; use hashbrown::HashMap; use log::{debug, info}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use uuid::Uuid; /// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and @@ -75,14 +78,20 @@ pub struct ShuffleWriterExec { stage_id: usize, /// Physical execution plan for this query stage plan: Arc, - /// Path to write output streams to - work_dir: String, + /// output location to write output streams to + pub output_loc: OutputLocation, /// Optional shuffle output partitioning shuffle_output_partitioning: Option, /// Execution metrics metrics: ExecutionPlanMetricsSet, } +#[derive(Debug, Clone)] +pub enum OutputLocation { + LocalDir(String), + Executors(Vec), +} + #[derive(Debug, Clone)] struct ShuffleWriteMetrics { /// Time spend writing batches to shuffle files @@ -110,6 +119,24 @@ impl ShuffleWriteMetrics { impl ShuffleWriterExec { /// Create a new shuffle writer pub fn try_new( + job_id: String, + stage_id: usize, + plan: Arc, + output_loc: OutputLocation, + shuffle_output_partitioning: Option, + ) -> Result { + Ok(Self { + job_id, + stage_id, + plan, + output_loc, + shuffle_output_partitioning, + metrics: ExecutionPlanMetricsSet::new(), + }) + } + + /// Create a new shuffle writer for pull based shuffle + pub fn try_new_pull_shuffle( job_id: String, stage_id: usize, plan: Arc, @@ -120,7 +147,25 @@ impl ShuffleWriterExec { job_id, stage_id, plan, - work_dir, + output_loc: OutputLocation::LocalDir(work_dir), + shuffle_output_partitioning, + metrics: ExecutionPlanMetricsSet::new(), + }) + } + + /// Create a new shuffle writer for push based shuffle + pub fn try_new_push_shuffle( + job_id: String, + stage_id: usize, + plan: Arc, + execs: Vec, + shuffle_output_partitioning: Option, + ) -> 
Result { + Ok(Self { + job_id, + stage_id, + plan, + output_loc: OutputLocation::Executors(execs), shuffle_output_partitioning, metrics: ExecutionPlanMetricsSet::new(), }) @@ -141,6 +186,14 @@ impl ShuffleWriterExec { self.shuffle_output_partitioning.as_ref() } + /// Is push based shuffle + pub fn is_push_shuffle(&self) -> bool { + match self.output_loc { + OutputLocation::LocalDir(_) => false, + OutputLocation::Executors(_) => true, + } + } + pub async fn execute_shuffle_write( &self, input_partition: usize, @@ -149,30 +202,59 @@ impl ShuffleWriterExec { let now = Instant::now(); let mut stream = self.plan.execute(input_partition, runtime).await?; - - let mut path = PathBuf::from(&self.work_dir); - path.push(&self.job_id); - path.push(&format!("{}", self.stage_id)); - let write_metrics = ShuffleWriteMetrics::new(input_partition, &self.metrics); - match &self.shuffle_output_partitioning { None => { let timer = write_metrics.write_time.timer(); - path.push(&format!("{}", input_partition)); - std::fs::create_dir_all(&path)?; - path.push("data.arrow"); - let path = path.to_str().unwrap(); - info!("Writing results to {}", path); - - // stream results to disk - let stats = utils::write_stream_to_disk( - &mut stream, - path, - &write_metrics.write_time, - ) - .await - .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; + let (stats, path) = match &self.output_loc { + OutputLocation::LocalDir(work_dir) => { + let mut path = PathBuf::from(work_dir); + path.push(&self.job_id); + path.push(&format!("{}", self.stage_id)); + + path.push(&format!("{}", input_partition)); + std::fs::create_dir_all(&path)?; + path.push("data.arrow"); + let path = path.to_str().unwrap(); + info!("Writing results to local path {}", path); + + // stream results to disk + let stats = utils::write_stream_to_disk( + &mut stream, + path, + &write_metrics.write_time, + ) + .await + .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; + + (stats, path.to_string()) + } + + OutputLocation::Executors(execs) => { + assert_eq!(execs.len(), 1); + + let executor = execs[0].to_owned(); + info!( + "Writing results to host {}, port {}", + executor.host.as_str(), + executor.port + ); + + // stream results to network + let stats = utils::write_stream_to_flight( + stream, + executor.host.as_str(), + executor.port, + self.job_id.clone(), + self.stage_id, + 0, + &write_metrics.write_time, + ) + .await + .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; + (stats, String::from("")) + } + }; write_metrics .input_rows @@ -191,7 +273,7 @@ impl ShuffleWriterExec { Ok(vec![ShuffleWritePartition { partition_id: input_partition as u64, - path: path.to_owned(), + path, num_batches: stats.num_batches.unwrap_or(0), num_rows: stats.num_rows.unwrap_or(0), num_bytes: stats.num_bytes.unwrap_or(0), @@ -203,7 +285,7 @@ impl ShuffleWriterExec { // we won't necessary produce output for every possible partition, so we // create writers on demand - let mut writers: Vec> = vec![]; + let mut writers: Vec> = vec![]; for _ in 0..num_output_partitions { writers.push(None); } @@ -252,6 +334,8 @@ impl ShuffleWriterExec { let output_batch = RecordBatch::try_new(input_batch.schema(), columns)?; + let num_rows = output_batch.num_rows(); + // write non-empty batch out //TODO optimize so we don't write or fetch empty partitions @@ -259,24 +343,53 @@ impl ShuffleWriterExec { let timer = write_metrics.write_time.timer(); match &mut writers[output_partition] { Some(w) => { - w.write(&output_batch)?; + w.write(output_batch).await?; } None => { - 
let mut path = path.clone(); - path.push(&format!("{}", output_partition)); - std::fs::create_dir_all(&path)?; - - path.push(format!("data-{}.arrow", input_partition)); - info!("Writing results to {:?}", path); - - let mut writer = - IPCWriter::new(&path, stream.schema().as_ref())?; - - writer.write(&output_batch)?; - writers[output_partition] = Some(writer); + // create proper shuffle writer + match &self.output_loc { + OutputLocation::LocalDir(work_dir) => { + let mut path = PathBuf::from(work_dir); + path.push(&self.job_id); + path.push(&format!("{}", self.stage_id)); + + path.push(&format!("{}", output_partition)); + std::fs::create_dir_all(&path)?; + + path.push(format!( + "data-{}.arrow", + input_partition + )); + info!("Writing results to {:?}", path); + + let mut writer = IPCWriter::new( + &path, + stream.schema().as_ref(), + )?; + writer.write(&output_batch)?; + writers[output_partition] = + Some(ShuffleWriter::File(Box::from(writer))); + } + OutputLocation::Executors(execs) => { + assert_eq!(execs.len(), num_output_partitions); + + let exec = &execs[output_partition]; + let mut writer = FlightShuffleWriter::new( + exec.host.clone(), + exec.port, + self.job_id.clone(), + self.stage_id, + output_partition, + &stream.schema(), + )?; + writer.write(output_batch).await?; + writers[output_partition] = + Some(ShuffleWriter::Flight(writer)); + } + } } } - write_metrics.output_rows.add(output_batch.num_rows()); + write_metrics.output_rows.add(num_rows); timer.done(); } } @@ -288,20 +401,20 @@ impl ShuffleWriterExec { Some(w) => { w.finish()?; info!( - "Finished writing shuffle partition {} at {:?}. Batches: {}. Rows: {}. Bytes: {}.", - i, - w.path(), - w.num_batches, - w.num_rows, - w.num_bytes - ); + "Finished writing shuffle partition {} at {}. Batches: {}. Rows: {}. 
Bytes: {}.", + i, + w.path(), + w.num_batches(), + w.num_rows(), + w.num_bytes() + ); part_locs.push(ShuffleWritePartition { partition_id: i as u64, - path: w.path().to_string_lossy().to_string(), - num_batches: w.num_batches, - num_rows: w.num_rows, - num_bytes: w.num_bytes, + path: w.path().to_owned(), + num_batches: w.num_batches(), + num_rows: w.num_rows(), + num_bytes: w.num_bytes(), }); } None => {} @@ -351,12 +464,12 @@ impl ExecutionPlan for ShuffleWriterExec { &self, children: Vec>, ) -> Result> { - assert!(children.len() == 1); + assert_eq!(children.len(), 1); Ok(Arc::new(ShuffleWriterExec::try_new( self.job_id.clone(), self.stage_id, children[0].clone(), - self.work_dir.clone(), + self.output_loc.clone(), self.shuffle_output_partitioning.clone(), )?)) } @@ -446,6 +559,122 @@ fn result_schema() -> SchemaRef { ])) } +/// Different Shuffle writers +enum ShuffleWriter { + File(Box), + Flight(FlightShuffleWriter), +} + +impl ShuffleWriter { + async fn write(&mut self, batch: RecordBatch) -> Result<()> { + match self { + ShuffleWriter::File(writer) => writer.write(&batch), + ShuffleWriter::Flight(writer) => writer.write(batch).await, + } + } + + fn finish(&mut self) -> Result<()> { + match self { + ShuffleWriter::File(writer) => writer.finish(), + ShuffleWriter::Flight(writer) => writer.finish(), + } + } + + fn path(&self) -> &str { + match self { + ShuffleWriter::File(writer) => writer.path().to_str().unwrap_or(""), + ShuffleWriter::Flight(writer) => writer.path(), + } + } + + pub fn num_batches(&self) -> u64 { + match self { + ShuffleWriter::File(writer) => writer.num_batches, + ShuffleWriter::Flight(writer) => writer.num_batches, + } + } + + pub fn num_rows(&self) -> u64 { + match self { + ShuffleWriter::File(writer) => writer.num_rows, + ShuffleWriter::Flight(writer) => writer.num_rows, + } + } + + pub fn num_bytes(&self) -> u64 { + match self { + ShuffleWriter::File(writer) => writer.num_bytes, + ShuffleWriter::Flight(writer) => writer.num_bytes, + } + } +} + +struct FlightShuffleWriter { + pub num_batches: u64, + pub num_rows: u64, + pub num_bytes: u64, + pub sender: Sender>, +} + +impl FlightShuffleWriter { + fn new( + host: String, + port: u16, + job_id: String, + stage_id: usize, + partition_id: usize, + schema: &SchemaRef, + ) -> Result { + let (sender, receiver): ( + Sender>, + Receiver>, + ) = channel(2); + + let stream = RecordBatchReceiverStream::create(schema, receiver, None); + + tokio::task::spawn(async move { + let mut client = BallistaClient::try_new(host.as_str(), port) + .await + .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; + + let stat = client + .push_partition(job_id, stage_id, partition_id, stream) + .await + .map_err(|e| DataFusionError::Execution(format!("{:?}", e))); + stat + }); + + Ok(Self { + num_batches: 0, + num_rows: 0, + num_bytes: 0, + sender, + }) + } + + async fn write(&mut self, batch: RecordBatch) -> Result<()> { + let num_rows = batch.num_rows(); + let num_bytes: usize = batch + .columns() + .iter() + .map(|array| array.get_array_memory_size()) + .sum(); + self.sender.send(ArrowResult::Ok(batch)).await.ok(); + self.num_batches += 1; + self.num_rows += num_rows as u64; + self.num_bytes += num_bytes as u64; + Ok(()) + } + + fn finish(&mut self) -> Result<()> { + Ok(()) + } + + fn path(&self) -> &str { + "" + } +} + #[cfg(test)] mod tests { use super::*; @@ -462,7 +691,7 @@ mod tests { let input_plan = Arc::new(CoalescePartitionsExec::new(create_input_plan()?)); let work_dir = TempDir::new()?; - let query_stage = 
ShuffleWriterExec::try_new( + let query_stage = ShuffleWriterExec::try_new_pull_shuffle( "jobOne".to_owned(), 1, input_plan, @@ -516,7 +745,7 @@ mod tests { let input_plan = create_input_plan()?; let work_dir = TempDir::new()?; - let query_stage = ShuffleWriterExec::try_new( + let query_stage = ShuffleWriterExec::try_new_pull_shuffle( "jobOne".to_owned(), 1, input_plan, diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs index 50aa31755037b..411cb68db51b5 100644 --- a/ballista/rust/core/src/serde/physical_plan/mod.rs +++ b/ballista/rust/core/src/serde/physical_plan/mod.rs @@ -17,7 +17,8 @@ use crate::error::BallistaError; use crate::execution_plans::{ - ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec, + OutputLocation, ShuffleReaderExec, ShuffleStreamReaderExec, ShuffleWriterExec, + UnresolvedShuffleExec, }; use crate::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning; use crate::serde::protobuf::physical_expr_node::ExprType; @@ -25,7 +26,7 @@ use crate::serde::protobuf::physical_plan_node::PhysicalPlanType; use crate::serde::protobuf::repartition_exec_node::PartitionMethod; use crate::serde::protobuf::ShuffleReaderPartition; use crate::serde::protobuf::{PhysicalExtensionNode, PhysicalPlanNode}; -use crate::serde::scheduler::PartitionLocation; +use crate::serde::scheduler::{ExecutorMeta, PartitionLocation}; use crate::serde::{ byte_to_string, proto_error, protobuf, str_to_byte, AsExecutionPlan, PhysicalExtensionCodec, @@ -391,14 +392,38 @@ impl AsExecutionPlan for PhysicalPlanNode { let output_partitioning = parse_protobuf_hash_partitioning( shuffle_writer.output_partitioning.as_ref(), )?; - - Ok(Arc::new(ShuffleWriterExec::try_new( - shuffle_writer.job_id.clone(), - shuffle_writer.stage_id as usize, - input, - "".to_string(), // this is intentional but hacky - the executor will fill this in - output_partitioning, - )?)) + if !shuffle_writer.push_shuffle { + Ok(Arc::new(ShuffleWriterExec::try_new_pull_shuffle( + shuffle_writer.job_id.clone(), + shuffle_writer.stage_id as usize, + input, + "".to_string(), // this is intentional but hacky - the executor will fill this in + output_partitioning, + )?)) + } else { + let _execs: Vec = shuffle_writer + .execs + .to_owned() + .into_iter() + .map(|e| e.into()) + .collect(); + Ok(Arc::new(ShuffleWriterExec::try_new_push_shuffle( + shuffle_writer.job_id.clone(), + shuffle_writer.stage_id as usize, + input, + _execs, + output_partitioning, + )?)) + } + } + PhysicalPlanType::ShuffleStreamReader(shuffle_stream_reader) => { + let schema = Arc::new(convert_required!(shuffle_stream_reader.schema)?); + let shuffle_reader = ShuffleStreamReaderExec::new( + shuffle_stream_reader.stage_id as usize, + schema, + shuffle_stream_reader.partition_count as usize, + ); + Ok(Arc::new(shuffle_reader)) } PhysicalPlanType::ShuffleReader(shuffle_reader) => { let schema = Arc::new(convert_required!(shuffle_reader.schema)?); @@ -717,6 +742,16 @@ impl AsExecutionPlan for PhysicalPlanNode { }, )), }) + } else if let Some(exec) = plan.downcast_ref::() { + Ok(protobuf::PhysicalPlanNode { + physical_plan_type: Some(PhysicalPlanType::ShuffleStreamReader( + protobuf::ShuffleStreamReaderExecNode { + stage_id: exec.stage_id as u32, + partition_count: exec.partition_count as u64, + schema: Some(exec.schema().as_ref().into()), + }, + )), + }) } else if let Some(exec) = plan.downcast_ref::() { let mut partition = vec![]; for location in &exec.partition { @@ -833,16 +868,36 @@ impl AsExecutionPlan 
for PhysicalPlanNode { ))) } }; - Ok(protobuf::PhysicalPlanNode { - physical_plan_type: Some(PhysicalPlanType::ShuffleWriter(Box::new( - protobuf::ShuffleWriterExecNode { - job_id: exec.job_id().to_string(), - stage_id: exec.stage_id() as u32, - input: Some(Box::new(input)), - output_partitioning, - }, - ))), - }) + match &exec.output_loc { + OutputLocation::LocalDir(_) => Ok(protobuf::PhysicalPlanNode { + physical_plan_type: Some(PhysicalPlanType::ShuffleWriter(Box::new( + protobuf::ShuffleWriterExecNode { + job_id: exec.job_id().to_string(), + stage_id: exec.stage_id() as u32, + input: Some(Box::new(input)), + output_partitioning, + push_shuffle: false, + execs: Vec::new(), + }, + ))), + }), + OutputLocation::Executors(executors) => { + let _execs: Vec = + executors.to_owned().into_iter().map(|e| e.into()).collect(); + Ok(protobuf::PhysicalPlanNode { + physical_plan_type: Some(PhysicalPlanType::ShuffleWriter( + Box::new(protobuf::ShuffleWriterExecNode { + job_id: exec.job_id().to_string(), + stage_id: exec.stage_id() as u32, + input: Some(Box::new(input)), + output_partitioning, + push_shuffle: true, + execs: _execs, + }), + )), + }) + } + } } else if let Some(exec) = plan.downcast_ref::() { Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Unresolved( @@ -953,8 +1008,9 @@ mod roundtrip_tests { use super::super::super::error::Result; use super::super::protobuf; - use crate::execution_plans::ShuffleWriterExec; + use crate::execution_plans::{OutputLocation, ShuffleWriterExec}; use crate::serde::protobuf::{LogicalPlanNode, PhysicalPlanNode}; + use crate::serde::scheduler::ExecutorMeta; fn roundtrip_test(exec_plan: Arc) -> Result<()> { let ctx = ExecutionContext::new(); @@ -1112,7 +1168,7 @@ mod roundtrip_tests { let field_b = Field::new("b", DataType::Int64, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); - roundtrip_test(Arc::new(ShuffleWriterExec::try_new( + roundtrip_test(Arc::new(ShuffleWriterExec::try_new_pull_shuffle( "job123".to_string(), 123, Arc::new(EmptyExec::new(false, schema)), @@ -1120,4 +1176,25 @@ mod roundtrip_tests { Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 4)), )?)) } + + #[test] + fn roundtrip_push_shuffle_writer() -> Result<()> { + let field_a = Field::new("a", DataType::Int64, false); + let field_b = Field::new("b", DataType::Int64, false); + let schema = Arc::new(Schema::new(vec![field_a, field_b])); + let meta = ExecutorMeta { + id: "123".to_owned(), + host: "localhost".to_owned(), + port: 123, + grpc_port: 124, + }; + let output = OutputLocation::Executors(vec![meta]); + roundtrip_test(Arc::new(ShuffleWriterExec::try_new( + "job456".to_string(), + 456, + Arc::new(EmptyExec::new(false, schema)), + output, + Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 4)), + )?)) + } } diff --git a/ballista/rust/core/src/serde/scheduler/from_proto.rs b/ballista/rust/core/src/serde/scheduler/from_proto.rs index 8d4e279395fa8..b7a27a365e7e2 100644 --- a/ballista/rust/core/src/serde/scheduler/from_proto.rs +++ b/ballista/rust/core/src/serde/scheduler/from_proto.rs @@ -38,6 +38,11 @@ impl TryInto for protobuf::Action { partition_id: fetch.partition_id as usize, path: fetch.path, }), + Some(ActionType::PushPartition(push)) => Ok(Action::PushPartition { + job_id: push.job_id, + stage_id: push.stage_id as usize, + partition_id: push.partition_id as usize, + }), _ => Err(BallistaError::General( "scheduler::from_proto(Action) invalid or missing action".to_owned(), )), diff --git 
a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs index 43438e29dbe2e..3011a03bb79fe 100644 --- a/ballista/rust/core/src/serde/scheduler/mod.rs +++ b/ballista/rust/core/src/serde/scheduler/mod.rs @@ -43,6 +43,13 @@ pub enum Action { partition_id: usize, path: String, }, + + /// Push a shuffle partition + PushPartition { + job_id: String, + stage_id: usize, + partition_id: usize, + } } /// Unique identifier for the output partition of an operator. diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs b/ballista/rust/core/src/serde/scheduler/to_proto.rs index 71a02d34b75f0..dac6d2c43a567 100644 --- a/ballista/rust/core/src/serde/scheduler/to_proto.rs +++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs @@ -44,6 +44,18 @@ impl TryInto for Action { })), settings: vec![], }), + Action::PushPartition { + job_id, + stage_id, + partition_id, + } => Ok(protobuf::Action { + action_type: Some(ActionType::PushPartition(protobuf::PushPartition { + job_id, + stage_id: stage_id as u32, + partition_id: partition_id as u32, + })), + settings: vec![], + }), } } } diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index fae3826c32d1b..1d6000e5b9b39 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -29,6 +29,7 @@ use crate::execution_plans::{ }; use crate::serde::scheduler::PartitionStats; +use crate::client::BallistaClient; use crate::config::BallistaConfig; use crate::serde::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec}; use async_trait::async_trait; @@ -109,6 +110,30 @@ pub async fn write_stream_to_disk( )) } +/// Stream data to executor in Arrow IPC format +// "Unnecessary" lifetime syntax due to https://github.com/rust-lang/rust/issues/63033 +pub async fn write_stream_to_flight<'a>( + stream: Pin>, + host: &'a str, + port: u16, + job_id: String, + stage_id: usize, + partition_id: usize, + flight_write_metric: &'a metrics::Time, +) -> Result { + let timer = flight_write_metric.timer(); + let mut client = BallistaClient::try_new(host, port) + .await + .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; + let stats = client + .push_partition(job_id, stage_id, partition_id, stream) + .await + .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; + + timer.done(); + Ok(stats) +} + pub async fn collect_stream( stream: &mut Pin>, ) -> Result> { diff --git a/ballista/rust/executor/executor_config_spec.toml b/ballista/rust/executor/executor_config_spec.toml index 1dd3de99012c1..70fb6ed0aaaae 100644 --- a/ballista/rust/executor/executor_config_spec.toml +++ b/ballista/rust/executor/executor_config_spec.toml @@ -76,5 +76,5 @@ doc = "Max concurrent tasks." abbr = "s" name = "task_scheduling_policy" type = "ballista_core::config::TaskSchedulingPolicy" -doc = "The task scheduing policy for the scheduler, see TaskSchedulingPolicy::variants() for options. Default: PullStaged" -default = "ballista_core::config::TaskSchedulingPolicy::PullStaged" +doc = "The task scheduing policy for the scheduler, see TaskSchedulingPolicy::variants() for options. Default: Pull" +default = "ballista_core::config::TaskSchedulingPolicy::Pull" diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs index 5902897f30c25..796ee076f6a87 100644 --- a/ballista/rust/executor/src/executor.rs +++ b/ballista/rust/executor/src/executor.rs @@ -17,9 +17,15 @@ //! 
Ballista executor logic +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; +use parking_lot::RwLock; +use std::collections::HashMap; use std::sync::Arc; +use tokio::sync::mpsc::Sender; use ballista_core::error::BallistaError; +use ballista_core::execution_plans::ShuffleStreamReaderExec; use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::protobuf; use ballista_core::serde::scheduler::ExecutorSpecification; @@ -29,11 +35,18 @@ use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::{ExecutionPlan, Partitioning}; use datafusion::prelude::{ExecutionConfig, ExecutionContext}; +type ExecutorChannel = + RwLock>>>; + /// Ballista executor pub struct Executor { /// Directory for storing partial results work_dir: String, + /// Channels for sending partial shuffle partitions to stream shuffle reader. + /// Key is the jobId + stageId + partition. + pub channels: ExecutorChannel, + /// Specification like total task slots pub specification: ExecutorSpecification, @@ -58,6 +71,7 @@ impl Executor { ) -> Self { Self { work_dir: work_dir.to_owned(), + channels: RwLock::new(HashMap::new()), specification, ctx, } @@ -79,14 +93,37 @@ impl Executor { let exec = if let Some(shuffle_writer) = plan.as_any().downcast_ref::() { + // find all the stream shuffle readers and bind them to the executor context + let stream_shuffle_readers = + ShuffleStreamReaderExec::find_stream_shuffle_readers(plan.clone()); + for shuffle_reader in stream_shuffle_readers { + let _job_id = job_id.clone(); + // The stage_id in shuffle stream reader is the stage id which the reader depends on. + let _stage_id = shuffle_reader.stage_id; + { + let mut _map = self.channels.write(); + let sender = shuffle_reader.create_record_batch_channel(); + _map.insert((_job_id, _stage_id, part), sender); + } + } // recreate the shuffle writer with the correct working directory - ShuffleWriterExec::try_new( - job_id.clone(), - stage_id, - plan.children()[0].clone(), - self.work_dir.clone(), - shuffle_writer.shuffle_output_partitioning().cloned(), - ) + if !shuffle_writer.is_push_shuffle() { + ShuffleWriterExec::try_new_pull_shuffle( + job_id.clone(), + stage_id, + plan.children()[0].clone(), + self.work_dir.clone(), + shuffle_writer.shuffle_output_partitioning().cloned(), + ) + } else { + ShuffleWriterExec::try_new( + job_id.clone(), + stage_id, + plan.children()[0].clone(), + shuffle_writer.output_loc.clone(), + shuffle_writer.shuffle_output_partitioning().cloned(), + ) + } } else { Err(DataFusionError::Internal( "Plan passed to execute_shuffle_write is not a ShuffleWriterExec" diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs index cf5ab179813bb..62e2b50b60d09 100644 --- a/ballista/rust/executor/src/flight_service.rs +++ b/ballista/rust/executor/src/flight_service.rs @@ -17,6 +17,7 @@ //! Implementation of the Apache Arrow Flight protocol that wraps an executor. 
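// A simplified model of the (job_id, stage_id, partition) channel registry that the
// Executor above publishes and that do_put below consumes: execute_shuffle_write registers
// one mpsc Sender per key, do_put looks the Sender up via the key carried in the
// FlightDescriptor command, forwards each decoded batch, and removes the entry when the
// incoming stream ends. Batches are stand-in strings here so the sketch runs with only
// tokio; the real map stores Sender<ArrowResult<RecordBatch>> behind a parking_lot RwLock.
use std::collections::HashMap;
use std::sync::RwLock;

use tokio::sync::mpsc::{channel, Sender};

type ChannelKey = (String, usize, usize); // (job_id, stage_id, partition)

#[tokio::main]
async fn main() {
    let registry: RwLock<HashMap<ChannelKey, Sender<String>>> = RwLock::new(HashMap::new());

    // Registration side: the stream shuffle reader creates the channel and keeps the
    // receiver, while the sender is published under the stage it depends on.
    let key = ("job1".to_string(), 2, 0);
    let (tx, mut rx) = channel::<String>(10);
    registry.write().unwrap().insert(key.clone(), tx);

    // do_put side: look up the sender for the pushed partition and forward a batch.
    let sender = registry.read().unwrap().get(&key).cloned();
    if let Some(sender) = sender {
        sender.send("batch-0".to_string()).await.unwrap();
    }

    // When the incoming Flight stream ends, the entry is removed so the reader's
    // receiver observes end-of-stream.
    registry.write().unwrap().remove(&key);

    while let Some(batch) = rx.recv().await {
        println!("received {}", batch);
    }
}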
+use std::convert::TryFrom; use std::fs::File; use std::pin::Pin; use std::sync::Arc; @@ -27,6 +28,8 @@ use ballista_core::error::BallistaError; use ballista_core::serde::decode_protobuf; use ballista_core::serde::scheduler::Action as BallistaAction; +use arrow::datatypes::Schema; +use arrow_flight::utils::flight_data_to_arrow_batch; use arrow_flight::{ flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, @@ -39,6 +42,7 @@ use datafusion::arrow::{ use futures::{Stream, StreamExt}; use log::{info, warn}; use std::io::{Read, Seek}; +use std::task::{Context, Poll}; use tokio::sync::mpsc::channel; use tokio::{ sync::mpsc::{Receiver, Sender}, @@ -112,6 +116,7 @@ impl FlightService for BallistaFlightService { Box::pin(ReceiverStream::new(rx)) as Self::DoGetStream )) } + _ => Err(Status::unimplemented("unimplemented action")), } } @@ -147,13 +152,83 @@ impl FlightService for BallistaFlightService { &self, request: Request>, ) -> Result, Status> { - let mut request = request.into_inner(); + let mut stream = request.into_inner(); + let first_block_maybe = stream + .next() + .await + .ok_or_else(|| Status::invalid_argument("empty FlightData stream"))?; + + let first_block = match first_block_maybe { + Ok(fb) => fb, + Err(e) => { + return Err(Status::invalid_argument(format!( + "first block did not contain a schema: {}", + e + ))); + } + }; + + // Extract the FlightDescriptor and Schema from the first FlightData block + let descriptor = match &first_block.flight_descriptor { + Some(d) => d, + None => { + return Err(Status::invalid_argument( + "No Flight Descriptor in first block", + )); + } + }; + + let arrow_schema = Schema::try_from(&first_block).map_err(|e| { + Status::invalid_argument(format!( + "first block did not contain a schema: {}", + e + )) + })?; + + let arrow_schema_arc = Arc::new(arrow_schema); + + let action = + decode_protobuf(&descriptor.cmd).map_err(|e| from_ballista_err(&e))?; + + let (sender, channel_key) = match action { + BallistaAction::PushPartition { + job_id, + stage_id, + partition_id, + } => { + info!("Trying to read PushPartition job {:?}, stage {:?} and partition {:?}", job_id, stage_id, partition_id); + { + let channels_map = self._executor.channels.read(); + let channel_key = (job_id, stage_id, partition_id); + match channels_map.get(&channel_key) { + Some(d) => (d.clone(), channel_key), + None => { + return Err(Status::invalid_argument(format!( + "No receive channels registered for this PushPartition job {:?}, stage {:?} ", &channel_key.0, &channel_key.1) + )); + } + } + } + } + _ => return Err(Status::unimplemented("Invalid Action")), + }; - while let Some(data) = request.next().await { - let _data = data?; + while let Some(flight_data) = stream.message().await? 
{ + let record_batch = + flight_data_to_arrow_batch(&flight_data, arrow_schema_arc.clone(), &[]); + + // ignore the error + sender.send(record_batch).await.ok(); + } + + { + let mut channels_map = self._executor.channels.write(); + channels_map.remove(&channel_key); } - Err(Status::unimplemented("do_put")) + Ok(Response::new( + Box::pin(EmptyPutStream {}) as Self::DoPutStream + )) } async fn do_action( @@ -242,3 +317,16 @@ fn from_arrow_err(e: &ArrowError) -> Status { fn from_ballista_err(e: &ballista_core::error::BallistaError) -> Status { Status::internal(format!("Ballista Error: {:?}", e)) } + +pub struct EmptyPutStream {} + +impl Stream for EmptyPutStream { + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(None) + } +} diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs index 3440f821134cd..4251758a2abe0 100644 --- a/ballista/rust/executor/src/main.rs +++ b/ballista/rust/executor/src/main.rs @@ -119,7 +119,7 @@ async fn main() -> Result<()> { let scheduler_policy = opt.task_scheduling_policy; match scheduler_policy { - TaskSchedulingPolicy::PushStaged => { + TaskSchedulingPolicy::Push => { tokio::spawn(executor_server::startup( scheduler, executor.clone(), diff --git a/ballista/rust/scheduler/scheduler_config_spec.toml b/ballista/rust/scheduler/scheduler_config_spec.toml index cf03fc08a72a9..092e5a01f66a0 100644 --- a/ballista/rust/scheduler/scheduler_config_spec.toml +++ b/ballista/rust/scheduler/scheduler_config_spec.toml @@ -63,5 +63,5 @@ doc = "bind port. Default: 50050" abbr = "s" name = "scheduler_policy" type = "ballista_core::config::TaskSchedulingPolicy" -doc = "The scheduing policy for the scheduler, see TaskSchedulingPolicy::variants() for options. Default: PullStaged" -default = "ballista_core::config::TaskSchedulingPolicy::PullStaged" \ No newline at end of file +doc = "The scheduing policy for the scheduler, see TaskSchedulingPolicy::variants() for options. 
Default: Pull" +default = "ballista_core::config::TaskSchedulingPolicy::Pull" \ No newline at end of file diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index 3459cce9a4bf1..fba227bb32873 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -102,7 +102,7 @@ use self::state::{ConfigBackendClient, SchedulerState}; use anyhow::Context; use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy}; use ballista_core::error::BallistaError; -use ballista_core::execution_plans::ShuffleWriterExec; +use ballista_core::execution_plans::{ShuffleStreamReaderExec, ShuffleWriterExec}; use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient; use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec}; @@ -137,7 +137,7 @@ impl SchedulerServer SchedulerServer, job_id: &str, ) -> Result<(Vec>, usize), BallistaError> { + if let TaskSchedulingPolicy::Pull = self.policy { + let msg = "fetch_tasks call is only available for push-based task scheduling" + .to_string(); + error!("{}", msg); + return Err(BallistaError::General(msg)); + } let mut ret: Vec> = Vec::with_capacity(available_executors.len()); for _idx in 0..available_executors.len() { @@ -415,7 +421,7 @@ impl SchedulerGrpc &self, request: Request, ) -> std::result::Result, tonic::Status> { - if let TaskSchedulingPolicy::PushStaged = self.policy { + if let TaskSchedulingPolicy::Push = self.policy { error!("Poll work interface is not supported for push-based task scheduling"); return Err(tonic::Status::failed_precondition( "Bad request because poll work is not supported for push-based task scheduling", @@ -830,8 +836,8 @@ impl SchedulerGrpc let state = self.state.clone(); let job_id_spawn = job_id.clone(); let tx_job: Option> = match self.policy { - TaskSchedulingPolicy::PullStaged => None, - TaskSchedulingPolicy::PushStaged => { + TaskSchedulingPolicy::Pull => None, + TaskSchedulingPolicy::Push => { Some(self.scheduler_env.as_ref().unwrap().tx_job.clone()) } }; @@ -916,6 +922,28 @@ impl SchedulerGrpc // save stages into state for shuffle_writer in stages { + // save stage lineages for all-at-once stages + let stream_shuffle_readers = + ShuffleStreamReaderExec::find_stream_shuffle_readers( + shuffle_writer.clone(), + ); + for shuffle_reader in stream_shuffle_readers { + fail_job!(state + .save_stage_lineages( + &job_id_spawn, + shuffle_reader.stage_id, + shuffle_writer.stage_id(), + ) + .await + .map_err(|e| { + let msg = format!( + "Could save plan query stage lineages: {}", + e + ); + error!("{}", msg); + tonic::Status::internal(msg) + })); + } fail_job!(state .save_stage_plan( &job_id_spawn, @@ -947,6 +975,11 @@ impl SchedulerGrpc } )); } + info!( + "Adding stage {} with {} pending tasks", + shuffle_writer.stage_id(), + num_partitions + ); } if let Some(tx_job) = tx_job { diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs index c9066027e3875..a7e0afe7b2e02 100644 --- a/ballista/rust/scheduler/src/main.rs +++ b/ballista/rust/scheduler/src/main.rs @@ -82,7 +82,7 @@ async fn start_server( ); let scheduler_server: SchedulerServer = match policy { - TaskSchedulingPolicy::PushStaged => { + TaskSchedulingPolicy::Push => { // TODO make the buffer size configurable let (tx_job, rx_job) = mpsc::channel::(10000); let scheduler_server = SchedulerServer::new_with_policy( diff --git a/ballista/rust/scheduler/src/planner.rs 
b/ballista/rust/scheduler/src/planner.rs index 68f26e9ffa1d6..5bac0d36d4e39 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -23,8 +23,9 @@ use std::collections::HashMap; use std::sync::Arc; use ballista_core::error::{BallistaError, Result}; +use ballista_core::serde::scheduler::ExecutorMeta; use ballista_core::{ - execution_plans::{ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec}, + execution_plans::{ShuffleStreamReaderExec, ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec}, serde::scheduler::PartitionLocation, }; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -55,24 +56,41 @@ impl Default for DistributedPlanner { impl DistributedPlanner { /// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec]. - /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec]. + /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec] + /// or of type [ShuffleStreamReaderExec] if the created stages are all-at-once stages. /// A [ShuffleWriterExec] is created whenever the partitioning changes. pub async fn plan_query_stages<'a>( &'a mut self, job_id: &'a str, execution_plan: Arc, ) -> Result>> { - info!("planning query stages"); - let (new_plan, mut stages) = self - .plan_query_stages_internal(job_id, execution_plan) + info!("planning query stages for job {}", job_id); + let (modified_plan, mut stages) = self + .plan_query_stages_internal(job_id, execution_plan.clone()) .await?; - stages.push(create_shuffle_writer( - job_id, - self.next_stage_id(), - new_plan, - None, - )?); - Ok(stages) + // re-plan the input execution plan and create All-at-once query stages. + // Now we just simply depends on the the stage count to decide whether to create All-at-once or normal stages. + // In future, we can have more sophisticated way to decide which way to go. + if stages.len() > 1 && stages.len() <= 4 { + let (new_modified_plan, mut new_stages) = self + .plan_all_at_once_query_stages_internal(job_id, execution_plan.clone()) + .await?; + new_stages.push(create_shuffle_writer( + job_id, + self.next_stage_id(), + new_modified_plan, + None, + )?); + Ok(new_stages) + } else { + stages.push(create_shuffle_writer( + job_id, + self.next_stage_id(), + modified_plan, + None, + )?); + Ok(stages) + } } /// Returns a potentially modified version of the input execution_plan along with the resulting query stages. @@ -169,6 +187,85 @@ impl DistributedPlanner { .boxed() } + /// Re-plan the input execution_plan and create new All-at-once query stages. 
+ fn plan_all_at_once_query_stages_internal<'a>( + &'a mut self, + job_id: &'a str, + execution_plan: Arc, + ) -> BoxFuture<'a, Result> { + async move { + // recurse down and replace children + if execution_plan.children().is_empty() { + return Ok((execution_plan, vec![])); + } + + let mut stages = vec![]; + let mut children = vec![]; + for child in execution_plan.children() { + let (new_child, mut child_stages) = self + .plan_all_at_once_query_stages_internal(job_id, child.clone()) + .await?; + children.push(new_child); + stages.append(&mut child_stages); + } + + if let Some(coalesce) = execution_plan + .as_any() + .downcast_ref::() + { + let shuffle_writer = create_push_shuffle_writer( + job_id, + self.next_stage_id(), + children[0].clone(), + None, + )?; + + let shuffle_reader = Arc::new(ShuffleStreamReaderExec::new( + shuffle_writer.stage_id(), + shuffle_writer.schema(), + 1, + )); + stages.push(shuffle_writer); + Ok((coalesce.with_new_children(vec![shuffle_reader])?, stages)) + } else if let Some(repart) = + execution_plan.as_any().downcast_ref::() + { + match repart.output_partitioning() { + Partitioning::Hash(_, p) => { + let shuffle_writer = create_push_shuffle_writer( + job_id, + self.next_stage_id(), + children[0].clone(), + Some(repart.partitioning().to_owned()), + )?; + + let shuffle_reader = Arc::new(ShuffleStreamReaderExec::new( + shuffle_writer.stage_id(), + shuffle_writer.schema(), + p, + )); + stages.push(shuffle_writer); + Ok((shuffle_reader, stages)) + } + _ => { + // remove any non-hash repartition from the distributed plan + Ok((children[0].clone(), stages)) + } + } + } else if let Some(window) = + execution_plan.as_any().downcast_ref::() + { + Err(BallistaError::NotImplemented(format!( + "WindowAggExec with window {:?}", + window + ))) + } else { + Ok((execution_plan.with_new_children(children)?, stages)) + } + } + .boxed() + } + /// Generate a new stage ID fn next_stage_id(&mut self) -> usize { self.next_stage_id += 1; @@ -229,13 +326,35 @@ pub fn remove_unresolved_shuffles( Ok(stage.with_new_children(new_children)?) 
} +pub fn update_shuffle_locs( + stage: Arc, + output_locs: Vec, +) -> Result> { + if let Some(shuffle_writer) = stage.as_any().downcast_ref::() { + if shuffle_writer.is_push_shuffle() { + let new_shuffle_writer = Arc::new(ShuffleWriterExec::try_new_push_shuffle( + shuffle_writer.job_id().to_string(), + shuffle_writer.stage_id(), + shuffle_writer.children()[0].clone(), + output_locs, + shuffle_writer.shuffle_output_partitioning().cloned(), + )?); + Ok(new_shuffle_writer) + } else { + Ok(stage) + } + } else { + Ok(stage) + } +} + fn create_shuffle_writer( job_id: &str, stage_id: usize, plan: Arc, partitioning: Option, ) -> Result> { - Ok(Arc::new(ShuffleWriterExec::try_new( + Ok(Arc::new(ShuffleWriterExec::try_new_pull_shuffle( job_id.to_owned(), stage_id, plan, @@ -244,12 +363,27 @@ fn create_shuffle_writer( )?)) } +fn create_push_shuffle_writer( + job_id: &str, + stage_id: usize, + plan: Arc, + partitioning: Option, +) -> Result> { + Ok(Arc::new(ShuffleWriterExec::try_new_push_shuffle( + job_id.to_owned(), + stage_id, + plan, + Vec::new(), // executors will be decided later during task scheduling + partitioning, + )?)) +} + #[cfg(test)] mod test { use crate::planner::DistributedPlanner; use crate::test_utils::datafusion_test_context; use ballista_core::error::BallistaError; - use ballista_core::execution_plans::UnresolvedShuffleExec; + use ballista_core::execution_plans::ShuffleStreamReaderExec; use ballista_core::serde::{protobuf, AsExecutionPlan, BallistaCodec}; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; @@ -308,12 +442,12 @@ mod test { ProjectionExec: expr=[l_returnflag@0 as l_returnflag, SUM(lineitem.l_extendedprice Multiply Int64(1))@1 as sum_disc_price] HashAggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))] CoalesceBatchesExec: target_batch_size=4096 - UnresolvedShuffleExec + ShuffleStreamReaderExec ShuffleWriterExec: None SortExec: [l_returnflag@0 ASC] CoalescePartitionsExec - UnresolvedShuffleExec + ShuffleStreamReaderExec */ assert_eq!(3, stages.len()); @@ -331,12 +465,12 @@ mod test { assert!(*final_hash.mode() == AggregateMode::FinalPartitioned); let coalesce = final_hash.children()[0].clone(); let coalesce = downcast_exec!(coalesce, CoalesceBatchesExec); - let unresolved_shuffle = coalesce.children()[0].clone(); - let unresolved_shuffle = - downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec); - assert_eq!(unresolved_shuffle.stage_id, 1); - assert_eq!(unresolved_shuffle.input_partition_count, 2); - assert_eq!(unresolved_shuffle.output_partition_count, 2); + let shuffle_stream_reader = coalesce.children()[0].clone(); + let shuffle_stream_reader = + downcast_exec!(shuffle_stream_reader, ShuffleStreamReaderExec); + // recreate new stages + assert_eq!(shuffle_stream_reader.stage_id, 3); + assert_eq!(shuffle_stream_reader.partition_count, 2); // verify stage 2 let stage2 = stages[2].children()[0].clone(); @@ -348,12 +482,11 @@ mod test { coalesce_partitions.output_partitioning().partition_count(), 1 ); - let unresolved_shuffle = coalesce_partitions.children()[0].clone(); - let unresolved_shuffle = - downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec); - assert_eq!(unresolved_shuffle.stage_id, 2); - assert_eq!(unresolved_shuffle.input_partition_count, 2); - assert_eq!(unresolved_shuffle.output_partition_count, 2); + let shuffle_stream_reader = coalesce_partitions.children()[0].clone(); + let 
shuffle_stream_reader = + downcast_exec!(shuffle_stream_reader, ShuffleStreamReaderExec); + assert_eq!(shuffle_stream_reader.stage_id, 4); + assert_eq!(shuffle_stream_reader.partition_count, 1); Ok(()) } @@ -427,20 +560,20 @@ order by CoalesceBatchesExec: target_batch_size=4096 HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })] CoalesceBatchesExec: target_batch_size=4096 - UnresolvedShuffleExec + ShuffleStreamReaderExec CoalesceBatchesExec: target_batch_size=4096 - UnresolvedShuffleExec + ShuffleStreamReaderExec ShuffleWriterExec: None ProjectionExec: expr=[l_shipmode@0 as l_shipmode, SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@1 as high_line_count, SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@2 as low_line_count] HashAggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)] CoalesceBatchesExec: target_batch_size=4096 - UnresolvedShuffleExec + ShuffleStreamReaderExec ShuffleWriterExec: None SortExec: [l_shipmode@0 ASC] CoalescePartitionsExec - UnresolvedShuffleExec + ShuffleStreamReaderExec */ assert_eq!(5, stages.len()); @@ -499,18 +632,16 @@ order by let join_input_1 = join.children()[0].clone(); // skip CoalesceBatches let join_input_1 = join_input_1.children()[0].clone(); - let unresolved_shuffle_reader_1 = - downcast_exec!(join_input_1, UnresolvedShuffleExec); - assert_eq!(unresolved_shuffle_reader_1.input_partition_count, 2); // lineitem - assert_eq!(unresolved_shuffle_reader_1.output_partition_count, 2); + let shuffle_stream_reader_1 = + downcast_exec!(join_input_1, ShuffleStreamReaderExec); + assert_eq!(shuffle_stream_reader_1.partition_count, 2); // lineitem let join_input_2 = join.children()[1].clone(); // skip CoalesceBatches let join_input_2 = join_input_2.children()[0].clone(); - let unresolved_shuffle_reader_2 = - downcast_exec!(join_input_2, UnresolvedShuffleExec); - assert_eq!(unresolved_shuffle_reader_2.input_partition_count, 1); //orders - assert_eq!(unresolved_shuffle_reader_2.output_partition_count, 2); + let shuffle_stream_reader_2 = + downcast_exec!(join_input_2, ShuffleStreamReaderExec); + assert_eq!(shuffle_stream_reader_2.partition_count, 2); //orders // final partitioned hash aggregate assert_eq!( diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index 974516d8b19b8..cf5f4666942b4 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
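// A minimal round trip for the stage-lineage value encoding used by the
// save_stage_lineages / get_depend_stage methods added further down in this file. The
// stored value is a plain big-endian usize, so decoding assumes a 64-bit target
// (an 8-byte usize), which is what the length check below enforces. Key layout and
// byte handling mirror the patch; the concrete ids are placeholders.
use std::convert::TryInto;

fn main() {
    let namespace = "default";
    let job_id = "job1";
    let stage_id = 2usize;
    let depend_stage_id = 3usize;

    // Key layout used by save_stage_lineages / get_depend_stage.
    let key = format!(
        "/ballista/{}/stages/lineage/{}/{}",
        namespace, job_id, stage_id
    );

    // Encode: usize -> big-endian bytes stored in the config backend.
    let value: Vec<u8> = depend_stage_id.to_be_bytes().to_vec();

    // Decode: bytes -> usize, failing loudly if the length is not 8.
    let array: [u8; 8] = value.as_slice().try_into().expect("expected 8 bytes");
    assert_eq!(usize::from_be_bytes(array), depend_stage_id);
    println!("{} -> depends on stage {}", key, depend_stage_id);
}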
+use std::convert::TryInto; use std::time::{SystemTime, UNIX_EPOCH}; use std::{any::type_name, collections::HashMap, sync::Arc, time::Duration}; @@ -24,6 +25,7 @@ use log::{debug, error, info}; use prost::Message; use tokio::sync::OwnedMutexGuard; +use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::protobuf::{ self, job_status, task_status, CompletedJob, CompletedTask, ExecutorHeartbeat, ExecutorMetadata, FailedJob, FailedTask, JobStatus, RunningJob, RunningTask, @@ -36,6 +38,7 @@ use ballista_core::{error::Result, execution_plans::UnresolvedShuffleExec}; use datafusion::prelude::ExecutionContext; use super::planner::remove_unresolved_shuffles; +use super::planner::update_shuffle_locs; #[cfg(feature = "etcd")] mod etcd; @@ -420,7 +423,75 @@ impl SchedulerState SchedulerState SchedulerState Result<()> { + let key = get_stage_lineage_key(&self.namespace, job_id, stage_id); + let value = depend_stage_id.to_be_bytes(); + self.config_client.clone().put(key, value.to_vec()).await + } + + pub async fn get_depend_stage(&self, job_id: &str, stage_id: usize) -> Result { + let key = get_stage_lineage_key(&self.namespace, job_id, stage_id); + let value = self.config_client.clone().get(&key).await?; + if value.is_empty() { + return Err(BallistaError::General(format!( + "Can not find depend stage for shuffle write stage {}", + key + ))); + } + let array: [u8; 8] = match value.as_slice().try_into() { + Ok(ba) => ba, + Err(_) => panic!("Expected a Vec of length {} but it was {}", 8, value.len()), + }; + Ok(usize::from_be_bytes(array)) + } + // Global lock for the state. We should get rid of this to be able to scale. pub async fn lock(&self) -> Result> { self.config_client.lock().await @@ -714,6 +817,19 @@ fn find_unresolved_shuffles( } } +/// Return the the push shuffles in the execution plan, shuffle writer should be the root node +fn find_push_shuffle(plan: &Arc) -> Option { + if let Some(shuffle_writer) = plan.as_any().downcast_ref::() { + if shuffle_writer.is_push_shuffle() { + Some(shuffle_writer.clone()) + } else { + None + } + } else { + None + } +} + fn get_executors_prefix(namespace: &str) -> String { format!("/ballista/{}/executors", namespace) } @@ -770,6 +886,13 @@ fn get_stage_plan_key(namespace: &str, job_id: &str, stage_id: usize) -> String format!("/ballista/{}/stages/{}/{}", namespace, job_id, stage_id,) } +fn get_stage_lineage_key(namespace: &str, job_id: &str, stage_id: usize) -> String { + format!( + "/ballista/{}/stages/lineage/{}/{}", + namespace, job_id, stage_id, + ) +} + fn decode_protobuf(bytes: &[u8]) -> Result { T::decode(bytes).map_err(|e| { BallistaError::Internal(format!( diff --git a/datafusion/src/physical_plan/analyze.rs b/datafusion/src/physical_plan/analyze.rs index 6857ad532273b..d7efd632a1244 100644 --- a/datafusion/src/physical_plan/analyze.rs +++ b/datafusion/src/physical_plan/analyze.rs @@ -211,7 +211,7 @@ impl ExecutionPlan for AnalyzeExec { Ok(RecordBatchReceiverStream::create( &self.schema, rx, - join_handle, + Some(join_handle), )) } diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 539904e583673..050de80ecbd70 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -250,7 +250,7 @@ impl ExecutionPlan for ParquetExec { Ok(RecordBatchReceiverStream::create( &self.projected_schema, response_rx, - join_handle, + Some(join_handle), )) } diff --git a/datafusion/src/physical_plan/sorts/sort.rs 
b/datafusion/src/physical_plan/sorts/sort.rs index 1428e1627d8f8..4cd3096148ac3 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ b/datafusion/src/physical_plan/sorts/sort.rs @@ -332,7 +332,7 @@ fn read_spill_as_stream( Ok(RecordBatchReceiverStream::create( &schema, receiver, - join_handle, + Some(join_handle), )) } diff --git a/datafusion/src/physical_plan/stream.rs b/datafusion/src/physical_plan/stream.rs index 67b7090406901..5afdbeefea389 100644 --- a/datafusion/src/physical_plan/stream.rs +++ b/datafusion/src/physical_plan/stream.rs @@ -36,7 +36,7 @@ pub struct RecordBatchReceiverStream { inner: ReceiverStream>, #[allow(dead_code)] - drop_helper: AbortOnDropSingle<()>, + drop_helper: Option>, } impl RecordBatchReceiverStream { @@ -45,14 +45,14 @@ impl RecordBatchReceiverStream { pub fn create( schema: &SchemaRef, rx: tokio::sync::mpsc::Receiver>, - join_handle: JoinHandle<()>, + join_handle: Option>, ) -> SendableRecordBatchStream { let schema = schema.clone(); let inner = ReceiverStream::new(rx); Box::pin(Self { schema, inner, - drop_helper: AbortOnDropSingle::new(join_handle), + drop_helper: join_handle.map(AbortOnDropSingle::new), }) } } diff --git a/datafusion/src/test/exec.rs b/datafusion/src/test/exec.rs index 5a6b27865d133..72be335b36a91 100644 --- a/datafusion/src/test/exec.rs +++ b/datafusion/src/test/exec.rs @@ -204,7 +204,7 @@ impl ExecutionPlan for MockExec { Ok(RecordBatchReceiverStream::create( &self.schema, rx, - join_handle, + Some(join_handle), )) } @@ -335,7 +335,7 @@ impl ExecutionPlan for BarrierExec { Ok(RecordBatchReceiverStream::create( &self.schema, rx, - join_handle, + Some(join_handle), )) }
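// A minimal usage sketch for RecordBatchReceiverStream::create after the signature change
// above, which makes the producing task's JoinHandle optional. Existing call sites pass
// Some(join_handle) so the task is still aborted when the stream is dropped, while the new
// FlightShuffleWriter passes None because it hands the receiver over without owning a task
// handle. Crate paths follow this patch; the example assumes the datafusion, arrow, futures
// and tokio versions used on this branch.
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::stream::RecordBatchReceiverStream;
use futures::StreamExt;
use tokio::sync::mpsc::channel;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let schema: SchemaRef =
        Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let (tx, rx) = channel(2);

    // Producer task: send a single batch, then close the channel by dropping the sender.
    let producer_schema = schema.clone();
    let join_handle = tokio::task::spawn(async move {
        let batch = RecordBatch::try_new(
            producer_schema,
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        );
        tx.send(batch).await.ok();
    });

    // With the patched signature the join handle is now optional.
    let mut stream = RecordBatchReceiverStream::create(&schema, rx, Some(join_handle));

    while let Some(batch) = stream.next().await {
        println!("got {} rows", batch?.num_rows());
    }
    Ok(())
}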