ARROW-11930: [C++][Dataset][Compute] Use an ExecPlan for dataset scans #10397
Conversation
lidavidm left a comment:
This is a lot to digest, but I think it looks good overall, with much of it being refactoring or restructuring. (Thank you for taking care of cleaning things up all over!)
Given that stopping a producer doesn't necessarily immediately terminate everything, the consumer needs to be prepared to get and handle/ignore an error anyway.
I'd agree that handling/ignoring trailing batches is necessary; the producer may take a while to stop. However, I wonder if it's reasonable to do the same for trailing errors. For example: say we have a plan where a LimitNode is taking the first 99 rows from EmitsErrorAfterHundredthRowNode. There's a race condition here (which also depends on chunking): the LimitNode will sometimes receive the trailing error before it can stop the producer, and sometimes will succeed in stopping its producer before the error is raised. I'm not sure what the correct answer is, but I lean toward: if any node emits any error, that always puts all subsequent nodes into an error state too (unless explicitly intercepted). The above example seems like a problem we need to fix in EmitsErrorAfterHundredthRowNode rather than by requiring all consumers to ignore post-stop errors.
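A minimal sketch of that "errors always propagate unless explicitly intercepted" policy, using simplified stand-in types (none of these are the real Arrow classes; the real ones would be arrow::Status and arrow::compute::ExecNode):

```cpp
#include <string>
#include <utility>

// Simplified stand-in for arrow::Status.
struct Status {
  bool ok = true;
  std::string message;
};

struct Node {
  Node* output = nullptr;  // the downstream consumer, if any

  // Default policy: an error received from an input always puts downstream
  // nodes into an error state too, even if this node has already stopped
  // its producers. A node that deliberately recovers from the error simply
  // does not forward it.
  virtual void ErrorReceived(Status st) {
    if (output != nullptr) output->ErrorReceived(std::move(st));
  }

  virtual ~Node() = default;
};
```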
Ah, I see. I think I have the same inclination then. I'd considered making an exception for a sink node that has already gotten all its results, since subsequent errors are probably irrelevant to it, but propagating errors even when otherwise 'finished' makes sense.
I would agree to keep the logic as simple as possible.
westonpace left a comment:
At first glance it seems like a good overall cleanup, thanks. How do you see things evolving? Do you think the various operations achieved by a scanner today will be achieved by an execution plan? For example, will ScanBatches, CountRows, etc. create and execute an execution plan instead of maintaining the dual paths?
I'm not sure what is going on here (though that is likely my own problem). If the value is a scalar record batch, you want to end up with each value being a scalar. Can you not just grab the first item from each column of partial_array? Why do you need to go back in and patch things up?
This was as compact as I could write this case; if you see a way to compress/simplify it then I'll take it, but the scalar/array cases are really just for testing purposes.
pitrou left a comment:
I only took a partial look at this.
From a high-level point of view, it seems to me that we want an entire execution pipeline to be wrapped in a single exec node, rather than having individual project, filter... nodes?
The problem is that we'll want to be able to apply backpressure at some point, but a generator doesn't allow for that. So it seems that, instead of wrapping a generator, you should really have an ExecNode that wraps a dataset scanner directly.
Pull-based models (e.g. generators) apply backpressure by default: you have to specifically ask for each item of data that you want. (Although it's turned around, so maybe it's more accurate to just call it pressure.) If we want to apply it here, it could be done by adding a check in the loop (perhaps near `if (finished_)`) that looks something like:

```cpp
if (pause_future_) {
  return pause_future_;
}
```

Then `PauseProducing` becomes:

```cpp
pause_future_ = Future<>::Make();
```

and `ResumeProducing` becomes:

```cpp
pause_future_.MarkFinished();
pause_future_ = Future<>();  // Maybe we need a `Reset` or `MakeInvalid`
```
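As a self-contained illustration of this pause-gate pattern, here is a sketch using std::promise/std::shared_future as a stand-in for arrow::Future. Note one deliberate simplification: the loop below blocks on the gate, whereas the real async version would return the future and resume from a continuation.

```cpp
#include <future>
#include <iostream>
#include <memory>
#include <mutex>

// One-shot signal standing in for Future<>: Wait() returns once
// MarkFinished() has been called.
class Signal {
 public:
  void MarkFinished() { promise_.set_value(); }
  void Wait() { future_.wait(); }

 private:
  std::promise<void> promise_;
  std::shared_future<void> future_{promise_.get_future().share()};
};

class Producer {
 public:
  void PauseProducing() {
    std::lock_guard<std::mutex> lock(mutex_);
    pause_ = std::make_shared<Signal>();
  }

  void ResumeProducing() {
    std::shared_ptr<Signal> pause;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pause = std::move(pause_);
    }
    if (pause) pause->MarkFinished();
  }

  void Run() {
    for (int i = 0; i < 10; ++i) {
      std::shared_ptr<Signal> pause;
      {
        std::lock_guard<std::mutex> lock(mutex_);
        pause = pause_;  // the in-loop check, like the `if (pause_future_)` above
      }
      if (pause) pause->Wait();  // paused: wait until ResumeProducing()
      std::cout << "producing batch " << i << "\n";
    }
  }

 private:
  std::mutex mutex_;
  std::shared_ptr<Signal> pause_;
};
```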
Oh, you're right, my bad.
For any kind of "map-like" node that has an input and an output, a call to PauseProducing should always call PauseProducing on the input. That's the only way to ensure that back pressure is properly channeled to the source (which can actually pause).
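A sketch of that forwarding rule, again with an illustrative interface rather than the real ExecNode signatures:

```cpp
// A "map-like" node transforms batches one-for-one and holds no buffer of
// its own, so the only meaningful response to back pressure is to pass the
// request straight through to its input.
struct MapLikeNode {
  MapLikeNode* input = nullptr;

  void PauseProducing() {
    if (input != nullptr) input->PauseProducing();
  }

  void ResumeProducing() {
    if (input != nullptr) input->ResumeProducing();
  }
};
```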
These are currently stubs due to lack of support for pausing in any source node. For now, I'll remove these and add a follow-up to support pause/resume.
My vote is that ErrorReceived is sufficient. I think a node could recover from a failure but, if it does so, it shouldn't call ErrorReceived.
I'm pondering how back pressure would be applied. I think there would be a new argument added to this SinkNode for max_items_queued or something like that. However, we could not naively apply that limit to received_batches_ because of the resequencing.
Since we are delivering to a pull-based model, I think the appropriate way to apply back pressure would be to have the PushGenerator keep track of how many undelivered items it has. Then there would need to be a check in this code: after pushing, if the PushGenerator is full, apply back pressure to the inputs. The PushGenerator would also need some way of signalling back into the SinkNode that the pressure has been relieved and it is ready for more items.
I don't think this has to be implemented now, but does that sound reasonable?
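A sketch of that idea under loose assumptions: max_items_queued is the argument proposed above, the PushGenerator is modeled by a plain counted queue, and the pause/resume callbacks stand in for the signalling back into the SinkNode.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Sink with a max_items_queued bound: when the queue of undelivered items
// fills up, upstream is paused; when the consumer drains below the bound,
// upstream is resumed.
template <typename T>
class BoundedSink {
 public:
  BoundedSink(std::size_t max_items_queued, std::function<void()> pause,
              std::function<void()> resume)
      : max_(max_items_queued),
        pause_(std::move(pause)),
        resume_(std::move(resume)) {}

  void Push(T item) {  // called from the exec plan side
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push_back(std::move(item));
    if (queue_.size() >= max_ && !paused_) {
      paused_ = true;
      pause_();  // apply back pressure to the inputs
    }
  }

  bool Pop(T* out) {  // called by the pull-based consumer
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) return false;
    *out = std::move(queue_.front());
    queue_.pop_front();
    if (paused_ && queue_.size() < max_) {
      paused_ = false;
      resume_();  // pressure relieved; ready for more items
    }
    return true;
  }

 private:
  std::size_t max_;
  std::function<void()> pause_, resume_;
  std::mutex mutex_;
  std::deque<T> queue_;
  bool paused_ = false;
};
```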
This sounds reasonable, but the prerequisite is to support pause/resume in a source node.
I'd like the ExecPlan to be usable enough to replace all filtering and projection currently in Scanner. So, for example, ScanBatches could assemble an ExecPlan to handle filtering and projection, then receive and reorder batches, never needing to explicitly evaluate an expression. Ultimately, I'm not positive we'll keep Scanner. It's possible we could simplify the dataset module to a factory for source/sink nodes. In that case, anything which currently builds a Scanner would instead produce an ExecPlan. We'll see.
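To make the intended shape concrete, here is a toy model of that direction (not the Arrow API; just the structure of scan -> filter -> project assembled as one pull-through pipeline instead of Scanner's dual code paths):

```cpp
#include <functional>
#include <optional>
#include <utility>
#include <vector>

using Batch = std::vector<int>;  // toy stand-in for an ExecBatch

struct Pipeline {
  std::function<std::optional<Batch>()> source;                    // e.g. a scan
  std::vector<std::function<std::optional<Batch>(Batch)>> stages;  // filter, project, ...

  // Pull one batch through every stage; a stage returning nullopt means the
  // batch was eliminated (e.g. fully filtered out), so we try the next one.
  std::optional<Batch> Next() {
    while (auto batch = source()) {
      bool eliminated = false;
      for (auto& stage : stages) {
        auto out = stage(std::move(*batch));
        if (!out) {
          eliminated = true;
          break;
        }
        batch = std::move(out);
      }
      if (!eliminated) return batch;
    }
    return std::nullopt;  // source exhausted
  }
};
```

In that world, ScanBatches would just build such a pipeline and drain it, and operations like CountRows could swap in a different terminal stage rather than maintaining a separate code path.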
@pitrou PTAL
pitrou left a comment:
Some more comments.
[Diff context: a new Markdown document titled `# ExecNodes and logical operators`]
I'm not sure I understand the status of this document. If this is meant to be a persistent document, then can it be part of the Sphinx development docs?
I'll promote this to a Sphinx doc in a follow-up: https://issues.apache.org/jira/browse/ARROW-13227
[Diff context:]

```cpp
/// \brief Like MapVector, but where the function can fail.
template <typename Fn, typename From = internal::call_traits::argument_type<0, Fn>,
          typename To = typename internal::call_traits::return_type<Fn>::ValueType>
```
Why not use the decltype(declval) pattern here as well?
There's not a good reason; just uniformity with getting `From` from `call_traits`.
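For comparison, a self-contained sketch of the `decltype(declval)` spelling of such a fallible map, where `Result` is a simplified stand-in for `arrow::Result`:

```cpp
#include <utility>
#include <vector>

// Simplified stand-in for arrow::Result<T>.
template <typename T>
struct Result {
  using ValueType = T;
  T value{};
  bool ok = true;
};

// Deduce To from the expression fn(from) itself instead of via call_traits.
template <typename Fn, typename From,
          typename To = typename decltype(std::declval<Fn>()(
              std::declval<From>()))::ValueType>
Result<std::vector<To>> MaybeMapVector(Fn&& fn, const std::vector<From>& source) {
  std::vector<To> out;
  out.reserve(source.size());
  for (const From& v : source) {
    Result<To> maybe = fn(v);
    if (!maybe.ok) return {{}, false};  // propagate the first failure
    out.push_back(std::move(maybe.value));
  }
  return {std::move(out), true};
}
```

Called as, e.g., `MaybeMapVector([](int i) { return Result<int>{i * 2}; }, values)`.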
[Diff context:]

```cpp
if (auto name = this->name()) {
  return internal::MapVector([](int i) { return FieldPath{i}; },
                             schema.GetAllFieldIndices(*name));
}
```
Can you add a test for this?
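A hypothetical shape for such a test, assuming the duplicate-name lookup is reachable through something like FieldRef::FindAll (adjust to the real entry point):

```cpp
#include <gtest/gtest.h>

#include <vector>

#include "arrow/type.h"

TEST(FieldRef, FindAllDuplicateNames) {
  // Two fields share the name "a"; looking the name up should yield one
  // FieldPath per occurrence.
  auto schema = arrow::schema({arrow::field("a", arrow::int32()),
                               arrow::field("b", arrow::utf8()),
                               arrow::field("a", arrow::float64())});
  auto matches = arrow::FieldRef("a").FindAll(*schema);
  ASSERT_EQ(matches.size(), 2);
  EXPECT_EQ(matches[0], arrow::FieldPath({0}));
  EXPECT_EQ(matches[1], arrow::FieldPath({2}));
}
```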
@pitrou I think I've addressed your comments. Could we merge this and address anything else in follow-ups?
+1, merging
So far this has involved a lot of refactoring of Expressions to be compatible with ExecBatches. The next step is to add a ScanNode wrapping a ScannerBuilder.