Skip to content

Conversation

@BryanCutler
Copy link
Member

What changes were proposed in this pull request?

When a Row object is created using kwargs, the order of the keywords can not be relied upon (except for Python 3.5 that uses an OrderedDict). The fields are sorted in the constructor and a flag __from_dict__ is set to indicate that this object was created from kwargs so that other areas in Spark can access row data using field names instead of by position. This change includes the __from_dict__ flag only when pickling a Row that was made from kwargs so that the behavior is preserved if the Row becomes pickled.

How was this patch tested?

Fixed existing tests that relied on fields and schema being in the same alphabetical order. Added new test to create Row from positional arguments where order matters.

self.assertEqual(df.collect(), [Row(key=str(i), value=str(i)) for i in range(100)])

# field names can differ.
df = rdd.toDF(" a: int, b: string ")
Copy link
Member Author

@BryanCutler BryanCutler Jan 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was flawed because it only worked since ("a", "b") is in the same alphabetical order as ("key", "value"). If it was ("key", "aaa") then it would fail.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But wasn't this testing field names can differ. by user's schema input?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think you're right. The schema should be able to override names no matter what. I was thinking it was flawed because it relied on the row field to be sorted internally, so just changing the kwarg names (not the order) could cause it to fail. That seems a little strange, but maybe it is the intent?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think this test is invalid because it only works as the Rows get serialized and lose the __from_dict__ flag. This should be the same think, but would fail:

df = spark.createDataFrame([Row(key=i, value=str(i)) for i in range(100)], schema=" a: int, b: string ")

And it would also fail if the named args of the Row objects were not in the same alphabetical order as the schema:

data = [Row(z=i, y=str(i)) for i in range(100)]
rdd = self.sc.parallelize(data, 5)

df = rdd.toDF(" a: int, b: string ")

This fails because the Row fields would be sorted in a different order, switching the type order.

@BryanCutler
Copy link
Member Author

After looking into this, it seems like the behavior of the Row class is as follows:

If a Row is made from kwargs, then the order of the fields can not be relied upon and whenever accessing data, it must be done like a dict with the field name. When this is the case, the order of the supplied schema doesn't matter but the field name must be a subset of what is in each row.

If a Row is made from generating a custom class, like TestRow = Row("key", "value") then row = TestRow('a', 1), then the position of each element is what is important and data is accessed by position in the tuple. The supplied schema for this must match the types of the rows exactly, however field names are not important and can be changed.

@BryanCutler
Copy link
Member Author

@MrBago @HyukjinKwon I think the above behavior of the Row class is a little screwy, but at least this fixes it to be more consistent. I'm not sure if there is a way to rectify the two different uses without breaking one way or the other. Also to note, using kwargs the performance will likely be really poor because it must find the index for each field and this should maybe be discouraged. cc @holdenk

@SparkQA
Copy link

SparkQA commented Jan 16, 2018

Test build #86193 has finished for PR 20280 at commit 315b8de.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 16, 2018

Test build #86204 has finished for PR 20280 at commit 2192e49.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler BryanCutler force-pushed the pyspark-Row-serialize-SPARK-22232 branch from 2192e49 to a7d3396 Compare January 16, 2018 21:54
@SparkQA
Copy link

SparkQA commented Jan 16, 2018

Test build #86205 has finished for PR 20280 at commit a7d3396.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MrBago
Copy link
Contributor

MrBago commented Jan 16, 2018

I think we should raise an error if __from_dict__ is set and the user tries to index using a position or a slice.

Indexing by field name takes the same code path for Rows that are and are not __from_dict__ and I don't think we can "discourage" this because row.fieldName is commonly used in our docs and other pyspark learning materials. If performance is an issue, maybe we should replace __fields__ with a dict that maps fileldName -> position.

@HyukjinKwon
Copy link
Member

Ahh. @zero323 too if you are available because I think we had a talk about this somewhere long time ago. Yea, I am aware of the issue itself. Will take a look soon.

@MrBago
Copy link
Contributor

MrBago commented Jan 17, 2018

BTW the performance issue is orthogonal to the serialization issue raised in this jira/PR. Maybe we should avoid scope creep in this thread.

@HyukjinKwon
Copy link
Member

HyukjinKwon commented Jan 17, 2018

^ Yup, let's leave the performance issue out. I think we might have to raise an error too but it's kind of a radical change.

As a note, sorted fields are documented:

Row can be used to create a row object by using named arguments,
the fields will be sorted by names. It is not allowed to omit

My only main concern is:

... the field name must be a subset of what is in each row.

... field names are not important and can be changed.

I think this is kind of a breaking change because we will basically now disallow the names in the schema given by user explicitly for Row made by kwargs with different names IIUC?

@BryanCutler
Copy link
Member Author

Thanks @HyukjinKwon and @MrBago for reviewing. After thinking about this some more, I don't think this is the right solution. Like @HyukjinKwon pointed out, the supplied schema names should always override any specified from Row even if it was made from kwargs. So that means toDF must go by position, and for Row with kwargs that is with field names sorted. That seems a little strange but I believe it is mostly due to a Python limitation of kwargs not being in a specific order and I don't know if there is much we can do about it.

Something still seems wrong though because the example @MrBago has in the JIRA is inconsistent. I'll go over it again tomorrow, when I'm more awake..

@holdenk
Copy link
Contributor

holdenk commented Apr 6, 2018

Hey @BryanCutler is this still on your radar?

@BryanCutler
Copy link
Member Author

Hey @holdenk , yeah I've been meaning to circle back to this and get some kind of resolution. I'll try to take another look later this week.

@holdenk
Copy link
Contributor

holdenk commented Apr 13, 2018

Awesome, looking forward to the update.

@BryanCutler
Copy link
Member Author

BryanCutler commented Apr 18, 2018

Looking at this again, I'm back to thinking this is the right fix. Based on #14469, if the Row objects were made with named arguments, then the intent is for elements to be looked up by field name since the schema could be in a different order. This shouldn't change depending on if the Row objects were serialized.

@BryanCutler
Copy link
Member Author

BryanCutler commented Apr 18, 2018

Let me restate what I think the intended behavior of Row is:

If a Row is made from kwargs, then the order of the fields can not be relied upon and whenever accessing data, it must be done like a dict with the field name. Because of this, when applying a schema to the data, the schema fields must also be fields in the Row objects. Field position can change as long as the name matches.

If a Row is made from generating a custom class, like TestRow = Row("key", "value") then row = TestRow('a', 1), the the schema will be applied based on position and the elements in the Row objects are accessed by index. The name of each field in the schema can differ as long as the element at that index can be converted to the specified schema type.

@SparkQA
Copy link

SparkQA commented Apr 18, 2018

Test build #89530 has finished for PR 20280 at commit 10bf2d0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Copy link
Member Author

BryanCutler commented Apr 18, 2018

I think we should raise an error if from_dict is set and the user tries to index using a position or a slice.

I'd also like to follow up with another PR to address some of the usability issues with Row. I found a couple other unfriendly behaviors and we can discuss this ^ there as well.

@BryanCutler
Copy link
Member Author

Also, this will cause a breaking change if Rows are defined with kwargs and schema changes field names, like this:

data = [Row(key=i, value=str(i)) for i in range(100)]
rdd = self.sc.parallelize(data, 5)
df = rdd.toDF(" a: int, b: string ")

and this would work but might be slower, depending on how complicated the schema is, because now the field names are searched for instead of just going by position

df = rdd.toDF(" key: int, value: string ")

So if we go forward with this fix, I should probably add something in the migration guide

@BryanCutler
Copy link
Member Author

@HyukjinKwon @holdenk and @MrBago have any thoughts on moving forward with this change?

@HyukjinKwon
Copy link
Member

oops, I missed this. will take a look shortly.

@HyukjinKwon
Copy link
Member

Right. Will triple check for sure but I am with you for now. Yup, something in the migration guide makes much more sense to me too.

@felixcheung
Copy link
Member

I'm kinda worry the example you give above is actually fairly common - construct with kwargs, and then (re-)name the columns.

perhaps worthwhile to consider a config switch?

@HyukjinKwon
Copy link
Member

HyukjinKwon commented Apr 22, 2018

If the renaming scenario works in most of cases as expected, I think it'd be worthwhile to have a configuration; however, the previous behaviour looks actually odd because it's going to work only in certain weird conditions when fields in the given schema is in the alphabetical order mapping to fields in Row (#20280 (comment)). Otherwise this case fails already as well.

The test case modified in #20280 (comment) actually works only because key and value in Row and a and b in the schema can be mapped in the sorted same order. I think the test case should be invalid ..

I thought about this for a while and failed to describe what the configuration does .. It sounded describing a bug like it was a proper behaviour that can be controlled by a configuration ..

I think this one sounds more like a bug fix to me so far. Workaround should be relatively easy. Maybe, would it be good enough to describe workaround in the guide instead? I think it should be fine if we just use a map to convert Row to things like a tuple.

@HyukjinKwon
Copy link
Member

BTW, I believe it's not so easy to pass a configuration from a very quick look because the exception usually would be thrown in a Python worker process.

@HyukjinKwon
Copy link
Member

@BryanCutler, mind if I ask to clarify what happens for end-to-end cases in the PR description (like before & after with explaining the reasons)? the change looks small but possibly a breaking change about end-to-end cases although I think for now we are restoring the correct behaviour as expected.

@BryanCutler
Copy link
Member Author

Thanks @HyukjinKwon and @felixcheung , I'm a bit worried too that this might break someones code, but it doesn't affect createDataFrame from Rows, it's only when the Row is serialized like going from an RDD of Rows toDF. Even then the schema gets alphabetized, which I'm sure the users would agree that it is strange.

I'm not sure about adding a config switch, it might be a little hard to add and could be confusing to the user to explain that its only when serialized and the schema would need to be sorted by the original Row keywords.

I'll go ahead and update the migration guide, and expand on the PR description to hopefully make the change as clear as possible.

@holdenk
Copy link
Contributor

holdenk commented Apr 26, 2018

I'm worried that people might have two rows with different meanings but the same type and their application will start producing garbage #s. I think a lot of people go from RDDs of Rows to DFs in PySpark these days, so I'm a little nervous about this change even though I think its a step in the right direction.

How about a config flag defaulting to off and we switch over @ 3.0?

@HyukjinKwon
Copy link
Member

Probably that'd work but also it'd be trickier to add / remove that configuration. Another similar option maybe just close this for now and target this for 3.0.0 since we already started to talk about it.

@BryanCutler
Copy link
Member Author

BryanCutler commented May 1, 2018

@HyukjinKwon , I was also thinking about holding off on this until 3.0.0 and then make a clean switch. What do you think about that @holdenk ? This bug doesn't block anything since there are workarounds, so fixing it is just to make more consistent.

@HyukjinKwon
Copy link
Member

holding off is fine; however, I am less sure about the configuration if that's not something you guys feel strongly.

@BryanCutler
Copy link
Member Author

closing now, will revisit for Spark 3.0

@HyukjinKwon
Copy link
Member

@BryanCutler, do you think we should try it out in 3.0?

@zero323
Copy link
Member

zero323 commented Oct 24, 2019

@HyukjinKwon Does it really make any sense to introduce such change at this point? If SPARK-27884 is to be followed, it will survive at most two minor versions. Unless Spark developers are committed to supporting Python 3.4 (already end-of-life) and 3.5 (will reach its end-of-life in the middle of 2020) after dropping Python 2 support.

if kwargs:
# create row objects
# create row object from named arguments, order not guaranteed so will be sorted
names = sorted(kwargs.keys())
Copy link
Member

@HyukjinKwon HyukjinKwon Oct 24, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zero323 Yea, this sorting stuff can be removed once we drop Python lower than 3.6. However, keeping __from_dict__ after ser/de might fix another problem such as #26118

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing that out.

In my opinion dropping 3.5 support should be followed by removal of sorting by name (from what I've seen this behavior is confusing at best and it will have no use, once 3.5 support is dropped), although that might hard to sell, especially in a minor version.

Ideally I would see Row going away completely, but that's just pie in the sky.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're in 3.0, we might have to consider to drop that sorting by name one .. yes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That'd be great, but 3.0 only deprecates, not drops Python 2 support. So the main why sorting was introduced stays valid.

If Spark 2.5 was accepted, Python 2 could be deprecated there, and dropped in 3.0. In general I think that bringing Python 2 into Spark 3 is a mistake, but I am first to admit I am biased.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Spark 2.5 idea looks almost failed (and I either don't personally support that idea). This thing is rather legitimate to make it happen between minor releases. Actually, 3.0 is getting close. We could just let 3.0 release first and remove that sorting behaviour too optionally.

@BryanCutler
Copy link
Member Author

BryanCutler commented Oct 25, 2019 via email

@zero323
Copy link
Member

zero323 commented Oct 25, 2019

Besides dropping support for python versions, is it possible to remove the need for sorting fields by requiring an OrderedDict be used for versions before 3.6 or something similar?

One could drop **kwargs completely (standardish workaround)

def __new__(self, *args):
    # "Keyword" path
    if args and len(args) == 1 and isinstance(args[0], OrderedDict):
        ...
    else:
         return tuple.__new__(self, args)

but that's a breaking change with significant potential impact (updating docs alone would require a lot of work).

In my opinion, not something that is justified only to support EOL Python versions, especially when 3.6 has been around for almost 3 years now, and migration doesn't require any changes in the user codebase.

@BryanCutler
Copy link
Member Author

Yes, that is kind of what I was thinking @zero323 , but still allow kwargs for python >= 3.6, where they will be ordered. To prevent a breaking change, we could offer a conf that would fall back to creating a LegacyRow that sorts the fields. For example:

class Row(tuple):
  def __new__(self, *args, **kwargs):
        if args and kwargs:
            raise ValueError("Can not use both args "
                             "and kwargs to create Row")
        if py_version < "3.6":
            if len(args) != 1 and not isinstance(args[0], OrderedDict):
              if conf.get("python.useLegacyRow"):
                return LegacyRow(args, kwargs)
              else:
                raise ValueError(...)
           else:
             kwargs = args[0].to_dict()

        if kwargs:
            # create row objects
            names = kwargs.keys()
            row = tuple.__new__(self, [kwargs[n] for n in names])
            row.__fields__ = names
            # row.__from_dict__ = True  ## Don't need anymore, I think
            return row

        else:
            # create row class or objects
            return tuple.__new__(self, args)

class LegacyRow(Row):
    ...
    names = sorted(kwargs.keys())
    ...
    row.__from_dict__ = True

@HyukjinKwon and @zero323 does this look like a possibility?

@zero323
Copy link
Member

zero323 commented Oct 26, 2019

@BryanCutler The idea of controlling sorting behavior via configuration seem legit, but I wouldn't go so far, as adding a subclass, unless we plan to have diverging implementation later in time (thought I think that in such case it would make more sense to have common base class or dynamically provide one, or another implementation).

In other words we could resort input if:

  • We get kwargs
  • Python is 3.6 or later (Python < 3.6 should stay as-is to avoid confusion).
  • Corresponding config option is set to true.
class Row(tuple):
    def __new__(self, *args, **kwargs):
        if args and kwargs:
            raise ValueError("Can not use both args "
                             "and kwargs to create Row")

        if kwargs:
            if sys.version_info > (3, 6) and conf.get("python.useLegacyRowBehaviour"):
                names, values = zip(*sorted(kwargs.items()))
            else:
                names, values = list(kwargs.keys()), kwargs.values()
            row = tuple.__new__(self, values)
            row.__fields__ = names
            row.__from_dict__ = True
            return row

        else:
            # create row class or objects
            return tuple.__new__(self, args)

I don't think that providing separate path for OrderedDict makes much sense as, despite outstanding deprecations, dict can be used in most of the contexts where Row is expected, and even if it wasn't the case, users can always unpack the dict:

d: OrderedDict = ...
Row(**d)

If anything there should be a separate path for OrderedDicts in _infer_schema*

if isinstance(row, dict):
items = sorted(row.items())

Side notes:

  • If Row is to be rewritten could we replace dunders (__fields__, __from_dict__ ) with leading underscore? That's a code smell, and even if it is unlikely to break in the future, it quite confusing for the reader.
  • I asked on dev about deprecating Python < 3.6 support. Feedback provided there might be relevant here.
  • What should be done about Row.asDict? In general in Python 3.6+ ordering will be preserved, but maybe we should be explicit, and use OrderedDict?

* In general schema inference will require some work to make it consistent with Row behavior. For example 3.6+ classes do preserve attribute definition order PEP 520 so the current approach

items = sorted(row.__dict__.items())

would be undesired in 3.7 without legacy mode.

@BryanCutler
Copy link
Member Author

@zero323 and @HyukjinKwon I think we should open another JIRA for this to continue the discussion and send a msg to the dev list, what do you guys think?

@HyukjinKwon
Copy link
Member

Yeah, +1 for JIRA. It's too long to read .. lol.

Seems like @zero323 sent an email to dev list about dropping Python 3.4 and 3.5 (http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-Deprecate-Python-lt-3-6-in-Spark-3-0-td28168.html)

@BryanCutler
Copy link
Member Author

I made https://issues.apache.org/jira/browse/SPARK-29748 to discuss dropping the sorting

@BryanCutler BryanCutler deleted the pyspark-Row-serialize-SPARK-22232 branch January 10, 2020 22:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants