From 800a184b80d78f9fc4803cdc7dd3d3b6d63afe56 Mon Sep 17 00:00:00 2001
From: Angerszhuuuu
Date: Tue, 30 Nov 2021 18:04:12 +0800
Subject: [PATCH 01/14] [SPARK-37054][PYSPARK] Creating a SparkSession with an
 existing session should not pass static confs

---
 python/pyspark/sql/session.py            | 3 ++-
 python/pyspark/sql/tests/test_session.py | 6 +++---
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index f94b9c2115044..bc4071e66c3a5 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -278,7 +278,8 @@ def getOrCreate(self) -> "SparkSession":
                     # by all sessions.
                     session = SparkSession(sc, options=self._options)
                 for key, value in self._options.items():
-                    session._jsparkSession.sessionState().conf().setConfString(key, value)
+                    if not session._jvm.org.apache.spark.sql.internal.SQLConf.isStaticConfigKey(key):
+                        session._jsparkSession.sessionState().conf().setConfString(key, value)
                 return session

diff --git a/python/pyspark/sql/tests/test_session.py b/python/pyspark/sql/tests/test_session.py
index 06771fac896ba..48c0f1a5906a3 100644
--- a/python/pyspark/sql/tests/test_session.py
+++ b/python/pyspark/sql/tests/test_session.py
@@ -273,12 +273,12 @@ def test_another_spark_session(self):
         session2 = None
         try:
             session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
-            session2 = SparkSession.builder.config("key2", "value2").getOrCreate()
+            session2 = SparkSession.builder.config("spark.sql.codegen.comments", "true").getOrCreate()

             self.assertEqual(session1.conf.get("key1"), "value1")
             self.assertEqual(session2.conf.get("key1"), "value1")
-            self.assertEqual(session1.conf.get("key2"), "value2")
-            self.assertEqual(session2.conf.get("key2"), "value2")
+            self.assertEqual(session1.conf.get("spark.sql.codegen.comments"), "false")
+            self.assertEqual(session2.conf.get("spark.sql.codegen.comments"), "false")

             self.assertEqual(session1.sparkContext, session2.sparkContext)
             self.assertEqual(session1.sparkContext.getConf().get("key1"), "value1")

From a91cdc9cb6b23fcec95b6bdf60231f793c7539fa Mon Sep 17 00:00:00 2001
From: Angerszhuuuu
Date: Tue, 30 Nov 2021 18:22:37 +0800
Subject: [PATCH 02/14] reformat

---
 python/pyspark/sql/session.py            | 4 +++-
 python/pyspark/sql/tests/test_session.py | 4 +++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index bc4071e66c3a5..91c3185d5a3ca 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -278,7 +278,9 @@ def getOrCreate(self) -> "SparkSession":
                     # Do not update `SparkConf` for existing `SparkContext`, as it's shared
                     # by all sessions.
session = SparkSession(sc, options=self._options) for key, value in self._options.items(): - if not session._jvm.org.apache.spark.sql.internal.SQLConf.isStaticConfigKey(key): + if not session._jvm.org.apache.spark.sql.internal.SQLConf.isStaticConfigKey( + key + ): session._jsparkSession.sessionState().conf().setConfString(key, value) return session diff --git a/python/pyspark/sql/tests/test_session.py b/python/pyspark/sql/tests/test_session.py index 48c0f1a5906a3..84fa23d9edfeb 100644 --- a/python/pyspark/sql/tests/test_session.py +++ b/python/pyspark/sql/tests/test_session.py @@ -273,7 +273,9 @@ def test_another_spark_session(self): session2 = None try: session1 = SparkSession.builder.config("key1", "value1").getOrCreate() - session2 = SparkSession.builder.config("spark.sql.codegen.comments", "true").getOrCreate() + session2 = SparkSession.builder.config( + "spark.sql.codegen.comments", "true" + ).getOrCreate() self.assertEqual(session1.conf.get("key1"), "value1") self.assertEqual(session2.conf.get("key1"), "value1") From 85f9188ede9545c79fc27e2101a8d79c2c9e4bda Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 1 Dec 2021 11:26:45 +0800 Subject: [PATCH 03/14] Update session.py --- python/pyspark/sql/session.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 91c3185d5a3ca..cf1f3ffd0a3ae 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -277,11 +277,25 @@ def getOrCreate(self) -> "SparkSession": # Do not update `SparkConf` for existing `SparkContext`, as it's shared # by all sessions. session = SparkSession(sc, options=self._options) + staticConfs: Dict[str, Any] = {} + otherConfs: Dict[str, Any] = {} for key, value in self._options.items(): - if not session._jvm.org.apache.spark.sql.internal.SQLConf.isStaticConfigKey( - key - ): - session._jsparkSession.sessionState().conf().setConfString(key, value) + if session._jvm.org.apache.spark.sql.internal.SQLConf.isStaticConfigKey(key): + staticConfs[key] = value + else: + otherConfs[key] = value + for key, value in otherConfs.items(): + session._jsparkSession.sessionState().conf().setConfString(key, value) + if not staticConfs: + session._jsparkSession.logWarning( + "Using an existing SparkSession; the static sql configurations will not take" + + " effect." + ) + if not otherConfs: + session._jsparkSession.logWarning( + "Using an existing SparkSession; some spark core configurations may not take" + + " effect." + ) return session builder = Builder() From 8a0f79c2f5272943c580d3cd7883433835436be8 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 1 Dec 2021 12:03:37 +0800 Subject: [PATCH 04/14] Update session.py --- python/pyspark/sql/session.py | 50 ++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index cf1f3ffd0a3ae..9e47d78a12b01 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -277,25 +277,8 @@ def getOrCreate(self) -> "SparkSession": # Do not update `SparkConf` for existing `SparkContext`, as it's shared # by all sessions. 
session = SparkSession(sc, options=self._options) - staticConfs: Dict[str, Any] = {} - otherConfs: Dict[str, Any] = {} - for key, value in self._options.items(): - if session._jvm.org.apache.spark.sql.internal.SQLConf.isStaticConfigKey(key): - staticConfs[key] = value - else: - otherConfs[key] = value - for key, value in otherConfs.items(): - session._jsparkSession.sessionState().conf().setConfString(key, value) - if not staticConfs: - session._jsparkSession.logWarning( - "Using an existing SparkSession; the static sql configurations will not take" - + " effect." - ) - if not otherConfs: - session._jsparkSession.logWarning( - "Using an existing SparkSession; some spark core configurations may not take" - + " effect." - ) + else: + SparkSession.applyModifiableSetting(session, session._jsparkSession, self._options) return session builder = Builder() @@ -304,6 +287,30 @@ def getOrCreate(self) -> "SparkSession": _instantiatedSession: ClassVar[Optional["SparkSession"]] = None _activeSession: ClassVar[Optional["SparkSession"]] = None + def applyModifiableSetting(self, session: JavaObject, options: Dict[str, Any]): + log4jLogger = self._jvm.org.apache.log4j + LOGGER = log4jLogger.LogManager.getLogger("SparkSession") + staticConfs: Dict[str, Any] = {} + otherConfs: Dict[str, Any] = {} + for key, value in options.items(): + if self._jvm.org.apache.spark.sql.internal.SQLConf.isStaticConfigKey(key): + staticConfs[key] = value + else: + otherConfs[key] = value + for key, value in otherConfs.items(): + session.sessionState().conf().setConfString(key, value) + if not staticConfs: + LOGGER.warn( + "Using an existing SparkSession; the static sql configurations will not take" + + " effect." + ) + if not otherConfs: + LOGGER.warn( + "Using an existing SparkSession; some spark core configurations may not take" + + " effect." + ) + return + def __init__( self, sparkContext: SparkContext, @@ -321,8 +328,13 @@ def __init__( and not self._jvm.SparkSession.getDefaultSession().get().sparkContext().isStopped() ): jsparkSession = self._jvm.SparkSession.getDefaultSession().get() + if options is not None: + self.applyModifiableSetting(jsparkSession, options) else: jsparkSession = self._jvm.SparkSession(self._jsc.sc(), options) + else : + if options is not None: + self.applyModifiableSetting(jsparkSession, options) self._jsparkSession = jsparkSession self._jwrapped = self._jsparkSession.sqlContext() self._wrapped = SQLContext(self._sc, self, self._jwrapped) From 5b406f566cb35ec67ddd74e8f0798e5c41b2c320 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 1 Dec 2021 12:04:35 +0800 Subject: [PATCH 05/14] Update session.py --- python/pyspark/sql/session.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 9e47d78a12b01..70ffd02483fa8 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -278,7 +278,9 @@ def getOrCreate(self) -> "SparkSession": # by all sessions. 
session = SparkSession(sc, options=self._options) else: - SparkSession.applyModifiableSetting(session, session._jsparkSession, self._options) + SparkSession.applyModifiableSetting( + session, session._jsparkSession, self._options + ) return session builder = Builder() @@ -332,7 +334,7 @@ def __init__( self.applyModifiableSetting(jsparkSession, options) else: jsparkSession = self._jvm.SparkSession(self._jsc.sc(), options) - else : + else: if options is not None: self.applyModifiableSetting(jsparkSession, options) self._jsparkSession = jsparkSession From bc644ff8f238c66ba33337ace8af2c19ab55043d Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 1 Dec 2021 15:08:56 +0800 Subject: [PATCH 06/14] Update session.py --- python/pyspark/sql/session.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 70ffd02483fa8..2357c811e0c4a 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -289,7 +289,7 @@ def getOrCreate(self) -> "SparkSession": _instantiatedSession: ClassVar[Optional["SparkSession"]] = None _activeSession: ClassVar[Optional["SparkSession"]] = None - def applyModifiableSetting(self, session: JavaObject, options: Dict[str, Any]): + def applyModifiableSetting(self, session: JavaObject, options: Dict[str, Any]) -> None: log4jLogger = self._jvm.org.apache.log4j LOGGER = log4jLogger.LogManager.getLogger("SparkSession") staticConfs: Dict[str, Any] = {} @@ -311,7 +311,6 @@ def applyModifiableSetting(self, session: JavaObject, options: Dict[str, Any]): "Using an existing SparkSession; some spark core configurations may not take" + " effect." ) - return def __init__( self, From 00b4c9a8c0b05700e5a9e46001fdbd64743039d2 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Thu, 2 Dec 2021 07:34:22 +0800 Subject: [PATCH 07/14] update --- python/pyspark/sql/session.py | 31 ++------------- .../org/apache/spark/sql/SparkSession.scala | 38 ++++++++++--------- 2 files changed, 23 insertions(+), 46 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 2357c811e0c4a..418da1e27604c 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -278,9 +278,7 @@ def getOrCreate(self) -> "SparkSession": # by all sessions. session = SparkSession(sc, options=self._options) else: - SparkSession.applyModifiableSetting( - session, session._jsparkSession, self._options - ) + session._jvm.SparkSession.applyModifiableSettings(session._jsparkSession, self._options) return session builder = Builder() @@ -289,29 +287,6 @@ def getOrCreate(self) -> "SparkSession": _instantiatedSession: ClassVar[Optional["SparkSession"]] = None _activeSession: ClassVar[Optional["SparkSession"]] = None - def applyModifiableSetting(self, session: JavaObject, options: Dict[str, Any]) -> None: - log4jLogger = self._jvm.org.apache.log4j - LOGGER = log4jLogger.LogManager.getLogger("SparkSession") - staticConfs: Dict[str, Any] = {} - otherConfs: Dict[str, Any] = {} - for key, value in options.items(): - if self._jvm.org.apache.spark.sql.internal.SQLConf.isStaticConfigKey(key): - staticConfs[key] = value - else: - otherConfs[key] = value - for key, value in otherConfs.items(): - session.sessionState().conf().setConfString(key, value) - if not staticConfs: - LOGGER.warn( - "Using an existing SparkSession; the static sql configurations will not take" - + " effect." 
- ) - if not otherConfs: - LOGGER.warn( - "Using an existing SparkSession; some spark core configurations may not take" - + " effect." - ) - def __init__( self, sparkContext: SparkContext, @@ -330,12 +305,12 @@ def __init__( ): jsparkSession = self._jvm.SparkSession.getDefaultSession().get() if options is not None: - self.applyModifiableSetting(jsparkSession, options) + self._jvm.SparkSession.applyModifiableSettings(jsparkSession, options) else: jsparkSession = self._jvm.SparkSession(self._jsc.sc(), options) else: if options is not None: - self.applyModifiableSetting(jsparkSession, options) + self._jvm.SparkSession.applyModifiableSettings(jsparkSession, options) self._jsparkSession = jsparkSession self._jwrapped = self._jsparkSession.sqlContext() self._wrapped = SQLContext(self._sc, self, self._jwrapped) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index df110aa269e7b..8bd611896e527 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -930,7 +930,7 @@ object SparkSession extends Logging { // Get the session from current thread's active session. var session = activeThreadSession.get() if ((session ne null) && !session.sparkContext.isStopped) { - applyModifiableSettings(session) + applyModifiableSettings(session, new java.util.HashMap[String, String](options.asJava)) return session } @@ -939,7 +939,7 @@ object SparkSession extends Logging { // If the current thread does not have an active session, get it from the global session. session = defaultSession.get() if ((session ne null) && !session.sparkContext.isStopped) { - applyModifiableSettings(session) + applyModifiableSettings(session, new java.util.HashMap[String, String](options.asJava)) return session } @@ -967,22 +967,6 @@ object SparkSession extends Logging { return session } - - private def applyModifiableSettings(session: SparkSession): Unit = { - val (staticConfs, otherConfs) = - options.partition(kv => SQLConf.isStaticConfigKey(kv._1)) - - otherConfs.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } - - if (staticConfs.nonEmpty) { - logWarning("Using an existing SparkSession; the static sql configurations will not take" + - " effect.") - } - if (otherConfs.nonEmpty) { - logWarning("Using an existing SparkSession; some spark core configurations may not take" + - " effect.") - } - } } /** @@ -1074,6 +1058,24 @@ object SparkSession extends Logging { throw new IllegalStateException("No active or default Spark session found"))) } + def applyModifiableSettings( + session: SparkSession, + options: java.util.HashMap[String, String]): Unit = { + val (staticConfs, otherConfs) = + options.asScala.partition(kv => SQLConf.isStaticConfigKey(kv._1)) + + otherConfs.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) } + + if (staticConfs.nonEmpty) { + logWarning("Using an existing SparkSession; the static sql configurations will not take" + + " effect.") + } + if (otherConfs.nonEmpty) { + logWarning("Using an existing SparkSession; some spark core configurations may not take" + + " effect.") + } + } + /** * Returns a cloned SparkSession with all specified configurations disabled, or * the original SparkSession if all configurations are already disabled. 
From 2889e1f4a36e02f48cf031e2f3d485c8082f75b3 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Thu, 2 Dec 2021 07:59:14 +0800 Subject: [PATCH 08/14] update --- python/pyspark/sql/session.py | 8 +++----- .../main/scala/org/apache/spark/sql/SparkSession.scala | 6 +++++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 418da1e27604c..598f9c94e680d 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -291,7 +291,7 @@ def __init__( self, sparkContext: SparkContext, jsparkSession: Optional[JavaObject] = None, - options: Optional[Dict[str, Any]] = {}, + options: Dict[str, Any] = {}, ): from pyspark.sql.context import SQLContext @@ -304,13 +304,11 @@ def __init__( and not self._jvm.SparkSession.getDefaultSession().get().sparkContext().isStopped() ): jsparkSession = self._jvm.SparkSession.getDefaultSession().get() - if options is not None: - self._jvm.SparkSession.applyModifiableSettings(jsparkSession, options) + self._jvm.SparkSession.applyModifiableSettings(jsparkSession, options) else: jsparkSession = self._jvm.SparkSession(self._jsc.sc(), options) else: - if options is not None: - self._jvm.SparkSession.applyModifiableSettings(jsparkSession, options) + self._jvm.SparkSession.applyModifiableSettings(jsparkSession, options) self._jsparkSession = jsparkSession self._jwrapped = self._jsparkSession.sqlContext() self._wrapped = SQLContext(self._sc, self, self._jwrapped) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 8bd611896e527..706365332a0bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -1058,7 +1058,11 @@ object SparkSession extends Logging { throw new IllegalStateException("No active or default Spark session found"))) } - def applyModifiableSettings( + /** + * Apply modifiable settings to an existed SparkSession. This method are used + * both in scala and pyspark, so put this under SparkSession object. + */ + private[sql] def applyModifiableSettings( session: SparkSession, options: java.util.HashMap[String, String]): Unit = { val (staticConfs, otherConfs) = From a7f4a51273bd1d5749f5c9c7a8322442ff8349e3 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Thu, 2 Dec 2021 07:59:52 +0800 Subject: [PATCH 09/14] Update SparkSession.scala --- sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 706365332a0bc..e7ddf5d7dd3b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -1060,7 +1060,7 @@ object SparkSession extends Logging { /** * Apply modifiable settings to an existed SparkSession. This method are used - * both in scala and pyspark, so put this under SparkSession object. + * both in scala and Pyspark, so put this under SparkSession object. 
*/ private[sql] def applyModifiableSettings( session: SparkSession, From af2185f6114df1bb445d0f81dfefe80edd554b30 Mon Sep 17 00:00:00 2001 From: AngersZhuuuu Date: Thu, 2 Dec 2021 08:18:08 +0800 Subject: [PATCH 10/14] Update sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala Co-authored-by: Hyukjin Kwon --- sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index e7ddf5d7dd3b7..502bff4380a1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -1060,7 +1060,7 @@ object SparkSession extends Logging { /** * Apply modifiable settings to an existed SparkSession. This method are used - * both in scala and Pyspark, so put this under SparkSession object. + * both in Scala and Python, so put this under [[SparkSession]] object. */ private[sql] def applyModifiableSettings( session: SparkSession, From 93c429e5dafb1f86d5a20f68145978a6d4962f98 Mon Sep 17 00:00:00 2001 From: AngersZhuuuu Date: Thu, 2 Dec 2021 08:18:22 +0800 Subject: [PATCH 11/14] Update sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala Co-authored-by: Hyukjin Kwon --- sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 502bff4380a1b..cd101e502f8a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -1059,7 +1059,7 @@ object SparkSession extends Logging { } /** - * Apply modifiable settings to an existed SparkSession. This method are used + * Apply modifiable settings to an existing [[SparkSession]]. This method are used * both in Scala and Python, so put this under [[SparkSession]] object. */ private[sql] def applyModifiableSettings( From f6f06cc45413e1633975c09de20f9011608b841a Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Thu, 2 Dec 2021 13:03:59 +0800 Subject: [PATCH 12/14] Update SparkSession.scala --- sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index cd101e502f8a7..f566e23735a2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -1062,7 +1062,7 @@ object SparkSession extends Logging { * Apply modifiable settings to an existing [[SparkSession]]. This method are used * both in Scala and Python, so put this under [[SparkSession]] object. 
*/ - private[sql] def applyModifiableSettings( + def applyModifiableSettings( session: SparkSession, options: java.util.HashMap[String, String]): Unit = { val (staticConfs, otherConfs) = From 47480cfa6c3b5978173344f03c8148c751aaa754 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Thu, 2 Dec 2021 23:28:41 +0800 Subject: [PATCH 13/14] update --- python/pyspark/sql/session.py | 12 +++++++++--- .../scala/org/apache/spark/sql/SparkSession.scala | 2 +- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 598f9c94e680d..616a01ff82800 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -278,7 +278,9 @@ def getOrCreate(self) -> "SparkSession": # by all sessions. session = SparkSession(sc, options=self._options) else: - session._jvm.SparkSession.applyModifiableSettings(session._jsparkSession, self._options) + getattr(getattr(session._jvm, "SparkSession$"), "MODULE$").applyModifiableSettings( + session._jsparkSession, self._options + ) return session builder = Builder() @@ -304,11 +306,15 @@ def __init__( and not self._jvm.SparkSession.getDefaultSession().get().sparkContext().isStopped() ): jsparkSession = self._jvm.SparkSession.getDefaultSession().get() - self._jvm.SparkSession.applyModifiableSettings(jsparkSession, options) + getattr(getattr(self._jvm, "SparkSession$"), "MODULE$").applyModifiableSettings( + jsparkSession, options + ) else: jsparkSession = self._jvm.SparkSession(self._jsc.sc(), options) else: - self._jvm.SparkSession.applyModifiableSettings(jsparkSession, options) + getattr(getattr(self._jvm, "SparkSession$"), "MODULE$").applyModifiableSettings( + jsparkSession, options + ) self._jsparkSession = jsparkSession self._jwrapped = self._jsparkSession.sqlContext() self._wrapped = SQLContext(self._sc, self, self._jwrapped) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index f566e23735a2d..cd101e502f8a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -1062,7 +1062,7 @@ object SparkSession extends Logging { * Apply modifiable settings to an existing [[SparkSession]]. This method are used * both in Scala and Python, so put this under [[SparkSession]] object. */ - def applyModifiableSettings( + private[sql] def applyModifiableSettings( session: SparkSession, options: java.util.HashMap[String, String]): Unit = { val (staticConfs, otherConfs) = From 8e66ec455f0ead26fdbe0d938d7165c613b8beb1 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Thu, 2 Dec 2021 23:29:44 +0800 Subject: [PATCH 14/14] Update session.py --- python/pyspark/sql/session.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 616a01ff82800..586af62269a0a 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -278,9 +278,9 @@ def getOrCreate(self) -> "SparkSession": # by all sessions. session = SparkSession(sc, options=self._options) else: - getattr(getattr(session._jvm, "SparkSession$"), "MODULE$").applyModifiableSettings( - session._jsparkSession, self._options - ) + getattr( + getattr(session._jvm, "SparkSession$"), "MODULE$" + ).applyModifiableSettings(session._jsparkSession, self._options) return session builder = Builder()
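
Note on patches 12-14 (annotation, not part of the patches themselves): patch 12 drops the private[sql] qualifier and patch 13 restores it while switching the Python side to reach the Scala companion object directly. The practical difference is that the earlier jvm.SparkSession.applyModifiableSettings(...) form resolves through a Java static forwarder, which Scala only emits for public members of an object, while the getattr(..., "SparkSession$") / "MODULE$" route invokes the method on the singleton itself. A rough sketch of that Py4J idiom, assuming an active session named spark and PySpark's default gateway (which auto-converts a Python dict to a java.util.HashMap):

    # A Scala `object X` compiles to a JVM class "X$" whose singleton instance
    # lives in a static field named MODULE$. Both names contain "$", so they
    # are fetched with getattr() rather than plain attribute access.
    jvm = spark._jvm
    companion = getattr(getattr(jvm, "org.apache.spark.sql.SparkSession$"), "MODULE$")
    companion.applyModifiableSettings(
        spark._jsparkSession, {"spark.sql.shuffle.partitions": "8"}
    )

The sketch uses the fully qualified class name; the patch itself can use the short name "SparkSession$" because PySpark java-imports org.apache.spark.sql.* into the gateway. Calling through MODULE$ works even though the method is private[sql]: that qualifier is enforced by the Scala compiler, not by the JVM, so the compiled method is public in bytecode and Py4J can invoke it reflectively.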