35 changes: 26 additions & 9 deletions python/pyspark/sql/tests.py
@@ -2482,18 +2482,20 @@ def test_toDF_with_schema_string(self):
self.assertEqual(df.schema.simpleString(), "struct<key:string,value:string>")
self.assertEqual(df.collect(), [Row(key=str(i), value=str(i)) for i in range(100)])

# field names can differ.
df = rdd.toDF(" a: int, b: string ")
BryanCutler (Member, Author) commented on Jan 16, 2018:
This test was flawed because it only worked since ("a", "b") happens to be in the same alphabetical order as ("key", "value"). If the fields were ("key", "aaa") it would fail.

Member:
But wasn't this testing "field names can differ." via the user's schema input?

BryanCutler (Member, Author):
Yeah, I think you're right. The schema should be able to override names no matter what. I was thinking it was flawed because it relied on the Row fields being sorted internally, so just changing the kwarg names (not their order) could cause it to fail. That seems a little strange, but maybe that is the intent?

BryanCutler (Member, Author):
I still think this test is invalid because it only works because the Rows get serialized and lose the __from_dict__ flag. This should be the same thing, but it would fail:

df = spark.createDataFrame([Row(key=i, value=str(i)) for i in range(100)], schema=" a: int, b: string ")

And it would also fail if the named args of the Row objects were not in the same alphabetical order as the schema:

data = [Row(z=i, y=str(i)) for i in range(100)]
rdd = self.sc.parallelize(data, 5)

df = rdd.toDF(" a: int, b: string ")

This fails because the Row fields would be sorted in a different order, switching the type order.
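
As an illustration of the sorting behaviour described in this thread (an editorial sketch, not part of the patch; it assumes the kwargs-sorting semantics of pyspark.sql.Row shown in types.py below):

from pyspark.sql import Row

# A Row built from keyword arguments sorts its field names alphabetically,
# so Row(z=1, y="1") stores fields ("y", "z") with values ("1", 1).
r = Row(z=1, y="1")
print(r.__fields__)  # ['y', 'z']
print(tuple(r))      # ('1', 1)

# Matched positionally against the schema " a: int, b: string ", the string "1"
# would land in the int column and the int 1 in the string column, which is
# why the variant above fails at runtime.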

self.assertEqual(df.schema.simpleString(), "struct<a:int,b:string>")
self.assertEqual(df.collect(), data)
# field order can differ since Rows are created with named arguments.
df = rdd.toDF(" value: string, key: int ")
self.assertEqual(df.schema.simpleString(), "struct<value:string,key:int>")
self.assertEqual(df.select("key", "value").collect(), data)

# number of fields must match.
self.assertRaisesRegexp(Exception, "Length of object",
lambda: rdd.toDF("key: int").collect())
# a schema field must also be a field in the row.
with QuietTest(self.sc):
self.assertRaisesRegexp(Exception, "ValueError: foo",
lambda: rdd.toDF("foo: int").collect())

# field types mismatch will cause exception at runtime.
self.assertRaisesRegexp(Exception, "FloatType can not accept",
lambda: rdd.toDF("key: float, value: string").collect())
with QuietTest(self.sc):
self.assertRaisesRegexp(Exception, "FloatType can not accept",
lambda: rdd.toDF("key: float, value: string").collect())

# flat schema values will be wrapped into row.
df = rdd.map(lambda row: row.key).toDF("int")
@@ -2505,6 +2507,21 @@ def test_toDF_with_schema_string(self):
self.assertEqual(df.schema.simpleString(), "struct<value:int>")
self.assertEqual(df.collect(), [Row(key=i) for i in range(100)])

def test_toDF_with_positional_Row_class(self):
TestRow = Row("b", "a")
data = [TestRow(i, str(i)) for i in range(10)]
rdd = self.sc.parallelize(data, 2)

# field names can differ as long as the types are in the expected positions.
df = rdd.toDF(" key: int, value: string ")
self.assertEqual(df.schema.simpleString(), "struct<key:int,value:string>")
self.assertEqual(df.collect(), data)

# number of fields must match.
with QuietTest(self.sc):
self.assertRaisesRegexp(Exception, "Length of object",
lambda: rdd.toDF("key: int").collect())

def test_join_without_on(self):
df1 = self.spark.range(1).toDF("a")
df2 = self.spark.range(1).toDF("b")
15 changes: 10 additions & 5 deletions python/pyspark/sql/types.py
@@ -1392,9 +1392,11 @@ def _create_row_inbound_converter(dataType):
return lambda *a: dataType.fromInternal(a)


def _create_row(fields, values):
def _create_row(fields, values, from_dict=False):
row = Row(*values)
row.__fields__ = fields
if from_dict:
row.__from_dict__ = True
return row
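
A minimal editorial sketch (not part of the diff) of how the new from_dict parameter is expected to behave; _create_row is the internal helper that pickle calls via Row.__reduce__ below:

from pyspark.sql.types import _create_row

# Rebuild a row the way unpickling does; from_dict=True restores the marker
# saying the values should be matched to a schema by field name, not position.
row = _create_row(["key", "value"], (1, "1"), from_dict=True)
print(row.__fields__)                        # ['key', 'value']
print(getattr(row, "__from_dict__", False))  # True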


@@ -1445,15 +1447,15 @@ def __new__(self, *args, **kwargs):
raise ValueError("Can not use both args "
"and kwargs to create Row")
if kwargs:
# create row objects
# create row object from named arguments; order is not guaranteed, so the names will be sorted
names = sorted(kwargs.keys())
HyukjinKwon (Member) commented on Oct 24, 2019:
@zero323 Yeah, this sorting can be removed once we drop Python versions lower than 3.6. However, keeping __from_dict__ after ser/de might fix another problem, such as #26118.

Member:
Thanks for pointing that out.

In my opinion, dropping 3.5 support should be followed by removal of the sorting by name (from what I've seen this behavior is confusing at best, and it will have no use once 3.5 support is dropped), although that might be hard to sell, especially in a minor version.

Ideally I would see Row going away completely, but that's just pie in the sky.

Member:
Since we're in 3.0, we might have to consider dropping that sorting by name, yes.

Member:
That'd be great, but 3.0 only deprecates Python 2 support, it doesn't drop it. So the main reason why sorting was introduced stays valid.

If Spark 2.5 were accepted, Python 2 could be deprecated there and dropped in 3.0. In general I think that bringing Python 2 into Spark 3 is a mistake, but I am the first to admit I am biased.

Member:
I think the Spark 2.5 idea looks all but dead (and I don't personally support that idea either). This kind of thing is rather legitimate to make happen between minor releases. Actually, 3.0 is getting close. We could just let 3.0 be released first and then, optionally, remove that sorting behaviour as well.

row = tuple.__new__(self, [kwargs[n] for n in names])
row.__fields__ = names
row.__from_dict__ = True
row.__from_dict__ = True # Row elements will be accessed by field name, not position
return row

else:
# create row class or objects
# create a row class for generating objects or a tuple-like object
return tuple.__new__(self, args)
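
For context, a short editorial sketch of the two construction paths handled by __new__ as written here:

from pyspark.sql import Row

# Keyword arguments: names are sorted, values stored in that order, and the
# row is marked with __from_dict__ so it is later matched to a schema by name.
r1 = Row(b=1, a="x")
print(r1.__fields__)    # ['a', 'b']
print(tuple(r1))        # ('x', 1)

# Positional arguments: Row acts as a row class; calling it yields tuple-like
# rows whose fields are matched to a schema by position.
Person = Row("name", "age")
r2 = Person("Alice", 30)
print(r2.name, r2.age)  # Alice 30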

def asDict(self, recursive=False):
@@ -1532,7 +1534,10 @@ def __setattr__(self, key, value):
def __reduce__(self):
"""Returns a tuple so Python knows how to pickle Row."""
if hasattr(self, "__fields__"):
return (_create_row, (self.__fields__, tuple(self)))
if hasattr(self, "__from_dict__"):
return (_create_row, (self.__fields__, tuple(self), True))
else:
return (_create_row, (self.__fields__, tuple(self)))
else:
return tuple.__reduce__(self)
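
A small editorial sketch of what this change is meant to guarantee: a keyword-constructed Row keeps its __from_dict__ marker across pickling, the ser/de step that the test discussion above showed was losing it (this assumes the patched __reduce__):

import pickle
from pyspark.sql import Row

r = Row(key=1, value="1")           # kwargs path sets __from_dict__
r2 = pickle.loads(pickle.dumps(r))  # goes through Row.__reduce__ / _create_row
print(getattr(r, "__from_dict__", False))   # True
print(getattr(r2, "__from_dict__", False))  # True with this patch (previously False)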
