From 680fa04edde7e6e3c5ad6a04d15246bb7f949928 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 6 Dec 2022 17:00:15 -0800 Subject: [PATCH 1/2] [SPARK-41410][K8S][FOLLOWUP] Remove PVC_COUNTER decrement --- .../spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 4188a9038aa2c..f25a0171205bf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -455,7 +455,6 @@ class ExecutorPodsAllocator( .inNamespace(namespace) .resource(createdExecutorPod) .delete() - PVC_COUNTER.decrementAndGet() throw e } } From 1ec0f4374b1a119382539b773d7638ab5ba3719e Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Tue, 6 Dec 2022 17:53:30 -0800 Subject: [PATCH 2/2] Add a test case for creation failure --- .../k8s/ExecutorPodsAllocatorSuite.scala | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index e4c3a853d18af..a066775f7dabb 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -882,6 +882,47 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { assert(podsAllocatorUnderTest.invokePrivate(counter).get() 
=== 1) } + test("SPARK-41410: An exception during PVC creation should not increase PVC counter") { + val prefix = "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1" + val confWithPVC = conf.clone + .set(KUBERNETES_DRIVER_OWN_PVC.key, "true") + .set(KUBERNETES_DRIVER_REUSE_PVC.key, "true") + .set(KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC.key, "true") + .set(EXECUTOR_INSTANCES.key, "1") + .set(s"$prefix.mount.path", "/spark-local-dir") + .set(s"$prefix.mount.readOnly", "false") + .set(s"$prefix.option.claimName", "OnDemand") + .set(s"$prefix.option.sizeLimit", "200Gi") + .set(s"$prefix.option.storageClass", "gp3") + + when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr), + meq(kubernetesClient), any(classOf[ResourceProfile]))) + .thenAnswer((invocation: InvocationOnMock) => { + val k8sConf: KubernetesExecutorConf = invocation.getArgument(0) + KubernetesExecutorSpec( + executorPodWithIdAndVolume(k8sConf.executorId.toInt, k8sConf.resourceProfileId), + Seq(persistentVolumeClaim("pvc-0", "gp3", "200Gi"))) + }) + + podsAllocatorUnderTest = new ExecutorPodsAllocator( + confWithPVC, secMgr, executorBuilder, + kubernetesClient, snapshotsStore, waitForExecutorPodsClock) + podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) + + val startTime = Instant.now.toEpochMilli + waitForExecutorPodsClock.setTime(startTime) + + val counter = PrivateMethod[AtomicInteger](Symbol("PVC_COUNTER"))() + assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0) + + when(pvcResource.create()).thenThrow(new KubernetesClientException("PVC fails to create")) + intercept[KubernetesClientException] { + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) + } + assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) + } + private def executorPodAnswer(): Answer[KubernetesExecutorSpec] = (invocation: InvocationOnMock) => { val 
k8sConf: KubernetesExecutorConf = invocation.getArgument(0)