From 0f182d0f6d8e1eb3b92e7e0eb39f2616235c1368 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Mon, 14 Aug 2017 11:08:03 +0900
Subject: [PATCH 1/2] Enable timezone-aware timestamp type when creating Pandas DataFrame.

---
 python/pyspark/sql/dataframe.py               |  9 ++++++
 python/pyspark/sql/tests.py                   | 29 +++++++++++++++++++
 .../apache/spark/sql/internal/SQLConf.scala   |  8 +++++
 3 files changed, 46 insertions(+)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index edc7ca6f5146f..d90e9e73ec04f 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1760,6 +1760,15 @@ def toPandas(self):
 
             for f, t in dtype.items():
                 pdf[f] = pdf[f].astype(t, copy=False)
+
+            if self.sql_ctx.getConf("spark.sql.execution.pandas.timeZoneAware", "false").lower() \
+                    == "true":
+                timezone = self.sql_ctx.getConf("spark.sql.session.timeZone")
+                for field in self.schema:
+                    if type(field.dataType) == TimestampType:
+                        pdf[field.name] = pdf[field.name].apply(
+                            lambda ts: ts.tz_localize('tzlocal()').tz_convert(timezone))
+
             return pdf
 
     def _collectAsArrow(self):
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index cf2c473a1645c..b946f038c90ea 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -2507,6 +2507,35 @@ def test_to_pandas(self):
         self.assertEquals(types[2], np.bool)
         self.assertEquals(types[3], np.float32)
 
+    @unittest.skipIf(not _have_pandas, "Pandas not installed")
+    def test_to_pandas_timezone_aware(self):
+        import pandas as pd
+        ts = datetime.datetime(1970, 1, 1)
+        pdf = pd.DataFrame.from_records([[ts]], columns=['ts'])
+
+        self.spark.conf.set('spark.sql.session.timeZone', 'America/Los_Angeles')
+
+        schema = StructType().add("ts", TimestampType())
+        df = self.spark.createDataFrame([(ts,)], schema)
+
+        pdf_naive = df.toPandas()
+        self.assertEqual(pdf_naive['ts'][0].tzinfo, None)
+        self.assertTrue(pdf_naive.equals(pdf))
+
+        self.spark.conf.set('spark.sql.execution.pandas.timeZoneAware', 'true')
+
+        pdf_pst = df.toPandas()
+        self.assertEqual(pdf_pst['ts'][0].tzinfo.zone, 'America/Los_Angeles')
+        self.assertFalse(pdf_pst.equals(pdf))
+
+        pdf_pst_naive = pdf_pst.copy()
+        pdf_pst_naive['ts'] = pdf_pst_naive['ts'].apply(
+            lambda ts: ts.tz_convert('tzlocal()').tz_localize(None))
+        self.assertTrue(pdf_pst_naive.equals(pdf))
+
+        self.spark.conf.unset('spark.sql.execution.pandas.timeZoneAware')
+        self.spark.conf.unset('spark.sql.session.timeZone')
+
     def test_create_dataframe_from_array_of_long(self):
         import array
         data = [Row(longarray=array.array('l', [-9223372036854775808, 0, 9223372036854775807]))]
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 733d80e9d46cd..7f48f41932393 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -912,6 +912,14 @@ object SQLConf {
       .intConf
       .createWithDefault(10000)
 
+  val PANDAS_TIMEZONE_AWARE =
+    buildConf("spark.sql.execution.pandas.timeZoneAware")
+      .internal()
+      .doc("When true, make Pandas DataFrame with timezone-aware timestamp type when converting " +
+        "by pyspark.sql.DataFrame.toPandas. The session local timezone is used for the timezone.")
+      .booleanConf
+      .createWithDefault(false)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }

From 7df7ac941da56ee9ae894ada3ae30661fddd4b03 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Mon, 14 Aug 2017 17:43:52 +0900
Subject: [PATCH 2/2] Use dateutil.tz.tzlocal directly.

---
 python/pyspark/sql/dataframe.py | 4 +++-
 python/pyspark/sql/tests.py     | 4 +++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index d90e9e73ec04f..abf68da3a3aa4 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1763,11 +1763,13 @@ def toPandas(self):
 
             if self.sql_ctx.getConf("spark.sql.execution.pandas.timeZoneAware", "false").lower() \
                     == "true":
+                from dateutil import tz
+                tzlocal = tz.tzlocal()
                 timezone = self.sql_ctx.getConf("spark.sql.session.timeZone")
                 for field in self.schema:
                     if type(field.dataType) == TimestampType:
                         pdf[field.name] = pdf[field.name].apply(
-                            lambda ts: ts.tz_localize('tzlocal()').tz_convert(timezone))
+                            lambda ts: ts.tz_localize(tzlocal).tz_convert(timezone))
 
             return pdf
 
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index b946f038c90ea..cee0dabc0a4bd 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -2510,6 +2510,8 @@ def test_to_pandas(self):
     @unittest.skipIf(not _have_pandas, "Pandas not installed")
     def test_to_pandas_timezone_aware(self):
         import pandas as pd
+        from dateutil import tz
+        tzlocal = tz.tzlocal()
         ts = datetime.datetime(1970, 1, 1)
         pdf = pd.DataFrame.from_records([[ts]], columns=['ts'])
 
@@ -2530,7 +2532,7 @@ def test_to_pandas_timezone_aware(self):
 
         pdf_pst_naive = pdf_pst.copy()
         pdf_pst_naive['ts'] = pdf_pst_naive['ts'].apply(
-            lambda ts: ts.tz_convert('tzlocal()').tz_localize(None))
+            lambda ts: ts.tz_convert(tzlocal).tz_localize(None))
         self.assertTrue(pdf_pst_naive.equals(pdf))
 
         self.spark.conf.unset('spark.sql.execution.pandas.timeZoneAware')