From 62a6e20179dd63703d18de9784c8b3770077e968 Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Fri, 24 Feb 2017 16:29:43 -0500 Subject: [PATCH 01/16] WIP --- .../security/AMCredentialRenewer.scala | 9 +-- .../ConfigurableCredentialManager.scala | 9 +-- .../deploy}/security/CredentialUpdater.scala | 14 ++-- .../security/HBaseCredentialProvider.scala | 9 +-- .../security/HadoopFSCredentialProvider.scala | 12 ++- .../security/HiveCredentialProvider.scala | 11 ++- .../security/ServiceCredentialProvider.scala | 20 +---- .../spark/internal/config/package.scala | 45 +++++++++++ .../cluster/StandaloneSchedulerBackend.scala | 57 +++++++++++++- dev/.rat-excludes | 2 +- docs/running-on-yarn.md | 3 +- ....deploy.security.ServiceCredentialProvider | 3 + ...oy.yarn.security.ServiceCredentialProvider | 3 - .../spark/deploy/yarn/ApplicationMaster.scala | 6 +- .../org/apache/spark/deploy/yarn/Client.scala | 6 +- .../deploy/yarn/YarnSparkHadoopUtil.scala | 4 +- .../org/apache/spark/deploy/yarn/config.scala | 76 +++++++++---------- ...deploy.security.ServiceCredentialProvider} | 0 .../ConfigurableCredentialManagerSuite.scala | 2 +- .../HadoopFSCredentialProviderSuite.scala | 2 +- 20 files changed, 178 insertions(+), 115 deletions(-) rename {resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn => core/src/main/scala/org/apache/spark/deploy}/security/AMCredentialRenewer.scala (97%) rename {resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn => core/src/main/scala/org/apache/spark/deploy}/security/ConfigurableCredentialManager.scala (97%) rename {resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn => core/src/main/scala/org/apache/spark/deploy}/security/CredentialUpdater.scala (95%) rename {resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn => core/src/main/scala/org/apache/spark/deploy}/security/HBaseCredentialProvider.scala (98%) rename {resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn => core/src/main/scala/org/apache/spark/deploy}/security/HadoopFSCredentialProvider.scala (98%) rename {resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn => core/src/main/scala/org/apache/spark/deploy}/security/HiveCredentialProvider.scala (99%) rename {resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn => core/src/main/scala/org/apache/spark/deploy}/security/ServiceCredentialProvider.scala (60%) create mode 100644 resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider delete mode 100644 resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider rename resource-managers/yarn/src/test/resources/META-INF/services/{org.apache.spark.deploy.yarn.security.ServiceCredentialProvider => org.apache.spark.deploy.security.ServiceCredentialProvider} (100%) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/core/src/main/scala/org/apache/spark/deploy/security/AMCredentialRenewer.scala similarity index 97% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala rename to core/src/main/scala/org/apache/spark/deploy/security/AMCredentialRenewer.scala index 7e76f402db249..c422f3550f3c7 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/AMCredentialRenewer.scala @@ 
-14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import java.security.PrivilegedExceptionAction import java.util.concurrent.{Executors, TimeUnit} @@ -22,11 +22,8 @@ import java.util.concurrent.{Executors, TimeUnit} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.security.UserGroupInformation - import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil -import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.util.ThreadUtils @@ -51,7 +48,7 @@ import org.apache.spark.util.ThreadUtils * appeared, it will read the credentials and update the currently running UGI with it. This * process happens again once 80% of the validity of this has expired. */ -private[yarn] class AMCredentialRenewer( +private[spark] class AMCredentialRenewer( sparkConf: SparkConf, hadoopConf: Configuration, credentialManager: ConfigurableCredentialManager) extends Logging { @@ -62,7 +59,7 @@ private[yarn] class AMCredentialRenewer( Executors.newSingleThreadScheduledExecutor( ThreadUtils.namedThreadFactory("Credential Refresh Thread")) - private val hadoopUtil = YarnSparkHadoopUtil.get + private val hadoopUtil = SparkHadoopUtil.get private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH) private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/ConfigurableCredentialManager.scala similarity index 97% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala rename to core/src/main/scala/org/apache/spark/deploy/security/ConfigurableCredentialManager.scala index 4f4be52a0d691..904584a9a60ab 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/ConfigurableCredentialManager.scala @@ -15,19 +15,18 @@ * limitations under the License. */ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import java.util.ServiceLoader -import scala.collection.JavaConverters._ - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.Credentials - import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.util.Utils +import scala.collection.JavaConverters._ + /** * A ConfigurableCredentialManager to manage all the registered credential providers and offer * APIs for other modules to obtain credentials as well as renewal time. By default @@ -41,7 +40,7 @@ import org.apache.spark.util.Utils * For example, Hive's credential provider [[HiveCredentialProvider]] can be enabled/disabled by * the configuration spark.yarn.security.credentials.hive.enabled. 
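As an aside on the toggle mentioned just above: the switch is purely configuration driven, using the `spark.yarn.security.credentials.{service}.enabled` pattern (with `spark.yarn.security.tokens.{service}.enabled` as the deprecated spelling handled a few lines below). A minimal, illustrative driver-side configuration that keeps the Hadoop filesystem provider but opts out of Hive and HBase could look like this sketch; the object name is invented, the keys come from this file:

```scala
import org.apache.spark.SparkConf

object CredentialProviderToggles {
  // Illustrative only: switch off the Hive and HBase credential providers while
  // leaving the Hadoop filesystem provider (and any third-party ones) enabled.
  val conf: SparkConf = new SparkConf()
    .set("spark.yarn.security.credentials.hive.enabled", "false")
    .set("spark.yarn.security.credentials.hbase.enabled", "false")
}
```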
*/ -private[yarn] final class ConfigurableCredentialManager( +private[spark] final class ConfigurableCredentialManager( sparkConf: SparkConf, hadoopConf: Configuration) extends Logging { private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled" private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled" diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala b/core/src/main/scala/org/apache/spark/deploy/security/CredentialUpdater.scala similarity index 95% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala rename to core/src/main/scala/org/apache/spark/deploy/security/CredentialUpdater.scala index 5df4fbd9c1537..8c3d15e240159 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/CredentialUpdater.scala @@ -15,22 +15,20 @@ * limitations under the License. */ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import java.util.concurrent.{Executors, TimeUnit} -import scala.util.control.NonFatal - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.security.{Credentials, UserGroupInformation} - import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.util.{ThreadUtils, Utils} +import scala.util.control.NonFatal + private[spark] class CredentialUpdater( sparkConf: SparkConf, hadoopConf: Configuration, @@ -38,7 +36,8 @@ private[spark] class CredentialUpdater( @volatile private var lastCredentialsFileSuffix = 0 - private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH) + // TODO move to ConfigBuilder + private val credentialsFile = sparkConf.get("spark.yarn.credentials.file") private val freshHadoopConf = SparkHadoopUtil.get.getConfBypassingFSCache( hadoopConf, new Path(credentialsFile).toUri.getScheme) @@ -55,7 +54,8 @@ private[spark] class CredentialUpdater( /** Start the credential updater task */ def start(): Unit = { - val startTime = sparkConf.get(CREDENTIALS_RENEWAL_TIME) + // TODO move to ConfigBuilder + val startTime = sparkConf.getTimeAsMs("spark.yarn.credentials.renewalTime") val remainingTime = startTime - System.currentTimeMillis() if (remainingTime <= 0) { credentialUpdater.schedule(credentialUpdaterRunnable, 1, TimeUnit.MINUTES) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseCredentialProvider.scala similarity index 98% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala rename to core/src/main/scala/org/apache/spark/deploy/security/HBaseCredentialProvider.scala index 5571df09a2ec9..e3487f3a7c93c 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseCredentialProvider.scala @@ -15,18 +15,17 @@ * limitations under the License. 
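The manager discovers its providers through `java.util.ServiceLoader`, as the import above suggests, and then drops any provider whose enabled flag is set to false. A self-contained sketch of that discover-then-filter pattern, using a hypothetical `Provider` trait rather than Spark's real classes, might look like the following:

```scala
import java.util.ServiceLoader

import scala.collection.JavaConverters._

// Hypothetical stand-in for ServiceCredentialProvider, only to show the pattern.
trait Provider { def serviceName: String }

object ProviderDiscovery {
  /** Load every registered Provider and keep only those whose flag says enabled. */
  def loadEnabled(isEnabled: String => Boolean): Map[String, Provider] = {
    ServiceLoader.load(classOf[Provider], getClass.getClassLoader)
      .asScala
      .filter(p => isEnabled(p.serviceName))
      .map(p => p.serviceName -> p)
      .toMap
  }
}
```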
*/ -package org.apache.spark.deploy.yarn.security - -import scala.reflect.runtime.universe -import scala.util.control.NonFatal +package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.token.{Token, TokenIdentifier} - import org.apache.spark.SparkConf import org.apache.spark.internal.Logging +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + private[security] class HBaseCredentialProvider extends ServiceCredentialProvider with Logging { override def serviceName: String = "hbase" diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala similarity index 98% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala rename to core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala index f65c886db944e..9c048fdb9384e 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala @@ -15,21 +15,19 @@ * limitations under the License. */ -package org.apache.spark.deploy.yarn.security - -import scala.collection.JavaConverters._ -import scala.util.Try +package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapred.Master import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier - -import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.{SparkConf, SparkException} + +import scala.collection.JavaConverters._ +import scala.util.Try private[security] class HadoopFSCredentialProvider extends ServiceCredentialProvider with Logging { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveCredentialProvider.scala similarity index 99% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala rename to core/src/main/scala/org/apache/spark/deploy/security/HiveCredentialProvider.scala index 16d8fc32bb42d..a65904224c9fd 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveCredentialProvider.scala @@ -15,24 +15,23 @@ * limitations under the License. 
*/ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import java.lang.reflect.UndeclaredThrowableException import java.security.PrivilegedExceptionAction -import scala.reflect.runtime.universe -import scala.util.control.NonFatal - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier import org.apache.hadoop.io.Text -import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.Token - +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.util.Utils +import scala.reflect.runtime.universe +import scala.util.control.NonFatal + private[security] class HiveCredentialProvider extends ServiceCredentialProvider with Logging { override def serviceName: String = "hive" diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala similarity index 60% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala rename to core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala index 4e3fcce8dbb1d..9caba64903dff 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala @@ -1,25 +1,7 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.{Credentials, UserGroupInformation} - import org.apache.spark.SparkConf /** diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 223c921810378..ec4dac6c3f4ef 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -264,4 +264,49 @@ package object config { .booleanConf .createWithDefault(false) + + ////// TODO + + private[spark] val CREDENTIALS_RENEWAL_TIME = ConfigBuilder("spark.yarn.credentials.renewalTime") + .internal() + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(Long.MaxValue) + + private[spark] val CREDENTIALS_UPDATE_TIME = ConfigBuilder("spark.yarn.credentials.updateTime") + .internal() + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(Long.MaxValue) + + private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file") + .internal() + .stringConf + .createWithDefault(null) + + private[spark] val CREDENTIAL_FILE_MAX_COUNT = + ConfigBuilder("spark.yarn.credentials.file.retention.count") + .intConf + .createWithDefault(5) + + private[spark] val CREDENTIALS_FILE_MAX_RETENTION = + ConfigBuilder("spark.yarn.credentials.file.retention.days") + .intConf + .createWithDefault(5) + + private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") + .doc("Staging directory used while submitting applications.") + .stringConf + .createOptional + + private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes") + .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " + + "fs.defaultFS does not need to be listed here.") + .stringConf + .toSequence + .createWithDefault(Nil) + + private[spark] val FILESYSTEMS_TO_ACCESS = ConfigBuilder("spark.yarn.access.hadoopFileSystems") + .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. 
The filesystem " + + "that hosts fs.defaultFS does not need to be listed here.") + .fallbackConf(NAMENODES_TO_ACCESS) + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 7befdb0c1f64d..00146a058286d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -17,18 +17,23 @@ package org.apache.spark.scheduler.cluster +import java.io.File +import java.util.UUID import java.util.concurrent.Semaphore -import scala.concurrent.Future - -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.deploy.{ApplicationDescription, Command} +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener} +import org.apache.spark.deploy.security.ConfigurableCredentialManager +import org.apache.spark.deploy.{ApplicationDescription, Command, SparkHadoopUtil} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KEYTAB, PRINCIPAL} import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler._ import org.apache.spark.util.Utils +import org.apache.spark.{SparkConf, SparkContext} + +import scala.concurrent.Future /** * A [[SchedulerBackend]] implementation for Spark's standalone cluster manager. @@ -55,10 +60,43 @@ private[spark] class StandaloneSchedulerBackend( private val maxCores = conf.getOption("spark.cores.max").map(_.toInt) private val totalExpectedCores = maxCores.getOrElse(0) + + private var loginFromKeytab = false + private var principal: String = null + private var keytab: String = null + private var credentials: Credentials = null + + def setupCredentials(): Unit = { + loginFromKeytab = sc.conf.contains(PRINCIPAL.key) + if (loginFromKeytab) { + principal = sc.conf.get(PRINCIPAL).get + keytab = sc.conf.get(KEYTAB).orNull + + require(keytab != null, "Keytab must be specified when principal is specified.") + logInfo("Attempting to login to the Kerberos" + + s" using principal: $principal and keytab: $keytab") + val f = new File(keytab) + // Generate a file name that can be used for the keytab file, that does not conflict + // with any user file. 
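The `setupCredentials()` method above only engages when a principal is present in the configuration, and it then insists on a keytab as well. Assuming the relocated `PRINCIPAL` and `KEYTAB` entries keep their historical `spark.yarn.principal` and `spark.yarn.keytab` key names, a submission exercising this standalone keytab path could be configured roughly as follows (host, realm and paths are made up):

```scala
import org.apache.spark.SparkConf

object KerberosStandaloneConf {
  // Illustrative values only; key names assume PRINCIPAL/KEYTAB still map to the
  // historical spark.yarn.* entries after their move into core.
  val conf: SparkConf = new SparkConf()
    .setMaster("spark://master.example.com:7077")
    .setAppName("kerberized-standalone-app")
    .set("spark.yarn.principal", "alice@EXAMPLE.COM")               // triggers loginFromKeytab
    .set("spark.yarn.keytab", "/etc/security/keytabs/alice.keytab") // required once a principal is set
}
```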
+ val keytabFileName = f.getName + "-" + UUID.randomUUID().toString + sc.conf.set(KEYTAB.key, keytabFileName) + sc.conf.set(PRINCIPAL.key, principal) + } + // Defensive copy of the credentials + credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials) + } + + + + + + override def start() { super.start() launcherBackend.connect() + setupCredentials() + // The endpoint for executors to talk to us val driverUrl = RpcEndpointAddress( sc.conf.get("spark.driver.host"), @@ -110,6 +148,17 @@ private[spark] class StandaloneSchedulerBackend( launcherBackend.setState(SparkAppHandle.State.SUBMITTED) waitForRegistration() launcherBackend.setState(SparkAppHandle.State.RUNNING) + + + + + if (conf.contains("spark.yarn.credentials.file")) { + val newConf = SparkHadoopUtil.get.newConfiguration(conf) + val cu = new ConfigurableCredentialManager(conf, newConf).credentialUpdater() + cu.start() + } + + } override def stop(): Unit = synchronized { diff --git a/dev/.rat-excludes b/dev/.rat-excludes index 2355d40d1e6fe..4ae5b0b083241 100644 --- a/dev/.rat-excludes +++ b/dev/.rat-excludes @@ -102,7 +102,7 @@ spark-deps-.* org.apache.spark.scheduler.ExternalClusterManager .*\.sql .Rbuildignore -org.apache.spark.deploy.yarn.security.ServiceCredentialProvider +org.apache.spark.deploy.security.ServiceCredentialProvider spark-warehouse structured-streaming/* kafka-source-initial-offset-version-2.1.0.bin diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index cf95b95afd2ee..465d7c2f4c1d0 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -503,8 +503,9 @@ launch time. This is done by listing them in the `spark.yarn.access.hadoopFileSy spark.yarn.access.hadoopFileSystems hdfs://ireland.example.org:8020/,webhdfs://frankfurt.example.org:50070/ ``` +TODO Spark supports integrating with other security-aware services through Java Services mechanism (see -`java.util.ServiceLoader`). To do that, implementations of `org.apache.spark.deploy.yarn.security.ServiceCredentialProvider` +`java.util.ServiceLoader`). To do that, implementations of `org.apache.spark.deploy.security.ServiceCredentialProvider` should be available to Spark by listing their names in the corresponding file in the jar's `META-INF/services` directory. 
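For example, a third-party provider for a hypothetical service could be sketched as below; the package and class names are invented, and the method signatures assume the relocated trait keeps the shape it had under `org.apache.spark.deploy.yarn.security` (a `serviceName`, an optional `credentialsRequired` check, and an `obtainCredentials` that returns the next renewal time). It would be registered by listing its fully qualified name in `META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider` inside the plug-in jar.

```scala
package com.example.security // hypothetical third-party package

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.ServiceCredentialProvider

// Sketch only; assumes the trait's pre-move method signatures survived the rename.
class MyServiceCredentialProvider extends ServiceCredentialProvider {

  override def serviceName: String = "myservice"

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Fetch a delegation token for "myservice", add it to `creds`, and return the
    // time (in ms) at which the token should next be renewed, or None if never.
    None
  }
}
```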
These plug-ins can be disabled by setting `spark.yarn.security.credentials.{service}.enabled` to `false`, where `{service}` is the name of diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider new file mode 100644 index 0000000000000..9ffeb4d500296 --- /dev/null +++ b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider @@ -0,0 +1,3 @@ +org.apache.spark.deploy.security.HadoopFSCredentialProvider +org.apache.spark.deploy.security.HBaseCredentialProvider +org.apache.spark.deploy.security.HiveCredentialProvider diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider deleted file mode 100644 index f5a807ecac9d7..0000000000000 --- a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider +++ /dev/null @@ -1,3 +0,0 @@ -org.apache.spark.deploy.yarn.security.HadoopFSCredentialProvider -org.apache.spark.deploy.yarn.security.HBaseCredentialProvider -org.apache.spark.deploy.yarn.security.HiveCredentialProvider diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 9df43aea3f3d5..87118ad8b7a70 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -20,25 +20,23 @@ package org.apache.spark.deploy.yarn import java.io.{File, IOException} import java.lang.reflect.InvocationTargetException import java.net.{Socket, URI, URL} -import java.util.concurrent.{TimeoutException, TimeUnit} +import java.util.concurrent.{TimeUnit, TimeoutException} import scala.collection.mutable.HashMap import scala.concurrent.Promise import scala.concurrent.duration.Duration import scala.util.control.NonFatal - import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException import org.apache.hadoop.yarn.util.{ConverterUtils, Records} - import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.HistoryServer +import org.apache.spark.deploy.security.{AMCredentialRenewer, ConfigurableCredentialManager} import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, ConfigurableCredentialManager} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.rpc._ diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index a00234c2b416c..0828f74fc197d 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -18,7 +18,7 @@ package 
org.apache.spark.deploy.yarn import java.io.{File, FileOutputStream, IOException, OutputStreamWriter} -import java.net.{InetAddress, UnknownHostException, URI} +import java.net.{InetAddress, URI, UnknownHostException} import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import java.util.{Properties, UUID} @@ -27,7 +27,6 @@ import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map} import scala.util.control.NonFatal - import com.google.common.base.Objects import com.google.common.io.Files import org.apache.hadoop.conf.Configuration @@ -45,11 +44,10 @@ import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import org.apache.hadoop.yarn.util.Records - import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.ConfigurableCredentialManager import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils} diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 93578855122cd..9e18c3384b887 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -22,7 +22,6 @@ import java.util.regex.Matcher import java.util.regex.Pattern import scala.collection.mutable.{HashMap, ListBuffer} - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf @@ -32,10 +31,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.ConverterUtils - import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.security.{ConfigurableCredentialManager, CredentialUpdater} +import org.apache.spark.deploy.security.{ConfigurableCredentialManager, CredentialUpdater} import org.apache.spark.internal.config._ import org.apache.spark.launcher.YarnCommandBuilderUtils import org.apache.spark.util.Utils diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index f19a5b22a757d..9b388b0f457f6 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -120,10 +120,10 @@ package object config { .intConf .createOptional - private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") - .doc("Staging directory used while submitting applications.") - .stringConf - .createOptional +// private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") +// .doc("Staging directory used 
while submitting applications.") +// .stringConf +// .createOptional /* Cluster-mode launcher configuration. */ @@ -231,27 +231,27 @@ package object config { /* Security configuration. */ - private[spark] val CREDENTIAL_FILE_MAX_COUNT = - ConfigBuilder("spark.yarn.credentials.file.retention.count") - .intConf - .createWithDefault(5) - - private[spark] val CREDENTIALS_FILE_MAX_RETENTION = - ConfigBuilder("spark.yarn.credentials.file.retention.days") - .intConf - .createWithDefault(5) - - private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes") - .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " + - "fs.defaultFS does not need to be listed here.") - .stringConf - .toSequence - .createWithDefault(Nil) - - private[spark] val FILESYSTEMS_TO_ACCESS = ConfigBuilder("spark.yarn.access.hadoopFileSystems") - .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " + - "that hosts fs.defaultFS does not need to be listed here.") - .fallbackConf(NAMENODES_TO_ACCESS) +// private[spark] val CREDENTIAL_FILE_MAX_COUNT = +// ConfigBuilder("spark.yarn.credentials.file.retention.count") +// .intConf +// .createWithDefault(5) +// +// private[spark] val CREDENTIALS_FILE_MAX_RETENTION = +// ConfigBuilder("spark.yarn.credentials.file.retention.days") +// .intConf +// .createWithDefault(5) + +// private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes") +// .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " + +// "fs.defaultFS does not need to be listed here.") +// .stringConf +// .toSequence +// .createWithDefault(Nil) +// +// private[spark] val FILESYSTEMS_TO_ACCESS = ConfigBuilder("spark.yarn.access.hadoopFileSystems") +// .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " + +// "that hosts fs.defaultFS does not need to be listed here.") +// .fallbackConf(NAMENODES_TO_ACCESS) /* Rolled log aggregation configuration. */ @@ -271,10 +271,10 @@ package object config { /* Private configs. */ - private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file") - .internal() - .stringConf - .createWithDefault(null) +// private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file") +// .internal() +// .stringConf +// .createWithDefault(null) // Internal config to propagate the location of the user's jar to the driver/executors private[spark] val APP_JAR = ConfigBuilder("spark.yarn.user.jar") @@ -329,15 +329,15 @@ package object config { .stringConf .createOptional - private[spark] val CREDENTIALS_RENEWAL_TIME = ConfigBuilder("spark.yarn.credentials.renewalTime") - .internal() - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefault(Long.MaxValue) - - private[spark] val CREDENTIALS_UPDATE_TIME = ConfigBuilder("spark.yarn.credentials.updateTime") - .internal() - .timeConf(TimeUnit.MILLISECONDS) - .createWithDefault(Long.MaxValue) +// private[spark] val CREDENTIALS_RENEWAL_TIME = ConfigBuilder("spark.yarn.credentials.renewalTime") +// .internal() +// .timeConf(TimeUnit.MILLISECONDS) +// .createWithDefault(Long.MaxValue) +// +// private[spark] val CREDENTIALS_UPDATE_TIME = ConfigBuilder("spark.yarn.credentials.updateTime") +// .internal() +// .timeConf(TimeUnit.MILLISECONDS) +// .createWithDefault(Long.MaxValue) // The list of cache-related config entries. 
This is used by Client and the AM to clean // up the environment so that these settings do not appear on the web UI. diff --git a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider similarity index 100% rename from resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider rename to resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala index b0067aa4517c7..187c17e5ca1e6 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala @@ -21,8 +21,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.Text import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.token.Token +import org.apache.spark.deploy.security.{ConfigurableCredentialManager, HBaseCredentialProvider, HiveCredentialProvider, ServiceCredentialProvider} import org.scalatest.{BeforeAndAfter, Matchers} - import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.yarn.config._ diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala index f50ee193c258f..770ec6e0434f7 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala @@ -18,8 +18,8 @@ package org.apache.spark.deploy.yarn.security import org.apache.hadoop.conf.Configuration +import org.apache.spark.deploy.security.HadoopFSCredentialProvider import org.scalatest.{Matchers, PrivateMethodTester} - import org.apache.spark.{SparkException, SparkFunSuite} class HadoopFSCredentialProviderSuite From accfe0cebc645ed2b99aaded7629b93b56fcb7ea Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Fri, 24 Feb 2017 16:35:24 -0500 Subject: [PATCH 02/16] Add license header that somehow got removed --- .../security/ServiceCredentialProvider.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala index 9caba64903dff..8dfcc4b7ab60a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration From b8559b5895c81c871b1db00b75f038082b2dd4fb Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Fri, 24 Feb 2017 16:46:18 -0500 Subject: [PATCH 03/16] Fixup tests --- .../security/ConfigurableCredentialManagerSuite.scala | 6 ++---- .../deploy}/security/HadoopFSCredentialProviderSuite.scala | 7 +++---- ....apache.spark.deploy.security.ServiceCredentialProvider | 2 +- 3 files changed, 6 insertions(+), 9 deletions(-) rename {resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn => core/src/test/scala/org/apache/spark/deploy}/security/ConfigurableCredentialManagerSuite.scala (96%) rename {resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn => core/src/test/scala/org/apache/spark/deploy}/security/HadoopFSCredentialProviderSuite.scala (93%) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/ConfigurableCredentialManagerSuite.scala similarity index 96% rename from resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala rename to core/src/test/scala/org/apache/spark/deploy/security/ConfigurableCredentialManagerSuite.scala index 187c17e5ca1e6..84e962edadd2a 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/ConfigurableCredentialManagerSuite.scala @@ -15,16 +15,14 @@ * limitations under the License. 
*/ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.Text import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.token.Token -import org.apache.spark.deploy.security.{ConfigurableCredentialManager, HBaseCredentialProvider, HiveCredentialProvider, ServiceCredentialProvider} -import org.scalatest.{BeforeAndAfter, Matchers} import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.yarn.config._ +import org.scalatest.{BeforeAndAfter, Matchers} class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { private var credentialManager: ConfigurableCredentialManager = null diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSCredentialProviderSuite.scala similarity index 93% rename from resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala rename to core/src/test/scala/org/apache/spark/deploy/security/HadoopFSCredentialProviderSuite.scala index 770ec6e0434f7..c620ceeb3d093 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSCredentialProviderSuite.scala @@ -15,14 +15,13 @@ * limitations under the License. */ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration -import org.apache.spark.deploy.security.HadoopFSCredentialProvider -import org.scalatest.{Matchers, PrivateMethodTester} import org.apache.spark.{SparkException, SparkFunSuite} +import org.scalatest.{Matchers, PrivateMethodTester} -class HadoopFSCredentialProviderSuite +class HadoopFSCredentialProviderSuite extends SparkFunSuite with PrivateMethodTester with Matchers { diff --git a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider b/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider index d0ef5efa36e86..2676a0ad589fa 100644 --- a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider +++ b/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider @@ -1 +1 @@ -org.apache.spark.deploy.yarn.security.TestCredentialProvider +org.apache.spark.deploy.security.TestCredentialProvider From 539cc6cf630e9429e7131e755d8e9fa12479cd0c Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Sat, 25 Feb 2017 20:01:12 -0500 Subject: [PATCH 04/16] WIP --- build/sbt-launch-0.13.13.jar.part | 0 .../spark/deploy/ApplicationDescription.scala | 4 +- .../cluster/StandaloneSchedulerBackend.scala | 65 ++++++++++++++++++- .../HadoopFSCredentialProviderSuite.scala | 2 +- .../org/apache/spark/deploy/yarn/Client.scala | 1 + 5 files changed, 68 insertions(+), 4 deletions(-) create mode 100644 build/sbt-launch-0.13.13.jar.part diff --git a/build/sbt-launch-0.13.13.jar.part b/build/sbt-launch-0.13.13.jar.part new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala 
b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index c5c5c60923f4e..46fb45231bf7c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy import java.net.URI +import java.nio.ByteBuffer private[spark] case class ApplicationDescription( name: String, @@ -32,7 +33,8 @@ private[spark] case class ApplicationDescription( // number of executors this application wants to start with, // only used if dynamic allocation is enabled initialExecutorLimit: Option[Int] = None, - user: String = System.getProperty("user.name", "")) { + user: String = System.getProperty("user.name", ""), + tokens: Option[ByteBuffer] = None) { override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 00146a058286d..d8654ec77980a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -18,15 +18,18 @@ package org.apache.spark.scheduler.cluster import java.io.File +import java.nio.ByteBuffer import java.util.UUID import java.util.concurrent.Semaphore +import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener} import org.apache.spark.deploy.security.ConfigurableCredentialManager import org.apache.spark.deploy.{ApplicationDescription, Command, SparkHadoopUtil} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{KEYTAB, PRINCIPAL} +import org.apache.spark.internal.config.{CREDENTIALS_RENEWAL_TIME, CREDENTIALS_UPDATE_TIME, KEYTAB, PRINCIPAL} import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler._ @@ -65,6 +68,8 @@ private[spark] class StandaloneSchedulerBackend( private var principal: String = null private var keytab: String = null private var credentials: Credentials = null + private val credentialManager = new ConfigurableCredentialManager(sc.conf, sc.hadoopConfiguration) + def setupCredentials(): Unit = { loginFromKeytab = sc.conf.contains(PRINCIPAL.key) @@ -84,6 +89,7 @@ private[spark] class StandaloneSchedulerBackend( } // Defensive copy of the credentials credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials) + logInfo("Credentials loaded: " + UserGroupInformation.getCurrentUser) } @@ -143,7 +149,62 @@ private[spark] class StandaloneSchedulerBackend( } val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit) - client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf) + + + + + //////////////////////////////////////////// + //////////////////////////////////////////// + // Merge credentials obtained from registered providers + val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(sc.hadoopConfiguration, credentials) + + if (credentials != null) { + 
logDebug(SparkHadoopUtil.get.dumpTokens(credentials).mkString("\n")) + } + + // If we use principal and keytab to login, also credentials can be renewed some time + // after current time, we should pass the next renewal and updating time to credential + // renewer and updater. + if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() && + nearestTimeOfNextRenewal != Long.MaxValue) { + + // Valid renewal time is 75% of next renewal time, and the valid update time will be + // slightly later then renewal time (80% of next renewal time). This is to make sure + // credentials are renewed and updated before expired. + val currTime = System.currentTimeMillis() + val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime + val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime + + sc.conf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong) + sc.conf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong) + } + + + def setupSecurityToken(appDesc: ApplicationDescription): ApplicationDescription = { + val dob = new DataOutputBuffer + credentials.writeTokenStorageToStream(dob) + appDesc.copy(tokens = Some(ByteBuffer.wrap(dob.getData))) + } + + val secureAppDesc = setupSecurityToken(appDesc) + + // If we passed in a keytab, make sure we copy the keytab to the staging directory on + // HDFS, and setup the relevant environment vars, so the AM can login again. +// if (loginFromKeytab) { +// logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" + +// " via the YARN Secure Distributed Cache.") +// val (_, localizedPath) = distribute(keytab, +// destName = sparkConf.get(KEYTAB), +// appMasterOnly = true) +// require(localizedPath != null, "Keytab file already distributed.") +// } + //////////////////////////////////////////// + //////////////////////////////////////////// + + + + + client = new StandaloneAppClient(sc.env.rpcEnv, masters, secureAppDesc, this, conf) client.start() launcherBackend.setState(SparkAppHandle.State.SUBMITTED) waitForRegistration() diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSCredentialProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSCredentialProviderSuite.scala index c620ceeb3d093..c551498f6d53f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSCredentialProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSCredentialProviderSuite.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkException, SparkFunSuite} import org.scalatest.{Matchers, PrivateMethodTester} -class HadoopFSCredentialProviderSuite +class HadoopFSCredentialProviderSuite extends SparkFunSuite with PrivateMethodTester with Matchers { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 0828f74fc197d..e2e88556255f5 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -981,6 +981,7 @@ private[spark] class Client( amContainer } + // TODO - doesn't actually login from keytab! That's done in SparkSubmit! 
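The renewal-window comment in the `StandaloneSchedulerBackend` hunk above compresses a small calculation: credentials are renewed at 75% of the remaining token validity and the file-based update check runs slightly later, at 80%, so new tokens are always in place before the old ones lapse. Spelled out with invented numbers, purely for illustration:

```scala
object RenewalWindowExample {
  def main(args: Array[String]): Unit = {
    // Pretend the earliest token renewal point is 10 hours from now.
    val currTime = System.currentTimeMillis()
    val nearestTimeOfNextRenewal = currTime + 10L * 60 * 60 * 1000

    // Same arithmetic as the patch: renew at 75% of the window, update at 80%.
    val renewalTime = ((nearestTimeOfNextRenewal - currTime) * 0.75).toLong + currTime
    val updateTime = ((nearestTimeOfNextRenewal - currTime) * 0.8).toLong + currTime

    println(s"renew in  ${(renewalTime - currTime) / 60000} minutes") // ~450 (7.5 h)
    println(s"update in ${(updateTime - currTime) / 60000} minutes")  // ~480 (8 h)
  }
}
```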
def setupCredentials(): Unit = { loginFromKeytab = sparkConf.contains(PRINCIPAL.key) if (loginFromKeytab) { From 3f76281094493d63b6364fe38612e56f437c6a7c Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Mon, 27 Feb 2017 16:26:48 -0500 Subject: [PATCH 05/16] Push delegation token out to ExecutorRunner --- .../spark/deploy/ApplicationDescription.scala | 2 +- .../security/HadoopFSCredentialProvider.scala | 14 ++--- .../spark/deploy/worker/ExecutorRunner.scala | 29 ++++++++-- .../cluster/StandaloneSchedulerBackend.scala | 53 ++++++++++++++++--- 4 files changed, 81 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 46fb45231bf7c..ce441ad3317f4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -34,7 +34,7 @@ private[spark] case class ApplicationDescription( // only used if dynamic allocation is enabled initialExecutorLimit: Option[Int] = None, user: String = System.getProperty("user.name", ""), - tokens: Option[ByteBuffer] = None) { + tokens: Option[Array[Byte]] = None) { override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala index 9c048fdb9384e..21063048d686e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala @@ -22,9 +22,10 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapred.Master import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.util.Utils import scala.collection.JavaConverters._ import scala.util.Try @@ -99,14 +100,15 @@ private[security] class HadoopFSCredentialProvider } private def getTokenRenewer(conf: Configuration): String = { - val delegTokenRenewer = Master.getMasterPrincipal(conf) - logDebug("delegation token renewer is: " + delegTokenRenewer) + var delegTokenRenewer = Master.getMasterPrincipal(conf) if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { - val errorMessage = "Can't get Master Kerberos principal for use as renewer" - logError(errorMessage) - throw new SparkException(errorMessage) + logWarning("Can't get Master Kerberos principal for use as renewer, " + + "will proceed with " + Utils.getCurrentUserName() + " as renewer") + delegTokenRenewer = Utils.getCurrentUserName() } + logInfo("delegation token renewer is: " + delegTokenRenewer) + delegTokenRenewer } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index d4d8521cc8204..89feb1766b457 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -18,13 +18,13 @@ package org.apache.spark.deploy.worker import java.io._ +import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import 
scala.collection.JavaConverters._ - import com.google.common.io.Files - -import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.hadoop.security.UserGroupInformation +import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged import org.apache.spark.internal.Logging @@ -154,6 +154,29 @@ private[deploy] class ExecutorRunner( // parent process for the executor command builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0") + + + ///////////////////////////// + + logInfo(s"APP DESC TOKENS: ${appDesc} tokens ${appDesc.tokens}") + appDesc.tokens.foreach { bytes => + val creds = new File(executorDir, "credentials-" + appId) + logInfo("Writing out delegation tokens to " + creds.toString) + Utils.writeByteBuffer(ByteBuffer.wrap(bytes), new FileOutputStream(creds)) // TODO - duh + logInfo(s"Delegation Tokens written out successfully to $creds") + builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", creds.toString) + } + + + + ///////////////////////////// + + + + + + + // Add webUI log urls val baseUrl = if (conf.getBoolean("spark.ui.reverseProxy", false)) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index d8654ec77980a..478cdee01d016 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -22,14 +22,15 @@ import java.nio.ByteBuffer import java.util.UUID import java.util.concurrent.Semaphore +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.{Credentials, UserGroupInformation} -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext +import org.apache.hadoop.yarn.api.records.ApplicationId import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener} import org.apache.spark.deploy.security.ConfigurableCredentialManager import org.apache.spark.deploy.{ApplicationDescription, Command, SparkHadoopUtil} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{CREDENTIALS_RENEWAL_TIME, CREDENTIALS_UPDATE_TIME, KEYTAB, PRINCIPAL} +import org.apache.spark.internal.config._ import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler._ @@ -64,13 +65,14 @@ private[spark] class StandaloneSchedulerBackend( private val totalExpectedCores = maxCores.getOrElse(0) + ////////////////////////////////////////////////////// + ////////////////////////////////////////////////////// private var loginFromKeytab = false private var principal: String = null private var keytab: String = null private var credentials: Credentials = null private val credentialManager = new ConfigurableCredentialManager(sc.conf, sc.hadoopConfiguration) - def setupCredentials(): Unit = { loginFromKeytab = sc.conf.contains(PRINCIPAL.key) if (loginFromKeytab) { @@ -92,7 +94,8 @@ private[spark] class StandaloneSchedulerBackend( logInfo("Credentials loaded: " + UserGroupInformation.getCurrentUser) } - + ////////////////////////////////////////////////////// + ////////////////////////////////////////////////////// @@ -181,12 +184,48 @@ private[spark] class StandaloneSchedulerBackend( 
def setupSecurityToken(appDesc: ApplicationDescription): ApplicationDescription = { + logInfo(s"Writing credentials to buffer: ${credentials}: ${credentials.getAllTokens}") val dob = new DataOutputBuffer credentials.writeTokenStorageToStream(dob) - appDesc.copy(tokens = Some(ByteBuffer.wrap(dob.getData))) + dob.close() + + val tokens = Some(dob.getData) + + appDesc.copy(tokens = tokens) + } + + def buildPath(components: String*): String = { + components.mkString(Path.SEPARATOR) + } + + val SPARK_STAGING: String = ".sparkStaging" + + def getAppStagingDir(appId: String): String = { + buildPath(SPARK_STAGING, appId) + } + + + + var secureAppDesc = setupSecurityToken(appDesc) + + val appStagingBaseDir = sc.conf.get(STAGING_DIR).map { new Path(_) } + .getOrElse(FileSystem.get(sc.hadoopConfiguration).getHomeDirectory()) + val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId)) + + + + val su = UserGroupInformation.getCurrentUser().getShortUserName() + if (loginFromKeytab) { + val credentialsFile = "credentials-" + UUID.randomUUID().toString + sc.conf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString) + logInfo(s"Credentials file set to: $credentialsFile") } - val secureAppDesc = setupSecurityToken(appDesc) + + + + logInfo(s"Submitting ${secureAppDesc} with tokens ${secureAppDesc.tokens}") + // If we passed in a keytab, make sure we copy the keytab to the staging directory on // HDFS, and setup the relevant environment vars, so the AM can login again. @@ -213,7 +252,7 @@ private[spark] class StandaloneSchedulerBackend( - if (conf.contains("spark.yarn.credentials.file")) { + if (conf.contains(CREDENTIALS_FILE_PATH)) { val newConf = SparkHadoopUtil.get.newConfiguration(conf) val cu = new ConfigurableCredentialManager(conf, newConf).credentialUpdater() cu.start() From 25e7639af248bba4f648d13f5dc76a4fe8bfca34 Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Tue, 28 Feb 2017 16:21:10 -0500 Subject: [PATCH 06/16] More wip... 
probably borked --- .../spark/deploy/security/CredentialUpdater.scala | 5 +++-- .../apache/spark/deploy/worker/ExecutorRunner.scala | 3 +++ .../spark/executor/CoarseGrainedExecutorBackend.scala | 10 ++++++++-- .../scheduler/cluster/StandaloneSchedulerBackend.scala | 9 +++++++++ 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/CredentialUpdater.scala b/core/src/main/scala/org/apache/spark/deploy/security/CredentialUpdater.scala index 8c3d15e240159..0fefb47260c4e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/CredentialUpdater.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/CredentialUpdater.scala @@ -25,6 +25,7 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.util.{ThreadUtils, Utils} import scala.util.control.NonFatal @@ -37,7 +38,7 @@ private[spark] class CredentialUpdater( @volatile private var lastCredentialsFileSuffix = 0 // TODO move to ConfigBuilder - private val credentialsFile = sparkConf.get("spark.yarn.credentials.file") + private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH) private val freshHadoopConf = SparkHadoopUtil.get.getConfBypassingFSCache( hadoopConf, new Path(credentialsFile).toUri.getScheme) @@ -55,7 +56,7 @@ private[spark] class CredentialUpdater( /** Start the credential updater task */ def start(): Unit = { // TODO move to ConfigBuilder - val startTime = sparkConf.getTimeAsMs("spark.yarn.credentials.renewalTime") + val startTime = sparkConf.get(CREDENTIALS_RENEWAL_TIME) val remainingTime = startTime - System.currentTimeMillis() if (remainingTime <= 0) { credentialUpdater.schedule(credentialUpdaterRunnable, 1, TimeUnit.MINUTES) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 89feb1766b457..6a008d6fef159 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -157,6 +157,9 @@ private[deploy] class ExecutorRunner( ///////////////////////////// + /////////////////// just a one-time token... 
it can be renewed by driver + /////////////////// but will only last 7 days + /////////////////// for longer, the credetial updater will be used logInfo(s"APP DESC TOKENS: ${appDesc} tokens ${appDesc.tokens}") appDesc.tokens.foreach { bytes => diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index b376ecd301eab..46789d5012a02 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -24,12 +24,12 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.util.{Failure, Success} import scala.util.control.NonFatal - import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.CREDENTIALS_FILE_PATH import org.apache.spark.rpc._ import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -213,12 +213,18 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { driverConf.set(key, value) } } - if (driverConf.contains("spark.yarn.credentials.file")) { + + ////////////////////////// + ////////////////////////// only useful if principal/keytab are specified + + if (driverConf.contains(CREDENTIALS_FILE_PATH.key)) { logInfo("Will periodically update credentials from: " + driverConf.get("spark.yarn.credentials.file")) SparkHadoopUtil.get.startCredentialUpdater(driverConf) } + ////////////////////////////// + val env = SparkEnv.createExecutorEnv( driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 478cdee01d016..0307d3b5b2224 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -104,8 +104,17 @@ private[spark] class StandaloneSchedulerBackend( super.start() launcherBackend.connect() + + + ////////////////////////////////////////////////////// + ////////////////////////////////////////////////////// + setupCredentials() + + ////////////////////////////////////////////////////// + ////////////////////////////////////////////////////// + // The endpoint for executors to talk to us val driverUrl = RpcEndpointAddress( sc.conf.get("spark.driver.host"), From 847f6044d2fd0bf1af52d3d7c5d618c8e537e916 Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Thu, 2 Mar 2017 11:48:45 -0500 Subject: [PATCH 07/16] Untested... 
make cluster mode work with standalone --- ....deploy.security.ServiceCredentialProvider | 0 .../spark/deploy/DriverDescription.scala | 3 +- .../deploy/rest/RestSubmissionClient.scala | 66 +++++++++++++++++-- .../deploy/rest/StandaloneRestServer.scala | 24 ++++++- .../rest/SubmitRestProtocolRequest.scala | 1 + .../spark/deploy/worker/DriverRunner.scala | 53 ++++++++++++++- ....deploy.security.ServiceCredentialProvider | 0 .../rest/StandaloneRestSubmitSuite.scala | 6 +- 8 files changed, 140 insertions(+), 13 deletions(-) rename {resource-managers/yarn => core}/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider (100%) rename {resource-managers/yarn => core}/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider (100%) diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider b/core/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider similarity index 100% rename from resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider rename to core/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala index 1f5626ab5a896..c0b07d4cb270e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala @@ -22,7 +22,8 @@ private[deploy] case class DriverDescription( mem: Int, cores: Int, supervise: Boolean, - command: Command) { + command: Command, + tokens: Option[Array[Byte]] = None) { override def toString: String = s"DriverDescription (${command.mainClass})" } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 21cb94142b15b..28a2117cc207d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -17,9 +17,10 @@ package org.apache.spark.deploy.rest -import java.io.{DataOutputStream, FileNotFoundException} +import java.io.{DataOutputStream, File, FileNotFoundException} import java.net.{ConnectException, HttpURLConnection, SocketException, URL} import java.nio.charset.StandardCharsets +import java.util.UUID import java.util.concurrent.TimeoutException import javax.servlet.http.HttpServletResponse @@ -28,11 +29,13 @@ import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.io.Source import scala.util.control.NonFatal - import com.fasterxml.jackson.core.JsonProcessingException - -import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SparkException} +import org.apache.commons.codec.binary.Base64 +import org.apache.hadoop.io.DataOutputBuffer +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.spark.{SparkConf, SparkException, SPARK_VERSION => sparkVersion} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{KEYTAB, PRINCIPAL} import org.apache.spark.util.Utils /** @@ -72,6 +75,41 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { // whether there are masters still alive for us to communicate with 
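Note: the patches in this series move delegation tokens around as raw bytes — the submitter captures the current user's Hadoop Credentials, serializes them through a DataOutputBuffer, Base64-encodes the result for transport, and the receiving side reads it back into a Credentials object. A minimal, self-contained sketch of that round trip (the object and helper names below are illustrative, not part of any patch):

import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.io.{DataInputBuffer, DataOutputBuffer}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

object TokenRoundTripSketch {
  // Serialize the current user's delegation tokens into a Base64 string for transport.
  def encode(): String = {
    val creds = new Credentials(UserGroupInformation.getCurrentUser.getCredentials)
    val dob = new DataOutputBuffer()
    creds.writeTokenStorageToStream(dob)
    dob.close()
    // getData may be longer than what was written, so copy only the valid prefix.
    Base64.encodeBase64String(java.util.Arrays.copyOf(dob.getData, dob.getLength))
  }

  // Rebuild a Credentials object from the Base64 string on the receiving side.
  def decode(encoded: String): Credentials = {
    val raw = Base64.decodeBase64(encoded)
    val dib = new DataInputBuffer()
    dib.reset(raw, raw.length)
    val creds = new Credentials()
    creds.readTokenStorageStream(dib)
    creds
  }
}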
private val lostMasters = new mutable.HashSet[String] + + ////////////// + ///////////// + private var loginFromKeytab = false + private var principal: String = null + private var keytab: String = null + private var credentials: Credentials = null + + + // TODO - doesn't actually login from keytab! That's done in SparkSubmit! + def setupCredentials(sparkConf: SparkConf): Unit = { + loginFromKeytab = sparkConf.contains(PRINCIPAL.key) + if (loginFromKeytab) { + principal = sparkConf.get(PRINCIPAL).get + keytab = sparkConf.get(KEYTAB).orNull + + require(keytab != null, "Keytab must be specified when principal is specified.") + logInfo("Attempting to login to the Kerberos" + + s" using principal: $principal and keytab: $keytab") + val f = new File(keytab) + // Generate a file name that can be used for the keytab file, that does not conflict + // with any user file. + val keytabFileName = f.getName + "-" + UUID.randomUUID().toString + sparkConf.set(KEYTAB.key, keytabFileName) + sparkConf.set(PRINCIPAL.key, principal) + } + // Defensive copy of the credentials + credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials) + } + /////////////// + ////////////// + + + + /** * Submit an application specified by the parameters in the provided request. * @@ -174,7 +212,13 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { mainClass: String, appArgs: Array[String], sparkProperties: Map[String, String], - environmentVariables: Map[String, String]): CreateSubmissionRequest = { + environmentVariables: Map[String, String], + conf: SparkConf): CreateSubmissionRequest = { + + + setupCredentials(conf) + + val message = new CreateSubmissionRequest message.clientSparkVersion = sparkVersion message.appResource = appResource @@ -182,6 +226,16 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { message.appArgs = appArgs message.sparkProperties = sparkProperties message.environmentVariables = environmentVariables + + ////////// + val dob = new DataOutputBuffer + credentials.writeTokenStorageToStream(dob) + dob.close() + + message.tokens = new String(Base64.encodeBase64(dob.getData)) + ////////// + + message.validate() message } @@ -413,7 +467,7 @@ private[spark] object RestSubmissionClient { val sparkProperties = conf.getAll.toMap val client = new RestSubmissionClient(master) val submitRequest = client.constructSubmitRequest( - appResource, mainClass, appArgs, sparkProperties, env) + appResource, mainClass, appArgs, sparkProperties, env, conf) client.createSubmission(submitRequest) } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index 56620064c57fa..ba9efc2ef6a46 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -20,7 +20,8 @@ package org.apache.spark.deploy.rest import java.io.File import javax.servlet.http.HttpServletResponse -import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} +import org.apache.commons.codec.binary.Base64 +import org.apache.spark.{SparkConf, SPARK_VERSION => sparkVersion} import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription} import org.apache.spark.deploy.ClientArguments._ import org.apache.spark.rpc.RpcEndpointRef @@ -130,6 +131,18 @@ private[rest] class StandaloneSubmitRequestServlet( throw new SubmitRestMissingFieldException("Main class is 
missing.") } + ///////////////// + ///////////////// + + val tokens = Option(Base64.decodeBase64(request.tokens)) + + + ///////////////// + ///////////////// + + + + // Optional fields val sparkProperties = request.sparkProperties val driverMemory = sparkProperties.get("spark.driver.memory") @@ -157,8 +170,15 @@ private[rest] class StandaloneSubmitRequestServlet( val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY) val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES) val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE) + + ////////// + ////////// + new DriverDescription( - appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command) + appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command, tokens) + + /////// + /////// } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala index 0d50a768942ed..e4f4d0635a636 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala @@ -41,6 +41,7 @@ private[rest] class CreateSubmissionRequest extends SubmitRestProtocolRequest { var appArgs: Array[String] = null var sparkProperties: Map[String, String] = null var environmentVariables: Map[String, String] = null + var tokens: String = null protected override def doValidate(): Unit = { super.doValidate() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index e878c10183f61..7ab0ba41d0f52 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -19,18 +19,19 @@ package org.apache.spark.deploy.worker import java.io._ import java.net.URI +import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ - import com.google.common.io.Files - import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil} import org.apache.spark.deploy.DeployMessages.DriverStateChanged import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.master.DriverState.DriverState +import org.apache.spark.deploy.security.{AMCredentialRenewer, ConfigurableCredentialManager} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.CREDENTIALS_FILE_PATH import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils} @@ -56,6 +57,15 @@ private[deploy] class DriverRunner( @volatile private[worker] var finalState: Option[DriverState] = None @volatile private[worker] var finalException: Option[Exception] = None + + /////////////////////// + /////////////////////// + private var credentialRenewer: AMCredentialRenewer = _ + /////////////////////// + /////////////////////// + + + // Timeout to wait for when trying to terminate a driver. 
private val DRIVER_TERMINATE_TIMEOUT_MS = 10 * 1000 @@ -171,6 +181,8 @@ private[deploy] class DriverRunner( val driverDir = createWorkingDirectory() val localJarFilename = downloadUserJar(driverDir) + + def substituteVariables(argument: String): String = argument match { case "{{WORKER_URL}}" => workerUrl case "{{USER_JAR}}" => localJarFilename @@ -181,6 +193,43 @@ private[deploy] class DriverRunner( val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables) + + + //////////////////// + //////////////////// + ///////////////////////////// + /////////////////// just a one-time token... it can be renewed by driver + /////////////////// but will only last 7 days + /////////////////// for longer, the credetial updater will be used + + logInfo(s"Driver description: ${driverDesc} tokens ${driverDesc.tokens}") + driverDesc.tokens.foreach { bytes => + val creds = new File(driverDir, "driver-credentials-" + driverId) + logInfo("Writing out delegation tokens to " + creds.toString) + Utils.writeByteBuffer(ByteBuffer.wrap(bytes), new FileOutputStream(creds)) // TODO - duh + logInfo(s"Delegation Tokens written out successfully to $creds") + builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", creds.toString) + } + + /// SHOULD BOTH RENEW AND UPDATE CREDENTIALS + + // If the credentials file config is present, we must periodically renew tokens. So create + // a new AMDelegationTokenRenewer + if (conf.contains(CREDENTIALS_FILE_PATH.key)) { + // If a principal and keytab have been set, use that to create new credentials for executors + // periodically + val newConf = SparkHadoopUtil.get.newConfiguration(conf) + credentialRenewer = + new ConfigurableCredentialManager(conf, newConf).credentialRenewer() + credentialRenewer.scheduleLoginFromKeytab() + } + + + ///////////////////////////// + //////////////////// + //////////////////// + + runDriver(builder, driverDir, driverDesc.supervise) } diff --git a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider b/core/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider similarity index 100% rename from resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider rename to core/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index dd50e33da30ac..0020943a0c148 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -56,7 +56,8 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach { val sparkProperties = Map("spark.app.name" -> "pi") val environmentVariables = Map("SPARK_ONE" -> "UN", "SPARK_TWO" -> "DEUX") val request = new RestSubmissionClient("spark://host:port").constructSubmitRequest( - "my-app-resource", "my-main-class", appArgs, sparkProperties, environmentVariables) + "my-app-resource", "my-main-class", appArgs, + sparkProperties, environmentVariables, new SparkConf(loadDefaults = false)) assert(request.action === Utils.getFormattedClassName(request)) assert(request.clientSparkVersion === SPARK_VERSION) assert(request.appResource === 
"my-app-resource") @@ -447,7 +448,8 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach { val args = new SparkSubmitArguments(commandLineArgs) val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args) new RestSubmissionClient("spark://host:port").constructSubmitRequest( - mainJar, mainClass, appArgs, sparkProperties.toMap, Map.empty) + mainJar, mainClass, appArgs, sparkProperties.toMap, Map.empty, + new SparkConf(loadDefaults = false)) } /** Return the response as a submit response, or fail with error otherwise. */ From 4689a55402f193199faf2dc2e2c6c4c904e34bf0 Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Tue, 7 Mar 2017 11:35:51 -0500 Subject: [PATCH 08/16] Hadoop FileInputFormat is hardcoded to request delegation tokens with renewer = yarn.resourcemanager.principal --- .../scala/org/apache/spark/deploy/SparkSubmit.scala | 11 +++++++++++ .../security/HadoopFSCredentialProvider.scala | 13 ++++++------- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 5ffdedd1658ab..2c62f1521cea7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -546,6 +546,17 @@ object SparkSubmit extends CommandLineUtils { } } + + //////////////////////////// + //////////////////////////// + + /////// TODO = standalone mode should log in as well + + //////////////////////////// + //////////////////////////////////////////////////////// + + + // assure a keytab is available from any place in a JVM if (clusterManager == YARN || clusterManager == LOCAL) { if (args.principal != null) { diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala index 21063048d686e..75baa20baaef2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapred.Master import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.util.Utils @@ -100,15 +100,14 @@ private[security] class HadoopFSCredentialProvider } private def getTokenRenewer(conf: Configuration): String = { - var delegTokenRenewer = Master.getMasterPrincipal(conf) + val delegTokenRenewer = Master.getMasterPrincipal(conf) + logDebug("delegation token renewer is: " + delegTokenRenewer) if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { - logWarning("Can't get Master Kerberos principal for use as renewer, " + - "will proceed with " + Utils.getCurrentUserName() + " as renewer") - delegTokenRenewer = Utils.getCurrentUserName() + val errorMessage = "Can't get Master Kerberos principal for use as renewer" + logError(errorMessage) + throw new SparkException(errorMessage) } - logInfo("delegation token renewer is: " + delegTokenRenewer) - delegTokenRenewer } From 3e85aa5bfbaee2760d9eb3559d23546508b463d9 Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Tue, 7 Mar 2017 15:59:21 -0500 
Subject: [PATCH 09/16] Still need to sort out a few things, but overall much smaller patch-set --- .../spark/deploy/ApplicationDescription.scala | 4 +- .../spark/deploy/DriverDescription.scala | 3 +- .../org/apache/spark/deploy/SparkSubmit.scala | 13 +-- .../deploy/rest/RestSubmissionClient.scala | 110 +++++++++--------- .../deploy/rest/StandaloneRestServer.scala | 24 +--- .../rest/SubmitRestProtocolRequest.scala | 1 - .../spark/deploy/worker/DriverRunner.scala | 2 - .../spark/internal/config/package.scala | 3 - 8 files changed, 60 insertions(+), 100 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index ce441ad3317f4..c5c5c60923f4e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -18,7 +18,6 @@ package org.apache.spark.deploy import java.net.URI -import java.nio.ByteBuffer private[spark] case class ApplicationDescription( name: String, @@ -33,8 +32,7 @@ private[spark] case class ApplicationDescription( // number of executors this application wants to start with, // only used if dynamic allocation is enabled initialExecutorLimit: Option[Int] = None, - user: String = System.getProperty("user.name", ""), - tokens: Option[Array[Byte]] = None) { + user: String = System.getProperty("user.name", "")) { override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala index c0b07d4cb270e..1f5626ab5a896 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala @@ -22,8 +22,7 @@ private[deploy] case class DriverDescription( mem: Int, cores: Int, supervise: Boolean, - command: Command, - tokens: Option[Array[Byte]] = None) { + command: Command) { override def toString: String = s"DriverDescription (${command.mainClass})" } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 2c62f1521cea7..319cb090ee6d0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -546,19 +546,8 @@ object SparkSubmit extends CommandLineUtils { } } - - //////////////////////////// - //////////////////////////// - - /////// TODO = standalone mode should log in as well - - //////////////////////////// - //////////////////////////////////////////////////////// - - - // assure a keytab is available from any place in a JVM - if (clusterManager == YARN || clusterManager == LOCAL) { + if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == STANDALONE) { if (args.principal != null) { require(args.keytab != null, "Keytab must be specified when principal is specified") if (!new File(args.keytab).exists()) { diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 28a2117cc207d..614deb583e490 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -17,26 +17,29 @@ package org.apache.spark.deploy.rest -import 
java.io.{DataOutputStream, File, FileNotFoundException} +import java.io.{DataOutputStream, File, FileInputStream, FileNotFoundException} import java.net.{ConnectException, HttpURLConnection, SocketException, URL} import java.nio.charset.StandardCharsets +import java.nio.file.Files import java.util.UUID import java.util.concurrent.TimeoutException import javax.servlet.http.HttpServletResponse -import scala.collection.mutable -import scala.concurrent.{Await, Future} -import scala.concurrent.duration._ -import scala.io.Source -import scala.util.control.NonFatal import com.fasterxml.jackson.core.JsonProcessingException import org.apache.commons.codec.binary.Base64 -import org.apache.hadoop.io.DataOutputBuffer +import org.apache.commons.io +import org.apache.hadoop.io.{DataInputBuffer, DataOutputBuffer, IOUtils} import org.apache.hadoop.security.{Credentials, UserGroupInformation} -import org.apache.spark.{SparkConf, SparkException, SPARK_VERSION => sparkVersion} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{KEYTAB, PRINCIPAL} import org.apache.spark.util.Utils +import org.apache.spark.{SparkConf, SparkException, SPARK_VERSION => sparkVersion} + +import scala.collection.mutable +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.io.Source +import scala.util.control.NonFatal /** * A client that submits applications to a [[RestSubmissionServer]]. @@ -75,41 +78,6 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { // whether there are masters still alive for us to communicate with private val lostMasters = new mutable.HashSet[String] - - ////////////// - ///////////// - private var loginFromKeytab = false - private var principal: String = null - private var keytab: String = null - private var credentials: Credentials = null - - - // TODO - doesn't actually login from keytab! That's done in SparkSubmit! - def setupCredentials(sparkConf: SparkConf): Unit = { - loginFromKeytab = sparkConf.contains(PRINCIPAL.key) - if (loginFromKeytab) { - principal = sparkConf.get(PRINCIPAL).get - keytab = sparkConf.get(KEYTAB).orNull - - require(keytab != null, "Keytab must be specified when principal is specified.") - logInfo("Attempting to login to the Kerberos" + - s" using principal: $principal and keytab: $keytab") - val f = new File(keytab) - // Generate a file name that can be used for the keytab file, that does not conflict - // with any user file. - val keytabFileName = f.getName + "-" + UUID.randomUUID().toString - sparkConf.set(KEYTAB.key, keytabFileName) - sparkConf.set(PRINCIPAL.key, principal) - } - // Defensive copy of the credentials - credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials) - } - /////////////// - ////////////// - - - - /** * Submit an application specified by the parameters in the provided request. 
* @@ -212,13 +180,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { mainClass: String, appArgs: Array[String], sparkProperties: Map[String, String], - environmentVariables: Map[String, String], - conf: SparkConf): CreateSubmissionRequest = { - - - setupCredentials(conf) - - + environmentVariables: Map[String, String]): CreateSubmissionRequest = { val message = new CreateSubmissionRequest message.clientSparkVersion = sparkVersion message.appResource = appResource @@ -227,12 +189,50 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { message.sparkProperties = sparkProperties message.environmentVariables = environmentVariables + def base64EncodedValue(fn: DataOutputBuffer => Unit): String = { + val dob = new DataOutputBuffer + fn(dob) + dob.close() + new String(Base64.encodeBase64(dob.getData)) + } + ////////// - val dob = new DataOutputBuffer - credentials.writeTokenStorageToStream(dob) - dob.close() + // Security propogation + + if (sparkProperties.contains(PRINCIPAL.key)) { + val principal = sparkProperties.get(PRINCIPAL.key).get + val keytab = sparkProperties.get(KEYTAB.key).orNull + require(keytab != null, "Keytab must be specified when principal is specified.") + logInfo("Attempting to login to the Kerberos" + + s" using principal: $principal and keytab: $keytab") + val f = new File(keytab) + // Generate a file name that can be used for the keytab file, that does not conflict + // with any user file. + val keytabFileName = f.getName + "-" + UUID.randomUUID().toString + + logInfo("To enable the driver to login from keytab, credentials are are being copied" + + " to the Master inside the CreateSubmissionRequest") + + val keytabContent = base64EncodedValue { dob => + io.IOUtils.copy(new FileInputStream(f), dob) + } + + message.sparkProperties ++= Map( + KEYTAB.key -> keytabFileName, + KEYTAB.key + ".content" -> keytabContent, + PRINCIPAL.key -> principal + ) + } + + // Add credentials - works in the case of a user submitting a job + // that completes < YARN max token renewal + val credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials) + val bootstrapCredentails = base64EncodedValue { dob => + credentials.writeTokenStorageToStream(dob) + } - message.tokens = new String(Base64.encodeBase64(dob.getData)) + message.sparkProperties ++= Map( + "spark.yarn.credentials.bootstrap" -> bootstrapCredentails) ////////// @@ -467,7 +467,7 @@ private[spark] object RestSubmissionClient { val sparkProperties = conf.getAll.toMap val client = new RestSubmissionClient(master) val submitRequest = client.constructSubmitRequest( - appResource, mainClass, appArgs, sparkProperties, env, conf) + appResource, mainClass, appArgs, sparkProperties, env) client.createSubmission(submitRequest) } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index ba9efc2ef6a46..56620064c57fa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -20,8 +20,7 @@ package org.apache.spark.deploy.rest import java.io.File import javax.servlet.http.HttpServletResponse -import org.apache.commons.codec.binary.Base64 -import org.apache.spark.{SparkConf, SPARK_VERSION => sparkVersion} +import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf} import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription} 
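Note: the keytab propagation above is the same Base64 trick applied to a small binary file — read the keytab, encode it into the submission, and write it back out under a randomized name on the receiving side so it cannot clash with user files. A rough sketch under those assumptions (not the patch's actual helpers):

import java.io.{File, FileInputStream, FileOutputStream}
import java.util.UUID
import org.apache.commons.codec.binary.Base64
import org.apache.commons.io.IOUtils

// Encode a keytab for transport; returns a randomized target file name plus the
// Base64-encoded bytes.
def encodeKeytab(keytab: File): (String, String) = {
  val in = new FileInputStream(keytab)
  val content = try Base64.encodeBase64String(IOUtils.toByteArray(in)) finally in.close()
  (keytab.getName + "-" + UUID.randomUUID().toString, content)
}

// Decode the shipped keytab into the driver's working directory on the receiving side.
def restoreKeytab(dir: File, name: String, content: String): File = {
  val target = new File(dir, name)
  val out = new FileOutputStream(target)
  try out.write(Base64.decodeBase64(content)) finally out.close()
  target
}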
import org.apache.spark.deploy.ClientArguments._ import org.apache.spark.rpc.RpcEndpointRef @@ -131,18 +130,6 @@ private[rest] class StandaloneSubmitRequestServlet( throw new SubmitRestMissingFieldException("Main class is missing.") } - ///////////////// - ///////////////// - - val tokens = Option(Base64.decodeBase64(request.tokens)) - - - ///////////////// - ///////////////// - - - - // Optional fields val sparkProperties = request.sparkProperties val driverMemory = sparkProperties.get("spark.driver.memory") @@ -170,15 +157,8 @@ private[rest] class StandaloneSubmitRequestServlet( val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY) val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES) val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE) - - ////////// - ////////// - new DriverDescription( - appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command, tokens) - - /////// - /////// + appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command) } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala index e4f4d0635a636..0d50a768942ed 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala @@ -41,7 +41,6 @@ private[rest] class CreateSubmissionRequest extends SubmitRestProtocolRequest { var appArgs: Array[String] = null var sparkProperties: Map[String, String] = null var environmentVariables: Map[String, String] = null - var tokens: String = null protected override def doValidate(): Unit = { super.doValidate() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 7ab0ba41d0f52..2a21a46ec4505 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -181,8 +181,6 @@ private[deploy] class DriverRunner( val driverDir = createWorkingDirectory() val localJarFilename = downloadUserJar(driverDir) - - def substituteVariables(argument: String): String = argument match { case "{{WORKER_URL}}" => workerUrl case "{{USER_JAR}}" => localJarFilename diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index ec4dac6c3f4ef..9657e510178ab 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -264,9 +264,6 @@ package object config { .booleanConf .createWithDefault(false) - - ////// TODO - private[spark] val CREDENTIALS_RENEWAL_TIME = ConfigBuilder("spark.yarn.credentials.renewalTime") .internal() .timeConf(TimeUnit.MILLISECONDS) From f743e6b207b7f71034fe617a402f54e0121b13a2 Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Wed, 8 Mar 2017 12:06:14 -0500 Subject: [PATCH 10/16] WIP --- .../apache/spark/deploy/SparkHadoopUtil.scala | 18 +- .../deploy/rest/RestSubmissionClient.scala | 28 +-- .../spark/deploy/worker/DriverRunner.scala | 55 ++---- .../spark/deploy/worker/ExecutorRunner.scala | 30 +-- .../spark/internal/config/package.scala | 11 ++ .../cluster/StandaloneSchedulerBackend.scala | 176 +++++------------- 6 files changed, 
116 insertions(+), 202 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 941e2d13fb28e..ec923b1135320 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -17,25 +17,26 @@ package org.apache.spark.deploy -import java.io.IOException +import java.io.{File, FileOutputStream, IOException} import java.security.PrivilegedExceptionAction import java.text.DateFormat import java.util.{Arrays, Comparator, Date, Locale} import scala.collection.JavaConverters._ import scala.util.control.NonFatal - import com.google.common.primitives.Longs +import org.apache.commons.codec.binary.Base64 +import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.{Token, TokenIdentifier} import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier - import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BOOTSTRAP_TOKENS import org.apache.spark.util.Utils /** @@ -350,6 +351,17 @@ class SparkHadoopUtil extends Logging { } buffer.toString } + + private[spark] def decodeAndWriteToFile(env: collection.Map[String, String], + key: String, where: File): Unit = { + if (env.contains(key)) { + val creds = new FileOutputStream(where) + val base64 = env.get(key).get + val raw = Base64.decodeBase64(base64) + IOUtils.write(raw, creds) + creds.close() + } + } } object SparkHadoopUtil { diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 614deb583e490..5652601ba1810 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -20,7 +20,6 @@ package org.apache.spark.deploy.rest import java.io.{DataOutputStream, File, FileInputStream, FileNotFoundException} import java.net.{ConnectException, HttpURLConnection, SocketException, URL} import java.nio.charset.StandardCharsets -import java.nio.file.Files import java.util.UUID import java.util.concurrent.TimeoutException import javax.servlet.http.HttpServletResponse @@ -28,10 +27,10 @@ import javax.servlet.http.HttpServletResponse import com.fasterxml.jackson.core.JsonProcessingException import org.apache.commons.codec.binary.Base64 import org.apache.commons.io -import org.apache.hadoop.io.{DataInputBuffer, DataOutputBuffer, IOUtils} +import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{KEYTAB, PRINCIPAL} +import org.apache.spark.internal.config._ import org.apache.spark.util.Utils import org.apache.spark.{SparkConf, SparkException, SPARK_VERSION => sparkVersion} @@ -217,22 +216,25 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { io.IOUtils.copy(new FileInputStream(f), dob) } - message.sparkProperties ++= Map( - KEYTAB.key -> keytabFileName, - KEYTAB.key + ".content" -> keytabContent, - 
PRINCIPAL.key -> principal - ) + message.environmentVariables += KEYTAB_CONTENT.key -> keytabContent + // overwrite with localized version + message.environmentVariables += KEYTAB.key -> keytabFileName + message.sparkProperties += KEYTAB.key -> keytabFileName } // Add credentials - works in the case of a user submitting a job // that completes < YARN max token renewal + // The reason this is here is so you don't strictly need a keytab/principal + // you only need that if you want to run a long running job... but maybe get rid of it? val credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials) - val bootstrapCredentails = base64EncodedValue { dob => - credentials.writeTokenStorageToStream(dob) - } + if (credentials.getAllTokens.size() > 0) { + val bootstrapCredentails = base64EncodedValue { dob => + credentials.writeTokenStorageToStream(dob) + } - message.sparkProperties ++= Map( - "spark.yarn.credentials.bootstrap" -> bootstrapCredentails) + logInfo("Security tokens will be sent to driver and executors") + message.environmentVariables += BOOTSTRAP_TOKENS.key -> bootstrapCredentails + } ////////// diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 2a21a46ec4505..6b8ed207c64bc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -24,6 +24,8 @@ import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ import com.google.common.io.Files +import org.apache.commons.codec.binary.Base64 +import org.apache.commons.io.IOUtils import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil} import org.apache.spark.deploy.DeployMessages.DriverStateChanged @@ -31,7 +33,7 @@ import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.deploy.security.{AMCredentialRenewer, ConfigurableCredentialManager} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.CREDENTIALS_FILE_PATH +import org.apache.spark.internal.config.{BOOTSTRAP_TOKENS, CREDENTIALS_FILE_PATH, KEYTAB, KEYTAB_CONTENT} import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils} @@ -57,15 +59,6 @@ private[deploy] class DriverRunner( @volatile private[worker] var finalState: Option[DriverState] = None @volatile private[worker] var finalException: Option[Exception] = None - - /////////////////////// - /////////////////////// - private var credentialRenewer: AMCredentialRenewer = _ - /////////////////////// - /////////////////////// - - - // Timeout to wait for when trying to terminate a driver. private val DRIVER_TERMINATE_TIMEOUT_MS = 10 * 1000 @@ -191,38 +184,22 @@ private[deploy] class DriverRunner( val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables) - - - //////////////////// - //////////////////// - ///////////////////////////// - /////////////////// just a one-time token... 
it can be renewed by driver - /////////////////// but will only last 7 days - /////////////////// for longer, the credetial updater will be used - - logInfo(s"Driver description: ${driverDesc} tokens ${driverDesc.tokens}") - driverDesc.tokens.foreach { bytes => - val creds = new File(driverDir, "driver-credentials-" + driverId) - logInfo("Writing out delegation tokens to " + creds.toString) - Utils.writeByteBuffer(ByteBuffer.wrap(bytes), new FileOutputStream(creds)) // TODO - duh - logInfo(s"Delegation Tokens written out successfully to $creds") - builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", creds.toString) + /////////////// + //////////////// + //// + if (driverDesc.command.environment.contains(BOOTSTRAP_TOKENS.key)) { + val tokenFile = new File(driverDir, "driver-credentials-" + driverId) + SparkHadoopUtil.get.decodeAndWriteToFile(driverDesc.command.environment, + BOOTSTRAP_TOKENS.key, tokenFile) + builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", tokenFile.toString) } - /// SHOULD BOTH RENEW AND UPDATE CREDENTIALS - - // If the credentials file config is present, we must periodically renew tokens. So create - // a new AMDelegationTokenRenewer - if (conf.contains(CREDENTIALS_FILE_PATH.key)) { - // If a principal and keytab have been set, use that to create new credentials for executors - // periodically - val newConf = SparkHadoopUtil.get.newConfiguration(conf) - credentialRenewer = - new ConfigurableCredentialManager(conf, newConf).credentialRenewer() - credentialRenewer.scheduleLoginFromKeytab() + if (driverDesc.command.environment.contains(KEYTAB_CONTENT.key)) { + val keytab = driverDesc.command.environment.get(KEYTAB.key).get + val keytabFile = new File(driverDir, keytab) + SparkHadoopUtil.get.decodeAndWriteToFile(driverDesc.command.environment, + KEYTAB_CONTENT.key, keytabFile) } - - ///////////////////////////// //////////////////// //////////////////// diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 6a008d6fef159..1c48eb1bedd49 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -25,9 +25,10 @@ import scala.collection.JavaConverters._ import com.google.common.io.Files import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{SecurityManager, SparkConf, SparkContext} -import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} +import org.apache.spark.deploy.{ApplicationDescription, ExecutorState, SparkHadoopUtil} import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BOOTSTRAP_TOKENS import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.{ShutdownHookManager, Utils} import org.apache.spark.util.logging.FileAppender @@ -155,30 +156,17 @@ private[deploy] class ExecutorRunner( builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0") - ///////////////////////////// - /////////////////// just a one-time token... 
it can be renewed by driver - /////////////////// but will only last 7 days - /////////////////// for longer, the credetial updater will be used - - logInfo(s"APP DESC TOKENS: ${appDesc} tokens ${appDesc.tokens}") - appDesc.tokens.foreach { bytes => - val creds = new File(executorDir, "credentials-" + appId) - logInfo("Writing out delegation tokens to " + creds.toString) - Utils.writeByteBuffer(ByteBuffer.wrap(bytes), new FileOutputStream(creds)) // TODO - duh - logInfo(s"Delegation Tokens written out successfully to $creds") - builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", creds.toString) + ///////////////////////////// + if (appDesc.command.environment.contains(BOOTSTRAP_TOKENS.key)) { + val tokenFile = new File(executorDir, "executor-credentials-" + appId) + SparkHadoopUtil.get.decodeAndWriteToFile(appDesc.command.environment, + BOOTSTRAP_TOKENS.key, tokenFile) + builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", tokenFile.toString) } - - ///////////////////////////// - - - - - - + ///////////////////////////// // Add webUI log urls val baseUrl = diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 9657e510178ab..2ab55dbb62e11 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -306,4 +306,15 @@ package object config { "that hosts fs.defaultFS does not need to be listed here.") .fallbackConf(NAMENODES_TO_ACCESS) + private[spark] val KEYTAB_CONTENT = ConfigBuilder("spark.yarn.keytab.content") + .doc("Base64 encoded content of spark.yarn.keytab") + .internal() + .stringConf + .createOptional + + private[spark] val BOOTSTRAP_TOKENS = ConfigBuilder("spark.deploy.bootstrap.tokens") + .doc("Base64 encoded tokens to propogate to driver/executors laucnhed in standalone mode") + .internal() + .stringConf + .createOptional } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 0307d3b5b2224..bbda56ba87eaa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -27,7 +27,8 @@ import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.yarn.api.records.ApplicationId import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener} -import org.apache.spark.deploy.security.ConfigurableCredentialManager +import org.apache.spark.deploy.security.{AMCredentialRenewer, ConfigurableCredentialManager} +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil import org.apache.spark.deploy.{ApplicationDescription, Command, SparkHadoopUtil} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -65,40 +66,24 @@ private[spark] class StandaloneSchedulerBackend( private val totalExpectedCores = maxCores.getOrElse(0) - ////////////////////////////////////////////////////// - ////////////////////////////////////////////////////// private var loginFromKeytab = false private var principal: String = null private var keytab: String = null private var credentials: Credentials = null - private val credentialManager = new ConfigurableCredentialManager(sc.conf, sc.hadoopConfiguration) + + private 
var credentialRenewer: AMCredentialRenewer = _ + def setupCredentials(): Unit = { - loginFromKeytab = sc.conf.contains(PRINCIPAL.key) + loginFromKeytab = conf.contains(PRINCIPAL.key) if (loginFromKeytab) { - principal = sc.conf.get(PRINCIPAL).get - keytab = sc.conf.get(KEYTAB).orNull - - require(keytab != null, "Keytab must be specified when principal is specified.") - logInfo("Attempting to login to the Kerberos" + - s" using principal: $principal and keytab: $keytab") - val f = new File(keytab) - // Generate a file name that can be used for the keytab file, that does not conflict - // with any user file. - val keytabFileName = f.getName + "-" + UUID.randomUUID().toString - sc.conf.set(KEYTAB.key, keytabFileName) - sc.conf.set(PRINCIPAL.key, principal) + principal = conf.get(PRINCIPAL).get + keytab = conf.get(KEYTAB).orNull } // Defensive copy of the credentials credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials) - logInfo("Credentials loaded: " + UserGroupInformation.getCurrentUser) } - ////////////////////////////////////////////////////// - ////////////////////////////////////////////////////// - - - override def start() { super.start() @@ -111,10 +96,51 @@ private[spark] class StandaloneSchedulerBackend( setupCredentials() + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + val credentialManager = new ConfigurableCredentialManager(conf, hadoopConf) + + // Merge credentials obtained from registered providers + val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(hadoopConf, credentials) + + if (credentials != null) { + logDebug(SparkHadoopUtil.get.dumpTokens(credentials).mkString("\n")) + } + + // If we use principal and keytab to login, also credentials can be renewed some time + // after current time, we should pass the next renewal and updating time to credential + // renewer and updater. + if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() && + nearestTimeOfNextRenewal != Long.MaxValue) { + + // Valid renewal time is 75% of next renewal time, and the valid update time will be + // slightly later then renewal time (80% of next renewal time). This is to make sure + // credentials are renewed and updated before expired. + val currTime = System.currentTimeMillis() + val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime + val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime + + conf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong) + conf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong) + } + + // If the credentials file config is present, we must periodically renew tokens. 
So create + // a new AMDelegationTokenRenewer + if (conf.contains(CREDENTIALS_FILE_PATH.key)) { + // If a principal and keytab have been set, use that to create new credentials for executors + // periodically + val hconf = SparkHadoopUtil.get.newConfiguration(conf) + credentialRenewer = credentialManager.credentialRenewer() + credentialRenewer.scheduleLoginFromKeytab() + } + // NOTE we don't need an updater since the above accomplishes it already + ////////////////////////////////////////////////////// ////////////////////////////////////////////////////// + + + // The endpoint for executors to talk to us val driverUrl = RpcEndpointAddress( sc.conf.get("spark.driver.host"), @@ -161,113 +187,11 @@ private[spark] class StandaloneSchedulerBackend( } val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit) - - - - - //////////////////////////////////////////// - //////////////////////////////////////////// - // Merge credentials obtained from registered providers - val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(sc.hadoopConfiguration, credentials) - - if (credentials != null) { - logDebug(SparkHadoopUtil.get.dumpTokens(credentials).mkString("\n")) - } - - // If we use principal and keytab to login, also credentials can be renewed some time - // after current time, we should pass the next renewal and updating time to credential - // renewer and updater. - if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() && - nearestTimeOfNextRenewal != Long.MaxValue) { - - // Valid renewal time is 75% of next renewal time, and the valid update time will be - // slightly later then renewal time (80% of next renewal time). This is to make sure - // credentials are renewed and updated before expired. 
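Note: the 75%/80% scheduling above is easier to see with concrete numbers; a small worked sketch (the values are illustrative):

// If the next token renewal is 10 hours away, the renewer is scheduled at 7.5 hours
// and the updater at 8 hours, so fresh credentials are always in place before expiry.
val currTime = System.currentTimeMillis()
val nearestTimeOfNextRenewal = currTime + 10L * 60 * 60 * 1000
val renewalTime = ((nearestTimeOfNextRenewal - currTime) * 0.75 + currTime).toLong
val updateTime = ((nearestTimeOfNextRenewal - currTime) * 0.8 + currTime).toLong
assert(renewalTime < updateTime && updateTime < nearestTimeOfNextRenewal)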
- val currTime = System.currentTimeMillis() - val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime - val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime - - sc.conf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong) - sc.conf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong) - } - - - def setupSecurityToken(appDesc: ApplicationDescription): ApplicationDescription = { - logInfo(s"Writing credentials to buffer: ${credentials}: ${credentials.getAllTokens}") - val dob = new DataOutputBuffer - credentials.writeTokenStorageToStream(dob) - dob.close() - - val tokens = Some(dob.getData) - - appDesc.copy(tokens = tokens) - } - - def buildPath(components: String*): String = { - components.mkString(Path.SEPARATOR) - } - - val SPARK_STAGING: String = ".sparkStaging" - - def getAppStagingDir(appId: String): String = { - buildPath(SPARK_STAGING, appId) - } - - - - var secureAppDesc = setupSecurityToken(appDesc) - - val appStagingBaseDir = sc.conf.get(STAGING_DIR).map { new Path(_) } - .getOrElse(FileSystem.get(sc.hadoopConfiguration).getHomeDirectory()) - val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId)) - - - - val su = UserGroupInformation.getCurrentUser().getShortUserName() - if (loginFromKeytab) { - val credentialsFile = "credentials-" + UUID.randomUUID().toString - sc.conf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString) - logInfo(s"Credentials file set to: $credentialsFile") - } - - - - - logInfo(s"Submitting ${secureAppDesc} with tokens ${secureAppDesc.tokens}") - - - // If we passed in a keytab, make sure we copy the keytab to the staging directory on - // HDFS, and setup the relevant environment vars, so the AM can login again. -// if (loginFromKeytab) { -// logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" + -// " via the YARN Secure Distributed Cache.") -// val (_, localizedPath) = distribute(keytab, -// destName = sparkConf.get(KEYTAB), -// appMasterOnly = true) -// require(localizedPath != null, "Keytab file already distributed.") -// } - //////////////////////////////////////////// - //////////////////////////////////////////// - - - - - client = new StandaloneAppClient(sc.env.rpcEnv, masters, secureAppDesc, this, conf) + client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf) client.start() launcherBackend.setState(SparkAppHandle.State.SUBMITTED) waitForRegistration() launcherBackend.setState(SparkAppHandle.State.RUNNING) - - - - - if (conf.contains(CREDENTIALS_FILE_PATH)) { - val newConf = SparkHadoopUtil.get.newConfiguration(conf) - val cu = new ConfigurableCredentialManager(conf, newConf).credentialUpdater() - cu.start() - } - - } override def stop(): Unit = synchronized { From 31c91dcec25718052ae5c775bfe1b41359e8840f Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Wed, 8 Mar 2017 13:14:48 -0500 Subject: [PATCH 11/16] WIP --- .../scheduler/cluster/StandaloneSchedulerBackend.scala | 7 ------- .../spark/deploy/rest/StandaloneRestSubmitSuite.scala | 6 ++---- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index bbda56ba87eaa..ad8441a304d4c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -17,18 
+17,11 @@ package org.apache.spark.scheduler.cluster -import java.io.File -import java.nio.ByteBuffer -import java.util.UUID import java.util.concurrent.Semaphore -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.{Credentials, UserGroupInformation} -import org.apache.hadoop.yarn.api.records.ApplicationId import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener} import org.apache.spark.deploy.security.{AMCredentialRenewer, ConfigurableCredentialManager} -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil import org.apache.spark.deploy.{ApplicationDescription, Command, SparkHadoopUtil} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 0020943a0c148..dd50e33da30ac 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -56,8 +56,7 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach { val sparkProperties = Map("spark.app.name" -> "pi") val environmentVariables = Map("SPARK_ONE" -> "UN", "SPARK_TWO" -> "DEUX") val request = new RestSubmissionClient("spark://host:port").constructSubmitRequest( - "my-app-resource", "my-main-class", appArgs, - sparkProperties, environmentVariables, new SparkConf(loadDefaults = false)) + "my-app-resource", "my-main-class", appArgs, sparkProperties, environmentVariables) assert(request.action === Utils.getFormattedClassName(request)) assert(request.clientSparkVersion === SPARK_VERSION) assert(request.appResource === "my-app-resource") @@ -448,8 +447,7 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach { val args = new SparkSubmitArguments(commandLineArgs) val (_, _, sparkProperties, _) = SparkSubmit.prepareSubmitEnvironment(args) new RestSubmissionClient("spark://host:port").constructSubmitRequest( - mainJar, mainClass, appArgs, sparkProperties.toMap, Map.empty, - new SparkConf(loadDefaults = false)) + mainJar, mainClass, appArgs, sparkProperties.toMap, Map.empty) } /** Return the response as a submit response, or fail with error otherwise. 
*/ From 19644195af14c9b8a451609157b9d47f7251ced4 Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Wed, 8 Mar 2017 17:15:41 -0500 Subject: [PATCH 12/16] Still something isn't working --- .../deploy/rest/RestSubmissionClient.scala | 28 +++++++++--------- .../spark/deploy/worker/DriverRunner.scala | 13 +++++---- .../spark/deploy/worker/DriverWrapper.scala | 20 ++++++++++++- .../spark/deploy/worker/ExecutorRunner.scala | 5 ++++ .../cluster/StandaloneSchedulerBackend.scala | 29 ++++++++++++++++++- 5 files changed, 73 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 5652601ba1810..1543e0f27e975 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -222,22 +222,22 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { message.sparkProperties += KEYTAB.key -> keytabFileName } - // Add credentials - works in the case of a user submitting a job - // that completes < YARN max token renewal - // The reason this is here is so you don't strictly need a keytab/principal - // you only need that if you want to run a long running job... but maybe get rid of it? - val credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials) - if (credentials.getAllTokens.size() > 0) { - val bootstrapCredentails = base64EncodedValue { dob => - credentials.writeTokenStorageToStream(dob) - } - - logInfo("Security tokens will be sent to driver and executors") - message.environmentVariables += BOOTSTRAP_TOKENS.key -> bootstrapCredentails - } +// // Add credentials - works in the case of a user submitting a job +// // that completes < YARN max token renewal +// // The reason this is here is so you don't strictly need a keytab/principal +// // you only need that if you want to run a long running job... but maybe get rid of it? 
+// val credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials) +// logInfo(s"USERS CURRENT CREDENTAILS ${credentials}") +// if (credentials.getAllTokens.size() > 0) { +// val bootstrapCredentials = base64EncodedValue { dob => +// credentials.writeTokenStorageToStream(dob) +// } +// +// logInfo("Security tokens will be sent to driver and executors") +// message.environmentVariables += BOOTSTRAP_TOKENS.key -> bootstrapCredentials +// } ////////// - message.validate() message } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 6b8ed207c64bc..2ce0fec79fdba 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -185,14 +185,15 @@ private[deploy] class DriverRunner( driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables) /////////////// + //////////////// we don't support user token propogation in cluster mode //////////////// //// - if (driverDesc.command.environment.contains(BOOTSTRAP_TOKENS.key)) { - val tokenFile = new File(driverDir, "driver-credentials-" + driverId) - SparkHadoopUtil.get.decodeAndWriteToFile(driverDesc.command.environment, - BOOTSTRAP_TOKENS.key, tokenFile) - builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", tokenFile.toString) - } +// if (driverDesc.command.environment.contains(BOOTSTRAP_TOKENS.key)) { +// val tokenFile = new File(driverDir, "driver-credentials-" + driverId) +// SparkHadoopUtil.get.decodeAndWriteToFile(driverDesc.command.environment, +// BOOTSTRAP_TOKENS.key, tokenFile) +// builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", tokenFile.toString) +// } if (driverDesc.command.environment.contains(KEYTAB_CONTENT.key)) { val keytab = driverDesc.command.environment.get(KEYTAB.key).get diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index 6799f78ec0c19..f8e8b15516989 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -19,7 +19,9 @@ package org.apache.spark.deploy.worker import java.io.File -import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.hadoop.security.UserGroupInformation +import org.apache.spark.internal.config.{KEYTAB, PRINCIPAL} +import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.rpc.RpcEnv import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils} @@ -52,6 +54,22 @@ object DriverWrapper { } Thread.currentThread.setContextClassLoader(loader) + + //////////////////////////////// + val loginFromKeytab = conf.contains(PRINCIPAL.key) + if (loginFromKeytab) { + val principal = conf.get(PRINCIPAL).get + val keytab = conf.get(KEYTAB).orNull + require(keytab != null, "Keytab must be specified when principal is specified") + if (!new File(keytab).exists()) { + throw new SparkException(s"Keytab file: ${keytab} does not exist") + } else { + UserGroupInformation.loginUserFromKeytab(principal, keytab) + } + } + ////////////////////////////// + + // Delegate to supplied main class val clazz = Utils.classForName(mainClass) val mainMethod = clazz.getMethod("main", classOf[Array[String]]) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 1c48eb1bedd49..ec107c755ddaa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -158,11 +158,16 @@ private[deploy] class ExecutorRunner( ///////////////////////////// ///////////////////////////// + logInfo("Environment") + appDesc.command.environment.foreach(kv => logInfo(" * " + kv)) + if (appDesc.command.environment.contains(BOOTSTRAP_TOKENS.key)) { val tokenFile = new File(executorDir, "executor-credentials-" + appId) SparkHadoopUtil.get.decodeAndWriteToFile(appDesc.command.environment, BOOTSTRAP_TOKENS.key, tokenFile) builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", tokenFile.toString) + + logInfo("Wrote HADOOP_TOKEN_FILE_LOCATION to " + tokenFile) } ///////////////////////////// diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index ad8441a304d4c..dbd87bdb26f68 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -19,6 +19,8 @@ package org.apache.spark.scheduler.cluster import java.util.concurrent.Semaphore +import org.apache.commons.codec.binary.Base64 +import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener} import org.apache.spark.deploy.security.{AMCredentialRenewer, ConfigurableCredentialManager} @@ -163,11 +165,36 @@ private[spark] class StandaloneSchedulerBackend( Nil } + + ////////////////////// + ////////////////////// + + def base64EncodedValue(fn: DataOutputBuffer => Unit): String = { + val dob = new DataOutputBuffer + fn(dob) + dob.close() + new String(Base64.encodeBase64(dob.getData)) + } + + val bootstrap = if (credentials.getAllTokens.size() > 0) { + val bootstrapCredentials = base64EncodedValue { dob => + credentials.writeTokenStorageToStream(dob) + } + + logInfo("Security tokens will be sent to executors") + Map(BOOTSTRAP_TOKENS.key -> bootstrapCredentials) + } else Map.empty[String, String] + + val executorEnv = sc.executorEnvs.toMap ++ bootstrap + + ////////////////////// + ////////////////////// + // Start executors with a few necessary configs for registering with the scheduler val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf) val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", - args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) + args, executorEnv, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) val webUrl = sc.ui.map(_.webUrl).getOrElse("") val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) // If we're using dynamic allocation, set our initial executor limit to 0 for now. 
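[Editor's note, not part of the patch series] For readers tracing the token hand-off in the commit above: the standalone scheduler backend serializes the current user's Hadoop Credentials through a DataOutputBuffer, base64-encodes the bytes into an executor environment entry, and the worker decodes them again before the executor JVM starts (in the patch, by writing a token file and pointing HADOOP_TOKEN_FILE_LOCATION at it), while DriverWrapper re-logs in from the keytab when a principal is configured. The Scala sketch below is illustrative only; the helper names are invented, and it merges the decoded tokens directly into the current UGI instead of going through a token file.

import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.io.{DataInputBuffer, DataOutputBuffer}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Driver side: Credentials -> base64 string (the same round trip the patch performs).
def encodeTokens(creds: Credentials): String = {
  val out = new DataOutputBuffer
  creds.writeTokenStorageToStream(out)   // Writable-serializes every token in the Credentials
  out.close()
  new String(Base64.encodeBase64(out.getData.take(out.getLength)))
}

// Worker side: base64 string -> Credentials.
def decodeTokens(encoded: String): Credentials = {
  val raw = Base64.decodeBase64(encoded)
  val in = new DataInputBuffer
  in.reset(raw, raw.length)
  val creds = new Credentials()
  creds.readTokenStorageStream(in)       // inverse of writeTokenStorageToStream
  creds
}

// A launched process could then make the tokens visible to Hadoop clients, e.g.:
//   UserGroupInformation.getCurrentUser.addCredentials(decodeTokens(encodedTokensFromEnv))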
From 83f05014659e08a4cd8c9703941c98aaaba9eb31 Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Wed, 15 Mar 2017 16:28:56 -0400 Subject: [PATCH 13/16] Actually use credential updater --- .../apache/spark/deploy/SparkHadoopUtil.scala | 29 ++++++++++++++----- .../deploy/yarn/YarnSparkHadoopUtil.scala | 15 ---------- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index e3af81dc2dc35..5f217b3060f60 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -22,22 +22,23 @@ import java.security.PrivilegedExceptionAction import java.text.DateFormat import java.util.{Arrays, Comparator, Date, Locale} -import scala.collection.JavaConverters._ -import scala.util.control.NonFatal import com.google.common.primitives.Longs import org.apache.commons.codec.binary.Base64 import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.security.{Credentials, UserGroupInformation} -import org.apache.hadoop.security.token.{Token, TokenIdentifier} import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier -import org.apache.spark.{SparkConf, SparkException} +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.deploy.security.{ConfigurableCredentialManager, CredentialUpdater} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.BOOTSTRAP_TOKENS import org.apache.spark.util.Utils +import org.apache.spark.{SparkConf, SparkException} + +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal /** * :: DeveloperApi :: @@ -49,6 +50,9 @@ class SparkHadoopUtil extends Logging { val conf: Configuration = newConfiguration(sparkConf) UserGroupInformation.setConfiguration(conf) + private var credentialUpdater: CredentialUpdater = _ + + /** * Runs the given function with a Hadoop UserGroupInformation as a thread local variable * (distributed to child threads), used for authenticating HDFS and YARN calls. @@ -291,12 +295,21 @@ class SparkHadoopUtil extends Logging { * Start a thread to periodically update the current user's credentials with new credentials so * that access to secured service does not fail. */ - private[spark] def startCredentialUpdater(conf: SparkConf) {} + private[spark] def startCredentialUpdater(sparkConf: SparkConf): Unit = { + credentialUpdater = + new ConfigurableCredentialManager(sparkConf, newConfiguration(sparkConf)).credentialUpdater() + credentialUpdater.start() + } /** * Stop the thread that does the credential updates. */ - private[spark] def stopCredentialUpdater() {} + private[spark] def stopCredentialUpdater(): Unit = { + if (credentialUpdater != null) { + credentialUpdater.stop() + credentialUpdater = null + } + } /** * Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism. 
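[Editor's note, not part of the patch series] As context for the hunk above: the credential-updater lifecycle now lives in core's SparkHadoopUtil, with the YARN-specific override deleted in the next hunk. Below is a minimal, illustrative sketch of how a long-lived, Spark-internal process might drive the two hooks; both methods are private[spark], the SparkConf named `conf` is assumed to carry spark.yarn.credentials.file, and the work loop is a placeholder.

import org.apache.spark.deploy.SparkHadoopUtil

val hadoopUtil = SparkHadoopUtil.get
hadoopUtil.startCredentialUpdater(conf)   // starts the updater that refreshes the UGI from the credentials file
try {
  // ... long-running driver/executor work ...
} finally {
  hadoopUtil.stopCredentialUpdater()      // stops the updater thread and clears the reference
}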
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 80d193329b7e4..2cc2d889338e3 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -45,8 +45,6 @@ import org.apache.spark.util.Utils */ class YarnSparkHadoopUtil extends SparkHadoopUtil { - private var credentialUpdater: CredentialUpdater = _ - override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) { dest.addCredentials(source.getCredentials()) } @@ -86,19 +84,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { if (credentials != null) credentials.getSecretKey(new Text(key)) else null } - private[spark] override def startCredentialUpdater(sparkConf: SparkConf): Unit = { - credentialUpdater = - new ConfigurableCredentialManager(sparkConf, newConfiguration(sparkConf)).credentialUpdater() - credentialUpdater.start() - } - - private[spark] override def stopCredentialUpdater(): Unit = { - if (credentialUpdater != null) { - credentialUpdater.stop() - credentialUpdater = null - } - } - private[spark] def getContainerId: ContainerId = { val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) ConverterUtils.toContainerId(containerIdString) From 917b077ca1e05a9bb44bcb91c33ed64a1d1c364c Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Tue, 4 Apr 2017 10:38:18 -0400 Subject: [PATCH 14/16] Change order of configuration setting so that everything works --- .../cluster/StandaloneSchedulerBackend.scala | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index dbd87bdb26f68..53eef89bb05b7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -81,13 +81,6 @@ private[spark] class StandaloneSchedulerBackend( override def start() { - super.start() - launcherBackend.connect() - - - - ////////////////////////////////////////////////////// - ////////////////////////////////////////////////////// setupCredentials() @@ -104,6 +97,7 @@ private[spark] class StandaloneSchedulerBackend( // If we use principal and keytab to login, also credentials can be renewed some time // after current time, we should pass the next renewal and updating time to credential // renewer and updater. 
+ if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() && nearestTimeOfNextRenewal != Long.MaxValue) { @@ -114,6 +108,9 @@ private[spark] class StandaloneSchedulerBackend( val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime + logInfo(s"Setting credential renewal time: ${renewalTime.toLong} ms," + + s" update time ${updateTime.toLong} ms") + conf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong) conf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong) } @@ -123,18 +120,13 @@ private[spark] class StandaloneSchedulerBackend( if (conf.contains(CREDENTIALS_FILE_PATH.key)) { // If a principal and keytab have been set, use that to create new credentials for executors // periodically - val hconf = SparkHadoopUtil.get.newConfiguration(conf) credentialRenewer = credentialManager.credentialRenewer() credentialRenewer.scheduleLoginFromKeytab() } // NOTE we don't need an updater since the above accomplishes it already - - ////////////////////////////////////////////////////// - ////////////////////////////////////////////////////// - - - + super.start() + launcherBackend.connect() // The endpoint for executors to talk to us val driverUrl = RpcEndpointAddress( From 16f9551ef1073160f13e9522600416d29388b85b Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Tue, 4 Apr 2017 11:48:09 -0400 Subject: [PATCH 15/16] Remove inadvertent file --- build/sbt-launch-0.13.13.jar.part | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 build/sbt-launch-0.13.13.jar.part diff --git a/build/sbt-launch-0.13.13.jar.part b/build/sbt-launch-0.13.13.jar.part deleted file mode 100644 index e69de29bb2d1d..0000000000000 From 246c76a82554ee20ed31202a6b12d92a823d68a2 Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Tue, 4 Apr 2017 12:39:23 -0400 Subject: [PATCH 16/16] Cleanup code --- .../deploy/rest/RestSubmissionClient.scala | 29 +++---------- .../spark/deploy/worker/DriverRunner.scala | 36 +++++----------- .../spark/deploy/worker/ExecutorRunner.scala | 27 ++++-------- .../spark/internal/config/package.scala | 12 ------ .../CoarseGrainedSchedulerBackend.scala | 1 + .../cluster/StandaloneSchedulerBackend.scala | 22 ++-------- .../scala/org/apache/spark/util/Utils.scala | 13 +++++- .../org/apache/spark/deploy/yarn/config.scala | 41 +------------------ 8 files changed, 42 insertions(+), 139 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 1543e0f27e975..dead7867ca1a8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -28,9 +28,9 @@ import com.fasterxml.jackson.core.JsonProcessingException import org.apache.commons.codec.binary.Base64 import org.apache.commons.io import org.apache.hadoop.io.DataOutputBuffer -import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.BOOTSTRAP_TOKENS import org.apache.spark.util.Utils import org.apache.spark.{SparkConf, SparkException, SPARK_VERSION => sparkVersion} @@ -188,16 +188,14 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { message.sparkProperties = sparkProperties message.environmentVariables = environmentVariables - 
def base64EncodedValue(fn: DataOutputBuffer => Unit): String = { + def uti(fn: DataOutputBuffer => Unit): String = { val dob = new DataOutputBuffer fn(dob) dob.close() new String(Base64.encodeBase64(dob.getData)) } - ////////// - // Security propogation - + // Propagate kerberos credentials if necessary if (sparkProperties.contains(PRINCIPAL.key)) { val principal = sparkProperties.get(PRINCIPAL.key).get val keytab = sparkProperties.get(KEYTAB.key).orNull @@ -212,32 +210,15 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { logInfo("To enable the driver to login from keytab, credentials are are being copied" + " to the Master inside the CreateSubmissionRequest") - val keytabContent = base64EncodedValue { dob => + val keytabContent = Utils.base64EncodedValue { dob => io.IOUtils.copy(new FileInputStream(f), dob) } - message.environmentVariables += KEYTAB_CONTENT.key -> keytabContent + message.environmentVariables += BOOTSTRAP_TOKENS -> keytabContent // overwrite with localized version - message.environmentVariables += KEYTAB.key -> keytabFileName message.sparkProperties += KEYTAB.key -> keytabFileName } -// // Add credentials - works in the case of a user submitting a job -// // that completes < YARN max token renewal -// // The reason this is here is so you don't strictly need a keytab/principal -// // you only need that if you want to run a long running job... but maybe get rid of it? -// val credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials) -// logInfo(s"USERS CURRENT CREDENTAILS ${credentials}") -// if (credentials.getAllTokens.size() > 0) { -// val bootstrapCredentials = base64EncodedValue { dob => -// credentials.writeTokenStorageToStream(dob) -// } -// -// logInfo("Security tokens will be sent to driver and executors") -// message.environmentVariables += BOOTSTRAP_TOKENS.key -> bootstrapCredentials -// } - ////////// - message.validate() message } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index d38af6bfaf893..290a4c5488e1e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -19,22 +19,21 @@ package org.apache.spark.deploy.worker import java.io._ import java.net.URI -import java.nio.ByteBuffer import java.nio.charset.StandardCharsets -import scala.collection.JavaConverters._ import com.google.common.io.Files -import org.apache.commons.codec.binary.Base64 -import org.apache.commons.io.IOUtils -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil} import org.apache.spark.deploy.DeployMessages.DriverStateChanged import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.master.DriverState.DriverState +import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{BOOTSTRAP_TOKENS, CREDENTIALS_FILE_PATH, KEYTAB, KEYTAB_CONTENT} +import org.apache.spark.internal.config.KEYTAB +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.BOOTSTRAP_TOKENS import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils} +import org.apache.spark.{SecurityManager, SparkConf} + +import scala.collection.JavaConverters._ /** * Manages the execution of one driver, including 
automatically restarting the driver on failure. @@ -183,27 +182,14 @@ private[deploy] class DriverRunner( val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables) - /////////////// - //////////////// we don't support user token propogation in cluster mode - //////////////// - //// -// if (driverDesc.command.environment.contains(BOOTSTRAP_TOKENS.key)) { -// val tokenFile = new File(driverDir, "driver-credentials-" + driverId) -// SparkHadoopUtil.get.decodeAndWriteToFile(driverDesc.command.environment, -// BOOTSTRAP_TOKENS.key, tokenFile) -// builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", tokenFile.toString) -// } - - if (driverDesc.command.environment.contains(KEYTAB_CONTENT.key)) { - val keytab = driverDesc.command.environment.get(KEYTAB.key).get + // We only support propagtation of credentials if keytab is passed + // since we can't renew indefinitely without a keytab + if (driverDesc.command.environment.contains(BOOTSTRAP_TOKENS) && conf.contains(KEYTAB)) { + val keytab = conf.get(KEYTAB).get val keytabFile = new File(driverDir, keytab) SparkHadoopUtil.get.decodeAndWriteToFile(driverDesc.command.environment, - KEYTAB_CONTENT.key, keytabFile) + BOOTSTRAP_TOKENS, keytabFile) } - ///////////////////////////// - //////////////////// - //////////////////// - runDriver(builder, driverDir, driverDesc.supervise) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index ec107c755ddaa..7681d5620f8ac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -18,20 +18,19 @@ package org.apache.spark.deploy.worker import java.io._ -import java.nio.ByteBuffer import java.nio.charset.StandardCharsets -import scala.collection.JavaConverters._ import com.google.common.io.Files -import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.{SecurityManager, SparkConf, SparkContext} -import org.apache.spark.deploy.{ApplicationDescription, ExecutorState, SparkHadoopUtil} import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged +import org.apache.spark.deploy.{ApplicationDescription, ExecutorState, SparkHadoopUtil} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.BOOTSTRAP_TOKENS import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.util.{ShutdownHookManager, Utils} import org.apache.spark.util.logging.FileAppender +import org.apache.spark.util.{ShutdownHookManager, Utils} +import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.BOOTSTRAP_TOKENS + +import scala.collection.JavaConverters._ /** * Manages the execution of one executor process. 
@@ -155,24 +154,16 @@ private[deploy] class ExecutorRunner( // parent process for the executor command builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0") - - ///////////////////////////// - ///////////////////////////// - logInfo("Environment") - appDesc.command.environment.foreach(kv => logInfo(" * " + kv)) - - if (appDesc.command.environment.contains(BOOTSTRAP_TOKENS.key)) { + // Ensure delegation tokens are propagated if necessary + if (appDesc.command.environment.contains(BOOTSTRAP_TOKENS)) { val tokenFile = new File(executorDir, "executor-credentials-" + appId) SparkHadoopUtil.get.decodeAndWriteToFile(appDesc.command.environment, - BOOTSTRAP_TOKENS.key, tokenFile) + BOOTSTRAP_TOKENS, tokenFile) builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", tokenFile.toString) logInfo("Wrote HADOOP_TOKEN_FILE_LOCATION to " + tokenFile) } - ///////////////////////////// - ///////////////////////////// - // Add webUI log urls val baseUrl = if (conf.getBoolean("spark.ui.reverseProxy", false)) { diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 0128e1a61aa57..fc89dd8f56eb0 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -313,16 +313,4 @@ package object config { .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " + "that hosts fs.defaultFS does not need to be listed here.") .fallbackConf(NAMENODES_TO_ACCESS) - - private[spark] val KEYTAB_CONTENT = ConfigBuilder("spark.yarn.keytab.content") - .doc("Base64 encoded content of spark.yarn.keytab") - .internal() - .stringConf - .createOptional - - private[spark] val BOOTSTRAP_TOKENS = ConfigBuilder("spark.deploy.bootstrap.tokens") - .doc("Base64 encoded tokens to propogate to driver/executors laucnhed in standalone mode") - .internal() - .stringConf - .createOptional } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 4eedaaea61195..11365f50a2087 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -639,4 +639,5 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private[spark] object CoarseGrainedSchedulerBackend { val ENDPOINT_NAME = "CoarseGrainedScheduler" + val BOOTSTRAP_TOKENS = "SPARK_BOOTSTRAP_TOKENS" } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 53eef89bb05b7..bf4c42081c163 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -19,8 +19,6 @@ package org.apache.spark.scheduler.cluster import java.util.concurrent.Semaphore -import org.apache.commons.codec.binary.Base64 -import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener} import org.apache.spark.deploy.security.{AMCredentialRenewer, ConfigurableCredentialManager} @@ -30,6 
+28,7 @@ import org.apache.spark.internal.config._ import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.BOOTSTRAP_TOKENS import org.apache.spark.util.Utils import org.apache.spark.{SparkConf, SparkContext} @@ -157,31 +156,18 @@ private[spark] class StandaloneSchedulerBackend( Nil } - - ////////////////////// - ////////////////////// - - def base64EncodedValue(fn: DataOutputBuffer => Unit): String = { - val dob = new DataOutputBuffer - fn(dob) - dob.close() - new String(Base64.encodeBase64(dob.getData)) - } - + // Propagate security tokens to executors val bootstrap = if (credentials.getAllTokens.size() > 0) { - val bootstrapCredentials = base64EncodedValue { dob => + val bootstrapCredentials = Utils.base64EncodedValue { dob => credentials.writeTokenStorageToStream(dob) } logInfo("Security tokens will be sent to executors") - Map(BOOTSTRAP_TOKENS.key -> bootstrapCredentials) + Map(BOOTSTRAP_TOKENS -> bootstrapCredentials) } else Map.empty[String, String] val executorEnv = sc.executorEnvs.toMap ++ bootstrap - ////////////////////// - ////////////////////// - // Start executors with a few necessary configs for registering with the scheduler val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf) val javaOpts = sparkJavaOpts ++ extraJavaOpts diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 943dde0723271..1a1e1b4040c67 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -40,20 +40,20 @@ import scala.reflect.ClassTag import scala.util.Try import scala.util.control.{ControlThrowable, NonFatal} import scala.util.matching.Regex - import _root_.io.netty.channel.unix.Errors.NativeIoException import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import com.google.common.io.{ByteStreams, Files => GFiles} import com.google.common.net.InetAddresses +import org.apache.commons.codec.binary.Base64 import org.apache.commons.lang3.SystemUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} +import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.UserGroupInformation import org.apache.log4j.PropertyConfigurator import org.eclipse.jetty.util.MultiException import org.json4s._ import org.slf4j.Logger - import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging @@ -2627,6 +2627,15 @@ private[spark] object Utils extends Logging { redact(redactionPattern, kvs.toArray) } + /** + * Base64 encode the data in a DataOutputBuffer + */ + def base64EncodedValue(fn: DataOutputBuffer => Unit): String = { + val dob = new DataOutputBuffer + fn(dob) + dob.close() + new String(Base64.encodeBase64(dob.getData)) + } } private[util] object CallerContext extends Logging { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 6006216e1eab5..4ebbddde49a1f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -231,30 +231,6 @@ package object config { .stringConf .createOptional - /* Security 
configuration. */ - -// private[spark] val CREDENTIAL_FILE_MAX_COUNT = -// ConfigBuilder("spark.yarn.credentials.file.retention.count") -// .intConf -// .createWithDefault(5) -// -// private[spark] val CREDENTIALS_FILE_MAX_RETENTION = -// ConfigBuilder("spark.yarn.credentials.file.retention.days") -// .intConf -// .createWithDefault(5) - -// private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes") -// .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " + -// "fs.defaultFS does not need to be listed here.") -// .stringConf -// .toSequence -// .createWithDefault(Nil) -// -// private[spark] val FILESYSTEMS_TO_ACCESS = ConfigBuilder("spark.yarn.access.hadoopFileSystems") -// .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " + -// "that hosts fs.defaultFS does not need to be listed here.") -// .fallbackConf(NAMENODES_TO_ACCESS) - /* Rolled log aggregation configuration. */ private[spark] val ROLLED_LOG_INCLUDE_PATTERN = @@ -273,11 +249,6 @@ package object config { /* Private configs. */ -// private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.yarn.credentials.file") -// .internal() -// .stringConf -// .createWithDefault(null) - // Internal config to propagate the location of the user's jar to the driver/executors private[spark] val APP_JAR = ConfigBuilder("spark.yarn.user.jar") .internal() @@ -330,17 +301,7 @@ package object config { .internal() .stringConf .createOptional - -// private[spark] val CREDENTIALS_RENEWAL_TIME = ConfigBuilder("spark.yarn.credentials.renewalTime") -// .internal() -// .timeConf(TimeUnit.MILLISECONDS) -// .createWithDefault(Long.MaxValue) -// -// private[spark] val CREDENTIALS_UPDATE_TIME = ConfigBuilder("spark.yarn.credentials.updateTime") -// .internal() -// .timeConf(TimeUnit.MILLISECONDS) -// .createWithDefault(Long.MaxValue) - + // The list of cache-related config entries. This is used by Client and the AM to clean // up the environment so that these settings do not appear on the web UI. private[yarn] val CACHE_CONFIGS = Seq(