
Conversation

@mingmwang
Contributor

Which issue does this PR close?

Closes #1805.

Rationale for this change

Add a new streaming-style, push-based shuffle implementation.

What changes are included in this PR?

  1. New streaming shuffle reader implementation
  2. New PushPartition gRPC call in Arrow Flight (see the sketch below)
  3. All-at-once stage scheduler

Are there any user-facing changes?

No
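
To make the data flow concrete, here is a minimal sketch (not the PR's actual code) in which in-process tokio channels stand in for the PushPartition gRPC call; all names (`RecordBatch`, `run_map_task`, `run_reduce_task`) are hypothetical:

```rust
// Minimal sketch, not the PR's actual code: in-process channels model the
// PushPartition gRPC call so the push-based data flow is visible.
use tokio::sync::mpsc;

// Stand-in for an Arrow RecordBatch.
#[derive(Debug)]
struct RecordBatch(Vec<u64>);

// Map side: push each output batch straight to its target partition instead
// of spilling it to a shuffle file for the reader to pull later.
async fn run_map_task(partition_senders: Vec<mpsc::Sender<RecordBatch>>) {
    let n = partition_senders.len() as u64;
    for i in 0..8u64 {
        let partition = (i % n) as usize;
        // In the PR this send would be a PushPartition call to the executor
        // hosting the downstream task.
        partition_senders[partition]
            .send(RecordBatch(vec![i]))
            .await
            .expect("receiver dropped");
    }
}

// Reduce side: the streaming shuffle reader consumes batches as they arrive,
// so the reduce stage starts before the map stage has finished.
async fn run_reduce_task(mut rx: mpsc::Receiver<RecordBatch>) {
    while let Some(batch) = rx.recv().await {
        println!("received {:?}", batch);
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    let reduce = tokio::spawn(run_reduce_task(rx));
    run_map_task(vec![tx]).await; // dropping the sender closes the stream
    reduce.await.unwrap();
}
```

Because the consumer must already be running when the producer pushes, every stage of the plan has to be scheduled together, which is why the all-at-once stage scheduler is part of the same change.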

```rust
// Re-plan the input execution plan and create all-at-once query stages.
// For now we simply depend on the stage count to decide whether to create
// all-at-once or normal stages; in the future we can use a more
// sophisticated way to decide which way to go.
if stages.len() > 1 && stages.len() <= 4 {
```
Contributor

If I understand the original design correctly, the "all-at-once" plan will only get scheduled when there are sufficient task slots available to run the entire plan. So should this be a function of the total number of partitions?

Contributor Author

> If I understand the original design correctly, the "all-at-once" plan will only get scheduled when there are sufficient task slots available to run the entire plan. So should this be a function of the total number of partitions?

Yes, you are right. But currently the scheduler server doesn't have a clear view of how many task slots are available, so here I just add a simple check on the stage count. After @yahoNanJing refactors the scheduler state to keep more CPU/task info in the in-memory state, we can add more sophisticated check logic.
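
As a hedged illustration of that future check (names like `QueryStage`, `partition_count`, and `available_task_slots` are assumptions, not APIs from this PR), the decision could compare the plan's total partition count against the cluster's free task slots:

```rust
// Hypothetical sketch of the "more sophisticated" check discussed above:
// only pick all-at-once scheduling when the cluster can hold the whole plan.
struct QueryStage {
    partition_count: usize,
}

fn should_schedule_all_at_once(stages: &[QueryStage], available_task_slots: usize) -> bool {
    // Every partition of every stage needs a task slot at the same time,
    // so the total partition count must fit in the free slots.
    let total_partitions: usize = stages.iter().map(|s| s.partition_count).sum();
    stages.len() > 1 && total_partitions <= available_task_slots
}
```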

Comment on lines +173 to +177

```rust
// let schema = &self.schema;
// let rx = self.batch_receiver.lock().unwrap().pop().unwrap();
// let join_handle = tokio::task::spawn(async move {});
// Ok(RecordBatchReceiverStream::create(schema, rx, join_handle))
```
Member

Suggested change: remove the commented-out lines above.

Comment on lines +67 to +69
info!("planning query stages for job {}", job_id);
let (modified_plan, mut stages) = self
.plan_query_stages_internal(job_id, execution_plan.clone())
Member

I think this block is only used in the else branch below, when all-at-once mode is disabled?
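
A hedged sketch of the restructuring this comment suggests, using placeholder types rather than the PR's real ones:

```rust
// Placeholder types standing in for the real planner structures.
struct Plan;
struct Stage;

struct StagePlanner;

impl StagePlanner {
    fn plan_all_at_once(&self, _job_id: &str, _plan: &Plan) -> Vec<Stage> {
        vec![] // all-at-once stage planning
    }

    fn plan_query_stages_internal(&self, _job_id: &str, _plan: &Plan) -> Vec<Stage> {
        vec![] // normal (pull-based) stage planning
    }

    fn plan_query_stages(&self, job_id: &str, plan: Plan, all_at_once: bool) -> Vec<Stage> {
        if all_at_once {
            // The intermediate planning pass is skipped entirely on this path.
            self.plan_all_at_once(job_id, &plan)
        } else {
            // Only run the normal stage planner when all-at-once is disabled,
            // so its work is not wasted on the all-at-once path.
            self.plan_query_stages_internal(job_id, &plan)
        }
    }
}
```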

@alamb
Contributor

alamb commented Apr 15, 2022

Marking as draft (so it is easier to see which PRs are waiting for review).

@andygrove
Member

Closing this PR since it has not been updated in a long time. Feel free to re-open if this is still being worked on.

@andygrove closed this Nov 16, 2022


Development

Successfully merging this pull request may close these issues.

[Ballista] Streaming style push-based shuffle and All-at-once stage scheduling in Ballista
