From 2e5cab32cadb48103ebc01ea6465d7d1e079faf7 Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Wed, 25 Jun 2014 16:06:14 -0700
Subject: [PATCH] SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark

JIRA: https://issues.apache.org/jira/browse/SPARK-2282

This issue is caused by a buildup of sockets in the TIME_WAIT state of
TCP, a state that persists for some time after a connection closes. This
fix simply allows sockets in TIME_WAIT to be reused, avoiding the buildup
caused by the rapid creation of these sockets.
---
 core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index f6570d335757a..462e09466bfa6 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -599,6 +599,8 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
     } else {
       // This happens on the master, where we pass the updates to Python through a socket
       val socket = new Socket(serverHost, serverPort)
+      // SPARK-2282: Immediately reuse closed sockets because we create one per task.
+      socket.setReuseAddress(true)
       val in = socket.getInputStream
       val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
       out.writeInt(val2.size)
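
Note (illustrative, not part of the patch): a minimal standalone Scala
sketch of the SO_REUSEADDR pattern the fix relies on is shown below. Per
the java.net.Socket javadoc, the option should be set before the socket is
bound, so this sketch creates the socket unconnected first; the endpoint
localhost:9999 is a hypothetical placeholder standing in for
(serverHost, serverPort).

    import java.net.{InetSocketAddress, Socket}

    object ReuseAddressExample {
      def main(args: Array[String]): Unit = {
        // Create the socket unconnected so SO_REUSEADDR is in effect
        // before the implicit bind that happens on connect().
        val socket = new Socket()
        // Allow the local address to be reused even while a previous
        // connection on it is still in TIME_WAIT.
        socket.setReuseAddress(true)
        // Hypothetical endpoint; assumes something is listening there.
        socket.connect(new InetSocketAddress("localhost", 9999))
        try {
          socket.getOutputStream.write(0) // send a single byte
        } finally {
          socket.close()
        }
      }
    }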