16 changes: 16 additions & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -495,6 +495,7 @@ message PhysicalPlanNode {
CrossJoinExecNode cross_join = 19;
AvroScanExecNode avro_scan = 20;
PhysicalExtensionNode extension = 21;
ShuffleStreamReaderExecNode shuffle_stream_reader = 22;
}
}

@@ -731,6 +732,14 @@ message ShuffleWriterExecNode {
uint32 stage_id = 2;
PhysicalPlanNode input = 3;
PhysicalHashRepartition output_partitioning = 4;
bool push_shuffle = 5;
repeated ExecutorMetadata execs = 6;
}

message ShuffleStreamReaderExecNode {
uint32 stage_id = 1;
uint64 partition_count = 2;
Schema schema = 3;
}

message ShuffleReaderExecNode {
@@ -795,6 +804,7 @@ message Action {
oneof ActionType {
// Fetch a partition from an executor
FetchPartition fetch_partition = 3;
PushPartition push_partition = 4;
}

// configuration settings
@@ -819,6 +829,12 @@ message FetchPartition {
string path = 4;
}

message PushPartition {
string job_id = 1;
uint32 stage_id = 2;
uint32 partition_id = 3;
}

// Mapping from partition id to executor id
message PartitionLocation {
PartitionId partition_id = 1;
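For context, the new `PushPartition` action and the `push_shuffle`/`execs` fields on `ShuffleWriterExecNode` describe a push-based shuffle: the writer streams partitions directly to the listed executors instead of waiting for readers to fetch them. Below is a minimal sketch of how the Rust-side scheduler `Action` enum could mirror the new proto variant; the field names and types are assumptions inferred from the `push_partition` client code further down, not copied from the actual `serde::scheduler` definition.

```rust
// Sketch only: a Rust-side Action enum mirroring the proto `Action` oneof.
// Field names/types are inferred from BallistaClient::push_partition below.
#[derive(Debug, Clone)]
pub enum Action {
    /// Fetch a partition from an executor (existing variant).
    FetchPartition {
        job_id: String,
        stage_id: usize,
        partition_id: usize,
        path: String,
    },
    /// Push a partition to an executor (new in this change).
    PushPartition {
        job_id: String,
        stage_id: usize,
        partition_id: usize,
    },
}
```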
134 changes: 132 additions & 2 deletions ballista/rust/core/src/client.rs
@@ -32,8 +32,9 @@ use crate::serde::scheduler::{
};

use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::Ticket;
use arrow_flight::{flight_service_client::FlightServiceClient, FlightData};
use arrow_flight::{FlightDescriptor, SchemaAsIpc, Ticket};
use datafusion::arrow::ipc::writer::IpcWriteOptions;
use datafusion::arrow::{
array::{StringArray, StructArray},
datatypes::{Schema, SchemaRef},
@@ -44,7 +45,7 @@ use datafusion::physical_plan::common::collect;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use datafusion::{logical_plan::LogicalPlan, physical_plan::RecordBatchStream};
use futures::{Stream, StreamExt};
use log::debug;
use log::{debug, warn};
use prost::Message;
use tonic::Streaming;
use uuid::Uuid;
@@ -92,6 +93,58 @@ impl BallistaClient {
self.execute_action(&action).await
}

/// Push a partition to an executor
pub async fn push_partition(
&mut self,
job_id: String,
stage_id: usize,
partition_id: usize,
input: SendableRecordBatchStream,
) -> Result<PartitionStats> {
let action = Action::PushPartition {
job_id: job_id.to_string(),
stage_id,
partition_id,
};

let serialized_action: protobuf::Action = action.try_into()?;
let mut cmd: Vec<u8> = Vec::with_capacity(serialized_action.encoded_len());
serialized_action
.encode(&mut cmd)
.map_err(|e| BallistaError::General(format!("{:?}", e)))?;

// add an initial FlightData message that sends schema and cmd
let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
let schema_flight_data =
SchemaAsIpc::new(input.schema().as_ref(), &options).into();

let fd = Some(FlightDescriptor::new_cmd(cmd));
let cmd_data = FlightData {
flight_descriptor: fd,
..schema_flight_data
};

let flight_stream = Arc::new(Mutex::new(RecordBatchToFlightDataStream::new(
input, cmd_data, options,
)));
let flight_stream_ref = RecordBatchToFlightDataStreamRef {
batch_to_stream: flight_stream.clone(),
};

self.flight_client
.do_put(flight_stream_ref)
.await
.map_err(|e| BallistaError::General(format!("{:?}", e)))?
.into_inner();

let metrics = flight_stream.lock();
Ok(PartitionStats::new(
Some(metrics.num_rows),
Some(metrics.num_batches),
Some(metrics.num_bytes),
))
}

/// Execute an action and retrieve the results
pub async fn execute_action(
&mut self,
@@ -179,3 +232,80 @@ impl RecordBatchStream for FlightDataStream {
self.schema.clone()
}
}

struct RecordBatchToFlightDataStreamRef {
batch_to_stream: Arc<Mutex<RecordBatchToFlightDataStream>>,
}

struct RecordBatchToFlightDataStream {
num_batches: u64,
num_rows: u64,
num_bytes: u64,
batch: SendableRecordBatchStream,
buffered_data: Vec<FlightData>,
options: IpcWriteOptions,
}

impl RecordBatchToFlightDataStream {
pub fn new(
batch: SendableRecordBatchStream,
schema: FlightData,
options: IpcWriteOptions,
) -> Self {
Self {
num_batches: 0,
num_rows: 0,
num_bytes: 0,
batch,
buffered_data: vec![schema],
options,
}
}
}

impl Stream for RecordBatchToFlightDataStreamRef {
type Item = FlightData;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut self_mut = self.get_mut().batch_to_stream.lock();
if let Some(event) = self_mut.buffered_data.pop() {
Poll::Ready(Some(event))
} else {
loop {
match self_mut.batch.poll_next_unpin(cx) {
Poll::Ready(Some(Err(e))) => {
warn!("Error when poll input batches : {}", e);
continue;
}
Poll::Ready(Some(Ok(record_batch))) => {
self_mut.num_batches += 1;
self_mut.num_rows += record_batch.num_rows() as u64;
let num_bytes: usize = record_batch
.columns()
.iter()
.map(|array| array.get_array_memory_size())
.sum();
self_mut.num_bytes += num_bytes as u64;
let converted_chunk =
arrow_flight::utils::flight_data_from_arrow_batch(
&record_batch,
&self_mut.options,
);
self_mut.buffered_data.extend(converted_chunk.0.into_iter());
self_mut.buffered_data.push(converted_chunk.1);
if let Some(event) = self_mut.buffered_data.pop() {
return Poll::Ready(Some(event));
} else {
continue;
}
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
}
}
}
}
}
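A hypothetical caller-side sketch of the new `push_partition` API follows, assuming the existing `BallistaClient::try_new(host, port)` constructor and the crate's public module layout (`ballista_core::client`, `ballista_core::error`, `ballista_core::serde::scheduler`); the host, port, job id, and stage id are placeholder values, not taken from this PR.

```rust
// Sketch only: pushing one partition's output stream to a downstream executor.
// Host, port, job id, and stage id below are placeholders.
use ballista_core::client::BallistaClient;
use ballista_core::error::Result;
use ballista_core::serde::scheduler::PartitionStats;
use datafusion::physical_plan::SendableRecordBatchStream;

async fn push_one_partition(
    stream: SendableRecordBatchStream,
    stage_id: usize,
    partition_id: usize,
) -> Result<PartitionStats> {
    // Connect to the executor that should receive this partition.
    let mut client = BallistaClient::try_new("downstream-host", 50051).await?;
    // Streams the batches via Flight do_put; the first FlightData carries the
    // schema plus the encoded PushPartition action as the descriptor cmd.
    client
        .push_partition("job-1".to_owned(), stage_id, partition_id, stream)
        .await
}
```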
4 changes: 2 additions & 2 deletions ballista/rust/core/src/config.rs
@@ -201,8 +201,8 @@ impl BallistaConfig {
// needs to be visible to code generated by configure_me
#[derive(Clone, ArgEnum, Copy, Debug, serde::Deserialize)]
pub enum TaskSchedulingPolicy {
PullStaged,
PushStaged,
Pull,
Push,
}

impl std::str::FromStr for TaskSchedulingPolicy {
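The rename from `PullStaged`/`PushStaged` to `Pull`/`Push` also flows through the `FromStr` impl that is truncated in the hunk above. A sketch of what the parsing could look like after the rename is shown here, assuming case-insensitive `pull`/`push` strings; the strings actually accepted by the PR are not visible in this excerpt.

```rust
// Sketch under assumptions: parsing the renamed scheduling policy values.
// The real FromStr impl (and the exact accepted strings) is truncated above.
impl std::str::FromStr for TaskSchedulingPolicy {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "pull" => Ok(TaskSchedulingPolicy::Pull),
            "push" => Ok(TaskSchedulingPolicy::Push),
            other => Err(format!("invalid task scheduling policy: {}", other)),
        }
    }
}
```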
3 changes: 3 additions & 0 deletions ballista/rust/core/src/execution_plans/mod.rs
@@ -20,10 +20,13 @@

mod distributed_query;
mod shuffle_reader;
mod shuffle_stream_reader;
mod shuffle_writer;
mod unresolved_shuffle;

pub use distributed_query::DistributedQueryExec;
pub use shuffle_reader::ShuffleReaderExec;
pub use shuffle_stream_reader::ShuffleStreamReaderExec;
pub use shuffle_writer::OutputLocation;
pub use shuffle_writer::ShuffleWriterExec;
pub use unresolved_shuffle::UnresolvedShuffleExec;
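The new `ShuffleStreamReaderExec` is exported alongside the existing shuffle operators. As a hypothetical illustration only, it might be constructed from the three fields carried by `ShuffleStreamReaderExecNode` (stage_id, partition_count, schema); the real constructor lives in `shuffle_stream_reader.rs`, which is not shown in this diff, so the signature below is an assumption.

```rust
// Hypothetical sketch: building the new stream reader from the proto fields
// (stage_id, partition_count, schema). The actual constructor may differ.
use std::sync::Arc;

use ballista_core::execution_plans::ShuffleStreamReaderExec;
use datafusion::arrow::datatypes::{DataType, Field, Schema};

fn build_reader(stage_id: usize, partition_count: usize) -> ShuffleStreamReaderExec {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "c0",
        DataType::Int64,
        false,
    )]));
    // Assumed signature mirroring the ShuffleStreamReaderExecNode message.
    ShuffleStreamReaderExec::new(stage_id, partition_count, schema)
}
```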