1 change: 1 addition & 0 deletions docs/source/python/api/tables.rst
@@ -43,6 +43,7 @@ Classes
ChunkedArray
RecordBatch
Table
TableGroupBy

.. _api.tensor:

2 changes: 1 addition & 1 deletion python/pyarrow/__init__.py
@@ -182,7 +182,7 @@ def show_versions():
from pyarrow._hdfsio import HdfsFile, have_libhdfs

from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table,
concat_arrays, concat_tables)
concat_arrays, concat_tables, TableGroupBy)

# Exceptions
from pyarrow.lib import (ArrowCancelled,
27 changes: 27 additions & 0 deletions python/pyarrow/_compute.pyx
@@ -1294,3 +1294,30 @@ class TDigestOptions(_TDigestOptions):
if not isinstance(q, (list, tuple, np.ndarray)):
q = [q]
self._set_options(q, delta, buffer_size, skip_nulls, min_count)


def _group_by(args, keys, aggregations):
Member: We can also make this a public function in the compute module?

Member Author: Not sure, should we? I made it internal because we plan to replace this with the exec engine in the long term, so I guess that the Table.group_by implementation will switch to use something different in the future.

Member: > I made it internal because we plan to replace this with the exec engine in the long term, so I guess that the Table.group_by implementation will switch to use something different in the future.

The same could be done for a pyarrow.compute function? (it doesn't map 1:1 to a C++ kernel anyway)

For me, one reason to put it in the compute functions as pc.group_by(table, keys, ...) is that it sidesteps the 1-step vs 2-step API discussion for the method a bit. For a function in compute, I think it's totally fine for it to be a one-step function.

cdef:
vector[CDatum] c_args
vector[CDatum] c_keys
vector[CAggregate] c_aggregations
CDatum result
CAggregate c_aggr

_pack_compute_args(args, &c_args)
_pack_compute_args(keys, &c_keys)

for aggr_func_name, aggr_opts in aggregations:
c_aggr.function = tobytes(aggr_func_name)
if aggr_opts is not None:
c_aggr.options = (<FunctionOptions?>aggr_opts).get_options()
else:
c_aggr.options = NULL
c_aggregations.push_back(c_aggr)

with nogil:
result = GetResultValue(
GroupBy(c_args, c_keys, c_aggregations)
)

return wrap_datum(result)
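
The review thread above floats a public pc.group_by(table, keys, ...) entry point. A minimal sketch of what such a wrapper could look like; the function name, signature, and the omission of result-column renaming are assumptions, none of which are part of this PR. It simply drives the internal _group_by helper the same way TableGroupBy.aggregate does further down:

import pyarrow as pa
import pyarrow.compute as pc

def group_by(table, keys, aggregations):
    # Hypothetical public wrapper, not part of this PR.
    # aggregations: list of (column_name, "hash_<function>") or
    # (column_name, "hash_<function>", FunctionOptions) tuples.
    columns = [a[0] for a in aggregations]
    aggrs = [(a[1], a[2]) if len(a) > 2 else (a[1], None) for a in aggregations]
    result = pc._group_by(
        [table[c] for c in columns],
        [table[k] for k in keys],
        aggrs,
    )
    # _group_by returns a struct array; unwrap it into a Table.
    # Renaming the result columns, as TableGroupBy.aggregate does, is left out here.
    return pa.Table.from_batches(
        [pa.RecordBatch.from_struct_array(result)]
    )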
1 change: 1 addition & 0 deletions python/pyarrow/compute.py
@@ -70,6 +70,7 @@
function_registry,
get_function,
list_functions,
_group_by
Member: Is there a reason for exposing this publicly? Is this just a leftover from previous attempts?

Member Author (@amol-, Nov 18, 2021): It's to make it available when _pc() is used to access compute functions from other modules/files (in this case from table.pxi). I adhered to that practice instead of injecting an import of pyarrow._compute.

Given that there are many more internal functions in the pyarrow.compute module, I thought it wasn't concerning.
)

import inspect
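
The _pc() indirection the author mentions is not shown in this diff; a minimal sketch of the pattern, with the exact definition assumed: table.pxi reaches compute functions through a small lazy-import helper instead of importing pyarrow._compute directly, which is why _group_by needs to be importable from pyarrow.compute.

def _pc():
    # Assumed definition: lazy import avoids a circular dependency between
    # pyarrow.lib (which includes table.pxi) and pyarrow.compute at load time.
    import pyarrow.compute as pc
    return pc

# Usage inside table.pxi (see the code added below):
#   indices = _pc().sort_indices(self, sort_keys=sorting)
#   result = _pc()._group_by(args, keys, aggregations)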
11 changes: 11 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
@@ -2217,6 +2217,17 @@ cdef extern from * namespace "arrow::compute":
const CBuffer& buffer)


cdef extern from "arrow/compute/api_aggregate.h" namespace \
"arrow::compute::internal" nogil:
cdef cppclass CAggregate "arrow::compute::internal::Aggregate":
c_string function
const CFunctionOptions* options

CResult[CDatum] GroupBy(const vector[CDatum]& arguments,
const vector[CDatum]& keys,
const vector[CAggregate]& aggregates)


cdef extern from "arrow/python/api.h" namespace "arrow::py":
# Requires GIL
CResult[shared_ptr[CDataType]] InferArrowType(
118 changes: 118 additions & 0 deletions python/pyarrow/table.pxi
@@ -2192,6 +2192,53 @@ cdef class Table(_PandasConvertible):

return table

def group_by(self, keys):
"""Declare a grouping over the columns of the table.

The resulting grouping can then be used to perform aggregations
with a subsequent call to the ``aggregate()`` method.

Parameters
----------
keys : str or list[str]
Name of the column(s) that should be used as the grouping key.

Returns
-------
TableGroupBy

See Also
--------
TableGroupBy.aggregate
"""
return TableGroupBy(self, keys)

def sort_by(self, sorting):
"""
Sort the table by one or multiple columns.

Parameters
----------
sorting : str or list[tuple(name, order)]
Name of the column to use to sort (ascending), or
a list of multiple sorting conditions where
each entry is a tuple with column name
and sorting order ("ascending" or "descending").

Returns
-------
Table
A new table sorted according to the sort keys.
"""
if isinstance(sorting, str):
sorting = [(sorting, "ascending")]

indices = _pc().sort_indices(
self,
sort_keys=sorting
)
return self.take(indices)


def _reconstruct_table(arrays, schema):
"""
@@ -2387,3 +2434,74 @@ def _from_pydict(cls, mapping, schema, metadata):
return cls.from_arrays(arrays, schema=schema, metadata=metadata)
else:
raise TypeError('Schema must be an instance of pyarrow.Schema')


class TableGroupBy:
"""
A grouping of columns in a table on which to perform aggregations.
"""

def __init__(self, table, keys):
if isinstance(keys, str):
keys = [keys]

self._table = table
self.keys = keys

def aggregate(self, aggregations):
"""
Perform an aggregation over the grouped columns of the table.

Parameters
----------
aggregations : list[tuple(str, str)] or \
list[tuple(str, str, FunctionOptions)]
List of tuples, each made of an aggregation column name followed
by a function name and, optionally, aggregation function options.

Returns
-------
Table
Results of the aggregation functions.

Examples
--------
>>> t = pa.table([
... pa.array(["a", "a", "b", "b", "c"]),
... pa.array([1, 2, 3, 4, 5]),
... ], names=["keys", "values"])
>>> t.group_by("keys").aggregate([("values", "sum")])
pyarrow.Table
values_sum: int64
keys: string
----
values_sum: [[3,7,5]]
keys: [["a","b","c"]]
"""
columns = [a[0] for a in aggregations]
aggrfuncs = [
(a[1], a[2]) if len(a) > 2 else (a[1], None)
for a in aggregations
]

group_by_aggrs = []
for aggr in aggrfuncs:
if not aggr[0].startswith("hash_"):
aggr = ("hash_" + aggr[0], aggr[1])
group_by_aggrs.append(aggr)

# Build unique names for aggregation result columns
# so that it's obvious what they refer to.
column_names = [
aggr_name.replace("hash", col_name)
for col_name, (aggr_name, _) in zip(columns, group_by_aggrs)
] + self.keys

result = _pc()._group_by(
[self._table[c] for c in columns],
[self._table[k] for k in self.keys],
group_by_aggrs
)

t = Table.from_batches([RecordBatch.from_struct_array(result)])
return t.rename_columns(column_names)
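
A small worked example of the column-naming logic in aggregate above, running the same list comprehension on hand-written inputs (the values shown are only illustrative):

# Inputs as they would look for [("values", "sum"), ("bigvalues", "hash_max")]
columns = ["values", "bigvalues"]
group_by_aggrs = [("hash_sum", None), ("hash_max", None)]
keys = ["keys"]

column_names = [
    aggr_name.replace("hash", col_name)
    for col_name, (aggr_name, _) in zip(columns, group_by_aggrs)
] + keys
# column_names == ["values_sum", "bigvalues_max", "keys"]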
128 changes: 128 additions & 0 deletions python/pyarrow/tests/test_table.py
@@ -24,6 +24,7 @@
import numpy as np
import pytest
import pyarrow as pa
import pyarrow.compute as pc


def test_chunked_array_basics():
@@ -1746,3 +1747,130 @@ def test_table_select():
result = table.select(['f2'])
expected = pa.table([a2], ['f2'])
assert result.equals(expected)


def test_table_group_by():
def sorted_by_keys(d):
# Ensure a guaranteed order of keys for aggregation results.
if "keys2" in d:
keys = tuple(zip(d["keys"], d["keys2"]))
else:
keys = d["keys"]
sorted_keys = sorted(keys)
sorted_d = {"keys": sorted(d["keys"])}
for entry in d:
if entry == "keys":
continue
values = dict(zip(keys, d[entry]))
for k in sorted_keys:
sorted_d.setdefault(entry, []).append(values[k])
return sorted_d

table = pa.table([
pa.array(["a", "a", "b", "b", "c"]),
pa.array(["X", "X", "Y", "Z", "Z"]),
pa.array([1, 2, 3, 4, 5]),
pa.array([10, 20, 30, 40, 50])
], names=["keys", "keys2", "values", "bigvalues"])

r = table.group_by("keys").aggregate([
("values", "hash_sum")
])
assert sorted_by_keys(r.to_pydict()) == {
"keys": ["a", "b", "c"],
"values_sum": [3, 7, 5]
}

r = table.group_by("keys").aggregate([
("values", "hash_sum"),
("values", "hash_count")
])
assert sorted_by_keys(r.to_pydict()) == {
"keys": ["a", "b", "c"],
"values_sum": [3, 7, 5],
"values_count": [2, 2, 1]
}

# Test without hash_ prefix
r = table.group_by("keys").aggregate([
("values", "sum")
])
assert sorted_by_keys(r.to_pydict()) == {
"keys": ["a", "b", "c"],
"values_sum": [3, 7, 5]
}

r = table.group_by("keys").aggregate([
("values", "max"),
("bigvalues", "sum")
])
assert sorted_by_keys(r.to_pydict()) == {
"keys": ["a", "b", "c"],
"values_max": [2, 4, 5],
"bigvalues_sum": [30, 70, 50]
}

r = table.group_by("keys").aggregate([
("bigvalues", "max"),
("values", "sum")
])
assert sorted_by_keys(r.to_pydict()) == {
"keys": ["a", "b", "c"],
"values_sum": [3, 7, 5],
"bigvalues_max": [20, 40, 50]
}

r = table.group_by(["keys", "keys2"]).aggregate([
("values", "sum")
])
assert sorted_by_keys(r.to_pydict()) == {
"keys": ["a", "b", "b", "c"],
"keys2": ["X", "Y", "Z", "Z"],
"values_sum": [3, 3, 4, 5]
}

table_with_nulls = pa.table([
pa.array(["a", "a", "a"]),
pa.array([1, None, None])
], names=["keys", "values"])

r = table_with_nulls.group_by(["keys"]).aggregate([
("values", "count", pc.CountOptions(mode="all"))
])
assert r.to_pydict() == {
"keys": ["a"],
"values_count": [3]
}

r = table_with_nulls.group_by(["keys"]).aggregate([
("values", "count", pc.CountOptions(mode="only_null"))
])
assert r.to_pydict() == {
"keys": ["a"],
"values_count": [2]
}

r = table_with_nulls.group_by(["keys"]).aggregate([
("values", "count", pc.CountOptions(mode="only_valid"))
])
assert r.to_pydict() == {
"keys": ["a"],
"values_count": [1]
}


def test_table_sort_by():
table = pa.table([
pa.array([3, 1, 4, 2, 5]),
pa.array(["b", "a", "b", "a", "c"]),
], names=["values", "keys"])

assert table.sort_by("values").to_pydict() == {
"keys": ["a", "a", "b", "b", "c"],
"values": [1, 2, 3, 4, 5]
}

assert table.sort_by([("values", "descending")]).to_pydict() == {
"keys": ["c", "b", "b", "a", "a"],
"values": [5, 4, 3, 2, 1]
}
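
The tests above cover single-key sorts; as a supplementary illustration (not part of the PR's test suite) of the list-of-tuples form that sort_by accepts, where each tuple is (column_name, "ascending" or "descending"):

import pyarrow as pa

t = pa.table([
    pa.array([2, 1, 2]),
    pa.array([3, 4, 5]),
], names=["a", "b"])

r = t.sort_by([("a", "ascending"), ("b", "descending")])
# r.to_pydict() == {"a": [1, 2, 2], "b": [4, 5, 3]}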