From 4735e5981ecf3a4bce50ce86f706e25830f4a801 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Mon, 23 Oct 2017 15:27:22 +0900
Subject: [PATCH 01/25] Add a conf to make Pandas DataFrame respect session local timezone.

---
 .../scala/org/apache/spark/sql/internal/SQLConf.scala | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5203e8833fbbb..9737fb1e4a37c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -948,6 +948,14 @@ object SQLConf {
       .intConf
       .createWithDefault(10000)
 
+  val PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE =
+    buildConf("spark.sql.execution.pandas.respectSessionTimeZone")
+      .internal()
+      .doc("When true, make Pandas DataFrame with timestamp type respecting session local " +
+        "timezone when converting to/from Pandas DataFrame.")
+      .booleanConf
+      .createWithDefault(true)
+
   val REPLACE_EXCEPT_WITH_FILTER = buildConf("spark.sql.optimizer.replaceExceptWithFilter")
     .internal()
     .doc("When true, the apply function of the rule verifies whether the right node of the" +
@@ -1246,6 +1254,8 @@ class SQLConf extends Serializable with Logging {
 
   def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH)
 
+  def pandasRespectSessionTimeZone: Boolean = getConf(PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE)
+
   def replaceExceptWithFilter: Boolean = getConf(REPLACE_EXCEPT_WITH_FILTER)
 
   /** ********************** SQLConf functionality methods ************ */

From 1f85150dc5b26df21dca6bad2ef4eaec342c4400 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Mon, 23 Oct 2017 17:09:16 +0900
Subject: [PATCH 02/25] Fix toPandas() behavior.
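When respectSessionTimeZone is enabled, toPandas() now renders timestamp columns in the session timezone instead of the local timezone before dropping the timezone information. For reference, a minimal plain-pandas sketch of that conversion; the sample values and the timezone below are illustrative and not taken from the code:

    import pandas as pd

    # A timezone-aware column as it comes back from Arrow (UTC-normalized).
    s = pd.Series(pd.to_datetime(["2017-10-23 00:00:00", "2017-10-23 12:00:00"], utc=True))

    # Render in the session timezone, then drop the tz info so the resulting
    # column is timezone-naive, which is what toPandas() returns.
    session_tz = "America/New_York"  # e.g. the value of spark.sql.session.timeZone
    print(s.dt.tz_convert(session_tz).dt.tz_localize(None))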
--- python/pyspark/sql/dataframe.py | 13 +++++++++--- python/pyspark/sql/tests.py | 36 +++++++++++++++++++++++++++++---- python/pyspark/sql/types.py | 16 ++++++++++----- 3 files changed, 53 insertions(+), 12 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 406686e6df724..004fb8da773b5 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1881,15 +1881,22 @@ def toPandas(self): 1 5 Bob """ import pandas as pd + from pyspark.sql.types import _check_dataframe_localize_timestamps + + if self.sql_ctx.getConf("spark.sql.execution.pandas.respectSessionTimeZone").lower() \ + == "true": + timezone = self.sql_ctx.getConf("spark.sql.session.timeZone") + else: + timezone = None + if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true": try: - from pyspark.sql.types import _check_dataframe_localize_timestamps import pyarrow tables = self._collectAsArrow() if tables: table = pyarrow.concat_tables(tables) pdf = table.to_pandas() - return _check_dataframe_localize_timestamps(pdf) + return _check_dataframe_localize_timestamps(pdf, self.schema, timezone) else: return pd.DataFrame.from_records([], columns=self.columns) except ImportError as e: @@ -1913,7 +1920,7 @@ def toPandas(self): for f, t in dtype.items(): pdf[f] = pdf[f].astype(t, copy=False) - return pdf + return _check_dataframe_localize_timestamps(pdf, self.schema, timezone) def _collectAsArrow(self): """ diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 483f39aeef66a..54b5e18e350d0 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3136,14 +3136,42 @@ def test_null_conversion(self): null_counts = pdf.isnull().sum().tolist() self.assertTrue(all([c == 1 for c in null_counts])) - def test_toPandas_arrow_toggle(self): - df = self.spark.createDataFrame(self.data, schema=self.schema) + def _toPandas_arrow_toggle(self, df): self.spark.conf.set("spark.sql.execution.arrow.enabled", "false") - pdf = df.toPandas() - self.spark.conf.set("spark.sql.execution.arrow.enabled", "true") + try: + pdf = df.toPandas() + finally: + self.spark.conf.set("spark.sql.execution.arrow.enabled", "true") pdf_arrow = df.toPandas() + return pdf, pdf_arrow + + def test_toPandas_arrow_toggle(self): + df = self.spark.createDataFrame(self.data, schema=self.schema) + pdf, pdf_arrow = self._toPandas_arrow_toggle(df) self.assertFramesEqual(pdf_arrow, pdf) + def test_toPandas_respect_session_timezone(self): + df = self.spark.createDataFrame(self.data, schema=self.schema) + orig_tz = self.spark.conf.get("spark.sql.session.timeZone") + try: + timezone = "America/New_York" + self.spark.conf.set("spark.sql.session.timeZone", timezone) + self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false") + try: + pdf_la, pdf_arrow_la = self._toPandas_arrow_toggle(df) + self.assertFramesEqual(pdf_arrow_la, pdf_la) + finally: + self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "true") + pdf_ny, pdf_arrow_ny = self._toPandas_arrow_toggle(df) + self.assertFramesEqual(pdf_arrow_ny, pdf_ny) + + from pyspark.sql.types import _check_dataframe_localize_timestamps + self.assertFalse(pdf_ny.equals(pdf_la)) + self.assertTrue(pdf_ny.equals( + _check_dataframe_localize_timestamps(pdf_la, self.schema, timezone))) + finally: + self.spark.conf.set("spark.sql.session.timeZone", orig_tz) + def test_pandas_round_trip(self): import pandas as pd import numpy as np diff --git a/python/pyspark/sql/types.py 
b/python/pyspark/sql/types.py index 7dd8fa04160e0..0e0be0ddf2f9c 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1629,18 +1629,24 @@ def to_arrow_type(dt): return arrow_type -def _check_dataframe_localize_timestamps(pdf): +def _check_dataframe_localize_timestamps(pdf, schema, timezone): """ Convert timezone aware timestamps to timezone-naive in local time :param pdf: pandas.DataFrame :return pandas.DataFrame where any timezone aware columns have be converted to tz-naive """ - from pandas.api.types import is_datetime64tz_dtype + from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype + tz = timezone or 'tzlocal()' for column, series in pdf.iteritems(): - # TODO: handle nested timestamps, such as ArrayType(TimestampType())? - if is_datetime64tz_dtype(series.dtype): - pdf[column] = series.dt.tz_convert('tzlocal()').dt.tz_localize(None) + if type(schema[str(column)].dataType) == TimestampType: + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? + if is_datetime64tz_dtype(series.dtype): + pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None) + elif is_datetime64_dtype(series.dtype) and timezone is not None: + # `series.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT. + pdf[column] = series.apply(lambda ts: ts.tz_localize('tzlocal()')) \ + .dt.tz_convert(tz).dt.tz_localize(None) return pdf From 5c08ecf247bfe7e14afcdef8eba1c25cb3b68634 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 23 Oct 2017 18:15:47 +0900 Subject: [PATCH 03/25] Modify pandas UDFs to respect session timezone. --- python/pyspark/serializers.py | 15 +++-- python/pyspark/sql/tests.py | 56 +++++++++++++++++++ python/pyspark/sql/types.py | 54 +++++++++++++++++- python/pyspark/worker.py | 3 +- .../python/ArrowEvalPythonExec.scala | 4 +- .../execution/python/ArrowPythonRunner.scala | 8 ++- .../python/FlatMapGroupsInPandasExec.scala | 4 +- 7 files changed, 133 insertions(+), 11 deletions(-) diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index d7979f095da76..b9ae12c520780 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -213,7 +213,7 @@ def __repr__(self): return "ArrowSerializer" -def _create_batch(series): +def _create_batch(series, timezone): from pyspark.sql.types import _check_series_convert_timestamps_internal import pyarrow as pa # Make input conform to [(series1, type1), (series2, type2), ...] @@ -227,7 +227,7 @@ def _create_batch(series): def cast_series(s, t): if type(t) == pa.TimestampType: # NOTE: convert to 'us' with astype here, unit ignored in `from_pandas` see ARROW-1680 - return _check_series_convert_timestamps_internal(s.fillna(0))\ + return _check_series_convert_timestamps_internal(s.fillna(0), timezone)\ .values.astype('datetime64[us]', copy=False) elif t == pa.date32(): # TODO: this converts the series to Python objects, possibly avoid with Arrow >= 0.8 @@ -252,6 +252,10 @@ class ArrowStreamPandasSerializer(Serializer): Serializes Pandas.Series as Arrow data with Arrow streaming format. """ + def __init__(self, timezone): + super(ArrowStreamPandasSerializer, self).__init__() + self._timezone = timezone + def dump_stream(self, iterator, stream): """ Make ArrowRecordBatches from Pandas Series and serialize. 
Input is a single series or @@ -261,7 +265,7 @@ def dump_stream(self, iterator, stream): writer = None try: for series in iterator: - batch = _create_batch(series) + batch = _create_batch(series, self._timezone) if writer is None: write_int(SpecialLengths.START_ARROW_STREAM, stream) writer = pa.RecordBatchStreamWriter(stream, batch.schema) @@ -274,12 +278,13 @@ def load_stream(self, stream): """ Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series. """ - from pyspark.sql.types import _check_dataframe_localize_timestamps + from pyspark.sql.types import _check_dataframe_localize_timestamps, from_arrow_schema import pyarrow as pa reader = pa.open_stream(stream) + schema = from_arrow_schema(reader.schema) for batch in reader: # NOTE: changed from pa.Columns.to_pandas, timezone issue in conversion fixed in 0.7.1 - pdf = _check_dataframe_localize_timestamps(batch.to_pandas()) + pdf = _check_dataframe_localize_timestamps(batch.to_pandas(), schema, self._timezone) yield [c for _, c in pdf.iteritems()] def __repr__(self): diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 54b5e18e350d0..8cd98c661c210 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3197,6 +3197,27 @@ def test_filtered_frame(self): @unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") class VectorizedUDFTests(ReusedSQLTestCase): + @classmethod + def setUpClass(cls): + ReusedSQLTestCase.setUpClass() + + # Synchronize default timezone between Python and Java + cls.tz_prev = os.environ.get("TZ", None) # save current tz if set + tz = "America/Los_Angeles" + os.environ["TZ"] = tz + time.tzset() + + cls.sc.environment["TZ"] = tz + cls.spark.conf.set("spark.sql.session.timeZone", tz) + + @classmethod + def tearDownClass(cls): + del os.environ["TZ"] + if cls.tz_prev is not None: + os.environ["TZ"] = cls.tz_prev + time.tzset() + ReusedSQLTestCase.tearDownClass() + def test_vectorized_udf_basic(self): from pyspark.sql.functions import pandas_udf, col df = self.spark.range(10).select( @@ -3512,6 +3533,41 @@ def check_records_per_batch(x): else: self.spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", orig_value) + def test_vectorized_udf_timestamps_respect_session_timezone(self): + from pyspark.sql.functions import pandas_udf, col + from datetime import datetime + schema = StructType([ + StructField("idx", LongType(), True), + StructField("timestamp", TimestampType(), True)]) + data = [(1, datetime(1969, 1, 1, 1, 1, 1)), + (2, datetime(2012, 2, 2, 2, 2, 2)), + (3, datetime(2100, 3, 3, 3, 3, 3))] + df = self.spark.createDataFrame(data, schema=schema) + + internal_value = pandas_udf(lambda ts: ts.apply(lambda ts: ts.value), LongType()) + + orig_tz = self.spark.conf.get("spark.sql.session.timeZone") + try: + timezone = "America/New_York" + self.spark.conf.set("spark.sql.session.timeZone", timezone) + self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false") + try: + df_la = df.withColumn("internal_value", internal_value(col("timestamp"))) + result_la = df_la.select(col("idx"), col("internal_value")).collect() + diff = 3 * 60 * 60 * 1000 * 1000 * 1000 + result_la_corrected = \ + df_la.select(col("idx"), col("internal_value") + diff).collect() + finally: + self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "true") + + df_ny = df.withColumn("internal_value", internal_value(col("timestamp"))) + result_ny = df_ny.select(col("idx"), col("internal_value")).collect() + + 
self.assertNotEqual(result_ny, result_la) + self.assertEqual(result_ny, result_la_corrected) + finally: + self.spark.conf.set("spark.sql.session.timeZone", orig_tz) + @unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") class GroupbyApplyTests(ReusedSQLTestCase): diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 0e0be0ddf2f9c..9a85a4b46c8a6 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1629,6 +1629,55 @@ def to_arrow_type(dt): return arrow_type +def to_arrow_schema(schema): + """ Convert a schema from Spark to Arrow + """ + import pyarrow as pa + fields = [pa.field(field.name, to_arrow_type(field.dataType), nullable=field.nullable) + for field in schema] + return pa.schema(fields) + + +def from_arrow_type(at): + """ Convert pyarrow type to Spark data type. + """ + # TODO: newer pyarrow has is_boolean(at) functions that would be better to check type + import pyarrow as pa + if at == pa.bool_(): + spark_type = BooleanType() + elif at == pa.int8(): + spark_type = ByteType() + elif at == pa.int16(): + spark_type = ShortType() + elif at == pa.int32(): + spark_type = IntegerType() + elif at == pa.int64(): + spark_type = LongType() + elif at == pa.float32(): + spark_type = FloatType() + elif at == pa.float64(): + spark_type = DoubleType() + elif type(at) == pa.DecimalType: + spark_type = DecimalType(precision=at.precision, scale=at.scale) + elif at == pa.string(): + spark_type = StringType() + elif at == pa.date32(): + spark_type = DateType() + elif type(at) == pa.TimestampType: + spark_type = TimestampType() + else: + raise TypeError("Unsupported type in conversion from Arrow: " + str(at)) + return spark_type + + +def from_arrow_schema(arrow_schema): + """ Convert schema from Arrow to Spark. + """ + return StructType( + [StructField(field.name, from_arrow_type(field.type), nullable=field.nullable) + for field in arrow_schema]) + + def _check_dataframe_localize_timestamps(pdf, schema, timezone): """ Convert timezone aware timestamps to timezone-naive in local time @@ -1650,7 +1699,7 @@ def _check_dataframe_localize_timestamps(pdf, schema, timezone): return pdf -def _check_series_convert_timestamps_internal(s): +def _check_series_convert_timestamps_internal(s, timezone): """ Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage :param s: a pandas.Series @@ -1659,7 +1708,8 @@ def _check_series_convert_timestamps_internal(s): from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype # TODO: handle nested timestamps, such as ArrayType(TimestampType())? 
if is_datetime64_dtype(s.dtype): - return s.dt.tz_localize('tzlocal()').dt.tz_convert('UTC') + tz = timezone or 'tzlocal()' + return s.dt.tz_localize(tz).dt.tz_convert('UTC') elif is_datetime64tz_dtype(s.dtype): return s.dt.tz_convert('UTC') else: diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 5e100e0a9a95d..9cc5d46b04368 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -129,7 +129,8 @@ def read_udfs(pickleSer, infile, eval_type): if eval_type == PythonEvalType.SQL_PANDAS_UDF \ or eval_type == PythonEvalType.SQL_PANDAS_GROUPED_UDF: - ser = ArrowStreamPandasSerializer() + timezone = utf8_deserializer.loads(infile) + ser = ArrowStreamPandasSerializer(timezone) else: ser = BatchedSerializer(PickleSerializer(), 100) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala index bcda2dae92e53..339e2aa85a748 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala @@ -63,6 +63,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi private val batchSize = conf.arrowMaxRecordsPerBatch private val sessionLocalTimeZone = conf.sessionLocalTimeZone + private val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone protected override def evaluate( funcs: Seq[ChainedPythonFunctions], @@ -81,7 +82,8 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi val columnarBatchIter = new ArrowPythonRunner( funcs, bufferSize, reuseWorker, - PythonEvalType.SQL_PANDAS_UDF, argOffsets, schema, sessionLocalTimeZone) + PythonEvalType.SQL_PANDAS_UDF, argOffsets, schema, + sessionLocalTimeZone, pandasRespectSessionTimeZone) .compute(batchIter, context.partitionId(), context) new Iterator[InternalRow] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala index 94c05b9b5e49f..9a94d771a01b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala @@ -44,7 +44,8 @@ class ArrowPythonRunner( evalType: Int, argOffsets: Array[Array[Int]], schema: StructType, - timeZoneId: String) + timeZoneId: String, + respectTimeZone: Boolean) extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch]( funcs, bufferSize, reuseWorker, evalType, argOffsets) { @@ -58,6 +59,11 @@ class ArrowPythonRunner( protected override def writeCommand(dataOut: DataOutputStream): Unit = { PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets) + if (respectTimeZone) { + PythonRDD.writeUTF(timeZoneId, dataOut) + } else { + dataOut.writeInt(SpecialLengths.NULL) + } } protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala index e1e04e34e0c71..48157ec9abc58 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala @@ -78,6 +78,7 @@ case 
class FlatMapGroupsInPandasExec( val argOffsets = Array((0 until (child.output.length - groupingAttributes.length)).toArray) val schema = StructType(child.schema.drop(groupingAttributes.length)) val sessionLocalTimeZone = conf.sessionLocalTimeZone + val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone inputRDD.mapPartitionsInternal { iter => val grouped = if (groupingAttributes.isEmpty) { @@ -95,7 +96,8 @@ case class FlatMapGroupsInPandasExec( val columnarBatchIter = new ArrowPythonRunner( chainedFunc, bufferSize, reuseWorker, - PythonEvalType.SQL_PANDAS_GROUPED_UDF, argOffsets, schema, sessionLocalTimeZone) + PythonEvalType.SQL_PANDAS_GROUPED_UDF, argOffsets, schema, + sessionLocalTimeZone, pandasRespectSessionTimeZone) .compute(grouped, context.partitionId(), context) columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output)) From ee1a1c87e2a89974e4e299f4ad84e5831526d079 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 1 Nov 2017 13:38:30 +0900 Subject: [PATCH 04/25] Workaround for old pandas. --- python/pyspark/sql/tests.py | 23 ++++++++++++----- python/pyspark/sql/types.py | 51 +++++++++++++++++++++++++++---------- 2 files changed, 54 insertions(+), 20 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 8cd98c661c210..7990c2c290cca 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2560,12 +2560,16 @@ def count_bucketed_cols(names, table="pyspark_bucket"): @unittest.skipIf(not _have_pandas, "Pandas not installed") def test_to_pandas(self): + from datetime import datetime, date import numpy as np schema = StructType().add("a", IntegerType()).add("b", StringType())\ - .add("c", BooleanType()).add("d", FloatType()) + .add("c", BooleanType()).add("d", FloatType())\ + .add("dt", DateType()).add("ts", TimestampType()) data = [ - (1, "foo", True, 3.0), (2, "foo", True, 5.0), - (3, "bar", False, -1.0), (4, "bar", False, 6.0), + (1, "foo", True, 3.0, date(1969, 1, 1), datetime(1969, 1, 1, 1, 1, 1)), + (2, "foo", True, 5.0, None, None), + (3, "bar", False, -1.0, date(2012, 3, 3), datetime(2012, 3, 3, 3, 3, 3)), + (4, "bar", False, 6.0, date(2100, 4, 4), datetime(2100, 4, 4, 4, 4, 4)), ] df = self.spark.createDataFrame(data, schema) types = df.toPandas().dtypes @@ -2573,6 +2577,8 @@ def test_to_pandas(self): self.assertEquals(types[1], np.object) self.assertEquals(types[2], np.bool) self.assertEquals(types[3], np.float32) + self.assertEquals(types[4], 'datetime64[ns]') + self.assertEquals(types[5], 'datetime64[ns]') @unittest.skipIf(not _have_pandas, "Pandas not installed") def test_to_pandas_avoid_astype(self): @@ -3544,6 +3550,7 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self): (3, datetime(2100, 3, 3, 3, 3, 3))] df = self.spark.createDataFrame(data, schema=schema) + f_timestamp_copy = pandas_udf(lambda ts: ts, TimestampType()) internal_value = pandas_udf(lambda ts: ts.apply(lambda ts: ts.value), LongType()) orig_tz = self.spark.conf.get("spark.sql.session.timeZone") @@ -3552,16 +3559,18 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self): self.spark.conf.set("spark.sql.session.timeZone", timezone) self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false") try: - df_la = df.withColumn("internal_value", internal_value(col("timestamp"))) + df_la = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \ + .withColumn("internal_value", internal_value(col("timestamp"))) result_la = df_la.select(col("idx"), 
col("internal_value")).collect() diff = 3 * 60 * 60 * 1000 * 1000 * 1000 result_la_corrected = \ - df_la.select(col("idx"), col("internal_value") + diff).collect() + df_la.select(col("idx"), col("tscopy"), col("internal_value") + diff).collect() finally: self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "true") - df_ny = df.withColumn("internal_value", internal_value(col("timestamp"))) - result_ny = df_ny.select(col("idx"), col("internal_value")).collect() + df_ny = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \ + .withColumn("internal_value", internal_value(col("timestamp"))) + result_ny = df_ny.select(col("idx"), col("tscopy"), col("internal_value")).collect() self.assertNotEqual(result_ny, result_la) self.assertEqual(result_ny, result_la_corrected) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 9a85a4b46c8a6..6c14016379842 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1685,17 +1685,36 @@ def _check_dataframe_localize_timestamps(pdf, schema, timezone): :param pdf: pandas.DataFrame :return pandas.DataFrame where any timezone aware columns have be converted to tz-naive """ - from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype - tz = timezone or 'tzlocal()' - for column, series in pdf.iteritems(): - if type(schema[str(column)].dataType) == TimestampType: - # TODO: handle nested timestamps, such as ArrayType(TimestampType())? - if is_datetime64tz_dtype(series.dtype): - pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None) - elif is_datetime64_dtype(series.dtype) and timezone is not None: - # `series.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT. - pdf[column] = series.apply(lambda ts: ts.tz_localize('tzlocal()')) \ - .dt.tz_convert(tz).dt.tz_localize(None) + try: + from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype + tz = timezone or 'tzlocal()' + for column, series in pdf.iteritems(): + if type(schema[str(column)].dataType) == TimestampType: + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? + if is_datetime64tz_dtype(series.dtype): + pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None) + elif is_datetime64_dtype(series.dtype) and timezone is not None: + # `series.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT. + pdf[column] = series.apply(lambda ts: ts.tz_localize('tzlocal()')) \ + .dt.tz_convert(tz).dt.tz_localize(None) + except ImportError: + import pandas as pd + from pandas.core.common import is_datetime64tz_dtype, is_datetime64_dtype + from pandas.tslib import _dateutil_tzlocal + tzlocal = _dateutil_tzlocal() + tz = timezone or tzlocal + for column, series in pdf.iteritems(): + if type(schema[str(column)].dataType) == TimestampType: + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? + if is_datetime64tz_dtype(series.dtype): + # `series.dt.tz_convert(tzlocal).dt.tz_localize(None)` doesn't work properly. + pdf[column] = pd.Series([ts.tz_convert(tz).tz_localize(None) + if ts is not pd.NaT else pd.NaT for ts in series]) + elif is_datetime64_dtype(series.dtype) and timezone is not None: + # `series.dt.tz_localize(tzlocal)` doesn't work properly. 
+ pdf[column] = pd.Series( + [ts.tz_localize(tzlocal).tz_convert(tz).tz_localize(None) + if ts is not pd.NaT else pd.NaT for ts in series]) return pdf @@ -1705,10 +1724,16 @@ def _check_series_convert_timestamps_internal(s, timezone): :param s: a pandas.Series :return pandas.Series where if it is a timestamp, has been UTC normalized without a time zone """ - from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype + try: + from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype + tzlocal = 'tzlocal()' + except ImportError: + from pandas.core.common import is_datetime64tz_dtype, is_datetime64_dtype + from pandas.tslib import _dateutil_tzlocal + tzlocal = _dateutil_tzlocal() # TODO: handle nested timestamps, such as ArrayType(TimestampType())? if is_datetime64_dtype(s.dtype): - tz = timezone or 'tzlocal()' + tz = timezone or tzlocal return s.dt.tz_localize(tz).dt.tz_convert('UTC') elif is_datetime64tz_dtype(s.dtype): return s.dt.tz_convert('UTC') From b1436b8e4876838efbcb38cf0bffa7ebcc7a5544 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 1 Nov 2017 16:53:13 +0900 Subject: [PATCH 05/25] Don't use is_datetime64tz_dtype for old pandas. --- python/pyspark/sql/types.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 6c14016379842..c6aa188f077fc 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1699,14 +1699,14 @@ def _check_dataframe_localize_timestamps(pdf, schema, timezone): .dt.tz_convert(tz).dt.tz_localize(None) except ImportError: import pandas as pd - from pandas.core.common import is_datetime64tz_dtype, is_datetime64_dtype + from pandas.core.common import is_datetime64_dtype from pandas.tslib import _dateutil_tzlocal tzlocal = _dateutil_tzlocal() tz = timezone or tzlocal for column, series in pdf.iteritems(): if type(schema[str(column)].dataType) == TimestampType: # TODO: handle nested timestamps, such as ArrayType(TimestampType())? - if is_datetime64tz_dtype(series.dtype): + if not is_datetime64_dtype(series.dtype): # `series.dt.tz_convert(tzlocal).dt.tz_localize(None)` doesn't work properly. pdf[column] = pd.Series([ts.tz_convert(tz).tz_localize(None) if ts is not pd.NaT else pd.NaT for ts in series]) @@ -1726,19 +1726,23 @@ def _check_series_convert_timestamps_internal(s, timezone): """ try: from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype - tzlocal = 'tzlocal()' + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? + if is_datetime64_dtype(s.dtype): + tz = timezone or 'tzlocal()' + return s.dt.tz_localize(tz).dt.tz_convert('UTC') + elif is_datetime64tz_dtype(s.dtype): + return s.dt.tz_convert('UTC') + else: + return s except ImportError: - from pandas.core.common import is_datetime64tz_dtype, is_datetime64_dtype + from pandas.core.common import is_datetime64_dtype from pandas.tslib import _dateutil_tzlocal - tzlocal = _dateutil_tzlocal() - # TODO: handle nested timestamps, such as ArrayType(TimestampType())? - if is_datetime64_dtype(s.dtype): - tz = timezone or tzlocal - return s.dt.tz_localize(tz).dt.tz_convert('UTC') - elif is_datetime64tz_dtype(s.dtype): - return s.dt.tz_convert('UTC') - else: - return s + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? 
+ if is_datetime64_dtype(s.dtype): + tz = timezone or _dateutil_tzlocal() + return s.dt.tz_localize(tz).dt.tz_convert('UTC') + else: + return s.dt.tz_convert('UTC') def _test(): From 6872516e8cd9d7f81929c38708571c69a0af7883 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 1 Nov 2017 23:27:34 +0900 Subject: [PATCH 06/25] Fix one of the failed tests. --- python/pyspark/sql/types.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index c6aa188f077fc..2cea9afb476e4 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1685,6 +1685,7 @@ def _check_dataframe_localize_timestamps(pdf, schema, timezone): :param pdf: pandas.DataFrame :return pandas.DataFrame where any timezone aware columns have be converted to tz-naive """ + import pandas as pd try: from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype tz = timezone or 'tzlocal()' @@ -1695,10 +1696,10 @@ def _check_dataframe_localize_timestamps(pdf, schema, timezone): pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None) elif is_datetime64_dtype(series.dtype) and timezone is not None: # `series.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT. - pdf[column] = series.apply(lambda ts: ts.tz_localize('tzlocal()')) \ - .dt.tz_convert(tz).dt.tz_localize(None) + pdf[column] = series.apply( + lambda ts: ts.tz_localize('tzlocal()').tz_convert(tz).tz_localize(None) + if ts is not pd.NaT else pd.NaT) except ImportError: - import pandas as pd from pandas.core.common import is_datetime64_dtype from pandas.tslib import _dateutil_tzlocal tzlocal = _dateutil_tzlocal() From 1f096bf32f742945363cc7d9af978041ad77408b Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 2 Nov 2017 14:41:09 +0900 Subject: [PATCH 07/25] Modify check_data udf for debug messages. 
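The check_data UDF now returns a None-or-message string per row instead of a bare boolean, so a failing run reports which timestamp differed. A standalone sketch of the same pattern, with the pandas_udf decorator and Spark plumbing left out and made-up sample values:

    import pandas as pd

    def check_data(idx, timestamp, expected):
        # Emit None for matching rows and a readable message otherwise, so an
        # assertion failure shows the offending value rather than just False.
        msgs = []
        for i in range(len(idx)):
            if timestamp[i] == expected[i]:
                msgs.append(None)
            else:
                msgs.append("timestamp mismatch at idx=%d: got %s, expected %s"
                            % (idx[i], timestamp[i], expected[i]))
        return pd.Series(msgs)

    print(check_data(pd.Series([0, 1]),
                     pd.Series(pd.to_datetime(["2012-02-02", "2012-02-03"])),
                     pd.Series(pd.to_datetime(["2012-02-02", "2012-02-04"]))))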
--- python/pyspark/sql/tests.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 7990c2c290cca..7da43bd45bd8d 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3484,22 +3484,29 @@ def test_vectorized_udf_timestamps(self): f_timestamp_copy = pandas_udf(lambda t: t, returnType=TimestampType()) df = df.withColumn("timestamp_copy", f_timestamp_copy(col("timestamp"))) - @pandas_udf(returnType=BooleanType()) + @pandas_udf(returnType=StringType()) def check_data(idx, timestamp, timestamp_copy): + import pandas as pd + msgs = [] is_equal = timestamp.isnull() # use this array to check values are equal for i in range(len(idx)): # Check that timestamps are as expected in the UDF - is_equal[i] = (is_equal[i] and data[idx[i]][1] is None) or \ - timestamp[i].to_pydatetime() == data[idx[i]][1] - return is_equal - - result = df.withColumn("is_equal", check_data(col("idx"), col("timestamp"), - col("timestamp_copy"))).collect() + if (is_equal[i] and data[idx[i]][1] is None) or \ + timestamp[i].to_pydatetime() == data[idx[i]][1]: + msgs.append(None) + else: + msgs.append( + "timestamp values are not equal (timestamp='%s': data[%d][1]='%s')" + % (timestamp[i], idx[i], data[idx[i]][1])) + return pd.Series(msgs) + + result = df.withColumn("check_data", check_data(col("idx"), col("timestamp"), + col("timestamp_copy"))).collect() # Check that collection values are correct self.assertEquals(len(data), len(result)) for i in range(len(result)): self.assertEquals(data[i][1], result[i][1]) # "timestamp" col - self.assertTrue(result[i][3]) # "is_equal" data in udf was as expected + self.assertIsNone(result[i][3]) # "check_data" col def test_vectorized_udf_return_timestamp_tz(self): from pyspark.sql.functions import pandas_udf, col From 569bb633116b35cd717e9f27b2264e1114f8e35d Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 3 Nov 2017 15:05:46 +0900 Subject: [PATCH 08/25] Remove unused method. --- python/pyspark/sql/types.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 2cea9afb476e4..29032f1396fcb 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1629,15 +1629,6 @@ def to_arrow_type(dt): return arrow_type -def to_arrow_schema(schema): - """ Convert a schema from Spark to Arrow - """ - import pyarrow as pa - fields = [pa.field(field.name, to_arrow_type(field.dataType), nullable=field.nullable) - for field in schema] - return pa.schema(fields) - - def from_arrow_type(at): """ Convert pyarrow type to Spark data type. """ From ce07f39643ef9711419e7e11082e62c578d816f5 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 3 Nov 2017 15:15:03 +0900 Subject: [PATCH 09/25] Modify a test. 
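The test now includes a null timestamp, so the internal_value UDF has to map NaT to None before taking the nanosecond value. A plain-pandas sketch of that guard, with illustrative sample values:

    import pandas as pd

    ts = pd.Series(pd.to_datetime(["1969-01-01 01:01:01", None, "2012-02-02 02:02:02"]))

    # Timestamp.value is the offset from the epoch in nanoseconds; NaT has no
    # meaningful value, so map it to None explicitly, as the updated UDF does.
    print(ts.apply(lambda t: t.value if t is not pd.NaT else None))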
--- python/pyspark/sql/tests.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 7da43bd45bd8d..9d283389cce80 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3549,16 +3549,19 @@ def check_records_per_batch(x): def test_vectorized_udf_timestamps_respect_session_timezone(self): from pyspark.sql.functions import pandas_udf, col from datetime import datetime + import pandas as pd schema = StructType([ StructField("idx", LongType(), True), StructField("timestamp", TimestampType(), True)]) data = [(1, datetime(1969, 1, 1, 1, 1, 1)), (2, datetime(2012, 2, 2, 2, 2, 2)), - (3, datetime(2100, 3, 3, 3, 3, 3))] + (3, None), + (4, datetime(2100, 4, 4, 4, 4, 4))] df = self.spark.createDataFrame(data, schema=schema) f_timestamp_copy = pandas_udf(lambda ts: ts, TimestampType()) - internal_value = pandas_udf(lambda ts: ts.apply(lambda ts: ts.value), LongType()) + internal_value = pandas_udf( + lambda ts: ts.apply(lambda ts: ts.value if ts is not pd.NaT else None), LongType()) orig_tz = self.spark.conf.get("spark.sql.session.timeZone") try: From ba3d6e3cf679e3db0a2e095f8cbe099ab4260532 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 6 Nov 2017 15:59:43 +0900 Subject: [PATCH 10/25] Add debug print, which will be removed later. --- python/pyspark/sql/tests.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 9d283389cce80..5e6f2166d1000 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3478,11 +3478,18 @@ def test_vectorized_udf_timestamps(self): (1, datetime(2012, 2, 2, 2, 2, 2)), (2, None), (3, datetime(2100, 4, 4, 4, 4, 4))] + + # TODO: remove later + t = TimestampType() + print([(idx, t.toInternal(ts)) for idx, ts in data]) + df = self.spark.createDataFrame(data, schema=schema) + df.show() # TODO: remove later # Check that a timestamp passed through a pandas_udf will not be altered by timezone calc f_timestamp_copy = pandas_udf(lambda t: t, returnType=TimestampType()) df = df.withColumn("timestamp_copy", f_timestamp_copy(col("timestamp"))) + df.show() # TODO: remove later @pandas_udf(returnType=StringType()) def check_data(idx, timestamp, timestamp_copy): From 9101a3a12f17b5bd633756139eaa2cb3ee9bb33c Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 6 Nov 2017 16:06:07 +0900 Subject: [PATCH 11/25] Fix style. --- python/pyspark/sql/tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 5e6f2166d1000..2009fd7f7d5f6 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3484,12 +3484,12 @@ def test_vectorized_udf_timestamps(self): print([(idx, t.toInternal(ts)) for idx, ts in data]) df = self.spark.createDataFrame(data, schema=schema) - df.show() # TODO: remove later + df.show() # TODO: remove later # Check that a timestamp passed through a pandas_udf will not be altered by timezone calc f_timestamp_copy = pandas_udf(lambda t: t, returnType=TimestampType()) df = df.withColumn("timestamp_copy", f_timestamp_copy(col("timestamp"))) - df.show() # TODO: remove later + df.show() # TODO: remove later @pandas_udf(returnType=StringType()) def check_data(idx, timestamp, timestamp_copy): From ab13bafaec0c715e40e6431b4d427acdb23850e3 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 8 Nov 2017 11:33:45 +0900 Subject: [PATCH 12/25] Remove debug prints. 
--- python/pyspark/sql/tests.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 2009fd7f7d5f6..1b906fcc7f4b3 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3479,17 +3479,11 @@ def test_vectorized_udf_timestamps(self): (2, None), (3, datetime(2100, 4, 4, 4, 4, 4))] - # TODO: remove later - t = TimestampType() - print([(idx, t.toInternal(ts)) for idx, ts in data]) - df = self.spark.createDataFrame(data, schema=schema) - df.show() # TODO: remove later # Check that a timestamp passed through a pandas_udf will not be altered by timezone calc f_timestamp_copy = pandas_udf(lambda t: t, returnType=TimestampType()) df = df.withColumn("timestamp_copy", f_timestamp_copy(col("timestamp"))) - df.show() # TODO: remove later @pandas_udf(returnType=StringType()) def check_data(idx, timestamp, timestamp_copy): From 4adb073f8d1454fbea0742a16b6d7662e063b37a Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 8 Nov 2017 11:34:49 +0900 Subject: [PATCH 13/25] Modify tests to avoid times within DST. --- python/pyspark/sql/tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 1b906fcc7f4b3..9b701e2fb209d 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3477,7 +3477,7 @@ def test_vectorized_udf_timestamps(self): data = [(0, datetime(1969, 1, 1, 1, 1, 1)), (1, datetime(2012, 2, 2, 2, 2, 2)), (2, None), - (3, datetime(2100, 4, 4, 4, 4, 4))] + (3, datetime(2100, 3, 3, 3, 3, 3))] df = self.spark.createDataFrame(data, schema=schema) @@ -3557,7 +3557,7 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self): data = [(1, datetime(1969, 1, 1, 1, 1, 1)), (2, datetime(2012, 2, 2, 2, 2, 2)), (3, None), - (4, datetime(2100, 4, 4, 4, 4, 4))] + (4, datetime(2100, 3, 3, 3, 3, 3))] df = self.spark.createDataFrame(data, schema=schema) f_timestamp_copy = pandas_udf(lambda ts: ts, TimestampType()) From 1e0f21715f5ad053b5a5677a129677d498946ea3 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 8 Nov 2017 14:49:03 +0900 Subject: [PATCH 14/25] Clean up. --- python/pyspark/serializers.py | 5 +- python/pyspark/sql/dataframe.py | 15 +++- python/pyspark/sql/tests.py | 11 ++- python/pyspark/sql/types.py | 132 ++++++++++++-------------------- 4 files changed, 72 insertions(+), 91 deletions(-) diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index b9ae12c520780..76fdf19411113 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -278,13 +278,12 @@ def load_stream(self, stream): """ Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series. 
""" - from pyspark.sql.types import _check_dataframe_localize_timestamps, from_arrow_schema + from pyspark.sql.types import _check_dataframe_localize_timestamps import pyarrow as pa reader = pa.open_stream(stream) - schema = from_arrow_schema(reader.schema) for batch in reader: # NOTE: changed from pa.Columns.to_pandas, timezone issue in conversion fixed in 0.7.1 - pdf = _check_dataframe_localize_timestamps(batch.to_pandas(), schema, self._timezone) + pdf = _check_dataframe_localize_timestamps(batch.to_pandas(), self._timezone) yield [c for _, c in pdf.iteritems()] def __repr__(self): diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 004fb8da773b5..752b6f997dbc8 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1881,7 +1881,6 @@ def toPandas(self): 1 5 Bob """ import pandas as pd - from pyspark.sql.types import _check_dataframe_localize_timestamps if self.sql_ctx.getConf("spark.sql.execution.pandas.respectSessionTimeZone").lower() \ == "true": @@ -1891,12 +1890,13 @@ def toPandas(self): if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true": try: + from pyspark.sql.types import _check_dataframe_localize_timestamps import pyarrow tables = self._collectAsArrow() if tables: table = pyarrow.concat_tables(tables) pdf = table.to_pandas() - return _check_dataframe_localize_timestamps(pdf, self.schema, timezone) + return _check_dataframe_localize_timestamps(pdf, timezone) else: return pd.DataFrame.from_records([], columns=self.columns) except ImportError as e: @@ -1920,7 +1920,16 @@ def toPandas(self): for f, t in dtype.items(): pdf[f] = pdf[f].astype(t, copy=False) - return _check_dataframe_localize_timestamps(pdf, self.schema, timezone) + + if timezone is None: + return pdf + else: + from pyspark.sql.types import _check_series_convert_timestamps_localize + for field in self.schema: + if isinstance(field.dataType, TimestampType): + pdf[field.name] = \ + _check_series_convert_timestamps_localize(pdf[field.name], timezone) + return pdf def _collectAsArrow(self): """ diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 9b701e2fb209d..b88b559295ff5 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3171,10 +3171,15 @@ def test_toPandas_respect_session_timezone(self): pdf_ny, pdf_arrow_ny = self._toPandas_arrow_toggle(df) self.assertFramesEqual(pdf_arrow_ny, pdf_ny) - from pyspark.sql.types import _check_dataframe_localize_timestamps self.assertFalse(pdf_ny.equals(pdf_la)) - self.assertTrue(pdf_ny.equals( - _check_dataframe_localize_timestamps(pdf_la, self.schema, timezone))) + + from pyspark.sql.types import _check_series_convert_timestamps_localize + pdf_la_corrected = pdf_la.copy() + for field in self.schema: + if isinstance(field.dataType, TimestampType): + pdf_la_corrected[field.name] = _check_series_convert_timestamps_localize( + pdf_la_corrected[field.name], timezone) + self.assertFramesEqual(pdf_ny, pdf_la_corrected) finally: self.spark.conf.set("spark.sql.session.timeZone", orig_tz) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 29032f1396fcb..c246999ed14f0 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1629,112 +1629,80 @@ def to_arrow_type(dt): return arrow_type -def from_arrow_type(at): - """ Convert pyarrow type to Spark data type. 
+def _check_dataframe_localize_timestamps(pdf, timezone): """ - # TODO: newer pyarrow has is_boolean(at) functions that would be better to check type - import pyarrow as pa - if at == pa.bool_(): - spark_type = BooleanType() - elif at == pa.int8(): - spark_type = ByteType() - elif at == pa.int16(): - spark_type = ShortType() - elif at == pa.int32(): - spark_type = IntegerType() - elif at == pa.int64(): - spark_type = LongType() - elif at == pa.float32(): - spark_type = FloatType() - elif at == pa.float64(): - spark_type = DoubleType() - elif type(at) == pa.DecimalType: - spark_type = DecimalType(precision=at.precision, scale=at.scale) - elif at == pa.string(): - spark_type = StringType() - elif at == pa.date32(): - spark_type = DateType() - elif type(at) == pa.TimestampType: - spark_type = TimestampType() - else: - raise TypeError("Unsupported type in conversion from Arrow: " + str(at)) - return spark_type + Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone - -def from_arrow_schema(arrow_schema): - """ Convert schema from Arrow to Spark. + :param pdf: pandas.DataFrame + :param timezone: the timezone to convert. if None then use local timezone + :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive """ - return StructType( - [StructField(field.name, from_arrow_type(field.type), nullable=field.nullable) - for field in arrow_schema]) + from pandas.api.types import is_datetime64tz_dtype + tz = timezone or 'tzlocal()' + for column, series in pdf.iteritems(): + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? + if is_datetime64tz_dtype(series.dtype): + pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None) + return pdf -def _check_dataframe_localize_timestamps(pdf, schema, timezone): +def _check_series_convert_timestamps_internal(s, timezone): """ - Convert timezone aware timestamps to timezone-naive in local time + Convert a tz-naive timestamp in the specified timezone or local timezone to UTC normalized for + Spark internal storage - :param pdf: pandas.DataFrame - :return pandas.DataFrame where any timezone aware columns have be converted to tz-naive + :param s: a pandas.Series + :param timezone: the timezone to convert. if None then use local timezone + :return pandas.Series where if it is a timestamp, has been UTC normalized without a time zone """ - import pandas as pd - try: - from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype + from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? + if is_datetime64_dtype(s.dtype): tz = timezone or 'tzlocal()' - for column, series in pdf.iteritems(): - if type(schema[str(column)].dataType) == TimestampType: - # TODO: handle nested timestamps, such as ArrayType(TimestampType())? - if is_datetime64tz_dtype(series.dtype): - pdf[column] = series.dt.tz_convert(tz).dt.tz_localize(None) - elif is_datetime64_dtype(series.dtype) and timezone is not None: - # `series.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT. 
- pdf[column] = series.apply( - lambda ts: ts.tz_localize('tzlocal()').tz_convert(tz).tz_localize(None) - if ts is not pd.NaT else pd.NaT) - except ImportError: - from pandas.core.common import is_datetime64_dtype - from pandas.tslib import _dateutil_tzlocal - tzlocal = _dateutil_tzlocal() - tz = timezone or tzlocal - for column, series in pdf.iteritems(): - if type(schema[str(column)].dataType) == TimestampType: - # TODO: handle nested timestamps, such as ArrayType(TimestampType())? - if not is_datetime64_dtype(series.dtype): - # `series.dt.tz_convert(tzlocal).dt.tz_localize(None)` doesn't work properly. - pdf[column] = pd.Series([ts.tz_convert(tz).tz_localize(None) - if ts is not pd.NaT else pd.NaT for ts in series]) - elif is_datetime64_dtype(series.dtype) and timezone is not None: - # `series.dt.tz_localize(tzlocal)` doesn't work properly. - pdf[column] = pd.Series( - [ts.tz_localize(tzlocal).tz_convert(tz).tz_localize(None) - if ts is not pd.NaT else pd.NaT for ts in series]) - return pdf + return s.dt.tz_localize(tz).dt.tz_convert('UTC') + elif is_datetime64tz_dtype(s.dtype): + return s.dt.tz_convert('UTC') + else: + return s -def _check_series_convert_timestamps_internal(s, timezone): +def _check_series_convert_timestamps_localize(s, timezone): """ - Convert a tz-naive timestamp in local tz to UTC normalized for Spark internal storage + Convert timestamp to timezone-naive in the specified timezone or local timezone + :param s: a pandas.Series - :return pandas.Series where if it is a timestamp, has been UTC normalized without a time zone + :param timezone: the timezone to convert. if None then use local timezone + :return pandas.Series where if it is a timestamp, has been converted to tz-naive """ + import pandas as pd try: from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype + tz = timezone or 'tzlocal()' # TODO: handle nested timestamps, such as ArrayType(TimestampType())? - if is_datetime64_dtype(s.dtype): - tz = timezone or 'tzlocal()' - return s.dt.tz_localize(tz).dt.tz_convert('UTC') - elif is_datetime64tz_dtype(s.dtype): - return s.dt.tz_convert('UTC') + if is_datetime64tz_dtype(s.dtype): + return s.dt.tz_convert(tz).dt.tz_localize(None) + elif is_datetime64_dtype(s.dtype) and timezone is not None: + # `s.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT. + return s.apply(lambda ts: ts.tz_localize('tzlocal()').tz_convert(tz).tz_localize(None) + if ts is not pd.NaT else pd.NaT) else: return s except ImportError: from pandas.core.common import is_datetime64_dtype from pandas.tslib import _dateutil_tzlocal + tzlocal = _dateutil_tzlocal() + tz = timezone or tzlocal # TODO: handle nested timestamps, such as ArrayType(TimestampType())? - if is_datetime64_dtype(s.dtype): - tz = timezone or _dateutil_tzlocal() - return s.dt.tz_localize(tz).dt.tz_convert('UTC') + if not is_datetime64_dtype(s.dtype): + # `s.dt.tz_convert(tzlocal).dt.tz_localize(None)` doesn't work properly. + return pd.Series([ts.tz_convert(tz).tz_localize(None) + if ts is not pd.NaT else pd.NaT for ts in s]) + elif is_datetime64_dtype(s.dtype) and timezone is not None: + # `s.dt.tz_localize(tzlocal)` doesn't work properly. 
+ return pd.Series([ts.tz_localize(tzlocal).tz_convert(tz).tz_localize(None) + if ts is not pd.NaT else pd.NaT for ts in s]) else: - return s.dt.tz_convert('UTC') + return s def _test(): From 292678f3e47a4f5a20fd1af5da10e02cc4017882 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 8 Nov 2017 18:41:07 +0900 Subject: [PATCH 15/25] Fix the behavior of createDataFrame from pandas DataFrame. --- python/pyspark/sql/dataframe.py | 4 +- python/pyspark/sql/session.py | 34 +++++++++++-- python/pyspark/sql/tests.py | 4 +- python/pyspark/sql/types.py | 84 ++++++++++++++++++++++++--------- 4 files changed, 97 insertions(+), 29 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 752b6f997dbc8..5726a6a4fdfa0 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1924,11 +1924,11 @@ def toPandas(self): if timezone is None: return pdf else: - from pyspark.sql.types import _check_series_convert_timestamps_localize + from pyspark.sql.types import _check_series_convert_timestamps_local_tz for field in self.schema: if isinstance(field.dataType, TimestampType): pdf[field.name] = \ - _check_series_convert_timestamps_localize(pdf[field.name], timezone) + _check_series_convert_timestamps_local_tz(pdf[field.name], timezone) return pdf def _collectAsArrow(self): diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index d1d0b8b8fe5d9..02db8489f4940 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -34,8 +34,9 @@ from pyspark.sql.dataframe import DataFrame from pyspark.sql.readwriter import DataFrameReader from pyspark.sql.streaming import DataStreamReader -from pyspark.sql.types import Row, DataType, StringType, StructType, _make_type_verifier, \ - _infer_schema, _has_nulltype, _merge_type, _create_converter, _parse_datatype_string +from pyspark.sql.types import Row, DataType, StringType, StructType, TimestampType, \ + _make_type_verifier, _infer_schema, _has_nulltype, _merge_type, _create_converter, \ + _parse_datatype_string from pyspark.sql.utils import install_exception_handler __all__ = ["SparkSession"] @@ -440,7 +441,7 @@ def _get_numpy_record_dtypes(self, rec): record_type_list.append((str(col_names[i]), curr_type)) return record_type_list if has_rec_fix else None - def _convert_from_pandas(self, pdf, schema): + def _convert_from_pandas(self, pdf, schema, timezone): """ Convert a pandas.DataFrame to list of records that can be used to make a DataFrame :return tuple of list of records and schema @@ -449,6 +450,25 @@ def _convert_from_pandas(self, pdf, schema): if schema is None: schema = [str(x) for x in pdf.columns] + if timezone is not None: + from pyspark.sql.types import _check_series_convert_timestamps_tz_local + copied = False + if isinstance(schema, StructType): + for field in schema: + if isinstance(field.dataType, TimestampType): + s = _check_series_convert_timestamps_tz_local(pdf[field.name], timezone) + if not copied and s is not pdf[field.name]: + pdf = pdf.copy() + copied = True + pdf[field.name] = s + else: + for column, series in pdf.iteritems(): + s = _check_series_convert_timestamps_tz_local(pdf[column], timezone) + if not copied and s is not pdf[column]: + pdf = pdf.copy() + copied = True + pdf[column] = s + # Convert pandas.DataFrame to list of numpy records np_records = pdf.to_records(index=False) @@ -557,7 +577,13 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr except Exception: has_pandas = False if has_pandas and 
isinstance(data, pandas.DataFrame): - data, schema = self._convert_from_pandas(data, schema) + if self.conf.get("spark.sql.execution.pandas.respectSessionTimeZone").lower() \ + == "true": + timezone = self.conf.get("spark.sql.session.timeZone") + else: + timezone = None + + data, schema = self._convert_from_pandas(data, schema, timezone) if isinstance(schema, StructType): verify_func = _make_type_verifier(schema) if verifySchema else lambda _: True diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 5227f172b4c05..18cac25aa273f 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3188,11 +3188,11 @@ def test_toPandas_respect_session_timezone(self): self.assertFalse(pdf_ny.equals(pdf_la)) - from pyspark.sql.types import _check_series_convert_timestamps_localize + from pyspark.sql.types import _check_series_convert_timestamps_local_tz pdf_la_corrected = pdf_la.copy() for field in self.schema: if isinstance(field.dataType, TimestampType): - pdf_la_corrected[field.name] = _check_series_convert_timestamps_localize( + pdf_la_corrected[field.name] = _check_series_convert_timestamps_local_tz( pdf_la_corrected[field.name], timezone) self.assertFramesEqual(pdf_ny, pdf_la_corrected) finally: diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index c246999ed14f0..a733ae4348a8d 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1666,43 +1666,85 @@ def _check_series_convert_timestamps_internal(s, timezone): return s -def _check_series_convert_timestamps_localize(s, timezone): +def _check_series_convert_timestamps_localize(s, fromTimezone, toTimezone): """ Convert timestamp to timezone-naive in the specified timezone or local timezone :param s: a pandas.Series - :param timezone: the timezone to convert. if None then use local timezone + :param fromTimezone: the timezone to convert from. if None then use local timezone + :param toTimezone: the timezone to convert to. if None then use local timezone :return pandas.Series where if it is a timestamp, has been converted to tz-naive """ import pandas as pd try: from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype - tz = timezone or 'tzlocal()' + fromTz = fromTimezone or 'tzlocal()' + toTz = toTimezone or 'tzlocal()' # TODO: handle nested timestamps, such as ArrayType(TimestampType())? if is_datetime64tz_dtype(s.dtype): - return s.dt.tz_convert(tz).dt.tz_localize(None) - elif is_datetime64_dtype(s.dtype) and timezone is not None: + return s.dt.tz_convert(toTz).dt.tz_localize(None) + elif is_datetime64_dtype(s.dtype) and fromTz != toTz: # `s.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT. - return s.apply(lambda ts: ts.tz_localize('tzlocal()').tz_convert(tz).tz_localize(None) + return s.apply(lambda ts: ts.tz_localize(fromTz).tz_convert(toTz).tz_localize(None) if ts is not pd.NaT else pd.NaT) else: return s except ImportError: - from pandas.core.common import is_datetime64_dtype - from pandas.tslib import _dateutil_tzlocal - tzlocal = _dateutil_tzlocal() - tz = timezone or tzlocal - # TODO: handle nested timestamps, such as ArrayType(TimestampType())? - if not is_datetime64_dtype(s.dtype): - # `s.dt.tz_convert(tzlocal).dt.tz_localize(None)` doesn't work properly. - return pd.Series([ts.tz_convert(tz).tz_localize(None) - if ts is not pd.NaT else pd.NaT for ts in s]) - elif is_datetime64_dtype(s.dtype) and timezone is not None: - # `s.dt.tz_localize(tzlocal)` doesn't work properly. 
- return pd.Series([ts.tz_localize(tzlocal).tz_convert(tz).tz_localize(None) - if ts is not pd.NaT else pd.NaT for ts in s]) - else: - return s + try: + # Pandas <0.19 + from pandas.core.common import is_datetime64tz_dtype, is_datetime64_dtype + from pandas.tslib import _dateutil_tzlocal + tzlocal = _dateutil_tzlocal() + fromTz = fromTimezone or tzlocal + toTz = toTimezone or tzlocal + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? + if is_datetime64tz_dtype(s.dtype): + # `s.dt.tz_convert(tzlocal).dt.tz_localize(None)` doesn't work properly. + return pd.Series([ts.tz_convert(toTz).tz_localize(None) + if ts is not pd.NaT else pd.NaT for ts in s]) + elif is_datetime64_dtype(s.dtype) and fromTz != toTz: + # `s.dt.tz_localize(tzlocal)` doesn't work properly. + return pd.Series([ts.tz_localize(fromTz).tz_convert(toTz).tz_localize(None) + if ts is not pd.NaT else pd.NaT for ts in s]) + else: + return s + except ImportError: + # Pandas <0.17 + # can't handle datetime64tz + from pandas.core.common import is_datetime64_dtype + from pandas.tslib import _dateutil_tzlocal + tzlocal = _dateutil_tzlocal() + fromTz = fromTimezone or tzlocal + toTz = toTimezone or tzlocal + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? + if is_datetime64_dtype(s.dtype) and fromTz != toTz: + # `s.dt.tz_localize(tzlocal)` doesn't work properly. + return pd.Series([ts.tz_localize(fromTz).tz_convert(toTz).tz_localize(None) + if ts is not pd.NaT else pd.NaT for ts in s]) + else: + return s + + +def _check_series_convert_timestamps_local_tz(s, timezone): + """ + Convert timestamp to timezone-naive in the specified timezone or local timezone + + :param s: a pandas.Series + :param timezone: the timezone to convert to. if None then use local timezone + :return pandas.Series where if it is a timestamp, has been converted to tz-naive + """ + return _check_series_convert_timestamps_localize(s, None, timezone) + + +def _check_series_convert_timestamps_tz_local(s, timezone): + """ + Convert timestamp to timezone-naive in the specified timezone or local timezone + + :param s: a pandas.Series + :param timezone: the timezone to convert from. if None then use local timezone + :return pandas.Series where if it is a timestamp, has been converted to tz-naive + """ + return _check_series_convert_timestamps_localize(s, timezone, None) def _test(): From 8b1a4d8eb1c73d87eb3867fe4c1876cb9c48b2cf Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 13 Nov 2017 15:35:51 +0900 Subject: [PATCH 16/25] Add a test to check the behavior of createDataFrame from pandas DataFrame. 
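The new test relies on the fact that interpreting the same timezone-naive wall-clock time in America/Los_Angeles versus America/New_York shifts the stored instant by three hours, as long as the chosen times avoid DST transitions (an earlier patch in this series adjusts the test data for that). A plain-pandas sketch of that expectation, using an illustrative value:

    import pandas as pd

    naive = pd.Timestamp("2012-02-02 02:02:02")

    # The same wall-clock value, interpreted in two different session timezones,
    # maps to UTC instants three hours apart.
    as_la = naive.tz_localize("America/Los_Angeles").tz_convert("UTC")
    as_ny = naive.tz_localize("America/New_York").tz_convert("UTC")
    print(as_la - as_ny)  # 0 days 03:00:00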
--- python/pyspark/sql/tests.py | 39 +++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index dc398d3dc472c..31adc432eaae8 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3229,16 +3229,47 @@ def test_filtered_frame(self): self.assertEqual(pdf.columns[0], "i") self.assertTrue(pdf.empty) - def test_createDataFrame_toggle(self): - pdf = self.create_pandas_data_frame() + def _createDataFrame_toggle(self, pdf, schema=None): self.spark.conf.set("spark.sql.execution.arrow.enabled", "false") try: - df_no_arrow = self.spark.createDataFrame(pdf) + df_no_arrow = self.spark.createDataFrame(pdf, schema=schema) finally: self.spark.conf.set("spark.sql.execution.arrow.enabled", "true") - df_arrow = self.spark.createDataFrame(pdf) + df_arrow = self.spark.createDataFrame(pdf, schema=schema) + return df_no_arrow, df_arrow + + def test_createDataFrame_toggle(self): + pdf = self.create_pandas_data_frame() + df_no_arrow, df_arrow = self._createDataFrame_toggle(pdf, schema=self.schema) self.assertEquals(df_no_arrow.collect(), df_arrow.collect()) + def test_createDataFrame_respect_session_timezone(self): + from datetime import timedelta + pdf = self.create_pandas_data_frame() + orig_tz = self.spark.conf.get("spark.sql.session.timeZone") + try: + timezone = "America/New_York" + self.spark.conf.set("spark.sql.session.timeZone", timezone) + self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false") + try: + df_no_arrow_la, df_arrow_la = self._createDataFrame_toggle(pdf, schema=self.schema) + result_la = df_no_arrow_la.collect() + result_arrow_la = df_arrow_la.collect() + self.assertEqual(result_la, result_arrow_la) + finally: + self.spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "true") + df_no_arrow_ny, df_arrow_ny = self._createDataFrame_toggle(pdf, schema=self.schema) + result_ny = df_no_arrow_ny.collect() + result_arrow_ny = df_arrow_ny.collect() + self.assertEqual(result_ny, result_arrow_ny) + + self.assertNotEqual(result_ny, result_la) + + result_la_corrected = [Row(*(r[0:6] + (r[6] - timedelta(hours=3),))) for r in result_la] + self.assertEqual(result_ny, result_la_corrected) + finally: + self.spark.conf.set("spark.sql.session.timeZone", orig_tz) + def test_createDataFrame_with_schema(self): pdf = self.create_pandas_data_frame() df = self.spark.createDataFrame(pdf, schema=self.schema) From e919ed55758f75733d56287d5a49326b1067a44c Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 13 Nov 2017 17:32:27 +0900 Subject: [PATCH 17/25] Clarify the usage of Row. 
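Rebuilding the corrected rows with Row(*(r[0:6] + ...)) relies on the timestamp sitting at a fixed position and drops the field names. Rebuilding them by field name via asDict() states which column is being adjusted and keeps the names. A small sketch of the difference, with made-up field names rather than the test's schema:

    from pyspark.sql import Row

    r = Row(a=1, b=2, ts=100)

    # Positional rebuild: only correct while 'ts' stays at index 2, and the
    # resulting Row loses its field names.
    by_position = Row(*(r[0:2] + (r[2] - 3,)))

    # Name-based rebuild: the adjusted field is explicit and the names survive.
    by_name = Row(**{k: (v - 3 if k == 'ts' else v) for k, v in r.asDict().items()})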
--- python/pyspark/sql/tests.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 31adc432eaae8..75d1f2a359e0c 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3265,7 +3265,9 @@ def test_createDataFrame_respect_session_timezone(self): self.assertNotEqual(result_ny, result_la) - result_la_corrected = [Row(*(r[0:6] + (r[6] - timedelta(hours=3),))) for r in result_la] + result_la_corrected = [Row(**{k: v - timedelta(hours=3) if k == '7_timestamp_t' else v + for k, v in row.asDict().items()}) + for row in result_la] self.assertEqual(result_ny, result_la_corrected) finally: self.spark.conf.set("spark.sql.session.timeZone", orig_tz) From 9cfdde2ce07d00779a9f6f8f5ab86cc442b7655b Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 21 Nov 2017 12:59:14 +0900 Subject: [PATCH 18/25] Add TODOs for nested timestamp fields. --- python/pyspark/sql/dataframe.py | 1 + python/pyspark/sql/session.py | 1 + 2 files changed, 2 insertions(+) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 5726a6a4fdfa0..a246fe1c3053f 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1926,6 +1926,7 @@ def toPandas(self): else: from pyspark.sql.types import _check_series_convert_timestamps_local_tz for field in self.schema: + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? if isinstance(field.dataType, TimestampType): pdf[field.name] = \ _check_series_convert_timestamps_local_tz(pdf[field.name], timezone) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index b882662bd103e..6f48ac1b6f97b 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -455,6 +455,7 @@ def _convert_from_pandas(self, pdf, schema, timezone): copied = False if isinstance(schema, StructType): for field in schema: + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? if isinstance(field.dataType, TimestampType): s = _check_series_convert_timestamps_tz_local(pdf[field.name], timezone) if not copied and s is not pdf[field.name]: From 8b1a4a1458d4c6187587e196ea5f1eab399a530c Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 21 Nov 2017 14:34:52 +0900 Subject: [PATCH 19/25] Remove workarounds for old Pandas but add some error messages saying we need 0.19.2 or upper. --- python/pyspark/sql/session.py | 8 +++- python/pyspark/sql/types.py | 77 +++++++++++++---------------------- python/setup.py | 2 +- 3 files changed, 36 insertions(+), 51 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 6f48ac1b6f97b..e1093c8e12511 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -489,8 +489,12 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone): data types will be used to coerce the data in Pandas to Arrow conversion. 
""" from pyspark.serializers import ArrowSerializer, _create_batch - from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType - from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype + from pyspark.sql.types import from_arrow_schema, to_arrow_type, \ + _old_pandas_exception_message, TimestampType + try: + from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype + except ImportError as e: + raise ImportError(_old_pandas_exception_message(e)) # Determine arrow types to coerce data when creating batches if isinstance(schema, StructType): diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 3cc3417766bde..f9fe81b2c17b7 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1678,6 +1678,13 @@ def from_arrow_schema(arrow_schema): for field in arrow_schema]) +def _old_pandas_exception_message(e): + """ Create an error message for importing old Pandas. + """ + msg = "note: Pandas (>=0.19.2) must be installed and available on calling Python process" + return "%s\n%s" % (e.message, msg) + + def _check_dataframe_localize_timestamps(pdf, timezone): """ Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone @@ -1686,7 +1693,10 @@ def _check_dataframe_localize_timestamps(pdf, timezone): :param timezone: the timezone to convert. if None then use local timezone :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive """ - from pandas.api.types import is_datetime64tz_dtype + try: + from pandas.api.types import is_datetime64tz_dtype + except ImportError as e: + raise ImportError(_old_pandas_exception_message(e)) tz = timezone or 'tzlocal()' for column, series in pdf.iteritems(): # TODO: handle nested timestamps, such as ArrayType(TimestampType())? @@ -1704,7 +1714,10 @@ def _check_series_convert_timestamps_internal(s, timezone): :param timezone: the timezone to convert. if None then use local timezone :return pandas.Series where if it is a timestamp, has been UTC normalized without a time zone """ - from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype + try: + from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype + except ImportError as e: + raise ImportError(_old_pandas_exception_message(e)) # TODO: handle nested timestamps, such as ArrayType(TimestampType())? if is_datetime64_dtype(s.dtype): tz = timezone or 'tzlocal()' @@ -1724,54 +1737,22 @@ def _check_series_convert_timestamps_localize(s, fromTimezone, toTimezone): :param toTimezone: the timezone to convert to. if None then use local timezone :return pandas.Series where if it is a timestamp, has been converted to tz-naive """ - import pandas as pd try: + import pandas as pd from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype - fromTz = fromTimezone or 'tzlocal()' - toTz = toTimezone or 'tzlocal()' - # TODO: handle nested timestamps, such as ArrayType(TimestampType())? - if is_datetime64tz_dtype(s.dtype): - return s.dt.tz_convert(toTz).dt.tz_localize(None) - elif is_datetime64_dtype(s.dtype) and fromTz != toTz: - # `s.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT. 
- return s.apply(lambda ts: ts.tz_localize(fromTz).tz_convert(toTz).tz_localize(None) - if ts is not pd.NaT else pd.NaT) - else: - return s - except ImportError: - try: - # Pandas <0.19 - from pandas.core.common import is_datetime64tz_dtype, is_datetime64_dtype - from pandas.tslib import _dateutil_tzlocal - tzlocal = _dateutil_tzlocal() - fromTz = fromTimezone or tzlocal - toTz = toTimezone or tzlocal - # TODO: handle nested timestamps, such as ArrayType(TimestampType())? - if is_datetime64tz_dtype(s.dtype): - # `s.dt.tz_convert(tzlocal).dt.tz_localize(None)` doesn't work properly. - return pd.Series([ts.tz_convert(toTz).tz_localize(None) - if ts is not pd.NaT else pd.NaT for ts in s]) - elif is_datetime64_dtype(s.dtype) and fromTz != toTz: - # `s.dt.tz_localize(tzlocal)` doesn't work properly. - return pd.Series([ts.tz_localize(fromTz).tz_convert(toTz).tz_localize(None) - if ts is not pd.NaT else pd.NaT for ts in s]) - else: - return s - except ImportError: - # Pandas <0.17 - # can't handle datetime64tz - from pandas.core.common import is_datetime64_dtype - from pandas.tslib import _dateutil_tzlocal - tzlocal = _dateutil_tzlocal() - fromTz = fromTimezone or tzlocal - toTz = toTimezone or tzlocal - # TODO: handle nested timestamps, such as ArrayType(TimestampType())? - if is_datetime64_dtype(s.dtype) and fromTz != toTz: - # `s.dt.tz_localize(tzlocal)` doesn't work properly. - return pd.Series([ts.tz_localize(fromTz).tz_convert(toTz).tz_localize(None) - if ts is not pd.NaT else pd.NaT for ts in s]) - else: - return s + except ImportError as e: + raise ImportError(_old_pandas_exception_message(e)) + fromTz = fromTimezone or 'tzlocal()' + toTz = toTimezone or 'tzlocal()' + # TODO: handle nested timestamps, such as ArrayType(TimestampType())? + if is_datetime64tz_dtype(s.dtype): + return s.dt.tz_convert(toTz).dt.tz_localize(None) + elif is_datetime64_dtype(s.dtype) and fromTz != toTz: + # `s.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT. + return s.apply(lambda ts: ts.tz_localize(fromTz).tz_convert(toTz).tz_localize(None) + if ts is not pd.NaT else pd.NaT) + else: + return s def _check_series_convert_timestamps_local_tz(s, timezone): diff --git a/python/setup.py b/python/setup.py index 02612ff8a7247..310670e697a83 100644 --- a/python/setup.py +++ b/python/setup.py @@ -201,7 +201,7 @@ def _supports_symlinks(): extras_require={ 'ml': ['numpy>=1.7'], 'mllib': ['numpy>=1.7'], - 'sql': ['pandas>=0.13.0'] + 'sql': ['pandas>=0.19.2'] }, classifiers=[ 'Development Status :: 5 - Production/Stable', From 3db2bea22c6bd240c871cb4580fbbf0ee3b659b4 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 21 Nov 2017 15:09:49 +0900 Subject: [PATCH 20/25] Fix tests. 
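The test module now distinguishes three cases: a usable pandas, an installed but too-old pandas, and no pandas at all. The probe piggybacks on the pandas.api package, which appeared in the 0.19 line, so it approximates the real >= 0.19.2 requirement. A minimal sketch of that detection (using explicit ImportError where the patch uses a bare except):

    _have_pandas = False       # pandas new enough for the Arrow/timezone code paths
    _have_old_pandas = False   # pandas installed, but too old (no pandas.api)
    try:
        import pandas
        try:
            import pandas.api
            _have_pandas = True
        except ImportError:
            _have_old_pandas = True
    except ImportError:
        pass  # no pandas at all; the pandas-related tests are simply skipped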
--- python/pyspark/sql/tests.py | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 016556c946e4c..e06900a403901 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -49,9 +49,14 @@ import unittest _have_pandas = False +_have_old_pandas = False try: import pandas - _have_pandas = True + try: + import pandas.api + _have_pandas = True + except: + _have_old_pandas = True except: # No Pandas, but that's okay, we'll skip those tests pass @@ -2565,8 +2570,7 @@ def count_bucketed_cols(names, table="pyspark_bucket"): .mode("overwrite").saveAsTable("pyspark_bucket")) self.assertSetEqual(set(data), set(self.spark.table("pyspark_bucket").collect())) - @unittest.skipIf(not _have_pandas, "Pandas not installed") - def test_to_pandas(self): + def _to_pandas(self): from datetime import datetime, date import numpy as np schema = StructType().add("a", IntegerType()).add("b", StringType())\ @@ -2579,7 +2583,12 @@ def test_to_pandas(self): (4, "bar", False, 6.0, date(2100, 4, 4), datetime(2100, 4, 4, 4, 4, 4)), ] df = self.spark.createDataFrame(data, schema) - types = df.toPandas().dtypes + return df.toPandas() + + @unittest.skipIf(not _have_pandas, "Pandas not installed") + def test_to_pandas(self): + pdf = self._to_pandas() + types = pdf.dtypes self.assertEquals(types[0], np.int32) self.assertEquals(types[1], np.object) self.assertEquals(types[2], np.bool) @@ -2587,6 +2596,12 @@ def test_to_pandas(self): self.assertEquals(types[4], 'datetime64[ns]') self.assertEquals(types[5], 'datetime64[ns]') + @unittest.skipIf(not _have_old_pandas, "Old Pandas not installed") + def test_to_pandas_old(self): + with QuietTest(self.sc): + with self.assertRaisesRegexp(ImportError, 'Pandas \(.*\) must be installed'): + self._to_pandas() + @unittest.skipIf(not _have_pandas, "Pandas not installed") def test_to_pandas_avoid_astype(self): import numpy as np @@ -2620,6 +2635,16 @@ def test_create_dataframe_from_pandas_with_timestamp(self): self.assertTrue(isinstance(df.schema['ts'].dataType, TimestampType)) self.assertTrue(isinstance(df.schema['d'].dataType, DateType)) + @unittest.skipIf(not _have_old_pandas, "Old Pandas not installed") + def test_create_dataframe_from_old_pandas(self): + import pandas as pd + from datetime import datetime + pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)], + "d": [pd.Timestamp.now().date()]}) + with QuietTest(self.sc): + with self.assertRaisesRegexp(ImportError, 'Pandas \(.*\) must be installed'): + self.spark.createDataFrame(pdf) + class HiveSparkSubmitTests(SparkSubmitTests): @@ -3109,7 +3134,7 @@ def __init__(self, **kwargs): _make_type_verifier(data_type, nullable=False)(obj) -@unittest.skipIf(not _have_arrow, "Arrow not installed") +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") class ArrowTests(ReusedSQLTestCase): @classmethod From 3e23653572b9d555f5479cdd58ac3e15c7f88c28 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 21 Nov 2017 17:21:46 +0900 Subject: [PATCH 21/25] Use `_exception_message()` to access error messages. 
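e.message only exists on Python 2 exception objects, so formatting errors that way breaks on Python 3. pyspark.util._exception_message hides that difference; roughly, it behaves like the following simplified stand-in (not the actual implementation):

    def _exception_message(excp):
        # Python 2 exceptions carry a `message` attribute; Python 3 ones do not.
        if hasattr(excp, "message"):
            return excp.message
        return str(excp)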
--- python/pyspark/sql/dataframe.py | 3 ++- python/pyspark/sql/types.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index a246fe1c3053f..9864dc98c1f33 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -39,6 +39,7 @@ from pyspark.sql.streaming import DataStreamWriter from pyspark.sql.types import IntegralType from pyspark.sql.types import * +from pyspark.util import _exception_message __all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"] @@ -1902,7 +1903,7 @@ def toPandas(self): except ImportError as e: msg = "note: pyarrow must be installed and available on calling Python process " \ "if using spark.sql.execution.arrow.enabled=true" - raise ImportError("%s\n%s" % (e.message, msg)) + raise ImportError("%s\n%s" % (_exception_message(e), msg)) else: pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index f9fe81b2c17b7..40f82832950ef 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -35,6 +35,7 @@ from pyspark import SparkContext from pyspark.serializers import CloudPickleSerializer +from pyspark.util import _exception_message __all__ = [ "DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType", @@ -1682,7 +1683,7 @@ def _old_pandas_exception_message(e): """ Create an error message for importing old Pandas. """ msg = "note: Pandas (>=0.19.2) must be installed and available on calling Python process" - return "%s\n%s" % (e.message, msg) + return "%s\n%s" % (_exception_message(e), msg) def _check_dataframe_localize_timestamps(pdf, timezone): From d7411717c7c8c7c7aba63e4e19ebdfadfa7ea0c0 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 21 Nov 2017 23:11:38 +0900 Subject: [PATCH 22/25] Fix a test. --- python/pyspark/sql/tests.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index e06900a403901..fc6e575c08035 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2587,6 +2587,7 @@ def _to_pandas(self): @unittest.skipIf(not _have_pandas, "Pandas not installed") def test_to_pandas(self): + import numpy as np pdf = self._to_pandas() types = pdf.dtypes self.assertEquals(types[0], np.int32) From e2406312c54fd793fe636b1cf835c5ab45ac75f6 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 27 Nov 2017 13:19:25 +0900 Subject: [PATCH 23/25] Add a description about deprecation of the config. --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 713ce1e33c0eb..3cd2d225a6c0d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1001,7 +1001,8 @@ object SQLConf { buildConf("spark.sql.execution.pandas.respectSessionTimeZone") .internal() .doc("When true, make Pandas DataFrame with timestamp type respecting session local " + - "timezone when converting to/from Pandas DataFrame.") + "timezone when converting to/from Pandas DataFrame. 
This configuration will be " + + "deprecated in the future releases.") .booleanConf .createWithDefault(true) From f92eae35767a766ad80ac576a67f521e365549c7 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 27 Nov 2017 14:07:49 +0900 Subject: [PATCH 24/25] Add migration guide. --- docs/sql-programming-guide.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 686fcb159d09d..50d1dd5a6136a 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1577,6 +1577,8 @@ options. - Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`. - The `percentile_approx` function previously accepted numeric type input and output double type results. Now it supports date type, timestamp type and numeric types as input types. The result type is also changed to be the same as the input type, which is more reasonable for percentiles. + - In PySpark, now we need Pandas 0.19.2 or upper if you want to use Pandas related functionalities, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc. + - In PySpark, the behavior of timestamp values for Pandas related functionalities was changed to respect session timezone. If you want to use the old behavior, you need to set a configuration `spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See [SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details. ## Upgrading From Spark SQL 2.1 to 2.2 From 9200f38b6414255a5c60127aeeae517086ba108b Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 28 Nov 2017 12:11:27 +0900 Subject: [PATCH 25/25] Address comments. 
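Review follow-ups: document why _convert_from_pandas copies the input pandas DataFrame at most once, note the 3-hour Los Angeles/New York offset behind the test corrections, and rename the camelCase fromTimezone/toTimezone parameters to snake_case. The copy-once pattern being commented looks, in sketch form, like the following (convert_column is a hypothetical stand-in for _check_series_convert_timestamps_tz_local):

    import pandas as pd

    def copy_once_convert(pdf, convert_column):
        """Apply convert_column to every column, copying `pdf` at most once and
        only when some column actually changes, so the caller's DataFrame is
        never mutated and unchanged inputs are never copied."""
        copied = False
        for name in pdf.columns:
            original = pdf[name]
            s = convert_column(original)
            if not copied and s is not original:
                pdf = pdf.copy()
                copied = True
            pdf[name] = s
        return pdf

    # With an identity conversion nothing changes, so the input is handed back.
    pdf = pd.DataFrame({"x": [1, 2, 3]})
    assert copy_once_convert(pdf, lambda s: s) is pdf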
--- python/pyspark/sql/session.py | 4 ++++ python/pyspark/sql/tests.py | 2 ++ python/pyspark/sql/types.py | 16 ++++++++-------- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index e1093c8e12511..e2435e09af23d 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -459,6 +459,8 @@ def _convert_from_pandas(self, pdf, schema, timezone): if isinstance(field.dataType, TimestampType): s = _check_series_convert_timestamps_tz_local(pdf[field.name], timezone) if not copied and s is not pdf[field.name]: + # Copy once if the series is modified to prevent the original Pandas + # DataFrame from being updated pdf = pdf.copy() copied = True pdf[field.name] = s @@ -466,6 +468,8 @@ def _convert_from_pandas(self, pdf, schema, timezone): for column, series in pdf.iteritems(): s = _check_series_convert_timestamps_tz_local(pdf[column], timezone) if not copied and s is not pdf[column]: + # Copy once if the series is modified to prevent the original Pandas + # DataFrame from being updated pdf = pdf.copy() copied = True pdf[column] = s diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index fc6e575c08035..b4d32d8de8a22 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -3291,6 +3291,7 @@ def test_createDataFrame_respect_session_timezone(self): self.assertNotEqual(result_ny, result_la) + # Correct result_la by adjusting 3 hours difference between Los Angeles and New York result_la_corrected = [Row(**{k: v - timedelta(hours=3) if k == '7_timestamp_t' else v for k, v in row.asDict().items()}) for row in result_la] @@ -3834,6 +3835,7 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self): df_la = df.withColumn("tscopy", f_timestamp_copy(col("timestamp"))) \ .withColumn("internal_value", internal_value(col("timestamp"))) result_la = df_la.select(col("idx"), col("internal_value")).collect() + # Correct result_la by adjusting 3 hours difference between Los Angeles and New York diff = 3 * 60 * 60 * 1000 * 1000 * 1000 result_la_corrected = \ df_la.select(col("idx"), col("tscopy"), col("internal_value") + diff).collect() diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 40f82832950ef..78abc32a35a1c 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1729,13 +1729,13 @@ def _check_series_convert_timestamps_internal(s, timezone): return s -def _check_series_convert_timestamps_localize(s, fromTimezone, toTimezone): +def _check_series_convert_timestamps_localize(s, from_timezone, to_timezone): """ Convert timestamp to timezone-naive in the specified timezone or local timezone :param s: a pandas.Series - :param fromTimezone: the timezone to convert from. if None then use local timezone - :param toTimezone: the timezone to convert to. if None then use local timezone + :param from_timezone: the timezone to convert from. if None then use local timezone + :param to_timezone: the timezone to convert to. 
if None then use local timezone :return pandas.Series where if it is a timestamp, has been converted to tz-naive """ try: @@ -1743,14 +1743,14 @@ def _check_series_convert_timestamps_localize(s, fromTimezone, toTimezone): from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype except ImportError as e: raise ImportError(_old_pandas_exception_message(e)) - fromTz = fromTimezone or 'tzlocal()' - toTz = toTimezone or 'tzlocal()' + from_tz = from_timezone or 'tzlocal()' + to_tz = to_timezone or 'tzlocal()' # TODO: handle nested timestamps, such as ArrayType(TimestampType())? if is_datetime64tz_dtype(s.dtype): - return s.dt.tz_convert(toTz).dt.tz_localize(None) - elif is_datetime64_dtype(s.dtype) and fromTz != toTz: + return s.dt.tz_convert(to_tz).dt.tz_localize(None) + elif is_datetime64_dtype(s.dtype) and from_tz != to_tz: # `s.dt.tz_localize('tzlocal()')` doesn't work properly when including NaT. - return s.apply(lambda ts: ts.tz_localize(fromTz).tz_convert(toTz).tz_localize(None) + return s.apply(lambda ts: ts.tz_localize(from_tz).tz_convert(to_tz).tz_localize(None) if ts is not pd.NaT else pd.NaT) else: return s
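
Taken together, the series makes toPandas() and createDataFrame(pandas_df) interpret timezone-naive timestamps in the session timezone (falling back to the old local-timezone behaviour when the internal flag is off), requires pandas 0.19.2 or newer for those paths, and records both changes in the migration guide. A minimal end-to-end usage sketch, assuming an existing SparkSession named spark and a made-up timestamp value:

    import pandas as pd
    from datetime import datetime

    spark.conf.set("spark.sql.session.timeZone", "America/New_York")

    # Timezone-naive values are now read as America/New_York wall-clock times...
    pdf_in = pd.DataFrame({"ts": [datetime(2017, 11, 1, 9, 0, 0)]})
    df = spark.createDataFrame(pdf_in)

    # ...and come back as naive New_York wall-clock times, with or without Arrow.
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    pdf_out = df.toPandas()

    # Deprecated escape hatch: restore the old local-timezone interpretation.
    spark.conf.set("spark.sql.execution.pandas.respectSessionTimeZone", "false")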