From ea0bd1ee30ca1356287c3eb386df9ecc60b0ab4c Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 18 Jan 2024 14:33:28 +0100 Subject: [PATCH 01/12] Adjust aggregate --- dask_expr/_groupby.py | 66 ++++++++++++++++++++++++++++++++++------ dask_expr/_reductions.py | 10 +++++- 2 files changed, 65 insertions(+), 11 deletions(-) diff --git a/dask_expr/_groupby.py b/dask_expr/_groupby.py index c3851210d..c68cbec1c 100644 --- a/dask_expr/_groupby.py +++ b/dask_expr/_groupby.py @@ -27,6 +27,7 @@ _cumcount_aggregate, _determine_levels, _groupby_aggregate, + _groupby_aggregate_spec, _groupby_apply_funcs, _groupby_get_group, _groupby_slice_apply, @@ -34,6 +35,7 @@ _groupby_slice_transform, _head_aggregate, _head_chunk, + _non_agg_chunk, _normalize_spec, _nunique_df_chunk, _nunique_df_combine, @@ -295,7 +297,6 @@ class GroupbyAggregation(GroupByApplyConcatApply, GroupByBase): "shuffle_method": None, "_slice": None, } - chunk = staticmethod(_groupby_apply_funcs) @functools.cached_property def spec(self): @@ -329,26 +330,52 @@ def spec(self): else: raise ValueError(f"aggregate on unknown object {self.frame._meta}") - # Median not supported yet - has_median = any(s[1] in ("median", np.median) for s in spec) - if has_median: - raise NotImplementedError("median not yet supported") + return spec + @functools.cached_property + def agg_args(self): keys = ["chunk_funcs", "aggregate_funcs", "finalizers"] - return dict(zip(keys, _build_agg_args(spec))) + return dict(zip(keys, _build_agg_args(self.spec))) + + @functools.cached_property + def has_median(self): + return any(s[1] in ("median", np.median) for s in self.spec) + + @property + def should_shuffle(self): + return self.has_median or super().should_shuffle + + @classmethod + def chunk(cls, df, *by, **kwargs): + if kwargs.pop("has_median"): + return _non_agg_chunk(df, *by, **kwargs) + return _groupby_apply_funcs(df, *by, **kwargs) @classmethod def combine(cls, inputs, **kwargs): + if kwargs.pop("has_median"): + return _groupby_aggregate_spec(_concat(inputs), **kwargs) return _groupby_apply_funcs(_concat(inputs), **kwargs) @classmethod def aggregate(cls, inputs, **kwargs): + if kwargs.pop("has_median"): + return _groupby_aggregate_spec(_concat(inputs), **kwargs) return _agg_finalize(_concat(inputs), **kwargs) @property def chunk_kwargs(self) -> dict: + if self.has_median: + return { + "has_median": self.has_median, + "by": self._by_columns, + "key": list(set(self.frame.columns) - set(self._by_columns)), + **_as_dict("observed", self.observed), + **_as_dict("dropna", self.dropna), + } return { - "funcs": self.spec["chunk_funcs"], + "has_median": self.has_median, + "funcs": self.agg_args["chunk_funcs"], "sort": self.sort, **_as_dict("observed", self.observed), **_as_dict("dropna", self.dropna), @@ -356,8 +383,17 @@ def chunk_kwargs(self) -> dict: @property def combine_kwargs(self) -> dict: + if self.has_median: + return { + "has_median": self.has_median, + "spec": self.arg, + "levels": _determine_levels(self.by), + **_as_dict("observed", self.observed), + **_as_dict("dropna", self.dropna), + } return { - "funcs": self.spec["aggregate_funcs"], + "has_median": self.has_median, + "funcs": self.agg_args["aggregate_funcs"], "level": self.levels, "sort": self.sort, **_as_dict("observed", self.observed), @@ -366,9 +402,19 @@ def combine_kwargs(self) -> dict: @property def aggregate_kwargs(self) -> dict: + if self.has_median: + return { + "has_median": self.has_median, + "spec": self.arg, + "levels": _determine_levels(self.by), + **_as_dict("observed", self.observed), + **_as_dict("dropna", self.dropna), + } + return { - "aggregate_funcs": self.spec["aggregate_funcs"], - "finalize_funcs": self.spec["finalizers"], + "has_median": self.has_median, + "aggregate_funcs": self.agg_args["aggregate_funcs"], + "finalize_funcs": self.agg_args["finalizers"], "level": self.levels, "sort": self.sort, **_as_dict("observed", self.observed), diff --git a/dask_expr/_reductions.py b/dask_expr/_reductions.py index 7a87d5c3c..dcb4a82bc 100644 --- a/dask_expr/_reductions.py +++ b/dask_expr/_reductions.py @@ -447,6 +447,14 @@ def _divisions(self): def _chunk_cls_args(self): return [] + @property + def should_shuffle(self): + # TODO: Make this an actual default param? + sort = getattr(self, "sort", False) + return not ( + not isinstance(self.split_out, bool) and self.split_out == 1 or sort + ) + def _lower(self): # Normalize functions in case not all are defined chunk = self.chunk @@ -470,7 +478,7 @@ def _lower(self): chunked = self._chunk_cls( self.frame, type(self), chunk, chunk_kwargs, *self._chunk_cls_args ) - if not isinstance(self.split_out, bool) and self.split_out == 1 or sort: + if not self.should_shuffle: # Lower into TreeReduce(Chunk) return TreeReduce( chunked, From eb63cf7a24ffc5bafd0ced977c0becc5dad9039c Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 18 Jan 2024 15:19:57 +0100 Subject: [PATCH 02/12] Fixed column order --- dask_expr/_groupby.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dask_expr/_groupby.py b/dask_expr/_groupby.py index c68cbec1c..84f83910d 100644 --- a/dask_expr/_groupby.py +++ b/dask_expr/_groupby.py @@ -369,7 +369,9 @@ def chunk_kwargs(self) -> dict: return { "has_median": self.has_median, "by": self._by_columns, - "key": list(set(self.frame.columns) - set(self._by_columns)), + "key": [ + col for col in self.frame.columns if col not in self._by_columns + ], **_as_dict("observed", self.observed), **_as_dict("dropna", self.dropna), } From b9fcb7e0ca3424f24e0cc1333bedaafa4ff6af4f Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 19 Jan 2024 10:16:31 +0100 Subject: [PATCH 03/12] Refactor --- dask_expr/_groupby.py | 163 ++++++++++++++++++++------------ dask_expr/tests/test_groupby.py | 1 + 2 files changed, 105 insertions(+), 59 deletions(-) diff --git a/dask_expr/_groupby.py b/dask_expr/_groupby.py index 84f83910d..dba9d8648 100644 --- a/dask_expr/_groupby.py +++ b/dask_expr/_groupby.py @@ -251,10 +251,10 @@ def _simplify_up(self, parent, dependents): return groupby_projection(self, parent, dependents) -class GroupbyAggregation(GroupByApplyConcatApply, GroupByBase): - """General groupby aggregation +class GroupbyAggregationBase(GroupByBase): + """Base class for groupby aggregation - This class can be used directly to perform a general + This class can be subclassed to perform a general groupby aggregation by passing in a `str`, `list` or `dict`-based specification using the `arg` operand. @@ -271,10 +271,6 @@ class GroupbyAggregation(GroupByApplyConcatApply, GroupByBase): Passed through to dataframe backend. dropna: Whether rows with NA values should be dropped. - chunk_kwargs: - Key-word arguments to pass to `groupby_chunk`. - aggregate_kwargs: - Key-word arguments to pass to `aggregate_chunk`. """ _parameters = [ @@ -298,6 +294,16 @@ class GroupbyAggregation(GroupByApplyConcatApply, GroupByBase): "_slice": None, } + @functools.cached_property + def _meta(self): + meta = meta_nonempty(self.frame._meta) + meta = meta.groupby( + self._by_meta, + **_as_dict("observed", self.observed), + **_as_dict("dropna", self.dropna), + ).aggregate(self.arg) + return make_meta(meta) + @functools.cached_property def spec(self): # Converts the `arg` operand into specific @@ -337,46 +343,116 @@ def agg_args(self): keys = ["chunk_funcs", "aggregate_funcs", "finalizers"] return dict(zip(keys, _build_agg_args(self.spec))) + def _simplify_down(self): + # Use agg-spec information to add column projection + column_projection = None + if isinstance(self.arg, dict): + column_projection = ( + set(self._by_columns) + .union(self.arg.keys()) + .intersection(self.frame.columns) + ) + if column_projection and column_projection < set(self.frame.columns): + return type(self)(self.frame[list(column_projection)], *self.operands[1:]) + + +class GroupbyAggregation(Expr, GroupbyAggregationBase): @functools.cached_property - def has_median(self): - return any(s[1] in ("median", np.median) for s in self.spec) + def _is_decomposable(self): + return not any(s[1] in ("median", np.median) for s in self.spec) + + def _lower(self): + cls = ( + DecomposableGroupbyAggregation + if self._is_decomposable + else HolisticGroupbyAggregation + ) + return cls( + self.frame, + self.arg, + self.observed, + self.dropna, + self.split_every, + self.split_out, + self.sort, + self.shuffle_method, + self._slice, + *self.by, + ) + + +class HolisticGroupbyAggregation(GroupByApplyConcatApply, GroupbyAggregationBase): + """Groupby aggregation for both decomposable and non-decomposable aggregates + + This class always calculates the aggregates by first collecting all the data for + the groups and then aggregating at once. + """ + + chunk = staticmethod(_non_agg_chunk) @property def should_shuffle(self): - return self.has_median or super().should_shuffle + return True @classmethod def chunk(cls, df, *by, **kwargs): - if kwargs.pop("has_median"): - return _non_agg_chunk(df, *by, **kwargs) - return _groupby_apply_funcs(df, *by, **kwargs) + return _non_agg_chunk(df, *by, **kwargs) + + @classmethod + def combine(cls, inputs, **kwargs): + return _groupby_aggregate_spec(_concat(inputs), **kwargs) + + @classmethod + def aggregate(cls, inputs, **kwargs): + return _groupby_aggregate_spec(_concat(inputs), **kwargs) + + @property + def chunk_kwargs(self) -> dict: + return { + "by": self._by_columns, + "key": [col for col in self.frame.columns if col not in self._by_columns], + **_as_dict("observed", self.observed), + **_as_dict("dropna", self.dropna), + } + + @property + def combine_kwargs(self) -> dict: + return { + "spec": self.arg, + "levels": _determine_levels(self.by), + **_as_dict("observed", self.observed), + **_as_dict("dropna", self.dropna), + } + + @property + def aggregate_kwargs(self) -> dict: + return { + "spec": self.arg, + "levels": _determine_levels(self.by), + **_as_dict("observed", self.observed), + **_as_dict("dropna", self.dropna), + } + + +class DecomposableGroupbyAggregation(GroupbyAggregationBase): + """Groupby aggregation for decomposable aggregates + + The results may be calculated via tree or shuffle reduction. + """ + + chunk = staticmethod(_groupby_apply_funcs) @classmethod def combine(cls, inputs, **kwargs): - if kwargs.pop("has_median"): - return _groupby_aggregate_spec(_concat(inputs), **kwargs) return _groupby_apply_funcs(_concat(inputs), **kwargs) @classmethod def aggregate(cls, inputs, **kwargs): - if kwargs.pop("has_median"): - return _groupby_aggregate_spec(_concat(inputs), **kwargs) return _agg_finalize(_concat(inputs), **kwargs) @property def chunk_kwargs(self) -> dict: - if self.has_median: - return { - "has_median": self.has_median, - "by": self._by_columns, - "key": [ - col for col in self.frame.columns if col not in self._by_columns - ], - **_as_dict("observed", self.observed), - **_as_dict("dropna", self.dropna), - } return { - "has_median": self.has_median, "funcs": self.agg_args["chunk_funcs"], "sort": self.sort, **_as_dict("observed", self.observed), @@ -385,16 +461,7 @@ def chunk_kwargs(self) -> dict: @property def combine_kwargs(self) -> dict: - if self.has_median: - return { - "has_median": self.has_median, - "spec": self.arg, - "levels": _determine_levels(self.by), - **_as_dict("observed", self.observed), - **_as_dict("dropna", self.dropna), - } return { - "has_median": self.has_median, "funcs": self.agg_args["aggregate_funcs"], "level": self.levels, "sort": self.sort, @@ -404,17 +471,7 @@ def combine_kwargs(self) -> dict: @property def aggregate_kwargs(self) -> dict: - if self.has_median: - return { - "has_median": self.has_median, - "spec": self.arg, - "levels": _determine_levels(self.by), - **_as_dict("observed", self.observed), - **_as_dict("dropna", self.dropna), - } - return { - "has_median": self.has_median, "aggregate_funcs": self.agg_args["aggregate_funcs"], "finalize_funcs": self.agg_args["finalizers"], "level": self.levels, @@ -423,18 +480,6 @@ def aggregate_kwargs(self) -> dict: **_as_dict("dropna", self.dropna), } - def _simplify_down(self): - # Use agg-spec information to add column projection - column_projection = None - if isinstance(self.arg, dict): - column_projection = ( - set(self._by_columns) - .union(self.arg.keys()) - .intersection(self.frame.columns) - ) - if column_projection and column_projection < set(self.frame.columns): - return type(self)(self.frame[list(column_projection)], *self.operands[1:]) - class Sum(SingleAggregation): groupby_chunk = M.sum diff --git a/dask_expr/tests/test_groupby.py b/dask_expr/tests/test_groupby.py index 810fcd265..d495fd904 100644 --- a/dask_expr/tests/test_groupby.py +++ b/dask_expr/tests/test_groupby.py @@ -238,6 +238,7 @@ def test_dataframe_aggregations_multilevel(df, pdf): {"x": ["sum", "mean"]}, ["min", "mean"], "sum", + "median", ], ) def test_groupby_agg(pdf, df, spec): From fda7135ac4801c86fcc95424d772aeacc4013562 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 19 Jan 2024 10:17:59 +0100 Subject: [PATCH 04/12] Docstring --- dask_expr/_groupby.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dask_expr/_groupby.py b/dask_expr/_groupby.py index dba9d8648..fbf390d65 100644 --- a/dask_expr/_groupby.py +++ b/dask_expr/_groupby.py @@ -357,6 +357,12 @@ def _simplify_down(self): class GroupbyAggregation(Expr, GroupbyAggregationBase): + """Logical groupby aggregation class + + This class lowers itself to concrete implementations for decomposable + or holistic aggregations. + """ + @functools.cached_property def _is_decomposable(self): return not any(s[1] in ("median", np.median) for s in self.spec) From b5db3dfc4ad48e739ea29d0a1c5e95bb8f7364f7 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 19 Jan 2024 10:33:04 +0100 Subject: [PATCH 05/12] Fix --- dask_expr/_groupby.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_expr/_groupby.py b/dask_expr/_groupby.py index fbf390d65..c70bcec7f 100644 --- a/dask_expr/_groupby.py +++ b/dask_expr/_groupby.py @@ -251,7 +251,7 @@ def _simplify_up(self, parent, dependents): return groupby_projection(self, parent, dependents) -class GroupbyAggregationBase(GroupByBase): +class GroupbyAggregationBase(GroupByApplyConcatApply, GroupByBase): """Base class for groupby aggregation This class can be subclassed to perform a general @@ -356,7 +356,7 @@ def _simplify_down(self): return type(self)(self.frame[list(column_projection)], *self.operands[1:]) -class GroupbyAggregation(Expr, GroupbyAggregationBase): +class GroupbyAggregation(GroupbyAggregationBase): """Logical groupby aggregation class This class lowers itself to concrete implementations for decomposable @@ -387,7 +387,7 @@ def _lower(self): ) -class HolisticGroupbyAggregation(GroupByApplyConcatApply, GroupbyAggregationBase): +class HolisticGroupbyAggregation(GroupbyAggregationBase): """Groupby aggregation for both decomposable and non-decomposable aggregates This class always calculates the aggregates by first collecting all the data for From 3c4cb84a005cc79ff0b19b84838ba09c73ceb66d Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 19 Jan 2024 10:34:23 +0100 Subject: [PATCH 06/12] Remove comment --- dask_expr/_reductions.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask_expr/_reductions.py b/dask_expr/_reductions.py index dcb4a82bc..fddd52511 100644 --- a/dask_expr/_reductions.py +++ b/dask_expr/_reductions.py @@ -449,7 +449,6 @@ def _chunk_cls_args(self): @property def should_shuffle(self): - # TODO: Make this an actual default param? sort = getattr(self, "sort", False) return not ( not isinstance(self.split_out, bool) and self.split_out == 1 or sort From cdd51ddc30017a5b0f4324d907307f15f24dd497 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 19 Jan 2024 14:29:26 +0100 Subject: [PATCH 07/12] Fix --- dask_expr/_groupby.py | 29 +++++++---------------------- 1 file changed, 7 insertions(+), 22 deletions(-) diff --git a/dask_expr/_groupby.py b/dask_expr/_groupby.py index c70bcec7f..78c5a8cf8 100644 --- a/dask_expr/_groupby.py +++ b/dask_expr/_groupby.py @@ -301,7 +301,10 @@ def _meta(self): self._by_meta, **_as_dict("observed", self.observed), **_as_dict("dropna", self.dropna), - ).aggregate(self.arg) + ) + if self._slice is not None: + meta = meta[self._slice] + meta = meta.aggregate(self.arg) return make_meta(meta) @functools.cached_property @@ -479,7 +482,10 @@ def combine_kwargs(self) -> dict: def aggregate_kwargs(self) -> dict: return { "aggregate_funcs": self.agg_args["aggregate_funcs"], + "arg": self.arg, + "columns": self._slice, "finalize_funcs": self.agg_args["finalizers"], + "is_series": self._meta.ndim == 1, "level": self.levels, "sort": self.sort, **_as_dict("observed", self.observed), @@ -1880,27 +1886,6 @@ def __init__( obj, by=by, slice=slice, observed=observed, dropna=dropna, sort=sort ) - def aggregate(self, arg=None, split_every=8, split_out=1, **kwargs): - result = super().aggregate( - arg=arg, split_every=split_every, split_out=split_out - ) - if self._slice: - try: - result = result[self._slice] - except KeyError: - pass - - if ( - arg is not None - and not isinstance(arg, (list, dict)) - and is_dataframe_like(result._meta) - ): - result = result[result.columns[0]] - - return result - - agg = aggregate - def idxmin( self, split_every=None, split_out=1, skipna=True, numeric_only=False, **kwargs ): From 568b790df95e780ec64a488859068b1e5c4525b2 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 29 Jan 2024 09:38:47 +0000 Subject: [PATCH 08/12] Wipe cache and rerun --- .github/workflows/test.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 55cb0bd88..2e3d8b108 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -44,7 +44,7 @@ jobs: # Wipe cache every 24 hours or whenever environment.yml changes. This means it # may take up to a day before changes to unpinned packages are picked up. # To force a cache refresh, change the hardcoded numerical suffix below. - cache-environment-key: environment-${{ steps.date.outputs.date }}-0 + cache-environment-key: environment-${{ steps.date.outputs.date }}-1 - name: Install dask-expr run: python -m pip install -e . --no-deps From b48349317e1cfaae2643e1650b7cc08dc73482c4 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 29 Jan 2024 11:00:06 +0100 Subject: [PATCH 09/12] Move sort --- dask_expr/_reductions.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dask_expr/_reductions.py b/dask_expr/_reductions.py index fddd52511..32e47af95 100644 --- a/dask_expr/_reductions.py +++ b/dask_expr/_reductions.py @@ -472,7 +472,6 @@ def _lower(self): combine = aggregate combine_kwargs = aggregate_kwargs - sort = getattr(self, "sort", False) split_every = getattr(self, "split_every", None) chunked = self._chunk_cls( self.frame, type(self), chunk, chunk_kwargs, *self._chunk_cls_args @@ -503,7 +502,7 @@ def _lower(self): split_by=self.split_by, split_out=self.split_out, split_every=split_every, - sort=sort, + sort=getattr(self, "sort", False), shuffle_by_index=getattr(self, "shuffle_by_index", None), shuffle_method=getattr(self, "shuffle_method", None), ignore_index=getattr(self, "ignore_index", True), From d453b65106eb89e5573fd20b4f2083d3327470fb Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 29 Jan 2024 11:07:41 +0100 Subject: [PATCH 10/12] Fixed column ordering in column projection --- dask_expr/_groupby.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/dask_expr/_groupby.py b/dask_expr/_groupby.py index 78c5a8cf8..faf488b30 100644 --- a/dask_expr/_groupby.py +++ b/dask_expr/_groupby.py @@ -348,15 +348,18 @@ def agg_args(self): def _simplify_down(self): # Use agg-spec information to add column projection - column_projection = None + required_columns = None if isinstance(self.arg, dict): - column_projection = ( + required_columns = ( set(self._by_columns) .union(self.arg.keys()) .intersection(self.frame.columns) ) - if column_projection and column_projection < set(self.frame.columns): - return type(self)(self.frame[list(column_projection)], *self.operands[1:]) + column_projection = [ + column for column in self.frame.columns if column in required_columns + ] + if column_projection and column_projection != self.frame.columns: + return type(self)(self.frame[column_projection], *self.operands[1:]) class GroupbyAggregation(GroupbyAggregationBase): From 5ee68877c6904b6e98a20892385dbab780bf8ccc Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 29 Jan 2024 11:08:59 +0100 Subject: [PATCH 11/12] Minor --- dask_expr/_groupby.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_expr/_groupby.py b/dask_expr/_groupby.py index faf488b30..a6084f88e 100644 --- a/dask_expr/_groupby.py +++ b/dask_expr/_groupby.py @@ -348,7 +348,7 @@ def agg_args(self): def _simplify_down(self): # Use agg-spec information to add column projection - required_columns = None + column_projection = None if isinstance(self.arg, dict): required_columns = ( set(self._by_columns) From 10f09f87b6bab04d6e49a7f92f5e15b31c72e137 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 29 Jan 2024 11:11:05 +0100 Subject: [PATCH 12/12] Reduce cyclomatic complexity --- dask_expr/_groupby.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/dask_expr/_groupby.py b/dask_expr/_groupby.py index a6084f88e..e162eaf8f 100644 --- a/dask_expr/_groupby.py +++ b/dask_expr/_groupby.py @@ -347,18 +347,19 @@ def agg_args(self): return dict(zip(keys, _build_agg_args(self.spec))) def _simplify_down(self): + if not isinstance(self.arg, dict): + return + # Use agg-spec information to add column projection - column_projection = None - if isinstance(self.arg, dict): - required_columns = ( - set(self._by_columns) - .union(self.arg.keys()) - .intersection(self.frame.columns) - ) - column_projection = [ - column for column in self.frame.columns if column in required_columns - ] - if column_projection and column_projection != self.frame.columns: + required_columns = ( + set(self._by_columns) + .union(self.arg.keys()) + .intersection(self.frame.columns) + ) + column_projection = [ + column for column in self.frame.columns if column in required_columns + ] + if column_projection != self.frame.columns: return type(self)(self.frame[column_projection], *self.operands[1:])