From 080cdcaa8b537ffc9b7caf8275f7b271c57be8fd Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Wed, 3 Jul 2019 14:07:00 +0200 Subject: [PATCH 1/5] [FLINK-13060][coordination] Respect restart constraints in new RegionFailover strategy --- ...aptedRestartPipelinedRegionStrategyNG.java | 9 +++- ...egionStrategyNGConcurrentFailoverTest.java | 2 + ...PipelinedRegionStrategyNGFailoverTest.java | 49 ++++++++++++++++++- 3 files changed, 56 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java index 5fddac3d63113..3578ae8d9dc49 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java @@ -107,11 +107,16 @@ protected void restartTasks(final Set verticesToRestart) { FutureUtils.assertNoException( cancelTasks(verticesToRestart) - .thenRunAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor()) + .thenApply(ignored -> createResetAndRescheduleTasksRunnable(globalModVersion, vertexVersions)) + .thenAccept(restartAction -> executionGraph.getRestartStrategy() + .restart( + restartAction::run, + executionGraph.getJobMasterMainThreadExecutor() + )) .handle(failGlobalOnError())); } - private Runnable resetAndRescheduleTasks(final long globalModVersion, final Set vertexVersions) { + private Runnable createResetAndRescheduleTasksRunnable(final long globalModVersion, final Set vertexVersions) { return () -> { if (!isLocalFailoverValid(globalModVersion)) { LOG.info("Skip current region failover as a global failover is ongoing."); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java index 20fb083792e87..7da846e638a1b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java @@ -125,7 +125,9 @@ public void testConcurrentRegionFailovers() throws Exception { // complete region failover blocker to trigger region failover recovery failoverStrategy.getBlockerFuture().complete(null); + manuallyTriggeredRestartStrategy.triggerAll(); manualMainThreadExecutor.triggerAll(); + manualMainThreadExecutor.triggerScheduledTasks(); // verify that all tasks are recovered and no task is restarted more than once assertEquals(ExecutionState.DEPLOYING, ev11.getExecutionState()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java index c4c1c32077412..1a720b3ae0826 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java @@ -26,10 +26,12 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; -import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; import org.apache.flink.runtime.instance.SlotSharingGroupId; @@ -117,6 +119,7 @@ public void testRegionFailoverInEagerMode() throws Exception { // vertices { ev11, ev21 } should be affected ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); manualMainThreadExecutor.triggerAll(); + manualMainThreadExecutor.triggerScheduledTasks(); // verify vertex states and complete cancellation assertVertexInState(ExecutionState.FAILED, ev11); @@ -125,6 +128,7 @@ public void testRegionFailoverInEagerMode() throws Exception { assertVertexInState(ExecutionState.DEPLOYING, ev22); ev21.getCurrentExecutionAttempt().completeCancelling(); manualMainThreadExecutor.triggerAll(); + manualMainThreadExecutor.triggerScheduledTasks(); // verify vertex states // in eager mode, all affected vertices should be scheduled in failover @@ -168,6 +172,7 @@ public void testRegionFailoverForRegionInternalErrorsInLazyMode() throws Excepti // regions {ev11}, {ev21}, {ev22} should be affected ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); manualMainThreadExecutor.triggerAll(); + manualMainThreadExecutor.triggerScheduledTasks(); // verify vertex states // only vertices with consumable inputs can be scheduled @@ -249,6 +254,45 @@ public void testNoRestart() throws Exception { assertEquals(JobStatus.FAILED, eg.getState()); } + /** + * Tests that the execution of the restart logic of the failover strategy is dependent on the restart strategy + * calling {@link RestartCallback#triggerFullRecovery()}. + */ + @Test + public void testFailoverExecutionDependentOnRestartStrategyRecoveryTrigger() throws Exception { + final JobGraph jobGraph = createBatchJobGraph(); + final CompletableFuture restartCallFuture = new CompletableFuture<>(); + final RestartStrategy restartStrategy = new RestartStrategy() { + @Override + public boolean canRestart() { + return true; + } + + @Override + public void restart(RestartCallback restarter, ScheduledExecutor executor) { + restartCallFuture.complete(null); + } + }; + final ExecutionGraph eg = createExecutionGraph(jobGraph, restartStrategy); + + final ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + ev.fail(new Exception("Test Exception")); + + manualMainThreadExecutor.triggerAll(); + + // the entire failover-procedure is being halted by the restart strategy not doing anything + // the only thing the failover strategy should do is cancel tasks that require it + + // sanity check to ensure we actually called into the restart strategy + assertEquals(restartCallFuture.isDone(),true); + // 3 out of 4 tasks will be canceled, and removed from the set of registered executions + assertEquals(eg.getRegisteredExecutions().size(), 1); + // no job state change should occur; in case of a failover we never switch to RESTARTING/CANCELED + // the important thing is that we don't switch to failed which would imply that we started a global failover + assertEquals(JobStatus.RUNNING, eg.getState()); + } + @Test public void testFailGlobalIfErrorOnRestartingTasks() throws Exception { final JobGraph jobGraph = createStreamingJobGraph(); @@ -267,6 +311,7 @@ public void testFailGlobalIfErrorOnRestartingTasks() throws Exception { completeCancelling(ev11, ev12, ev21, ev22); manualMainThreadExecutor.triggerAll(); + manualMainThreadExecutor.triggerScheduledTasks(); final long globalModVersionAfterFailure = eg.getGlobalModVersion(); @@ -340,7 +385,7 @@ private JobGraph createBatchJobGraph() { } private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception { - return createExecutionGraph(jobGraph, new InfiniteDelayRestartStrategy(10)); + return createExecutionGraph(jobGraph, new FixedDelayRestartStrategy(10, 0)); } private ExecutionGraph createExecutionGraph( From 1f9db6e591aa934acb57dd5bfb0a54fade757193 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Thu, 4 Jul 2019 13:21:55 +0200 Subject: [PATCH 2/5] address comments --- ...aptedRestartPipelinedRegionStrategyNG.java | 19 ++++++++++++------- ...PipelinedRegionStrategyNGFailoverTest.java | 15 ++------------- 2 files changed, 14 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java index 3578ae8d9dc49..00f429798c294 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java @@ -31,6 +31,8 @@ import org.apache.flink.runtime.executiongraph.SchedulingUtils; import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartCallback; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; @@ -107,16 +109,19 @@ protected void restartTasks(final Set verticesToRestart) { FutureUtils.assertNoException( cancelTasks(verticesToRestart) - .thenApply(ignored -> createResetAndRescheduleTasksRunnable(globalModVersion, vertexVersions)) - .thenAccept(restartAction -> executionGraph.getRestartStrategy() - .restart( - restartAction::run, - executionGraph.getJobMasterMainThreadExecutor() - )) + .thenRun(scheduleRestart(globalModVersion, vertexVersions)) .handle(failGlobalOnError())); } - private Runnable createResetAndRescheduleTasksRunnable(final long globalModVersion, final Set vertexVersions) { + private Runnable scheduleRestart(final long globalModVersion, final Set vertexVersions) { + final RestartStrategy restartStrategy = executionGraph.getRestartStrategy(); + return () -> restartStrategy.restart( + resetAndRescheduleTasks(globalModVersion, vertexVersions), + executionGraph.getJobMasterMainThreadExecutor() + ); + } + + private RestartCallback resetAndRescheduleTasks(final long globalModVersion, final Set vertexVersions) { return () -> { if (!isLocalFailoverValid(globalModVersion)) { LOG.info("Skip current region failover as a global failover is ongoing."); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java index 1a720b3ae0826..ce7bf7966fb4d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; -import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; @@ -261,18 +260,8 @@ public void testNoRestart() throws Exception { @Test public void testFailoverExecutionDependentOnRestartStrategyRecoveryTrigger() throws Exception { final JobGraph jobGraph = createBatchJobGraph(); - final CompletableFuture restartCallFuture = new CompletableFuture<>(); - final RestartStrategy restartStrategy = new RestartStrategy() { - @Override - public boolean canRestart() { - return true; - } + final TestRestartStrategy restartStrategy = new TestRestartStrategy(); - @Override - public void restart(RestartCallback restarter, ScheduledExecutor executor) { - restartCallFuture.complete(null); - } - }; final ExecutionGraph eg = createExecutionGraph(jobGraph, restartStrategy); final ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); @@ -285,7 +274,7 @@ public void restart(RestartCallback restarter, ScheduledExecutor executor) { // the only thing the failover strategy should do is cancel tasks that require it // sanity check to ensure we actually called into the restart strategy - assertEquals(restartCallFuture.isDone(),true); + assertEquals(restartStrategy.getNumberOfQueuedActions(), 1); // 3 out of 4 tasks will be canceled, and removed from the set of registered executions assertEquals(eg.getRegisteredExecutions().size(), 1); // no job state change should occur; in case of a failover we never switch to RESTARTING/CANCELED From 2899edc2ab51969cd3362e86eb1979137d1bb23f Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Thu, 4 Jul 2019 13:22:34 +0200 Subject: [PATCH 3/5] run async --- .../failover/AdaptedRestartPipelinedRegionStrategyNG.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java index 00f429798c294..e3680413629cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java @@ -109,7 +109,7 @@ protected void restartTasks(final Set verticesToRestart) { FutureUtils.assertNoException( cancelTasks(verticesToRestart) - .thenRun(scheduleRestart(globalModVersion, vertexVersions)) + .thenRunAsync(scheduleRestart(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor()) .handle(failGlobalOnError())); } From c12a5344bb44840da6481e59c626edd2261f5b9b Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Thu, 4 Jul 2019 13:23:39 +0200 Subject: [PATCH 4/5] renamings --- .../failover/AdaptedRestartPipelinedRegionStrategyNG.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java index e3680413629cf..3eef502e141de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java @@ -109,19 +109,19 @@ protected void restartTasks(final Set verticesToRestart) { FutureUtils.assertNoException( cancelTasks(verticesToRestart) - .thenRunAsync(scheduleRestart(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor()) + .thenRunAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor()) .handle(failGlobalOnError())); } - private Runnable scheduleRestart(final long globalModVersion, final Set vertexVersions) { + private Runnable resetAndRescheduleTasks(final long globalModVersion, final Set vertexVersions) { final RestartStrategy restartStrategy = executionGraph.getRestartStrategy(); return () -> restartStrategy.restart( - resetAndRescheduleTasks(globalModVersion, vertexVersions), + createResetAndRescheduleTasksCallback(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor() ); } - private RestartCallback resetAndRescheduleTasks(final long globalModVersion, final Set vertexVersions) { + private RestartCallback createResetAndRescheduleTasksCallback(final long globalModVersion, final Set vertexVersions) { return () -> { if (!isLocalFailoverValid(globalModVersion)) { LOG.info("Skip current region failover as a global failover is ongoing."); From a17cdc4e8bfb0acf982be16c37f2170a1b9b58c8 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Thu, 4 Jul 2019 14:41:12 +0200 Subject: [PATCH 5/5] fix test --- ...dRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java index 7da846e638a1b..f5461f0feec74 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java @@ -125,9 +125,9 @@ public void testConcurrentRegionFailovers() throws Exception { // complete region failover blocker to trigger region failover recovery failoverStrategy.getBlockerFuture().complete(null); + manualMainThreadExecutor.triggerAll(); manuallyTriggeredRestartStrategy.triggerAll(); manualMainThreadExecutor.triggerAll(); - manualMainThreadExecutor.triggerScheduledTasks(); // verify that all tasks are recovered and no task is restarted more than once assertEquals(ExecutionState.DEPLOYING, ev11.getExecutionState());