ARROW-17613: [C++] Add function execution API for a preconfigured kernel #14043
Conversation
@lidavidm, are you a good person to review this? Or can you suggest someone?
lidavidm
left a comment
The idea sounds reasonable to me. I'd like to see unit tests here.
Possibly expression evaluation can take advantage of this? Right now it manually resolves a kernel and evaluates it; being able to replace that would help prove out the API/implementation here.
Will do.
Perhaps. My motivation for this change is related to UDFs, which would have their kernel bound once then executed multiple times over a stream of batches.
In principle, the new code gets covered through existing unit tests, because the original function API now goes through the new code for kernel binding and execution. I'll add unit tests specific to the new code.
Adding these APIs is a welcome improvement. My comments above tend to mirror @lidavidm's own observations.
bkietz
left a comment
I'm not sure what purpose this serves; when will we be able to use this? When executing an ExecPlan for example we already resolve all the kernels and types in Expression::Bind, then re-use those kernels for each input batch. Maybe this would be better as adding ExecuteScalarOrVectorExpression?
The focus here is on the function execution API, i.e., a lower-level API than the plan execution API. Currently, the function execution API goes through kernel selection on each invocation, because the types of the passed arguments may change each time. This PR adds a fast path for executing a preconfigured kernel when the argument types are known to be fixed across invocations. Note that, in general, these invocations need not be over a stream of batches, as in an execution plan, but could be dynamically driven.

Regarding when to use this: First, a user may use this directly where they would use the original function execution API. Second, as noted above, my motivation for this is related to UDFs, whose kernel would be preconfigured once and then executed multiple times over a stream of batches (the kernel state ends up holding Python state). It's possible this kernel preconfiguration can be integrated into expression binding too; I haven't looked into this.
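To illustrate the distinction being discussed, here is a minimal plain-Python sketch of the two paths. This is not Arrow's actual API; `KERNELS`, `call_function`, and `FunctionExecutor` are toy stand-ins for kernel dispatch and for the preconfigured-kernel fast path this PR proposes.

```python
# Toy "registry" mapping (function name, argument types) to a kernel.
KERNELS = {
    ("add", (int, int)): lambda a, b: a + b,
    ("add", (float, float)): lambda a, b: a + b,
}

def call_function(name, args):
    # Original path: resolve the kernel on every invocation,
    # because the argument types may differ each time.
    kernel = KERNELS[(name, tuple(type(a) for a in args))]
    return kernel(*args)

class FunctionExecutor:
    # Fast path: resolve the kernel once for fixed argument types,
    # then execute it repeatedly with no further lookups.
    def __init__(self, name, arg_types):
        self.kernel = KERNELS[(name, tuple(arg_types))]

    def execute(self, args):
        return self.kernel(*args)

# Repeated execution over a stream of batches reuses the bound kernel.
executor = FunctionExecutor("add", [int, int])
results = [executor.execute(batch) for batch in [(1, 2), (3, 4)]]
```

The point of the sketch is only the shape of the API: the lookup cost moves out of the per-batch loop and into a one-time bind step.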
Will do.
I understand, but when do users touch the function execution API? I think that would primarily be through the Python or R bindings, to handle ad-hoc cases like adding two arrays together, and in that case constructing a FunctionExecutor would not be useful, since the user input time delay will greatly outweigh kernel lookup. A FunctionExecutor would only be useful when executing the same function multiple times, for example when applied to multiple batches from a stream of data. What I'd like to hear is when that's beneficial and isn't served by construction of an ExecPlan.
Kernel preconfiguration is precisely the function of Expression::Bind, among other things:
In short, it seems that we won't be able to use FunctionExecutor where it seems to me we'd most like to see UDF capabilities: in filter and project expressions in ExecPlans. Since that will eventually require refactoring/extension of the Expression utilities, I'd prefer we start there so that we can have a better picture of the ways ExecPlan etc. will need to change to accommodate UDFs. Building parallel streaming execution functionality which will ultimately need to be accommodated or assimilated by ExecPlans seems like much more churn.
@bkietz, before I answer your points, I should note that the code here is extracted from a working end-to-end (Ibis/Substrait/PyArrow) prototype I developed for UDFs and UDTs (UDFs that provide a stream of tabular data). While this doesn't mean its design would be accepted as is (and I do welcome feedback on it, or on parts of it that I extract), there is currently no alternative working design. I'd expect to see a comparable alternative put forward, so I could evaluate pros and cons in the context of end-to-end support for UDFs and UDTs. In my mind, just evolving expression binding is not a comparable alternative.
I said "user" but didn't mean "end-user" necessarily; I should have said "caller" for clarity. Still, the pre-PR function execution API is public, so we should assume it is used by end-users and the burden is actually in claiming the opposite (e.g., for deprecation purposes). The fact that there exists a higher-level API, which may be convenient for a lot of use cases (like streaming), does not change this. Granted, there is also a burden of showing the proposed API is useful. I could point you to how the prototype uses this new function execution API, if that would be helpful. The general idea is that the end-user is driving from PyArrow and registers a UDT. The UDT is a Python-implemented function that may be invoked multiple times, each at a different source node in the execution plan. Each such invocation returns a stream object implemented in Python that is managed in a kernel state. Invoking the kernel returns tabular data that is part of the dynamically generated stream. The new function execution API is designed to enable this setup.
Yes, on arguments of the same types.
AFAICS, the above described UDT functionality cannot be served by an ExecPlan nor by expression binding.
IIUC, C++ code (for expression binding) is the driver here. In the prototype's design, it is end-user code, via PyArrow, that is the driver. Also, IIUC, you have a call-expression in mind, and it is designed to be stateless, whereas the stream generator in the prototype is stateful. It's not clear how to reconcile these differences in a proposal based on expression binding. At least this will need to be explained in the context of a more complete description of an alternative.
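To make the statefulness point concrete, here is a minimal plain-Python sketch of the stateful-kernel idea described above. This is not the prototype's actual code; `KernelState`, `udt_kernel`, and `user_table_generator` are hypothetical stand-ins for a kernel whose state holds a Python-implemented stream object across invocations.

```python
def user_table_generator():
    # Hypothetical user-defined table function: yields "batches" of tabular data.
    yield {"x": [1, 2]}
    yield {"x": [3, 4]}

class KernelState:
    # Bound once when the kernel is preconfigured; survives across invocations.
    def __init__(self, stream):
        self.stream = stream

def udt_kernel(state):
    # Each kernel invocation pulls the next batch from the managed stream,
    # returning None when the stream is exhausted.
    return next(state.stream, None)

# Drive the kernel repeatedly against the same state, as a caller would
# over a dynamically generated stream.
state = KernelState(user_table_generator())
batches = []
while (batch := udt_kernel(state)) is not None:
    batches.append(batch)
```

A stateless call-expression has no natural place to keep `state.stream` between calls, which is the reconciliation difficulty referred to above.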
A few thoughts:
I'm pretty sure there are existing cases where users have interacted directly with functions and not via an execution plan (I think @marsupialtail does this with Take, and I seem to recall @drin using the compute API directly as well). I'm not sure those cases couldn't be converted to an execution plan, but they do exist. IIRC these are cases where the user already has an engine / execution plan of their own and is simply trying to integrate Arrow compute. That being said, if we were going to go down this road, I think it would be more valuable to have an "expression executor" and not merely a "function executor". Also, removing the lookup / argument resolution time is nice, but the biggest win would be removing allocations for temporaries / outputs; that can be deferred to a future PR :).
I think the alternative (and this may be a misunderstanding of your goal) is that a UDT not be put into the function registry, even if it looks like a UDF elsewhere (e.g. Substrait). As an example, consider an embedded Python Substrait UDT (which does very much look like a UDF). When we consume that plan we would convert that embedded UDT into a function. Let's say it is a Python function that returns an iterator of tabular data. Instead of creating a stateful function to poll that iterator, we could put that iterator into a source node, probably one of the source nodes you just created in #14207. The …
While I don't know enough about expression use cases, this sounds right to me in the wider context, i.e., outside of just UDFs/UDTs. Does this mean the expression executor should be built on top of the function executor proposed here?
I agree that removing allocations etc. would be a significant win. What do you think is missing from this PR to do so? At least at the function executor API level, I believe repeated invocations of …
This is an interesting alternative to compare to. I'll try to explain below the differences between this and the one proposed in this PR. I think each of the two alternatives has its merits, and we'll just need to choose whether we want one or both.

The source-node (Weston's) proposal for UDTs has several pros that I can see. It requires fewer changes to Arrow in an end-to-end solution, probably just in the Substrait engine component. It bypasses the need to manage nested registries for UDTs (though these are still needed for UDFs). And its PyArrow part builds on fewer Arrow APIs, probably just the source-node-related APIs. OTOH, it also has some cons. It requires a separate source node per UDT. It does not directly support composing UDFs (say, from a library) with a UDT. And it does not directly support ordering of UDTs within one execution.

The function-executor (my) proposal for UDTs, besides supporting the expression executor feature Weston mentioned, has pretty much the reverse pros and cons. I'll elaborate on the less trivial points about UDT composition and ordering. In the prototype, from Arrow's point of view, a UDT is defined as a function that returns a generator of tabular data. However, from Substrait's point of view, a UDT is modeled like a monad, which abstracts out side effects. This enables composition of UDTs and UDFs in a single expression (rather than placing each UDT in a separate node), much as functions and monads can be composed in a functional programming language. For example, one can consider an expression like …
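To make the composition point concrete, here is a minimal plain-Python sketch of applying a scalar UDF over the stream produced by a UDT within one composed expression, rather than isolating the UDT in its own source node. This is neither Arrow's nor the prototype's actual API; `udt` and `udf` are hypothetical stand-ins.

```python
def udt():
    # Toy UDT: yields batches (here, plain columns of values).
    yield [1, 2]
    yield [3, 4]

def udf(col):
    # Toy scalar UDF applied per batch, e.g. doubling each value.
    return [2 * v for v in col]

# Composition "udf(udt())" evaluated batch-by-batch over the generated stream,
# analogous to composing a function with a monadic value in one expression.
composed = [udf(batch) for batch in udt()]
```

In the source-node alternative, by contrast, the `udt` stream would feed a dedicated source node, and the `udf` would have to be applied in a separate downstream node.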
Any idea how to deal with the error in this job: …
@rtpsw It probably means that the …
Fix doc Co-authored-by: Antoine Pitrou <pitrou@free.fr>
OK, I believe I have addressed your remaining comments. Let me know if you see anything more.
pitrou
left a comment
Thanks a lot @rtpsw. The API looks OK to me; I wrote some comments on the implementation and tests.
```cpp
for (size_t i = 0; i < 2; i++) {
  DCHECK(args.values[i].is_array());
  const ArraySpan& array = args.values[i].array;
  DCHECK_EQ(*int32(), *array.type);
```
Use AssertTypeEqual or check the type id.
```diff
- DCHECK_EQ(*int32(), *array.type);
+ AssertTypeEqual(int32(), array.type);
```
or
```diff
- DCHECK_EQ(*int32(), *array.type);
+ EXPECT_EQ(array.type->id(), Type::INT32);
```
@pitrou, the CI job's failure copied below seems to need attention. Let me know if you have an idea about how to fix it.
The reason for the error is that the Arrow function …
This is probably because we were returning …
@pitrou, I think I addressed all issues. Let me know if you see anything more.
pitrou
left a comment
Thanks for the update. LGTM, let's wait for CI.
CI failures are unrelated.
Thanks a lot @rtpsw! By the way, we have a long PR queue. If you're interested, you might want to start reviewing some of them.
After [ARROW-17613](https://issues.apache.org/jira/browse/ARROW-17613) (#14043), which made the error message better, we see an error in our tests because the message changed. Authored-by: Dewey Dunnington <dewey@fishandwhistle.net> Signed-off-by: Dewey Dunnington <dewey@fishandwhistle.net>
Benchmark runs are scheduled for baseline = 4e99f59 and contender = 18326f9. 18326f9 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
See https://issues.apache.org/jira/browse/ARROW-17613