|
12 | 12 |
|
13 | 13 | from pandas._libs.tslibs import BaseOffset, to_offset |
14 | 14 | import pandas._libs.window.aggregations as window_aggregations |
15 | | -from pandas._typing import Axis, FrameOrSeries, Scalar |
| 15 | +from pandas._typing import ArrayLike, Axis, FrameOrSeries, Scalar |
16 | 16 | from pandas.compat._optional import import_optional_dependency |
17 | 17 | from pandas.compat.numpy import function as nv |
18 | 18 | from pandas.util._decorators import Appender, Substitution, cache_readonly, doc |
@@ -487,6 +487,38 @@ def _get_window_indexer(self, window: int) -> BaseIndexer: |
487 | 487 | return VariableWindowIndexer(index_array=self._on.asi8, window_size=window) |
488 | 488 | return FixedWindowIndexer(window_size=window) |
489 | 489 |
|
| 490 | + def _apply_blockwise( |
| 491 | + self, homogeneous_func: Callable[..., ArrayLike] |
| 492 | + ) -> FrameOrSeries: |
| 493 | + """ |
| 494 | + Apply the given function to the DataFrame broken down into homogeneous |
| 495 | + sub-frames. |
| 496 | + """ |
| 497 | + # This isn't quite blockwise, since `blocks` is actually a collection |
| 498 | + # of homogenenous DataFrames. |
| 499 | + blocks, obj = self._create_blocks(self._selected_obj) |
| 500 | + |
| 501 | + skipped: List[int] = [] |
| 502 | + results: List[ArrayLike] = [] |
| 503 | + exclude: List[Scalar] = [] |
| 504 | + for i, b in enumerate(blocks): |
| 505 | + try: |
| 506 | + values = self._prep_values(b.values) |
| 507 | + |
| 508 | + except (TypeError, NotImplementedError) as err: |
| 509 | + if isinstance(obj, ABCDataFrame): |
| 510 | + skipped.append(i) |
| 511 | + exclude.extend(b.columns) |
| 512 | + continue |
| 513 | + else: |
| 514 | + raise DataError("No numeric types to aggregate") from err |
| 515 | + |
| 516 | + result = homogeneous_func(values) |
| 517 | + results.append(result) |
| 518 | + |
| 519 | + block_list = [blk for i, blk in enumerate(blocks) if i not in skipped] |
| 520 | + return self._wrap_results(results, block_list, obj, exclude) |
| 521 | + |
490 | 522 | def _apply( |
491 | 523 | self, |
492 | 524 | func: Callable, |
@@ -524,30 +556,14 @@ def _apply( |
524 | 556 | """ |
525 | 557 | win_type = self._get_win_type(kwargs) |
526 | 558 | window = self._get_window(win_type=win_type) |
527 | | - |
528 | | - blocks, obj = self._create_blocks(self._selected_obj) |
529 | | - block_list = list(blocks) |
530 | 559 | window_indexer = self._get_window_indexer(window) |
531 | 560 |
|
532 | | - results = [] |
533 | | - exclude: List[Scalar] = [] |
534 | | - for i, b in enumerate(blocks): |
535 | | - try: |
536 | | - values = self._prep_values(b.values) |
537 | | - |
538 | | - except (TypeError, NotImplementedError) as err: |
539 | | - if isinstance(obj, ABCDataFrame): |
540 | | - exclude.extend(b.columns) |
541 | | - del block_list[i] |
542 | | - continue |
543 | | - else: |
544 | | - raise DataError("No numeric types to aggregate") from err |
| 561 | + def homogeneous_func(values: np.ndarray): |
| 562 | + # calculation function |
545 | 563 |
|
546 | 564 | if values.size == 0: |
547 | | - results.append(values.copy()) |
548 | | - continue |
| 565 | + return values.copy() |
549 | 566 |
|
550 | | - # calculation function |
551 | 567 | offset = calculate_center_offset(window) if center else 0 |
552 | 568 | additional_nans = np.array([np.nan] * offset) |
553 | 569 |
|
@@ -594,9 +610,9 @@ def calc(x): |
594 | 610 | if center: |
595 | 611 | result = self._center_window(result, window) |
596 | 612 |
|
597 | | - results.append(result) |
| 613 | + return result |
598 | 614 |
|
599 | | - return self._wrap_results(results, block_list, obj, exclude) |
| 615 | + return self._apply_blockwise(homogeneous_func) |
600 | 616 |
|
601 | 617 | def aggregate(self, func, *args, **kwargs): |
602 | 618 | result, how = self._aggregate(func, *args, **kwargs) |
|
0 commit comments