From e26092033551b0939cf925fecd38f13da49b870b Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 12 Nov 2019 23:08:58 -0800 Subject: [PATCH 01/14] Remove Row field sorting and add LegacyRow with env var --- python/pyspark/sql/types.py | 71 +++++++++++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 14 deletions(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 86447a346ad18..3df0e5e11e44f 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -15,6 +15,7 @@ # limitations under the License. # +import os import sys import decimal import time @@ -611,7 +612,7 @@ def toInternal(self, obj): else: if isinstance(obj, dict): return tuple(obj.get(n) for n in self.names) - elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False): + elif isinstance(obj, LegacyRow) and getattr(obj, "__from_dict__", False): return tuple(obj[n] for n in self.names) elif isinstance(obj, (list, tuple)): return tuple(obj) @@ -1376,7 +1377,7 @@ def verify_struct(obj): if isinstance(obj, dict): for f, verifier in verifiers: verifier(obj.get(f)) - elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False): + elif isinstance(obj, LegacyRow) and getattr(obj, "__from_dict__", False): # the order in obj could be different than dataType.fields for f, verifier in verifiers: verifier(obj[f]) @@ -1421,6 +1422,9 @@ def _create_row(fields, values): return row +_legacy_row_enabled = os.environ.get('PYSPARK_LEGACY_ROW_ENABLED', 'false').lower() == 'true' + + class Row(tuple): """ @@ -1432,10 +1436,12 @@ class Row(tuple): ``key in row`` will search through row keys. - Row can be used to create a row object by using named arguments, - the fields will be sorted by names. It is not allowed to omit - a named argument to represent the value is None or missing. This should be - explicitly set to None in this case. + Row can be used to create a row object by using named arguments. + It is not allowed to omit a named argument to represent the value is + None or missing. This should be explicitly set to None in this case. + + NOTE: For Python version < 3.6, named arguments can not be used due + to >>> row = Row(name="Alice", age=11) >>> row @@ -1474,21 +1480,31 @@ class Row(tuple): True """ - def __new__(self, *args, **kwargs): + def __new__(cls, *args, **kwargs): + if _legacy_row_enabled: + return LegacyRow(args, kwargs) if args and kwargs: raise ValueError("Can not use both args " "and kwargs to create Row") + if sys.version_info[:2] < (3, 6): + # Remove after Python < 3.6 dropped + from collections import OrderedDict + if kwargs: + raise ValueError("Named arguments are not allowed for Python version < 3.6, " + "use a collections.OrderedDict instead. 
To enable Spark 2.x " + "compatible Rows, set the environment variable " + "'PYSPARK_LEGACY_ROW_ENABLED' to 'true'.") + elif len(args) == 1 and isinstance(args[0], OrderedDict): + kwargs = args[0] + if kwargs: # create row objects - names = sorted(kwargs.keys()) - row = tuple.__new__(self, [kwargs[n] for n in names]) - row.__fields__ = names - row.__from_dict__ = True + row = tuple.__new__(cls, list(kwargs.values())) + row.__fields__ = list(kwargs.keys()) return row - else: # create row class or objects - return tuple.__new__(self, args) + return tuple.__new__(cls, args) def asDict(self, recursive=False): """ @@ -1562,7 +1578,7 @@ def __getattr__(self, item): raise AttributeError(item) def __setattr__(self, key, value): - if key != '__fields__' and key != "__from_dict__": + if key != '__fields__': raise Exception("Row is read-only") self.__dict__[key] = value @@ -1582,6 +1598,33 @@ def __repr__(self): return "" % ", ".join("%r" % field for field in self) +class LegacyRow(Row): + """ + + .. note:: Deprecated in 3.0. See SPARK-29748 + """ + + def __new__(cls, *args, **kwargs): + if args and kwargs: + raise ValueError("Can not use both args " + "and kwargs to create Row") + if kwargs: + # create row objects + names = sorted(kwargs.keys()) + row = tuple.__new__(cls, [kwargs[n] for n in names]) + row.__fields__ = names + row.__from_dict__ = True + return row + else: + # create row class or objects + return tuple.__new__(cls, args) + + def __setattr__(self, key, value): + if key != '__fields__' and key != "__from_dict__": + raise Exception("Row is read-only") + self.__dict__[key] = value + + class DateConverter(object): def can_convert(self, obj): return isinstance(obj, datetime.date) From 43ff88d1c0b6c3a5661cc1b553e19af1d25d061f Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 12 Nov 2019 23:09:10 -0800 Subject: [PATCH 02/14] Start fixing tests --- python/pyspark/sql/avro/functions.py | 8 ++++---- python/pyspark/sql/column.py | 4 ++-- python/pyspark/sql/tests/test_types.py | 10 ++++++---- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/avro/functions.py b/python/pyspark/sql/avro/functions.py index ed62a72d6c8fb..31200d5d3e4b8 100644 --- a/python/pyspark/sql/avro/functions.py +++ b/python/pyspark/sql/avro/functions.py @@ -49,13 +49,13 @@ def from_avro(data, jsonFormatSchema, options={}): >>> df = spark.createDataFrame(data, ("key", "value")) >>> avroDf = df.select(to_avro(df.value).alias("avro")) >>> avroDf.collect() - [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))] + [Row(avro=bytearray(b'\\x00\\x00\\nAlice\\x00\\x04'))] >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields": ... [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord", - ... "fields":[{"name":"age","type":["long","null"]}, - ... {"name":"name","type":["string","null"]}]},"null"]}]}''' + ... "fields":[{"name":"name","type":["string","null"]}, + ... 
{"name":"age","type":["long","null"]}]},"null"]}]}''' >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect() - [Row(value=Row(avro=Row(age=2, name=u'Alice')))] + [Row(value=Row(avro=Row(name=u'Alice', age=2)))] """ sc = SparkContext._active_spark_context diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py index b472a4221cd0c..cdf9f87624291 100644 --- a/python/pyspark/sql/column.py +++ b/python/pyspark/sql/column.py @@ -518,7 +518,7 @@ def isin(self, *cols): >>> from pyspark.sql import Row >>> df = spark.createDataFrame([Row(name=u'Tom', height=80), Row(name=u'Alice', height=None)]) >>> df.filter(df.height.isNull()).collect() - [Row(height=None, name=u'Alice')] + [Row(name=u'Alice', height=None)] """ _isNotNull_doc = """ True if the current expression is NOT null. @@ -526,7 +526,7 @@ def isin(self, *cols): >>> from pyspark.sql import Row >>> df = spark.createDataFrame([Row(name=u'Tom', height=80), Row(name=u'Alice', height=None)]) >>> df.filter(df.height.isNotNull()).collect() - [Row(height=80, name=u'Tom')] + [Row(name=u'Tom', height=80)] """ isNull = ignore_unicode_prefix(_unary_op("isNull", _isNull_doc)) diff --git a/python/pyspark/sql/tests/test_types.py b/python/pyspark/sql/tests/test_types.py index 244eae1b1ce16..73d17d21d2d0b 100644 --- a/python/pyspark/sql/tests/test_types.py +++ b/python/pyspark/sql/tests/test_types.py @@ -64,10 +64,10 @@ def test_apply_schema_to_dict_and_rows(self): df2 = self.spark.createDataFrame(rdd, schema, verifySchema=verify) self.assertEqual(df.schema, df2.schema) - rdd = self.sc.parallelize(range(10)).map(lambda x: Row(a=x, b=None)) + rdd = self.sc.parallelize(range(10)).map(lambda x: Row(b=None, a=x)) df3 = self.spark.createDataFrame(rdd, schema, verifySchema=verify) self.assertEqual(10, df3.count()) - input = [Row(a=x, b=str(x)) for x in range(10)] + input = [Row(b=str(x), a=x) for x in range(10)] df4 = self.spark.createDataFrame(input, schema, verifySchema=verify) self.assertEqual(10, df4.count()) @@ -883,7 +883,8 @@ def __init__(self, **kwargs): ({"s": "a", "f": 1.0}, schema), (Row(s="a", i=1), schema), (Row(s="a", i=None), schema), - (Row(s="a", i=1, f=1.0), schema), + (Row(s="a", i=1, f=1.0).asDict(), schema), + (Row(i=1, f=1.0, s="a").asDict(), schema), (["a", 1], schema), (["a", None], schema), (("a", 1), schema), @@ -947,7 +948,8 @@ def __init__(self, **kwargs): # Struct ({"s": "a", "i": "1"}, schema, TypeError), - (Row(s="a"), schema, ValueError), # Row can't have missing field + (Row(s="a"), schema, ValueError), # Row can't have missing field + (Row(s="a", i=1, f=1.0), schema, ValueError), # Row can't have additional fields (Row(s="a", i="1"), schema, TypeError), (["a"], schema, ValueError), (["a", "1"], schema, TypeError), From c4f5bc8b7cc5aebd13de6f4d434fd5b9bf5b1888 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 13 Nov 2019 23:05:02 -0800 Subject: [PATCH 03/14] Fix dataframe doctests --- python/pyspark/sql/dataframe.py | 144 ++++++++++++++++---------------- 1 file changed, 72 insertions(+), 72 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 2fa90d67880c3..dbee80cd7d910 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1601,19 +1601,19 @@ def dropDuplicates(self, subset=None): ... Row(name='Alice', age=5, height=80), \\ ... 
Row(name='Alice', age=10, height=80)]).toDF() >>> df.dropDuplicates().show() - +---+------+-----+ - |age|height| name| - +---+------+-----+ - | 5| 80|Alice| - | 10| 80|Alice| - +---+------+-----+ + +-----+---+------+ + | name|age|height| + +-----+---+------+ + |Alice| 5| 80| + |Alice| 10| 80| + +-----+---+------+ >>> df.dropDuplicates(['name', 'height']).show() - +---+------+-----+ - |age|height| name| - +---+------+-----+ - | 5| 80|Alice| - +---+------+-----+ + +-----+---+------+ + | name|age|height| + +-----+---+------+ + |Alice| 5| 80| + +-----+---+------+ """ if subset is None: jdf = self._jdf.dropDuplicates() @@ -1635,11 +1635,11 @@ def dropna(self, how='any', thresh=None, subset=None): :param subset: optional list of column names to consider. >>> df4.na.drop().show() - +---+------+-----+ - |age|height| name| - +---+------+-----+ - | 10| 80|Alice| - +---+------+-----+ + +-----+---+------+ + | name|age|height| + +-----+---+------+ + |Alice| 10| 80| + +-----+---+------+ """ if how is not None and how not in ['any', 'all']: raise ValueError("how ('" + how + "') should be 'any' or 'all'") @@ -1672,33 +1672,33 @@ def fillna(self, value, subset=None): then the non-string column is simply ignored. >>> df4.na.fill(50).show() - +---+------+-----+ - |age|height| name| - +---+------+-----+ - | 10| 80|Alice| - | 5| 50| Bob| - | 50| 50| Tom| - | 50| 50| null| - +---+------+-----+ + +-----+---+------+ + | name|age|height| + +-----+---+------+ + |Alice| 10| 80| + | Bob| 5| 50| + | Tom| 50| 50| + | null| 50| 50| + +-----+---+------+ >>> df5.na.fill(False).show() - +----+-------+-----+ - | age| name| spy| - +----+-------+-----+ - | 10| Alice|false| - | 5| Bob|false| - |null|Mallory| true| - +----+-------+-----+ + +-------+-----+----+ + | name| spy| age| + +-------+-----+----+ + | Alice|false| 10| + | Bob|false| 5| + |Mallory| true|null| + +-------+-----+----+ >>> df4.na.fill({'age': 50, 'name': 'unknown'}).show() - +---+------+-------+ - |age|height| name| - +---+------+-------+ - | 10| 80| Alice| - | 5| null| Bob| - | 50| null| Tom| - | 50| null|unknown| - +---+------+-------+ + +-------+---+------+ + | name|age|height| + +-------+---+------+ + | Alice| 10| 80| + | Bob| 5| null| + | Tom| 50| null| + |unknown| 50| null| + +-------+---+------+ """ if not isinstance(value, (float, int, long, basestring, bool, dict)): raise ValueError("value should be a float, int, long, string, bool or dict") @@ -1748,44 +1748,44 @@ def replace(self, to_replace, value=_NoValue, subset=None): then the non-string column is simply ignored. 
>>> df4.na.replace(10, 20).show() - +----+------+-----+ - | age|height| name| - +----+------+-----+ - | 20| 80|Alice| - | 5| null| Bob| - |null| null| Tom| - |null| null| null| - +----+------+-----+ + +-----+----+------+ + | name| age|height| + +-----+----+------+ + |Alice| 20| 80| + | Bob| 5| null| + | Tom|null| null| + | null|null| null| + +-----+----+------+ >>> df4.na.replace('Alice', None).show() - +----+------+----+ - | age|height|name| - +----+------+----+ - | 10| 80|null| - | 5| null| Bob| - |null| null| Tom| - |null| null|null| - +----+------+----+ + +----+----+------+ + |name| age|height| + +----+----+------+ + |null| 10| 80| + | Bob| 5| null| + | Tom|null| null| + |null|null| null| + +----+----+------+ >>> df4.na.replace({'Alice': None}).show() - +----+------+----+ - | age|height|name| - +----+------+----+ - | 10| 80|null| - | 5| null| Bob| - |null| null| Tom| - |null| null|null| - +----+------+----+ + +----+----+------+ + |name| age|height| + +----+----+------+ + |null| 10| 80| + | Bob| 5| null| + | Tom|null| null| + |null|null| null| + +----+----+------+ >>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show() - +----+------+----+ - | age|height|name| - +----+------+----+ - | 10| 80| A| - | 5| null| B| - |null| null| Tom| - |null| null|null| - +----+------+----+ + +----+----+------+ + |name| age|height| + +----+----+------+ + | A| 10| 80| + | B| 5| null| + | Tom|null| null| + |null|null| null| + +----+----+------+ """ if value is _NoValue: if isinstance(to_replace, dict): @@ -2074,7 +2074,7 @@ def drop(self, *cols): [Row(name=u'Alice'), Row(name=u'Bob')] >>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect() - [Row(age=5, height=85, name=u'Bob')] + [Row(age=5, name=u'Bob', height=85)] >>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect() [Row(age=5, name=u'Bob', height=85)] From dedb258ef4c3552cbee165aa3648e815ba1fb2a8 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 14 Nov 2019 14:59:08 -0800 Subject: [PATCH 04/14] Fix sql.types doctests --- python/pyspark/sql/types.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 3df0e5e11e44f..3ae4ec8513df5 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1441,11 +1441,11 @@ class Row(tuple): None or missing. This should be explicitly set to None in this case. NOTE: For Python version < 3.6, named arguments can not be used due - to + to *** TODO *** >>> row = Row(name="Alice", age=11) >>> row - Row(age=11, name='Alice') + Row(name='Alice', age=11) >>> row['name'], row['age'] ('Alice', 11) >>> row.name, row.age @@ -1469,15 +1469,16 @@ class Row(tuple): Row(name='Alice', age=11) This form can also be used to create rows as tuple values, i.e. with unnamed - fields. Beware that such Row objects have different equality semantics: + fields. 
Row objects are evaluated for equality by data values in each + position, field names are not compared: >>> row1 = Row("Alice", 11) >>> row2 = Row(name="Alice", age=11) >>> row1 == row2 - False - >>> row3 = Row(a="Alice", b=11) - >>> row1 == row3 True + >>> row3 = Row(age=11, name="Alice") + >>> row2 == row3 + False """ def __new__(cls, *args, **kwargs): @@ -1515,7 +1516,7 @@ def asDict(self, recursive=False): >>> Row(name="Alice", age=11).asDict() == {'name': 'Alice', 'age': 11} True >>> row = Row(key=1, value=Row(name='a', age=2)) - >>> row.asDict() == {'key': 1, 'value': Row(age=2, name='a')} + >>> row.asDict() == {'key': 1, 'value': Row(name='a', age=2)} True >>> row.asDict(True) == {'key': 1, 'value': {'name': 'a', 'age': 2}} True From cfa13648ada3ae4e880dd248d037e5efa26701cd Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 14 Nov 2019 15:13:02 -0800 Subject: [PATCH 05/14] Fix sql.functions doctests --- python/pyspark/sql/functions.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 2cd91ec2b1aef..c77e20bfc52a3 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -589,8 +589,8 @@ def rand(seed=None): .. note:: The function is non-deterministic in general case. >>> df.withColumn('rand', rand(seed=42) * 3).collect() - [Row(age=2, name=u'Alice', rand=2.4052597283576684), - Row(age=5, name=u'Bob', rand=2.3913904055683974)] + [Row(name=u'Alice', age=2, rand=2.4052597283576684), + Row(name=u'Bob', age=5, rand=2.3913904055683974)] """ sc = SparkContext._active_spark_context if seed is not None: @@ -609,8 +609,8 @@ def randn(seed=None): .. note:: The function is non-deterministic in general case. >>> df.withColumn('randn', randn(seed=42)).collect() - [Row(age=2, name=u'Alice', randn=1.1027054481455365), - Row(age=5, name=u'Bob', randn=0.7400395449950132)] + [Row(name=u'Alice', age=2, randn=1.1027054481455365), + Row(name=u'Bob', age=5, randn=0.7400395449950132)] """ sc = SparkContext._active_spark_context if seed is not None: @@ -2379,11 +2379,11 @@ def to_json(col, options={}): >>> data = [(1, Row(name='Alice', age=2))] >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(to_json(df.value).alias("json")).collect() - [Row(json=u'{"age":2,"name":"Alice"}')] + [Row(json=u'{"name":"Alice","age":2}')] >>> data = [(1, [Row(name='Alice', age=2), Row(name='Bob', age=3)])] >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(to_json(df.value).alias("json")).collect() - [Row(json=u'[{"age":2,"name":"Alice"},{"age":3,"name":"Bob"}]')] + [Row(json=u'[{"name":"Alice","age":2},{"name":"Bob","age":3}]')] >>> data = [(1, {"name": "Alice"})] >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(to_json(df.value).alias("json")).collect() @@ -2475,7 +2475,7 @@ def to_csv(col, options={}): >>> data = [(1, Row(name='Alice', age=2))] >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(to_csv(df.value).alias("csv")).collect() - [Row(csv=u'2,Alice')] + [Row(csv=u'Alice,2')] """ sc = SparkContext._active_spark_context From 294f55102fe9815efa87f6c0d6c6168e95783d90 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 14 Nov 2019 15:38:52 -0800 Subject: [PATCH 06/14] Made _LegacyRow private, added pydoc --- python/pyspark/sql/types.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 3ae4ec8513df5..fd3fc02631a7e 
100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -612,7 +612,7 @@ def toInternal(self, obj): else: if isinstance(obj, dict): return tuple(obj.get(n) for n in self.names) - elif isinstance(obj, LegacyRow) and getattr(obj, "__from_dict__", False): + elif isinstance(obj, _LegacyRow) and getattr(obj, "__from_dict__", False): return tuple(obj[n] for n in self.names) elif isinstance(obj, (list, tuple)): return tuple(obj) @@ -1377,7 +1377,7 @@ def verify_struct(obj): if isinstance(obj, dict): for f, verifier in verifiers: verifier(obj.get(f)) - elif isinstance(obj, LegacyRow) and getattr(obj, "__from_dict__", False): + elif isinstance(obj, _LegacyRow) and getattr(obj, "__from_dict__", False): # the order in obj could be different than dataType.fields for f, verifier in verifiers: verifier(obj[f]) @@ -1440,8 +1440,13 @@ class Row(tuple): It is not allowed to omit a named argument to represent the value is None or missing. This should be explicitly set to None in this case. - NOTE: For Python version < 3.6, named arguments can not be used due - to *** TODO *** + NOTE: For Python versions < 3.6, named arguments can no longer be used + since the order is not guaranteed to be the same as entered, see + https://www.python.org/dev/peps/pep-0468. Instead, an OrderedDict can + be passed as the first argument. To create a Row that is compatible + with Spark 2.x (with fields sorted alphabetically) set the environment + variable "PYSPARK_LEGACY_ROW_ENABLED" to "true". This option is + deprecated and will be removed in future versions of Spark. >>> row = Row(name="Alice", age=11) >>> row @@ -1483,7 +1488,7 @@ class Row(tuple): def __new__(cls, *args, **kwargs): if _legacy_row_enabled: - return LegacyRow(args, kwargs) + return _LegacyRow(args, kwargs) if args and kwargs: raise ValueError("Can not use both args " "and kwargs to create Row") @@ -1599,10 +1604,13 @@ def __repr__(self): return "" % ", ".join("%r" % field for field in self) -class LegacyRow(Row): +class _LegacyRow(Row): """ + Creates a Row that is compatible with with Spark 2.x, such that when + made with kwargs, the fields are sorted alphabetically. - .. note:: Deprecated in 3.0. See SPARK-29748 + .. note:: Deprecated in 3.0.0, will be removed in future versions of Spark. 
+ See SPARK-29748 """ def __new__(cls, *args, **kwargs): From 30ec57b5ab0ade7d9b589559a7440a9d29e973c4 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 3 Dec 2019 12:01:00 -0800 Subject: [PATCH 07/14] Env var now controls sorting only, removed LegacyRow and OrderedDict as arg --- python/pyspark/sql/types.py | 95 +++++++++++++++---------------------- 1 file changed, 39 insertions(+), 56 deletions(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index fd3fc02631a7e..82c393b0a232a 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -26,6 +26,7 @@ import base64 from array import array import ctypes +import warnings if sys.version >= "3": long = int @@ -612,7 +613,7 @@ def toInternal(self, obj): else: if isinstance(obj, dict): return tuple(obj.get(n) for n in self.names) - elif isinstance(obj, _LegacyRow) and getattr(obj, "__from_dict__", False): + elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False): return tuple(obj[n] for n in self.names) elif isinstance(obj, (list, tuple)): return tuple(obj) @@ -1377,7 +1378,7 @@ def verify_struct(obj): if isinstance(obj, dict): for f, verifier in verifiers: verifier(obj.get(f)) - elif isinstance(obj, _LegacyRow) and getattr(obj, "__from_dict__", False): + elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False): # the order in obj could be different than dataType.fields for f, verifier in verifiers: verifier(obj[f]) @@ -1422,7 +1423,14 @@ def _create_row(fields, values): return row -_legacy_row_enabled = os.environ.get('PYSPARK_LEGACY_ROW_ENABLED', 'false').lower() == 'true' +# Remove after Python < 3.6 dropped, see SPARK-29748 +_row_field_sorting_enabled = \ + os.environ.get('PYSPARK_ROW_FIELD_SORTING_ENABLED', 'false').lower() == 'true' + + +if _row_field_sorting_enabled: + warnings.warn("The environment variable 'PYSPARK_ROW_FIELD_SORTING_ENABLED' " + "is deprecated and will be removed in future versions of Spark") class Row(tuple): @@ -1440,13 +1448,16 @@ class Row(tuple): It is not allowed to omit a named argument to represent the value is None or missing. This should be explicitly set to None in this case. - NOTE: For Python versions < 3.6, named arguments can no longer be used - since the order is not guaranteed to be the same as entered, see - https://www.python.org/dev/peps/pep-0468. Instead, an OrderedDict can - be passed as the first argument. To create a Row that is compatible - with Spark 2.x (with fields sorted alphabetically) set the environment - variable "PYSPARK_LEGACY_ROW_ENABLED" to "true". This option is - deprecated and will be removed in future versions of Spark. + NOTE: As of Spark 3.0.0, the Row field names are no longer sorted + alphabetically. To enable field sorting to create Rows compatible with + Spark 2.x, set the environment variable "PYSPARK_ROW_FIELD_SORTING_ENABLED" + to "true". This option is deprecated and will be removed in future versions + of Spark. For Python versions < 3.6, named arguments can no longer be used + without enabling field sorting with the environment variable above because + order or the arguments is not guaranteed to be the same as entered, see + https://www.python.org/dev/peps/pep-0468. If this is detected, a warning + will be issued and the Row will fallback to sort the field names + automatically. 
>>> row = Row(name="Alice", age=11) >>> row @@ -1487,26 +1498,28 @@ class Row(tuple): """ def __new__(cls, *args, **kwargs): - if _legacy_row_enabled: - return _LegacyRow(args, kwargs) if args and kwargs: raise ValueError("Can not use both args " "and kwargs to create Row") - if sys.version_info[:2] < (3, 6): - # Remove after Python < 3.6 dropped - from collections import OrderedDict - if kwargs: - raise ValueError("Named arguments are not allowed for Python version < 3.6, " - "use a collections.OrderedDict instead. To enable Spark 2.x " - "compatible Rows, set the environment variable " - "'PYSPARK_LEGACY_ROW_ENABLED' to 'true'.") - elif len(args) == 1 and isinstance(args[0], OrderedDict): - kwargs = args[0] - if kwargs: + field_sorting_enabled = _row_field_sorting_enabled + if not field_sorting_enabled and sys.version_info[:2] < (3, 6): + warnings.warn("To use named arguments for Python version < 3.6, Row " + "field sorting must be enabled by setting the environment " + "variable 'PYSPARK_ROW_FIELD_SORTING_ENABLED' to 'true'.") + field_sorting_enabled = True + # create row objects - row = tuple.__new__(cls, list(kwargs.values())) - row.__fields__ = list(kwargs.keys()) + if field_sorting_enabled: + # Remove after Python < 3.6 dropped, see SPARK-29748 + names = sorted(kwargs.keys()) + row = tuple.__new__(cls, [kwargs[n] for n in names]) + row.__fields__ = names + row.__from_dict__ = True + else: + row = tuple.__new__(cls, list(kwargs.values())) + row.__fields__ = list(kwargs.keys()) + return row else: # create row class or objects @@ -1584,7 +1597,7 @@ def __getattr__(self, item): raise AttributeError(item) def __setattr__(self, key, value): - if key != '__fields__': + if key != '__fields__' and key != "__from_dict__": raise Exception("Row is read-only") self.__dict__[key] = value @@ -1604,36 +1617,6 @@ def __repr__(self): return "" % ", ".join("%r" % field for field in self) -class _LegacyRow(Row): - """ - Creates a Row that is compatible with with Spark 2.x, such that when - made with kwargs, the fields are sorted alphabetically. - - .. note:: Deprecated in 3.0.0, will be removed in future versions of Spark. 
- See SPARK-29748 - """ - - def __new__(cls, *args, **kwargs): - if args and kwargs: - raise ValueError("Can not use both args " - "and kwargs to create Row") - if kwargs: - # create row objects - names = sorted(kwargs.keys()) - row = tuple.__new__(cls, [kwargs[n] for n in names]) - row.__fields__ = names - row.__from_dict__ = True - return row - else: - # create row class or objects - return tuple.__new__(cls, args) - - def __setattr__(self, key, value): - if key != '__fields__' and key != "__from_dict__": - raise Exception("Row is read-only") - self.__dict__[key] = value - - class DateConverter(object): def can_convert(self, obj): return isinstance(obj, datetime.date) From de18dce934f6e9e448f1ec5933d3539ec388920b Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 3 Dec 2019 12:05:24 -0800 Subject: [PATCH 08/14] Revert all test fixes --- python/pyspark/sql/avro/functions.py | 8 +- python/pyspark/sql/column.py | 4 +- python/pyspark/sql/dataframe.py | 144 ++++++++++++------------- python/pyspark/sql/functions.py | 14 +-- python/pyspark/sql/tests/test_types.py | 10 +- python/pyspark/sql/types.py | 13 ++- 6 files changed, 95 insertions(+), 98 deletions(-) diff --git a/python/pyspark/sql/avro/functions.py b/python/pyspark/sql/avro/functions.py index 31200d5d3e4b8..ed62a72d6c8fb 100644 --- a/python/pyspark/sql/avro/functions.py +++ b/python/pyspark/sql/avro/functions.py @@ -49,13 +49,13 @@ def from_avro(data, jsonFormatSchema, options={}): >>> df = spark.createDataFrame(data, ("key", "value")) >>> avroDf = df.select(to_avro(df.value).alias("avro")) >>> avroDf.collect() - [Row(avro=bytearray(b'\\x00\\x00\\nAlice\\x00\\x04'))] + [Row(avro=bytearray(b'\\x00\\x00\\x04\\x00\\nAlice'))] >>> jsonFormatSchema = '''{"type":"record","name":"topLevelRecord","fields": ... [{"name":"avro","type":[{"type":"record","name":"value","namespace":"topLevelRecord", - ... "fields":[{"name":"name","type":["string","null"]}, - ... {"name":"age","type":["long","null"]}]},"null"]}]}''' + ... "fields":[{"name":"age","type":["long","null"]}, + ... {"name":"name","type":["string","null"]}]},"null"]}]}''' >>> avroDf.select(from_avro(avroDf.avro, jsonFormatSchema).alias("value")).collect() - [Row(value=Row(avro=Row(name=u'Alice', age=2)))] + [Row(value=Row(avro=Row(age=2, name=u'Alice')))] """ sc = SparkContext._active_spark_context diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py index cdf9f87624291..b472a4221cd0c 100644 --- a/python/pyspark/sql/column.py +++ b/python/pyspark/sql/column.py @@ -518,7 +518,7 @@ def isin(self, *cols): >>> from pyspark.sql import Row >>> df = spark.createDataFrame([Row(name=u'Tom', height=80), Row(name=u'Alice', height=None)]) >>> df.filter(df.height.isNull()).collect() - [Row(name=u'Alice', height=None)] + [Row(height=None, name=u'Alice')] """ _isNotNull_doc = """ True if the current expression is NOT null. @@ -526,7 +526,7 @@ def isin(self, *cols): >>> from pyspark.sql import Row >>> df = spark.createDataFrame([Row(name=u'Tom', height=80), Row(name=u'Alice', height=None)]) >>> df.filter(df.height.isNotNull()).collect() - [Row(name=u'Tom', height=80)] + [Row(height=80, name=u'Tom')] """ isNull = ignore_unicode_prefix(_unary_op("isNull", _isNull_doc)) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index dbee80cd7d910..2fa90d67880c3 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1601,19 +1601,19 @@ def dropDuplicates(self, subset=None): ... Row(name='Alice', age=5, height=80), \\ ... 
Row(name='Alice', age=10, height=80)]).toDF() >>> df.dropDuplicates().show() - +-----+---+------+ - | name|age|height| - +-----+---+------+ - |Alice| 5| 80| - |Alice| 10| 80| - +-----+---+------+ + +---+------+-----+ + |age|height| name| + +---+------+-----+ + | 5| 80|Alice| + | 10| 80|Alice| + +---+------+-----+ >>> df.dropDuplicates(['name', 'height']).show() - +-----+---+------+ - | name|age|height| - +-----+---+------+ - |Alice| 5| 80| - +-----+---+------+ + +---+------+-----+ + |age|height| name| + +---+------+-----+ + | 5| 80|Alice| + +---+------+-----+ """ if subset is None: jdf = self._jdf.dropDuplicates() @@ -1635,11 +1635,11 @@ def dropna(self, how='any', thresh=None, subset=None): :param subset: optional list of column names to consider. >>> df4.na.drop().show() - +-----+---+------+ - | name|age|height| - +-----+---+------+ - |Alice| 10| 80| - +-----+---+------+ + +---+------+-----+ + |age|height| name| + +---+------+-----+ + | 10| 80|Alice| + +---+------+-----+ """ if how is not None and how not in ['any', 'all']: raise ValueError("how ('" + how + "') should be 'any' or 'all'") @@ -1672,33 +1672,33 @@ def fillna(self, value, subset=None): then the non-string column is simply ignored. >>> df4.na.fill(50).show() - +-----+---+------+ - | name|age|height| - +-----+---+------+ - |Alice| 10| 80| - | Bob| 5| 50| - | Tom| 50| 50| - | null| 50| 50| - +-----+---+------+ + +---+------+-----+ + |age|height| name| + +---+------+-----+ + | 10| 80|Alice| + | 5| 50| Bob| + | 50| 50| Tom| + | 50| 50| null| + +---+------+-----+ >>> df5.na.fill(False).show() - +-------+-----+----+ - | name| spy| age| - +-------+-----+----+ - | Alice|false| 10| - | Bob|false| 5| - |Mallory| true|null| - +-------+-----+----+ + +----+-------+-----+ + | age| name| spy| + +----+-------+-----+ + | 10| Alice|false| + | 5| Bob|false| + |null|Mallory| true| + +----+-------+-----+ >>> df4.na.fill({'age': 50, 'name': 'unknown'}).show() - +-------+---+------+ - | name|age|height| - +-------+---+------+ - | Alice| 10| 80| - | Bob| 5| null| - | Tom| 50| null| - |unknown| 50| null| - +-------+---+------+ + +---+------+-------+ + |age|height| name| + +---+------+-------+ + | 10| 80| Alice| + | 5| null| Bob| + | 50| null| Tom| + | 50| null|unknown| + +---+------+-------+ """ if not isinstance(value, (float, int, long, basestring, bool, dict)): raise ValueError("value should be a float, int, long, string, bool or dict") @@ -1748,44 +1748,44 @@ def replace(self, to_replace, value=_NoValue, subset=None): then the non-string column is simply ignored. 
>>> df4.na.replace(10, 20).show() - +-----+----+------+ - | name| age|height| - +-----+----+------+ - |Alice| 20| 80| - | Bob| 5| null| - | Tom|null| null| - | null|null| null| - +-----+----+------+ + +----+------+-----+ + | age|height| name| + +----+------+-----+ + | 20| 80|Alice| + | 5| null| Bob| + |null| null| Tom| + |null| null| null| + +----+------+-----+ >>> df4.na.replace('Alice', None).show() - +----+----+------+ - |name| age|height| - +----+----+------+ - |null| 10| 80| - | Bob| 5| null| - | Tom|null| null| - |null|null| null| - +----+----+------+ + +----+------+----+ + | age|height|name| + +----+------+----+ + | 10| 80|null| + | 5| null| Bob| + |null| null| Tom| + |null| null|null| + +----+------+----+ >>> df4.na.replace({'Alice': None}).show() - +----+----+------+ - |name| age|height| - +----+----+------+ - |null| 10| 80| - | Bob| 5| null| - | Tom|null| null| - |null|null| null| - +----+----+------+ + +----+------+----+ + | age|height|name| + +----+------+----+ + | 10| 80|null| + | 5| null| Bob| + |null| null| Tom| + |null| null|null| + +----+------+----+ >>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show() - +----+----+------+ - |name| age|height| - +----+----+------+ - | A| 10| 80| - | B| 5| null| - | Tom|null| null| - |null|null| null| - +----+----+------+ + +----+------+----+ + | age|height|name| + +----+------+----+ + | 10| 80| A| + | 5| null| B| + |null| null| Tom| + |null| null|null| + +----+------+----+ """ if value is _NoValue: if isinstance(to_replace, dict): @@ -2074,7 +2074,7 @@ def drop(self, *cols): [Row(name=u'Alice'), Row(name=u'Bob')] >>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect() - [Row(age=5, name=u'Bob', height=85)] + [Row(age=5, height=85, name=u'Bob')] >>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect() [Row(age=5, name=u'Bob', height=85)] diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index c77e20bfc52a3..2cd91ec2b1aef 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -589,8 +589,8 @@ def rand(seed=None): .. note:: The function is non-deterministic in general case. >>> df.withColumn('rand', rand(seed=42) * 3).collect() - [Row(name=u'Alice', age=2, rand=2.4052597283576684), - Row(name=u'Bob', age=5, rand=2.3913904055683974)] + [Row(age=2, name=u'Alice', rand=2.4052597283576684), + Row(age=5, name=u'Bob', rand=2.3913904055683974)] """ sc = SparkContext._active_spark_context if seed is not None: @@ -609,8 +609,8 @@ def randn(seed=None): .. note:: The function is non-deterministic in general case. 
>>> df.withColumn('randn', randn(seed=42)).collect() - [Row(name=u'Alice', age=2, randn=1.1027054481455365), - Row(name=u'Bob', age=5, randn=0.7400395449950132)] + [Row(age=2, name=u'Alice', randn=1.1027054481455365), + Row(age=5, name=u'Bob', randn=0.7400395449950132)] """ sc = SparkContext._active_spark_context if seed is not None: @@ -2379,11 +2379,11 @@ def to_json(col, options={}): >>> data = [(1, Row(name='Alice', age=2))] >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(to_json(df.value).alias("json")).collect() - [Row(json=u'{"name":"Alice","age":2}')] + [Row(json=u'{"age":2,"name":"Alice"}')] >>> data = [(1, [Row(name='Alice', age=2), Row(name='Bob', age=3)])] >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(to_json(df.value).alias("json")).collect() - [Row(json=u'[{"name":"Alice","age":2},{"name":"Bob","age":3}]')] + [Row(json=u'[{"age":2,"name":"Alice"},{"age":3,"name":"Bob"}]')] >>> data = [(1, {"name": "Alice"})] >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(to_json(df.value).alias("json")).collect() @@ -2475,7 +2475,7 @@ def to_csv(col, options={}): >>> data = [(1, Row(name='Alice', age=2))] >>> df = spark.createDataFrame(data, ("key", "value")) >>> df.select(to_csv(df.value).alias("csv")).collect() - [Row(csv=u'Alice,2')] + [Row(csv=u'2,Alice')] """ sc = SparkContext._active_spark_context diff --git a/python/pyspark/sql/tests/test_types.py b/python/pyspark/sql/tests/test_types.py index 73d17d21d2d0b..244eae1b1ce16 100644 --- a/python/pyspark/sql/tests/test_types.py +++ b/python/pyspark/sql/tests/test_types.py @@ -64,10 +64,10 @@ def test_apply_schema_to_dict_and_rows(self): df2 = self.spark.createDataFrame(rdd, schema, verifySchema=verify) self.assertEqual(df.schema, df2.schema) - rdd = self.sc.parallelize(range(10)).map(lambda x: Row(b=None, a=x)) + rdd = self.sc.parallelize(range(10)).map(lambda x: Row(a=x, b=None)) df3 = self.spark.createDataFrame(rdd, schema, verifySchema=verify) self.assertEqual(10, df3.count()) - input = [Row(b=str(x), a=x) for x in range(10)] + input = [Row(a=x, b=str(x)) for x in range(10)] df4 = self.spark.createDataFrame(input, schema, verifySchema=verify) self.assertEqual(10, df4.count()) @@ -883,8 +883,7 @@ def __init__(self, **kwargs): ({"s": "a", "f": 1.0}, schema), (Row(s="a", i=1), schema), (Row(s="a", i=None), schema), - (Row(s="a", i=1, f=1.0).asDict(), schema), - (Row(i=1, f=1.0, s="a").asDict(), schema), + (Row(s="a", i=1, f=1.0), schema), (["a", 1], schema), (["a", None], schema), (("a", 1), schema), @@ -948,8 +947,7 @@ def __init__(self, **kwargs): # Struct ({"s": "a", "i": "1"}, schema, TypeError), - (Row(s="a"), schema, ValueError), # Row can't have missing field - (Row(s="a", i=1, f=1.0), schema, ValueError), # Row can't have additional fields + (Row(s="a"), schema, ValueError), # Row can't have missing field (Row(s="a", i="1"), schema, TypeError), (["a"], schema, ValueError), (["a", "1"], schema, TypeError), diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 82c393b0a232a..a78f5bf5904f8 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1461,7 +1461,7 @@ class Row(tuple): >>> row = Row(name="Alice", age=11) >>> row - Row(name='Alice', age=11) + Row(age=11, name='Alice') >>> row['name'], row['age'] ('Alice', 11) >>> row.name, row.age @@ -1485,16 +1485,15 @@ class Row(tuple): Row(name='Alice', age=11) This form can also be used to create rows as tuple values, i.e. with unnamed - fields. 
Row objects are evaluated for equality by data values in each - position, field names are not compared: + fields. Beware that such Row objects have different equality semantics: >>> row1 = Row("Alice", 11) >>> row2 = Row(name="Alice", age=11) >>> row1 == row2 - True - >>> row3 = Row(age=11, name="Alice") - >>> row2 == row3 False + >>> row3 = Row(a="Alice", b=11) + >>> row1 == row3 + True """ def __new__(cls, *args, **kwargs): @@ -1534,7 +1533,7 @@ def asDict(self, recursive=False): >>> Row(name="Alice", age=11).asDict() == {'name': 'Alice', 'age': 11} True >>> row = Row(key=1, value=Row(name='a', age=2)) - >>> row.asDict() == {'key': 1, 'value': Row(name='a', age=2)} + >>> row.asDict() == {'key': 1, 'value': Row(age=2, name='a')} True >>> row.asDict(True) == {'key': 1, 'value': {'name': 'a', 'age': 2}} True From 93fdd45bdffcf6d58b6e7b464709ae3d541e20b1 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 3 Dec 2019 12:21:39 -0800 Subject: [PATCH 09/14] Set PYSPARK_ROW_FIELD_SORTING_ENABLED to 'true' for run_tests --- python/run-tests.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/run-tests.py b/python/run-tests.py index 88b148c6587d5..b52eca06a3f6a 100755 --- a/python/run-tests.py +++ b/python/run-tests.py @@ -74,7 +74,8 @@ def run_individual_python_test(target_dir, test_name, pyspark_python): 'SPARK_TESTING': '1', 'SPARK_PREPEND_CLASSES': '1', 'PYSPARK_PYTHON': which(pyspark_python), - 'PYSPARK_DRIVER_PYTHON': which(pyspark_python) + 'PYSPARK_DRIVER_PYTHON': which(pyspark_python), + 'PYSPARK_ROW_FIELD_SORTING_ENABLED': 'true' }) # Create a unique temp directory under 'target/' for each run. The TMPDIR variable is From af6d1d9cf93b0eea6a331badce19730fb4a7cf7e Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 3 Dec 2019 12:39:17 -0800 Subject: [PATCH 10/14] Added test with sorting disabled --- python/pyspark/sql/tests/test_types.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/python/pyspark/sql/tests/test_types.py b/python/pyspark/sql/tests/test_types.py index 244eae1b1ce16..06538db0deb1b 100644 --- a/python/pyspark/sql/tests/test_types.py +++ b/python/pyspark/sql/tests/test_types.py @@ -968,6 +968,19 @@ def __init__(self, **kwargs): with self.assertRaises(exp, msg=msg): _make_type_verifier(data_type, nullable=False)(obj) + def test_Row_without_field_sorting(self): + from pyspark.sql import types + sorting_enabled_tmp = types._row_field_sorting_enabled + types._row_field_sorting_enabled = False + + r = types.Row(b=1, a=2) + TestRow = types.Row("b", "a") + expected = TestRow(1, 2) + + self.assertEqual(r, expected) + self.assertEqual(repr(r), "Row(b=1, a=2)") + types._row_field_sorting_enabled = sorting_enabled_tmp + if __name__ == "__main__": from pyspark.sql.tests.test_types import * From 30a3b12e1ed0c0a72fa7b37196d976418b34c319 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 3 Dec 2019 14:29:27 -0800 Subject: [PATCH 11/14] Move flag to static member of Row, fix test to skip for Python < 3.6 --- python/pyspark/sql/tests/test_types.py | 12 ++++++------ python/pyspark/sql/types.py | 25 +++++++++++-------------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/python/pyspark/sql/tests/test_types.py b/python/pyspark/sql/tests/test_types.py index 06538db0deb1b..7ec3f5ae6019c 100644 --- a/python/pyspark/sql/tests/test_types.py +++ b/python/pyspark/sql/tests/test_types.py @@ -968,18 +968,18 @@ def __init__(self, **kwargs): with self.assertRaises(exp, msg=msg): _make_type_verifier(data_type, nullable=False)(obj) + 
@unittest.skipIf(sys.version_info[:2] < (3, 6), "Create Row without sorting fields") def test_Row_without_field_sorting(self): - from pyspark.sql import types - sorting_enabled_tmp = types._row_field_sorting_enabled - types._row_field_sorting_enabled = False + sorting_enabled_tmp = Row._row_field_sorting_enabled + Row._row_field_sorting_enabled = False - r = types.Row(b=1, a=2) - TestRow = types.Row("b", "a") + r = Row(b=1, a=2) + TestRow = Row("b", "a") expected = TestRow(1, 2) self.assertEqual(r, expected) self.assertEqual(repr(r), "Row(b=1, a=2)") - types._row_field_sorting_enabled = sorting_enabled_tmp + Row._row_field_sorting_enabled = sorting_enabled_tmp if __name__ == "__main__": diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index a78f5bf5904f8..4e0fbae9f0f16 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1423,16 +1423,6 @@ def _create_row(fields, values): return row -# Remove after Python < 3.6 dropped, see SPARK-29748 -_row_field_sorting_enabled = \ - os.environ.get('PYSPARK_ROW_FIELD_SORTING_ENABLED', 'false').lower() == 'true' - - -if _row_field_sorting_enabled: - warnings.warn("The environment variable 'PYSPARK_ROW_FIELD_SORTING_ENABLED' " - "is deprecated and will be removed in future versions of Spark") - - class Row(tuple): """ @@ -1496,20 +1486,27 @@ class Row(tuple): True """ + # Remove after Python < 3.6 dropped, see SPARK-29748 + _row_field_sorting_enabled = \ + os.environ.get('PYSPARK_ROW_FIELD_SORTING_ENABLED', 'false').lower() == 'true' + + if _row_field_sorting_enabled: + warnings.warn("The environment variable 'PYSPARK_ROW_FIELD_SORTING_ENABLED' " + "is deprecated and will be removed in future versions of Spark") + def __new__(cls, *args, **kwargs): if args and kwargs: raise ValueError("Can not use both args " "and kwargs to create Row") if kwargs: - field_sorting_enabled = _row_field_sorting_enabled - if not field_sorting_enabled and sys.version_info[:2] < (3, 6): + if not Row._row_field_sorting_enabled and sys.version_info[:2] < (3, 6): warnings.warn("To use named arguments for Python version < 3.6, Row " "field sorting must be enabled by setting the environment " "variable 'PYSPARK_ROW_FIELD_SORTING_ENABLED' to 'true'.") - field_sorting_enabled = True + Row._row_field_sorting_enabled = True # create row objects - if field_sorting_enabled: + if Row._row_field_sorting_enabled: # Remove after Python < 3.6 dropped, see SPARK-29748 names = sorted(kwargs.keys()) row = tuple.__new__(cls, [kwargs[n] for n in names]) From 140103ba8f67f6de4dc335a13f66f6d71aa5eb4c Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 3 Dec 2019 17:35:34 -0800 Subject: [PATCH 12/14] Fixed wording and grammar --- python/pyspark/sql/tests/test_types.py | 2 +- python/pyspark/sql/types.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/tests/test_types.py b/python/pyspark/sql/tests/test_types.py index 7ec3f5ae6019c..b41c4381984c1 100644 --- a/python/pyspark/sql/tests/test_types.py +++ b/python/pyspark/sql/tests/test_types.py @@ -969,7 +969,7 @@ def __init__(self, **kwargs): _make_type_verifier(data_type, nullable=False)(obj) @unittest.skipIf(sys.version_info[:2] < (3, 6), "Create Row without sorting fields") - def test_Row_without_field_sorting(self): + def test_row_without_field_sorting(self): sorting_enabled_tmp = Row._row_field_sorting_enabled Row._row_field_sorting_enabled = False diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 4e0fbae9f0f16..406e7f398edc1 
100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1444,7 +1444,7 @@ class Row(tuple): to "true". This option is deprecated and will be removed in future versions of Spark. For Python versions < 3.6, named arguments can no longer be used without enabling field sorting with the environment variable above because - order or the arguments is not guaranteed to be the same as entered, see + order of the arguments is not guaranteed to be the same as entered, see https://www.python.org/dev/peps/pep-0468. If this is detected, a warning will be issued and the Row will fallback to sort the field names automatically. @@ -1500,9 +1500,9 @@ def __new__(cls, *args, **kwargs): "and kwargs to create Row") if kwargs: if not Row._row_field_sorting_enabled and sys.version_info[:2] < (3, 6): - warnings.warn("To use named arguments for Python version < 3.6, Row " - "field sorting must be enabled by setting the environment " - "variable 'PYSPARK_ROW_FIELD_SORTING_ENABLED' to 'true'.") + warnings.warn("To use named arguments for Python version < 3.6, Row fields will be " + "automatically sorted. This warning can be skipped by setting the " + "environment variable 'PYSPARK_ROW_FIELD_SORTING_ENABLED' to 'true'.") Row._row_field_sorting_enabled = True # create row objects From 3a69539d15cbbaee40876832066a1a49b077e8d4 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Mon, 6 Jan 2020 15:46:46 -0800 Subject: [PATCH 13/14] Added note in migration guide --- docs/pyspark-migration-guide.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/pyspark-migration-guide.md b/docs/pyspark-migration-guide.md index 1b8d1fc1c5776..8ea4fec75edf8 100644 --- a/docs/pyspark-migration-guide.md +++ b/docs/pyspark-migration-guide.md @@ -87,6 +87,8 @@ Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide. - Since Spark 3.0, `Column.getItem` is fixed such that it does not call `Column.apply`. Consequently, if `Column` is used as an argument to `getItem`, the indexing operator should be used. For example, `map_col.getItem(col('id'))` should be replaced with `map_col[col('id')]`. + - As of Spark 3.0 `Row` field names are no longer sorted alphabetically when constructing with named arguments for Python versions 3.6 and above, and the order of fields will match that as entered. To enable sorted fields by default, as in Spark 2.4, set the environment variable `PYSPARK_ROW_FIELD_SORTING_ENABLED` to "true". For Python versions less than 3.6, the field names will be sorted alphabetically as the only option. + ## Upgrading from PySpark 2.3 to 2.4 - In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unable to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched off by `spark.sql.execution.arrow.fallback.enabled`. 
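A short illustrative sketch of the behavior change described in the migration guide note above (not part of the patch itself). It assumes Python >= 3.6 and a build with these patches applied; the repr output mirrors the doctests in python/pyspark/sql/types.py:

    # Spark 3.0 default: Row fields keep the order in which the named
    # arguments were entered (Python >= 3.6).
    from pyspark.sql import Row

    row = Row(name="Alice", age=11)
    print(row)             # Row(name='Alice', age=11)
    print(row[0], row[1])  # Alice 11

    # Spark 2.4-compatible sorting: set PYSPARK_ROW_FIELD_SORTING_ENABLED
    # to "true" before pyspark is imported (the flag is read when the Row
    # class is defined), and the field names are sorted alphabetically:
    #
    #   $ export PYSPARK_ROW_FIELD_SORTING_ENABLED=true
    #   >>> from pyspark.sql import Row
    #   >>> Row(name="Alice", age=11)
    #   Row(age=11, name='Alice')
    #
    # For Python < 3.6 the fields are always sorted, and a warning is
    # issued when named arguments are used without the variable set.
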
From 14e691d3fba4dd416d88f84d0cb5e7489cd69aa3 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 9 Jan 2020 16:04:02 -0800 Subject: [PATCH 14/14] Clarified wording in doc and added a note about example output --- python/pyspark/sql/types.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 406e7f398edc1..39615cdf45877 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1438,16 +1438,19 @@ class Row(tuple): It is not allowed to omit a named argument to represent the value is None or missing. This should be explicitly set to None in this case. - NOTE: As of Spark 3.0.0, the Row field names are no longer sorted - alphabetically. To enable field sorting to create Rows compatible with - Spark 2.x, set the environment variable "PYSPARK_ROW_FIELD_SORTING_ENABLED" - to "true". This option is deprecated and will be removed in future versions - of Spark. For Python versions < 3.6, named arguments can no longer be used - without enabling field sorting with the environment variable above because - order of the arguments is not guaranteed to be the same as entered, see - https://www.python.org/dev/peps/pep-0468. If this is detected, a warning - will be issued and the Row will fallback to sort the field names - automatically. + NOTE: As of Spark 3.0.0, Rows created from named arguments no longer have + field names sorted alphabetically and will be ordered in the position as + entered. To enable sorting for Rows compatible with Spark 2.x, set the + environment variable "PYSPARK_ROW_FIELD_SORTING_ENABLED" to "true". This + option is deprecated and will be removed in future versions of Spark. For + Python versions < 3.6, the order of named arguments is not guaranteed to + be the same as entered, see https://www.python.org/dev/peps/pep-0468. In + this case, a warning will be issued and the Row will fallback to sort the + field names automatically. + + NOTE: Examples with Row in pydocs are run with the environment variable + "PYSPARK_ROW_FIELD_SORTING_ENABLED" set to "true" which results in output + where fields are sorted. >>> row = Row(name="Alice", age=11) >>> row