Skip to content
Merged
71 changes: 46 additions & 25 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@
from pandas.core.window.numba_ import generate_numba_apply_func

if TYPE_CHECKING:
from pandas import Series
from pandas import DataFrame, Series
from pandas.core.internals import Block # noqa:F401


def calculate_center_offset(window) -> int:
Expand Down Expand Up @@ -418,35 +419,40 @@ def _wrap_results(self, results, obj, skipped: List[int]) -> FrameOrSeriesUnion:
for i in skipped:
exclude.extend(orig_blocks[i].columns)

kept_blocks = [blk for i, blk in enumerate(orig_blocks) if i not in skipped]

final = []
for result, block in zip(results, kept_blocks):

result = type(obj)(result, index=obj.index, columns=block.columns)
final.append(result)

exclude = exclude or []
columns = [c for c in self._selected_obj.columns if c not in exclude]
if not columns and not len(final) and exclude:
if not columns and not len(results) and exclude:
raise DataError("No numeric types to aggregate")
elif not len(final):
elif not len(results):
return obj.astype("float64")

df = concat(final, axis=1).reindex(columns=columns, copy=False)
df = concat(results, axis=1).reindex(columns=columns, copy=False)
self._insert_on_column(df, obj)
return df

def _insert_on_column(self, result: "DataFrame", obj: "DataFrame"):
# if we have an 'on' column we want to put it back into
# the results in the same location
from pandas import Series

if self.on is not None and not self._on.equals(obj.index):
name = self._on.name
extra_col = Series(self._on, index=obj.index, name=name)
if name not in df.columns and name not in df.index.names:
new_loc = len(df.columns)
df.insert(new_loc, name, extra_col)
elif name in df.columns:
if name in result.columns:
# TODO: sure we want to overwrite results?
df[name] = extra_col
return df
result[name] = extra_col
elif name in result.index.names:
pass
elif name in self._selected_obj.columns:
# insert in the same location as we had in _selected_obj
old_cols = self._selected_obj.columns
new_cols = result.columns
old_loc = old_cols.get_loc(name)
overlap = new_cols.intersection(old_cols[:old_loc])
new_loc = len(overlap)
result.insert(new_loc, name, extra_col)
else:
# insert at the end
result[name] = extra_col

def _center_window(self, result, window) -> np.ndarray:
"""
Expand Down Expand Up @@ -530,21 +536,36 @@ def _apply_blockwise(
# This isn't quite blockwise, since `blocks` is actually a collection
# of homogenenous DataFrames.
blocks, obj = self._create_blocks(self._selected_obj)
mgr = obj._mgr

def hfunc(bvalues: ArrayLike) -> ArrayLike:
# TODO(EA2D): getattr unnecessary with 2D EAs
values = self._prep_values(getattr(bvalues, "T", bvalues))
res_values = homogeneous_func(values)
return getattr(res_values, "T", res_values)

skipped: List[int] = []
results: List[ArrayLike] = []
for i, b in enumerate(blocks):
res_blocks: List["Block"] = []
for i, blk in enumerate(mgr.blocks):
try:
values = self._prep_values(b.values)
nbs = blk.apply(hfunc)

except (TypeError, NotImplementedError):
skipped.append(i)
continue

result = homogeneous_func(values)
results.append(result)
res_blocks.extend(nbs)

if not len(res_blocks) and skipped:
raise DataError("No numeric types to aggregate")
elif not len(res_blocks):
return obj.astype("float64")

return self._wrap_results(results, obj, skipped)
new_cols = mgr.reset_dropped_locs(res_blocks, skipped)
new_mgr = type(mgr).from_blocks(res_blocks, [new_cols, obj.index])
out = obj._constructor(new_mgr)
self._insert_on_column(out, obj)
return out

def _apply(
self,
Expand Down