@vibhatha
Contributor

@vibhatha vibhatha commented Oct 27, 2022

Initial Draft of Aggregate UDFs for Python

  • Improve the existing logic with more tests
  • Find a missing aggregate function and try to implement it with the current API (to check that the approach holds up)
  • Use a custom object to hold state rather than Arrow data structures
  • Improve function docs and comments
  • Add binary and other arity tests
  • Self-review 1
  • Self-review 2

@github-actions

⚠️ Ticket has no components in JIRA, make sure you assign one.

@github-actions

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@vibhatha
Contributor Author

vibhatha commented Nov 2, 2022

cc @westonpace: I'd appreciate an initial review of the draft PR.

Member

@westonpace westonpace left a comment

The shape of this looks correct. I believe you are on the right track. I would like to see at least one example of using a custom UDF in an exec plan. Also, similar to scalar UDFs, I think we will eventually want an example that uses something other than pyarrow (e.g. NumPy) to do the computation (at least for the consume function).

Comment on lines +514 to +520
@property
def non_null(self):
    return self._non_null

@non_null.setter
def non_null(self, value):
    self._non_null = value
Member

I'm not much of a Python expert, but getters and setters seem like overkill here. Are they needed?
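As a rough sketch of what dropping the accessors could look like, assuming the state object only needs to hold a counter: the field name `non_null` comes from the snippet above, while the use of `dataclass` is an assumption for illustration and not something the PR does.

```python
from dataclasses import dataclass


@dataclass
class State:
    # Plain attribute: no property/setter pair is needed because
    # nothing is validated or computed on access.
    non_null: int = 0


state = State(0)
state.non_null += 3  # direct read and write, same call sites as before
print(state.non_null)
```

Call sites such as `ctx.state.non_null` would not need to change, since a property and a plain attribute are accessed the same way.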

Comment on lines +529 to +530
state = State(0)
return state
Member

Suggested change
- state = State(0)
- return state
+ return State(0)

def consume(ctx, x):
    if isinstance(x, pa.Array):
        non_null = pc.sum(pc.invert(pc.is_nan(x))).as_py()
    elif isinstance(x, pa.Scalar):
Member

Can a unary aggregate ever be called with a scalar?

def finalize(ctx):
    return pa.array([ctx.state.non_null])

func_name = "simple_count"
Member

Maybe valid_count or non_null_count?

Comment on lines +571 to +585
@property
def non_null(self):
    return self._non_null

@non_null.setter
def non_null(self, value):
    self._non_null = value

@property
def null(self):
    return self._null

@null.setter
def null(self, value):
    self._null = value
Member

Same comment on getters and setters

Comment on lines +2748 to +2750
To define a varargs function, pass a callable that takes
varargs. The last in_type will be the type of all varargs
arguments.
Member

How do varargs work? Do I define a *args?
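For illustration only: in plain Python, a varargs callable is one that declares `*args`. Whether the registration API in this PR expects exactly this shape is the open question here. The name `consume_varargs` and the `None` stand-in for the context are hypothetical.

```python
def consume_varargs(ctx, *args):
    # `args` is a tuple holding every positional argument after `ctx`.
    # Per the docstring, all of them would share the last in_type.
    return len(args)


print(consume_varargs(None, [1, 2], [3, 4], [5, 6]))
```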

Comment on lines +2760 to +2763
must be merged with. The current state can be retrieved from
the context object which can be accessed by `context.state`.
The state doesn't need to be set on the Python side and it is
autonomously handled in the C++ backend. The updated state must
Member

I'm not sure I understand the sentence that starts with "The state doesn't need to be set..."

This function returns the updated state after consuming the
received data.
merge_func: callable
Member

The concept of a "merge function" is not going to be obvious to most users. It is very possible for an engine to be defined that does not have to worry about the concept of a merge. I think we need to describe some background here for why a merge is needed in the first place. Something like:

Aggregates may be calculated across many threads in parallel. Each thread will call the init function once to generate a state for that thread. Once all values have been consumed, the states from each thread will be merged together to get the final result state. The merge function should take two states and combine them.
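The init/consume/merge/finalize flow described above can be sketched in plain Python. This is a simplified model, not the API in this PR: the functions take the state directly rather than a context object, Python lists stand in for Arrow arrays, and `run_partition` and `partitions` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def init():
    # Each thread starts from its own empty state.
    return {"non_null": 0}


def consume(state, batch):
    # Fold one batch of values into a thread-local state.
    state["non_null"] += sum(1 for v in batch if v is not None)
    return state


def merge(left, right):
    # Combine two partial states into one.
    return {"non_null": left["non_null"] + right["non_null"]}


def finalize(state):
    # Turn the final merged state into the aggregate result.
    return state["non_null"]


def run_partition(batches):
    # Work done by a single thread: one init, then many consumes.
    state = init()
    for batch in batches:
        state = consume(state, batch)
    return state


partitions = [
    [[1, None, 3], [4]],
    [[None, None]],
    [[5, 6, 7, None]],
]

with ThreadPoolExecutor(max_workers=3) as pool:
    partial_states = list(pool.map(run_partition, partitions))

result = finalize(reduce(merge, partial_states))
print(result)
```

No thread sees another thread's state during consumption, so the merge step is the only place where partial results meet.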

The first argument is the context argument of type
ScalarUdfContext.
Using the context argument the state can be extracted and return
type must be an array matching the `out_type`.
Member

In your C++ example the return type is a scalar?

A callable implementing the user-defined finalize function.
The first argument is the context argument of type
ScalarUdfContext.
Using the context argument the state can be extracted and return
Member

"the state can be extracted" is not very obvious. Maybe something like:

The purpose of the finalize function is to transform the state (which is available in the context argument) into an array. This array will be the final result of the aggregation.
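A small sketch of that idea, assuming a mean aggregate whose state tracks a running total and count. `MeanState` is a hypothetical class, the function takes the state directly rather than a context, and a one-element Python list stands in for the Arrow array the real function would return.

```python
from dataclasses import dataclass


@dataclass
class MeanState:
    total: float = 0.0
    count: int = 0


def finalize(state):
    # The state holds intermediate values; the result is derived from it.
    if state.count == 0:
        return [None]  # no input rows: the mean is undefined
    return [state.total / state.count]


print(finalize(MeanState(total=12.0, count=4)))
```

The state (a total and a count) has a different shape from the result (a single value), which is why a separate finalize step is useful.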
