From b5fe711cd9f8e7486d3b1898952f9c08899332be Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 27 Aug 2020 03:35:59 -0700 Subject: [PATCH] [SPARK-32713][K8S] Support execId placeholder in executor PVC conf -c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=pvc-spark-SPARK_EXECUTOR_ID \ -c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data \ -c spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false \ --- .../features/MountVolumesFeatureStep.scala | 9 ++++++- .../MountVolumesFeatureStepSuite.scala | 25 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index e598a38e7f36f..94b5c37f96e3d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model._ import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Constants.ENV_EXECUTOR_ID private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) extends KubernetesFeatureConfigStep { @@ -56,7 +57,13 @@ private[spark] class MountVolumesFeatureStep(conf: KubernetesConf) new VolumeBuilder() .withHostPath(new HostPathVolumeSource(hostPath, "")) - case KubernetesPVCVolumeConf(claimName) => + case KubernetesPVCVolumeConf(claimNameTemplate) => + val claimName = conf match { + case c: KubernetesExecutorConf => + claimNameTemplate.replaceAll(ENV_EXECUTOR_ID, c.executorId) + case _ => + claimNameTemplate + } new VolumeBuilder() .withPersistentVolumeClaim( new PersistentVolumeClaimVolumeSource(claimName, spec.mountReadOnly)) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index 0d0ed50c0927a..a9a1ec46a6e69 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -64,6 +64,31 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { } + test("SPARK-32713 Mounts parameterized persistentVolumeClaims in executors") { + val volumeConf = KubernetesVolumeSpec( + "testVolume", + "/tmp", + "", + true, + KubernetesPVCVolumeConf("pvc-spark-SPARK_EXECUTOR_ID") + ) + val driverConf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeConf)) + val driverStep = new MountVolumesFeatureStep(driverConf) + val driverPod = driverStep.configurePod(SparkPod.initialPod()) + + assert(driverPod.pod.getSpec.getVolumes.size() === 1) + val driverPVC = driverPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim + assert(driverPVC.getClaimName === "pvc-spark-SPARK_EXECUTOR_ID") + + val executorConf = KubernetesTestConf.createExecutorConf(volumes = Seq(volumeConf)) + val executorStep = new MountVolumesFeatureStep(executorConf) + val executorPod = executorStep.configurePod(SparkPod.initialPod()) + + assert(executorPod.pod.getSpec.getVolumes.size() === 1) + val executorPVC = executorPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim + assert(executorPVC.getClaimName === s"pvc-spark-${KubernetesTestConf.EXECUTOR_ID}") + } + test("Mounts emptyDir") { val volumeConf = KubernetesVolumeSpec( "testVolume",