From cedfbe211070b09bc109823f3fb74ba20b70feb2 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Wed, 10 Jun 2015 19:26:02 +0800 Subject: [PATCH 01/17] [SPARK-6797][SPARKR] Add support for YARN cluster mode. --- R/install-dev.bat | 5 +++++ R/install-dev.sh | 8 ++++++-- R/pkg/inst/profile/general.R | 4 ++-- .../scala/org/apache/spark/deploy/RRunner.scala | 7 +++++-- .../org/apache/spark/deploy/SparkSubmit.scala | 17 +++++++++++++++++ make-distribution.sh | 1 + 6 files changed, 36 insertions(+), 6 deletions(-) diff --git a/R/install-dev.bat b/R/install-dev.bat index 008a5c668bc45..b7505e1a10328 100644 --- a/R/install-dev.bat +++ b/R/install-dev.bat @@ -25,3 +25,8 @@ set SPARK_HOME=%~dp0.. MKDIR %SPARK_HOME%\R\lib R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\ + +REM Zip the SparkR package so that it can be distributed to worker nodes on YARN +pushd %SPARK_HOME%\R\lib +jar cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR +popd diff --git a/R/install-dev.sh b/R/install-dev.sh index 1edd551f8d243..4972bb9217072 100755 --- a/R/install-dev.sh +++ b/R/install-dev.sh @@ -34,7 +34,7 @@ LIB_DIR="$FWDIR/lib" mkdir -p $LIB_DIR -pushd $FWDIR +pushd $FWDIR > /dev/null # Generate Rd files if devtools is installed Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); devtools::document(pkg="./pkg", roclets=c("rd")) }' @@ -42,4 +42,8 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo # Install SparkR to $LIB_DIR R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/ -popd +# Zip the SparkR package so that it can be distributed to worker nodes on YARN +cd $LIB_DIR +jar cfM "$LIB_DIR/sparkr.zip" SparkR + +popd > /dev/null diff --git a/R/pkg/inst/profile/general.R b/R/pkg/inst/profile/general.R index 8fe711b622086..2a8a8213d0849 100644 --- a/R/pkg/inst/profile/general.R +++ b/R/pkg/inst/profile/general.R @@ -16,7 +16,7 @@ # .First <- function() { - home <- Sys.getenv("SPARK_HOME") - .libPaths(c(file.path(home, "R", "lib"), .libPaths())) + packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR") + .libPaths(c(packageDir, .libPaths())) Sys.setenv(NOAWT=1) } diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala index 4165740312e03..ef3fefa37ee55 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala @@ -71,9 +71,12 @@ object RRunner { val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs) val env = builder.environment() env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString) - val sparkHome = System.getenv("SPARK_HOME") + // The SparkR package distributed as an archive resource should be pointed to + // by a symbol link "sparkr" in the current directory. 
+ val rPackageDir = new File("sparkr").getAbsolutePath + env.put("SPARKR_PACKAGE_DIR", rPackageDir) env.put("R_PROFILE_USER", - Seq(sparkHome, "R", "lib", "SparkR", "profile", "general.R").mkString(File.separator)) + Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator)) builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize val process = builder.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 4cec9017b8adb..4305003a0b6a7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -79,6 +79,7 @@ object SparkSubmit { private val SPARK_SHELL = "spark-shell" private val PYSPARK_SHELL = "pyspark-shell" private val SPARKR_SHELL = "sparkr-shell" + private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip" private val CLASS_NOT_FOUND_EXIT_STATUS = 101 @@ -347,6 +348,22 @@ object SparkSubmit { } } + // In yarn mode for an R app, add the SparkR package archive to archives + // that can be distributed with the job + if (args.isR && clusterManager == YARN) { + val sparkHome = sys.env.get("SPARK_HOME") + if (sparkHome.isEmpty) { + printErrorAndExit("SPARK_HOME does not exist for R application in yarn mode.") + } + val rPackagePath = Seq(sparkHome.get, "R", "lib").mkString(File.separator) + val rPackageFile = new File(rPackagePath, SPARKR_PACKAGE_ARCHIVE) + if (!rPackageFile.exists()) { + printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in yarn mode.") + } + val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath) + args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr") + } + // If we're running a R app, set the main class to our specific R runner if (args.isR && deployMode == CLIENT) { if (args.primaryResource == SPARKR_SHELL) { diff --git a/make-distribution.sh b/make-distribution.sh index 9f063da3a16c0..cac7032bb2e87 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -219,6 +219,7 @@ cp -r "$SPARK_HOME/ec2" "$DISTDIR" if [ -d "$SPARK_HOME"/R/lib/SparkR ]; then mkdir -p "$DISTDIR"/R/lib cp -r "$SPARK_HOME/R/lib/SparkR" "$DISTDIR"/R/lib + cp "$SPARK_HOME/R/lib/sparkr.zip" "$DISTDIR"/R/lib fi # Download and copy in tachyon, if requested From 681afb044573d3204f3122cb6eeeda7f3d938215 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Thu, 11 Jun 2015 12:42:04 +0800 Subject: [PATCH 02/17] Fix a bug that RRunner does not handle client deployment modes. --- .../main/scala/org/apache/spark/deploy/RRunner.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala index ef3fefa37ee55..233af3271d346 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala @@ -71,9 +71,15 @@ object RRunner { val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs) val env = builder.environment() env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString) - // The SparkR package distributed as an archive resource should be pointed to - // by a symbol link "sparkr" in the current directory. 
- val rPackageDir = new File("sparkr").getAbsolutePath + val rPackageDir = + if (System.getProperty("spark.master") == "yarn-cluster") { + // The SparkR package distributed as an archive resource should be pointed to + // by a symbol link "sparkr" in the current directory. + new File("sparkr").getAbsolutePath + } else { + val sparkHome = System.getenv("SPARK_HOME") + Seq(sparkHome, "R", "lib").mkString(File.separator) + } env.put("SPARKR_PACKAGE_DIR", rPackageDir) env.put("R_PROFILE_USER", Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator)) From 3bed4383f7f6731ca940f1bce4e2508baaaa6dee Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Sat, 13 Jun 2015 00:48:49 +0800 Subject: [PATCH 03/17] Add a comment. --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 4305003a0b6a7..439705ea84fb8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -361,6 +361,8 @@ object SparkSubmit { printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in yarn mode.") } val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath) + + // Assigns a symbol link name "sparkr" to the shipped package. args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr") } From 7b916c51c7bdf458977d395d50618392d6f516fb Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Sun, 14 Jun 2015 22:58:37 +0800 Subject: [PATCH 04/17] Use 'rem' consistently. --- R/install-dev.bat | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/install-dev.bat b/R/install-dev.bat index b7505e1a10328..c61d3ceaff5c7 100644 --- a/R/install-dev.bat +++ b/R/install-dev.bat @@ -26,7 +26,7 @@ MKDIR %SPARK_HOME%\R\lib R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\ -REM Zip the SparkR package so that it can be distributed to worker nodes on YARN +rem Zip the SparkR package so that it can be distributed to worker nodes on YARN pushd %SPARK_HOME%\R\lib jar cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR popd From 49ff9488528d46a9da60451c063d698bd8f31007 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Sun, 14 Jun 2015 23:04:23 +0800 Subject: [PATCH 05/17] Invoke jar.exe with full path in install-dev.bat. --- R/install-dev.bat | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/install-dev.bat b/R/install-dev.bat index c61d3ceaff5c7..f32670b67de96 100644 --- a/R/install-dev.bat +++ b/R/install-dev.bat @@ -28,5 +28,5 @@ R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\ rem Zip the SparkR package so that it can be distributed to worker nodes on YARN pushd %SPARK_HOME%\R\lib -jar cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR +%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR popd From 41d4f1712cb88298daebcb03c030a7f2a4895a52 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Mon, 29 Jun 2015 21:58:07 +0800 Subject: [PATCH 06/17] Add support for locating SparkR package for R workers required by RDD APIs. 
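
Instead of passing the SparkR library location from the R side (the removed
sparkRLibDir argument and zzz.R hook) down through the RDD constructors, the
JVM now resolves it itself through a new RUtils helper, on both the driver
(RRunner) and the workers (RRDD.createRProcess). In outline, a simplified
sketch (not part of the diff below; it drops the yarn-client driver special
case and assumes SPARK_HOME is set in the non-YARN branch):

    // Sketch: where the SparkR package is now looked up.
    import java.io.File
    val onYarn = sys.env.get("SPARK_YARN_MODE") == Some("true")
    val rLibDir =
      if (onYarn) new File("sparkr").getAbsolutePath                       // archive shipped by YARN
      else Seq(sys.env("SPARK_HOME"), "R", "lib").mkString(File.separator) // local install
    val profile = Seq(rLibDir, "SparkR", "profile", "general.R").mkString(File.separator)

See RUtils.scala in this patch for the actual code.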
--- R/pkg/DESCRIPTION | 1 - R/pkg/R/RDD.R | 2 - R/pkg/R/pairRDD.R | 1 - R/pkg/R/sparkR.R | 10 ----- R/pkg/R/zzz.R | 20 --------- .../scala/org/apache/spark/api/r/RRDD.scala | 21 ++++------ .../scala/org/apache/spark/api/r/RUtils.scala | 42 +++++++++++++++++++ .../org/apache/spark/deploy/RRunner.scala | 12 +----- make-distribution.sh | 1 - 9 files changed, 53 insertions(+), 57 deletions(-) delete mode 100644 R/pkg/R/zzz.R create mode 100644 core/src/main/scala/org/apache/spark/api/r/RUtils.scala diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index efc85bbc4b316..d028821534b1a 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -32,4 +32,3 @@ Collate: 'serialize.R' 'sparkR.R' 'utils.R' - 'zzz.R' diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R index 89511141d3ef7..d2d096709245d 100644 --- a/R/pkg/R/RDD.R +++ b/R/pkg/R/RDD.R @@ -165,7 +165,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"), serializedFuncArr, rdd@env$prev_serializedMode, packageNamesArr, - as.character(.sparkREnv[["libname"]]), broadcastArr, callJMethod(prev_jrdd, "classTag")) } else { @@ -175,7 +174,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"), rdd@env$prev_serializedMode, serializedMode, packageNamesArr, - as.character(.sparkREnv[["libname"]]), broadcastArr, callJMethod(prev_jrdd, "classTag")) } diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R index 0f1179e0aa51a..ebc6ff65e9d0f 100644 --- a/R/pkg/R/pairRDD.R +++ b/R/pkg/R/pairRDD.R @@ -215,7 +215,6 @@ setMethod("partitionBy", serializedHashFuncBytes, getSerializedMode(x), packageNamesArr, - as.character(.sparkREnv$libname), broadcastArr, callJMethod(jrdd, "classTag")) diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 048eb8ed541e4..172335809dec2 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -17,10 +17,6 @@ .sparkREnv <- new.env() -sparkR.onLoad <- function(libname, pkgname) { - .sparkREnv$libname <- libname -} - # Utility function that returns TRUE if we have an active connection to the # backend and FALSE otherwise connExists <- function(env) { @@ -80,7 +76,6 @@ sparkR.stop <- function() { #' @param sparkEnvir Named list of environment variables to set on worker nodes. #' @param sparkExecutorEnv Named list of environment variables to be used when launching executors. #' @param sparkJars Character string vector of jar files to pass to the worker nodes. -#' @param sparkRLibDir The path where R is installed on the worker nodes. #' @param sparkPackages Character string vector of packages from spark-packages.org #' @export #' @examples @@ -101,7 +96,6 @@ sparkR.init <- function( sparkEnvir = list(), sparkExecutorEnv = list(), sparkJars = "", - sparkRLibDir = "", sparkPackages = "") { if (exists(".sparkRjsc", envir = .sparkREnv)) { @@ -170,10 +164,6 @@ sparkR.init <- function( sparkHome <- normalizePath(sparkHome) } - if (nchar(sparkRLibDir) != 0) { - .sparkREnv$libname <- sparkRLibDir - } - sparkEnvirMap <- new.env() for (varname in names(sparkEnvir)) { sparkEnvirMap[[varname]] <- sparkEnvir[[varname]] diff --git a/R/pkg/R/zzz.R b/R/pkg/R/zzz.R deleted file mode 100644 index 301feade65fa3..0000000000000 --- a/R/pkg/R/zzz.R +++ /dev/null @@ -1,20 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -.onLoad <- function(libname, pkgname) { - sparkR.onLoad(libname, pkgname) -} diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala index ff1702f7dea48..cd593159114f5 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala @@ -39,7 +39,6 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag]( deserializer: String, serializer: String, packageNames: Array[Byte], - rLibDir: String, broadcastVars: Array[Broadcast[Object]]) extends RDD[U](parent) with Logging { protected var dataStream: DataInputStream = _ @@ -60,7 +59,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag]( // The stdout/stderr is shared by multiple tasks, because we use one daemon // to launch child process as worker. - val errThread = RRDD.createRWorker(rLibDir, listenPort) + val errThread = RRDD.createRWorker(listenPort) // We use two sockets to separate input and output, then it's easy to manage // the lifecycle of them to avoid deadlock. @@ -235,11 +234,10 @@ private class PairwiseRRDD[T: ClassTag]( hashFunc: Array[Byte], deserializer: String, packageNames: Array[Byte], - rLibDir: String, broadcastVars: Array[Object]) extends BaseRRDD[T, (Int, Array[Byte])]( parent, numPartitions, hashFunc, deserializer, - SerializationFormats.BYTE, packageNames, rLibDir, + SerializationFormats.BYTE, packageNames, broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) { override protected def readData(length: Int): (Int, Array[Byte]) = { @@ -266,10 +264,9 @@ private class RRDD[T: ClassTag]( deserializer: String, serializer: String, packageNames: Array[Byte], - rLibDir: String, broadcastVars: Array[Object]) extends BaseRRDD[T, Array[Byte]]( - parent, -1, func, deserializer, serializer, packageNames, rLibDir, + parent, -1, func, deserializer, serializer, packageNames, broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) { override protected def readData(length: Int): Array[Byte] = { @@ -293,10 +290,9 @@ private class StringRRDD[T: ClassTag]( func: Array[Byte], deserializer: String, packageNames: Array[Byte], - rLibDir: String, broadcastVars: Array[Object]) extends BaseRRDD[T, String]( - parent, -1, func, deserializer, SerializationFormats.STRING, packageNames, rLibDir, + parent, -1, func, deserializer, SerializationFormats.STRING, packageNames, broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) { override protected def readData(length: Int): String = { @@ -392,9 +388,10 @@ private[r] object RRDD { thread } - private def createRProcess(rLibDir: String, port: Int, script: String): BufferedStreamThread = { + private def createRProcess(port: Int, script: String): BufferedStreamThread = { val rCommand = SparkEnv.get.conf.get("spark.sparkr.r.command", "Rscript") val rOptions = "--vanilla" + val rLibDir = RUtils.sparkRPackagePath(false) val rExecScript = rLibDir + "/SparkR/worker/" + script val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript)) // Unset the R_TESTS environment variable for workers. 
@@ -413,7 +410,7 @@ private[r] object RRDD { /** * ProcessBuilder used to launch worker R processes. */ - def createRWorker(rLibDir: String, port: Int): BufferedStreamThread = { + def createRWorker(port: Int): BufferedStreamThread = { val useDaemon = SparkEnv.get.conf.getBoolean("spark.sparkr.use.daemon", true) if (!Utils.isWindows && useDaemon) { synchronized { @@ -421,7 +418,7 @@ private[r] object RRDD { // we expect one connections val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost")) val daemonPort = serverSocket.getLocalPort - errThread = createRProcess(rLibDir, daemonPort, "daemon.R") + errThread = createRProcess(daemonPort, "daemon.R") // the socket used to send out the input of task serverSocket.setSoTimeout(10000) val sock = serverSocket.accept() @@ -443,7 +440,7 @@ private[r] object RRDD { errThread } } else { - createRProcess(rLibDir, port, "worker.R") + createRProcess(port, "worker.R") } } diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala new file mode 100644 index 0000000000000..e8cfd26cf8dc3 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.r + +import java.io.{File} + +import org.apache.spark.SparkException + +private[spark] object RUtils { + /** Get the SparkR package path in various deployment modes **/ + def sparkRPackagePath(driver: Boolean): String = { + val yarnMode = sys.env.get("SPARK_YARN_MODE") + if (!yarnMode.isEmpty && yarnMode.get == "true" && + !(driver && System.getProperty("spark.master") == "yarn-client")) { + // The SparkR package distributed as an archive resource should be pointed to + // by a symbol link "sparkr" in the current directory. + new File("sparkr").getAbsolutePath + } else { + // TBD: add support for MESOS + val sparkHome = sys.env.get("SPARK_HOME") + if (sparkHome.isEmpty) { + throw new SparkException("SPARK_HOME not set. 
Can't locate SparkR package.") + } + Seq(sparkHome, "R", "lib").mkString(File.separator) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala index 233af3271d346..a560e74c3315f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConversions._ import org.apache.hadoop.fs.Path -import org.apache.spark.api.r.RBackend +import org.apache.spark.api.r.{RBackend, RUtils} import org.apache.spark.util.RedirectThread /** @@ -71,15 +71,7 @@ object RRunner { val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs) val env = builder.environment() env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString) - val rPackageDir = - if (System.getProperty("spark.master") == "yarn-cluster") { - // The SparkR package distributed as an archive resource should be pointed to - // by a symbol link "sparkr" in the current directory. - new File("sparkr").getAbsolutePath - } else { - val sparkHome = System.getenv("SPARK_HOME") - Seq(sparkHome, "R", "lib").mkString(File.separator) - } + val rPackageDir = RUtils.sparkRPackagePath(true) env.put("SPARKR_PACKAGE_DIR", rPackageDir) env.put("R_PROFILE_USER", Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator)) diff --git a/make-distribution.sh b/make-distribution.sh index cac7032bb2e87..dbc91b7b36358 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -218,7 +218,6 @@ cp -r "$SPARK_HOME/ec2" "$DISTDIR" # Copy SparkR if it exists if [ -d "$SPARK_HOME"/R/lib/SparkR ]; then mkdir -p "$DISTDIR"/R/lib - cp -r "$SPARK_HOME/R/lib/SparkR" "$DISTDIR"/R/lib cp "$SPARK_HOME/R/lib/sparkr.zip" "$DISTDIR"/R/lib fi From 0aa1e97815372c7b1d48d499fa14cc305afd0922 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Mon, 29 Jun 2015 22:32:36 +0800 Subject: [PATCH 07/17] Fix scala style. --- core/src/main/scala/org/apache/spark/api/r/RUtils.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala index e8cfd26cf8dc3..fd190f78e264f 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala @@ -22,14 +22,16 @@ import java.io.{File} import org.apache.spark.SparkException private[spark] object RUtils { - /** Get the SparkR package path in various deployment modes **/ + /** + * Get the SparkR package path in various deployment modes. + */ def sparkRPackagePath(driver: Boolean): String = { val yarnMode = sys.env.get("SPARK_YARN_MODE") if (!yarnMode.isEmpty && yarnMode.get == "true" && !(driver && System.getProperty("spark.master") == "yarn-client")) { // The SparkR package distributed as an archive resource should be pointed to // by a symbol link "sparkr" in the current directory. - new File("sparkr").getAbsolutePath + new File("sparkr").getAbsolutePath } else { // TBD: add support for MESOS val sparkHome = sys.env.get("SPARK_HOME") From 1acefd182be04626c3badb3d3badfd6f61b310ae Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Mon, 29 Jun 2015 23:36:24 +0800 Subject: [PATCH 08/17] Fix scala style. 
--- core/src/main/scala/org/apache/spark/api/r/RUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala index fd190f78e264f..8b7d9134d8039 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala @@ -23,7 +23,7 @@ import org.apache.spark.SparkException private[spark] object RUtils { /** - * Get the SparkR package path in various deployment modes. + * Get the SparkR package path in various deployment modes. */ def sparkRPackagePath(driver: Boolean): String = { val yarnMode = sys.env.get("SPARK_YARN_MODE") From 2ca5048dfe9b4132b95cefb64b91b97041d3046b Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Tue, 30 Jun 2015 16:54:22 +0800 Subject: [PATCH 09/17] Fix comments. --- .../scala/org/apache/spark/api/r/RRDD.scala | 2 +- .../scala/org/apache/spark/api/r/RUtils.scala | 23 ++++++++++++++----- .../org/apache/spark/deploy/RRunner.scala | 2 +- .../org/apache/spark/deploy/SparkSubmit.scala | 8 +++---- 4 files changed, 23 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala index cd593159114f5..664f3c6941fe4 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala @@ -391,7 +391,7 @@ private[r] object RRDD { private def createRProcess(port: Int, script: String): BufferedStreamThread = { val rCommand = SparkEnv.get.conf.get("spark.sparkr.r.command", "Rscript") val rOptions = "--vanilla" - val rLibDir = RUtils.sparkRPackagePath(false) + val rLibDir = RUtils.sparkRPackagePath(driver = false) val rExecScript = rLibDir + "/SparkR/worker/" + script val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript)) // Unset the R_TESTS environment variable for workers. diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala index 8b7d9134d8039..f09535bfdc52e 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala @@ -17,11 +17,21 @@ package org.apache.spark.api.r -import java.io.{File} +import java.io.File import org.apache.spark.SparkException private[spark] object RUtils { + /** + * Get the SparkR package path in the local spark distribution. + */ + def localSparkRPackagePath: Option[String] = { + val sparkHome = sys.env.get("SPARK_HOME") + sparkHome.map( + Seq(_, "R", "lib").mkString(File.separator) + ) + } + /** * Get the SparkR package path in various deployment modes. */ @@ -29,16 +39,17 @@ private[spark] object RUtils { val yarnMode = sys.env.get("SPARK_YARN_MODE") if (!yarnMode.isEmpty && yarnMode.get == "true" && !(driver && System.getProperty("spark.master") == "yarn-client")) { - // The SparkR package distributed as an archive resource should be pointed to + // For workers in YARN modes and driver in yarn cluster mode, + // the SparkR package distributed as an archive resource should be pointed to // by a symbol link "sparkr" in the current directory. new File("sparkr").getAbsolutePath } else { // TBD: add support for MESOS - val sparkHome = sys.env.get("SPARK_HOME") - if (sparkHome.isEmpty) { + val rPackagePath = localSparkRPackagePath + if (rPackagePath.isEmpty) { throw new SparkException("SPARK_HOME not set. 
Can't locate SparkR package.") - } - Seq(sparkHome, "R", "lib").mkString(File.separator) + } + rPackagePath.get } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala index a560e74c3315f..adc6ce6654ad0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala @@ -71,7 +71,7 @@ object RRunner { val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs) val env = builder.environment() env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString) - val rPackageDir = RUtils.sparkRPackagePath(true) + val rPackageDir = RUtils.sparkRPackagePath(driver = true) env.put("SPARKR_PACKAGE_DIR", rPackageDir) env.put("R_PROFILE_USER", Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator)) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 439705ea84fb8..cf853b2b11dbc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -37,6 +37,7 @@ import org.apache.ivy.core.settings.IvySettings import org.apache.ivy.plugins.matcher.GlobPatternMatcher import org.apache.ivy.plugins.repository.file.FileRepository import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver} +import org.apache.spark.api.r.RUtils import org.apache.spark.SPARK_VERSION import org.apache.spark.deploy.rest._ import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils} @@ -351,12 +352,11 @@ object SparkSubmit { // In yarn mode for an R app, add the SparkR package archive to archives // that can be distributed with the job if (args.isR && clusterManager == YARN) { - val sparkHome = sys.env.get("SPARK_HOME") - if (sparkHome.isEmpty) { + val rPackagePath = RUtils.localSparkRPackagePath + if (rPackagePath.isEmpty) { printErrorAndExit("SPARK_HOME does not exist for R application in yarn mode.") } - val rPackagePath = Seq(sparkHome.get, "R", "lib").mkString(File.separator) - val rPackageFile = new File(rPackagePath, SPARKR_PACKAGE_ARCHIVE) + val rPackageFile = new File(rPackagePath.get, SPARKR_PACKAGE_ARCHIVE) if (!rPackageFile.exists()) { printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in yarn mode.") } From b05340c6c06663b07065ef2a9a3afc9562b3f2fd Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Tue, 30 Jun 2015 17:31:13 +0800 Subject: [PATCH 10/17] Fix scala style. --- core/src/main/scala/org/apache/spark/api/r/RUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala index f09535bfdc52e..f9e00552b9ecc 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala @@ -48,8 +48,8 @@ private[spark] object RUtils { val rPackagePath = localSparkRPackagePath if (rPackagePath.isEmpty) { throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.") - } - rPackagePath.get + } + rPackagePath.get } } } From c38a005db066ad46f8897b379c3b48ee0bc2af66 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Wed, 1 Jul 2015 07:34:22 +0800 Subject: [PATCH 11/17] Unzipped SparkR binary package is still required for standalone and Mesos modes. 
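
In standalone and Mesos modes nothing ships sparkr.zip to the nodes, so
RUtils falls back to the local installation and the distribution still has to
carry the expanded package next to the archive. Roughly, a sketch assuming
SPARK_HOME is set on every node (worker scripts are loaded from this
directory, see RRDD.createRProcess):

    // Sketch: the non-YARN fallback resolves to the local install, so the
    // unzipped "$SPARK_HOME/R/lib/SparkR" directory must exist on every node.
    import java.io.File
    val rLibDir = Seq(sys.env("SPARK_HOME"), "R", "lib").mkString(File.separator)
    val workerScript = Seq(rLibDir, "SparkR", "worker", "worker.R").mkString(File.separator)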
--- make-distribution.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/make-distribution.sh b/make-distribution.sh index dbc91b7b36358..cac7032bb2e87 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -218,6 +218,7 @@ cp -r "$SPARK_HOME/ec2" "$DISTDIR" # Copy SparkR if it exists if [ -d "$SPARK_HOME"/R/lib/SparkR ]; then mkdir -p "$DISTDIR"/R/lib + cp -r "$SPARK_HOME/R/lib/SparkR" "$DISTDIR"/R/lib cp "$SPARK_HOME/R/lib/sparkr.zip" "$DISTDIR"/R/lib fi From 35ecfa3a5ad6594d5763083b221eeaab30366b61 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Tue, 7 Jul 2015 18:20:39 +0800 Subject: [PATCH 12/17] Fix comments. --- .../scala/org/apache/spark/api/r/RRDD.scala | 2 +- .../scala/org/apache/spark/api/r/RUtils.scala | 34 ++++++++++++------- .../org/apache/spark/deploy/RRunner.scala | 2 +- .../org/apache/spark/deploy/SparkSubmit.scala | 13 +++++-- project/MimaExcludes.scala | 3 ++ 5 files changed, 37 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala index 664f3c6941fe4..23a470d6afcae 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala @@ -391,7 +391,7 @@ private[r] object RRDD { private def createRProcess(port: Int, script: String): BufferedStreamThread = { val rCommand = SparkEnv.get.conf.get("spark.sparkr.r.command", "Rscript") val rOptions = "--vanilla" - val rLibDir = RUtils.sparkRPackagePath(driver = false) + val rLibDir = RUtils.sparkRPackagePath(isDriver = false) val rExecScript = rLibDir + "/SparkR/worker/" + script val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript)) // Unset the R_TESTS environment variable for workers. diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala index f9e00552b9ecc..d53abd3408c55 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala @@ -19,7 +19,7 @@ package org.apache.spark.api.r import java.io.File -import org.apache.spark.SparkException +import org.apache.spark.{SparkEnv, SparkException} private[spark] object RUtils { /** @@ -34,22 +34,32 @@ private[spark] object RUtils { /** * Get the SparkR package path in various deployment modes. + * This assumes that Spark properties `spark.master` and `spark.submit.deployMode` + * and environment variable `SPARK_HOME` are set. */ - def sparkRPackagePath(driver: Boolean): String = { - val yarnMode = sys.env.get("SPARK_YARN_MODE") - if (!yarnMode.isEmpty && yarnMode.get == "true" && - !(driver && System.getProperty("spark.master") == "yarn-client")) { - // For workers in YARN modes and driver in yarn cluster mode, - // the SparkR package distributed as an archive resource should be pointed to - // by a symbol link "sparkr" in the current directory. + def sparkRPackagePath(isDriver: Boolean): String = { + val (master, deployMode) = + if (isDriver) { + (sys.props("spark.master"), sys.props("spark.submit.deployMode")) + } else { + val sparkConf = SparkEnv.get.conf + (sparkConf.get("spark.master"), sparkConf.get("spark.submit.deployMode")) + } + + val isYarnCluster = master.contains("yarn") && deployMode == "cluster" + val isYarnClient = master.contains("yarn") && deployMode == "client" + + // In YARN mode, the SparkR package is distributed as an archive symbolically + // linked to the "sparkr" file in the current directory. 
Note that this does not apply + // to the driver in client mode because it is run outside of the cluster. + if (isYarnCluster || (isYarnClient && !isDriver)) { new File("sparkr").getAbsolutePath } else { - // TBD: add support for MESOS - val rPackagePath = localSparkRPackagePath - if (rPackagePath.isEmpty) { + // Otherwise, assume the package is local + // TODO: support this for Mesos + localSparkRPackagePath.getOrElse { throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.") } - rPackagePath.get } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala index adc6ce6654ad0..c0cab22fa8252 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala @@ -71,7 +71,7 @@ object RRunner { val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs) val env = builder.environment() env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString) - val rPackageDir = RUtils.sparkRPackagePath(driver = true) + val rPackageDir = RUtils.sparkRPackagePath(isDriver = true) env.put("SPARKR_PACKAGE_DIR", rPackageDir) env.put("R_PROFILE_USER", Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator)) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index cf853b2b11dbc..0885888b40be6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -264,6 +264,11 @@ object SparkSubmit { } } + // Update args.deployMode if it is null. It will be passed down as a Spark property later. + (args.deployMode, deployMode) match { + case (null, CLIENT) => args.deployMode = "client" + case (null, CLUSTER) => args.deployMode = "cluster" + } val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER @@ -349,16 +354,16 @@ object SparkSubmit { } } - // In yarn mode for an R app, add the SparkR package archive to archives + // In YARN mode for an R app, add the SparkR package archive to archives // that can be distributed with the job if (args.isR && clusterManager == YARN) { val rPackagePath = RUtils.localSparkRPackagePath if (rPackagePath.isEmpty) { - printErrorAndExit("SPARK_HOME does not exist for R application in yarn mode.") + printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.") } val rPackageFile = new File(rPackagePath.get, SPARKR_PACKAGE_ARCHIVE) if (!rPackageFile.exists()) { - printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in yarn mode.") + printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.") } val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath) @@ -394,6 +399,8 @@ object SparkSubmit { // All cluster managers OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"), + OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, + sysProp = "spark.submit.deployMode"), OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"), OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"), OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars.ivy"), diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 79089aae2a37c..8da18cadbff54 
100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -83,6 +83,9 @@ object MimaExcludes { "org.apache.spark.streaming.scheduler.InputInfo$"), ProblemFilters.exclude[MissingClassProblem]( "org.apache.spark.streaming.scheduler.InputInfo") + // SPARK-6797 Support YARN modes for SparkR + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.r.BaseRRDD.this") ) case v if v.startsWith("1.4") => From fe25a3351bf049593ae5ea7ddcee73e63cfe39b4 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Wed, 8 Jul 2015 12:19:46 +0800 Subject: [PATCH 13/17] Fix Mima test error. --- project/MimaExcludes.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 8da18cadbff54..ee3a031ca435f 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -85,7 +85,13 @@ object MimaExcludes { "org.apache.spark.streaming.scheduler.InputInfo") // SPARK-6797 Support YARN modes for SparkR ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.api.r.BaseRRDD.this") + "org.apache.spark.api.r.PairwiseRRDD.this"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.r.RRDD.createRWorker"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.r.RRDD.this"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.r.StringRRDD.this") ) case v if v.startsWith("1.4") => From 193882f08c6223663802ff35772ed4d6f075557e Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Wed, 8 Jul 2015 12:54:22 +0800 Subject: [PATCH 14/17] Fix Mima test error. --- project/MimaExcludes.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index ee3a031ca435f..91090afab8540 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -91,7 +91,9 @@ object MimaExcludes { ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.api.r.RRDD.this"), ProblemFilters.exclude[MissingMethodProblem]( - "org.apache.spark.api.r.StringRRDD.this") + "org.apache.spark.api.r.StringRRDD.this"), + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.api.r.BaseRRDD.this") ) case v if v.startsWith("1.4") => From 72695fbf6600835f7ff3ed86cb5346e42f758d83 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Wed, 8 Jul 2015 15:27:09 +0800 Subject: [PATCH 15/17] Fix unit test failures. --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 0885888b40be6..7089a7e26707f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -268,6 +268,7 @@ object SparkSubmit { (args.deployMode, deployMode) match { case (null, CLIENT) => args.deployMode = "client" case (null, CLUSTER) => args.deployMode = "cluster" + case _ => } val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER From 73133746395212a54985e64574fc5b434d6435e3 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Wed, 8 Jul 2015 17:41:55 +0800 Subject: [PATCH 16/17] Fix unit test errors. 
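
SparkSubmit now always materializes the deploy mode and forwards it as the
"spark.submit.deployMode" property, so the driver-submission test sees one
more system property than before. A simplified sketch of the defaulting added
earlier in this series (not part of this diff; the real code matches on
args.deployMode being null):

    // Sketch: deployMode is defaulted when unset and then mapped to a sysProp,
    // which is why the expected count below goes from 8 to 9.
    def defaultDeployMode(requested: Option[String], resolvedToClient: Boolean): String =
      requested.getOrElse(if (resolvedToClient) "client" else "cluster")
    // e.g. defaultDeployMode(None, resolvedToClient = true) == "client"
    //      -> sysProps("spark.submit.deployMode") = "client"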
---
 .../test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1b64c329b5d4b..e7878bde6fcb0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -246,7 +246,7 @@ class SparkSubmitSuite
 mainClass should be ("org.apache.spark.deploy.Client")
 }
 classpath should have size 0
- sysProps should have size 8
+ sysProps should have size 9
 sysProps.keys should contain ("SPARK_SUBMIT")
 sysProps.keys should contain ("spark.master")
 sysProps.keys should contain ("spark.app.name")
@@ -255,6 +255,7 @@ class SparkSubmitSuite
 sysProps.keys should contain ("spark.driver.cores")
 sysProps.keys should contain ("spark.driver.supervise")
 sysProps.keys should contain ("spark.shuffle.spill")
+ sysProps.keys should contain ("spark.submit.deployMode")
 sysProps("spark.shuffle.spill") should be ("false")
 }

From ca63c8644dd430c6373937bf6a79d4dc9d756140 Mon Sep 17 00:00:00 2001
From: Sun Rui
Date: Mon, 13 Jul 2015 15:18:00 +0800
Subject: [PATCH 17/17] Adjust MimaExcludes after rebase.

---
 project/MimaExcludes.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 91090afab8540..4e4e810ec36e3 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -83,6 +83,7 @@ object MimaExcludes {
 "org.apache.spark.streaming.scheduler.InputInfo$"),
 ProblemFilters.exclude[MissingClassProblem](
 "org.apache.spark.streaming.scheduler.InputInfo")
+ ) ++ Seq(
 // SPARK-6797 Support YARN modes for SparkR
 ProblemFilters.exclude[MissingMethodProblem](
 "org.apache.spark.api.r.PairwiseRRDD.this"),