From 4f113662035e719d0ca1ecef44b426a1d0a4fb65 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Tue, 24 May 2016 18:03:28 +0800 Subject: [PATCH 1/4] [SPARK-15345][SQL][PYSPARK]. SparkSession's conf doesn't take effect when there is already an existing SparkContext --- .../main/scala/org/apache/spark/SparkContext.scala | 1 + .../scala/org/apache/spark/sql/SparkSession.scala | 10 ++++++++-- .../apache/spark/sql/SparkSessionBuilderSuite.scala | 13 ++++++++++++- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e6cdd0d298f37..c29d699556252 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2245,6 +2245,7 @@ object SparkContext extends Logging { if (activeContext.get() == null) { setActiveContext(new SparkContext(config), allowMultipleContexts = false) } + logWarning("Use an existing SparkContext, some configuration may not take effect.") activeContext.get() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index f697769bdcdb5..a80825b5290ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -633,7 +633,7 @@ object SparkSession { /** * Builder for [[SparkSession]]. 
*/ - class Builder { + class Builder extends Logging { private[this] val options = new scala.collection.mutable.HashMap[String, String] @@ -750,6 +750,7 @@ object SparkSession { var session = activeThreadSession.get() if ((session ne null) && !session.sparkContext.isStopped) { options.foreach { case (k, v) => session.conf.set(k, v) } + logWarning("Use an existing SparkSession, some configuration may not take effect.") return session } @@ -759,6 +760,7 @@ object SparkSession { session = defaultSession.get() if ((session ne null) && !session.sparkContext.isStopped) { options.foreach { case (k, v) => session.conf.set(k, v) } + logWarning("Use an existing SparkSession, some configuration may not take effect.") return session } @@ -771,7 +773,11 @@ object SparkSession { val sparkConf = new SparkConf() options.foreach { case (k, v) => sparkConf.set(k, v) } - SparkContext.getOrCreate(sparkConf) + val sc = SparkContext.getOrCreate(sparkConf) + // maybe this is an existing SparkContext, update its SparkConf which may be used + // by SparkSession + options.foreach { case (k, v) => sc.conf.set(k, v) } + sc } session = new SparkSession(sparkContext) options.foreach { case (k, v) => session.conf.set(k, v) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index ec6a2b3575869..ac7a63f7d3fdc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} /** * Test cases for the builder pattern of [[SparkSession]]. 
@@ -90,4 +90,15 @@ class SparkSessionBuilderSuite extends SparkFunSuite { assert(newSession != activeSession) newSession.stop() } + + test("create sparkContext first then sparkSession") { + sparkContext.stop() + val conf = new SparkConf().setAppName("test").setMaster("local").set("key1", "value1") + val sparkContext2 = new SparkContext(conf) + val session = SparkSession.builder().config("key2", "value2").getOrCreate() + assert(session.conf.get("key1") == "value1") + assert(session.conf.get("key2") == "value2") + assert(session.sparkContext.conf.get("key1") == "value1") + assert(session.sparkContext.conf.get("key2") == "value2") + } } From d89bd060c0a6d6fc71f34fb1ab77a3f269e7cb08 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Tue, 24 May 2016 20:10:37 +0800 Subject: [PATCH 2/4] fix test failure --- .../scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index ac7a63f7d3fdc..6379639682f31 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -100,5 +100,6 @@ class SparkSessionBuilderSuite extends SparkFunSuite { assert(session.conf.get("key2") == "value2") assert(session.sparkContext.conf.get("key1") == "value1") assert(session.sparkContext.conf.get("key2") == "value2") + session.stop() } } From a4f160e77535f9c15e95db2ced636ad3a58f35fc Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Tue, 24 May 2016 20:58:04 +0800 Subject: [PATCH 3/4] fix build --- .../scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 
6379639682f31..786956df8a555 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -91,7 +91,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite { newSession.stop() } - test("create sparkContext first then sparkSession") { + test("create SparkContext first then SparkSession") { sparkContext.stop() val conf = new SparkConf().setAppName("test").setMaster("local").set("key1", "value1") val sparkContext2 = new SparkContext(conf) From 071839f7255c9136d0b094f38cd0e98bf6f23a53 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Wed, 25 May 2016 08:36:36 +0800 Subject: [PATCH 4/4] address comments --- core/src/main/scala/org/apache/spark/SparkContext.scala | 4 +++- .../main/scala/org/apache/spark/sql/SparkSession.scala | 8 ++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c29d699556252..87250268595e7 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2245,7 +2245,9 @@ object SparkContext extends Logging { if (activeContext.get() == null) { setActiveContext(new SparkContext(config), allowMultipleContexts = false) } - logWarning("Use an existing SparkContext, some configuration may not take effect.") + if (config.getAll.nonEmpty) { + logWarning("Use an existing SparkContext, some configuration may not take effect.") + } activeContext.get() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index a80825b5290ee..9cfc1df6300eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -750,7 +750,9 @@ object SparkSession { var session = 
activeThreadSession.get() if ((session ne null) && !session.sparkContext.isStopped) { options.foreach { case (k, v) => session.conf.set(k, v) } - logWarning("Use an existing SparkSession, some configuration may not take effect.") + if (options.nonEmpty) { + logWarning("Use an existing SparkSession, some configuration may not take effect.") + } return session } @@ -760,7 +762,9 @@ object SparkSession { session = defaultSession.get() if ((session ne null) && !session.sparkContext.isStopped) { options.foreach { case (k, v) => session.conf.set(k, v) } - logWarning("Use an existing SparkSession, some configuration may not take effect.") + if (options.nonEmpty) { + logWarning("Use an existing SparkSession, some configuration may not take effect.") + } return session }