python/pyspark/serializers.py (3 changes: 2 additions & 1 deletion)

@@ -62,11 +62,12 @@
 if sys.version < '3':
     import cPickle as pickle
     from itertools import izip as zip, imap as map
+    pickle_protocol = 2
 else:
     import pickle
     basestring = unicode = str
     xrange = range
-    pickle_protocol = pickle.HIGHEST_PROTOCOL
+    pickle_protocol = 3
Member Author commented:
We could use pickle.DEFAULT_PROTOCOL too, but let me stick with a constant, since protocol 4 seems to have this bug.
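
For context, a minimal sketch of the distinction, assuming CPython 3.4-3.7 (where pickle.DEFAULT_PROTOCOL is 3 and pickle.HIGHEST_PROTOCOL is 4); the PR itself only pins the module-level constant, and the payload here is illustrative:

    import pickle

    # Assumption: CPython 3.4-3.7, where DEFAULT_PROTOCOL == 3 and
    # HIGHEST_PROTOCOL == 4. Pinning to 3 avoids protocol-4 opcodes
    # entirely; DEFAULT_PROTOCOL happens to be 3 today but can change
    # across Python releases.
    data = [[1, 2, 3, 4]] * 100

    p3 = pickle.dumps(data, protocol=3)
    p4 = pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL)

    # Both round-trip fine in CPython itself; the suspected bug is in
    # the external (JVM-side) unpickler, not in pickle.
    assert pickle.loads(p3) == data
    assert pickle.loads(p4) == data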

dongjoon-hyun (Member) commented on May 3, 2019:

It looks nice and solid. BTW, do you think we could have a pointer to the upstream bug issue against pickle?

Member Author replied:
Do you mean a bug issue tracking the root cause somewhere? I think it's more likely an issue within the Pyrolite library, but I am not 100% sure yet. I am looking into this to identify the cause and will add a pointer once I have it.
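
One way to dig into that (a sketch, not from the PR; the payload and protocol list are illustrative) is to compare the opcode streams the JVM-side unpickler has to handle under each protocol, since protocol 4 introduced new opcodes such as FRAME, SHORT_BINUNICODE, and MEMOIZE where an external unpickler could diverge from CPython:

    import pickle
    import pickletools

    # Illustrative payload mirroring the failing case (lists of ints).
    payload = [[1, 2, 3, 4]] * 2

    # Dump the opcode stream for each protocol; the protocol-4 output
    # shows FRAME/MEMOIZE opcodes that protocols 2 and 3 lack.
    for proto in (2, 3, 4):
        print("--- protocol %d ---" % proto)
        pickletools.dis(pickle.dumps(payload, protocol=proto))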


 from pyspark import cloudpickle
 from pyspark.util import _exception_message
python/pyspark/sql/tests/test_serde.py (6 changes: 6 additions & 0 deletions)

@@ -126,6 +126,12 @@ def test_BinaryType_serialization(self):
         df = self.spark.createDataFrame(data, schema=schema)
         df.collect()

+    def test_int_array_serialization(self):
+        # Note that this test seems dependent on parallelism.
+        data = self.spark.sparkContext.parallelize([[1, 2, 3, 4]] * 100, numSlices=12)
+        df = self.spark.createDataFrame(data, "array<integer>")
+        self.assertEqual(len(list(filter(lambda r: None in r.value, df.collect()))), 0)
+

 if __name__ == "__main__":
     import unittest
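
As a usage note (the exact command is an assumption based on Spark's Python developer tooling, not part of this diff), the new test can be exercised on its own through the dedicated test runner:

    # Run only the serde test module via Spark's Python test runner.
    # Assumes you are at the root of a Spark checkout with pyspark built.
    ./python/run-tests --testnames 'pyspark.sql.tests.test_serde'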