From 6b382a310ff8719285c892fc33872bcb9a249c1a Mon Sep 17 00:00:00 2001 From: Andy Ye Date: Mon, 31 Jan 2022 17:22:16 -0600 Subject: [PATCH 01/10] Add split/rsplit; Need to refactor regex --- sdks/python/apache_beam/dataframe/frames.py | 83 +++++++++++++++++-- .../dataframe/pandas_doctests_test.py | 12 ++- .../apache_beam/dataframe/transforms_test.py | 53 +++++++++++- 3 files changed, 139 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 29b93cfcc91c..c6aeb67e861a 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -4721,13 +4721,84 @@ def repeat(self, repeats): pd.core.strings.StringMethods, 'get_dummies', reason='non-deferred-columns') - split = frame_base.wont_implement_method( - pd.core.strings.StringMethods, 'split', - reason='non-deferred-columns') + # TODO: Add for pandas 1.4.0 + # def _split_helper( + # self, + # rsplit=False, + # pat=None, + # expand=False, + # regex=None, + # **kwargs + # ): + def _split_helper(self, rsplit=False, pat=None, expand=False, **kwargs): + # Not creating separate columns + if not expand: + proxy = self._expr.proxy() + else: + # Creating separate columns, so data type is more strict + dtype = self._expr.proxy().dtype + if not isinstance(dtype, pd.CategoricalDtype): + method_name = 'rsplit' if rsplit else 'split' + raise frame_base.WontImplementError( + method_name + "() of non-categorical type is not supported because " + "the type of the output column depends on the data. Please use " + "pd.CategoricalDtype with explicit categories.", + reason="non-deferred-columns") - rsplit = frame_base.wont_implement_method( - pd.core.strings.StringMethods, 'rsplit', - reason='non-deferred-columns') + split_cats = [ + cat.split(sep=kwargs.get('pat'), maxsplit=kwargs.get('n', -1)) + for cat in dtype.categories + ] + + # TODO: Replace for pandas 1.4.0 + # Treat pat as literal string + # if not regex or (regex is None and len(pat) == 1): + # split_cats = [ + # cat.split( + # sep=kwargs.get('pat'), + # maxsplit=kwargs.get('n', -1) + # ) for cat in dtype.categories + # ] + # Treat pat as regex + # else: + # split_cats = [ + # re.split( + # pattern=pat, + # string=cat, + # maxsplit=kwargs.get('n', 0) + # ) for cat in dtype.categories + # ] + + max_splits = len(max(split_cats, key=len)) + proxy = pd.DataFrame(columns=range(max_splits)) + + func = lambda s: proxy.combine_first( + (s.str.split(pat=pat, expand=expand, **kwargs) + if not rsplit else s.str.rsplit(pat=pat, expand=expand, **kwargs)) + ) + + return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( + 'split', + func, + [self._expr], + proxy=proxy, + requires_partition_by=partitionings.Arbitrary(), + preserves_partition_by=partitionings.Arbitrary())) + + @frame_base.with_docs_from(pd.core.strings.StringMethods) + @frame_base.args_to_kwargs(pd.core.strings.StringMethods) + # TODO: Add for pandas 1.4.0 + # def split(self, pat=None, expand=False, regex=None, **kwargs): + def split(self, pat=None, expand=False, **kwargs): + return self._split_helper(rsplit=False, pat=pat, expand=expand, **kwargs) + + @frame_base.with_docs_from(pd.core.strings.StringMethods) + @frame_base.args_to_kwargs(pd.core.strings.StringMethods) + # TODO: Add for pandas 1.4.0 + # def rsplit(self, pat=None, expand=False, regex=None, **kwargs): + def rsplit(self, pat=None, expand=False, **kwargs): + return self._split_helper(rsplit=True, pat=pat, expand=expand, **kwargs) ELEMENTWISE_STRING_METHODS = [ diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py index 99b64c03d2d0..d96ac7fb7f26 100644 --- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py +++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py @@ -584,8 +584,16 @@ def test_string_tests(self): f'{module_name}.StringMethods.get_dummies': ['*'], f'{module_name}.str_get_dummies': ['*'], f'{module_name}.StringMethods': ['s.str.split("_")'], - f'{module_name}.StringMethods.rsplit': ['*'], - f'{module_name}.StringMethods.split': ['*'], + f'{module_name}.StringMethods.rsplit': [ + "s.str.split(expand=True)", + """s.str.split(r"\\+|=", expand=True)""", + """s.str.rsplit("/", n=1, expand=True)""", + ], + f'{module_name}.StringMethods.split': [ + "s.str.split(expand=True)", + """s.str.split(r"\\+|=", expand=True)""", + """s.str.rsplit("/", n=1, expand=True)""" + ], }, skip={ # count() on Series with a NaN produces mismatched type if we diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py index cd76f07133fc..c6e87f2b1037 100644 --- a/sdks/python/apache_beam/dataframe/transforms_test.py +++ b/sdks/python/apache_beam/dataframe/transforms_test.py @@ -17,7 +17,10 @@ import typing import unittest +import numpy as np import pandas as pd +# TODO: Add for pandas 1.4.0 +# import re import apache_beam as beam from apache_beam import coders @@ -52,6 +55,7 @@ def check_correct(expected, actual): def concat(parts): + # import pdb; pdb.set_trace() if len(parts) > 1: return pd.concat(parts) elif len(parts) == 1: @@ -81,7 +85,7 @@ def run_scenario(self, input, func): input_deferred = frame_base.DeferredFrame.wrap(input_placeholder) actual_deferred = func(input_deferred)._expr.evaluate_at( expressions.Session({input_placeholder: input})) - + # import pdb; pdb.set_trace() check_correct(expected, actual_deferred) with beam.Pipeline() as p: @@ -349,6 +353,53 @@ def test_rename(self): 0: 2, 2: 0 }, errors='raise')) + def test_split(self): + s = pd.Series([ + "this is a regular sentence", + "https://docs.python.org/3/tutorial/index.html", + np.nan + ]) + self.run_scenario(s, lambda s: s.str.split()) + self.run_scenario(s, lambda s: s.str.rsplit()) + self.run_scenario(s, lambda s: s.str.split(n=2)) + self.run_scenario(s, lambda s: s.str.rsplit(n=2)) + self.run_scenario(s, lambda s: s.str.split(pat="/")) + + # When expand=True, there is exception because series is not categorical + with self.assertRaisesRegex( + frame_base.WontImplementError, + r"split\(\) of non-categorical type is not supported"): + self.run_scenario(s, lambda s: s.str.split(expand=True)) + with self.assertRaisesRegex( + frame_base.WontImplementError, + r"rsplit\(\) of non-categorical type is not supported"): + self.run_scenario(s, lambda s: s.str.rsplit(expand=True)) + + # When expand=True, and series is categorical type + s = s.astype('category') + self.run_scenario(s, lambda s: s.str.split(expand=True)) + self.run_scenario(s, lambda s: s.str.rsplit("/", n=1, expand=True)) + + # TODO: Remote this example for pandas 1.4.0 + s = pd.Series(["1+1=2"]).astype('category') + self.run_scenario(s, lambda s: s.str.rsplit(r"\+|=", expand=True)) + + # TODO: Test below examples for pandas 1.4.0 + # When expand=True and testing regex + # s = pd.Series(["foo and bar plus baz"]).astype('category') + # self.run_scenario(s, lambda s: s.str.split(r"and|plus", expand=True)) + + # s = pd.Series(['foojpgbar.jpg']).astype('category') + # self.run_scenario(s, lambda s: s.str.split(r".", expand=True)) + # self.run_scenario(s, lambda s: s.str.split(r".", expand=True)) + + # self.run_scenario(s, + # lambda s: s.str.split(r"\.jpg", regex=True, expand=True)) + # self.run_scenario(s, + # lambda s: s.str.split(re.compile(r"\.jpg"), expand=True)) + # self.run_scenario(s, + # lambda s: s.str.split(r"\.jpg", regex=False, expand=True)) + class FusionTest(unittest.TestCase): @staticmethod From 5f6123b2da525d218a98eba2a239f9fd7ef2f0d4 Mon Sep 17 00:00:00 2001 From: Andy Ye Date: Mon, 14 Feb 2022 07:09:26 -0600 Subject: [PATCH 02/10] Support Regex; Refactor tests --- sdks/python/apache_beam/dataframe/frames.py | 88 +++++++------- .../apache_beam/dataframe/frames_test.py | 112 ++++++++++++++++++ .../dataframe/pandas_doctests_test.py | 12 +- .../apache_beam/dataframe/transforms_test.py | 50 -------- 4 files changed, 157 insertions(+), 105 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index c6aeb67e861a..1f1fceea38ba 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -4721,21 +4721,17 @@ def repeat(self, repeats): pd.core.strings.StringMethods, 'get_dummies', reason='non-deferred-columns') - # TODO: Add for pandas 1.4.0 - # def _split_helper( - # self, - # rsplit=False, - # pat=None, - # expand=False, - # regex=None, - # **kwargs - # ): - def _split_helper(self, rsplit=False, pat=None, expand=False, **kwargs): - # Not creating separate columns + def _split_helper( + self, rsplit=False, pat=None, expand=False, regex=None, **kwargs): if not expand: + # Not creating separate columns proxy = self._expr.proxy() + func = lambda s: pd.concat([proxy, + (s.str.split(pat=pat, expand=expand, regex=regex, **kwargs) + if not rsplit else s.str.rsplit(pat=pat, expand=expand, **kwargs))] + ) else: - # Creating separate columns, so data type is more strict + # Creating separate columns, so must be more strict on dtype dtype = self._expr.proxy().dtype if not isinstance(dtype, pd.CategoricalDtype): method_name = 'rsplit' if rsplit else 'split' @@ -4745,37 +4741,32 @@ def _split_helper(self, rsplit=False, pat=None, expand=False, **kwargs): "pd.CategoricalDtype with explicit categories.", reason="non-deferred-columns") - split_cats = [ - cat.split(sep=kwargs.get('pat'), maxsplit=kwargs.get('n', -1)) - for cat in dtype.categories - ] - - # TODO: Replace for pandas 1.4.0 - # Treat pat as literal string - # if not regex or (regex is None and len(pat) == 1): - # split_cats = [ - # cat.split( - # sep=kwargs.get('pat'), - # maxsplit=kwargs.get('n', -1) - # ) for cat in dtype.categories - # ] - # Treat pat as regex - # else: - # split_cats = [ - # re.split( - # pattern=pat, - # string=cat, - # maxsplit=kwargs.get('n', 0) - # ) for cat in dtype.categories - # ] + if regex is False or ( + regex is None and isinstance(pat, str) and len(pat) == 1): + # Treat pat as literal string + split_cats = [ + cat.split( + sep=kwargs.get('pat'), + maxsplit=kwargs.get('n', -1) + ) for cat in dtype.categories + ] + else: + # Treat pat as regex + split_cats = [ + re.split( + pattern=pat, + string=cat, + maxsplit=kwargs.get('n', 0) + ) for cat in dtype.categories + ] max_splits = len(max(split_cats, key=len)) proxy = pd.DataFrame(columns=range(max_splits)) - func = lambda s: proxy.combine_first( - (s.str.split(pat=pat, expand=expand, **kwargs) - if not rsplit else s.str.rsplit(pat=pat, expand=expand, **kwargs)) - ) + func = lambda s: pd.concat([proxy, + (s.str.split(pat=pat, expand=expand, regex=regex, **kwargs) + if not rsplit else s.str.rsplit(pat=pat, expand=expand, **kwargs))] + ).replace(np.nan, value=None) return frame_base.DeferredFrame.wrap( expressions.ComputedExpression( @@ -4788,16 +4779,23 @@ def _split_helper(self, rsplit=False, pat=None, expand=False, **kwargs): @frame_base.with_docs_from(pd.core.strings.StringMethods) @frame_base.args_to_kwargs(pd.core.strings.StringMethods) - # TODO: Add for pandas 1.4.0 - # def split(self, pat=None, expand=False, regex=None, **kwargs): - def split(self, pat=None, expand=False, **kwargs): - return self._split_helper(rsplit=False, pat=pat, expand=expand, **kwargs) + def split(self, pat=None, expand=False, regex=None, **kwargs): + """ + Like other non-deferred methods, dtype must be CategoricalDtype. + One exception is when ``expand`` is ``False``. Because we are not + creating new columns at construction time dtype can be `str`. + """ + return self._split_helper( + rsplit=False, pat=pat, expand=expand, regex=regex, **kwargs) @frame_base.with_docs_from(pd.core.strings.StringMethods) @frame_base.args_to_kwargs(pd.core.strings.StringMethods) - # TODO: Add for pandas 1.4.0 - # def rsplit(self, pat=None, expand=False, regex=None, **kwargs): def rsplit(self, pat=None, expand=False, **kwargs): + """ + Like other non-deferred methods, dtype must be CategoricalDtype. + One exception is when ``expand`` is ``False``. Because we are not + creating new columns at construction time dtype can be `str`. + """ return self._split_helper(rsplit=True, pat=pat, expand=expand, **kwargs) diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index 6b48e31128de..adeffd2fa4ee 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -18,6 +18,7 @@ import numpy as np import pandas as pd +import re from parameterized import parameterized import apache_beam as beam @@ -2247,6 +2248,117 @@ def test_sample_with_weights_distribution(self): expected = num_samples * target_prob self.assertTrue(expected / 3 < result < expected * 2, (expected, result)) + def test_split_pandas_examples_no_expand(self): + # if expand=False (default), then no need to cast dtype to be + # CategoricalDtype. + s = pd.Series([ + "this is a regular sentence", + "https://docs.python.org/3/tutorial/index.html", + np.nan + ]) + result = self._evaluate(lambda s: s.str.split(), s) + self.assert_frame_data_equivalent(result, s.str.split()) + + result = self._evaluate(lambda s: s.str.rsplit(), s) + self.assert_frame_data_equivalent(result, s.str.rsplit()) + + result = self._evaluate(lambda s: s.str.split(n=2), s) + self.assert_frame_data_equivalent(result, s.str.split(n=2)) + + result = self._evaluate(lambda s: s.str.rsplit(n=2), s) + self.assert_frame_data_equivalent(result, s.str.rsplit(n=2)) + + result = self._evaluate(lambda s: s.str.split(pat="/"), s) + self.assert_frame_data_equivalent(result, s.str.split(pat="/")) + + def test_split_pandas_examples_expand_not_categorical(self): + # When expand=True, there is exception because series is not categorical + s = pd.Series([ + "this is a regular sentence", + "https://docs.python.org/3/tutorial/index.html", + np.nan + ]) + with self.assertRaisesRegex( + frame_base.WontImplementError, + r"split\(\) of non-categorical type is not supported"): + self._evaluate(lambda s: s.str.split(expand=True), s) + + with self.assertRaisesRegex( + frame_base.WontImplementError, + r"rsplit\(\) of non-categorical type is not supported"): + self._evaluate(lambda s: s.str.rsplit(expand=True), s) + + def test_split_pandas_examples_expand_pat_is_string_literal1(self): + # When expand=True and pattern is treated as a string literal + s = pd.Series([ + "this is a regular sentence", + "https://docs.python.org/3/tutorial/index.html", + np.nan + ]) + s = s.astype( + pd.CategoricalDtype( + categories=[ + 'this is a regular sentence', + 'https://docs.python.org/3/tutorial/index.html' + ])) + result = self._evaluate(lambda s: s.str.split(expand=True), s) + self.assert_frame_data_equivalent(result, s.str.split(expand=True)) + + result = self._evaluate(lambda s: s.str.rsplit("/", n=1, expand=True), s) + self.assert_frame_data_equivalent( + result, s.str.rsplit("/", n=1, expand=True)) + + @unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4") + def test_split_pandas_examples_expand_pat_is_string_literal2(self): + # when regex is None (default) regex pat is string literal if len(pat) == 1 + s = pd.Series(['foojpgbar.jpg']).astype('category') + s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"])) + result = self._evaluate(lambda s: s.str.split(r".", expand=True), s) + self.assert_frame_data_equivalent(result, s.str.split(r".", expand=True)) + + # When regex=False, pat is interpreted as the string itself + result = self._evaluate( + lambda s: s.str.split(r"\.jpg", regex=False, expand=True), s) + self.assert_frame_data_equivalent( + result, s.str.split(r"\.jpg", regex=False, expand=True)) + + @unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4") + def test_split_pandas_examples_expand_pat_is_regex(self): + # when regex is None (default) regex pat is compiled if len(pat) != 1 + s = pd.Series(["foo and bar plus baz"]) + s = s.astype(pd.CategoricalDtype(categories=["foo and bar plus baz"])) + result = self._evaluate(lambda s: s.str.split(r"and|plus", expand=True), s) + self.assert_frame_data_equivalent( + result, s.str.split(r"and|plus", expand=True)) + + s = pd.Series(['foojpgbar.jpg']).astype('category') + s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"])) + result = self._evaluate(lambda s: s.str.split(r"\.jpg", expand=True), s) + self.assert_frame_data_equivalent( + result, s.str.split(r"\.jpg", expand=True)) + + # When regex=True, pat is interpreted as a regex + result = self._evaluate( + lambda s: s.str.split(r"\.jpg", regex=True, expand=True), s) + self.assert_frame_data_equivalent( + result, s.str.split(r"\.jpg", regex=True, expand=True)) + + # A compiled regex can be passed as pat + result = self._evaluate( + lambda s: s.str.split(re.compile(r"\.jpg"), expand=True), s) + self.assert_frame_data_equivalent( + result, s.str.split(re.compile(r"\.jpg"), expand=True)) + + @unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4") + def test_split_pat_is_regex(self): + # My own one: regex, but expand=False + s = pd.Series(['foojpgbar.jpg']).astype('category') + s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"])) + result = self._evaluate( + lambda s: s.str.split(r"\.jpg", regex=True, expand=False), s) + self.assert_frame_data_equivalent( + result, s.str.split(r"\.jpg", regex=True, expand=False)) + class AllowNonParallelTest(unittest.TestCase): def _use_non_parallel_operation(self): diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py index d96ac7fb7f26..99b64c03d2d0 100644 --- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py +++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py @@ -584,16 +584,8 @@ def test_string_tests(self): f'{module_name}.StringMethods.get_dummies': ['*'], f'{module_name}.str_get_dummies': ['*'], f'{module_name}.StringMethods': ['s.str.split("_")'], - f'{module_name}.StringMethods.rsplit': [ - "s.str.split(expand=True)", - """s.str.split(r"\\+|=", expand=True)""", - """s.str.rsplit("/", n=1, expand=True)""", - ], - f'{module_name}.StringMethods.split': [ - "s.str.split(expand=True)", - """s.str.split(r"\\+|=", expand=True)""", - """s.str.rsplit("/", n=1, expand=True)""" - ], + f'{module_name}.StringMethods.rsplit': ['*'], + f'{module_name}.StringMethods.split': ['*'], }, skip={ # count() on Series with a NaN produces mismatched type if we diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py index c6e87f2b1037..48ba7202015c 100644 --- a/sdks/python/apache_beam/dataframe/transforms_test.py +++ b/sdks/python/apache_beam/dataframe/transforms_test.py @@ -17,10 +17,7 @@ import typing import unittest -import numpy as np import pandas as pd -# TODO: Add for pandas 1.4.0 -# import re import apache_beam as beam from apache_beam import coders @@ -353,53 +350,6 @@ def test_rename(self): 0: 2, 2: 0 }, errors='raise')) - def test_split(self): - s = pd.Series([ - "this is a regular sentence", - "https://docs.python.org/3/tutorial/index.html", - np.nan - ]) - self.run_scenario(s, lambda s: s.str.split()) - self.run_scenario(s, lambda s: s.str.rsplit()) - self.run_scenario(s, lambda s: s.str.split(n=2)) - self.run_scenario(s, lambda s: s.str.rsplit(n=2)) - self.run_scenario(s, lambda s: s.str.split(pat="/")) - - # When expand=True, there is exception because series is not categorical - with self.assertRaisesRegex( - frame_base.WontImplementError, - r"split\(\) of non-categorical type is not supported"): - self.run_scenario(s, lambda s: s.str.split(expand=True)) - with self.assertRaisesRegex( - frame_base.WontImplementError, - r"rsplit\(\) of non-categorical type is not supported"): - self.run_scenario(s, lambda s: s.str.rsplit(expand=True)) - - # When expand=True, and series is categorical type - s = s.astype('category') - self.run_scenario(s, lambda s: s.str.split(expand=True)) - self.run_scenario(s, lambda s: s.str.rsplit("/", n=1, expand=True)) - - # TODO: Remote this example for pandas 1.4.0 - s = pd.Series(["1+1=2"]).astype('category') - self.run_scenario(s, lambda s: s.str.rsplit(r"\+|=", expand=True)) - - # TODO: Test below examples for pandas 1.4.0 - # When expand=True and testing regex - # s = pd.Series(["foo and bar plus baz"]).astype('category') - # self.run_scenario(s, lambda s: s.str.split(r"and|plus", expand=True)) - - # s = pd.Series(['foojpgbar.jpg']).astype('category') - # self.run_scenario(s, lambda s: s.str.split(r".", expand=True)) - # self.run_scenario(s, lambda s: s.str.split(r".", expand=True)) - - # self.run_scenario(s, - # lambda s: s.str.split(r"\.jpg", regex=True, expand=True)) - # self.run_scenario(s, - # lambda s: s.str.split(re.compile(r"\.jpg"), expand=True)) - # self.run_scenario(s, - # lambda s: s.str.split(r"\.jpg", regex=False, expand=True)) - class FusionTest(unittest.TestCase): @staticmethod From 04378512dfe531aece0c162c32885edd9fca1c49 Mon Sep 17 00:00:00 2001 From: Andy Ye Date: Mon, 14 Feb 2022 07:16:45 -0600 Subject: [PATCH 03/10] Remove debugger --- sdks/python/apache_beam/dataframe/transforms_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py index 48ba7202015c..cd76f07133fc 100644 --- a/sdks/python/apache_beam/dataframe/transforms_test.py +++ b/sdks/python/apache_beam/dataframe/transforms_test.py @@ -52,7 +52,6 @@ def check_correct(expected, actual): def concat(parts): - # import pdb; pdb.set_trace() if len(parts) > 1: return pd.concat(parts) elif len(parts) == 1: @@ -82,7 +81,7 @@ def run_scenario(self, input, func): input_deferred = frame_base.DeferredFrame.wrap(input_placeholder) actual_deferred = func(input_deferred)._expr.evaluate_at( expressions.Session({input_placeholder: input})) - # import pdb; pdb.set_trace() + check_correct(expected, actual_deferred) with beam.Pipeline() as p: From 43ff9b85abcb8aa6670dc3c016b05e0bc0069c2a Mon Sep 17 00:00:00 2001 From: Andy Ye Date: Mon, 14 Feb 2022 08:57:36 -0600 Subject: [PATCH 04/10] fix grammar --- sdks/python/apache_beam/dataframe/frames.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 1f1fceea38ba..ebd29a02b4c8 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -4783,7 +4783,7 @@ def split(self, pat=None, expand=False, regex=None, **kwargs): """ Like other non-deferred methods, dtype must be CategoricalDtype. One exception is when ``expand`` is ``False``. Because we are not - creating new columns at construction time dtype can be `str`. + creating new columns at construction time, dtype can be `str`. """ return self._split_helper( rsplit=False, pat=pat, expand=expand, regex=regex, **kwargs) @@ -4794,7 +4794,7 @@ def rsplit(self, pat=None, expand=False, **kwargs): """ Like other non-deferred methods, dtype must be CategoricalDtype. One exception is when ``expand`` is ``False``. Because we are not - creating new columns at construction time dtype can be `str`. + creating new columns at construction time, dtype can be `str`. """ return self._split_helper(rsplit=True, pat=pat, expand=expand, **kwargs) From 20fda68d6a95811356985a5eda891946cb79589f Mon Sep 17 00:00:00 2001 From: Andy Ye Date: Mon, 14 Feb 2022 14:28:28 -0600 Subject: [PATCH 05/10] Fix passing regex arg --- sdks/python/apache_beam/dataframe/frames.py | 19 ++++++++++++++----- .../apache_beam/dataframe/frames_test.py | 2 +- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index ebd29a02b4c8..bceef52889e4 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -4723,12 +4723,19 @@ def repeat(self, repeats): def _split_helper( self, rsplit=False, pat=None, expand=False, regex=None, **kwargs): + + # Adding arguments to kwargs. regex introduced in pandas 1.4 + # but only for split, not rsplit + kwargs['pat'] = pat + kwargs['expand'] = expand + if PD_VERSION >= (1, 4) and not rsplit: + kwargs['regex'] = regex + if not expand: # Not creating separate columns proxy = self._expr.proxy() func = lambda s: pd.concat([proxy, - (s.str.split(pat=pat, expand=expand, regex=regex, **kwargs) - if not rsplit else s.str.rsplit(pat=pat, expand=expand, **kwargs))] + (s.str.split(**kwargs) if not rsplit else s.str.rsplit(**kwargs))] ) else: # Creating separate columns, so must be more strict on dtype @@ -4742,7 +4749,10 @@ def _split_helper( reason="non-deferred-columns") if regex is False or ( - regex is None and isinstance(pat, str) and len(pat) == 1): + regex is None and ( + (not pat) or (isinstance(pat, str) and len(pat) == 1) + ) + ): # Treat pat as literal string split_cats = [ cat.split( @@ -4764,8 +4774,7 @@ def _split_helper( proxy = pd.DataFrame(columns=range(max_splits)) func = lambda s: pd.concat([proxy, - (s.str.split(pat=pat, expand=expand, regex=regex, **kwargs) - if not rsplit else s.str.rsplit(pat=pat, expand=expand, **kwargs))] + (s.str.split(**kwargs) if not rsplit else s.str.rsplit(**kwargs))] ).replace(np.nan, value=None) return frame_base.DeferredFrame.wrap( diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index adeffd2fa4ee..0a51c1247ebb 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -2351,7 +2351,7 @@ def test_split_pandas_examples_expand_pat_is_regex(self): @unittest.skipIf(PD_VERSION < (1, 4), "regex arg is new in pandas 1.4") def test_split_pat_is_regex(self): - # My own one: regex, but expand=False + # regex=True, but expand=False s = pd.Series(['foojpgbar.jpg']).astype('category') s = s.astype(pd.CategoricalDtype(categories=["foojpgbar.jpg"])) result = self._evaluate( From a08094c89f28de2bf9a40b683cdb61c52f4bbc26 Mon Sep 17 00:00:00 2001 From: Andy Ye Date: Wed, 16 Feb 2022 15:01:19 -0600 Subject: [PATCH 06/10] Reorder imports --- sdks/python/apache_beam/dataframe/frames_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index 0a51c1247ebb..0cff789bcccf 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -14,11 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re import unittest import numpy as np import pandas as pd -import re from parameterized import parameterized import apache_beam as beam From c64e42c2292b17b2625a6cb7a06656246db67b61 Mon Sep 17 00:00:00 2001 From: Andy Ye Date: Thu, 3 Mar 2022 14:29:29 -0600 Subject: [PATCH 07/10] Address PR comments; Simplify kwargs --- sdks/python/apache_beam/dataframe/frames.py | 48 ++++++++++++--------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index bceef52889e4..11a2bf046033 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -4721,33 +4721,35 @@ def repeat(self, repeats): pd.core.strings.StringMethods, 'get_dummies', reason='non-deferred-columns') - def _split_helper( - self, rsplit=False, pat=None, expand=False, regex=None, **kwargs): + def _split_helper(self, rsplit=False, **kwargs): + pat = kwargs.get('pat', None) + expand = kwargs.get('expand', False) + regex = kwargs.get('regex', None) - # Adding arguments to kwargs. regex introduced in pandas 1.4 - # but only for split, not rsplit - kwargs['pat'] = pat - kwargs['expand'] = expand - if PD_VERSION >= (1, 4) and not rsplit: - kwargs['regex'] = regex + # regex introduced in pandas 1.4 but only for split, not + # rsplit, so removing it from kwargs + if PD_VERSION < (1, 4) and not rsplit: + kwargs.pop('regex', None) if not expand: # Not creating separate columns proxy = self._expr.proxy() - func = lambda s: pd.concat([proxy, - (s.str.split(**kwargs) if not rsplit else s.str.rsplit(**kwargs))] - ) + if not rsplit: + func = lambda s: pd.concat([proxy, s.str.split(**kwargs)]) + else: + func = lambda s: pd.concat([proxy, s.str.rsplit(**kwargs)]) else: # Creating separate columns, so must be more strict on dtype dtype = self._expr.proxy().dtype if not isinstance(dtype, pd.CategoricalDtype): method_name = 'rsplit' if rsplit else 'split' raise frame_base.WontImplementError( - method_name + "() of non-categorical type is not supported because " + f"{method_name}() of non-categorical type is not supported because " "the type of the output column depends on the data. Please use " "pd.CategoricalDtype with explicit categories.", reason="non-deferred-columns") + # Split (if applicable) the categories if regex is False or ( regex is None and ( (not pat) or (isinstance(pat, str) and len(pat) == 1) @@ -4770,12 +4772,17 @@ def _split_helper( ) for cat in dtype.categories ] + # Count the number of new columns to create for proxy max_splits = len(max(split_cats, key=len)) proxy = pd.DataFrame(columns=range(max_splits)) - func = lambda s: pd.concat([proxy, - (s.str.split(**kwargs) if not rsplit else s.str.rsplit(**kwargs))] - ).replace(np.nan, value=None) + def func(s): + if not rsplit: + result = s.str.split(**kwargs) + else: + result = s.str.rsplit(**kwargs) + result[~result.isna()].replace(np.nan, value=None) + return result return frame_base.DeferredFrame.wrap( expressions.ComputedExpression( @@ -4788,24 +4795,25 @@ def _split_helper( @frame_base.with_docs_from(pd.core.strings.StringMethods) @frame_base.args_to_kwargs(pd.core.strings.StringMethods) - def split(self, pat=None, expand=False, regex=None, **kwargs): + @frame_base.populate_defaults(pd.core.strings.StringMethods) + def split(self, **kwargs): """ Like other non-deferred methods, dtype must be CategoricalDtype. One exception is when ``expand`` is ``False``. Because we are not creating new columns at construction time, dtype can be `str`. """ - return self._split_helper( - rsplit=False, pat=pat, expand=expand, regex=regex, **kwargs) + return self._split_helper(rsplit=False, **kwargs) @frame_base.with_docs_from(pd.core.strings.StringMethods) @frame_base.args_to_kwargs(pd.core.strings.StringMethods) - def rsplit(self, pat=None, expand=False, **kwargs): + @frame_base.populate_defaults(pd.core.strings.StringMethods) + def rsplit(self, **kwargs): """ Like other non-deferred methods, dtype must be CategoricalDtype. One exception is when ``expand`` is ``False``. Because we are not creating new columns at construction time, dtype can be `str`. """ - return self._split_helper(rsplit=True, pat=pat, expand=expand, **kwargs) + return self._split_helper(rsplit=True, **kwargs) ELEMENTWISE_STRING_METHODS = [ From 92df15af9cae43447ad3fb79115661afa6120076 Mon Sep 17 00:00:00 2001 From: Andy Ye Date: Thu, 3 Mar 2022 17:14:28 -0600 Subject: [PATCH 08/10] Simplify getting columns for split_cat --- sdks/python/apache_beam/dataframe/frames.py | 31 ++------------------- 1 file changed, 2 insertions(+), 29 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 11a2bf046033..d87c2dfdcddb 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -4722,14 +4722,7 @@ def repeat(self, repeats): reason='non-deferred-columns') def _split_helper(self, rsplit=False, **kwargs): - pat = kwargs.get('pat', None) expand = kwargs.get('expand', False) - regex = kwargs.get('regex', None) - - # regex introduced in pandas 1.4 but only for split, not - # rsplit, so removing it from kwargs - if PD_VERSION < (1, 4) and not rsplit: - kwargs.pop('regex', None) if not expand: # Not creating separate columns @@ -4749,28 +4742,8 @@ def _split_helper(self, rsplit=False, **kwargs): "pd.CategoricalDtype with explicit categories.", reason="non-deferred-columns") - # Split (if applicable) the categories - if regex is False or ( - regex is None and ( - (not pat) or (isinstance(pat, str) and len(pat) == 1) - ) - ): - # Treat pat as literal string - split_cats = [ - cat.split( - sep=kwargs.get('pat'), - maxsplit=kwargs.get('n', -1) - ) for cat in dtype.categories - ] - else: - # Treat pat as regex - split_cats = [ - re.split( - pattern=pat, - string=cat, - maxsplit=kwargs.get('n', 0) - ) for cat in dtype.categories - ] + # Split the categories + split_cats = dtype.categories.str.split(**kwargs) # Count the number of new columns to create for proxy max_splits = len(max(split_cats, key=len)) From 8e29388e8ea9aec21d042f3b7547d52015148df8 Mon Sep 17 00:00:00 2001 From: Andy Ye Date: Fri, 4 Mar 2022 13:47:07 -0600 Subject: [PATCH 09/10] Update doctests to skip expand=True operations --- .../dataframe/pandas_doctests_test.py | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py index 99b64c03d2d0..a9f4471280cf 100644 --- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py +++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py @@ -584,8 +584,6 @@ def test_string_tests(self): f'{module_name}.StringMethods.get_dummies': ['*'], f'{module_name}.str_get_dummies': ['*'], f'{module_name}.StringMethods': ['s.str.split("_")'], - f'{module_name}.StringMethods.rsplit': ['*'], - f'{module_name}.StringMethods.split': ['*'], }, skip={ # count() on Series with a NaN produces mismatched type if we @@ -602,7 +600,30 @@ def test_string_tests(self): ], # output has incorrect formatting in 1.2.x - f'{module_name}.StringMethods.extractall': ['*'] + f'{module_name}.StringMethods.extractall': ['*'], + + # For split and rsplit, if expand=True, then the series + # must be of CategoricalDtype, which pandas doesn't convert to + f'{module_name}.StringMethods.rsplit': [ + 's.str.split(expand=True)', + 's.str.rsplit("/", n=1, expand=True)', + 's.str.split(r"and|plus", expand=True)', + 's.str.split(r".", expand=True)', + 's.str.split(r"\\.jpg", expand=True)', + 's.str.split(r"\\.jpg", regex=True, expand=True)', + 's.str.split(re.compile(r"\\.jpg"), expand=True)', + 's.str.split(r"\\.jpg", regex=False, expand=True)' + ], + f'{module_name}.StringMethods.split': [ + 's.str.split(expand=True)', + 's.str.rsplit("/", n=1, expand=True)', + 's.str.split(r"and|plus", expand=True)', + 's.str.split(r".", expand=True)', + 's.str.split(r"\\.jpg", expand=True)', + 's.str.split(r"\\.jpg", regex=True, expand=True)', + 's.str.split(re.compile(r"\\.jpg"), expand=True)', + 's.str.split(r"\\.jpg", regex=False, expand=True)' + ] }) self.assertEqual(result.failed, 0) From f094db734e5e4f813ec0043aebe0044d68952fb2 Mon Sep 17 00:00:00 2001 From: Andy Ye Date: Fri, 4 Mar 2022 14:53:39 -0600 Subject: [PATCH 10/10] Fix missing doctest --- sdks/python/apache_beam/dataframe/pandas_doctests_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py index a9f4471280cf..34777e605719 100644 --- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py +++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py @@ -605,6 +605,7 @@ def test_string_tests(self): # For split and rsplit, if expand=True, then the series # must be of CategoricalDtype, which pandas doesn't convert to f'{module_name}.StringMethods.rsplit': [ + 's.str.split(r"\\+|=", expand=True)', # for pandas<1.4 's.str.split(expand=True)', 's.str.rsplit("/", n=1, expand=True)', 's.str.split(r"and|plus", expand=True)', @@ -615,6 +616,7 @@ def test_string_tests(self): 's.str.split(r"\\.jpg", regex=False, expand=True)' ], f'{module_name}.StringMethods.split': [ + 's.str.split(r"\\+|=", expand=True)', # for pandas<1.4 's.str.split(expand=True)', 's.str.rsplit("/", n=1, expand=True)', 's.str.split(r"and|plus", expand=True)',