Skip to content

Conversation

@e-dorigatti
Copy link
Contributor

SPARK-23754 was fixed in #21383 by changing the UDF code to wrap the user function, but this required a hack to save its argspec. This PR reverts this change and fixes the StopIteration bug in the worker.

The root of the problem is that when an user-supplied function raises a StopIteration, pyspark might stop processing data, if this function is used in a for-loop. The solution is to catch StopIterations exceptions and re-raise them as RuntimeErrors, so that the execution fails and the error is reported to the user. This is done using the fail_on_stopiteration wrapper, in different ways depending on where the function is used:

  • In RDDs, the user function is wrapped in the driver, because this function is also called in the driver itself.
  • In SQL UDFs, the function is wrapped in the worker, since all processing happens there. Moreover, the worker needs the signature of the user function, which is lost when wrapping it, but passing this signature to the worker requires a not so nice hack.

@HyukjinKwon

Make sure that `StopIteration`s raised in users' code do not silently interrupt processing by spark, but are raised as exceptions to the users. The users' functions are wrapped in `safe_iter` (in `shuffle.py`), which re-raises `StopIteration`s as `RuntimeError`s

Unit tests, making sure that the exceptions are indeed raised. I am not sure how to check whether a `Py4JJavaError` contains my exception, so I simply looked for the exception message in the java exception's `toString`. Can you propose a better way?

This is my original work, licensed in the same way as spark

Author: e-dorigatti <emilio.dorigatti@gmail.com>

Closes apache#21383 from e-dorigatti/fix_spark_23754.

(cherry picked from commit 0ebb0c0)
… driver to executor

SPARK-23754 was fixed in apache#21383 by changing the UDF code to wrap the user function, but this required a hack to save its argspec. This PR reverts this change and fixes the `StopIteration` bug in the worker

The root of the problem is that when an user-supplied function raises a `StopIteration`, pyspark might stop processing data, if this function is used in a for-loop. The solution is to catch `StopIteration`s exceptions and re-raise them as `RuntimeError`s, so that the execution fails and the error is reported to the user. This is done using the `fail_on_stopiteration` wrapper, in different ways depending on where the function is used:
 - In RDDs, the user function is wrapped in the driver, because this function is also called in the driver itself.
 - In SQL UDFs, the function is wrapped in the worker, since all processing happens there. Moreover, the worker needs the signature of the user function, which is lost when wrapping it, but passing this signature to the worker requires a not so nice hack.

Same tests, plus tests for pandas UDFs

Author: edorigatti <emilio.dorigatti@gmail.com>

Closes apache#21467 from e-dorigatti/fix_udf_hack.
@viirya
Copy link
Member

viirya commented Jun 12, 2018

@e-dorigatti Can you add [BACKPORT-2.3] in the PR title? Thanks.

@e-dorigatti e-dorigatti changed the title [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration wrapping from driver to executor [SPARK-23754][PYTHON][FOLLOWUP][BACKPORT-2.3] Move UDF stop iteration wrapping from driver to executor Jun 12, 2018
@HyukjinKwon
Copy link
Member

add to whitelist

@SparkQA
Copy link

SparkQA commented Jun 12, 2018

Test build #91701 has finished for PR 21538 at commit 217e730.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@e-dorigatti
Copy link
Contributor Author

Seems like it skipped the pandas tests, for both python2.7 and pypy

Will skip Pandas related features against Python executable  ...

@HyukjinKwon
Copy link
Member

Yea, it's unfortunate .. we should fix and set up the Jenkins env too.

@SparkQA
Copy link

SparkQA commented Jun 12, 2018

Test build #91704 has finished for PR 21538 at commit 217e730.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


# make sure StopIteration's raised in the user code are not ignored
# when they are processed in a for loop, raise them as RuntimeError's instead
row_func = fail_on_stopiteration(row_func)
Copy link
Member

@HyukjinKwon HyukjinKwon Jun 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@e-dorigatti, I think it's fine to name it func as fixed in master. Let's reduce the diff so that other backports make less conflicts in the future.

@SparkQA
Copy link

SparkQA commented Jun 12, 2018

Test build #91716 has finished for PR 21538 at commit 612781a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@BryanCutler BryanCutler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM too

@HyukjinKwon
Copy link
Member

Merged to branch-2.3.

asfgit pushed a commit that referenced this pull request Jun 13, 2018
… wrapping from driver to executor

SPARK-23754 was fixed in #21383 by changing the UDF code to wrap the user function, but this required a hack to save its argspec. This PR reverts this change and fixes the `StopIteration` bug in the worker.

The root of the problem is that when an user-supplied function raises a `StopIteration`, pyspark might stop processing data, if this function is used in a for-loop. The solution is to catch `StopIteration`s exceptions and re-raise them as `RuntimeError`s, so that the execution fails and the error is reported to the user. This is done using the `fail_on_stopiteration` wrapper, in different ways depending on where the function is used:
 - In RDDs, the user function is wrapped in the driver, because this function is also called in the driver itself.
 - In SQL UDFs, the function is wrapped in the worker, since all processing happens there. Moreover, the worker needs the signature of the user function, which is lost when wrapping it, but passing this signature to the worker requires a not so nice hack.

HyukjinKwon

Author: edorigatti <emilio.dorigatti@gmail.com>
Author: e-dorigatti <emilio.dorigatti@gmail.com>

Closes #21538 from e-dorigatti/branch-2.3.
@HyukjinKwon
Copy link
Member

@e-dorigatti, this got merged into branch-2.3. Likewise, this also should be manually closed. Thanks for working on this.

@e-dorigatti
Copy link
Contributor Author

@HyukjinKwon thank you so much for your patience :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants