GH-32916: [C++] [Python] User-defined tabular functions #14682
Conversation
The CI job failures seem unrelated to this PR's changes.
wjones127 left a comment
Thanks, Yaron. This all looks very useful.
I have left several suggestions.
python/pyarrow/compute.py (outdated):
```python
list_functions,
_group_by,
# Udf
get_record_batches_from_tabular_function,
```
This feels off to me.
There are these function classes here:
arrow/python/pyarrow/_compute.pyx, line 363 in b9dd416:
```python
cdef class ScalarFunction(Function):
```
Perhaps you should implement a TableFunction class, and have this be a method on that class. So the API would be:
```python
registry: FunctionRegistry
func: TableFunction = registry.get_function("my_table_function")
func.get_batch_iterator()
```
I like the idea of encapsulating this in a class. However, I suspect these classes are supposed to correspond to arrow::compute::Function::Kind values, and adding a new value here would require many changes in Arrow where this enum is used. If so, I'd prefer to defer, or find a more localized alternative.
westonpace left a comment
I think tabular UDFs would be a great feature. However, I would rather see them implemented as an exec node (they would be "batch UDFs"), so that they can run on larger-than-memory data.
For example, a good use case for this, I think, is the "drop_null" compute kernel (which we actually have today). Given a record batch, it will drop any rows that have a null value in any column (or one could imagine supplying a list of columns).
To implement this we would need options like:
```cpp
class TableUdfOptions {
  Expression table_udf;
};
```
Fitting a tabular UDF to an Expression would be a little tricky though. I think one way you could do it is to treat the "input schema" for the expression as a single column schema, where that one column is a struct array consisting of each of the columns of the table.
So, to drop nulls on the entire table it would just be drop_null(!0) and to drop nulls from a selection of columns you could do something like drop_null(select(!0), [0, 1, 3])
It might be worth checking how Ibis represents tabular functions too.
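To make the single-struct-column framing concrete, here is a minimal pyarrow sketch (the schema and names are hypothetical):
```python
import pyarrow as pa

# Hypothetical table schema; the expression's "input schema" becomes a
# single column whose type is a struct of the table's columns.
table_schema = pa.schema([("a", pa.int64()), ("b", pa.string())])
struct_type = pa.struct(list(table_schema))
expr_input_schema = pa.schema([pa.field("table", struct_type)])
print(expr_input_schema)  # table: struct<a: int64, b: string>
```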
@westonpace I think you may misunderstand what this is adding; tabular function is a misnomer. These are more like UDF record batch generators or a UDF source node. They take no inputs, and emit record batches / struct arrays.
You are correct, I did misunderstand. This does also seem to be the definition of table functions from the literature. Thanks for the clarification. In that case, could this be a source node?
Sorry, I should have given more context for this PR. @wjones127's comment is correct. The concept of user-defined tabular (UDT) functions is clarified here. In quick reply to a couple of questions about the naming:
I'm open to a different name than "tabular", in particular because it caused confusion. However, I'm not in favor of names with "batch" in them, due to point 1 above. Could you propose a name considering the above two points? Still on context for this PR: @wjones127, you asked why have an in-type parameter at all. While the current implementation does not handle UDT inputs, I'm designing the UDT API to support inputs - see this explanation. @westonpace, you asked whether this could be a source node. The same explanation discusses your proposal of a source node as an alternative.
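For readers without the linked context, here is a minimal sketch of the shape being discussed, reusing the tester's `udf_func` name; the exact protocol is illustrative, not a settled API:
```python
import pyarrow as pa

schema = pa.schema([("n", pa.int64())])

def udf_func(ctx):
    # A UDT takes only a context; each call returns the next chunk as a
    # struct array, and an empty array signals exhaustion (current protocol).
    chunks = (pa.array([i, i + 1], pa.int64()) for i in range(0, 6, 2))

    def next_chunk(ctx):
        try:
            column = next(chunks)
        except StopIteration:
            column = pa.array([], pa.int64())
        return pa.StructArray.from_arrays([column], names=["n"])

    return next_chunk
```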
The CI job failures appear to be unrelated. In particular, no Python test, including …
westonpace left a comment
A few questions inline.
What's the ultimate goal here? Do you hope to support an exec plan node that acts as a source node and is given a function name? Or are you planning on purely using this via get_record_batches_from_tabular_function?
The short answer is yes. A longer answer is that such source nodes have a specific structure - they accept parameters in a standard way and employ a user-defined function for the implementation.
The CI job failures seem unrelated to this PR's changes.
westonpace left a comment
Sorry it took me so long to do an in-depth review of this. Overall I really like these changes, thank you for this.
I have some questions around terminology and structure but I do think I understand the concept now and I like the new functionality.
Thinking ahead, I'm imagining both a "function source" node to represent a query like SELECT * FROM my_tabular_udf() and then also an "apply" node to represent something like SELECT * FROM my_table t CROSS APPLY my_tabular_udf(t.id). I think an "apply" node was recently proposed to Substrait as well. Does this match your understanding?
```cpp
  registry = compute::GetFunctionRegistry();
}
ARROW_ASSIGN_OR_RAISE(auto func, registry->GetFunction(func_name));
if (func->kind() != compute::Function::SCALAR) {
```
I think we can tackle this in a future PR but I think we will eventually want to introduce a new function kind for tabular functions. Or, if we have to pick one of the two options we have today, I think vector would be more accurate. Since we have no inputs I think we might get away with this for now but if we had inputs I would expect the length of the array returned from a scalar function to match the length of the incoming inputs and I don't think this will be true here.
Sounds reasonable to me.
Thinking about this some more: since "tabular" characterizes the output of a function, whereas "scalar"/"vector" characterizes its unit of operation, a tabular function may be tabular-scalar or tabular-vector. So, I think the right way to capture a function being tabular is something to discuss (separately).
What would an example of a tabular-scalar function be? Would that be a function that returns one row for each input row but might return many columns?
Exactly.
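As a concrete illustration of the tabular-scalar shape (hypothetical, composed here from existing scalar kernels): one output row per input row, but several output columns:
```python
import pyarrow as pa
import pyarrow.compute as pc

# Tabular-scalar shape: each input row yields exactly one output row,
# but the output has multiple columns.
names = pa.array(["Ada Lovelace", "Alan Turing"])
parts = pc.split_pattern(names, " ")
out = pa.StructArray.from_arrays(
    [pc.list_element(parts, 0), pc.list_element(parts, 1)],
    names=["first", "last"])
```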
This particular line will still need to change at some point though right? Right now you're asserting this must be scalar but the tabular functions you are interested in (generators) are not scalar.
Yes, this will need to change later, when we distinguish between a tabular-attribute and a scalar-attribute of a function.
```cpp
if (array->length() == 0) {
  return IterationTraits<std::shared_ptr<RecordBatch>>::End();
}
return RecordBatchFromArray(std::move(schema), std::move(array));
```
Right now it seems a tabular function must return a struct array. I think that is "ok" but for consistency with the rest of the kernels might it be better to return an ExecSpan?
Of course, we have no concept of exec span in python so the python UDF would still return a struct array. The C++ wrapper could then convert that returned struct array into an exec span.
Although, continuing down this line, would it make more sense to have the python UDFs return record batches? pyarrow doesn't really have much concept / functionality for struct array.
I'll look into this. I vaguely remember that I had some reason for using StructArray, but maybe this can be worked out.
Reviewing this, I believe I used StructArray because the original result-handling code deals with arrays only, so introducing a new type of result should probably be done with care and consistency. You should know this original code - WDYT?
Right, I think we are balancing between:
- Array-in / Array-out is a simple model so tabular functions can return arrays
- Struct arrays aren't used much in pyarrow but record batches are
I think there are merits to both. However, I don't feel too strongly about either choice. So we can always leave it how it is and adjust if needed. This feature will still be fairly experimental for a while I think.
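For what it's worth, recent pyarrow makes converting between the two forms straightforward, which softens the cost of either choice; a small sketch:
```python
import pyarrow as pa

struct_array = pa.array(
    [{"a": 1, "b": "x"}, {"a": 2, "b": "y"}],
    type=pa.struct([("a", pa.int64()), ("b", pa.string())]))

# Struct array -> record batch: one column per struct field
batch = pa.RecordBatch.from_struct_array(struct_array)

# Record batch -> struct array
back = pa.StructArray.from_arrays(batch.columns, names=batch.schema.names)
```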
westonpace left a comment
Sorry, meant to click "request changes" in that last review (makes it easier for me to keep track).
```cpp
// passed_length of -1 or 0 with args.size() of 0 leads to an empty ExecSpanIterator
// in exec.cc and to never invoking the source function, so 1 is passed instead
ARROW_ASSIGN_OR_RAISE(auto datum, func_exec->Execute(args, /*passed_length=*/1));
```
We should document then that the batch_size on the ctx is meaningless for UDTFs, which is implied by this, right?
Good point. This statement is true for the recent commit in this PR, but it is not inherently true for UDTFs, since one can imagine a UDTF that respects an upper limit on the size of batches it outputs. So, I think we should document this statement and create an issue to fix this later.
Added to-do comment in recent commit.
```python
except StopIteration:
    arrays = [pa.array([], type=field.type)
              for field in schema]
    batch = pa.RecordBatch.from_arrays(
        arrays=arrays, schema=schema)
```
Is there a good reason why we have to return an empty batch? It seems like it would be a lot easier to just have users write raise StopIteration() (or use the default behavior from a generator).
The reason is that, with the current design here, a UDTF must return an array (see this post). I don't know whether it's a good enough reason... If we'd like to allow a StopIteration, then I'd prefer this be done in a separate issue.
I'm not sure it is a good enough reason. Let's raise a new issue then to support natural stop iteration. I think people will expect to be able to write normal generators.
I think one way you could do this today would be, in register_tabular_udf, to wrap the iterator-maker with a function that wraps each iterator with something that converts stop iteration into an empty table.
I think the issue is that you've grouped both scalar and tabular UDFs into a common "udf" type. In other words, we have "regular UDFs" which have an init function and an execute function and we have "aggregate UDFs" which have an init function, a consume function, a merge function, and a finalize function.
What you've done here is use regular UDFs and allow the "init" function to be "given this input create a generator" and the "execute function" to be "return the next batch of data from the generator".
Since you're reusing the "regular UDF" you have to maintain the semantics of its "execute" function which is "batch-in array-out". You're passing nothing for "batch-in" (the args and batch length are passed to the init function) and then interpreting an empty array out as stop.
In theory I think this can all work. I suspect you may run into trouble once you want to start passing in arguments (you'd want those to go to the init function right? I'm not sure how you can do that). It smells a little bit to me of "using a hammer because you have one lying around and not because it's the best tool for the job" but I think an alternative approach (creating a new function category) would be a considerable rewrite of the function registry because "function category" has always been a bit of an implicit concept in the function registry.
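A rough sketch of the wrapper idea above (names are hypothetical; the empty-batch sentinel follows this PR's current protocol):
```python
import pyarrow as pa

def wrap_generator(batch_gen, schema):
    # Hypothetical adapter: users write a plain generator of record batches,
    # while the registered callable keeps emitting an empty batch once the
    # generator raises StopIteration.
    gen = batch_gen()

    def next_batch(ctx):
        try:
            return next(gen)
        except StopIteration:
            return pa.RecordBatch.from_arrays(
                [pa.array([], type=f.type) for f in schema], schema=schema)

    return next_batch
```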
I agree with much of what @westonpace writes here.

> What you've done here is use regular UDFs and allow the "init" function to be "given this input create a generator" and the "execute function" to be "return the next batch of data from the generator".

There are two main views of what's going on in this PR. One view is that a tabular UDF is not a scalar UDF, the two should be modeled separately, and hence this PR is abusing the scalar UDF construction. A second view is that both a scalar UDF and a tabular UDF are forms of a common concept that is not explicit in the code - a kind of array provider (currently, stateless with arguments, or stateful without arguments) that is internally modeled by "init" and "execute" functions. I think both views are valid, and I take the latter view, which is the easier one.

> It smells a little bit to me of "using a hammer because you have one lying around and not because it's the best tool for the job"

I believe this smell is an artifact of the first view and goes away with the second view. In particular, I think that if we decided to define a new kind of function category with the second view in mind, we'd probably be content with "init" and "execute" functions anyway.

> I suspect you may run into trouble once you want to start passing in arguments (you'd want those to go to the init function right? I'm not sure how you can do that).

A related discussion is here.

UDT arguments wouldn't be passed to the "init" nor to the "execute" function but to the registered UDT function (like udf_func in the tester) returned by the Python UDT maker (like make_udt_func in the tester). These arguments, or values derived from them, can be saved on the returned UDT instance.
Regarding StopIteration, since it is handled by the UDT class, perhaps what we'd want is to make (something like) this class part of the API so that the user would only need to provide the generator.
> Regarding StopIteration, since it is handled by the UDT class, perhaps what we'd want is to make (something like) this class part of the API so that the user would only need to provide the generator.

Agreed. That would be a good solution. It can wait for a future PR if needed.
> UDT arguments wouldn't be passed to the "init" nor to the "execute" function but to the registered UDT function (like udf_func in the tester) returned by the Python UDT maker (like make_udt_func in the tester). These arguments, or values derived from them, can be saved on the returned UDT instance.

First, to clarify, by "UDT arguments" I am referring to the input rows (i.e. a UDT with non-null arity). I think that is what you are referring to as well. I agree that these args will need to be passed to the equivalent of udf_func. However, udf_func is currently called by the kernel's init function.
In other words, if I add these prints:
```cpp
// udf.cc CallTabularFunction
std::cout << "About to get executor" << std::endl;
ARROW_ASSIGN_OR_RAISE(auto func_exec,
                      GetFunctionExecutor(func_name, in_types, NULLPTR, registry));
std::cout << "Retrieved executor" << std::endl;
```
```python
# test_udf.py
def udf_func(ctx):
    class UDT:
        ...
    print("Creating UDT")
    return UDT()
```
Then I see the output:
```
About to get executor
Creating UDT
Retrieved executor
```
The problem is that a kernel's init function does not accept arguments today (e.g. you can't have a unary or binary init function). So you will need to solve this problem when you're ready to create UDTs that have non-null arity.
> First, to clarify, by "UDT arguments" I am referring to the input rows (i.e. a UDT with non-null arity). I think that is what you are referring to as well.

I think we're on the same page here, though I actually had in mind simpler arguments, like numbers or strings, rather than (a table with) input rows, which I guess is also possible.
> The problem is that a kernel's init function does not accept arguments today (e.g. you can't have a unary or binary init function). So you will need to solve this problem when you're ready to create UDTs that have non-null arity.

Though I haven't validated this, the way I think it could work is via `KernelInitArgs`. That is, the arguments would be placed in an appropriate `FunctionOptions` and passed to `FunctionExecutor::Init`, which would deliver them to `Kernel::Init`. This would arrive at `PythonTableUdfKernelInit::operator()(compute::KernelContext* ctx, const compute::KernelInitArgs& init_args)`, whose logic would be changed to extract arguments from `init_args` and place them in a tuple, replacing the current `empty_tuple` in the PR code. These arguments would then be passed via the `cb` callback as extra arguments (in addition to `ctx`) to `udf_func`, which would bake them into the UDT instance it returns.
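On the Python side, the end state sketched here might look roughly as follows (entirely hypothetical; `start` and `stop` stand in for arguments delivered through the options/init_args plumbing described above):
```python
import pyarrow as pa

def udf_func(ctx, start, stop):
    # Hypothetical parameterized UDT maker: the extra arguments are baked
    # into the returned instance rather than passed on every call.
    class UDT:
        def __init__(self):
            self.it = iter(range(start, stop))

        def __call__(self, ctx):
            try:
                column = pa.array([next(self.it)], pa.int64())
            except StopIteration:
                column = pa.array([], pa.int64())
            return pa.StructArray.from_arrays([column], names=["n"])

    return UDT()
```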
@westonpace, is this ready to go pending CI?
westonpace left a comment
I'm not quite sure yet what we're converging to. In addition to my comments, there is an ask in #14320 to simplify the way we register UDFs, and I think that ask applies here as well.
I'm also a little concerned that, without some way to execute these UDFs in the exec plan, all we've really done is come up with a very complicated way of executing python generators. Are you planning on addressing the use of tabular UDFs in the exec plan soon? Can we create an issue for that?
```cpp
}
auto arity = func->arity();
if (arity.num_args != 0 || arity.is_varargs) {
  return Status::NotImplemented("tabular function of non-null arity");
```
Suggested change:
```diff
- return Status::NotImplemented("tabular function of non-null arity");
+ return Status::NotImplemented("tabular function of non-null / vararg arity");
```
```python
def __call__(self, ctx):
    try:
        if self.caller is None:
            self.caller, ctx = batch_gen(ctx).send, None
```
Why are you assigning ctx to None here?
This is due to the Python generator protocol. When the UDT instance is initialized, it has `self.caller` set to None. The first time its `__call__` method is invoked, it sets up `self.caller` as the `send` method of a generator, preparing to invoke it in the next line. The first invocation of this `send` method requires a None argument, which explains the assignment you asked about. In subsequent invocations of `__call__`, the `send` method is passed `ctx`, which is returned by the `yield` expression in the generator.
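A standalone illustration of that protocol point, in plain Python (independent of this PR):
```python
def gen(ctx):
    while True:
        # The value passed to send() shows up as the result of yield.
        ctx = yield f"batch for {ctx}"

caller = gen("first ctx").send
print(caller(None))        # priming call: send() must receive None
print(caller("next ctx"))  # later calls deliver the new ctx to yield
```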
Do you mean as part of this PR or in a future one?
I believe the PR code supports executing a plan with a registered UDT, and it's just a matter of adding a test case or two to cover this. I could try to do that as part of this PR. Side note: I agree we have here a rather complicated way of Arrow operating Python user code, be it for a generator or whatever function, but if so, then that applies already to UDFs, not only to UDTs.
Future PR is fine. I just want to make sure that: …
True, but for UDFs we are executing them as part of a streaming execution. We haven't gotten there yet with UDTs (though I believe we can).
OK. Is there an existing issue for this ask, or should one be created?
I just created #33883
SQL Server does this with `CROSS APPLY`.
Yes, my interest is in something closer to a source of data than to a cross-apply. See also this post and the explanation linked there.
Benchmark runs are scheduled for baseline = 295c664 and contender = a1d9b51. a1d9b51 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
['Python', 'R'] benchmarks have a high level of regressions.
See https://issues.apache.org/jira/browse/ARROW-17676