From 19618aa8cea21ebea9003fd171141e0345842630 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 11 Jul 2018 10:48:57 -0700 Subject: [PATCH 01/25] [SPARK-23146] Support client mode. Client mode works more or less identically to cluster mode. However, in client mode, the Spark Context needs to be manually bootstrapped with certain properties which would have otherwise been set up by spark-submit in cluster mode. Specifically: - The user must provide a pod name for the driver. This implies that all drivers in client mode must be running inside a pod. This pod is primarily used to create the owner reference graph so that executors are not orphaned if the driver pod is deleted. - The user must provide a host (spark.driver.host) and port (spark.driver.port) that the executors can connect to. When using spark-submit in cluster mode, spark-submit generates the headless service automatically; in client mode, the user is responsible for setting up their own connectivity. --- .../org/apache/spark/deploy/k8s/Config.scala | 1 + .../cluster/k8s/ExecutorPodsAllocator.scala | 9 ++- .../k8s/KubernetesClusterManager.scala | 16 +++-- .../k8s/integrationtest/KubernetesSuite.scala | 66 +++++++++++++++++++ 4 files changed, 83 insertions(+), 9 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index bf33179ae3dab..1323b5926d86c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -65,6 +65,7 @@ private[spark] object Config extends Logging { "spark.kubernetes.authenticate.driver" val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted" + val KUBERNETES_AUTH_CLIENT_MODE_PREFIX = "spark.kubernetes.authenticate" val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken" val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile" val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 5a143ad3600fd..ac2de47a7c7f2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -46,13 +46,18 @@ private[spark] class ExecutorPodsAllocator( private val podCreationTimeout = math.max(podAllocationDelay * 5, 60000) + private val namespace = conf.get(KUBERNETES_NAMESPACE) + private val kubernetesDriverPodName = conf .get(KUBERNETES_DRIVER_POD_NAME) .getOrElse(throw new SparkException("Must specify the driver pod name")) - private val driverPod = kubernetesClient.pods() + private val driverPod = Option(kubernetesClient.pods() .withName(kubernetesDriverPodName) - .get() + .get()) + .getOrElse(throw new SparkException( + s"No pod was found named $kubernetesDriverPodName in the cluster in the" + + s" namespace $namespace (this was supposed to be the driver pod.).")) // Executor IDs that have been requested from Kubernetes but have not been detected in any // snapshot yet. Mapped to the timestamp when they were created. 
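For context, the manual bootstrap described in the commit message looks roughly like the following when the driver itself runs inside a pod. This is a sketch only: the master URL, image, pod, service, and namespace names are illustrative placeholders, while the configuration keys are the ones this patch reads.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical client-mode driver bootstrap; all names are placeholders.
val conf = new SparkConf()
  .setMaster("k8s://https://kubernetes.default.svc")
  .setAppName("client-mode-example")
  .set("spark.kubernetes.container.image", "my-spark-image:latest")
  // The pod this driver runs in, used to build the owner reference graph.
  .set("spark.kubernetes.driver.pod.name", "my-driver-pod")
  // Connectivity the executors use to reach back to the driver, for example
  // through a user-managed headless service fronting the driver pod.
  .set("spark.driver.host", "my-driver-svc.my-namespace.svc")
  .set("spark.driver.port", "7077")
val sc = new SparkContext(conf)
```

The integration test added below exercises essentially this flow by invoking run-example with --deploy-mode client from inside a pod.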
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index de2a52bc7a0b8..05ec8ec853648 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler.cluster.k8s import java.io.File +import java.util.Locale import java.util.concurrent.TimeUnit import com.google.common.cache.CacheBuilder @@ -35,12 +36,6 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s") override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { - if (masterURL.startsWith("k8s") && - sc.deployMode == "client" && - !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) { - throw new SparkException("Client mode is currently not supported for Kubernetes.") - } - new TaskSchedulerImpl(sc) } @@ -48,10 +43,17 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend = { + val wasSparkSubmitted = sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false) + val authConfPrefix = if (wasSparkSubmitted) { + KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX + } else { + KUBERNETES_AUTH_CLIENT_MODE_PREFIX + } + val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient( KUBERNETES_MASTER_INTERNAL_URL, Some(sc.conf.get(KUBERNETES_NAMESPACE)), - KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX, + authConfPrefix, sc.conf, Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 6e334c83fbde8..23c4f01f41c9f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -156,6 +156,72 @@ private[spark] class KubernetesSuite extends SparkFunSuite }) } + test("Run in client mode.") { + val labels = Map("spark-app-selector" -> driverPodName) + val driverPort = 7077 + val blockManagerPort = 10000 + val driverService = testBackend.getKubernetesClient.services().createNew() + .withNewMetadata() + .withName(s"$driverPodName-svc") + .endMetadata() + .withNewSpec() + .withClusterIP("None") + .withSelector(labels.asJava) + .addNewPort() + .withName("driver-port") + .withPort(driverPort) + .withNewTargetPort(driverPort) + .endPort() + .addNewPort() + .withName("block-manager") + .withPort(blockManagerPort) + .withNewTargetPort(blockManagerPort) + .endPort() + .endSpec() + .done() + try { + val driverPod = testBackend.getKubernetesClient.pods().createNew() + .withNewMetadata() + .withName(driverPodName) + .withLabels(labels.asJava) + .endMetadata() + .withNewSpec() + .addNewContainer() + .withImage(image) + 
.withImagePullPolicy("IfNotPresent") + .withCommand("/opt/spark/bin/run-example") + .addToArgs("--master", s"k8s://${kubernetesTestComponents.clientConfig.getMasterUrl}") + .addToArgs("--deploy-mode", "client") + .addToArgs( + "--conf", + s"spark.kubernetes.namespace=${kubernetesTestComponents.namespace}") + .addToArgs("--conf", "spark.kubernetes.driver.pod.name=driverPodName") + .addToArgs("--conf", "spark.executor.memory=500m") + .addToArgs("--conf", "spark.executor.cores=1") + .addToArgs("--conf", "spark.executor.instances=1") + .addToArgs("--conf", + s"spark.driver.host=" + + s"${driverService.getMetadata.getName}.${kubernetesTestComponents.namespace}.svc") + .addToArgs("--conf", s"spark.driver.port=$driverPort") + .addToArgs("--conf", s"spark.driver.blockManager.port=$blockManagerPort") + .addToArgs("SparkPi") + .addToArgs("10") + .endContainer() + .endSpec() + .done() + Eventually.eventually(TIMEOUT, INTERVAL) { + assert(kubernetesTestComponents.kubernetesClient + .pods() + .withName(driverPodName) + .getLog + .contains("Pi is roughly 3"), "The application did not complete.") + } + } finally { + // Have to delete the service manually since it doesn't have an owner reference + kubernetesTestComponents.kubernetesClient.services.delete(driverService) + } + } + // TODO(ssuchter): Enable the below after debugging // test("Run PageRank using remote data file") { // sparkAppConf From 4bab48bf596dbb2a18c054f300dfb0606b001b39 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 11 Jul 2018 11:46:18 -0700 Subject: [PATCH 02/25] Allow driver pod name to be optional. --- .../spark/deploy/k8s/KubernetesConf.scala | 4 ++-- .../features/BasicExecutorFeatureStep.scala | 19 ++++++++++--------- .../cluster/k8s/ExecutorPodsAllocator.scala | 16 ++++++++-------- .../k8s/KubernetesClusterManager.scala | 6 ++++++ .../BasicExecutorFeatureStepSuite.scala | 6 +++--- .../k8s/integrationtest/KubernetesSuite.scala | 4 ++-- 6 files changed, 31 insertions(+), 24 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index b0ccaa36b01ed..c4b2307c0a53c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -43,7 +43,7 @@ private[spark] case class KubernetesDriverSpecificConf( */ private[spark] case class KubernetesExecutorSpecificConf( executorId: String, - driverPod: Pod) + driverPod: Option[Pod]) extends KubernetesRoleSpecificConf /** @@ -178,7 +178,7 @@ private[spark] object KubernetesConf { sparkConf: SparkConf, executorId: String, appId: String, - driverPod: Pod): KubernetesConf[KubernetesExecutorSpecificConf] = { + driverPod: Option[Pod]): KubernetesConf[KubernetesExecutorSpecificConf] = { val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX) require( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 91c54a9776982..241ddbf1d8cd9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.features import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder} +import io.fabric8.kubernetes.api.model._ import org.apache.spark.SparkException import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod} @@ -152,19 +152,20 @@ private[spark] class BasicExecutorFeatureStep( .build() }.getOrElse(executorContainer) val driverPod = kubernetesConf.roleSpecificConf.driverPod + val ownerReference = driverPod.map(pod => + new OwnerReferenceBuilder() + .withController(true) + .withApiVersion(pod.getApiVersion) + .withKind(pod.getKind) + .withName(pod.getMetadata.getName) + .withUid(pod.getMetadata.getUid) + .build()) val executorPod = new PodBuilder(pod.pod) .editOrNewMetadata() .withName(name) .withLabels(kubernetesConf.roleLabels.asJava) .withAnnotations(kubernetesConf.roleAnnotations.asJava) - .withOwnerReferences() - .addNewOwnerReference() - .withController(true) - .withApiVersion(driverPod.getApiVersion) - .withKind(driverPod.getKind) - .withName(driverPod.getMetadata.getName) - .withUid(driverPod.getMetadata.getUid) - .endOwnerReference() + .addToOwnerReferences(ownerReference.toSeq: _*) .endMetadata() .editOrNewSpec() .withHostname(hostname) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index ac2de47a7c7f2..77bb9c3fcc9f4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -50,14 +50,14 @@ private[spark] class ExecutorPodsAllocator( private val kubernetesDriverPodName = conf .get(KUBERNETES_DRIVER_POD_NAME) - .getOrElse(throw new SparkException("Must specify the driver pod name")) - - private val driverPod = Option(kubernetesClient.pods() - .withName(kubernetesDriverPodName) - .get()) - .getOrElse(throw new SparkException( - s"No pod was found named $kubernetesDriverPodName in the cluster in the" + - s" namespace $namespace (this was supposed to be the driver pod.).")) + + private val driverPod = kubernetesDriverPodName + .map(name => Option(kubernetesClient.pods() + .withName(name) + .get()) + .getOrElse(throw new SparkException( + s"No pod was found named $kubernetesDriverPodName in the cluster in the " + + s"namespace $namespace (this was supposed to be the driver pod.)."))) // Executor IDs that have been requested from Kubernetes but have not been detected in any // snapshot yet. Mapped to the timestamp when they were created. 
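Pulled out of the hunk above as a standalone sketch, the optional lookup behaves as follows; the helper name is invented for illustration. fabric8's get() returns null when no pod matches, hence the Option wrapper, and the lookup now only fails fast when a driver pod name was actually configured.

```scala
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkException

// In cluster mode the driver pod name is set and must resolve to a live pod;
// in client mode it may be absent, in which case no owner pod is tracked.
def findDriverPod(
    kubernetesClient: KubernetesClient,
    maybeDriverPodName: Option[String],
    namespace: String): Option[Pod] = {
  maybeDriverPodName.map { name =>
    Option(kubernetesClient.pods().withName(name).get())
      .getOrElse(throw new SparkException(
        s"No pod was found named $name in the namespace $namespace " +
          "(this was supposed to be the driver pod)."))
  }
}
```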
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 05ec8ec853648..8005d6974dd75 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -50,6 +50,12 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit KUBERNETES_AUTH_CLIENT_MODE_PREFIX } + val apiServerUrl = if (wasSparkSubmitted) { + KUBERNETES_MASTER_INTERNAL_URL + } else { + masterURL.substring("k8s://".length()) + } + val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient( KUBERNETES_MASTER_INTERNAL_URL, Some(sc.conf.get(KUBERNETES_NAMESPACE)), diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index f06030aa55c0c..58c9982df330c 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -81,7 +81,7 @@ class BasicExecutorFeatureStepSuite val step = new BasicExecutorFeatureStep( KubernetesConf( baseConf, - KubernetesExecutorSpecificConf("1", DRIVER_POD), + KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)), RESOURCE_NAME_PREFIX, APP_ID, LABELS, @@ -120,7 +120,7 @@ class BasicExecutorFeatureStepSuite val step = new BasicExecutorFeatureStep( KubernetesConf( conf, - KubernetesExecutorSpecificConf("1", DRIVER_POD), + KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)), longPodNamePrefix, APP_ID, LABELS, @@ -140,7 +140,7 @@ class BasicExecutorFeatureStepSuite val step = new BasicExecutorFeatureStep( KubernetesConf( conf, - KubernetesExecutorSpecificConf("1", DRIVER_POD), + KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)), RESOURCE_NAME_PREFIX, APP_ID, LABELS, diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 23c4f01f41c9f..4ce1525bbcbf1 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -190,12 +190,12 @@ private[spark] class KubernetesSuite extends SparkFunSuite .withImage(image) .withImagePullPolicy("IfNotPresent") .withCommand("/opt/spark/bin/run-example") - .addToArgs("--master", s"k8s://${kubernetesTestComponents.clientConfig.getMasterUrl}") + .addToArgs("--master", s"k8s://https://kubernetes.default.svc") .addToArgs("--deploy-mode", "client") .addToArgs( "--conf", s"spark.kubernetes.namespace=${kubernetesTestComponents.namespace}") - .addToArgs("--conf", "spark.kubernetes.driver.pod.name=driverPodName") + .addToArgs("--conf", s"spark.kubernetes.driver.pod.name=$driverPodName") .addToArgs("--conf", 
"spark.executor.memory=500m") .addToArgs("--conf", "spark.executor.cores=1") .addToArgs("--conf", "spark.executor.instances=1") From 94ed1cca627816cd6b990d1ca301292047dcae70 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 11 Jul 2018 11:47:33 -0700 Subject: [PATCH 03/25] Add driver pod name check for cluster mode --- .../scheduler/cluster/k8s/KubernetesClusterManager.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 8005d6974dd75..d000fbb54b4f7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -56,6 +56,12 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit masterURL.substring("k8s://".length()) } + if (wasSparkSubmitted) { + require(sc.conf.get(KUBERNETES_DRIVER_POD_NAME).isDefined, + "If the application is deployed using spark-submit, the driver pod name must" + + " be provided.") + } + val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient( KUBERNETES_MASTER_INTERNAL_URL, Some(sc.conf.get(KUBERNETES_NAMESPACE)), From 560993eff1ed4d44fd3d530978ea429adf104de1 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 11 Jul 2018 12:01:11 -0700 Subject: [PATCH 04/25] Fix build --- .../scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala | 2 +- .../cluster/k8s/KubernetesExecutorBuilderSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 0c19f5946b75f..e847f8590d353 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -166,7 +166,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { conf, executorSpecificConf.executorId, TEST_SPARK_APP_ID, - driverPod) + Some(driverPod)) k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap && // Since KubernetesConf.createExecutorConf clones the SparkConf object, force // deep equality comparison for the SparkConf object and use object equality diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index a6bc8bce32926..5255e14d35fcc 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -47,7 +47,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { val conf = KubernetesConf( new SparkConf(false), KubernetesExecutorSpecificConf( - "executor-id", new PodBuilder().build()), + "executor-id", Some(new 
PodBuilder().build())), "prefix", "appId", Map.empty, @@ -64,7 +64,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { val conf = KubernetesConf( new SparkConf(false), KubernetesExecutorSpecificConf( - "executor-id", new PodBuilder().build()), + "executor-id", Some(new PodBuilder().build())), "prefix", "appId", Map.empty, From e961fd3c80adfe0d06c7e7dcd2ba46299d910658 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 11 Jul 2018 12:04:17 -0700 Subject: [PATCH 05/25] Remove unused imports --- .../scheduler/cluster/k8s/KubernetesClusterManager.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index d000fbb54b4f7..74b9b6cacdbd4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -17,16 +17,15 @@ package org.apache.spark.scheduler.cluster.k8s import java.io.File -import java.util.Locale import java.util.concurrent.TimeUnit import com.google.common.cache.CacheBuilder import io.fabric8.kubernetes.client.Config -import org.apache.spark.{SparkContext, SparkException} -import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory} +import org.apache.spark.SparkContext import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} import org.apache.spark.util.{SystemClock, ThreadUtils} From a00561f970e45d0bda31b79600e4ab0e37fcaf56 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 11 Jul 2018 12:07:30 -0700 Subject: [PATCH 06/25] Fix build --- .../org/apache/spark/deploy/k8s/KubernetesConfSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index 661f942435921..ecdb71359c5bb 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -184,9 +184,9 @@ class KubernetesConfSuite extends SparkFunSuite { new SparkConf(false), EXECUTOR_ID, APP_ID, - DRIVER_POD) + Some(DRIVER_POD)) assert(conf.roleSpecificConf.executorId === EXECUTOR_ID) - assert(conf.roleSpecificConf.driverPod === DRIVER_POD) + assert(conf.roleSpecificConf.driverPod.get === DRIVER_POD) } test("Image pull secrets.") { @@ -195,7 +195,7 @@ class KubernetesConfSuite extends SparkFunSuite { .set(IMAGE_PULL_SECRETS, "my-secret-1,my-secret-2 "), EXECUTOR_ID, APP_ID, - DRIVER_POD) + Some(DRIVER_POD)) assert(conf.imagePullSecrets() === Seq( new LocalObjectReferenceBuilder().withName("my-secret-1").build(), @@ -221,7 +221,7 @@ class KubernetesConfSuite extends SparkFunSuite { sparkConf, EXECUTOR_ID, APP_ID, - DRIVER_POD) + Some(DRIVER_POD)) assert(conf.roleLabels === Map( SPARK_EXECUTOR_ID_LABEL -> EXECUTOR_ID, 
SPARK_APP_ID_LABEL -> APP_ID, From a2609b0a7a26768290e1de0066376ef1b549d054 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 11 Jul 2018 12:21:28 -0700 Subject: [PATCH 07/25] More compilation fixes. --- .../spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala index af6b35eae484a..9314ece847f8f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala @@ -37,7 +37,7 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{ val sparkConf = new SparkConf(false) val kubernetesConf = KubernetesConf( sparkConf, - KubernetesExecutorSpecificConf("1", new PodBuilder().build()), + KubernetesExecutorSpecificConf("1", Some(new PodBuilder().build())), "resource-name-prefix", "app-id", Map.empty, From 97f128449d32bda7dee9265b1426b1970b4f4c60 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 11 Jul 2018 12:25:54 -0700 Subject: [PATCH 08/25] Fix build again --- .../deploy/k8s/features/MountSecretsFeatureStepSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala index eff75b8a15daa..9c6767eaca137 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala @@ -35,7 +35,7 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite { val sparkConf = new SparkConf(false) val kubernetesConf = KubernetesConf( sparkConf, - KubernetesExecutorSpecificConf("1", new PodBuilder().build()), + KubernetesExecutorSpecificConf("1", Some(new PodBuilder().build())), "resource-name-prefix", "app-id", Map.empty, From 9a178308cc40ed38908657d4a0c372f980204c12 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 11 Jul 2018 12:49:11 -0700 Subject: [PATCH 09/25] Small change to force build to re-run --- .../scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index 5255e14d35fcc..abbe3fc2a66d5 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -27,7 +27,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { private val SECRETS_STEP_TYPE = "mount-secrets" private val ENV_SECRETS_STEP_TYPE = "env-secrets" private val LOCAL_DIRS_STEP_TYPE = "local-dirs" - private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( 
BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep]) private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( From 64cc39ba9ef623a750ab4e939fd66711411dc235 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 11 Jul 2018 13:46:06 -0700 Subject: [PATCH 10/25] Fix build --- .../scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index 55649334977c5..44fe4a24e1102 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -96,7 +96,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { val conf = KubernetesConf( new SparkConf(false), KubernetesExecutorSpecificConf( - "executor-id", new PodBuilder().build()), + "executor-id", Some(new PodBuilder().build())), "prefix", "appId", Map.empty, From 0f6ad738fd6db20e583481881ea2dabf490bc707 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 11 Jul 2018 14:48:00 -0700 Subject: [PATCH 11/25] Fix integration test. --- .../k8s/KubernetesClusterManager.scala | 32 +++++++++---------- .../k8s/integrationtest/KubernetesSuite.scala | 5 +++ 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 74b9b6cacdbd4..3a141441e60cf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -43,31 +43,31 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit masterURL: String, scheduler: TaskScheduler): SchedulerBackend = { val wasSparkSubmitted = sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false) - val authConfPrefix = if (wasSparkSubmitted) { - KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX - } else { - KUBERNETES_AUTH_CLIENT_MODE_PREFIX - } - - val apiServerUrl = if (wasSparkSubmitted) { - KUBERNETES_MASTER_INTERNAL_URL - } else { - masterURL.substring("k8s://".length()) - } - - if (wasSparkSubmitted) { + val (authConfPrefix, + apiServerUri, + defaultServiceAccountToken, + defaultServiceAccountCaCrt) = if (wasSparkSubmitted) { require(sc.conf.get(KUBERNETES_DRIVER_POD_NAME).isDefined, "If the application is deployed using spark-submit, the driver pod name must" + " be provided.") + (KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX, + KUBERNETES_MASTER_INTERNAL_URL, + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) + } else { + (KUBERNETES_AUTH_CLIENT_MODE_PREFIX, + masterURL.substring("k8s://".length()), + None, + None) } val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient( - KUBERNETES_MASTER_INTERNAL_URL, + apiServerUri, Some(sc.conf.get(KUBERNETES_NAMESPACE)), authConfPrefix, sc.conf, - Some(new 
File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), - Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) + defaultServiceAccountToken, + defaultServiceAccountCaCrt) val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool( "kubernetes-executor-requests") diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 4ce1525bbcbf1..ea201867137f2 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -186,6 +186,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite .withLabels(labels.asJava) .endMetadata() .withNewSpec() + .withServiceAccountName("default") .addNewContainer() .withImage(image) .withImagePullPolicy("IfNotPresent") @@ -195,6 +196,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite .addToArgs( "--conf", s"spark.kubernetes.namespace=${kubernetesTestComponents.namespace}") + .addToArgs("--conf", "spark.kubernetes.authenticate.oauthTokenFile=" + + "/var/run/secrets/kubernetes.io/serviceaccount/token") + .addToArgs("--conf", "spark.kubernetes.authenticate.caCertFile=" + + "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") .addToArgs("--conf", s"spark.kubernetes.driver.pod.name=$driverPodName") .addToArgs("--conf", "spark.executor.memory=500m") .addToArgs("--conf", "spark.executor.cores=1") From 88a9d7fa94e17e55f8e28d8922cff759625b1e42 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 11 Jul 2018 15:26:07 -0700 Subject: [PATCH 12/25] Add namespaces for client mode test --- .../k8s/integrationtest/KubernetesSuite.scala | 54 +++++++++++-------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index ea201867137f2..6e58595f834a2 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -160,27 +160,35 @@ private[spark] class KubernetesSuite extends SparkFunSuite val labels = Map("spark-app-selector" -> driverPodName) val driverPort = 7077 val blockManagerPort = 10000 - val driverService = testBackend.getKubernetesClient.services().createNew() - .withNewMetadata() - .withName(s"$driverPodName-svc") - .endMetadata() - .withNewSpec() - .withClusterIP("None") - .withSelector(labels.asJava) - .addNewPort() - .withName("driver-port") - .withPort(driverPort) - .withNewTargetPort(driverPort) - .endPort() - .addNewPort() - .withName("block-manager") - .withPort(blockManagerPort) - .withNewTargetPort(blockManagerPort) - .endPort() - .endSpec() - .done() + val driverService = testBackend + .getKubernetesClient + .services() + .inNamespace(kubernetesTestComponents.namespace) + .createNew() + .withNewMetadata() + .withName(s"$driverPodName-svc") + .endMetadata() + .withNewSpec() + .withClusterIP("None") + .withSelector(labels.asJava) + 
.addNewPort() + .withName("driver-port") + .withPort(driverPort) + .withNewTargetPort(driverPort) + .endPort() + .addNewPort() + .withName("block-manager") + .withPort(blockManagerPort) + .withNewTargetPort(blockManagerPort) + .endPort() + .endSpec() + .done() try { - val driverPod = testBackend.getKubernetesClient.pods().createNew() + val driverPod = testBackend + .getKubernetesClient + .pods() + .inNamespace(kubernetesTestComponents.namespace) + .createNew() .withNewMetadata() .withName(driverPodName) .withLabels(labels.asJava) @@ -223,7 +231,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite } } finally { // Have to delete the service manually since it doesn't have an owner reference - kubernetesTestComponents.kubernetesClient.services.delete(driverService) + kubernetesTestComponents + .kubernetesClient + .services() + .inNamespace(kubernetesTestComponents.namespace) + .delete(driverService) } } From 5785ce7b9674891715cac4e3f11860395caae564 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 13 Jul 2018 12:15:45 -0700 Subject: [PATCH 13/25] Add container name for client mode test. --- .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 6e58595f834a2..1adbdc18838fb 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -196,6 +196,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite .withNewSpec() .withServiceAccountName("default") .addNewContainer() + .withName("spark-example") .withImage(image) .withImagePullPolicy("IfNotPresent") .withCommand("/opt/spark/bin/run-example") From 75db0632ff124e5df0d8dc98ff13ed2f1aaa440e Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 13 Jul 2018 13:21:07 -0700 Subject: [PATCH 14/25] Fix dockerfile bug --- .../kubernetes/docker/src/main/dockerfiles/spark/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile index 9badf8556afc3..42a670174eae1 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile @@ -31,7 +31,7 @@ RUN set -ex && \ apk upgrade --no-cache && \ apk add --no-cache bash tini libc6-compat && \ mkdir -p /opt/spark && \ - mkdir -p /opt/spark/work-dir \ + mkdir -p /opt/spark/work-dir && \ touch /opt/spark/RELEASE && \ rm /bin/sh && \ ln -sv /bin/bash /bin/sh && \ From 846f093fa2dac36fca10b3d9d5a1a2d865fde056 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 13 Jul 2018 13:46:08 -0700 Subject: [PATCH 15/25] Add container image to client mode test --- .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala 
index 1adbdc18838fb..ed890eb69d33c 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -202,6 +202,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite .withCommand("/opt/spark/bin/run-example") .addToArgs("--master", s"k8s://https://kubernetes.default.svc") .addToArgs("--deploy-mode", "client") + .addToArgs("--conf", s"spark.kubernetes.container.image=$image") .addToArgs( "--conf", s"spark.kubernetes.namespace=${kubernetesTestComponents.namespace}") From 69bf2a46ae0ce473c813a94ad7e8339a4a5fe599 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 13 Jul 2018 13:53:24 -0700 Subject: [PATCH 16/25] Some cleanup and rewording --- .../main/scala/org/apache/spark/deploy/k8s/Config.scala | 2 +- .../scheduler/cluster/k8s/KubernetesClusterManager.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 28c9f0650bcd6..968679df60367 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -91,7 +91,7 @@ private[spark] object Config extends Logging { ConfigBuilder("spark.kubernetes.submitInDriver") .internal() .booleanConf - .createOptional + .createWithDefault(false) val KUBERNETES_EXECUTOR_LIMIT_CORES = ConfigBuilder("spark.kubernetes.executor.limit.cores") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 3a141441e60cf..fe9ae83c29409 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -42,14 +42,14 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend = { - val wasSparkSubmitted = sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false) + val wasSparkSubmittedInClusterMode = sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK) val (authConfPrefix, apiServerUri, defaultServiceAccountToken, - defaultServiceAccountCaCrt) = if (wasSparkSubmitted) { + defaultServiceAccountCaCrt) = if (wasSparkSubmittedInClusterMode) { require(sc.conf.get(KUBERNETES_DRIVER_POD_NAME).isDefined, - "If the application is deployed using spark-submit, the driver pod name must" + - " be provided.") + "If the application is deployed using spark-submit in cluster mode, the driver pod name" + + " must be provided.") (KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX, KUBERNETES_MASTER_INTERNAL_URL, Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), From ee5c267206f574d1a18d03f27aa6d7f9cf16eb95 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 13 Jul 2018 17:04:54 -0700 Subject: [PATCH 17/25] Documentation --- docs/running-on-kubernetes.md | 130 ++++++++++++++++++++++++++-------- 1 file changed, 102 insertions(+), 28 deletions(-) diff 
--git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 7149616e534aa..3c0d82534a621 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -117,6 +117,37 @@ If the local proxy is running at localhost:8001, `--master k8s://http://127.0.0.
 spark-submit.
 
 Finally, notice that in the above example we specify a jar with a specific URI with a scheme of `local://`.
 This URI is the location of the example jar that is already in the Docker image.
 
+## Client Mode
+
+Starting with Spark 2.4.0, it is possible to run Spark applications on Kubernetes in client mode. When running a Spark
+application in client mode, a separate pod is not deployed to run the driver. When running an application in
+client mode, it is recommended to account for the following factors:
+
+### Client Mode Networking
+
+Spark executors must be able to connect to the Spark driver over a hostname and a port that is routable from the Spark
+executors. The specific network configuration that will be required for Spark to work in client mode will vary per
+setup. If you run your driver inside a Kubernetes pod, you can use a
+[headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services) to allow your
+driver pod to be routable from the executors by a stable hostname. Specify the driver's hostname via `spark.driver.host`
+and your spark driver's port to `spark.driver.port`.
+
+### Client Mode Garbage Collection
+
+If you run your Spark driver in a pod, it is highly recommended to set `spark.kubernetes.driver.pod.name` to the name of that pod.
+When this property is set, the Spark scheduler will deploy the executor pods with an
+[owner reference](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/), which in turn will
+ensure that once the driver pod is deleted from the cluster, all of the application's executor pods will also be deleted.
+
+If your driver is not running inside a pod, keep in mind that the executor pods may not be deleted from the cluster
+when the application exits. The Spark scheduler attempts to delete these pods, but if the network request to the API
+server fails for any reason, these pods will remain in the cluster. The executor processes should exit when they cannot
+reach the driver, so the executor pods should not consume resources in the cluster after your application exits.
+
+### Authentication Parameters
+
+Use the exact prefix `spark.kubernetes.authenticate` for Kubernetes authentication parameters in client mode.
+
 ## Dependency Management
 
 If your application's dependencies are all hosted in remote locations like HDFS or HTTP servers, they may be referred to
@@ -258,10 +289,6 @@ RBAC authorization and how to configure Kubernetes service accounts for pods, pl
 [Using RBAC Authorization](https://kubernetes.io/docs/admin/authorization/rbac/) and
 [Configure Service Accounts for Pods](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/).
 
-## Client Mode
-
-Client mode is not currently supported.
-
 ## Future Work
 
 There are several Spark on Kubernetes features that are currently being incubated in a fork -
@@ -354,7 +381,7 @@ specific to Spark on Kubernetes.
   Path to the CA cert file for connecting to the Kubernetes API server over TLS when starting the driver. This file
   must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide
-  a scheme).
+  a scheme). In client mode, use spark.kubernetes.authenticate.caCertFile instead.
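To make the authentication note above concrete, client mode reads credentials from the bare spark.kubernetes.authenticate prefix, so they can be set directly on the SparkConf. A minimal sketch, assuming the driver runs in a pod with the default service account mounted at the standard paths (the same paths the integration test passes via --conf):

```scala
import org.apache.spark.SparkConf

// Client mode uses the bare spark.kubernetes.authenticate prefix rather than
// the .driver or .driver.mounted variants used in cluster mode.
val conf = new SparkConf()
  .set("spark.kubernetes.authenticate.oauthTokenFile",
    "/var/run/secrets/kubernetes.io/serviceaccount/token")
  .set("spark.kubernetes.authenticate.caCertFile",
    "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
```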
@@ -363,7 +390,7 @@ specific to Spark on Kubernetes. Path to the client key file for authenticating against the Kubernetes API server when starting the driver. This file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not provide - a scheme). + a scheme). In client mode, use spark.kubernetes.authenticate.clientKeyFile instead. @@ -372,7 +399,7 @@ specific to Spark on Kubernetes. Path to the client cert file for authenticating against the Kubernetes API server when starting the driver. This file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not - provide a scheme). + provide a scheme). In client mode, use spark.kubernetes.authenticate.clientCertFile instead. @@ -381,7 +408,7 @@ specific to Spark on Kubernetes. OAuth token to use when authenticating against the Kubernetes API server when starting the driver. Note that unlike the other authentication options, this is expected to be the exact string value of the token to use for - the authentication. + the authentication. In client mode, use spark.kubernetes.authenticate.oauthToken instead. @@ -390,7 +417,7 @@ specific to Spark on Kubernetes. Path to the OAuth token file containing the token to use when authenticating against the Kubernetes API server when starting the driver. This file must be located on the submitting machine's disk. Specify this as a path as opposed to a URI (i.e. do not - provide a scheme). + provide a scheme). In client mode, use spark.kubernetes.authenticate.oauthTokenFile instead. @@ -399,7 +426,8 @@ specific to Spark on Kubernetes. Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod. - Specify this as a path as opposed to a URI (i.e. do not provide a scheme). + Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use + spark.kubernetes.authenticate.caCertFile instead. @@ -407,10 +435,9 @@ specific to Spark on Kubernetes. (none) Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting - executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod. - Specify this as a path as opposed to a URI (i.e. do not provide a scheme). If this is specified, it is highly - recommended to set up TLS for the driver submission server, as this value is sensitive information that would be - passed to the driver pod in plaintext otherwise. + executors. This file must be located on the submitting machine's disk, and will be uploaded to the driver pod as + a Kubernetes secret. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). + In client mode, use spark.kubernetes.authenticate.clientKeyFile instead. @@ -419,7 +446,8 @@ specific to Spark on Kubernetes. Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when requesting executors. This file must be located on the submitting machine's disk, and will be uploaded to the - driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). + driver pod as a Kubernetes secret. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). + In client mode, use spark.kubernetes.authenticate.clientCertFile instead. @@ -428,9 +456,8 @@ specific to Spark on Kubernetes. 
OAuth token to use when authenticating against the Kubernetes API server from the driver pod when requesting executors. Note that unlike the other authentication options, this must be the exact string value of - the token to use for the authentication. This token value is uploaded to the driver pod. If this is specified, it is - highly recommended to set up TLS for the driver submission server, as this value is sensitive information that would - be passed to the driver pod in plaintext otherwise. + the token to use for the authentication. This token value is uploaded to the driver pod as a Kubernetes secret. + In client mode, use spark.kubernetes.authenticate.oauthToken instead. @@ -439,9 +466,8 @@ specific to Spark on Kubernetes. Path to the OAuth token file containing the token to use when authenticating against the Kubernetes API server from the driver pod when requesting executors. Note that unlike the other authentication options, this file must contain the exact string value of - the token to use for the authentication. This token value is uploaded to the driver pod. If this is specified, it is - highly recommended to set up TLS for the driver submission server, as this value is sensitive information that would - be passed to the driver pod in plaintext otherwise. + the token to use for the authentication. This token value is uploaded to the driver pod as a secret. In client mode, use + spark.kubernetes.authenticate.oauthTokenFile instead. @@ -450,7 +476,8 @@ specific to Spark on Kubernetes. Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting executors. This path must be accessible from the driver pod. - Specify this as a path as opposed to a URI (i.e. do not provide a scheme). + Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use + spark.kubernetes.authenticate.caCertFile instead. @@ -459,7 +486,8 @@ specific to Spark on Kubernetes. Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting executors. This path must be accessible from the driver pod. - Specify this as a path as opposed to a URI (i.e. do not provide a scheme). + Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use + spark.kubernetes.authenticate.clientKeyFile instead. @@ -468,7 +496,8 @@ specific to Spark on Kubernetes. Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when requesting executors. This path must be accessible from the driver pod. - Specify this as a path as opposed to a URI (i.e. do not provide a scheme). + Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use + spark.kubernetes.authenticate.clientCertFile instead. @@ -477,7 +506,8 @@ specific to Spark on Kubernetes. Path to the file containing the OAuth token to use when authenticating against the Kubernetes API server from the driver pod when requesting executors. This path must be accessible from the driver pod. - Note that unlike the other authentication options, this file must contain the exact string value of the token to use for the authentication. + Note that unlike the other authentication options, this file must contain the exact string value of the token to use + for the authentication. In client mode, use spark.kubernetes.authenticate.oauthTokenFile instead. @@ -486,7 +516,48 @@ specific to Spark on Kubernetes. 
Service account that is used when running the driver pod. The driver pod uses this service account when requesting executor pods from the API server. Note that this cannot be specified alongside a CA cert file, client key file, - client cert file, and/or OAuth token. + client cert file, and/or OAuth token. In client mode, use spark.kubernetes.authenticate.serviceAccountName instead. + + + + spark.kubernetes.authenticate.caCertFile + (none) + + In client mode, path to the CA cert file for connecting to the Kubernetes API server over TLS when + requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). + + + + spark.kubernetes.authenticate.clientKeyFile + (none) + + In client mode, path to the client key file for authenticating against the Kubernetes API server + when requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). + + + + spark.kubernetes.authenticate.clientCertFile + (none) + + In client mode, path to the client cert file for authenticating against the Kubernetes API server + when requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). + + + + spark.kubernetes.authenticate.oauthToken + (none) + + In client mode, the OAuth token to use when authenticating against the Kubernetes API server when + requesting executors. Note that unlike the other authentication options, this must be the exact string value of + the token to use for the authentication. + + + + spark.kubernetes.authenticate.oauthTokenFile + (none) + + In client mode, path to the file containing the OAuth token to use when authenticating against the Kubernetes API + server from the driver pod when requesting executors. @@ -529,8 +600,11 @@ specific to Spark on Kubernetes. spark.kubernetes.driver.pod.name (none) - Name of the driver pod. If not set, the driver pod name is set to "spark.app.name" suffixed by the current timestamp - to avoid name conflicts. + Name of the driver pod. In cluster mode, if this is not set, the driver pod name is set to "spark.app.name" + suffixed by the current timestamp to avoid name conflicts. In client mode, if your application is running + inside a pod, it is highly recommended to set this to the name of the pod your driver is running in. Setting this + value in client mode allows the driver to inform the cluster that your application's executor pods should be + deleted when the driver pod is deleted. From bc38be7a41bb0e36b416a94da73f5e683a449cb9 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 16 Jul 2018 11:21:26 -0700 Subject: [PATCH 18/25] Address comments. --- docs/running-on-kubernetes.md | 12 ++++++------ .../cluster/k8s/KubernetesClusterManager.scala | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 3c0d82534a621..fa867e84127f3 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -120,8 +120,8 @@ This URI is the location of the example jar that is already in the Docker image. ## Client Mode Starting with Spark 2.4.0, it is possible to run Spark applications on Kubernetes in client mode. When running a Spark -application in client mode, a separate pod is not deployed to run the driver. When running an application in -client mode, it is recommended to account for the following factors: +application in client mode, a separate pod is not deployed to run the driver. Your Spark driver does not need to run in +a Kubernetes pod. 
When running an application in client mode, it is recommended to account for the following factors: ### Client Mode Networking @@ -132,7 +132,7 @@ setup. If you run your driver inside a Kubernetes pod, you can use a driver pod to be routable from the executors by a stable hostname. Specify the driver's hostname via `spark.driver.host` and your spark driver's port to `spark.driver.port`. -### Client Mode Garbage Collection +### Client Mode Executor Pod Garbage Collection If you run your Spark driver in a pod, it is highly recommended to set `spark.driver.pod.name` to the name of that pod. When this property is set, the Spark scheduler will deploy the executor pods with an @@ -557,7 +557,7 @@ specific to Spark on Kubernetes. (none) In client mode, path to the file containing the OAuth token to use when authenticating against the Kubernetes API - server from the driver pod when requesting executors. + server when requesting executors. @@ -603,8 +603,8 @@ specific to Spark on Kubernetes. Name of the driver pod. In cluster mode, if this is not set, the driver pod name is set to "spark.app.name" suffixed by the current timestamp to avoid name conflicts. In client mode, if your application is running inside a pod, it is highly recommended to set this to the name of the pod your driver is running in. Setting this - value in client mode allows the driver to inform the cluster that your application's executor pods should be - deleted when the driver pod is deleted. + value in client mode allows the driver to become the owner of its executor pods, which in turn allows the executor + pods to be garbage collecfted by the cluster. diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index fe9ae83c29409..5694adbd1adfe 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -48,8 +48,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit defaultServiceAccountToken, defaultServiceAccountCaCrt) = if (wasSparkSubmittedInClusterMode) { require(sc.conf.get(KUBERNETES_DRIVER_POD_NAME).isDefined, - "If the application is deployed using spark-submit in cluster mode, the driver pod name" + - " must be provided.") + "If the application is deployed using spark-submit in cluster mode, the driver pod name " + + "must be provided.") (KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX, KUBERNETES_MASTER_INTERNAL_URL, Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), From 37537c63ff79cecc64b96c9549b5da4dbb769f4c Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 16 Jul 2018 13:11:55 -0700 Subject: [PATCH 19/25] Reword docs --- docs/running-on-kubernetes.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index fa867e84127f3..cb8e1d62bd3d2 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -119,9 +119,9 @@ This URI is the location of the example jar that is already in the Docker image. ## Client Mode -Starting with Spark 2.4.0, it is possible to run Spark applications on Kubernetes in client mode. 
From 0db7a79374f59f5cf703b1133873596c12b3a71a Mon Sep 17 00:00:00 2001
From: mcheah
Date: Mon, 16 Jul 2018 13:13:22 -0700
Subject: [PATCH 20/25] Fix typo

---
 docs/running-on-kubernetes.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index cb8e1d62bd3d2..801ffbf0626de 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -604,7 +604,7 @@ specific to Spark on Kubernetes.
     suffixed by the current timestamp to avoid name conflicts. In client mode, if your application is running
     inside a pod, it is highly recommended to set this to the name of the pod your driver is running in. Setting this
     value in client mode allows the driver to become the owner of its executor pods, which in turn allows the executor
-    pods to be garbage collecfted by the cluster.
+    pods to be garbage collected by the cluster.
   </td>
 </tr>
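To make the owner-reference mechanism described in these doc changes concrete, here is an illustrative sketch (not code from this patch series) of how a reference to the driver pod can be built with the fabric8 Kubernetes client that Spark already depends on; the namespace and pod name below are hypothetical:

    import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder
    import io.fabric8.kubernetes.client.DefaultKubernetesClient

    // Look up the driver pod named by spark.kubernetes.driver.pod.name
    // ("default" and "my-driver-pod" are placeholder values).
    val client = new DefaultKubernetesClient()
    val driverPod = client.pods()
      .inNamespace("default")
      .withName("my-driver-pod")
      .get()

    // An owner reference like this on each executor pod makes Kubernetes
    // garbage-collect the executors once the driver pod is deleted.
    val ownerReference = new OwnerReferenceBuilder()
      .withApiVersion(driverPod.getApiVersion)
      .withKind(driverPod.getKind)
      .withName(driverPod.getMetadata.getName)
      .withUid(driverPod.getMetadata.getUid)
      .withController(true)
      .build()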
From 086747e12f0af16c3479b07e59934d42ced4004b Mon Sep 17 00:00:00 2001
From: mcheah
Date: Fri, 20 Jul 2018 11:43:00 -0700
Subject: [PATCH 21/25] Clarify some docs

---
 docs/running-on-kubernetes.md | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 801ffbf0626de..5fea84f14e761 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -129,8 +129,10 @@ Spark executors must be able to connect to the Spark driver over a hostname and
 executors. The specific network configuration that will be required for Spark to work in client mode will vary per
 setup. If you run your driver inside a Kubernetes pod, you can use a
 [headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services) to allow your
-driver pod to be routable from the executors by a stable hostname. Specify the driver's hostname via `spark.driver.host`
-and your spark driver's port to `spark.driver.port`.
+driver pod to be routable from the executors by a stable hostname. When deploying your headless service, ensure that
+the service's label selector will only match the driver pod and no other pods; it is recommended to assign your driver
+pod a sufficiently unique label and to use that label in the node selector of the headless service. Specify the driver's
+hostname via `spark.driver.host` and your spark driver's port to `spark.driver.port`.
 
 ### Client Mode Executor Pod Garbage Collection
 
@@ -138,11 +140,16 @@ If you run your Spark driver in a pod, it is highly recommended to set `spark.dr
 When this property is set, the Spark scheduler will deploy the executor pods with an
 [owner reference](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/), which in turn will
 ensure that once the driver pod is deleted from the cluster, all of the application's executor pods will also be deleted.
-
-If your driver is not running inside a pod, keep in mind that the executor pods may not be deleted from the cluster
-when the application exits. The Spark scheduler attempts to delete these pods, but if the network request to the API
-server fails for any reason, these pods will remain in the cluster. The executor processes should exit when they cannot
-reach the driver, so the executor pods should not consume resources in the cluster after your application exits.
+The driver will look for a pod with the given name in the namespace specified by `spark.kubernetes.namespace`, and
+all executor pods will have their owner reference field set to point to that pod. Be careful to avoid setting the
+owner reference to a pod that is not actually that driver pod, or else the executors may be terminated prematurely when
+the wrong pod is terminated.
+
+If your application is not running inside a pod, or if `spark.driver.pod.name` is not set when your application is
+actually running in a pod, keep in mind that the executor pods may not be deleted from the cluster when the application
+exits. The Spark scheduler attempts to delete these pods, but if the network request to the API server fails for any
+reason, these pods will remain in the cluster. The executor processes should exit when they cannot reach the driver, so
+the executor pods should not consume resources in the cluster after your application exits.
 
 ### Authentication Parameters

From d90f753183b6e5047491af016860fd35bfd92caf Mon Sep 17 00:00:00 2001
From: mcheah
Date: Fri, 20 Jul 2018 11:58:33 -0700
Subject: [PATCH 22/25] Fix typo

---
 .../spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 96277f255fe36..0d829218cf774 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -80,7 +80,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
 
   override def afterAll(): Unit = {
     testBackend.cleanUp()
-}
+  }
 
   before {
     appLocator = UUID.randomUUID().toString.replaceAll("-", "")
@@ -102,7 +102,6 @@ private[spark] class KubernetesSuite extends SparkFunSuite
     deleteDriverPod()
   }
 
-
   protected def runSparkPiAndVerifyCompletion(
       appResource: String = containerLocalSparkDistroExamplesJar,
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
From 001a5256a7e25593c5360d72fc1ff5546fc6fc39 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Fri, 20 Jul 2018 12:03:57 -0700
Subject: [PATCH 23/25] Fix wording

---
 docs/running-on-kubernetes.md | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 5fea84f14e761..04a11aa1f8c70 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -146,10 +146,11 @@ owner reference to a pod that is not actually that driver pod, or else the executors may be terminated prematurely when
 the wrong pod is terminated.
 
 If your application is not running inside a pod, or if `spark.driver.pod.name` is not set when your application is
-actually running in a pod, keep in mind that the executor pods may not be deleted from the cluster when the application
-exits. The Spark scheduler attempts to delete these pods, but if the network request to the API server fails for any
-reason, these pods will remain in the cluster. The executor processes should exit when they cannot reach the driver, so
-the executor pods should not consume resources in the cluster after your application exits.
+actually running in a pod, keep in mind that the executor pods may not be properly deleted from the cluster when the
+application exits. The Spark scheduler attempts to delete these pods, but if the network request to the API server fails
+for any reason, these pods will remain in the cluster. The executor processes should exit when they cannot reach the
+driver, so the executor pods should not consume compute resources (cpu and memory) in the cluster after your application
+exits.
 
 ### Authentication Parameters
From 72c96e03fe4e49ec1c9b4bfad816e20cff67d75d Mon Sep 17 00:00:00 2001
From: mcheah
Date: Fri, 20 Jul 2018 12:08:06 -0700
Subject: [PATCH 24/25] More feedback

---
 docs/running-on-kubernetes.md | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 04a11aa1f8c70..97c650d0f80aa 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -131,19 +131,19 @@ setup. If you run your driver inside a Kubernetes pod, you can use a
 [headless service](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services) to allow your
 driver pod to be routable from the executors by a stable hostname. When deploying your headless service, ensure that
 the service's label selector will only match the driver pod and no other pods; it is recommended to assign your driver
-pod a sufficiently unique label and to use that label in the node selector of the headless service. Specify the driver's
+pod a sufficiently unique label and to use that label in the label selector of the headless service. Specify the driver's
 hostname via `spark.driver.host` and your spark driver's port to `spark.driver.port`.
 
 ### Client Mode Executor Pod Garbage Collection
 
 If you run your Spark driver in a pod, it is highly recommended to set `spark.driver.pod.name` to the name of that
 pod. When this property is set, the Spark scheduler will deploy the executor pods with an
-[owner reference](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/), which in turn will
+[OwnerReference](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/), which in turn will
 ensure that once the driver pod is deleted from the cluster, all of the application's executor pods will also be deleted.
 The driver will look for a pod with the given name in the namespace specified by `spark.kubernetes.namespace`, and
-all executor pods will have their owner reference field set to point to that pod. Be careful to avoid setting the
-owner reference to a pod that is not actually that driver pod, or else the executors may be terminated prematurely when
-the wrong pod is terminated.
+an OwnerReference pointing to that pod will be added to each executor pod's OwnerReferences list. Be careful to avoid
+setting the OwnerReference to a pod that is not actually that driver pod, or else the executors may be terminated
+prematurely when the wrong pod is deleted.
From ded1ff6081da6f0b3879f6bf63b73caf01983bea Mon Sep 17 00:00:00 2001
From: mcheah
Date: Wed, 25 Jul 2018 09:36:47 -0700
Subject: [PATCH 25/25] Helper method for master url parsing

---
 .../scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala    | 2 ++
 .../spark/deploy/k8s/submit/KubernetesClientApplication.scala  | 4 ++--
 .../scheduler/cluster/k8s/KubernetesClusterManager.scala       | 4 ++--
 3 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 66fff267545dc..588cd9d40f9a0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -58,4 +58,6 @@ private[spark] object KubernetesUtils {
       case _ => uri
     }
   }
+
+  def parseMasterUrl(url: String): String = url.substring("k8s://".length)
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index eaff47205dbbc..9398faee2ea5c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkApplication
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkKubernetesClientFactory}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
@@ -228,7 +228,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     val namespace = kubernetesConf.namespace()
     // The master URL has been checked for validity already in SparkSubmit.
     // We just need to get rid of the "k8s://" prefix here.
-    val master = sparkConf.get("spark.master").substring("k8s://".length)
+    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
     val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
 
     val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 5694adbd1adfe..9999c62c878df 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -23,9 +23,9 @@ import com.google.common.cache.CacheBuilder
 import io.fabric8.kubernetes.client.Config
 
 import org.apache.spark.SparkContext
+import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.util.{SystemClock, ThreadUtils}
@@ -56,7 +56,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
           Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
       } else {
         (KUBERNETES_AUTH_CLIENT_MODE_PREFIX,
-          masterURL.substring("k8s://".length()),
+          KubernetesUtils.parseMasterUrl(masterURL),
          None,
          None)
      }
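For reference, the extracted helper only strips the scheme prefix that spark-submit requires; as the inline comment above notes, SparkSubmit has already validated that the master URL starts with "k8s://". A quick sketch of its behavior, with an illustrative URL (note that KubernetesUtils is private[spark], so this is callable only from within Spark):

    import org.apache.spark.deploy.k8s.KubernetesUtils

    // Strips the "k8s://" prefix and returns the underlying API server URL.
    val master = KubernetesUtils.parseMasterUrl("k8s://https://kubernetes.default.svc:443")
    // master == "https://kubernetes.default.svc:443"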