From 4eb617b916472e85fd47fdbcd47d24fc3834a7ed Mon Sep 17 00:00:00 2001
From: Wei Liu
Date: Fri, 4 Aug 2023 00:27:28 -0700
Subject: [PATCH 1/3] finish

---
 .../api/python/StreamingPythonRunner.scala    | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
index f14289f984a2f..5da14a10c232d 100644
--- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -66,12 +66,17 @@ private[spark] class StreamingPythonRunner(
     envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
     envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
 
-    conf.set(PYTHON_USE_DAEMON, false)
     envVars.put("SPARK_CONNECT_LOCAL_URL", connectUrl)
 
-    val (worker, _) = env.createPythonWorker(
-      pythonExec, workerModule, envVars.asScala.toMap)
-    pythonWorker = Some(worker)
+    val prevConf = conf.get(PYTHON_USE_DAEMON)
+    conf.set(PYTHON_USE_DAEMON, false)
+    try {
+      val (worker, _) = env.createPythonWorker(
+        pythonExec, workerModule, envVars.asScala.toMap)
+      pythonWorker = Some(worker)
+    } finally {
+      conf.set(PYTHON_USE_DAEMON, prevConf)
+    }
 
     val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
     val dataOut = new DataOutputStream(stream)
@@ -100,7 +105,13 @@ private[spark] class StreamingPythonRunner(
    */
   def stop(): Unit = {
     pythonWorker.foreach { worker =>
-      SparkEnv.get.destroyPythonWorker(pythonExec, workerModule, envVars.asScala.toMap, worker)
+      val prevConf = conf.get(PYTHON_USE_DAEMON)
+      conf.set(PYTHON_USE_DAEMON, false)
+      try {
+        SparkEnv.get.destroyPythonWorker(pythonExec, workerModule, envVars.asScala.toMap, worker)
+      } finally {
+        conf.set(PYTHON_USE_DAEMON, prevConf)
+      }
     }
   }
 }

From ea5dac2ea91e89def36d3fc5ded4100825e5a6e0 Mon Sep 17 00:00:00 2001
From: Wei Liu
Date: Fri, 4 Aug 2023 10:29:05 -0700
Subject: [PATCH 2/3] minor

---
 .../org/apache/spark/api/python/StreamingPythonRunner.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
index 5da14a10c232d..edabc0f69398b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -78,7 +78,7 @@ private[spark] class StreamingPythonRunner(
       conf.set(PYTHON_USE_DAEMON, prevConf)
     }
 
-    val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
+    val stream = new BufferedOutputStream(pythonWorker.get.getOutputStream, bufferSize)
     val dataOut = new DataOutputStream(stream)
 
     // TODO(SPARK-44461): verify python version
@@ -92,7 +92,8 @@
     dataOut.write(command.toArray)
     dataOut.flush()
 
-    val dataIn = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
+    val dataIn = new DataInputStream(
+      new BufferedInputStream(pythonWorker.get.getInputStream, bufferSize))
     val resFromPython = dataIn.readInt()
     logInfo(s"Runner initialization returned $resFromPython")

From 70c13791b4be1697b53b1915757e69e7d3145258 Mon Sep 17 00:00:00 2001
From: Wei Liu
Date: Fri, 4 Aug 2023 11:19:59 -0700
Subject: [PATCH 3/3] remove conf recover in stop runner

---
 .../apache/spark/api/python/StreamingPythonRunner.scala | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
index edabc0f69398b..a079743c847ae 100644
--- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -106,13 +106,7 @@ private[spark] class StreamingPythonRunner(
    */
   def stop(): Unit = {
     pythonWorker.foreach { worker =>
-      val prevConf = conf.get(PYTHON_USE_DAEMON)
-      conf.set(PYTHON_USE_DAEMON, false)
-      try {
-        SparkEnv.get.destroyPythonWorker(pythonExec, workerModule, envVars.asScala.toMap, worker)
-      } finally {
-        conf.set(PYTHON_USE_DAEMON, prevConf)
-      }
+      SparkEnv.get.destroyPythonWorker(pythonExec, workerModule, envVars.asScala.toMap, worker)
     }
   }
 }
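
Note: the recurring idiom in patches 1 and 2 is a temporary config override with a guaranteed restore: read the previous value, set the flag, run the work inside try, and put the old value back in finally so an exception cannot leak the override. Below is a minimal standalone sketch of that idiom; the Config class and withFlag helper are hypothetical stand-ins invented for this illustration, not Spark's SparkConf or any real Spark API.

// Minimal sketch of the save/set/restore idiom from patches 1 and 2.
// Config and withFlag are hypothetical; not Spark's actual API.
import scala.collection.mutable

final class Config {
  private val values = mutable.Map.empty[String, Boolean]
  def get(key: String, default: Boolean): Boolean = values.getOrElse(key, default)
  def set(key: String, value: Boolean): Unit = values.update(key, value)
}

object TempOverride {
  // Runs `body` with `key` forced to `value`, restoring the previous
  // value in `finally` so an exception in `body` cannot leak the override.
  def withFlag[A](conf: Config, key: String, value: Boolean, default: Boolean)(body: => A): A = {
    val prev = conf.get(key, default)
    conf.set(key, value)
    try body
    finally conf.set(key, prev)
  }

  def main(args: Array[String]): Unit = {
    val conf = new Config
    conf.set("python.use.daemon", true)
    val seenInside = withFlag(conf, "python.use.daemon", value = false, default = true) {
      conf.get("python.use.daemon", default = true) // worker creation would go here
    }
    // seenInside is false; the original value is back afterwards.
    println(s"inside: $seenInside, after: ${conf.get("python.use.daemon", default = true)}")
  }
}

In the series itself, patch 1 applies the idiom around both createPythonWorker and destroyPythonWorker, patch 2 switches the stream setup to read from pythonWorker.get (the value captured inside the try block), and patch 3 removes the override from stop() entirely, leaving only the worker-creation path wrapped.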