From 6fca505cbffe0e57a91b0b0060fd8ab1cd707e8a Mon Sep 17 00:00:00 2001 From: Junjie Chen Date: Sat, 15 Jun 2019 04:09:34 +0800 Subject: [PATCH 1/9] [SPARK-28042][K8S] Support using hostpath volume mount as local storage --- .../k8s/features/LocalDirsFeatureStep.scala | 46 ++++++++++++++----- .../k8s/submit/KubernetesDriverBuilder.scala | 4 +- .../k8s/KubernetesExecutorBuilder.scala | 4 +- .../features/LocalDirsFeatureStepSuite.scala | 27 ++++++++++- 4 files changed, 64 insertions(+), 17 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index 19ed2df5551db..571b085dfe614 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -18,7 +18,8 @@ package org.apache.spark.deploy.k8s.features import java.util.UUID -import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder} +import collection.JavaConverters._ +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder, VolumeMount, VolumeMountBuilder} import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ @@ -43,20 +44,34 @@ private[spark] class LocalDirsFeatureStep( val localDirVolumes = resolvedLocalDirs .zipWithIndex .map { case (localDir, index) => - new VolumeBuilder() - .withName(s"spark-local-dir-${index + 1}") - .withNewEmptyDir() - .withMedium(if (useLocalDirTmpFs) "Memory" else null) - .endEmptyDir() - .build() + val name = s"spark-local-dir-${index + 1}" + hasVolume(pod, name) match { + case true => + pod.pod.getSpec.getVolumes().asScala.find(v => v.getName.equals(name)).get + case false => + new VolumeBuilder() + .withName(s"spark-local-dir-${index + 1}") + .withNewEmptyDir() + .withMedium(if (useLocalDirTmpFs) "Memory" else null) + .endEmptyDir() + .build() + } } val localDirVolumeMounts = localDirVolumes .zip(resolvedLocalDirs) .map { case (localDirVolume, localDirPath) => - new VolumeMountBuilder() - .withName(localDirVolume.getName) - .withMountPath(localDirPath) - .build() + hasVolumeMount(pod, localDirVolume.getName, localDirPath) match { + case true => + pod.container.getVolumeMounts().asScala + .find(m => m.getName.equals(localDirVolume.getName) + && m.getMountPath.equals(localDirPath)) + .get + case false => + new VolumeMountBuilder() + .withName(localDirVolume.getName) + .withMountPath(localDirPath) + .build() + } } val podWithLocalDirVolumes = new PodBuilder(pod.pod) .editSpec() @@ -72,4 +87,13 @@ private[spark] class LocalDirsFeatureStep( .build() SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts) } + + def hasVolume(pod: SparkPod, name: String): Boolean = { + pod.pod.getSpec().getVolumes().asScala.exists(v => v.getName.equals(name)) + } + + def hasVolumeMount(pod: SparkPod, name: String, path: String): Boolean = { + pod.container.getVolumeMounts().asScala + .exists(m => m.getName.equals(name) && m.getMountPath.equals(path)) + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 57e4060bc85b9..43639a3b7dc1b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -43,12 +43,12 @@ private[spark] class KubernetesDriverBuilder { new DriverServiceFeatureStep(conf), new MountSecretsFeatureStep(conf), new EnvSecretsFeatureStep(conf), - new LocalDirsFeatureStep(conf), new MountVolumesFeatureStep(conf), new DriverCommandFeatureStep(conf), new HadoopConfDriverFeatureStep(conf), new KerberosConfDriverFeatureStep(conf), - new PodTemplateConfigMapStep(conf)) + new PodTemplateConfigMapStep(conf), + new LocalDirsFeatureStep(conf)) val spec = KubernetesDriverSpec( initialPod, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 48aa2c56d4d69..35d513c410680 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -43,8 +43,8 @@ private[spark] class KubernetesExecutorBuilder { new BasicExecutorFeatureStep(conf, secMgr), new MountSecretsFeatureStep(conf), new EnvSecretsFeatureStep(conf), - new LocalDirsFeatureStep(conf), - new MountVolumesFeatureStep(conf)) + new MountVolumesFeatureStep(conf), + new LocalDirsFeatureStep(conf)) features.foldLeft(initialPod) { case (pod, feature) => feature.configurePod(pod) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index 8f34ce5c6b94f..25d15d9c4274b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -19,9 +19,8 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder} import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesTestConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesHostPathVolumeConf, KubernetesTestConf, KubernetesVolumeSpec, SparkPod} import org.apache.spark.deploy.k8s.Config._ -import org.apache.spark.deploy.k8s.submit.JavaMainAppResource import org.apache.spark.util.SparkConfWithEnv class LocalDirsFeatureStepSuite extends SparkFunSuite { @@ -116,4 +115,28 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite { .withValue(defaultLocalDir) .build()) } + + test("local dir on mounted volume") { + val volumeConf = KubernetesVolumeSpec( + "spark-local-dir-1", + "/tmp", + "", + false, + KubernetesHostPathVolumeConf("/hostPath/tmp") + ) + val mountVolumeConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf)) + val mountVolumeStep = new MountVolumesFeatureStep(mountVolumeConf) + val configuredPod = mountVolumeStep.configurePod(SparkPod.initialPod()) + + val sparkConf = new SparkConfWithEnv(Map("SPARK_LOCAL_DIRS" -> "/tmp")) + val localDirConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) + val localDirStep = new LocalDirsFeatureStep(localDirConf, defaultLocalDir) + val newConfiguredPod = localDirStep.configurePod(configuredPod) + + assert(newConfiguredPod.pod.getSpec.getVolumes.size() === 1) + assert(newConfiguredPod.pod.getSpec.getVolumes.get(0).getHostPath.getPath === "/hostPath/tmp") + assert(newConfiguredPod.container.getVolumeMounts.size() === 1) + assert(newConfiguredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") + assert(newConfiguredPod.container.getVolumeMounts.get(0).getName === "testVolume") + } } From ef66c87e4bbbf3f6585d420e9e2ffb05b97d5520 Mon Sep 17 00:00:00 2001 From: Junjie Chen Date: Sun, 16 Jun 2019 09:32:41 +0800 Subject: [PATCH 2/9] SPARK-28042: fix unit test and address comments --- .../deploy/k8s/features/LocalDirsFeatureStep.scala | 11 ++++++----- .../k8s/features/LocalDirsFeatureStepSuite.scala | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index 571b085dfe614..9aa45ee8ad6a1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -63,9 +63,9 @@ private[spark] class LocalDirsFeatureStep( hasVolumeMount(pod, localDirVolume.getName, localDirPath) match { case true => pod.container.getVolumeMounts().asScala - .find(m => m.getName.equals(localDirVolume.getName) - && m.getMountPath.equals(localDirPath)) - .get + .find(m => m.getName.equals(localDirVolume.getName) + && m.getMountPath.equals(localDirPath)) + .get case false => new VolumeMountBuilder() .withName(localDirVolume.getName) @@ -75,7 +75,7 @@ private[spark] class LocalDirsFeatureStep( } val podWithLocalDirVolumes = new PodBuilder(pod.pod) .editSpec() - .addToVolumes(localDirVolumes: _*) + .addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, v.getName)): _*) .endSpec() .build() val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container) @@ -83,7 +83,8 @@ private[spark] class LocalDirsFeatureStep( .withName("SPARK_LOCAL_DIRS") .withValue(resolvedLocalDirs.mkString(",")) .endEnv() - .addToVolumeMounts(localDirVolumeMounts: _*) + .addToVolumeMounts(localDirVolumeMounts + .filter(m => !hasVolumeMount(pod, m.getName, m.getMountPath)): _*) .build() SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index 25d15d9c4274b..92c8fd4e5d39d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -137,6 +137,6 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite { assert(newConfiguredPod.pod.getSpec.getVolumes.get(0).getHostPath.getPath === "/hostPath/tmp") assert(newConfiguredPod.container.getVolumeMounts.size() === 1) assert(newConfiguredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") - assert(newConfiguredPod.container.getVolumeMounts.get(0).getName === "testVolume") + assert(newConfiguredPod.container.getVolumeMounts.get(0).getName === "spark-local-dir-1") } } From 5610fe4ae6ce500de3105b9904ab577c747a33c5 Mon Sep 17 00:00:00 2001 From: Junjie Chen Date: Sun, 16 Jun 2019 10:27:20 +0800 Subject: [PATCH 3/9] SPARK-28042: refactor checks according to comments --- .../k8s/features/LocalDirsFeatureStep.scala | 33 ++++++++----------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index 9aa45ee8ad6a1..b1152af43489f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.k8s.features import java.util.UUID import collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, VolumeBuilder, VolumeMount, VolumeMountBuilder} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder} import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ @@ -45,10 +45,9 @@ private[spark] class LocalDirsFeatureStep( .zipWithIndex .map { case (localDir, index) => val name = s"spark-local-dir-${index + 1}" - hasVolume(pod, name) match { - case true => - pod.pod.getSpec.getVolumes().asScala.find(v => v.getName.equals(name)).get - case false => + findVolume(pod, name) match { + case Some(volume) => volume + case None => new VolumeBuilder() .withName(s"spark-local-dir-${index + 1}") .withNewEmptyDir() @@ -60,13 +59,9 @@ private[spark] class LocalDirsFeatureStep( val localDirVolumeMounts = localDirVolumes .zip(resolvedLocalDirs) .map { case (localDirVolume, localDirPath) => - hasVolumeMount(pod, localDirVolume.getName, localDirPath) match { - case true => - pod.container.getVolumeMounts().asScala - .find(m => m.getName.equals(localDirVolume.getName) - && m.getMountPath.equals(localDirPath)) - .get - case false => + findVolumeMount(pod, localDirVolume.getName, localDirPath) match { + case Some(volumeMount) => volumeMount + case None => new VolumeMountBuilder() .withName(localDirVolume.getName) .withMountPath(localDirPath) @@ -75,7 +70,7 @@ private[spark] class LocalDirsFeatureStep( } val podWithLocalDirVolumes = new PodBuilder(pod.pod) .editSpec() - .addToVolumes(localDirVolumes.filter(v => !hasVolume(pod, v.getName)): _*) + .addToVolumes(localDirVolumes.filter(v => findVolume(pod, v.getName).isEmpty): _*) .endSpec() .build() val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container) @@ -84,17 +79,17 @@ private[spark] class LocalDirsFeatureStep( .withValue(resolvedLocalDirs.mkString(",")) .endEnv() .addToVolumeMounts(localDirVolumeMounts - .filter(m => !hasVolumeMount(pod, m.getName, m.getMountPath)): _*) + .filter(m => findVolumeMount(pod, m.getName, m.getMountPath).isEmpty): _*) .build() SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts) } - def hasVolume(pod: SparkPod, name: String): Boolean = { - pod.pod.getSpec().getVolumes().asScala.exists(v => v.getName.equals(name)) + def findVolume(pod: SparkPod, name: String): Option[Volume] = { + pod.pod.getSpec.getVolumes.asScala.find(v => v.getName.equals(name)) } - def hasVolumeMount(pod: SparkPod, name: String, path: String): Boolean = { - pod.container.getVolumeMounts().asScala - .exists(m => m.getName.equals(name) && m.getMountPath.equals(path)) + def findVolumeMount(pod: SparkPod, name: String, path: String): Option[VolumeMount] = { + pod.container.getVolumeMounts.asScala + .find(m => m.getName.equals(name) && m.getMountPath.equals(path)) } } From f071c2c6dce97fe300995212a16a6c33ae9b5a32 Mon Sep 17 00:00:00 2001 From: Junjie Chen Date: Wed, 24 Jul 2019 11:10:07 +0800 Subject: [PATCH 4/9] Address comments --- .../k8s/features/LocalDirsFeatureStep.scala | 15 ++++++++------- .../k8s/features/LocalDirsFeatureStepSuite.scala | 6 +++--- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index b1152af43489f..65dfcc0103063 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -41,15 +41,14 @@ private[spark] class LocalDirsFeatureStep( private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS) override def configurePod(pod: SparkPod): SparkPod = { + val prefix = "spark-local-dir-" val localDirVolumes = resolvedLocalDirs .zipWithIndex - .map { case (localDir, index) => - val name = s"spark-local-dir-${index + 1}" - findVolume(pod, name) match { - case Some(volume) => volume + .map { case (localDir, index) => findVolumeMount(pod, prefix, localDir) match { + case Some(volumeMount) => findVolume(pod, volumeMount.getName).get case None => new VolumeBuilder() - .withName(s"spark-local-dir-${index + 1}") + .withName(s"$prefix${index + 1}") .withNewEmptyDir() .withMedium(if (useLocalDirTmpFs) "Memory" else null) .endEmptyDir() @@ -84,12 +83,14 @@ private[spark] class LocalDirsFeatureStep( SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts) } + def findVolume(pod: SparkPod, name: String): Option[Volume] = { pod.pod.getSpec.getVolumes.asScala.find(v => v.getName.equals(name)) } - def findVolumeMount(pod: SparkPod, name: String, path: String): Option[VolumeMount] = { + def findVolumeMount(pod: SparkPod, prefix: String, path: String): Option[VolumeMount] = { pod.container.getVolumeMounts.asScala - .find(m => m.getName.equals(name) && m.getMountPath.equals(path)) + .find(m => m.getName.startsWith(prefix) && m.getMountPath.equals(path)) } + } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index 92c8fd4e5d39d..8cec2c3f65937 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -118,7 +118,7 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite { test("local dir on mounted volume") { val volumeConf = KubernetesVolumeSpec( - "spark-local-dir-1", + "spark-local-dir-test", "/tmp", "", false, @@ -129,7 +129,7 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite { val configuredPod = mountVolumeStep.configurePod(SparkPod.initialPod()) val sparkConf = new SparkConfWithEnv(Map("SPARK_LOCAL_DIRS" -> "/tmp")) - val localDirConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) + val localDirConf = KubernetesTestConf.createDriverConf(sparkConf) val localDirStep = new LocalDirsFeatureStep(localDirConf, defaultLocalDir) val newConfiguredPod = localDirStep.configurePod(configuredPod) @@ -137,6 +137,6 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite { assert(newConfiguredPod.pod.getSpec.getVolumes.get(0).getHostPath.getPath === "/hostPath/tmp") assert(newConfiguredPod.container.getVolumeMounts.size() === 1) assert(newConfiguredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") - assert(newConfiguredPod.container.getVolumeMounts.get(0).getName === "spark-local-dir-1") + assert(newConfiguredPod.container.getVolumeMounts.get(0).getName === "spark-local-dir-test") } } From 45c8bc5b4a80a7b182186e114bab2fb7affb98ed Mon Sep 17 00:00:00 2001 From: Junjie Chen Date: Wed, 24 Jul 2019 11:17:49 +0800 Subject: [PATCH 5/9] Update documents as well. --- docs/running-on-kubernetes.md | 11 ++++++++++- .../deploy/k8s/features/LocalDirsFeatureStep.scala | 7 +++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index f1000346cba0a..60d2c2967fe1f 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -285,7 +285,16 @@ The configuration properties for mounting volumes into the executor pods use pre ## Local Storage -Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately. +Spark supports using volumes to spill data during shuffles and other operations. To use a volume as local storage, the volume's name should be set with `spark-local-dir` prefix and mount path should be set in the `spark.local.dir`, for example: + +``` +--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.path= +--conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.readOnly=false +--conf spark.local.dir= +``` + + +If no volume is set as local storage, Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately. `emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod. diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index 65dfcc0103063..8ab3bbd255897 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.k8s.features import java.util.UUID import collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder} +import io.fabric8.kubernetes.api.model._ import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ @@ -44,7 +44,8 @@ private[spark] class LocalDirsFeatureStep( val prefix = "spark-local-dir-" val localDirVolumes = resolvedLocalDirs .zipWithIndex - .map { case (localDir, index) => findVolumeMount(pod, prefix, localDir) match { + .map { + case (localDir, index) => findVolumeMount(pod, prefix, localDir) match { case Some(volumeMount) => findVolume(pod, volumeMount.getName).get case None => new VolumeBuilder() @@ -83,7 +84,6 @@ private[spark] class LocalDirsFeatureStep( SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts) } - def findVolume(pod: SparkPod, name: String): Option[Volume] = { pod.pod.getSpec.getVolumes.asScala.find(v => v.getName.equals(name)) } @@ -92,5 +92,4 @@ private[spark] class LocalDirsFeatureStep( pod.container.getVolumeMounts.asScala .find(m => m.getName.startsWith(prefix) && m.getMountPath.equals(path)) } - } From 9392dad701a8ec8cda1c3bd2dd55be0e8979da99 Mon Sep 17 00:00:00 2001 From: Junjie Chen Date: Wed, 24 Jul 2019 11:43:02 +0800 Subject: [PATCH 6/9] update document --- docs/running-on-kubernetes.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 60d2c2967fe1f..6466c222c8fd5 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -285,7 +285,7 @@ The configuration properties for mounting volumes into the executor pods use pre ## Local Storage -Spark supports using volumes to spill data during shuffles and other operations. To use a volume as local storage, the volume's name should be set with `spark-local-dir` prefix and mount path should be set in the `spark.local.dir`, for example: +Spark supports using volumes to spill data during shuffles and other operations. To use a volume as local storage, the volume's name should starts with `spark-local-dir-` and the mount path should be set in the spark configuration `spark.local.dir` or in the pod environment variable `SPARK_LOCAL_DIRS`, for example: ``` --conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.path= @@ -294,7 +294,7 @@ Spark supports using volumes to spill data during shuffles and other operations. ``` -If no volume is set as local storage, Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately. +If none volume is set as local storage, Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately. `emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod. From 6e5fcf64472cd25785673b3c9599cc59360d9381 Mon Sep 17 00:00:00 2001 From: Junjie Chen Date: Thu, 25 Jul 2019 11:24:10 +0800 Subject: [PATCH 7/9] Don't set configuration when specific volume is set --- docs/running-on-kubernetes.md | 3 +- .../k8s/features/LocalDirsFeatureStep.scala | 72 ++++++++++--------- .../features/LocalDirsFeatureStepSuite.scala | 7 +- 3 files changed, 44 insertions(+), 38 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 6466c222c8fd5..213a64f6d8fb7 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -285,12 +285,11 @@ The configuration properties for mounting volumes into the executor pods use pre ## Local Storage -Spark supports using volumes to spill data during shuffles and other operations. To use a volume as local storage, the volume's name should starts with `spark-local-dir-` and the mount path should be set in the spark configuration `spark.local.dir` or in the pod environment variable `SPARK_LOCAL_DIRS`, for example: +Spark supports using volumes to spill data during shuffles and other operations. To use a volume as local storage, the volume's name should starts with `spark-local-dir-`, for example: ``` --conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.path= --conf spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.readOnly=false ---conf spark.local.dir= ``` diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index 8ab3bbd255897..a6d01dc030b5d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -41,55 +41,57 @@ private[spark] class LocalDirsFeatureStep( private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS) override def configurePod(pod: SparkPod): SparkPod = { - val prefix = "spark-local-dir-" - val localDirVolumes = resolvedLocalDirs - .zipWithIndex - .map { - case (localDir, index) => findVolumeMount(pod, prefix, localDir) match { - case Some(volumeMount) => findVolume(pod, volumeMount.getName).get - case None => - new VolumeBuilder() - .withName(s"$prefix${index + 1}") - .withNewEmptyDir() + var localDirs = findLocalDirVolumeMount(pod) + var localDirVolumes : Seq[Volume] = Seq() + var localDirVolumeMounts : Seq[VolumeMount] = Seq() + + if (localDirs.isEmpty) { + localDirs = resolvedLocalDirs.toSeq + localDirVolumes = resolvedLocalDirs + .zipWithIndex + .map { case (_, index) => + new VolumeBuilder() + .withName(s"spark-local-dir-${index + 1}") + .withNewEmptyDir() .withMedium(if (useLocalDirTmpFs) "Memory" else null) - .endEmptyDir() - .build() - } - } - val localDirVolumeMounts = localDirVolumes - .zip(resolvedLocalDirs) - .map { case (localDirVolume, localDirPath) => - findVolumeMount(pod, localDirVolume.getName, localDirPath) match { - case Some(volumeMount) => volumeMount - case None => - new VolumeMountBuilder() - .withName(localDirVolume.getName) - .withMountPath(localDirPath) - .build() + .endEmptyDir() + .build() } - } + + localDirVolumeMounts = localDirVolumes + .zip(resolvedLocalDirs) + .map { case (localDirVolume, localDirPath) => + new VolumeMountBuilder() + .withName(localDirVolume.getName) + .withMountPath(localDirPath) + .build() + } + } + val podWithLocalDirVolumes = new PodBuilder(pod.pod) .editSpec() - .addToVolumes(localDirVolumes.filter(v => findVolume(pod, v.getName).isEmpty): _*) + .addToVolumes(localDirVolumes: _*) .endSpec() .build() val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container) .addNewEnv() .withName("SPARK_LOCAL_DIRS") - .withValue(resolvedLocalDirs.mkString(",")) + .withValue(localDirs.mkString(",")) .endEnv() - .addToVolumeMounts(localDirVolumeMounts - .filter(m => findVolumeMount(pod, m.getName, m.getMountPath).isEmpty): _*) + .addToVolumeMounts(localDirVolumeMounts: _*) .build() SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts) } - def findVolume(pod: SparkPod, name: String): Option[Volume] = { - pod.pod.getSpec.getVolumes.asScala.find(v => v.getName.equals(name)) - } + def findLocalDirVolumeMount(pod: SparkPod): Seq[String] = { + val localDirVolumes = pod.pod.getSpec.getVolumes.asScala + .filter(v => v.getName.startsWith("spark-local-dir-")) - def findVolumeMount(pod: SparkPod, prefix: String, path: String): Option[VolumeMount] = { - pod.container.getVolumeMounts.asScala - .find(m => m.getName.startsWith(prefix) && m.getMountPath.equals(path)) + localDirVolumes.map { volume => pod.container.getVolumeMounts.asScala + .find(m => m.getName.equals(volume.getName)) match { + case Some(m) => m.getMountPath + case _ => "" + } + }.filter(s => s.length > 0) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index 8cec2c3f65937..7ca5e174d3c3a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -128,7 +128,7 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite { val mountVolumeStep = new MountVolumesFeatureStep(mountVolumeConf) val configuredPod = mountVolumeStep.configurePod(SparkPod.initialPod()) - val sparkConf = new SparkConfWithEnv(Map("SPARK_LOCAL_DIRS" -> "/tmp")) + val sparkConf = new SparkConfWithEnv(Map()) val localDirConf = KubernetesTestConf.createDriverConf(sparkConf) val localDirStep = new LocalDirsFeatureStep(localDirConf, defaultLocalDir) val newConfiguredPod = localDirStep.configurePod(configuredPod) @@ -138,5 +138,10 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite { assert(newConfiguredPod.container.getVolumeMounts.size() === 1) assert(newConfiguredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") assert(newConfiguredPod.container.getVolumeMounts.get(0).getName === "spark-local-dir-test") + assert(newConfiguredPod.container.getEnv.get(0) === + new EnvVarBuilder() + .withName("SPARK_LOCAL_DIRS") + .withValue("/tmp") + .build()) } } From 2abb8e98bdadb96b3eba068848100b5030f70887 Mon Sep 17 00:00:00 2001 From: Junjie Chen Date: Fri, 26 Jul 2019 11:09:45 +0800 Subject: [PATCH 8/9] address comments --- docs/running-on-kubernetes.md | 2 +- .../k8s/features/LocalDirsFeatureStep.scala | 31 +++++++++---------- .../features/LocalDirsFeatureStepSuite.scala | 11 +++---- 3 files changed, 19 insertions(+), 25 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 213a64f6d8fb7..bb6a62b28d971 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -293,7 +293,7 @@ Spark supports using volumes to spill data during shuffles and other operations. ``` -If none volume is set as local storage, Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified then a default directory is created and configured appropriately. +If no volume is set as local storage, Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `spark.local.dir` or the environment variable `SPARK_LOCAL_DIRS` . If no directories are explicitly specified then a default directory is created and configured appropriately. `emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod. diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index a6d01dc030b5d..75f6c7c917430 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -18,8 +18,8 @@ package org.apache.spark.deploy.k8s.features import java.util.UUID -import collection.JavaConverters._ import io.fabric8.kubernetes.api.model._ +import scala.collection.JavaConverters._ import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ @@ -29,15 +29,6 @@ private[spark] class LocalDirsFeatureStep( defaultLocalDir: String = s"/var/data/spark-${UUID.randomUUID}") extends KubernetesFeatureConfigStep { - // Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system - // property - we want to instead default to mounting an emptydir volume that doesn't already - // exist in the image. - // We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already - // a bit opinionated about YARN and Mesos. - private val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS")) - .orElse(conf.getOption("spark.local.dir")) - .getOrElse(defaultLocalDir) - .split(",") private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS) override def configurePod(pod: SparkPod): SparkPod = { @@ -46,6 +37,15 @@ private[spark] class LocalDirsFeatureStep( var localDirVolumeMounts : Seq[VolumeMount] = Seq() if (localDirs.isEmpty) { + // Cannot use Utils.getConfiguredLocalDirs because that will default to the Java system + // property - we want to instead default to mounting an emptydir volume that doesn't already + // exist in the image. + // We could make utils.getConfiguredLocalDirs opinionated about Kubernetes, as it is already + // a bit opinionated about YARN and Mesos. + val resolvedLocalDirs = Option(conf.sparkConf.getenv("SPARK_LOCAL_DIRS")) + .orElse(conf.getOption("spark.local.dir")) + .getOrElse(defaultLocalDir) + .split(",") localDirs = resolvedLocalDirs.toSeq localDirVolumes = resolvedLocalDirs .zipWithIndex @@ -85,13 +85,10 @@ private[spark] class LocalDirsFeatureStep( def findLocalDirVolumeMount(pod: SparkPod): Seq[String] = { val localDirVolumes = pod.pod.getSpec.getVolumes.asScala - .filter(v => v.getName.startsWith("spark-local-dir-")) + .filter(_.getName.startsWith("spark-local-dir-")) - localDirVolumes.map { volume => pod.container.getVolumeMounts.asScala - .find(m => m.getName.equals(volume.getName)) match { - case Some(m) => m.getMountPath - case _ => "" - } - }.filter(s => s.length > 0) + localDirVolumes.flatMap { volume => + pod.container.getVolumeMounts.asScala + .filter(_.getName == volume.getName).map(_.getMountPath) } } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index 7ca5e174d3c3a..13bac4360083e 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder} import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesHostPathVolumeConf, KubernetesTestConf, KubernetesVolumeSpec, SparkPod} +import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.util.SparkConfWithEnv @@ -124,13 +124,10 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite { false, KubernetesHostPathVolumeConf("/hostPath/tmp") ) - val mountVolumeConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf)) - val mountVolumeStep = new MountVolumesFeatureStep(mountVolumeConf) + val kubernetesConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf)) + val mountVolumeStep = new MountVolumesFeatureStep(kubernetesConf) val configuredPod = mountVolumeStep.configurePod(SparkPod.initialPod()) - - val sparkConf = new SparkConfWithEnv(Map()) - val localDirConf = KubernetesTestConf.createDriverConf(sparkConf) - val localDirStep = new LocalDirsFeatureStep(localDirConf, defaultLocalDir) + val localDirStep = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir) val newConfiguredPod = localDirStep.configurePod(configuredPod) assert(newConfiguredPod.pod.getSpec.getVolumes.size() === 1) From c29dd7ae46ce4624898864100cfa6069e1d4115a Mon Sep 17 00:00:00 2001 From: Junjie Chen Date: Sun, 28 Jul 2019 19:05:07 +0800 Subject: [PATCH 9/9] fix coding style --- .../k8s/features/LocalDirsFeatureStep.scala | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index 75f6c7c917430..91edee72fc75a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -18,9 +18,10 @@ package org.apache.spark.deploy.k8s.features import java.util.UUID -import io.fabric8.kubernetes.api.model._ import scala.collection.JavaConverters._ +import io.fabric8.kubernetes.api.model._ + import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ @@ -32,7 +33,9 @@ private[spark] class LocalDirsFeatureStep( private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS) override def configurePod(pod: SparkPod): SparkPod = { - var localDirs = findLocalDirVolumeMount(pod) + var localDirs = pod.container.getVolumeMounts.asScala + .filter(_.getName.startsWith("spark-local-dir-")) + .map(_.getMountPath) var localDirVolumes : Seq[Volume] = Seq() var localDirVolumeMounts : Seq[VolumeMount] = Seq() @@ -46,7 +49,7 @@ private[spark] class LocalDirsFeatureStep( .orElse(conf.getOption("spark.local.dir")) .getOrElse(defaultLocalDir) .split(",") - localDirs = resolvedLocalDirs.toSeq + localDirs = resolvedLocalDirs.toBuffer localDirVolumes = resolvedLocalDirs .zipWithIndex .map { case (_, index) => @@ -82,13 +85,4 @@ private[spark] class LocalDirsFeatureStep( .build() SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts) } - - def findLocalDirVolumeMount(pod: SparkPod): Seq[String] = { - val localDirVolumes = pod.pod.getSpec.getVolumes.asScala - .filter(_.getName.startsWith("spark-local-dir-")) - - localDirVolumes.flatMap { volume => - pod.container.getVolumeMounts.asScala - .filter(_.getName == volume.getName).map(_.getMountPath) } - } }