From 814fc656e21dedcb99827c76c078217a57d94bc5 Mon Sep 17 00:00:00 2001
From: Yi Tang
Date: Mon, 12 Apr 2021 12:59:40 +0800
Subject: [PATCH] [FLINK-20695][ha] Clean ha data for job if globally
 terminated

At the moment Flink only cleans up the HA data (e.g. K8s ConfigMaps or
ZooKeeper nodes) while shutting down the cluster. This is not enough for a
long-running session cluster to which multiple jobs are submitted. In this
commit, we clean up the data for a particular job as soon as it reaches a
globally terminal state.

This closes #15910.
---
 .../KubernetesHaServices.java                 |  5 ++
 .../KubernetesHaServicesTest.java             | 34 +++++++++++
 .../flink/runtime/dispatcher/Dispatcher.java  | 20 +++++--
 .../highavailability/AbstractHaServices.java  | 16 +++++
 .../HighAvailabilityServices.java             |  8 +++
 .../nonha/AbstractNonHaServices.java          |  4 ++
 .../zookeeper/ZooKeeperHaServices.java        | 33 +++++++++-
 .../DispatcherResourceCleanupTest.java        | 28 +++++++++
 .../AbstractHaServicesTest.java               | 42 ++++++++++++-
 .../TestingHighAvailabilityServices.java      | 12 ++++
 ...TestingManualHighAvailabilityServices.java |  5 ++
 .../zookeeper/ZooKeeperHaServicesTest.java    | 60 ++++++++++++++++++-
 12 files changed, 257 insertions(+), 10 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
index f5ff366710251..b7d35f89b6763 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
@@ -142,6 +142,11 @@ public void internalCleanup() throws Exception {
                 .get();
     }
 
+    @Override
+    public void internalCleanupJobData(JobID jobID) throws Exception {
+        kubeClient.deleteConfigMap(getLeaderNameForJobManager(jobID)).get();
+    }
+
     @Override
     protected String getLeaderNameForResourceManager() {
         return getLeaderName(RESOURCE_MANAGER_NAME);
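Note on the Kubernetes side: in a session cluster every job gets its own jobmanager
leader ConfigMap, so per-job cleanup reduces to deleting exactly that one ConfigMap.
For illustration, a minimal sketch of how a component that owns a
HighAvailabilityServices instance is expected to invoke the new hook once a job
reaches a globally terminal state; the class and variable names here are made up
for the example, only cleanupJobData comes from this patch:

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.highavailability.HighAvailabilityServices;

    final class JobTerminationHandler {
        private final HighAvailabilityServices haServices;

        JobTerminationHandler(HighAvailabilityServices haServices) {
            this.haServices = haServices;
        }

        void onGloballyTerminalState(JobID jobId) throws Exception {
            // Remove only this job's HA artifacts (e.g. its jobmanager leader
            // ConfigMap on Kubernetes); cluster-wide HA data stays untouched.
            haServices.cleanupJobData(jobId);
        }
    }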
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java
index d6b69c32c04eb..cd78491551058 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.kubernetes.highavailability;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 
 import org.junit.Test;
@@ -76,4 +79,35 @@ public void testInternalCleanupShouldCleanupConfigMaps() throws Exception {
             }
         };
     }
+
+    @Test
+    public void testInternalJobCleanupShouldCleanupConfigMaps() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            final KubernetesHaServices kubernetesHaServices =
+                                    new KubernetesHaServices(
+                                            flinkKubeClient,
+                                            executorService,
+                                            configuration,
+                                            new VoidBlobStore());
+                            JobID jobID = new JobID();
+                            String configMapName =
+                                    kubernetesHaServices.getLeaderNameForJobManager(jobID);
+                            final KubernetesConfigMap configMap =
+                                    new TestingFlinkKubeClient.MockKubernetesConfigMap(
+                                            configMapName);
+                            flinkKubeClient.createConfigMap(configMap);
+                            assertThat(
+                                    flinkKubeClient.getConfigMap(configMapName).isPresent(),
+                                    is(true));
+                            kubernetesHaServices.internalCleanupJobData(jobID);
+                            assertThat(
+                                    flinkKubeClient.getConfigMap(configMapName).isPresent(),
+                                    is(false));
+                        });
+            }
+        };
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index f0ff52ca715ed..1ed13e2c37191 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -767,13 +767,14 @@ private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJo
     private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
         jobManagerMetricGroup.removeJob(jobId);
 
-        boolean cleanupHABlobs = false;
+        boolean jobGraphRemoved = false;
         if (cleanupHA) {
             try {
                 jobGraphWriter.removeJobGraph(jobId);
 
-                // only clean up the HA blobs if we could remove the job from HA storage
-                cleanupHABlobs = true;
+                // only clean up the HA blobs and HA service data for the particular job
+                // if we could remove the job from HA storage
+                jobGraphRemoved = true;
             } catch (Exception e) {
                 log.warn(
                         "Could not properly remove job {} from submitted job graph store.",
@@ -789,6 +790,17 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
                         jobId,
                         e);
             }
+
+            if (jobGraphRemoved) {
+                try {
+                    highAvailabilityServices.cleanupJobData(jobId);
+                } catch (Exception e) {
+                    log.warn(
+                            "Could not properly clean data for job {} stored by HA services.",
+                            jobId,
+                            e);
+                }
+            }
         } else {
             try {
                 jobGraphWriter.releaseJobGraph(jobId);
@@ -800,7 +812,7 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
             }
         }
 
-        blobServer.cleanupJob(jobId, cleanupHABlobs);
+        blobServer.cleanupJob(jobId, jobGraphRemoved);
     }
 
     /** Terminate all currently running {@link DispatcherJob}s. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index 9a322a8d92487..4c4dfe610a26c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -205,6 +205,13 @@ public void closeAndCleanupAllData() throws Exception {
         logger.info("Finished cleaning up the high availability data.");
     }
 
+    @Override
+    public void cleanupJobData(JobID jobID) throws Exception {
+        logger.info("Cleaning up the high availability data for job {}.", jobID);
+        internalCleanupJobData(jobID);
+        logger.info("Finished cleaning up the high availability data for job {}.", jobID);
+    }
+
     /**
      * Create leader election service with specified leaderName.
      *
@@ -260,6 +267,15 @@ public void closeAndCleanupAllData() throws Exception {
      */
     protected abstract void internalCleanup() throws Exception;
 
+    /**
+     * Clean up the metadata in the distributed system (e.g. ZooKeeper, Kubernetes ConfigMaps) for
+     * the specified job.
+     *
+     * @param jobID The identifier of the job to clean up.
+     * @throws Exception if the cleanup operation on the external storage fails.
+     */
+    protected abstract void internalCleanupJobData(JobID jobID) throws Exception;
+
     /**
      * Get the leader name for ResourceManager.
      *
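Two things worth noting in the changes above. First, the dispatcher only triggers the
new cleanup after the job graph was successfully removed from HA storage, so a failed
removal never strands a job without its recovery metadata. Second, AbstractHaServices
wires the new hook as a classic template method: the public cleanupJobData logs and
delegates to the backend-specific internalCleanupJobData. A self-contained sketch of
that shape, with illustrative names rather than Flink classes:

    import java.util.UUID;

    abstract class TemplateCleanup {
        // Public entry point: the fixed logging and ordering live here.
        public final void cleanupJobData(UUID jobId) throws Exception {
            System.out.printf("Cleaning up HA data for job %s.%n", jobId);
            internalCleanupJobData(jobId);
            System.out.printf("Finished cleaning up HA data for job %s.%n", jobId);
        }

        // Backend-specific step: ZooKeeper deletes znodes, Kubernetes deletes
        // ConfigMaps, and a backend without per-job external state can no-op.
        protected abstract void internalCleanupJobData(UUID jobId) throws Exception;
    }

    final class InMemoryCleanup extends TemplateCleanup {
        @Override
        protected void internalCleanupJobData(UUID jobId) {
            // no external state in this toy backend, so nothing to delete
        }
    }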
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index fdb3fa7dcb535..fdf031e34e19f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -237,4 +237,12 @@ default LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
      * up data stored by them.
      */
     void closeAndCleanupAllData() throws Exception;
+
+    /**
+     * Deletes all data for the specified job stored by these services in external stores.
+     *
+     * @param jobID The identifier of the job to clean up.
+     * @throws Exception Thrown, if an exception occurred while cleaning data stored by them.
+     */
+    void cleanupJobData(JobID jobID) throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index 188bdbbfd1c42..dfc53dca61ba0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.highavailability.nonha;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -111,6 +112,9 @@ public void closeAndCleanupAllData() throws Exception {
         close();
     }
 
+    @Override
+    public void cleanupJobData(JobID jobID) throws Exception {}
+
     // ----------------------------------------------------------------------
     // Helper methods
     // ----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index f6ab9e859d630..802162b142846 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -34,10 +34,14 @@
 
 import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.shaded.curator4.org.apache.curator.utils.ZKPaths;
 import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.Stat;
 
 import javax.annotation.Nonnull;
 
+import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -138,6 +142,22 @@ public void internalCleanup() throws Exception {
         cleanupZooKeeperPaths();
     }
 
+    @Override
+    public void internalCleanupJobData(JobID jobID) throws Exception {
+        final List<String> paths =
+                Stream.of(
+                                HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
+                        .map(configuration::getString)
+                        .map(parent -> parent + "/" + jobID)
+                        .collect(Collectors.toList());
+        for (String path : paths) {
+            deleteZNode(path);
+        }
+    }
+
     @Override
     protected String getLeaderNameForResourceManager() {
         return RESOURCE_MANAGER_LEADER_PATH;
@@ -168,6 +188,10 @@ private void cleanupZooKeeperPaths() throws Exception {
     }
 
     private void deleteOwnedZNode() throws Exception {
+        deleteZNode("/");
+    }
+
+    private void deleteZNode(String path) throws Exception {
         // delete the HA_CLUSTER_ID znode which is owned by this cluster
 
         // Since we are using Curator version 2.12 there is a bug in deleting the children
@@ -176,13 +200,18 @@ private void deleteOwnedZNode() throws Exception {
         // The retry logic can be removed once we upgrade to Curator version >= 4.0.1.
         boolean zNodeDeleted = false;
         while (!zNodeDeleted) {
+            Stat stat = client.checkExists().forPath(path);
+            if (stat == null) {
+                logger.debug("znode {} does not exist, nothing to delete", path);
+                return;
+            }
             try {
-                client.delete().deletingChildrenIfNeeded().forPath("/");
+                client.delete().deletingChildrenIfNeeded().forPath(path);
                 zNodeDeleted = true;
            } catch (KeeperException.NoNodeException ignored) {
                 // concurrent delete operation. Try again.
                 logger.debug(
-                        "Retrying to delete owned znode because of other concurrent delete operation.");
+                        "Retrying to delete znode because of other concurrent delete operation.");
             }
         }
     }
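With Flink's default option values (high-availability.zookeeper.path.checkpoints =
"/checkpoints", .checkpoint-counter = "/checkpoint-counter", .latch = "/leaderlatch",
.leader = "/leader"), internalCleanupJobData therefore deletes the four znodes
/checkpoints/<jobId>, /checkpoint-counter/<jobId>, /leaderlatch/<jobId> and
/leader/<jobId> underneath the cluster's namespaced root. A runnable sketch of just
the path construction, using the default values and a made-up job id:

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public final class JobPathDemo {
        public static void main(String[] args) {
            // Defaults from HighAvailabilityOptions; the job id is hypothetical.
            String jobId = "2b8a4c0ef2b4e39e9e1c4f2a9d9f6a11";
            List<String> paths =
                    Stream.of("/checkpoints", "/checkpoint-counter", "/leaderlatch", "/leader")
                            .map(parent -> parent + "/" + jobId)
                            .collect(Collectors.toList());
            // Prints the four per-job znodes that would be deleted.
            paths.forEach(System.out::println);
        }
    }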
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index fb02a7b1c0a6a..c2f7c6ffa5a8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -131,6 +131,7 @@ public class DispatcherResourceCleanupTest extends TestLogger {
     private CompletableFuture<BlobKey> storedHABlobFuture;
     private CompletableFuture<JobID> deleteAllHABlobsFuture;
     private CompletableFuture<JobID> cleanupJobFuture;
+    private CompletableFuture<JobID> cleanupJobHADataFuture;
     private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
 
     @BeforeClass
@@ -153,6 +154,8 @@ public void setup() throws Exception {
         clearedJobLatch = new OneShotLatch();
         runningJobsRegistry = new SingleRunningJobsRegistry(jobId, clearedJobLatch);
         highAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
+        cleanupJobHADataFuture = new CompletableFuture<>();
+        highAvailabilityServices.setCleanupJobDataFuture(cleanupJobHADataFuture);
 
         storedHABlobFuture = new CompletableFuture<>();
         deleteAllHABlobsFuture = new CompletableFuture<>();
@@ -457,6 +460,31 @@ public void testDuplicateJobSubmissionDoesNotDeleteJobMetaData() throws Exceptio
         assertThatHABlobsHaveBeenRemoved();
     }
 
+    @Test
+    public void testHaDataCleanupWhenJobFinished() throws Exception {
+        TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
+        TestingJobManagerRunner jobManagerRunner =
+                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
+        finishJob(jobManagerRunner);
+        JobID jobID = cleanupJobHADataFuture.get(2000, TimeUnit.MILLISECONDS);
+        assertThat(jobID, is(this.jobId));
+    }
+
+    @Test
+    public void testHaDataCleanupWhenJobNotFinished() throws Exception {
+        TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
+        TestingJobManagerRunner jobManagerRunner =
+                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
+        jobManagerRunner.completeResultFutureExceptionally(new JobNotFinishedException(jobId));
+        try {
+            cleanupJobHADataFuture.get(10L, TimeUnit.MILLISECONDS);
+            fail("The HA data for the job should not be deleted.");
+        } catch (TimeoutException ignored) {
+            // expected
+        }
+        assertThat(cleanupJobHADataFuture.isDone(), is(false));
+    }
+
     private void finishJob(TestingJobManagerRunner takeCreatedJobManagerRunner) {
         takeCreatedJobManagerRunner.completeResultFuture(
                 new ArchivedExecutionGraphBuilder()
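The second test asserts a non-event: cleanup must not fire for a job that merely did
not finish (the JobNotFinishedException case, e.g. when the dispatcher loses
leadership). Waiting briefly and expecting a TimeoutException is a common way to
phrase such assertions; a small generic helper in the same spirit, with illustrative
names not taken from this patch:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    final class NegativeAssertions {
        // Waits briefly and fails if the future completes: "assert this did NOT
        // happen". An exceptional completion also fails, since get() then throws
        // ExecutionException, which propagates. Note the inherent limitation:
        // this only proves the event did not happen within the wait window.
        static void assertNotCompleted(CompletableFuture<?> future, long waitMillis)
                throws Exception {
            try {
                future.get(waitMillis, TimeUnit.MILLISECONDS);
                throw new AssertionError("Future completed although it should not have.");
            } catch (TimeoutException expected) {
                // the future stayed incomplete for the whole wait: success
            }
        }
    }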
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
index a90dc61c40a1b..96e6daf27efb2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
@@ -37,9 +37,12 @@
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -63,7 +66,8 @@ public void testCloseAndCleanupAllDataDeletesBlobsAfterCleaningUpHAData() throws
                         Executors.directExecutor(),
                         testingBlobStoreService,
                         closeOperations,
-                        () -> closeOperations.offer(CloseOperations.HA_CLEANUP));
+                        () -> closeOperations.offer(CloseOperations.HA_CLEANUP),
+                        ignored -> {});
 
         haServices.closeAndCleanupAllData();
 
@@ -94,7 +98,8 @@ public void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails(
                         closeOperations,
                         () -> {
                             throw new FlinkException("test exception");
-                        });
+                        },
+                        ignored -> {});
 
         try {
             haServices.closeAndCleanupAllData();
@@ -106,6 +111,29 @@ public void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails(
         assertThat(closeOperations, contains(CloseOperations.HA_CLOSE, CloseOperations.BLOB_CLOSE));
     }
 
+    @Test
+    public void testCleanupJobData() throws Exception {
+        final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
+        final TestingBlobStoreService testingBlobStoreService =
+                new TestingBlobStoreService(closeOperations);
+
+        JobID jobID = new JobID();
+        CompletableFuture<JobID> jobCleanupFuture = new CompletableFuture<>();
+
+        final TestingHaServices haServices =
+                new TestingHaServices(
+                        new Configuration(),
+                        Executors.directExecutor(),
+                        testingBlobStoreService,
+                        closeOperations,
+                        () -> {},
+                        jobCleanupFuture::complete);
+
+        haServices.cleanupJobData(jobID);
+        JobID jobIDCleaned = jobCleanupFuture.get();
+        assertThat(jobIDCleaned, is(jobID));
+    }
+
     private enum CloseOperations {
         HA_CLEANUP,
         HA_CLOSE,
@@ -156,16 +184,19 @@ private static final class TestingHaServices extends AbstractHaServices {
 
         private final Queue<CloseOperations> closeOperations;
         private final RunnableWithException internalCleanupRunnable;
+        private final Consumer<JobID> internalJobCleanupConsumer;
 
         private TestingHaServices(
                 Configuration config,
                 Executor ioExecutor,
                 BlobStoreService blobStoreService,
                 Queue<CloseOperations> closeOperations,
-                RunnableWithException internalCleanupRunnable) {
+                RunnableWithException internalCleanupRunnable,
+                Consumer<JobID> internalJobCleanupConsumer) {
             super(config, ioExecutor, blobStoreService);
             this.closeOperations = closeOperations;
             this.internalCleanupRunnable = internalCleanupRunnable;
+            this.internalJobCleanupConsumer = internalJobCleanupConsumer;
         }
 
         @Override
@@ -203,6 +234,11 @@ protected void internalCleanup() throws Exception {
             internalCleanupRunnable.run();
         }
 
+        @Override
+        protected void internalCleanupJobData(JobID jobID) throws Exception {
+            internalJobCleanupConsumer.accept(jobID);
+        }
+
         @Override
         protected String getLeaderNameForResourceManager() {
             throw new UnsupportedOperationException("Not supported by this test implementation.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 0afc9812d86e9..4434eefc852be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -28,6 +28,7 @@
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
@@ -72,6 +73,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices
 
     private CompletableFuture<Void> closeAndCleanupAllDataFuture = new CompletableFuture<>();
 
+    private volatile CompletableFuture<JobID> jobCleanupFuture;
+
     // ------------------------------------------------------------------------
     //  Setters for mock / testing implementations
     // ------------------------------------------------------------------------
@@ -145,6 +148,10 @@ public void setCloseAndCleanupAllDataFuture(
         this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
     }
 
+    public void setCleanupJobDataFuture(CompletableFuture<JobID> jobCleanupFuture) {
+        this.jobCleanupFuture = jobCleanupFuture;
+    }
+
     // ------------------------------------------------------------------------
     //  HA Services Methods
     // ------------------------------------------------------------------------
@@ -277,4 +284,9 @@ public void close() throws Exception {
     public void closeAndCleanupAllData() throws Exception {
         closeAndCleanupAllDataFuture.complete(null);
     }
+
+    @Override
+    public void cleanupJobData(JobID jobID) {
+        Optional.ofNullable(jobCleanupFuture).ifPresent(f -> f.complete(jobID));
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
index 6cbb92c8cc3fc..912b48f9538e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingManualHighAvailabilityServices.java
@@ -136,6 +136,11 @@ public void closeAndCleanupAllData() throws Exception {
         // nothing to do
     }
 
+    @Override
+    public void cleanupJobData(JobID jobID) throws Exception {
+        // nothing to do
+    }
+
     public void grantLeadership(JobID jobId, int index, UUID leaderId) {
         ManualLeaderService manualLeaderService = jobManagerLeaderServices.get(jobId);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
index 81dc6b450db15..32601b9532d47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
@@ -49,9 +49,12 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
@@ -148,6 +151,41 @@ public void testCloseAndCleanupAllDataWithUncle() throws Exception {
         assertThat(client.checkExists().forPath(unclePath), is(notNullValue()));
     }
 
+    /** Tests that the ZooKeeperHaServices cleans up the paths for the job manager. */
+    @Test
+    public void testCleanupJobData() throws Exception {
+        String rootPath = "/foo/bar/flink";
+        final Configuration configuration = createConfiguration(rootPath);
+        String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
+
+        final List<String> paths =
+                Stream.of(
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH,
+                                HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
+                        .map(configuration::getString)
+                        .map(path -> rootPath + namespace + path)
+                        .collect(Collectors.toList());
+
+        final TestingBlobStoreService blobStoreService = new TestingBlobStoreService();
+
+        JobID jobID = new JobID();
+        runCleanupTestWithJob(
+                configuration,
+                blobStoreService,
+                jobID,
+                haServices -> {
+                    for (String path : paths) {
+                        final List<String> children = client.getChildren().forPath(path);
+                        assertThat(children, hasItem(jobID.toString()));
+                    }
+                    haServices.cleanupJobData(jobID);
+                    for (String path : paths) {
+                        final List<String> children = client.getChildren().forPath(path);
+                        assertThat(children, not(hasItem(jobID.toString())));
+                    }
+                });
+    }
+
     private static CuratorFramework startCuratorFramework() {
         return CuratorFrameworkFactory.builder()
                 .connectString(ZOO_KEEPER_RESOURCE.getConnectString())
@@ -170,6 +208,16 @@ private void runCleanupTest(
             TestingBlobStoreService blobStoreService,
             ThrowingConsumer<ZooKeeperHaServices, Exception> zooKeeperHaServicesConsumer)
             throws Exception {
+        runCleanupTestWithJob(
+                configuration, blobStoreService, new JobID(), zooKeeperHaServicesConsumer);
+    }
+
+    private void runCleanupTestWithJob(
+            Configuration configuration,
+            TestingBlobStoreService blobStoreService,
+            JobID jobId,
+            ThrowingConsumer<ZooKeeperHaServices, Exception> zooKeeperHaServicesConsumer)
+            throws Exception {
         try (ZooKeeperHaServices zooKeeperHaServices =
                 new ZooKeeperHaServices(
                         ZooKeeperUtils.startCuratorFramework(configuration),
@@ -190,13 +238,23 @@ private void runCleanupTest(
             resourceManagerLeaderRetriever.start(listener);
             resourceManagerLeaderElectionService.start(
                     new TestingContender("foobar", resourceManagerLeaderElectionService));
-            final JobID jobId = new JobID();
+            LeaderElectionService jobManagerLeaderElectionService =
+                    zooKeeperHaServices.getJobManagerLeaderElectionService(jobId);
+            jobManagerLeaderElectionService.start(
+                    new TestingContender("", jobManagerLeaderElectionService));
+            LeaderRetrievalService jobManagerLeaderRetriever =
+                    zooKeeperHaServices.getJobManagerLeaderRetriever(jobId);
+            jobManagerLeaderRetriever.start(
+                    new LeaderRetrievalUtils.LeaderConnectionInfoListener());
+
             runningJobsRegistry.setJobRunning(jobId);
 
             listener.getLeaderConnectionInfoFuture().join();
 
             resourceManagerLeaderRetriever.stop();
             resourceManagerLeaderElectionService.stop();
+            jobManagerLeaderRetriever.stop();
+            jobManagerLeaderElectionService.stop();
             runningJobsRegistry.clearJob(jobId);
 
             zooKeeperHaServicesConsumer.accept(zooKeeperHaServices);
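One implication of this change for downstream code: cleanupJobData is a new method on
the HighAvailabilityServices interface and internalCleanupJobData a new abstract
method on AbstractHaServices, so out-of-tree implementations have to add an override
to keep compiling. Backends without per-job external state can no-op, mirroring
AbstractNonHaServices above; backends with external state should delete exactly the
per-job artifacts and keep the operation idempotent so a retry after a partial
failure still succeeds. A self-contained sketch of that contract, where the store and
class names are hypothetical stand-ins, not Flink API:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical external store, standing in for ZooKeeper znodes or
    // Kubernetes ConfigMaps.
    final class InMemoryHaStore {
        final Map<String, byte[]> entries = new ConcurrentHashMap<>();

        void deleteIfExists(String key) {
            entries.remove(key); // idempotent: safe to retry after partial failures
        }
    }

    final class MyHaBackend {
        private final InMemoryHaStore store = new InMemoryHaStore();

        // Mirrors the contract of AbstractHaServices#internalCleanupJobData:
        // remove only this job's artifacts, leave cluster-wide data alone.
        void internalCleanupJobData(String jobId) {
            store.deleteIfExists("leader/" + jobId);
            store.deleteIfExists("checkpoints/" + jobId);
            store.deleteIfExists("checkpoint-counter/" + jobId);
        }
    }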