diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 01e64b6972ae2..9462dfd950bab 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -186,6 +186,9 @@ private[spark] object SerDeUtil extends Logging {
       val unpickle = new Unpickler
       iter.flatMap { row =>
         val obj = unpickle.loads(row)
+        // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) stores objects in an internal map
+        // of `Unpickler`. Calling `Unpickler.close()` clears this map.
+        unpickle.close()
         if (batched) {
           obj match {
             case array: Array[Any] => array.toSeq
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 3e1bbbacffd16..322ef93473da0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -1347,6 +1347,10 @@ private[spark] abstract class SerDeBase {
       val unpickle = new Unpickler
       iter.flatMap { row =>
         val obj = unpickle.loads(row)
+        // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) stores objects in an internal map
+        // of `Unpickler`. Pyrolite doesn't clear this map on its own, so we clear it
+        // manually by calling `Unpickler.close()`.
+        unpickle.close()
         if (batched) {
           obj match {
             case list: JArrayList[_] => list.asScala
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 531108738f6c9..6058e94d471e9 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -62,12 +62,11 @@
 if sys.version < '3':
     import cPickle as pickle
     from itertools import izip as zip, imap as map
-    pickle_protocol = 2
 else:
     import pickle
     basestring = unicode = str
     xrange = range
-    pickle_protocol = 3
+pickle_protocol = pickle.HIGHEST_PROTOCOL
 
 from pyspark import cloudpickle
 from pyspark.util import _exception_message
diff --git a/python/pyspark/sql/tests/test_serde.py b/python/pyspark/sql/tests/test_serde.py
index 1c18e930eb91d..ed4b9a7755879 100644
--- a/python/pyspark/sql/tests/test_serde.py
+++ b/python/pyspark/sql/tests/test_serde.py
@@ -128,6 +128,10 @@ def test_BinaryType_serialization(self):
 
     def test_int_array_serialization(self):
         # Note that this test seems dependent on parallelism.
+        # This issue arises because the internal object map in Pyrolite is not cleared
+        # after the STOP op code. When protocol 4 is used to pickle Python objects, the
+        # MEMOIZE op code stores objects in that map, so it must be cleared to make sure
+        # the next unpickling starts from a clean map.
         data = self.spark.sparkContext.parallelize([[1, 2, 3, 4]] * 100, numSlices=12)
         df = self.spark.createDataFrame(data, "array<integer>")
         self.assertEqual(len(list(filter(lambda r: None in r.value, df.collect()))), 0)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index d3736d24e5019..eff709ef7f729 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -92,6 +92,10 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
 
     outputIterator.flatMap { pickedResult =>
       val unpickledBatch = unpickle.loads(pickedResult)
+      // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) stores objects in an internal map
+      // of `Unpickler`. Pyrolite doesn't clear this map on its own, so we clear it
+      // manually by calling `Unpickler.close()`.
+      unpickle.close()
       unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala
     }.map { result =>
       if (udfs.length == 1) {
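All three JVM-side call sites apply the same pattern: one Pyrolite `Unpickler` is reused across a whole partition, and `close()` is called after every `loads()` so the map populated by protocol-4 `MEMOIZE` op codes cannot leak objects between batches (`close()` only clears the memo map and stack; the instance stays reusable). On the Python side, `pickle.HIGHEST_PROTOCOL` is 2 under Python 2's `cPickle` and at least 4 on Python 3.4+, so the old per-branch constants are preserved while the newer protocol is now exercised. Below is a minimal sketch of that JVM-side pattern; the `unpickleAll` helper is hypothetical and not part of this patch:

```scala
import net.razorvine.pickle.Unpickler

object UnpicklePattern {
  // Hypothetical helper (not in this patch) illustrating the pattern the
  // three Scala call sites above share: reuse one Unpickler per stream of
  // payloads, clearing its memo map after each loads() call.
  def unpickleAll(payloads: Iterator[Array[Byte]]): Iterator[Any] = {
    val unpickler = new Unpickler
    payloads.map { bytes =>
      val obj = unpickler.loads(bytes)
      // close() clears the Unpickler's internal memo map (filled by the
      // protocol-4 MEMOIZE op code) and its stack; the instance itself stays
      // reusable, so the next loads() starts from an empty map.
      unpickler.close()
      obj
    }
  }
}
```

Clearing per payload keeps memory bounded: without the `close()` call, every memoized object from every row in the partition stays reachable for as long as the `Unpickler` instance does.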