diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index fc0828b06a234..ddca2a71828ac 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -296,8 +296,7 @@ def create_array(s, t):
             mask = s.isnull()
             # Ensure timestamp series are in expected form for Spark internal representation
             if t is not None and pa.types.is_timestamp(t):
-                s = _check_series_convert_timestamps_internal(s.fillna(0), self._timezone)
-
+                s = _check_series_convert_timestamps_internal(s, self._timezone)
             try:
                 array = pa.Array.from_pandas(s, mask=mask, type=t, safe=self._safecheck)
             except pa.ArrowException as e:
diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index 067113722adb5..1361c0c2d1f55 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -383,6 +383,15 @@ def test_timestamp_dst(self):
         assert_frame_equal(pdf, df_from_python.toPandas())
         assert_frame_equal(pdf, df_from_pandas.toPandas())
 
+    # Regression test for SPARK-28003
+    def test_timestamp_nat(self):
+        dt = [pd.NaT, pd.Timestamp('2019-06-11'), None] * 100
+        pdf = pd.DataFrame({'time': dt})
+        df_no_arrow, df_arrow = self._createDataFrame_toggle(pdf)
+
+        assert_frame_equal(pdf, df_no_arrow.toPandas())
+        assert_frame_equal(pdf, df_arrow.toPandas())
+
     def test_toPandas_batch_order(self):
 
         def delay_first_part(partition_index, iterator):