From 67f287cd5461c7ea77a58e42f1db06b4b8701073 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 5 Nov 2021 11:15:22 +0100 Subject: [PATCH 01/26] Initial Experiment --- cpp/src/arrow/compute/function.cc | 2 ++ cpp/src/arrow/compute/kernel.cc | 4 ++++ python/pyarrow/_compute.pyx | 25 +++++++++++++++++++++++++ python/pyarrow/compute.py | 1 + python/pyarrow/includes/libarrow.pxd | 9 +++++++++ 5 files changed, 41 insertions(+) diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index dda5788c54c..1f5883da379 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include "arrow/compute/api_scalar.h" #include "arrow/compute/cast.h" @@ -119,6 +120,7 @@ const KernelType* DispatchExactImpl(const std::vector& kernels, // Validate arity for (const auto& kernel : kernels) { + std::cout << "KERNEL " << kernel->signature->MatchesInputs(values) << " " << values.size() << std::endl; if (kernel->signature->MatchesInputs(values)) { kernel_matches[kernel->simd_level] = kernel; } diff --git a/cpp/src/arrow/compute/kernel.cc b/cpp/src/arrow/compute/kernel.cc index 666b73e415c..772c98c25b3 100644 --- a/cpp/src/arrow/compute/kernel.cc +++ b/cpp/src/arrow/compute/kernel.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include "arrow/buffer.h" #include "arrow/compute/exec.h" @@ -449,6 +450,8 @@ bool KernelSignature::Equals(const KernelSignature& other) const { } bool KernelSignature::MatchesInputs(const std::vector& args) const { + std::cout << "MatchesInputs: " << in_types_.size() << std::endl; + if (is_varargs_) { for (size_t i = 0; i < args.size(); ++i) { if (!in_types_[std::min(i, in_types_.size() - 1)].Matches(args[i])) { @@ -460,6 +463,7 @@ bool KernelSignature::MatchesInputs(const std::vector& args) const { return false; } for (size_t i = 0; i < in_types_.size(); ++i) { + std::cout << " MatchesInputs#" << i << ": " << in_types_[i].ToString() << " " << in_types_[i].Matches(args[i]) << std::endl; if (!in_types_[i].Matches(args[i])) { return false; } diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index d62c9c0ee85..3cc8c6005a3 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -1294,3 +1294,28 @@ class TDigestOptions(_TDigestOptions): if not isinstance(q, (list, tuple, np.ndarray)): q = [q] self._set_options(q, delta, buffer_size, skip_nulls, min_count) + + +def group_by(args, keys, aggregation): + cdef: + vector[CDatum] c_args + vector[CDatum] c_keys + vector[CAggregate] c_aggregations + CDatum result + CAggregate c_aggr + + + _pack_compute_args(args, &c_args) + _pack_compute_args(keys, &c_keys) + + c_aggr.function = aggregation + c_aggregations.push_back(c_aggr) + + with nogil: + result = GetResultValue( + GroupBy(c_args, c_keys, c_aggregations) + ) + + return wrap_datum(result) + + diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py index 6e3bd7fcab3..148142bc7ab 100644 --- a/python/pyarrow/compute.py +++ b/python/pyarrow/compute.py @@ -70,6 +70,7 @@ function_registry, get_function, list_functions, + group_by ) import inspect diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 815238f112c..81c41dc13cb 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2217,6 +2217,15 @@ cdef extern from * namespace "arrow::compute": const CBuffer& buffer) +cdef extern from "arrow/compute/api_aggregate.h" namespace "arrow::compute::internal" nogil: + cdef cppclass CAggregate "arrow::compute::internal::Aggregate": + c_string function + + CResult[CDatum] GroupBy(const vector[CDatum]& arguments, + const vector[CDatum]& keys, + const vector[CAggregate]& aggregates) + + cdef extern from "arrow/python/api.h" namespace "arrow::py": # Requires GIL CResult[shared_ptr[CDataType]] InferArrowType( From 039b8deb6aa4fbbdd3f80a5a7906377cdb42024e Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 5 Nov 2021 11:21:35 +0100 Subject: [PATCH 02/26] Fix --- python/pyarrow/_compute.pyx | 1 + python/pyarrow/includes/libarrow.pxd | 1 + 2 files changed, 2 insertions(+) diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index 3cc8c6005a3..c3b620779d9 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -1309,6 +1309,7 @@ def group_by(args, keys, aggregation): _pack_compute_args(keys, &c_keys) c_aggr.function = aggregation + c_aggr.options = NULL c_aggregations.push_back(c_aggr) with nogil: diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 81c41dc13cb..4dbc0d712b6 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2220,6 +2220,7 @@ cdef extern from * namespace "arrow::compute": cdef extern from "arrow/compute/api_aggregate.h" namespace "arrow::compute::internal" nogil: cdef cppclass CAggregate "arrow::compute::internal::Aggregate": c_string function + const CFunctionOptions* options CResult[CDatum] GroupBy(const vector[CDatum]& arguments, const vector[CDatum]& keys, From 2cfc926f5f713218569333227db9f5844bec9673 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 5 Nov 2021 15:26:08 +0100 Subject: [PATCH 03/26] Refine --- python/pyarrow/_compute.pyx | 17 +++++++++-------- python/pyarrow/includes/libarrow.pxd | 9 +++++---- python/pyarrow/table.pxi | 21 +++++++++++++++++++++ python/pyarrow/tests/test_table.py | 26 ++++++++++++++++++++++++++ 4 files changed, 61 insertions(+), 12 deletions(-) diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index c3b620779d9..53608a9ff24 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -1296,7 +1296,7 @@ class TDigestOptions(_TDigestOptions): self._set_options(q, delta, buffer_size, skip_nulls, min_count) -def group_by(args, keys, aggregation): +def group_by(args, keys, aggregations): cdef: vector[CDatum] c_args vector[CDatum] c_keys @@ -1304,19 +1304,20 @@ def group_by(args, keys, aggregation): CDatum result CAggregate c_aggr - _pack_compute_args(args, &c_args) _pack_compute_args(keys, &c_keys) - c_aggr.function = aggregation - c_aggr.options = NULL - c_aggregations.push_back(c_aggr) + for aggr_func_name, aggr_opts in aggregations: + c_aggr.function = tobytes(aggr_func_name) + if aggr_opts is not None: + c_aggr.options = (aggr_opts).get_options() + else: + c_aggr.options = NULL + c_aggregations.push_back(c_aggr) with nogil: result = GetResultValue( GroupBy(c_args, c_keys, c_aggregations) ) - return wrap_datum(result) - - + return wrap_datum(result) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 4dbc0d712b6..a10bcf9f78f 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2217,12 +2217,13 @@ cdef extern from * namespace "arrow::compute": const CBuffer& buffer) -cdef extern from "arrow/compute/api_aggregate.h" namespace "arrow::compute::internal" nogil: +cdef extern from "arrow/compute/api_aggregate.h" namespace \ + "arrow::compute::internal" nogil: cdef cppclass CAggregate "arrow::compute::internal::Aggregate": - c_string function + c_string function const CFunctionOptions* options - - CResult[CDatum] GroupBy(const vector[CDatum]& arguments, + + CResult[CDatum] GroupBy(const vector[CDatum]& arguments, const vector[CDatum]& keys, const vector[CAggregate]& aggregates) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 8105ce48294..2e32cc29944 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2192,6 +2192,27 @@ cdef class Table(_PandasConvertible): return table + def group_by(self, key, columns, aggregations): + """ + Perform a group by aggregation over the columns of the table. + """ + from pyarrow._compute import group_by + + if isinstance(aggregations, str): + aggregations = [aggregations] + + aggrs = [] + for aggr in aggregations: + if isinstance(aggr, str): + aggr = (aggr, None) + aggrs.append(aggr) + + return group_by( + [self[c] for c in columns], + [self[key]], + aggrs + ) + def _reconstruct_table(arrays, schema): """ diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index ef41a733de8..fac3306d891 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -1746,3 +1746,29 @@ def test_table_select(): result = table.select(['f2']) expected = pa.table([a2], ['f2']) assert result.equals(expected) + + +def test_table_group_by(): + table = pa.table([ + pa.array([1, 2, 3, 4, 5]), + pa.array(["a", "a", "b", "b", "c"]), + ], names=["values", "keys"]) + + r = table.group_by("keys", ["values"], "hash_sum") + + r_asdict = {v["key_0"].as_py(): v["hash_sum"].as_py() for v in r} + assert r_asdict == {'a': 3, 'b': 7, 'c': 5} + + r = table.group_by("keys", ["values", "values"], [ + "hash_sum", "hash_count"]) + assert sorted(r.to_pylist(), key=lambda x: x["key_0"]) == [ + {'hash_count': 2, + 'hash_sum': 3, + 'key_0': 'a'}, + {'hash_count': 2, + 'hash_sum': 7, + 'key_0': 'b'}, + {'hash_count': 1, + 'hash_sum': 5, + 'key_0': 'c'}, + ] From 9329bcfd896c3f29affe66e77921601cdc73a690 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 5 Nov 2021 15:29:12 +0100 Subject: [PATCH 04/26] Remove stray prints --- cpp/src/arrow/compute/function.cc | 2 -- cpp/src/arrow/compute/kernel.cc | 4 ---- 2 files changed, 6 deletions(-) diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 1f5883da379..dda5788c54c 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -20,7 +20,6 @@ #include #include #include -#include #include "arrow/compute/api_scalar.h" #include "arrow/compute/cast.h" @@ -120,7 +119,6 @@ const KernelType* DispatchExactImpl(const std::vector& kernels, // Validate arity for (const auto& kernel : kernels) { - std::cout << "KERNEL " << kernel->signature->MatchesInputs(values) << " " << values.size() << std::endl; if (kernel->signature->MatchesInputs(values)) { kernel_matches[kernel->simd_level] = kernel; } diff --git a/cpp/src/arrow/compute/kernel.cc b/cpp/src/arrow/compute/kernel.cc index 772c98c25b3..666b73e415c 100644 --- a/cpp/src/arrow/compute/kernel.cc +++ b/cpp/src/arrow/compute/kernel.cc @@ -21,7 +21,6 @@ #include #include #include -#include #include "arrow/buffer.h" #include "arrow/compute/exec.h" @@ -450,8 +449,6 @@ bool KernelSignature::Equals(const KernelSignature& other) const { } bool KernelSignature::MatchesInputs(const std::vector& args) const { - std::cout << "MatchesInputs: " << in_types_.size() << std::endl; - if (is_varargs_) { for (size_t i = 0; i < args.size(); ++i) { if (!in_types_[std::min(i, in_types_.size() - 1)].Matches(args[i])) { @@ -463,7 +460,6 @@ bool KernelSignature::MatchesInputs(const std::vector& args) const { return false; } for (size_t i = 0; i < in_types_.size(); ++i) { - std::cout << " MatchesInputs#" << i << ": " << in_types_[i].ToString() << " " << in_types_[i].Matches(args[i]) << std::endl; if (!in_types_[i].Matches(args[i])) { return false; } From d04716e5f8ee89dd36216ea37a0503842380a221 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 5 Nov 2021 15:35:27 +0100 Subject: [PATCH 05/26] Further tweak --- python/pyarrow/_compute.pyx | 2 +- python/pyarrow/compute.py | 1 - python/pyarrow/table.pxi | 4 ++-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index 53608a9ff24..d315cc40a2d 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -1296,7 +1296,7 @@ class TDigestOptions(_TDigestOptions): self._set_options(q, delta, buffer_size, skip_nulls, min_count) -def group_by(args, keys, aggregations): +def _group_by(args, keys, aggregations): cdef: vector[CDatum] c_args vector[CDatum] c_keys diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py index 148142bc7ab..6e3bd7fcab3 100644 --- a/python/pyarrow/compute.py +++ b/python/pyarrow/compute.py @@ -70,7 +70,6 @@ function_registry, get_function, list_functions, - group_by ) import inspect diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 2e32cc29944..777f0f1678c 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2196,7 +2196,7 @@ cdef class Table(_PandasConvertible): """ Perform a group by aggregation over the columns of the table. """ - from pyarrow._compute import group_by + from pyarrow._compute import _group_by if isinstance(aggregations, str): aggregations = [aggregations] @@ -2207,7 +2207,7 @@ cdef class Table(_PandasConvertible): aggr = (aggr, None) aggrs.append(aggr) - return group_by( + return _group_by( [self[c] for c in columns], [self[key]], aggrs From c70cbc6ce1e1ff58c653aca07dfd7975c0c955bf Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 5 Nov 2021 17:13:53 +0100 Subject: [PATCH 06/26] Docstring --- python/pyarrow/table.pxi | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 777f0f1678c..c4029f25ccb 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2195,6 +2195,22 @@ cdef class Table(_PandasConvertible): def group_by(self, key, columns, aggregations): """ Perform a group by aggregation over the columns of the table. + + Parameters + ---------- + key : str + Name of the column that should be used as the grouping key. + columns : list of str + Names of the columns that contain values for the aggregations. + aggregations : str or list of str or list of tuple(str, FunctionOptions) + Name of the hash aggregation function, or list of aggregation + function names or list of aggregation function names together + with their options. + + Returns + ------- + StructArray + Results of the aggregation functions. """ from pyarrow._compute import _group_by From a32a31480419a84c43eaf791443210ce5480c018 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Wed, 10 Nov 2021 11:59:13 +0100 Subject: [PATCH 07/26] Support omitting hash_ prefix --- python/pyarrow/table.pxi | 2 ++ python/pyarrow/tests/test_table.py | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index c4029f25ccb..2579373822d 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2221,6 +2221,8 @@ cdef class Table(_PandasConvertible): for aggr in aggregations: if isinstance(aggr, str): aggr = (aggr, None) + if not aggr[0].startswith("hash_"): + aggr = ("hash_" + aggr[0], aggr[1]) aggrs.append(aggr) return _group_by( diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index fac3306d891..6ca968d48cf 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -1772,3 +1772,8 @@ def test_table_group_by(): 'hash_sum': 5, 'key_0': 'c'}, ] + + # Test without hash_ prefix + r = table.group_by("keys", ["values"], "sum") + r_asdict = {v["key_0"].as_py(): v["hash_sum"].as_py() for v in r} + assert r_asdict == {'a': 3, 'b': 7, 'c': 5} \ No newline at end of file From d19f61d7e7af4bc4b5c0ec1c055c5601ebeea271 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Wed, 10 Nov 2021 12:19:20 +0100 Subject: [PATCH 08/26] Lint --- python/pyarrow/table.pxi | 2 +- python/pyarrow/tests/test_table.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 2579373822d..1429fcd36a7 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2202,7 +2202,7 @@ cdef class Table(_PandasConvertible): Name of the column that should be used as the grouping key. columns : list of str Names of the columns that contain values for the aggregations. - aggregations : str or list of str or list of tuple(str, FunctionOptions) + aggregations : str or list[str] or list of tuple(str, FunctionOptions) Name of the hash aggregation function, or list of aggregation function names or list of aggregation function names together with their options. diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 6ca968d48cf..e9f07f1e484 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -1776,4 +1776,4 @@ def test_table_group_by(): # Test without hash_ prefix r = table.group_by("keys", ["values"], "sum") r_asdict = {v["key_0"].as_py(): v["hash_sum"].as_py() for v in r} - assert r_asdict == {'a': 3, 'b': 7, 'c': 5} \ No newline at end of file + assert r_asdict == {'a': 3, 'b': 7, 'c': 5} From 9e2c16f3f72800a97556cc8713bfb6f88c276370 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Wed, 10 Nov 2021 15:05:21 +0100 Subject: [PATCH 09/26] Add helper for sorting --- python/pyarrow/compute.py | 1 + python/pyarrow/table.pxi | 30 +++++++++++++++++++++++++++--- python/pyarrow/tests/test_table.py | 17 +++++++++++++++++ 3 files changed, 45 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py index 6e3bd7fcab3..0a9a660ccdc 100644 --- a/python/pyarrow/compute.py +++ b/python/pyarrow/compute.py @@ -70,6 +70,7 @@ function_registry, get_function, list_functions, + _group_by ) import inspect diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 1429fcd36a7..d37b3abdd1c 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2212,8 +2212,6 @@ cdef class Table(_PandasConvertible): StructArray Results of the aggregation functions. """ - from pyarrow._compute import _group_by - if isinstance(aggregations, str): aggregations = [aggregations] @@ -2225,12 +2223,38 @@ cdef class Table(_PandasConvertible): aggr = ("hash_" + aggr[0], aggr[1]) aggrs.append(aggr) - return _group_by( + return _pc()._group_by( [self[c] for c in columns], [self[key]], aggrs ) + def sort_by(self, sorting): + """ + Sort the table by one or multiple columns. + + Parameters + ---------- + sorting : str or list[tuple(name, order)] + Name of the column to use to sort, or + a list of multiple sorting conditions where + each entry is a tuple with column name + and sorting order ("ascending" or "descending") + + Returns + ------- + Table + A new table sorted according to the sort key. + """ + if isinstance(sorting, str): + sorting = [(sorting, "ascending")] + + indices = _pc().sort_indices( + self, + sort_keys=sorting + ) + return self.take(indices) + def _reconstruct_table(arrays, schema): """ diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index e9f07f1e484..ba9523df0a2 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -1777,3 +1777,20 @@ def test_table_group_by(): r = table.group_by("keys", ["values"], "sum") r_asdict = {v["key_0"].as_py(): v["hash_sum"].as_py() for v in r} assert r_asdict == {'a': 3, 'b': 7, 'c': 5} + + +def test_table_sort_by(): + table = pa.table([ + pa.array([3, 1, 4, 2, 5]), + pa.array(["b", "a", "b", "a", "c"]), + ], names=["values", "keys"]) + + table.sort_by("values").to_pydict() == { + "keys": ["a", "a", "b", "b", "c"], + "values": [1, 2, 3, 4, 5] + } + + table.sort_by([("values", "descending")]).to_pydict() == { + "keys": ["c", "b", "b", "a", "a"], + "values": [5, 4, 3, 2, 1] + } From 05aaafb7e4133f9fb872427ef2244e237d2150cd Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Wed, 10 Nov 2021 16:07:29 +0100 Subject: [PATCH 10/26] Generate a Table as the output of Table.group_by --- python/pyarrow/table.pxi | 12 ++++++++++-- python/pyarrow/tests/test_table.py | 29 +++++++++++++---------------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index d37b3abdd1c..c77d502c76c 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2209,26 +2209,34 @@ cdef class Table(_PandasConvertible): Returns ------- - StructArray + Table Results of the aggregation functions. """ if isinstance(aggregations, str): aggregations = [aggregations] + column_names = [] aggrs = [] for aggr in aggregations: if isinstance(aggr, str): aggr = (aggr, None) if not aggr[0].startswith("hash_"): aggr = ("hash_" + aggr[0], aggr[1]) + column_names.append(aggr[0]) aggrs.append(aggr) - return _pc()._group_by( + result = _pc()._group_by( [self[c] for c in columns], [self[key]], aggrs ) + t = Table.from_batches([RecordBatch.from_struct_array(result)]) + return t.rename_columns([ + cname if cname not in column_names else cname.replace("hash_", "") + for cname in t.column_names + ]) + def sort_by(self, sorting): """ Sort the table by one or multiple columns. diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index ba9523df0a2..9adf2cdc4f9 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -1755,28 +1755,25 @@ def test_table_group_by(): ], names=["values", "keys"]) r = table.group_by("keys", ["values"], "hash_sum") - - r_asdict = {v["key_0"].as_py(): v["hash_sum"].as_py() for v in r} - assert r_asdict == {'a': 3, 'b': 7, 'c': 5} + assert r.to_pydict() == { + "key_0": ["a", "b", "c"], + "sum": [3, 7, 5] + } r = table.group_by("keys", ["values", "values"], [ "hash_sum", "hash_count"]) - assert sorted(r.to_pylist(), key=lambda x: x["key_0"]) == [ - {'hash_count': 2, - 'hash_sum': 3, - 'key_0': 'a'}, - {'hash_count': 2, - 'hash_sum': 7, - 'key_0': 'b'}, - {'hash_count': 1, - 'hash_sum': 5, - 'key_0': 'c'}, - ] + assert r.to_pydict() == { + "key_0": ["a", "b", "c"], + "sum": [3, 7, 5], + "count": [2, 2, 1] + } # Test without hash_ prefix r = table.group_by("keys", ["values"], "sum") - r_asdict = {v["key_0"].as_py(): v["hash_sum"].as_py() for v in r} - assert r_asdict == {'a': 3, 'b': 7, 'c': 5} + assert r.to_pydict() == { + "key_0": ["a", "b", "c"], + "sum": [3, 7, 5] + } def test_table_sort_by(): From 30b64fe45bc3318ab59ad39194b5a5a12226aba9 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Wed, 10 Nov 2021 17:43:08 +0100 Subject: [PATCH 11/26] Improve column names --- python/pyarrow/table.pxi | 14 ++++++++------ python/pyarrow/tests/test_table.py | 24 ++++++++++++++++-------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index c77d502c76c..b06a5faab1c 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2215,16 +2215,21 @@ cdef class Table(_PandasConvertible): if isinstance(aggregations, str): aggregations = [aggregations] - column_names = [] aggrs = [] for aggr in aggregations: if isinstance(aggr, str): aggr = (aggr, None) if not aggr[0].startswith("hash_"): aggr = ("hash_" + aggr[0], aggr[1]) - column_names.append(aggr[0]) aggrs.append(aggr) + # Build unique names for aggregation result columns + # so that it's obvious what they refer to. + column_names = [] + for value_name, (aggr_name, _) in zip(columns, aggrs): + column_names.append(aggr_name.replace("hash", value_name)) + column_names.append("key") + result = _pc()._group_by( [self[c] for c in columns], [self[key]], @@ -2232,10 +2237,7 @@ cdef class Table(_PandasConvertible): ) t = Table.from_batches([RecordBatch.from_struct_array(result)]) - return t.rename_columns([ - cname if cname not in column_names else cname.replace("hash_", "") - for cname in t.column_names - ]) + return t.rename_columns(column_names) def sort_by(self, sorting): """ diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 9adf2cdc4f9..5e5215c6ded 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -1752,27 +1752,35 @@ def test_table_group_by(): table = pa.table([ pa.array([1, 2, 3, 4, 5]), pa.array(["a", "a", "b", "b", "c"]), - ], names=["values", "keys"]) + pa.array([10, 20, 30, 40, 50]) + ], names=["values", "keys", "bigvalues"]) r = table.group_by("keys", ["values"], "hash_sum") assert r.to_pydict() == { - "key_0": ["a", "b", "c"], - "sum": [3, 7, 5] + "key": ["a", "b", "c"], + "values_sum": [3, 7, 5] } r = table.group_by("keys", ["values", "values"], [ "hash_sum", "hash_count"]) assert r.to_pydict() == { - "key_0": ["a", "b", "c"], - "sum": [3, 7, 5], - "count": [2, 2, 1] + "key": ["a", "b", "c"], + "values_sum": [3, 7, 5], + "values_count": [2, 2, 1] } # Test without hash_ prefix r = table.group_by("keys", ["values"], "sum") assert r.to_pydict() == { - "key_0": ["a", "b", "c"], - "sum": [3, 7, 5] + "key": ["a", "b", "c"], + "values_sum": [3, 7, 5] + } + + r = table.group_by("keys", ["values", "bigvalues"], ["sum", "sum"]) + assert r.to_pydict() == { + "key": ["a", "b", "c"], + "values_sum": [3, 7, 5], + "bigvalues_sum": [30, 70, 50] } From 8ec31530df4150f698f822262daffe6952a10c8e Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Thu, 11 Nov 2021 10:26:27 +0100 Subject: [PATCH 12/26] missing assertions --- python/pyarrow/tests/test_table.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 5e5215c6ded..673849cd800 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -1790,12 +1790,12 @@ def test_table_sort_by(): pa.array(["b", "a", "b", "a", "c"]), ], names=["values", "keys"]) - table.sort_by("values").to_pydict() == { + assert table.sort_by("values").to_pydict() == { "keys": ["a", "a", "b", "b", "c"], "values": [1, 2, 3, 4, 5] } - table.sort_by([("values", "descending")]).to_pydict() == { + assert table.sort_by([("values", "descending")]).to_pydict() == { "keys": ["c", "b", "b", "a", "a"], "values": [5, 4, 3, 2, 1] } From 25e6276c57c4296e79c749c6cfdc0bf8847ad133 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 12 Nov 2021 16:51:36 +0100 Subject: [PATCH 13/26] Add test with different order --- python/pyarrow/tests/test_table.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 673849cd800..b4a48480d7d 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -1776,13 +1776,19 @@ def test_table_group_by(): "values_sum": [3, 7, 5] } - r = table.group_by("keys", ["values", "bigvalues"], ["sum", "sum"]) + r = table.group_by("keys", ["values", "bigvalues"], ["max", "sum"]) assert r.to_pydict() == { "key": ["a", "b", "c"], - "values_sum": [3, 7, 5], + "values_max": [2, 4, 5], "bigvalues_sum": [30, 70, 50] } + r = table.group_by("keys", ["bigvalues", "values"], ["max", "sum"]) + assert r.to_pydict() == { + "key": ["a", "b", "c"], + "values_sum": [3, 7, 5], + "bigvalues_max": [20, 40, 50] + } def test_table_sort_by(): table = pa.table([ From 94c64e43bad8f125d5957ef1c7dea5a43a89e3c5 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 12 Nov 2021 17:19:20 +0100 Subject: [PATCH 14/26] lint --- python/pyarrow/tests/test_table.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index b4a48480d7d..18552bd6c54 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -1790,6 +1790,7 @@ def test_table_group_by(): "bigvalues_max": [20, 40, 50] } + def test_table_sort_by(): table = pa.table([ pa.array([3, 1, 4, 2, 5]), From 76fff1b6909abc23f60856f792ca9688d28a2d3c Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 12 Nov 2021 17:48:03 +0100 Subject: [PATCH 15/26] Sort aggregation keys in tests --- python/pyarrow/tests/test_table.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 18552bd6c54..81bf9d07707 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -1749,6 +1749,18 @@ def test_table_select(): def test_table_group_by(): + def sorted_by_key(d): + # Ensure a guaranteed order of keys for aggregation results. + sorted_keys = sorted(d["key"]) + sorted_d = {"key": sorted_keys} + for entry in d: + if entry == "key": + continue + values = dict(zip(d["key"], d[entry])) + for k in sorted_keys: + sorted_d.setdefault(entry, []).append(values[k]) + return sorted_d + table = pa.table([ pa.array([1, 2, 3, 4, 5]), pa.array(["a", "a", "b", "b", "c"]), @@ -1756,14 +1768,14 @@ def test_table_group_by(): ], names=["values", "keys", "bigvalues"]) r = table.group_by("keys", ["values"], "hash_sum") - assert r.to_pydict() == { + assert sorted_by_key(r.to_pydict()) == { "key": ["a", "b", "c"], "values_sum": [3, 7, 5] } r = table.group_by("keys", ["values", "values"], [ "hash_sum", "hash_count"]) - assert r.to_pydict() == { + assert sorted_by_key(r.to_pydict()) == { "key": ["a", "b", "c"], "values_sum": [3, 7, 5], "values_count": [2, 2, 1] @@ -1771,20 +1783,20 @@ def test_table_group_by(): # Test without hash_ prefix r = table.group_by("keys", ["values"], "sum") - assert r.to_pydict() == { + assert sorted_by_key(r.to_pydict()) == { "key": ["a", "b", "c"], "values_sum": [3, 7, 5] } r = table.group_by("keys", ["values", "bigvalues"], ["max", "sum"]) - assert r.to_pydict() == { + assert sorted_by_key(r.to_pydict()) == { "key": ["a", "b", "c"], "values_max": [2, 4, 5], "bigvalues_sum": [30, 70, 50] } r = table.group_by("keys", ["bigvalues", "values"], ["max", "sum"]) - assert r.to_pydict() == { + assert sorted_by_key(r.to_pydict()) == { "key": ["a", "b", "c"], "values_sum": [3, 7, 5], "bigvalues_max": [20, 40, 50] From dfecba12901e6ff13181886b052164f734170d67 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Tue, 16 Nov 2021 15:03:17 +0100 Subject: [PATCH 16/26] Support multiple keys in grouping --- python/pyarrow/table.pxi | 14 +++++---- python/pyarrow/tests/test_table.py | 46 +++++++++++++++++++----------- 2 files changed, 38 insertions(+), 22 deletions(-) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index b06a5faab1c..1619f5cde43 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2192,14 +2192,14 @@ cdef class Table(_PandasConvertible): return table - def group_by(self, key, columns, aggregations): + def group_by(self, keys, columns, aggregations): """ Perform a group by aggregation over the columns of the table. Parameters ---------- - key : str - Name of the column that should be used as the grouping key. + keys : str or list[str] + Name of the columns that should be used as the grouping key. columns : list of str Names of the columns that contain values for the aggregations. aggregations : str or list[str] or list of tuple(str, FunctionOptions) @@ -2215,6 +2215,9 @@ cdef class Table(_PandasConvertible): if isinstance(aggregations, str): aggregations = [aggregations] + if isinstance(keys, str): + keys = [keys] + aggrs = [] for aggr in aggregations: if isinstance(aggr, str): @@ -2228,11 +2231,12 @@ cdef class Table(_PandasConvertible): column_names = [] for value_name, (aggr_name, _) in zip(columns, aggrs): column_names.append(aggr_name.replace("hash", value_name)) - column_names.append("key") + for key_name in keys: + column_names.append(key_name) result = _pc()._group_by( [self[c] for c in columns], - [self[key]], + [self[k] for k in keys], aggrs ) diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 81bf9d07707..2622abddac6 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -1749,59 +1749,71 @@ def test_table_select(): def test_table_group_by(): - def sorted_by_key(d): + def sorted_by_keys(d): # Ensure a guaranteed order of keys for aggregation results. - sorted_keys = sorted(d["key"]) - sorted_d = {"key": sorted_keys} + if "keys2" in d: + keys = tuple(zip(d["keys"], d["keys2"])) + else: + keys = d["keys"] + sorted_keys = sorted(keys) + sorted_d = {"keys": sorted(d["keys"])} for entry in d: - if entry == "key": + if entry == "keys": continue - values = dict(zip(d["key"], d[entry])) + values = dict(zip(keys, d[entry])) for k in sorted_keys: sorted_d.setdefault(entry, []).append(values[k]) return sorted_d table = pa.table([ - pa.array([1, 2, 3, 4, 5]), pa.array(["a", "a", "b", "b", "c"]), + pa.array(["X", "X", "Y", "Z", "Z"]), + pa.array([1, 2, 3, 4, 5]), pa.array([10, 20, 30, 40, 50]) - ], names=["values", "keys", "bigvalues"]) + ], names=["keys", "keys2", "values", "bigvalues"]) r = table.group_by("keys", ["values"], "hash_sum") - assert sorted_by_key(r.to_pydict()) == { - "key": ["a", "b", "c"], + assert sorted_by_keys(r.to_pydict()) == { + "keys": ["a", "b", "c"], "values_sum": [3, 7, 5] } r = table.group_by("keys", ["values", "values"], [ "hash_sum", "hash_count"]) - assert sorted_by_key(r.to_pydict()) == { - "key": ["a", "b", "c"], + assert sorted_by_keys(r.to_pydict()) == { + "keys": ["a", "b", "c"], "values_sum": [3, 7, 5], "values_count": [2, 2, 1] } # Test without hash_ prefix r = table.group_by("keys", ["values"], "sum") - assert sorted_by_key(r.to_pydict()) == { - "key": ["a", "b", "c"], + assert sorted_by_keys(r.to_pydict()) == { + "keys": ["a", "b", "c"], "values_sum": [3, 7, 5] } r = table.group_by("keys", ["values", "bigvalues"], ["max", "sum"]) - assert sorted_by_key(r.to_pydict()) == { - "key": ["a", "b", "c"], + assert sorted_by_keys(r.to_pydict()) == { + "keys": ["a", "b", "c"], "values_max": [2, 4, 5], "bigvalues_sum": [30, 70, 50] } r = table.group_by("keys", ["bigvalues", "values"], ["max", "sum"]) - assert sorted_by_key(r.to_pydict()) == { - "key": ["a", "b", "c"], + assert sorted_by_keys(r.to_pydict()) == { + "keys": ["a", "b", "c"], "values_sum": [3, 7, 5], "bigvalues_max": [20, 40, 50] } + r = table.group_by(["keys", "keys2"], ["values"], ["sum"]) + assert sorted_by_keys(r.to_pydict()) == { + "keys": ["a", "b", "b", "c"], + "keys2": ["X", "Y", "Z", "Z"], + "values_sum": [3, 3, 4, 5] + } + def test_table_sort_by(): table = pa.table([ From 111f2587d4af9c4190865f8d692e9c8692832306 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Wed, 17 Nov 2021 12:31:51 +0100 Subject: [PATCH 17/26] Multistep aggregation API --- python/pyarrow/table.pxi | 103 +++++++++++++++++------------ python/pyarrow/tests/test_table.py | 28 ++++++-- 2 files changed, 83 insertions(+), 48 deletions(-) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 1619f5cde43..367a87ed995 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2192,56 +2192,21 @@ cdef class Table(_PandasConvertible): return table - def group_by(self, keys, columns, aggregations): - """ - Perform a group by aggregation over the columns of the table. + def group_by(self, keys): + """Declare a grouping over the columns of the table. + + resulting grouping can then be used to perform aggregations. Parameters ---------- keys : str or list[str] Name of the columns that should be used as the grouping key. - columns : list of str - Names of the columns that contain values for the aggregations. - aggregations : str or list[str] or list of tuple(str, FunctionOptions) - Name of the hash aggregation function, or list of aggregation - function names or list of aggregation function names together - with their options. Returns ------- - Table - Results of the aggregation functions. + TableGroupBy """ - if isinstance(aggregations, str): - aggregations = [aggregations] - - if isinstance(keys, str): - keys = [keys] - - aggrs = [] - for aggr in aggregations: - if isinstance(aggr, str): - aggr = (aggr, None) - if not aggr[0].startswith("hash_"): - aggr = ("hash_" + aggr[0], aggr[1]) - aggrs.append(aggr) - - # Build unique names for aggregation result columns - # so that it's obvious what they refer to. - column_names = [] - for value_name, (aggr_name, _) in zip(columns, aggrs): - column_names.append(aggr_name.replace("hash", value_name)) - for key_name in keys: - column_names.append(key_name) - - result = _pc()._group_by( - [self[c] for c in columns], - [self[k] for k in keys], - aggrs - ) - - t = Table.from_batches([RecordBatch.from_struct_array(result)]) - return t.rename_columns(column_names) + return TableGroupBy(self, keys) def sort_by(self, sorting): """ @@ -2464,3 +2429,59 @@ def _from_pydict(cls, mapping, schema, metadata): return cls.from_arrays(arrays, schema=schema, metadata=metadata) else: raise TypeError('Schema must be an instance of pyarrow.Schema') + + +class TableGroupBy: + def __init__(self, table, keys): + if isinstance(keys, str): + keys = [keys] + + self._table = table + self.keys = keys + + def aggregate(self, aggregations): + """ + Perform an aggregation over the grouped columns of the table. + + Parameters + ---------- + aggregations : list[tuple(str, str)] or + list[tuple(str, str, FunctionOptions)] + List of tuples made of aggregation functions names followed + by column names and optionally aggregation function options. + + Returns + ------- + Table + Results of the aggregation functions. + """ + columns = [a[1] for a in aggregations] + aggrfuncs = [ + (a[0], a[2]) if len(a) > 2 else (a[0], None) + for a in aggregations + ] + + group_by_aggrs = [] + for aggr in aggrfuncs: + if isinstance(aggr, str): + aggr = (aggr, None) + if not aggr[0].startswith("hash_"): + aggr = ("hash_" + aggr[0], aggr[1]) + group_by_aggrs.append(aggr) + + # Build unique names for aggregation result columns + # so that it's obvious what they refer to. + column_names = [] + for value_name, (aggr_name, _) in zip(columns, group_by_aggrs): + column_names.append(aggr_name.replace("hash", value_name)) + for key_name in self.keys: + column_names.append(key_name) + + result = _pc()._group_by( + [self._table[c] for c in columns], + [self._table[k] for k in self.keys], + group_by_aggrs + ) + + t = Table.from_batches([RecordBatch.from_struct_array(result)]) + return t.rename_columns(column_names) diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 2622abddac6..b65396c50c3 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -1772,14 +1772,18 @@ def sorted_by_keys(d): pa.array([10, 20, 30, 40, 50]) ], names=["keys", "keys2", "values", "bigvalues"]) - r = table.group_by("keys", ["values"], "hash_sum") + r = table.group_by("keys").aggregate([ + ("hash_sum", "values") + ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], "values_sum": [3, 7, 5] } - r = table.group_by("keys", ["values", "values"], [ - "hash_sum", "hash_count"]) + r = table.group_by("keys").aggregate([ + ("hash_sum", "values"), + ("hash_count", "values") + ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], "values_sum": [3, 7, 5], @@ -1787,27 +1791,37 @@ def sorted_by_keys(d): } # Test without hash_ prefix - r = table.group_by("keys", ["values"], "sum") + r = table.group_by("keys").aggregate([ + ("sum", "values") + ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], "values_sum": [3, 7, 5] } - r = table.group_by("keys", ["values", "bigvalues"], ["max", "sum"]) + r = table.group_by("keys").aggregate([ + ("max", "values"), + ("sum", "bigvalues") + ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], "values_max": [2, 4, 5], "bigvalues_sum": [30, 70, 50] } - r = table.group_by("keys", ["bigvalues", "values"], ["max", "sum"]) + r = table.group_by("keys").aggregate([ + ("max", "bigvalues"), + ("sum", "values") + ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], "values_sum": [3, 7, 5], "bigvalues_max": [20, 40, 50] } - r = table.group_by(["keys", "keys2"], ["values"], ["sum"]) + r = table.group_by(["keys", "keys2"]).aggregate([ + ("sum", "values") + ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "b", "c"], "keys2": ["X", "Y", "Z", "Z"], From 1bc4fd4b76a9f5a71a18d428cfcb91519f352030 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Wed, 17 Nov 2021 15:18:10 +0100 Subject: [PATCH 18/26] Tweak docs --- docs/source/python/api/tables.rst | 1 + python/pyarrow/__init__.py | 2 +- python/pyarrow/table.pxi | 5 ++++- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/source/python/api/tables.rst b/docs/source/python/api/tables.rst index 6e7a3b6e155..4cdba24d6bf 100644 --- a/docs/source/python/api/tables.rst +++ b/docs/source/python/api/tables.rst @@ -43,6 +43,7 @@ Classes ChunkedArray RecordBatch Table + TableGroupBy .. _api.tensor: diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 1ec229d5381..f1e7ca90ad2 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -182,7 +182,7 @@ def show_versions(): from pyarrow._hdfsio import HdfsFile, have_libhdfs from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table, - concat_arrays, concat_tables) + concat_arrays, concat_tables, TableGroupBy) # Exceptions from pyarrow.lib import (ArrowCancelled, diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 367a87ed995..e889f43382d 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2432,6 +2432,9 @@ def _from_pydict(cls, mapping, schema, metadata): class TableGroupBy: + """ + A grouping of columns in a table on which to perform aggregations. + """ def __init__(self, table, keys): if isinstance(keys, str): keys = [keys] @@ -2445,7 +2448,7 @@ class TableGroupBy: Parameters ---------- - aggregations : list[tuple(str, str)] or + aggregations : list[tuple(str, str)] or\ list[tuple(str, str, FunctionOptions)] List of tuples made of aggregation functions names followed by column names and optionally aggregation function options. From 712dc94a6671634c1b1569303470a506a5c3d88a Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Wed, 17 Nov 2021 15:24:51 +0100 Subject: [PATCH 19/26] Tweak --- python/pyarrow/table.pxi | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index e889f43382d..52dae0238d5 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2474,11 +2474,10 @@ class TableGroupBy: # Build unique names for aggregation result columns # so that it's obvious what they refer to. - column_names = [] - for value_name, (aggr_name, _) in zip(columns, group_by_aggrs): - column_names.append(aggr_name.replace("hash", value_name)) - for key_name in self.keys: - column_names.append(key_name) + column_names = [ + aggr_name.replace("hash", col_name) + for col_name, (aggr_name, _) in zip(columns, group_by_aggrs) + ] + self.keys result = _pc()._group_by( [self._table[c] for c in columns], From c9bd87de34c96d4fe66e86f8321269a80c279349 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Wed, 17 Nov 2021 15:55:03 +0100 Subject: [PATCH 20/26] lint --- python/pyarrow/table.pxi | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 52dae0238d5..f0c80b3504b 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2435,6 +2435,7 @@ class TableGroupBy: """ A grouping of columns in a table on which to perform aggregations. """ + def __init__(self, table, keys): if isinstance(keys, str): keys = [keys] From 73a60d479f5eb6f2576bf97c7ccb1b02c5dc494f Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Thu, 18 Nov 2021 11:40:22 +0100 Subject: [PATCH 21/26] stray if --- python/pyarrow/table.pxi | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index f0c80b3504b..7b9b641b383 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2223,7 +2223,7 @@ cdef class Table(_PandasConvertible): Returns ------- Table - A new table sorted according to the sort key. + A new table sorted according to the sort keys. """ if isinstance(sorting, str): sorting = [(sorting, "ascending")] @@ -2467,8 +2467,6 @@ class TableGroupBy: group_by_aggrs = [] for aggr in aggrfuncs: - if isinstance(aggr, str): - aggr = (aggr, None) if not aggr[0].startswith("hash_"): aggr = ("hash_" + aggr[0], aggr[1]) group_by_aggrs.append(aggr) From 0126d5c7b29285ffba9622014915fdc7e9664617 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 19 Nov 2021 10:25:15 +0100 Subject: [PATCH 22/26] Update python/pyarrow/table.pxi Co-authored-by: Joris Van den Bossche --- python/pyarrow/table.pxi | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 7b9b641b383..6394088396e 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2449,8 +2449,8 @@ class TableGroupBy: Parameters ---------- - aggregations : list[tuple(str, str)] or\ - list[tuple(str, str, FunctionOptions)] + aggregations : list[tuple(str, str)] or \ +list[tuple(str, str, FunctionOptions)] List of tuples made of aggregation functions names followed by column names and optionally aggregation function options. From 990e6747e47705ff84626c679cda766b3473a2fa Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 19 Nov 2021 11:12:30 +0100 Subject: [PATCH 23/26] Reverse arguments order --- python/pyarrow/table.pxi | 8 ++++---- python/pyarrow/tests/test_table.py | 18 +++++++++--------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 6394088396e..c3d21d9f6d2 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2451,17 +2451,17 @@ class TableGroupBy: ---------- aggregations : list[tuple(str, str)] or \ list[tuple(str, str, FunctionOptions)] - List of tuples made of aggregation functions names followed - by column names and optionally aggregation function options. + List of tuples made of aggregation column names followed + by function names and optionally aggregation function options. Returns ------- Table Results of the aggregation functions. """ - columns = [a[1] for a in aggregations] + columns = [a[0] for a in aggregations] aggrfuncs = [ - (a[0], a[2]) if len(a) > 2 else (a[0], None) + (a[1], a[2]) if len(a) > 2 else (a[1], None) for a in aggregations ] diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index b65396c50c3..f30f40e2beb 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -1773,7 +1773,7 @@ def sorted_by_keys(d): ], names=["keys", "keys2", "values", "bigvalues"]) r = table.group_by("keys").aggregate([ - ("hash_sum", "values") + ("values", "hash_sum") ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], @@ -1781,8 +1781,8 @@ def sorted_by_keys(d): } r = table.group_by("keys").aggregate([ - ("hash_sum", "values"), - ("hash_count", "values") + ("values", "hash_sum"), + ("values", "hash_count") ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], @@ -1792,7 +1792,7 @@ def sorted_by_keys(d): # Test without hash_ prefix r = table.group_by("keys").aggregate([ - ("sum", "values") + ("values", "sum") ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], @@ -1800,8 +1800,8 @@ def sorted_by_keys(d): } r = table.group_by("keys").aggregate([ - ("max", "values"), - ("sum", "bigvalues") + ("values", "max"), + ("bigvalues", "sum") ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], @@ -1810,8 +1810,8 @@ def sorted_by_keys(d): } r = table.group_by("keys").aggregate([ - ("max", "bigvalues"), - ("sum", "values") + ("bigvalues", "max"), + ("values", "sum") ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], @@ -1820,7 +1820,7 @@ def sorted_by_keys(d): } r = table.group_by(["keys", "keys2"]).aggregate([ - ("sum", "values") + ("values", "sum") ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "b", "c"], From 3e06ddff9d35463927477399c41202cbe1dfa7b1 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Fri, 19 Nov 2021 11:23:06 +0100 Subject: [PATCH 24/26] Add example --- python/pyarrow/table.pxi | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index c3d21d9f6d2..66b4b6c7e92 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2458,6 +2458,20 @@ list[tuple(str, str, FunctionOptions)] ------- Table Results of the aggregation functions. + + Example + ------- + >>> t = pa.table([ + ... pa.array(["a", "a", "b", "b", "c"]), + ... pa.array([1, 2, 3, 4, 5]), + ... ], names=["keys", "values"]) + >>> t.group_by("keys").aggregate([("values", "sum")]) + pyarrow.Table + values_sum: int64 + keys: string + ---- + values_sum: [[3,7,5]] + keys: [["a","b","c"]] """ columns = [a[0] for a in aggregations] aggrfuncs = [ From 45af47857a4daa1e4a247d33bf909cd4ce5c25bf Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Tue, 23 Nov 2021 12:39:45 +0100 Subject: [PATCH 25/26] Apply suggestions from code review Co-authored-by: Joris Van den Bossche --- python/pyarrow/table.pxi | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index 66b4b6c7e92..d55f41d9efb 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -2195,7 +2195,8 @@ cdef class Table(_PandasConvertible): def group_by(self, keys): """Declare a grouping over the columns of the table. - resulting grouping can then be used to perform aggregations. + Resulting grouping can then be used to perform aggregations + with a subsequent ``aggregate()`` method. Parameters ---------- @@ -2205,6 +2206,10 @@ cdef class Table(_PandasConvertible): Returns ------- TableGroupBy + + See Also + -------- + TableGroupBy.aggregate """ return TableGroupBy(self, keys) @@ -2215,7 +2220,7 @@ cdef class Table(_PandasConvertible): Parameters ---------- sorting : str or list[tuple(name, order)] - Name of the column to use to sort, or + Name of the column to use to sort (ascending), or a list of multiple sorting conditions where each entry is a tuple with column name and sorting order ("ascending" or "descending") From be8201494d1981cb677dc1b313602a1121882772 Mon Sep 17 00:00:00 2001 From: Alessandro Molina Date: Tue, 23 Nov 2021 14:03:45 +0100 Subject: [PATCH 26/26] Add test with options --- python/pyarrow/tests/test_table.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index f30f40e2beb..dd0ce074af3 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -24,6 +24,7 @@ import numpy as np import pytest import pyarrow as pa +import pyarrow.compute as pc def test_chunked_array_basics(): @@ -1828,6 +1829,35 @@ def sorted_by_keys(d): "values_sum": [3, 3, 4, 5] } + table_with_nulls = pa.table([ + pa.array(["a", "a", "a"]), + pa.array([1, None, None]) + ], names=["keys", "values"]) + + r = table_with_nulls.group_by(["keys"]).aggregate([ + ("values", "count", pc.CountOptions(mode="all")) + ]) + assert r.to_pydict() == { + "keys": ["a"], + "values_count": [3] + } + + r = table_with_nulls.group_by(["keys"]).aggregate([ + ("values", "count", pc.CountOptions(mode="only_null")) + ]) + assert r.to_pydict() == { + "keys": ["a"], + "values_count": [2] + } + + r = table_with_nulls.group_by(["keys"]).aggregate([ + ("values", "count", pc.CountOptions(mode="only_valid")) + ]) + assert r.to_pydict() == { + "keys": ["a"], + "values_count": [1] + } + def test_table_sort_by(): table = pa.table([