diff --git a/docs/source/python/api/tables.rst b/docs/source/python/api/tables.rst
index 6e7a3b6e155..4cdba24d6bf 100644
--- a/docs/source/python/api/tables.rst
+++ b/docs/source/python/api/tables.rst
@@ -43,6 +43,7 @@ Classes
    ChunkedArray
    RecordBatch
    Table
+   TableGroupBy
 
 .. _api.tensor:
 
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 1ec229d5381..f1e7ca90ad2 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -182,7 +182,7 @@ def show_versions():
 from pyarrow._hdfsio import HdfsFile, have_libhdfs
 
 from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table,
-                         concat_arrays, concat_tables)
+                         concat_arrays, concat_tables, TableGroupBy)
 
 # Exceptions
 from pyarrow.lib import (ArrowCancelled,
diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx
index d62c9c0ee85..d315cc40a2d 100644
--- a/python/pyarrow/_compute.pyx
+++ b/python/pyarrow/_compute.pyx
@@ -1294,3 +1294,30 @@ class TDigestOptions(_TDigestOptions):
         if not isinstance(q, (list, tuple, np.ndarray)):
             q = [q]
         self._set_options(q, delta, buffer_size, skip_nulls, min_count)
+
+
+def _group_by(args, keys, aggregations):
+    cdef:
+        vector[CDatum] c_args
+        vector[CDatum] c_keys
+        vector[CAggregate] c_aggregations
+        CDatum result
+        CAggregate c_aggr
+
+    _pack_compute_args(args, &c_args)
+    _pack_compute_args(keys, &c_keys)
+
+    for aggr_func_name, aggr_opts in aggregations:
+        c_aggr.function = tobytes(aggr_func_name)
+        if aggr_opts is not None:
+            c_aggr.options = (<FunctionOptions?>aggr_opts).get_options()
+        else:
+            c_aggr.options = NULL
+        c_aggregations.push_back(c_aggr)
+
+    with nogil:
+        result = GetResultValue(
+            GroupBy(c_args, c_keys, c_aggregations)
+        )
+
+    return wrap_datum(result)
diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py
index 6e3bd7fcab3..0a9a660ccdc 100644
--- a/python/pyarrow/compute.py
+++ b/python/pyarrow/compute.py
@@ -70,6 +70,7 @@
     function_registry,
     get_function,
     list_functions,
+    _group_by
 )
 
 import inspect
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 815238f112c..a10bcf9f78f 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2217,6 +2217,17 @@ cdef extern from * namespace "arrow::compute":
             const CBuffer& buffer)
 
 
+cdef extern from "arrow/compute/api_aggregate.h" namespace \
+        "arrow::compute::internal" nogil:
+    cdef cppclass CAggregate "arrow::compute::internal::Aggregate":
+        c_string function
+        const CFunctionOptions* options
+
+    CResult[CDatum] GroupBy(const vector[CDatum]& arguments,
+                            const vector[CDatum]& keys,
+                            const vector[CAggregate]& aggregates)
+
+
 cdef extern from "arrow/python/api.h" namespace "arrow::py":
     # Requires GIL
     CResult[shared_ptr[CDataType]] InferArrowType(
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 8105ce48294..d55f41d9efb 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -2192,6 +2192,67 @@
 
         return table
 
+    def group_by(self, keys):
+        """Declare a grouping over the columns of the table.
+
+        The resulting grouping can then be used to perform aggregations
+        with a subsequent ``aggregate()`` method.
+
+        Parameters
+        ----------
+        keys : str or list[str]
+            Name of the column or columns to use as the grouping key.
+
+        Returns
+        -------
+        TableGroupBy
+
+        See Also
+        --------
+        TableGroupBy.aggregate
+        """
+        return TableGroupBy(self, keys)
+
+    def sort_by(self, sorting):
+        """
+        Sort the table by one or multiple columns.
+
+        Parameters
+        ----------
+        sorting : str or list[tuple(name, order)]
+            Name of the column to use to sort (ascending), or a list
+            of multiple sorting conditions where each entry is a tuple
+            with column name and sorting order ("ascending" or
+            "descending").
+
+        Returns
+        -------
+        Table
+            A new table sorted according to the sort keys.
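+
+        Examples
+        --------
+        Sort by a single column name (ascending by default), or by a
+        list of ``(name, order)`` tuples:
+
+        >>> import pyarrow as pa
+        >>> table = pa.table([pa.array([3, 1, 4, 2, 5])],
+        ...                  names=["values"])
+        >>> table.sort_by([("values", "descending")])
+        pyarrow.Table
+        values: int64
+        ----
+        values: [[5,4,3,2,1]]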
+        """
+        if isinstance(sorting, str):
+            sorting = [(sorting, "ascending")]
+
+        indices = _pc().sort_indices(
+            self,
+            sort_keys=sorting
+        )
+        return self.take(indices)
+
 
 def _reconstruct_table(arrays, schema):
     """
@@ -2387,3 +2448,78 @@ def _from_pydict(cls, mapping, schema, metadata):
         return cls.from_arrays(arrays, schema=schema, metadata=metadata)
     else:
         raise TypeError('Schema must be an instance of pyarrow.Schema')
+
+
+class TableGroupBy:
+    """
+    A grouping of columns in a table on which to perform aggregations.
+    """
+
+    def __init__(self, table, keys):
+        if isinstance(keys, str):
+            keys = [keys]
+
+        self._table = table
+        self.keys = keys
+
+    def aggregate(self, aggregations):
+        """
+        Perform an aggregation over the grouped columns of the table.
+
+        Parameters
+        ----------
+        aggregations : list[tuple(str, str)] or \
+list[tuple(str, str, FunctionOptions)]
+            List of tuples, each made of an aggregation column name
+            followed by a function name and optional function options.
+
+        Returns
+        -------
+        Table
+            Results of the aggregation functions.
+
+        Examples
+        --------
+        >>> import pyarrow as pa
+        >>> t = pa.table([
+        ...       pa.array(["a", "a", "b", "b", "c"]),
+        ...       pa.array([1, 2, 3, 4, 5]),
+        ... ], names=["keys", "values"])
+        >>> t.group_by("keys").aggregate([("values", "sum")])
+        pyarrow.Table
+        values_sum: int64
+        keys: string
+        ----
+        values_sum: [[3,7,5]]
+        keys: [["a","b","c"]]
+        """
+        columns = [a[0] for a in aggregations]
+        aggrfuncs = [
+            (a[1], a[2]) if len(a) > 2 else (a[1], None)
+            for a in aggregations
+        ]
+
+        # Grouped aggregation functions are registered with a "hash_"
+        # prefix (e.g. "hash_sum"), so add the prefix when the plain
+        # function name was provided.
+        group_by_aggrs = []
+        for aggr in aggrfuncs:
+            if not aggr[0].startswith("hash_"):
+                aggr = ("hash_" + aggr[0], aggr[1])
+            group_by_aggrs.append(aggr)
+
+        # Build unique names for aggregation result columns
+        # so that it's obvious what they refer to.
+        column_names = [
+            aggr_name.replace("hash", col_name)
+            for col_name, (aggr_name, _) in zip(columns, group_by_aggrs)
+        ] + self.keys
+
+        result = _pc()._group_by(
+            [self._table[c] for c in columns],
+            [self._table[k] for k in self.keys],
+            group_by_aggrs
+        )
+
+        t = Table.from_batches([RecordBatch.from_struct_array(result)])
+        return t.rename_columns(column_names)
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index ef41a733de8..dd0ce074af3 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -24,6 +24,7 @@
 import numpy as np
 import pytest
 import pyarrow as pa
+import pyarrow.compute as pc
 
 
 def test_chunked_array_basics():
@@ -1746,3 +1747,142 @@ def test_table_select():
     result = table.select(['f2'])
     expected = pa.table([a2], ['f2'])
     assert result.equals(expected)
+
+
+def test_table_group_by():
+    def sorted_by_keys(d):
+        # Ensure a guaranteed order of keys for aggregation results.
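+        # (the grouped aggregation kernels make no guarantee about
+        # the order in which groups are emitted)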
+        if "keys2" in d:
+            keys = tuple(zip(d["keys"], d["keys2"]))
+        else:
+            keys = d["keys"]
+        sorted_keys = sorted(keys)
+        sorted_d = {"keys": sorted(d["keys"])}
+        for entry in d:
+            if entry == "keys":
+                continue
+            values = dict(zip(keys, d[entry]))
+            for k in sorted_keys:
+                sorted_d.setdefault(entry, []).append(values[k])
+        return sorted_d
+
+    table = pa.table([
+        pa.array(["a", "a", "b", "b", "c"]),
+        pa.array(["X", "X", "Y", "Z", "Z"]),
+        pa.array([1, 2, 3, 4, 5]),
+        pa.array([10, 20, 30, 40, 50])
+    ], names=["keys", "keys2", "values", "bigvalues"])
+
+    r = table.group_by("keys").aggregate([
+        ("values", "hash_sum")
+    ])
+    assert sorted_by_keys(r.to_pydict()) == {
+        "keys": ["a", "b", "c"],
+        "values_sum": [3, 7, 5]
+    }
+
+    r = table.group_by("keys").aggregate([
+        ("values", "hash_sum"),
+        ("values", "hash_count")
+    ])
+    assert sorted_by_keys(r.to_pydict()) == {
+        "keys": ["a", "b", "c"],
+        "values_sum": [3, 7, 5],
+        "values_count": [2, 2, 1]
+    }
+
+    # Test without hash_ prefix
+    r = table.group_by("keys").aggregate([
+        ("values", "sum")
+    ])
+    assert sorted_by_keys(r.to_pydict()) == {
+        "keys": ["a", "b", "c"],
+        "values_sum": [3, 7, 5]
+    }
+
+    r = table.group_by("keys").aggregate([
+        ("values", "max"),
+        ("bigvalues", "sum")
+    ])
+    assert sorted_by_keys(r.to_pydict()) == {
+        "keys": ["a", "b", "c"],
+        "values_max": [2, 4, 5],
+        "bigvalues_sum": [30, 70, 50]
+    }
+
+    r = table.group_by("keys").aggregate([
+        ("bigvalues", "max"),
+        ("values", "sum")
+    ])
+    assert sorted_by_keys(r.to_pydict()) == {
+        "keys": ["a", "b", "c"],
+        "values_sum": [3, 7, 5],
+        "bigvalues_max": [20, 40, 50]
+    }
+
+    r = table.group_by(["keys", "keys2"]).aggregate([
+        ("values", "sum")
+    ])
+    assert sorted_by_keys(r.to_pydict()) == {
+        "keys": ["a", "b", "b", "c"],
+        "keys2": ["X", "Y", "Z", "Z"],
+        "values_sum": [3, 3, 4, 5]
+    }
+
+    table_with_nulls = pa.table([
+        pa.array(["a", "a", "a"]),
+        pa.array([1, None, None])
+    ], names=["keys", "values"])
+
+    r = table_with_nulls.group_by(["keys"]).aggregate([
+        ("values", "count", pc.CountOptions(mode="all"))
+    ])
+    assert r.to_pydict() == {
+        "keys": ["a"],
+        "values_count": [3]
+    }
+
+    r = table_with_nulls.group_by(["keys"]).aggregate([
+        ("values", "count", pc.CountOptions(mode="only_null"))
+    ])
+    assert r.to_pydict() == {
+        "keys": ["a"],
+        "values_count": [2]
+    }
+
+    r = table_with_nulls.group_by(["keys"]).aggregate([
+        ("values", "count", pc.CountOptions(mode="only_valid"))
+    ])
+    assert r.to_pydict() == {
+        "keys": ["a"],
+        "values_count": [1]
+    }
+
+
+def test_table_sort_by():
+    table = pa.table([
+        pa.array([3, 1, 4, 2, 5]),
+        pa.array(["b", "a", "b", "a", "c"]),
+    ], names=["values", "keys"])
+
+    assert table.sort_by("values").to_pydict() == {
+        "keys": ["a", "a", "b", "b", "c"],
+        "values": [1, 2, 3, 4, 5]
+    }
+
+    assert table.sort_by([("values", "descending")]).to_pydict() == {
+        "keys": ["c", "b", "b", "a", "a"],
+        "values": [5, 4, 3, 2, 1]
+    }
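+
+    # Multiple sort keys, as documented in Table.sort_by: ties on the
+    # first key are broken by the second one.
+    assert table.sort_by([
+        ("keys", "ascending"),
+        ("values", "descending")
+    ]).to_pydict() == {
+        "keys": ["a", "a", "b", "b", "c"],
+        "values": [2, 1, 4, 3, 5]
+    }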