ARROW-14970: [C++] Make ExecNodes can generate/consume tasks #11923
Conversation
Force-pushed from 51e9d0d to f281e50
cc @bkietz for visibility
westonpace left a comment
I'll take a second look at this on Monday but I have some initial questions.
I would expect the signature to be:

```cpp
virtual void InputReceived(ExecNode* input,
                           std::function<Result<ExecBatch>(ExecBatch)> task) = 0;
```
I'm not sure what it means to have batch and task?
Is this some kind of intermediate step between the two models?
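For reference, a minimal sketch of the difference between the two shapes under discussion (all types here are illustrative stand-ins, not the real Arrow API):

```cpp
#include <functional>

// Illustrative stand-in only; the real ExecBatch/Result types live in Arrow.
struct ExecBatch {
  int length = 0;
};

// The PR's current shape: a deferred task that *produces* a batch.
using ProducerTask = std::function<ExecBatch()>;
// The shape suggested above: a task that *transforms* a batch it is given.
using TransformTask = std::function<ExecBatch(ExecBatch)>;

// Binding a batch to a transform yields a producer task, which is one way
// the two shapes relate to each other.
inline ProducerTask Bind(TransformTask transform, ExecBatch batch) {
  return [=] { return transform(batch); };
}
```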
This isn't really a task though is it. I was expecting something like...
```cpp
static inline std::function<Result<ExecBatch>(ExecBatch)> IdentityTask() {
  return [](ExecBatch batch) { return batch; };
}
```
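As a hedged sketch (stand-in types, `Result<>` dropped, helper names invented), such an identity task would compose with a node's own work like this:

```cpp
#include <functional>

// Stand-in for the real Arrow ExecBatch; for illustration only.
struct ExecBatch {
  int length = 0;
};

using BatchTask = std::function<ExecBatch(ExecBatch)>;

// The identity task from the comment above, with Result<> dropped for brevity.
inline BatchTask IdentityTask() {
  return [](ExecBatch batch) { return batch; };
}

// Fuse a node's own transform onto an upstream task (function composition).
inline BatchTask Compose(BatchTask upstream, BatchTask own) {
  return [=](ExecBatch batch) { return own(upstream(batch)); };
}
```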
Thanks for the feedback @westonpace!
Force-pushed from f281e50 to 3dfd2d3
@westonpace I've sent some changes, feel free to take another look!
westonpace left a comment
This is good. I think we've just got things a bit switched at the moment though. The non-pipeline breakers (filter, project) are submitting tasks and the pipeline breakers are not.
What we want is the pipeline breakers to submit tasks and the ordinary nodes to compose the task.
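A minimal sketch of that division of labor (invented names and an `int` stand-in for `ExecBatch`; "submitting" here just means running the task inline):

```cpp
#include <functional>

// Illustrative stand-ins; not the real Arrow types or API.
using ExecBatch = int;
using Task = std::function<ExecBatch()>;

// A non-pipeline-breaker (e.g. project) fuses its work onto the incoming
// task and forwards the fused task downstream instead of running it.
inline Task ProjectForward(Task upstream) {
  return [upstream] { return upstream() * 2; };  // "project": double the value
}

// A pipeline breaker (e.g. sink) is the one place the fused task is
// finally submitted; here "submitting" is simply running it.
inline ExecBatch SinkSubmit(Task task) { return task(); }
```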
```cpp
// Current code (runs the task inline):
auto prev = task();
if (!prev.ok()) {
  ErrorIfNotOk(prev.status());
  return;
}
if (ErrorIfNotOk(DoConsume(prev.MoveValueUnsafe(), thread_index))) return;

// Suggested shape (submits the work as a task instead; the lambda takes the
// task itself, not an already-computed Result):
auto func = [this](std::function<Result<ExecBatch>()> task) {
  ARROW_ASSIGN_OR_RAISE(auto prev, task());
  auto thread_index = get_thread_index_();
  return DoConsume(std::move(prev), thread_index);
};
plan_->scheduler()->SubmitTask(std::move(func));
```
This is what I'm thinking pipeline breakers would look like.
> `plan_->scheduler()->SubmitTask(std::move(func));`

Yes, that is the idea, but this PR only enables that construction later; it does not define any scheduler or task-submission logic.
If we aren't going to address this now, let's make another JIRA (taskify 3?), something like "Fix logic in existing nodes so that pipeline breakers submit and non-breakers forward", and then add a comment in all of these spots along the lines of:

```cpp
// This node should be forwarding the task downstream but that will be addressed in ARROW-XYZ
```
This is good 👍
This would actually not create a task but forward it downstream, like filter/project.
So what will this eventually look like? If we assume we don't know how many batches a scanner will emit then how many "scan tasks" do we submit individually? I suppose we can always "over-submit" and then the final tasks will just abandon themselves if the scanner is finished. Could this be another spot for backpressure? I don't think we have to solve all of these problems right now.
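A hypothetical sketch of the over-submit idea, where each scan task checks whether the scanner is exhausted and abandons itself; none of these names come from the Arrow codebase:

```cpp
#include <atomic>
#include <optional>

// Hypothetical scanner with an unknown-but-finite number of batches.
struct Scanner {
  std::atomic<int> remaining{3};

  // Each submitted "scan task" calls this; once the scanner is exhausted
  // the task receives std::nullopt and simply abandons itself.
  std::optional<int> NextBatch() {
    int before = remaining.fetch_sub(1);
    if (before <= 0) return std::nullopt;  // over-submitted: nothing left to do
    return before;                         // pretend this is a real batch
  }
};
```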
Force-pushed from 694d209 to b5dcaa4
We will want to update the docs in
Force-pushed from b5dcaa4 to 243cbb4
Commits in this push:
- minor changes
- format
- improve the task API for ExecNodes
- format
- fix guarantee issues with project and filter nodes
- minor format
- fix build dataset examples
- fix arrow compute docs
Force-pushed from 243cbb4 to 0a58b6f
Done! Good catch, thanks!
westonpace left a comment
Since we changed the interface there are a number of nodes that aren't really doing things the right way. I agree we don't need to convert all of them right away (since they still work correctly). As a compromise can we add comments in all the places we will need to change referencing a JIRA that will implement that change?
Once those are in place I think this is good to go.
```cpp
// by an input of this node to push a task here for processing.
// For non-terminating nodes (e.g. filter/project/etc.): the node can wrap
// its own work with the task (using function composition/fusing) and then
// call InputReceived on the downstream node.
// A "terminating node" (e.g. sink node / pipeline breaker) could then submit
// the task to a scheduler.
void InputReceived(ExecNode* input,
                   std::function<Result<ExecBatch>()> task) override {
```
```diff
 // by an input of this node to push a task here for processing.
-// For non-terminating nodes (e.g. filter/project/etc.): the node can wrap
-// its own work with the task (using function composition/fusing) and then
+// Non-terminating nodes (e.g. filter/project/etc.) should wrap
+// their own work with the task (using function composition/fusing) and then
 // call InputReceived on the downstream node.
-// A "terminating node" (e.g. sink node / pipeline breaker) could then submit
-// the task to a scheduler.
+// Terminating nodes (e.g. sink node / pipeline breaker) should submit
+// the task to an executor or task group.
 void InputReceived(ExecNode* input,
                    std::function<Result<ExecBatch>()> task) override {
```
Some minor wording changes, and removing the term "scheduler" since one doesn't exist yet.
Closing because it has been untouched for a while; if it's still relevant, feel free to reopen and move it forward 👍
https://issues.apache.org/jira/browse/ARROW-14970