From af6c2db872b7f6614c261c50261e5dee0182e882 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Tue, 11 Jun 2019 16:25:35 -0400 Subject: [PATCH 1/5] Fix NaT bug with Arrow --- python/pyspark/serializers.py | 3 +-- python/pyspark/sql/tests/test_arrow.py | 13 +++++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index fc0828b06a234..ddca2a71828ac 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -296,8 +296,7 @@ def create_array(s, t): mask = s.isnull() # Ensure timestamp series are in expected form for Spark internal representation if t is not None and pa.types.is_timestamp(t): - s = _check_series_convert_timestamps_internal(s.fillna(0), self._timezone) - + s = _check_series_convert_timestamps_internal(s, self._timezone) try: array = pa.Array.from_pandas(s, mask=mask, type=t, safe=self._safecheck) except pa.ArrowException as e: diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index 067113722adb5..9293d5eaac423 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -383,6 +383,19 @@ def test_timestamp_dst(self): assert_frame_equal(pdf, df_from_python.toPandas()) assert_frame_equal(pdf, df_from_pandas.toPandas()) + def test_timestamp_nat(self): + import pandas as pd + dt1 = [pd.NaT, pd.Timestamp('2019-06-11')] * 100 + dt2 = [None, pd.Timestamp('2019-06-11')] * 100 + pdf1 = pd.DataFrame({'time': dt1}) + pdf2 = pd.DataFrame({'time': dt2}) + + df1 = self.spark.createDataFrame(pdf1) + df2 = self.spark.createDataFrame(pdf2) + + assert_frame_equal(pdf1, df1.toPandas()) + assert_frame_equal(pdf2, df2.toPandas()) + def test_toPandas_batch_order(self): def delay_first_part(partition_index, iterator): From 1c5b8a706ad38518a456c38f69480d8f991ec60c Mon Sep 17 00:00:00 2001 From: Li Jin Date: Wed, 12 Jun 2019 18:12:59 -0400 Subject: [PATCH 2/5] Simplify test cases; Test both arrow and non arrow path --- python/pyspark/sql/tests/test_arrow.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index 9293d5eaac423..670ddc6219a00 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -384,17 +384,16 @@ def test_timestamp_dst(self): assert_frame_equal(pdf, df_from_pandas.toPandas()) def test_timestamp_nat(self): - import pandas as pd - dt1 = [pd.NaT, pd.Timestamp('2019-06-11')] * 100 - dt2 = [None, pd.Timestamp('2019-06-11')] * 100 - pdf1 = pd.DataFrame({'time': dt1}) - pdf2 = pd.DataFrame({'time': dt2}) + dt = [pd.NaT, pd.Timestamp('2019-06-11'), None] * 100 + pdf = pd.DataFrame({'time': dt}) - df1 = self.spark.createDataFrame(pdf1) - df2 = self.spark.createDataFrame(pdf2) + with self.sql_conf({'spark.sql.execution.arrow.pyspark.enabled': "false"}): + df = self.spark.createDataFrame(pdf) + assert_frame_equal(pdf, df.toPandas()) - assert_frame_equal(pdf1, df1.toPandas()) - assert_frame_equal(pdf2, df2.toPandas()) + with self.sql_conf({'spark.sql.execution.arrow.pyspark.enabled': "true"}): + df = self.spark.createDataFrame(pdf) + assert_frame_equal(pdf, df.toPandas()) def test_toPandas_batch_order(self): From ae144e1d4a6bb1ce6ec9c4c7ed35f1af72635be5 Mon Sep 17 00:00:00 2001 From: Li Jin Date: Thu, 13 Jun 2019 11:43:00 -0400 Subject: [PATCH 3/5] Add Jira numbers --- python/pyspark/sql/tests/test_arrow.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index 670ddc6219a00..4332bea2951da 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -383,6 +383,7 @@ def test_timestamp_dst(self): assert_frame_equal(pdf, df_from_python.toPandas()) assert_frame_equal(pdf, df_from_pandas.toPandas()) + # Regression test for SPARK-28003 def test_timestamp_nat(self): dt = [pd.NaT, pd.Timestamp('2019-06-11'), None] * 100 pdf = pd.DataFrame({'time': dt}) From 60733ac9188477a8c942a39ce97856c2812012af Mon Sep 17 00:00:00 2001 From: Li Jin Date: Thu, 13 Jun 2019 11:45:50 -0400 Subject: [PATCH 4/5] Refactor to forloop --- python/pyspark/sql/tests/test_arrow.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index 4332bea2951da..11c3cb42ddb8e 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -388,13 +388,10 @@ def test_timestamp_nat(self): dt = [pd.NaT, pd.Timestamp('2019-06-11'), None] * 100 pdf = pd.DataFrame({'time': dt}) - with self.sql_conf({'spark.sql.execution.arrow.pyspark.enabled': "false"}): - df = self.spark.createDataFrame(pdf) - assert_frame_equal(pdf, df.toPandas()) - - with self.sql_conf({'spark.sql.execution.arrow.pyspark.enabled': "true"}): - df = self.spark.createDataFrame(pdf) - assert_frame_equal(pdf, df.toPandas()) + for arrow_enabled in [False, True]: + with self.sql_conf({'spark.sql.execution.arrow.pyspark.enabled': arrow_enabled}): + df = self.spark.createDataFrame(pdf) + assert_frame_equal(pdf, df.toPandas()) def test_toPandas_batch_order(self): From bda31b5811c9ac2eff27bfcec548ad2b3b0d2f3a Mon Sep 17 00:00:00 2001 From: Li Jin Date: Thu, 13 Jun 2019 14:30:57 -0400 Subject: [PATCH 5/5] Use _createDataFrame_toggle --- python/pyspark/sql/tests/test_arrow.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py index 11c3cb42ddb8e..1361c0c2d1f55 100644 --- a/python/pyspark/sql/tests/test_arrow.py +++ b/python/pyspark/sql/tests/test_arrow.py @@ -387,11 +387,10 @@ def test_timestamp_dst(self): def test_timestamp_nat(self): dt = [pd.NaT, pd.Timestamp('2019-06-11'), None] * 100 pdf = pd.DataFrame({'time': dt}) + df_no_arrow, df_arrow = self._createDataFrame_toggle(pdf) - for arrow_enabled in [False, True]: - with self.sql_conf({'spark.sql.execution.arrow.pyspark.enabled': arrow_enabled}): - df = self.spark.createDataFrame(pdf) - assert_frame_equal(pdf, df.toPandas()) + assert_frame_equal(pdf, df_no_arrow.toPandas()) + assert_frame_equal(pdf, df_arrow.toPandas()) def test_toPandas_batch_order(self):