Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.flink.runtime.executiongraph.SchedulingUtils;
import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
Expand Down Expand Up @@ -112,6 +114,14 @@ protected void restartTasks(final Set<ExecutionVertexID> verticesToRestart) {
}

/**
 * Builds the action that passes the reset-and-reschedule step to the job's restart strategy.
 *
 * <p>The restart strategy is looked up from the execution graph when this method is called.
 * The restart callback and the main thread executor are only obtained once the returned
 * {@link Runnable} is actually run.
 *
 * @param globalModVersion global modification version forwarded to the restart callback
 * @param vertexVersions versions of the execution vertices forwarded to the restart callback
 * @return a runnable that invokes {@code restart(...)} on the restart strategy
 */
private Runnable resetAndRescheduleTasks(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
	final RestartStrategy strategy = executionGraph.getRestartStrategy();

	return () -> {
		// the strategy receives the callback and the executor; what it does with them is up to the strategy
		strategy.restart(
			createResetAndRescheduleTasksCallback(globalModVersion, vertexVersions),
			executionGraph.getJobMasterMainThreadExecutor());
	};
}

private RestartCallback createResetAndRescheduleTasksCallback(final long globalModVersion, final Set<ExecutionVertexVersion> vertexVersions) {
return () -> {
if (!isLocalFailoverValid(globalModVersion)) {
LOG.info("Skip current region failover as a global failover is ongoing.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ public void testConcurrentRegionFailovers() throws Exception {
// complete region failover blocker to trigger region failover recovery
failoverStrategy.getBlockerFuture().complete(null);
manualMainThreadExecutor.triggerAll();
manuallyTriggeredRestartStrategy.triggerAll();
manualMainThreadExecutor.triggerAll();

// verify that all tasks are recovered and no task is restarted more than once
assertEquals(ExecutionState.DEPLOYING, ev11.getExecutionState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
Expand Down Expand Up @@ -117,6 +118,7 @@ public void testRegionFailoverInEagerMode() throws Exception {
// vertices { ev11, ev21 } should be affected
ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
manualMainThreadExecutor.triggerAll();
manualMainThreadExecutor.triggerScheduledTasks();

// verify vertex states and complete cancellation
assertVertexInState(ExecutionState.FAILED, ev11);
Expand All @@ -125,6 +127,7 @@ public void testRegionFailoverInEagerMode() throws Exception {
assertVertexInState(ExecutionState.DEPLOYING, ev22);
ev21.getCurrentExecutionAttempt().completeCancelling();
manualMainThreadExecutor.triggerAll();
manualMainThreadExecutor.triggerScheduledTasks();

// verify vertex states
// in eager mode, all affected vertices should be scheduled in failover
Expand Down Expand Up @@ -168,6 +171,7 @@ public void testRegionFailoverForRegionInternalErrorsInLazyMode() throws Excepti
// regions {ev11}, {ev21}, {ev22} should be affected
ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
manualMainThreadExecutor.triggerAll();
manualMainThreadExecutor.triggerScheduledTasks();

// verify vertex states
// only vertices with consumable inputs can be scheduled
Expand Down Expand Up @@ -249,6 +253,35 @@ public void testNoRestart() throws Exception {
assertEquals(JobStatus.FAILED, eg.getState());
}

/**
 * Tests that the execution of the restart logic of the failover strategy is dependent on the restart strategy
 * calling {@link RestartCallback#triggerFullRecovery()}.
 *
 * <p>The test fails a single execution vertex and runs all pending main-thread actions, but never
 * triggers the actions held by the {@code TestRestartStrategy}. It then checks that the restart
 * strategy was called and that the job did not leave {@link JobStatus#RUNNING}.
 */
@Test
public void testFailoverExecutionDependentOnRestartStrategyRecoveryTrigger() throws Exception {
	final JobGraph jobGraph = createBatchJobGraph();
	final TestRestartStrategy restartStrategy = new TestRestartStrategy();

	final ExecutionGraph eg = createExecutionGraph(jobGraph, restartStrategy);

	final ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next();

	ev.fail(new Exception("Test Exception"));

	manualMainThreadExecutor.triggerAll();

	// the entire failover-procedure is being halted by the restart strategy not doing anything
	// the only thing the failover strategy should do is cancel tasks that require it

	// sanity check to ensure we actually called into the restart strategy
	// (expected value goes first so that a failure message reports the values correctly)
	assertEquals(1, restartStrategy.getNumberOfQueuedActions());
	// 3 out of 4 tasks will be canceled, and removed from the set of registered executions
	assertEquals(1, eg.getRegisteredExecutions().size());
	// no job state change should occur; in case of a failover we never switch to RESTARTING/CANCELED
	// the important thing is that we don't switch to failed which would imply that we started a global failover
	assertEquals(JobStatus.RUNNING, eg.getState());
}

@Test
public void testFailGlobalIfErrorOnRestartingTasks() throws Exception {
final JobGraph jobGraph = createStreamingJobGraph();
Expand All @@ -267,6 +300,7 @@ public void testFailGlobalIfErrorOnRestartingTasks() throws Exception {
completeCancelling(ev11, ev12, ev21, ev22);

manualMainThreadExecutor.triggerAll();
manualMainThreadExecutor.triggerScheduledTasks();

final long globalModVersionAfterFailure = eg.getGlobalModVersion();

Expand Down Expand Up @@ -340,7 +374,7 @@ private JobGraph createBatchJobGraph() {
}

/**
 * Creates an execution graph for the given job graph with a {@link FixedDelayRestartStrategy}
 * as its restart strategy.
 *
 * @param jobGraph the job graph to build the execution graph from
 * @return the execution graph produced by the two-argument {@code createExecutionGraph} overload
 * @throws Exception if the delegated creation fails
 */
private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
	// NOTE(review): presumably (10, 0) means up to 10 restart attempts with no delay between them,
	// so restarts become runnable as soon as scheduled tasks are triggered - confirm against the constructor
	return createExecutionGraph(jobGraph, new FixedDelayRestartStrategy(10, 0));
}

private ExecutionGraph createExecutionGraph(
Expand Down