From 2e7399495dbbbdc265220b9653bcf2a49fd353ce Mon Sep 17 00:00:00 2001
From: "wm624@hotmail.com"
Date: Sun, 31 Jul 2016 23:10:29 -0700
Subject: [PATCH 1/7] rename setLogLevel

---
 R/pkg/NAMESPACE                          | 2 +-
 R/pkg/R/context.R                        | 8 ++++----
 R/pkg/inst/tests/testthat/test_context.R | 2 +-
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 5e625b2d8dbb4..1db07686b5184 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -43,7 +43,7 @@ export("setJobGroup",
        "cancelJobGroup")
 
 # Export Utility methods
-export("setLogLevel")
+export("sc.setLogLevel")
 
 exportClasses("SparkDataFrame")
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 13ade49eabfa6..26cabced38260 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -275,15 +275,15 @@ spark.lapply <- function(list, func) {
 #'
 #' Set new log level: "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"
 #'
-#' @rdname setLogLevel
+#' @rdname sc.setLogLevel
 #' @param level New log level
 #' @export
 #' @examples
 #'\dontrun{
-#' setLogLevel("ERROR")
+#' sc.setLogLevel("ERROR")
 #'}
-#' @note setLogLevel since 2.0.0
-setLogLevel <- function(level) {
+#' @note sc.setLogLevel since 2.0.0
+sc.setLogLevel <- function(level) {
   sc <- getSparkContext()
   callJMethod(sc, "setLogLevel", level)
 }
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index 1ab7f319df9ff..3e4804d19bb9b 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -109,7 +109,7 @@ test_that("job group functions can be called", {
 
 test_that("utility function can be called", {
   sparkR.sparkContext()
-  setLogLevel("ERROR")
+  sc.setLogLevel("ERROR")
   sparkR.session.stop()
 })

From 1a2093a9b0bc15d8046cdf102159368e78530e8f Mon Sep 17 00:00:00 2001
From: "wm624@hotmail.com"
Date: Tue, 2 Aug 2016 11:47:43 -0700
Subject: [PATCH 2/7] revert change of rename function name in R; add a case match when initializing log

---
 R/pkg/NAMESPACE                                        | 2 +-
 R/pkg/R/context.R                                      | 8 ++++----
 R/pkg/inst/tests/testthat/test_context.R               | 2 +-
 .../main/scala/org/apache/spark/internal/Logging.scala | 8 +++++++-
 4 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 1db07686b5184..5e625b2d8dbb4 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -43,7 +43,7 @@ export("setJobGroup",
        "cancelJobGroup")
 
 # Export Utility methods
-export("sc.setLogLevel")
+export("setLogLevel")
 
 exportClasses("SparkDataFrame")
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 26cabced38260..13ade49eabfa6 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -275,15 +275,15 @@ spark.lapply <- function(list, func) {
 #'
 #' Set new log level: "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"
 #'
-#' @rdname sc.setLogLevel
+#' @rdname setLogLevel
 #' @param level New log level
 #' @export
 #' @examples
 #'\dontrun{
-#' sc.setLogLevel("ERROR")
+#' setLogLevel("ERROR")
 #'}
-#' @note sc.setLogLevel since 2.0.0
-sc.setLogLevel <- function(level) {
+#' @note setLogLevel since 2.0.0
+setLogLevel <- function(level) {
   sc <- getSparkContext()
   callJMethod(sc, "setLogLevel", level)
 }
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index 3e4804d19bb9b..1ab7f319df9ff 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -109,7 +109,7 @@ test_that("job group functions can be called", {
 
 test_that("utility function can be called", {
   sparkR.sparkContext()
-  sc.setLogLevel("ERROR")
+  setLogLevel("ERROR")
   sparkR.session.stop()
 })
diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala b/core/src/main/scala/org/apache/spark/internal/Logging.scala
index 66a0cfec6296d..186accbee76dd 100644
--- a/core/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -133,9 +133,15 @@ private[spark] trait Logging {
         val rootLogger = LogManager.getRootLogger()
         val replLogger = LogManager.getLogger(logName)
         val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
+        val callerClassName = this.getClass.getName
         if (replLevel != rootLogger.getEffectiveLevel()) {
           System.err.printf("Setting default log level to \"%s\".\n", replLevel)
-          System.err.println("To adjust logging level use sc.setLogLevel(newLevel).")
+          callerClassName match {
+            case "org.apache.spark.api.r.RBackend$" =>
+              System.err.println("To adjust logging level use setLogLevel(newLevel).")
+            case _ =>
+              System.err.println("To adjust logging level use sc.setLogLevel(newLevel).")
+          }
           rootLogger.setLevel(replLevel)
         }
       }

From ca4094ef7fda1f4e903e9fb2f3cb234637e969e1 Mon Sep 17 00:00:00 2001
From: "wm624@hotmail.com"
Date: Mon, 15 Aug 2016 14:46:32 -0700
Subject: [PATCH 3/7] add Enum and API for checking shell type

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 32 +++++++++++++++++++
 .../org/apache/spark/internal/Logging.scala   |  8 ++---
 2 files changed, 36 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 7b6d5a394bc35..a71e42b67af71 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -57,6 +57,18 @@ private[deploy] object SparkSubmitAction extends Enumeration {
   val SUBMIT, KILL, REQUEST_STATUS = Value
 }
 
+/**
+ * Whether the application is launched from:
+ * SPARK_SHELL: ./bin/spark-shell
+ * SPARKR_SHELL: ./bin/sparkR
+ * PYSPARK_SHELL: ./bin/pyspark
+ * NON_SHELL: It is not launched through the above three shells
+ */
+private[spark] object SparkShellType extends Enumeration {
+  type SparkShellTYpe = Value
+  val SPARK_SHELL, SPARKR_SHELL, PYSPARK_SHELL, NON_SHELL = Value
+}
+
 /**
  * Main gateway of launching a Spark application.
  *
@@ -111,6 +123,7 @@ object SparkSubmit {
     printStream.println("Type --help for more information.")
     exitFn(0)
   }
+  private[spark] var shellType = SparkShellType.NON_SHELL
   // scalastyle:on println
 
   def main(args: Array[String]): Unit = {
@@ -227,6 +240,7 @@ object SparkSubmit {
     val childClasspath = new ArrayBuffer[String]()
     val sysProps = new HashMap[String, String]()
     var childMainClass = ""
+    initShellType(args.primaryResource)
 
     // Set the cluster manager
     val clusterManager: Int = args.master match {
@@ -800,6 +814,24 @@ object SparkSubmit {
     res == SparkLauncher.NO_RESOURCE
   }
 
+  /**
+   * Initialize the shellType when launching the application
+   * Default: NON_SHELL
+   */
+  private[deploy] def initShellType(res: String): Unit = {
+    require(res != null)
+    res match {
+      case SPARK_SHELL => shellType = SparkShellType.SPARK_SHELL
+      case SPARKR_SHELL => shellType = SparkShellType.SPARKR_SHELL
+      case PYSPARK_SHELL => shellType = SparkShellType.PYSPARK_SHELL
+      case _ =>
+    }
+  }
+
+  private[spark] def getShellType(): SparkShellType.Value = {
+    shellType
+  }
+
   /**
    * Merge a sequence of comma-separated file lists, some of which may be null to indicate
    * no files, into a single comma-separated string.
diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala b/core/src/main/scala/org/apache/spark/internal/Logging.scala
index 186accbee76dd..1f1c83c8c6b7d 100644
--- a/core/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -18,9 +18,9 @@ package org.apache.spark.internal
 
 import org.apache.log4j.{Level, LogManager, PropertyConfigurator}
+import org.apache.spark.deploy.{SparkShellType, SparkSubmit}
 import org.slf4j.{Logger, LoggerFactory}
 import org.slf4j.impl.StaticLoggerBinder
-
 import org.apache.spark.util.Utils
 
 /**
@@ -133,11 +133,11 @@ private[spark] trait Logging {
         val rootLogger = LogManager.getRootLogger()
         val replLogger = LogManager.getLogger(logName)
         val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
-        val callerClassName = this.getClass.getName
+        val shellType = SparkSubmit.getShellType()
         if (replLevel != rootLogger.getEffectiveLevel()) {
           System.err.printf("Setting default log level to \"%s\".\n", replLevel)
-          callerClassName match {
-            case "org.apache.spark.api.r.RBackend$" =>
+          shellType match {
+            case SparkShellType.SPARKR_SHELL =>
               System.err.println("To adjust logging level use setLogLevel(newLevel).")
             case _ =>
               System.err.println("To adjust logging level use sc.setLogLevel(newLevel).")

From 1b90e97e3ce67a93af3c1d0908cbb02013fbe299 Mon Sep 17 00:00:00 2001
From: "wm624@hotmail.com"
Date: Mon, 15 Aug 2016 15:03:05 -0700
Subject: [PATCH 4/7] fix scala style error

---
 core/src/main/scala/org/apache/spark/internal/Logging.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala b/core/src/main/scala/org/apache/spark/internal/Logging.scala
index 1f1c83c8c6b7d..7fb11b16251e7 100644
--- a/core/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -18,9 +18,10 @@ package org.apache.spark.internal
 
 import org.apache.log4j.{Level, LogManager, PropertyConfigurator}
-import org.apache.spark.deploy.{SparkShellType, SparkSubmit}
 import org.slf4j.{Logger, LoggerFactory}
 import org.slf4j.impl.StaticLoggerBinder
+
+import org.apache.spark.deploy.{SparkShellType, SparkSubmit}
 import org.apache.spark.util.Utils
 
 /**

From 89d2b81a0c9500a7568ee4423cb98e8d42a06bf9 Mon Sep 17 00:00:00 2001
From: "wm624@hotmail.com"
Date: Fri, 26 Aug 2016 10:50:16 -0700
Subject: [PATCH 5/7] revert previous change and add a simple function; manually tested

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 35 ++++---------------
 .../org/apache/spark/internal/Logging.scala   | 13 ++++---
 2 files changed, 13 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a71e42b67af71..c21f3d30234e9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -24,7 +24,6 @@ import java.security.PrivilegedExceptionAction
 
 import scala.annotation.tailrec
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
-
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.security.UserGroupInformation
@@ -39,7 +38,6 @@ import org.apache.ivy.core.settings.IvySettings
 import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.repository.file.FileRepository
 import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}
-
 import org.apache.spark.{SPARK_REVISION, SPARK_VERSION, SparkException, SparkUserAppException}
 import org.apache.spark.{SPARK_BRANCH, SPARK_BUILD_DATE, SPARK_BUILD_USER, SPARK_REPO_URL}
 import org.apache.spark.api.r.RUtils
@@ -57,18 +55,6 @@ private[deploy] object SparkSubmitAction extends Enumeration {
   val SUBMIT, KILL, REQUEST_STATUS = Value
 }
 
-/**
- * Whether the application is launched from:
- * SPARK_SHELL: ./bin/spark-shell
- * SPARKR_SHELL: ./bin/sparkR
- * PYSPARK_SHELL: ./bin/pyspark
- * NON_SHELL: It is not launched through the above three shells
- */
-private[spark] object SparkShellType extends Enumeration {
-  type SparkShellTYpe = Value
-  val SPARK_SHELL, SPARKR_SHELL, PYSPARK_SHELL, NON_SHELL = Value
-}
-
 /**
  * Main gateway of launching a Spark application.
  *
@@ -123,7 +109,7 @@ object SparkSubmit {
     printStream.println("Type --help for more information.")
     exitFn(0)
   }
-  private[spark] var shellType = SparkShellType.NON_SHELL
+  private[deploy] var rShell: Boolean = false
   // scalastyle:on println
 
   def main(args: Array[String]): Unit = {
@@ -240,7 +226,7 @@ object SparkSubmit {
     val childClasspath = new ArrayBuffer[String]()
     val sysProps = new HashMap[String, String]()
     var childMainClass = ""
-    initShellType(args.primaryResource)
+    initRShell(args.primaryResource)
 
     // Set the cluster manager
     val clusterManager: Int = args.master match {
@@ -815,22 +801,15 @@ object SparkSubmit {
   }
 
   /**
-   * Initialize the shellType when launching the application
-   * Default: NON_SHELL
+   * Initialize whether it is a R shell when launching the application
+   * Default: false
    */
-  private[deploy] def initShellType(res: String): Unit = {
+  private[deploy] def initRShell(res: String): Unit = {
     require(res != null)
-    res match {
-      case SPARK_SHELL => shellType = SparkShellType.SPARK_SHELL
-      case SPARKR_SHELL => shellType = SparkShellType.SPARKR_SHELL
-      case PYSPARK_SHELL => shellType = SparkShellType.PYSPARK_SHELL
-      case _ =>
-    }
+    if (res == SPARKR_SHELL) rShell = true
   }
 
-  private[spark] def getShellType(): SparkShellType.Value = {
-    shellType
-  }
+  private[spark] def isRShell: Boolean = rShell
 
   /**
    * Merge a sequence of comma-separated file lists, some of which may be null to indicate
    * no files, into a single comma-separated string.
diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala b/core/src/main/scala/org/apache/spark/internal/Logging.scala
index 7fb11b16251e7..770089a1e8618 100644
--- a/core/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -21,7 +21,7 @@ import org.apache.log4j.{Level, LogManager, PropertyConfigurator}
 import org.slf4j.{Logger, LoggerFactory}
 import org.slf4j.impl.StaticLoggerBinder
 
-import org.apache.spark.deploy.{SparkShellType, SparkSubmit}
+import org.apache.spark.deploy.SparkSubmit
 import org.apache.spark.util.Utils
 
 /**
@@ -134,14 +134,13 @@ private[spark] trait Logging {
         val rootLogger = LogManager.getRootLogger()
         val replLogger = LogManager.getLogger(logName)
         val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
-        val shellType = SparkSubmit.getShellType()
         if (replLevel != rootLogger.getEffectiveLevel()) {
           System.err.printf("Setting default log level to \"%s\".\n", replLevel)
-          shellType match {
-            case SparkShellType.SPARKR_SHELL =>
-              System.err.println("To adjust logging level use setLogLevel(newLevel).")
-            case _ =>
-              System.err.println("To adjust logging level use sc.setLogLevel(newLevel).")
+          if (SparkSubmit.isRShell) {
+            System.err.println("To adjust logging level use setLogLevel(newLevel).")
+          }
+          else {
+            System.err.println("To adjust logging level use sc.setLogLevel(newLevel).")
           }
           rootLogger.setLevel(replLevel)
         }

From f10d5471269c3e67562207e6852c2d3b9a4899be Mon Sep 17 00:00:00 2001
From: "wm624@hotmail.com"
Date: Fri, 26 Aug 2016 10:51:54 -0700
Subject: [PATCH 6/7] add blankline

---
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index c21f3d30234e9..611ae89aac18a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -24,6 +24,7 @@ import java.security.PrivilegedExceptionAction
 
 import scala.annotation.tailrec
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.security.UserGroupInformation
@@ -38,6 +39,7 @@ import org.apache.ivy.core.settings.IvySettings
 import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.repository.file.FileRepository
 import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}
+
 import org.apache.spark.{SPARK_REVISION, SPARK_VERSION, SparkException, SparkUserAppException}
 import org.apache.spark.{SPARK_BRANCH, SPARK_BUILD_DATE, SPARK_BUILD_USER, SPARK_REPO_URL}
 import org.apache.spark.api.r.RUtils

From ef8887d287830113886cbf61c313a59819b94862 Mon Sep 17 00:00:00 2001
From: "wm624@hotmail.com"
Date: Thu, 1 Sep 2016 23:24:40 -0700
Subject: [PATCH 7/7] simplify the solution

---
 .../scala/org/apache/spark/deploy/SparkSubmit.scala | 13 -------------
 .../scala/org/apache/spark/internal/Logging.scala   |  9 ++-------
 2 files changed, 2 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 611ae89aac18a..7b6d5a394bc35 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -111,7 +111,6 @@ object SparkSubmit {
     printStream.println("Type --help for more information.")
     exitFn(0)
   }
-  private[deploy] var rShell: Boolean = false
   // scalastyle:on println
 
   def main(args: Array[String]): Unit = {
@@ -228,7 +227,6 @@ object SparkSubmit {
     val childClasspath = new ArrayBuffer[String]()
     val sysProps = new HashMap[String, String]()
     var childMainClass = ""
-    initRShell(args.primaryResource)
 
     // Set the cluster manager
     val clusterManager: Int = args.master match {
@@ -802,17 +800,6 @@ object SparkSubmit {
     res == SparkLauncher.NO_RESOURCE
   }
 
-  /**
-   * Initialize whether it is a R shell when launching the application
-   * Default: false
-   */
-  private[deploy] def initRShell(res: String): Unit = {
-    require(res != null)
-    if (res == SPARKR_SHELL) rShell = true
-  }
-
-  private[spark] def isRShell: Boolean = rShell
-
   /**
    * Merge a sequence of comma-separated file lists, some of which may be null to indicate
    * no files, into a single comma-separated string.
diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala b/core/src/main/scala/org/apache/spark/internal/Logging.scala
index 770089a1e8618..4a029a1d9714e 100644
--- a/core/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -21,7 +21,6 @@ import org.apache.log4j.{Level, LogManager, PropertyConfigurator}
 import org.slf4j.{Logger, LoggerFactory}
 import org.slf4j.impl.StaticLoggerBinder
 
-import org.apache.spark.deploy.SparkSubmit
 import org.apache.spark.util.Utils
 
 /**
@@ -136,12 +135,8 @@ private[spark] trait Logging {
         val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
         if (replLevel != rootLogger.getEffectiveLevel()) {
           System.err.printf("Setting default log level to \"%s\".\n", replLevel)
-          if (SparkSubmit.isRShell) {
-            System.err.println("To adjust logging level use setLogLevel(newLevel).")
-          }
-          else {
-            System.err.println("To adjust logging level use sc.setLogLevel(newLevel).")
-          }
+          System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " +
+            "For SparkR, use setLogLevel(newLevel).")
           rootLogger.setLevel(replLevel)
         }
       }
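
For reference, a minimal standalone Scala sketch of the behaviour the final patch ([PATCH 7/7]) leaves in Logging.initializeLogging: when the REPL logger's level differs from the root logger's effective level, print the combined hint once and make the REPL level the default. This sketch is not part of the patch series; it assumes log4j 1.x on the classpath, and the object name LogLevelHintSketch and the hard-coded logger name "org.apache.spark.repl.Main" are illustrative stand-ins for the trait's logName.

import org.apache.log4j.{Level, LogManager}

object LogLevelHintSketch {
  def main(args: Array[String]): Unit = {
    // Mirror of the final Logging.initializeLogging logic from [PATCH 7/7]:
    // take the REPL logger's configured level (defaulting to WARN) and, if it
    // differs from the root logger's effective level, print the combined hint
    // and apply the REPL level as the new default.
    val rootLogger = LogManager.getRootLogger()
    val replLogger = LogManager.getLogger("org.apache.spark.repl.Main")
    val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)

    if (replLevel != rootLogger.getEffectiveLevel()) {
      System.err.printf("Setting default log level to \"%s\".\n", replLevel)
      System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " +
        "For SparkR, use setLogLevel(newLevel).")
      rootLogger.setLevel(replLevel)
    }
  }
}

From the user's side, spark-shell and pyspark adjust the level through the context with sc.setLogLevel(newLevel), while SparkR keeps the package-level setLogLevel(newLevel) wrapper shown in context.R above, which forwards to the same SparkContext setLogLevel method on the JVM via callJMethod.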