diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java index 5fddac3d63113..3eef502e141de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java @@ -31,6 +31,8 @@ import org.apache.flink.runtime.executiongraph.SchedulingUtils; import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartCallback; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; @@ -112,6 +114,14 @@ protected void restartTasks(final Set verticesToRestart) { } private Runnable resetAndRescheduleTasks(final long globalModVersion, final Set vertexVersions) { + final RestartStrategy restartStrategy = executionGraph.getRestartStrategy(); + return () -> restartStrategy.restart( + createResetAndRescheduleTasksCallback(globalModVersion, vertexVersions), + executionGraph.getJobMasterMainThreadExecutor() + ); + } + + private RestartCallback createResetAndRescheduleTasksCallback(final long globalModVersion, final Set vertexVersions) { return () -> { if (!isLocalFailoverValid(globalModVersion)) { LOG.info("Skip current region failover as a global failover is ongoing."); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java index 20fb083792e87..f5461f0feec74 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java @@ -126,6 +126,8 @@ public void testConcurrentRegionFailovers() throws Exception { // complete region failover blocker to trigger region failover recovery failoverStrategy.getBlockerFuture().complete(null); manualMainThreadExecutor.triggerAll(); + manuallyTriggeredRestartStrategy.triggerAll(); + manualMainThreadExecutor.triggerAll(); // verify that all tasks are recovered and no task is restarted more than once assertEquals(ExecutionState.DEPLOYING, ev11.getExecutionState()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java index c4c1c32077412..ce7bf7966fb4d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java @@ -28,8 +28,9 @@ import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; -import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import 
org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; import org.apache.flink.runtime.instance.SlotSharingGroupId; @@ -117,6 +118,7 @@ public void testRegionFailoverInEagerMode() throws Exception { // vertices { ev11, ev21 } should be affected ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); manualMainThreadExecutor.triggerAll(); + manualMainThreadExecutor.triggerScheduledTasks(); // verify vertex states and complete cancellation assertVertexInState(ExecutionState.FAILED, ev11); @@ -125,6 +127,7 @@ public void testRegionFailoverInEagerMode() throws Exception { assertVertexInState(ExecutionState.DEPLOYING, ev22); ev21.getCurrentExecutionAttempt().completeCancelling(); manualMainThreadExecutor.triggerAll(); + manualMainThreadExecutor.triggerScheduledTasks(); // verify vertex states // in eager mode, all affected vertices should be scheduled in failover @@ -168,6 +171,7 @@ public void testRegionFailoverForRegionInternalErrorsInLazyMode() throws Excepti // regions {ev11}, {ev21}, {ev22} should be affected ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); manualMainThreadExecutor.triggerAll(); + manualMainThreadExecutor.triggerScheduledTasks(); // verify vertex states // only vertices with consumable inputs can be scheduled @@ -249,6 +253,35 @@ public void testNoRestart() throws Exception { assertEquals(JobStatus.FAILED, eg.getState()); } + /** + * Tests that the execution of the restart logic of the failover strategy is dependent on the restart strategy + * calling {@link RestartCallback#triggerFullRecovery()}. 
+ */ + @Test + public void testFailoverExecutionDependentOnRestartStrategyRecoveryTrigger() throws Exception { + final JobGraph jobGraph = createBatchJobGraph(); + final TestRestartStrategy restartStrategy = new TestRestartStrategy(); + + final ExecutionGraph eg = createExecutionGraph(jobGraph, restartStrategy); + + final ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + ev.fail(new Exception("Test Exception")); + + manualMainThreadExecutor.triggerAll(); + + // the entire failover-procedure is being halted by the restart strategy not doing anything + // the only thing the failover strategy should do is cancel tasks that require it + + // sanity check to ensure we actually called into the restart strategy + assertEquals(1, restartStrategy.getNumberOfQueuedActions()); + // 3 out of 4 tasks will be canceled, and removed from the set of registered executions + assertEquals(1, eg.getRegisteredExecutions().size()); + // no job state change should occur; in case of a failover we never switch to RESTARTING/CANCELED + // the important thing is that we don't switch to failed which would imply that we started a global failover + assertEquals(JobStatus.RUNNING, eg.getState()); + } + @Test public void testFailGlobalIfErrorOnRestartingTasks() throws Exception { final JobGraph jobGraph = createStreamingJobGraph(); @@ -267,6 +300,7 @@ public void testFailGlobalIfErrorOnRestartingTasks() throws Exception { completeCancelling(ev11, ev12, ev21, ev22); manualMainThreadExecutor.triggerAll(); + manualMainThreadExecutor.triggerScheduledTasks(); final long globalModVersionAfterFailure = eg.getGlobalModVersion(); @@ -340,7 +374,7 @@ private JobGraph createBatchJobGraph() { } private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception { - return createExecutionGraph(jobGraph, new InfiniteDelayRestartStrategy(10)); + return createExecutionGraph(jobGraph, new FixedDelayRestartStrategy(10, 0)); } private ExecutionGraph createExecutionGraph(