From 0074befd2307b055bdab409d6deb9025d2385e19 Mon Sep 17 00:00:00 2001 From: Gary Yao Date: Mon, 1 Jul 2019 10:21:39 +0200 Subject: [PATCH 1/4] [hotfix][runtime] Fix checkstyle violations in FailoverStrategyLoader --- .../executiongraph/failover/FailoverStrategyLoader.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java index 8b6fa6e9c80f8..eb003b9ca7b7b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java @@ -28,14 +28,14 @@ import javax.annotation.Nullable; /** - * A utility class to load failover strategies from the configuration. + * A utility class to load failover strategies from the configuration. */ public class FailoverStrategyLoader { - /** Config name for the {@link RestartAllStrategy} */ + /** Config name for the {@link RestartAllStrategy}. */ public static final String FULL_RESTART_STRATEGY_NAME = "full"; - /** Config name for the {@link RestartIndividualStrategy} */ + /** Config name for the {@link RestartIndividualStrategy}. 
*/ public static final String INDIVIDUAL_RESTART_STRATEGY_NAME = "individual"; /** Config name for the {@link RestartPipelinedRegionStrategy} */ From 5aba0b61801ce1dec25607bbac186cc64fee82e1 Mon Sep 17 00:00:00 2001 From: Gary Yao Date: Thu, 27 Jun 2019 17:05:28 +0200 Subject: [PATCH 2/4] [hotfix][runtime, tests] Remove ComponentMainThreadExecutor interface from ManuallyTriggeredScheduledExecutor --- .../concurrent/ManuallyTriggeredScheduledExecutor.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java index dd307c496eb99..870cda8d7f7d2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java @@ -39,7 +39,7 @@ /** * Simple {@link ScheduledExecutor} implementation for testing purposes. 
*/ -public class ManuallyTriggeredScheduledExecutor implements ScheduledExecutor, ComponentMainThreadExecutor { +public class ManuallyTriggeredScheduledExecutor implements ScheduledExecutor { private final Executor executorDelegate; private final ArrayDeque queuedRunnables = new ArrayDeque<>(); @@ -148,10 +148,6 @@ private ScheduledFuture insertRunnable(Runnable command, boolean isPeriodic) return scheduledTask; } - @Override - public void assertRunningInMainThread() { - } - private static final class ScheduledTask implements ScheduledFuture { private final Callable callable; From 498f4980a56a12e37393b7638f6e05519ba16df1 Mon Sep 17 00:00:00 2001 From: Gary Yao Date: Wed, 3 Jul 2019 20:19:16 +0200 Subject: [PATCH 3/4] [hotfix][runtime] Make ComponentMainThreadExecutorServiceAdapter accept ScheduledExecutor --- ...onentMainThreadExecutorServiceAdapter.java | 60 ++++++++++++++++--- 1 file changed, 51 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java index dbd94abd47ede..c71675ac5adb3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutorServiceAdapter.java @@ -18,31 +18,73 @@ package org.apache.flink.runtime.concurrent; -import javax.annotation.Nonnull; +import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; +import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Adapter class for a {@link ScheduledExecutorService} which shall be used as a + * Adapter class for a {@link 
ScheduledExecutorService} or {@link ScheduledExecutor} which shall be used as a * {@link ComponentMainThreadExecutor}. It enhances the given executor with an assert that the current thread is the * main thread of the executor. */ -public class ComponentMainThreadExecutorServiceAdapter - extends ScheduledExecutorServiceAdapter implements ComponentMainThreadExecutor { +public class ComponentMainThreadExecutorServiceAdapter implements ComponentMainThreadExecutor { + + private final ScheduledExecutor scheduledExecutor; /** A runnable that should assert that the current thread is the expected main thread. */ - @Nonnull private final Runnable mainThreadCheck; public ComponentMainThreadExecutorServiceAdapter( - @Nonnull ScheduledExecutorService scheduledExecutorService, - @Nonnull Runnable mainThreadCheck) { - super(scheduledExecutorService); - this.mainThreadCheck = mainThreadCheck; + final ScheduledExecutorService scheduledExecutorService, + final Runnable mainThreadCheck) { + this(new ScheduledExecutorServiceAdapter(scheduledExecutorService), mainThreadCheck); + } + + public ComponentMainThreadExecutorServiceAdapter( + final ScheduledExecutor scheduledExecutorService, + final Thread mainThread) { + this(scheduledExecutorService, () -> MainThreadValidatorUtil.isRunningInExpectedThread(mainThread)); + } + + private ComponentMainThreadExecutorServiceAdapter( + final ScheduledExecutor scheduledExecutor, + final Runnable mainThreadCheck) { + this.scheduledExecutor = checkNotNull(scheduledExecutor); + this.mainThreadCheck = checkNotNull(mainThreadCheck); } @Override public void assertRunningInMainThread() { mainThreadCheck.run(); } + + @Override + public ScheduledFuture schedule(final Runnable command, final long delay, final TimeUnit unit) { + return scheduledExecutor.schedule(command, delay, unit); + } + + @Override + public ScheduledFuture schedule(final Callable callable, final long delay, final TimeUnit unit) { + return scheduledExecutor.schedule(callable, delay, unit); 
+ } + + @Override + public ScheduledFuture scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) { + return scheduledExecutor.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) { + return scheduledExecutor.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + @Override + public void execute(final Runnable command) { + scheduledExecutor.execute(command); + } } From 557195cef91b131577ccece7f5e063b342b9f5a2 Mon Sep 17 00:00:00 2001 From: Gary Yao Date: Wed, 3 Jul 2019 20:20:34 +0200 Subject: [PATCH 4/4] [FLINK-12876][runtime] Adapt new RestartPipelinedRegionStrategy to legacy scheduling Implement adapter (AdaptedRestartPipelinedRegionStrategyNG) that adapts org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy to the legacy failover strategy interface (org.apache.flink.runtime.executiongraph.failover.FailoverStrategy). The new AdaptedRestartPipelinedRegionStrategyNG is chosen if config option jobmanager.execution.failover-strategy is set to "region". The legacy behavior can be enabled by setting the config option to "region-legacy". 
--- .../executiongraph/ExecutionGraph.java | 156 +----- .../executiongraph/SchedulingUtils.java | 218 +++++++++ ...aptedRestartPipelinedRegionStrategyNG.java | 316 ++++++++++++ .../failover/FailoverStrategyLoader.java | 8 +- .../scheduler/ExecutionVertexVersion.java | 45 ++ .../scheduler/ExecutionVertexVersioner.java | 76 +++ ...StrategyNGAbortPendingCheckpointsTest.java | 171 +++++++ ...egionStrategyNGConcurrentFailoverTest.java | 284 +++++++++++ ...PipelinedRegionStrategyNGFailoverTest.java | 459 ++++++++++++++++++ .../ExecutionGraphRestartTest.java | 108 ++++- .../PipelinedFailoverRegionBuildingTest.java | 2 +- ...rtPipelinedRegionStrategyBuildingTest.java | 4 +- .../ExecutionVertexVersionerTest.java | 121 +++++ 13 files changed, 1807 insertions(+), 161 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersion.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersionerTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index ce65b68b98946..0840c0070a0cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -40,7 +40,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; -import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture; @@ -65,9 +64,6 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; -import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; -import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; -import org.apache.flink.runtime.jobmaster.slotpool.Scheduler; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter; @@ -108,16 +104,13 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; -import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import 
java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -952,11 +945,11 @@ public void scheduleForExecution() throws JobException { switch (scheduleMode) { case LAZY_FROM_SOURCES: - newSchedulingFuture = scheduleLazy(slotProvider); + newSchedulingFuture = scheduleLazy(); break; case EAGER: - newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout); + newSchedulingFuture = scheduleEager(); break; default: @@ -985,123 +978,16 @@ public void scheduleForExecution() throws JobException { } } - private CompletableFuture scheduleLazy(SlotProvider slotProvider) { - - final ArrayList> schedulingFutures = new ArrayList<>(numVerticesTotal); - // simply take the vertices without inputs. - for (ExecutionJobVertex ejv : verticesInCreationOrder) { - if (ejv.getJobVertex().isInputVertex()) { - final CompletableFuture schedulingJobVertexFuture = ejv.scheduleAll( - slotProvider, - allowQueuedScheduling, - LocationPreferenceConstraint.ALL, // since it is an input vertex, the input based location preferences should be empty - Collections.emptySet()); - - schedulingFutures.add(schedulingJobVertexFuture); - } - } - - return FutureUtils.waitForAll(schedulingFutures); + private CompletableFuture scheduleLazy() { + return SchedulingUtils.scheduleLazy(getAllExecutionVertices(), this); } /** - * - * - * @param slotProvider The resource provider from which the slots are allocated - * @param timeout The maximum time that the deployment may take, before a - * TimeoutException is thrown. * @return Future which is completed once the {@link ExecutionGraph} has been scheduled. * The future can also be completed exceptionally if an error happened. 
*/ - private CompletableFuture scheduleEager(SlotProvider slotProvider, final Time timeout) { - assertRunningInJobMasterMainThread(); - checkState(state == JobStatus.RUNNING, "job is not running currently"); - - // Important: reserve all the space we need up front. - // that way we do not have any operation that can fail between allocating the slots - // and adding them to the list. If we had a failure in between there, that would - // cause the slots to get lost - final boolean queued = allowQueuedScheduling; - - // collecting all the slots may resize and fail in that operation without slots getting lost - final ArrayList> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); - - final Set allPreviousAllocationIds = - Collections.unmodifiableSet(computeAllPriorAllocationIdsIfRequiredByScheduling()); - - // allocate the slots (obtain all their futures - for (ExecutionJobVertex ejv : getVerticesTopologically()) { - // these calls are not blocking, they only return futures - Collection> allocationFutures = ejv.allocateResourcesForAll( - slotProvider, - queued, - LocationPreferenceConstraint.ALL, - allPreviousAllocationIds, - timeout); - - allAllocationFutures.addAll(allocationFutures); - } - - // this future is complete once all slot futures are complete. - // the future fails once one slot future fails. 
- final ConjunctFuture> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures); - - return allAllocationsFuture.thenAccept( - (Collection executionsToDeploy) -> { - for (Execution execution : executionsToDeploy) { - try { - execution.deploy(); - } catch (Throwable t) { - throw new CompletionException( - new FlinkException( - String.format("Could not deploy execution %s.", execution), - t)); - } - } - }) - // Generate a more specific failure message for the eager scheduling - .exceptionally( - (Throwable throwable) -> { - final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); - final Throwable resultThrowable; - if (strippedThrowable instanceof TimeoutException) { - int numTotal = allAllocationsFuture.getNumFuturesTotal(); - int numComplete = allAllocationsFuture.getNumFuturesCompleted(); - - String message = "Could not allocate all requires slots within timeout of " + - timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete + - ", previous allocation IDs: " + allPreviousAllocationIds; - - StringBuilder executionMessageBuilder = new StringBuilder(); - - for (int i = 0; i < allAllocationFutures.size(); i++) { - CompletableFuture executionFuture = allAllocationFutures.get(i); - - try { - Execution execution = executionFuture.getNow(null); - if (execution != null) { - executionMessageBuilder.append("completed: " + execution); - } else { - executionMessageBuilder.append("incomplete: " + executionFuture); - } - } catch (CompletionException completionException) { - executionMessageBuilder.append("completed exceptionally: " + completionException + "/" + executionFuture); - } - - if (i < allAllocationFutures.size() - 1) { - executionMessageBuilder.append(", "); - } - } - - message += ", execution status: " + executionMessageBuilder.toString(); - - resultThrowable = new NoResourceAvailableException(message); - } else { - resultThrowable = strippedThrowable; - } - - throw new 
CompletionException(resultThrowable); - }); + private CompletableFuture scheduleEager() { + return SchedulingUtils.scheduleEager(getAllExecutionVertices(), this); } public void cancel() { @@ -1414,7 +1300,7 @@ public FailoverStrategy getFailoverStrategy() { * and is used to disambiguate concurrent modifications between local and global * failover actions. */ - long getGlobalModVersion() { + public long getGlobalModVersion() { return globalModVersion; } @@ -1808,34 +1694,6 @@ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) { } } - /** - * Computes and returns a set with the prior allocation ids from all execution vertices in the graph. - */ - private Set computeAllPriorAllocationIds() { - HashSet allPreviousAllocationIds = new HashSet<>(getNumberOfExecutionJobVertices()); - for (ExecutionVertex executionVertex : getAllExecutionVertices()) { - AllocationID latestPriorAllocation = executionVertex.getLatestPriorAllocation(); - if (latestPriorAllocation != null) { - allPreviousAllocationIds.add(latestPriorAllocation); - } - } - return allPreviousAllocationIds; - } - - /** - * Returns the result of {@link #computeAllPriorAllocationIds()}, but only if the scheduling really requires it. - * Otherwise this method simply returns an empty set. - */ - private Set computeAllPriorAllocationIdsIfRequiredByScheduling() { - // This is a temporary optimization to avoid computing all previous allocations if not required - // This can go away when we progress with the implementation of the Scheduler. 
- if (slotProvider instanceof Scheduler && ((Scheduler) slotProvider).requiresPreviousExecutionGraphAllocations()) { - return computeAllPriorAllocationIds(); - } else { - return Collections.emptySet(); - } - } - // -------------------------------------------------------------------------------------------- // Listeners & Observers // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java new file mode 100644 index 0000000000000..950fcb18f121d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SchedulingUtils.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmaster.slotpool.Scheduler; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This class contains scheduling logic for EAGER and LAZY_FROM_SOURCES. + * It is used for normal scheduling and legacy failover strategy re-scheduling. + */ +public class SchedulingUtils { + + /** + * Schedule vertices lazy. That means only vertices satisfying its input constraint will be scheduled. + * + * @param vertices Topologically sorted vertices to schedule. + * @param executionGraph The graph the given vertices belong to. 
+ */ + public static CompletableFuture scheduleLazy( + final Iterable vertices, + final ExecutionGraph executionGraph) { + + executionGraph.assertRunningInJobMasterMainThread(); + + final Set previousAllocations = computePriorAllocationIdsIfRequiredByScheduling( + vertices, executionGraph.getSlotProvider()); + + final ArrayList> schedulingFutures = new ArrayList<>(); + for (ExecutionVertex executionVertex : vertices) { + // only schedule vertex when its input constraint is satisfied + if (executionVertex.getJobVertex().getJobVertex().isInputVertex() || + executionVertex.checkInputDependencyConstraints()) { + + final CompletableFuture schedulingVertexFuture = executionVertex.scheduleForExecution( + executionGraph.getSlotProvider(), + executionGraph.isQueuedSchedulingAllowed(), + LocationPreferenceConstraint.ANY, + previousAllocations); + + schedulingFutures.add(schedulingVertexFuture); + } + } + + return FutureUtils.waitForAll(schedulingFutures); + } + + /** + * Schedule vertices eagerly. That means all vertices will be scheduled at once. + * + * @param vertices Topologically sorted vertices to schedule. + * @param executionGraph The graph the given vertices belong to. + */ + public static CompletableFuture scheduleEager( + final Iterable vertices, + final ExecutionGraph executionGraph) { + + executionGraph.assertRunningInJobMasterMainThread(); + + checkState(executionGraph.getState() == JobStatus.RUNNING, "job is not running currently"); + + // Important: reserve all the space we need up front. + // that way we do not have any operation that can fail between allocating the slots + // and adding them to the list. 
If we had a failure in between there, that would + // cause the slots to get lost + final boolean queued = executionGraph.isQueuedSchedulingAllowed(); + + // collecting all the slots may resize and fail in that operation without slots getting lost + final ArrayList> allAllocationFutures = new ArrayList<>(); + + final Set allPreviousAllocationIds = Collections.unmodifiableSet( + computePriorAllocationIdsIfRequiredByScheduling(vertices, executionGraph.getSlotProvider())); + + // allocate the slots (obtain all their futures) + for (ExecutionVertex ev : vertices) { + // these calls are not blocking, they only return futures + CompletableFuture allocationFuture = ev.getCurrentExecutionAttempt().allocateResourcesForExecution( + executionGraph.getSlotProvider(), + queued, + LocationPreferenceConstraint.ALL, + allPreviousAllocationIds, + executionGraph.getAllocationTimeout()); + + allAllocationFutures.add(allocationFuture); + } + + // this future is complete once all slot futures are complete. + // the future fails once one slot future fails. 
+ final ConjunctFuture> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures); + + return allAllocationsFuture.thenAccept( + (Collection executionsToDeploy) -> { + for (Execution execution : executionsToDeploy) { + try { + execution.deploy(); + } catch (Throwable t) { + throw new CompletionException( + new FlinkException( + String.format("Could not deploy execution %s.", execution), + t)); + } + } + }) + // Generate a more specific failure message for the eager scheduling + .exceptionally( + (Throwable throwable) -> { + final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); + final Throwable resultThrowable; + if (strippedThrowable instanceof TimeoutException) { + int numTotal = allAllocationsFuture.getNumFuturesTotal(); + int numComplete = allAllocationsFuture.getNumFuturesCompleted(); + + String message = "Could not allocate all requires slots within timeout of " + + executionGraph.getAllocationTimeout() + ". Slots required: " + + numTotal + ", slots allocated: " + numComplete + + ", previous allocation IDs: " + allPreviousAllocationIds; + + StringBuilder executionMessageBuilder = new StringBuilder(); + + for (int i = 0; i < allAllocationFutures.size(); i++) { + CompletableFuture executionFuture = allAllocationFutures.get(i); + + try { + Execution execution = executionFuture.getNow(null); + if (execution != null) { + executionMessageBuilder.append("completed: " + execution); + } else { + executionMessageBuilder.append("incomplete: " + executionFuture); + } + } catch (CompletionException completionException) { + executionMessageBuilder.append("completed exceptionally: " + + completionException + "/" + executionFuture); + } + + if (i < allAllocationFutures.size() - 1) { + executionMessageBuilder.append(", "); + } + } + + message += ", execution status: " + executionMessageBuilder.toString(); + + resultThrowable = new NoResourceAvailableException(message); + } else { + resultThrowable = strippedThrowable; + } + + throw 
new CompletionException(resultThrowable); + }); + } + + /** + * Returns the result of {@link #computePriorAllocationIds(Iterable)}, + * but only if the scheduling really requires it. + * Otherwise this method simply returns an empty set. + */ + private static Set computePriorAllocationIdsIfRequiredByScheduling( + final Iterable vertices, + final SlotProvider slotProvider) { + // This is a temporary optimization to avoid computing all previous allocations if not required + // This can go away when we progress with the implementation of the Scheduler. + if (slotProvider instanceof Scheduler && + ((Scheduler) slotProvider).requiresPreviousExecutionGraphAllocations()) { + + return computePriorAllocationIds(vertices); + } else { + return Collections.emptySet(); + } + } + + /** + * Computes and returns a set with the prior allocation ids for given execution vertices. + */ + private static Set computePriorAllocationIds(final Iterable vertices) { + HashSet allPreviousAllocationIds = new HashSet<>(); + for (ExecutionVertex executionVertex : vertices) { + AllocationID latestPriorAllocation = executionVertex.getLatestPriorAllocation(); + if (latestPriorAllocation != null) { + allPreviousAllocationIds.add(latestPriorAllocation); + } + } + return allPreviousAllocationIds; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java new file mode 100644 index 0000000000000..5fddac3d63113 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; +import org.apache.flink.runtime.executiongraph.SchedulingUtils; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; +import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.ExceptionUtils; + +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * This failover strategy uses flip1.RestartPipelinedRegionStrategy to make task failover decisions. + */ +public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); + + /** The execution graph on which this FailoverStrategy works. */ + private final ExecutionGraph executionGraph; + + /** The versioner helps to maintain execution vertex versions. */ + private final ExecutionVertexVersioner executionVertexVersioner; + + /** The underlying new generation region failover strategy. */ + private RestartPipelinedRegionStrategy restartPipelinedRegionStrategy; + + public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { + this.executionGraph = checkNotNull(executionGraph); + this.executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Override + public void onTaskFailure(final Execution taskExecution, final Throwable cause) { + if (!executionGraph.getRestartStrategy().canRestart()) { + // delegate the failure to a global fail that will check the restart strategy and not restart + LOG.info("Fail to pass the restart strategy validation in region failover. 
Fallback to fail global."); + failGlobal(cause); + return; + } + + if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; + } + + final ExecutionVertexID vertexID = getExecutionVertexID(taskExecution.getVertex()); + + final Set tasksToRestart = restartPipelinedRegionStrategy.getTasksNeedingRestart(vertexID, cause); + restartTasks(tasksToRestart); + } + + @VisibleForTesting + protected void restartTasks(final Set verticesToRestart) { + final long globalModVersion = executionGraph.getGlobalModVersion(); + final Set vertexVersions = new HashSet<>( + executionVertexVersioner.recordVertexModifications(verticesToRestart).values()); + + FutureUtils.assertNoException( + cancelTasks(verticesToRestart) + .thenRunAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor()) + .handle(failGlobalOnError())); + } + + private Runnable resetAndRescheduleTasks(final long globalModVersion, final Set vertexVersions) { + return () -> { + if (!isLocalFailoverValid(globalModVersion)) { + LOG.info("Skip current region failover as a global failover is ongoing."); + return; + } + + // found out vertices which are still valid to restart. + // some vertices involved in this failover may be modified if another region + // failover happens during the cancellation stage of this failover. + // Will ignore the modified vertices as the other failover will deal with them. 
+ final Set unmodifiedVertices = executionVertexVersioner + .getUnmodifiedExecutionVertices(vertexVersions) + .stream() + .map(this::getExecutionVertex) + .collect(Collectors.toSet()); + + try { + LOG.info("Finally restart {} tasks to recover from task failure.", unmodifiedVertices.size()); + + // reset tasks to CREATED state and reload state + resetTasks(unmodifiedVertices, globalModVersion); + + // re-schedule tasks + rescheduleTasks(unmodifiedVertices, globalModVersion); + } catch (GlobalModVersionMismatch e) { + throw new IllegalStateException( + "Bug: ExecutionGraph was concurrently modified outside of main thread", e); + } catch (Exception e) { + throw new CompletionException(e); + } + }; + } + + private BiFunction failGlobalOnError() { + return (Object ignored, Throwable t) -> { + if (t != null) { + LOG.info("Unexpected error happens in region failover. Fail globally.", t); + failGlobal(t); + } + return null; + }; + } + + @VisibleForTesting + protected CompletableFuture cancelTasks(final Set vertices) { + final List> cancelFutures = vertices.stream() + .map(this::cancelExecutionVertex) + .collect(Collectors.toList()); + + return FutureUtils.combineAll(cancelFutures); + } + + private void resetTasks(final Set vertices, final long globalModVersion) throws Exception { + final Set colGroups = new HashSet<>(); + final long restartTimestamp = System.currentTimeMillis(); + + for (ExecutionVertex ev : vertices) { + CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup(); + if (cgroup != null && !colGroups.contains(cgroup)){ + cgroup.resetConstraints(); + colGroups.add(cgroup); + } + + ev.resetForNewExecution(restartTimestamp, globalModVersion); + } + + // if there is checkpointed state, reload it into the executions + if (executionGraph.getCheckpointCoordinator() != null) { + // abort pending checkpoints to + // i) enable new checkpoint triggering without waiting for last checkpoint expired. + // ii) ensure the EXACTLY_ONCE semantics if needed. 
+ executionGraph.getCheckpointCoordinator().abortPendingCheckpoints( + new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION)); + + final Map involvedExecutionJobVertices = + getInvolvedExecutionJobVertices(vertices); + executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState( + involvedExecutionJobVertices, false, true); + } + } + + private void rescheduleTasks(final Set vertices, final long globalModVersion) throws Exception { + + // sort vertices topologically + // this is to reduce the possibility that downstream tasks get launched earlier, + // which may cause lots of partition state checks in EAGER mode + final List sortedVertices = sortVerticesTopologically(vertices); + + final CompletableFuture newSchedulingFuture; + switch (executionGraph.getScheduleMode()) { + + case LAZY_FROM_SOURCES: + newSchedulingFuture = SchedulingUtils.scheduleLazy(sortedVertices, executionGraph); + break; + + case EAGER: + newSchedulingFuture = SchedulingUtils.scheduleEager(sortedVertices, executionGraph); + break; + + default: + throw new JobException("Schedule mode is invalid."); + } + + // if no global failover is triggered in the scheduling process, + // register a failure handling callback to the scheduling + if (isLocalFailoverValid(globalModVersion)) { + newSchedulingFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); + + if (!(strippedThrowable instanceof CancellationException)) { + // only fail if the scheduling future was not canceled + failGlobal(strippedThrowable); + } + } + }); + } + } + + private boolean isLocalFailoverValid(final long globalModVersion) { + // local failover is valid only if the job is still RUNNING and + // no global failover happens since the given globalModVersion is recorded + return executionGraph.getState() == JobStatus.RUNNING && + executionGraph.getGlobalModVersion() == globalModVersion; 
+ } + + private CompletableFuture cancelExecutionVertex(final ExecutionVertexID executionVertexId) { + return getExecutionVertex(executionVertexId).cancel(); + } + + private Map getInvolvedExecutionJobVertices( + final Set executionVertices) { + + Map tasks = new HashMap<>(); + for (ExecutionVertex executionVertex : executionVertices) { + JobVertexID jobvertexId = executionVertex.getJobvertexId(); + ExecutionJobVertex jobVertex = executionVertex.getJobVertex(); + tasks.putIfAbsent(jobvertexId, jobVertex); + } + return tasks; + } + + private void failGlobal(final Throwable cause) { + executionGraph.failGlobal(cause); + } + + private ExecutionVertex getExecutionVertex(final ExecutionVertexID vertexID) { + return executionGraph.getAllVertices() + .get(vertexID.getJobVertexId()) + .getTaskVertices()[vertexID.getSubtaskIndex()]; + } + + private ExecutionVertexID getExecutionVertexID(final ExecutionVertex vertex) { + return new ExecutionVertexID(vertex.getJobvertexId(), vertex.getParallelSubtaskIndex()); + } + + private List sortVerticesTopologically(final Set vertices) { + // org execution vertex by jobVertexId + final Map> verticesMap = new HashMap<>(); + for (ExecutionVertex vertex : vertices) { + verticesMap.computeIfAbsent(vertex.getJobvertexId(), id -> new ArrayList<>()).add(vertex); + } + + // sort in jobVertex topological order + final List sortedVertices = new ArrayList<>(vertices.size()); + for (ExecutionJobVertex jobVertex : executionGraph.getVerticesTopologically()) { + sortedVertices.addAll(verticesMap.getOrDefault(jobVertex.getJobVertexId(), Collections.emptyList())); + } + return sortedVertices; + } + + @Override + public void notifyNewVertices(final List newJobVerticesTopological) { + // build the underlying new generation failover strategy when the executionGraph vertices are all added, + // otherwise the failover topology will not be correctly built. + // currently it's safe to add it here, as this method is invoked only once in production code. 
+ checkState(restartPipelinedRegionStrategy == null, "notifyNewVertices() must be called only once"); + this.restartPipelinedRegionStrategy = new RestartPipelinedRegionStrategy( + new DefaultFailoverTopology(executionGraph)); + } + + @Override + public String getStrategyName() { + return "New Pipelined Region Failover"; + } + + // ------------------------------------------------------------------------ + // factory + // ------------------------------------------------------------------------ + + /** + * Factory that instantiates the AdaptedRestartPipelinedRegionStrategyNG. + */ + public static class Factory implements FailoverStrategy.Factory { + + @Override + public FailoverStrategy create(final ExecutionGraph executionGraph) { + return new AdaptedRestartPipelinedRegionStrategyNG(executionGraph); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java index eb003b9ca7b7b..e80c6edf2a207 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java @@ -38,9 +38,12 @@ public class FailoverStrategyLoader { /** Config name for the {@link RestartIndividualStrategy}. */ public static final String INDIVIDUAL_RESTART_STRATEGY_NAME = "individual"; - /** Config name for the {@link RestartPipelinedRegionStrategy} */ + /** Config name for the {@link AdaptedRestartPipelinedRegionStrategyNG}. */ public static final String PIPELINED_REGION_RESTART_STRATEGY_NAME = "region"; + /** Config name for the {@link RestartPipelinedRegionStrategy}. 
*/ + public static final String LEGACY_PIPELINED_REGION_RESTART_STRATEGY_NAME = "region-legacy"; + // ------------------------------------------------------------------------ /** @@ -63,6 +66,9 @@ public static FailoverStrategy.Factory loadFailoverStrategy(Configuration config return new RestartAllStrategy.Factory(); case PIPELINED_REGION_RESTART_STRATEGY_NAME: + return new AdaptedRestartPipelinedRegionStrategyNG.Factory(); + + case LEGACY_PIPELINED_REGION_RESTART_STRATEGY_NAME: return new RestartPipelinedRegionStrategy.Factory(); case INDIVIDUAL_RESTART_STRATEGY_NAME: diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersion.java new file mode 100644 index 0000000000000..28ae3cf7a14f8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersion.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +/** + * This class helps to record version of an execution vertex. 
+ */ +public class ExecutionVertexVersion { + + private final ExecutionVertexID executionVertexId; + + private final long version; + + ExecutionVertexVersion(final ExecutionVertexID executionVertexId, final long version) { + this.executionVertexId = executionVertexId; + this.version = version; + } + + public ExecutionVertexID getExecutionVertexId() { + return executionVertexId; + } + + public long getVersion() { + return version; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java new file mode 100644 index 0000000000000..081b6ff9129dd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Records modifications of + * {@link org.apache.flink.runtime.executiongraph.ExecutionVertex ExecutionVertices}, and allows + * for checking whether a vertex was modified. + * + *

<p>Examples for modifications include: + *
<ul> + *
<li>cancellation of the underlying execution + *
<li>deployment of the execution vertex + *
</ul>
+ * + * @see DefaultScheduler + */ +public class ExecutionVertexVersioner { + + private final Map executionVertexToVersion = new HashMap<>(); + + public ExecutionVertexVersion recordModification(final ExecutionVertexID executionVertexId) { + final Long newVersion = executionVertexToVersion.merge(executionVertexId, 1L, Long::sum); + return new ExecutionVertexVersion(executionVertexId, newVersion); + } + + public Map recordVertexModifications( + final Collection vertices) { + return vertices.stream() + .map(this::recordModification) + .collect(Collectors.toMap(ExecutionVertexVersion::getExecutionVertexId, Function.identity())); + } + + public boolean isModified(final ExecutionVertexVersion executionVertexVersion) { + final Long currentVersion = executionVertexToVersion.get(executionVertexVersion.getExecutionVertexId()); + Preconditions.checkState(currentVersion != null, + "Execution vertex %s does not have a recorded version", + executionVertexVersion.getExecutionVertexId()); + return currentVersion != executionVertexVersion.getVersion(); + } + + public Set getUnmodifiedExecutionVertices(final Set executionVertexVersions) { + return executionVertexVersions.stream() + .filter(executionVertexVersion -> !isModified(executionVertexVersion)) + .map(ExecutionVertexVersion::getExecutionVertexId) + .collect(Collectors.toSet()); + } + +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java new file mode 100644 index 0000000000000..17271b72a6662 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkState; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG}. + */ +public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest extends TestLogger { + + private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor; + + private ComponentMainThreadExecutor componentMainThreadExecutor; + + @Before + public void setUp() { + manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor(); + componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(manualMainThreadExecutor, Thread.currentThread()); + } + + @Test + public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { + final JobGraph jobGraph = createStreamingJobGraph(); + final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); + + final Iterator vertexIterator = executionGraph.getAllExecutionVertices().iterator(); + final ExecutionVertex onlyExecutionVertex = vertexIterator.next(); + + setTaskRunning(executionGraph, onlyExecutionVertex); + + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + checkState(checkpointCoordinator != null); + + checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false); + 
final int pendingCheckpointsBeforeFailure = checkpointCoordinator.getNumberOfPendingCheckpoints(); + + failVertex(onlyExecutionVertex); + + assertThat(pendingCheckpointsBeforeFailure, is(equalTo(1))); + assertNoPendingCheckpoints(checkpointCoordinator); + } + + private void setTaskRunning(final ExecutionGraph executionGraph, final ExecutionVertex executionVertex) { + executionGraph.updateState( + new TaskExecutionState(executionGraph.getJobID(), + executionVertex.getCurrentExecutionAttempt().getAttemptId(), + ExecutionState.RUNNING)); + } + + private void failVertex(final ExecutionVertex onlyExecutionVertex) { + onlyExecutionVertex.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); + manualMainThreadExecutor.triggerAll(); + } + + private static JobGraph createStreamingJobGraph() { + final JobVertex v1 = new JobVertex("vertex1"); + v1.setInvokableClass(AbstractInvokable.class); + + final JobGraph jobGraph = new JobGraph(v1); + jobGraph.setScheduleMode(ScheduleMode.EAGER); + + return jobGraph; + } + + private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception { + final ExecutionGraph executionGraph = new ExecutionGraph( + new DummyJobInformation( + jobGraph.getJobID(), + jobGraph.getName()), + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + AkkaUtils.getDefaultTimeout(), + new InfiniteDelayRestartStrategy(10), + AdaptedRestartPipelinedRegionStrategyNG::new, + new SimpleSlotProvider(jobGraph.getJobID(), 1)); + + executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + enableCheckpointing(executionGraph); + executionGraph.setScheduleMode(jobGraph.getScheduleMode()); + executionGraph.start(componentMainThreadExecutor); + executionGraph.scheduleForExecution(); + manualMainThreadExecutor.triggerAll(); + return executionGraph; + } + + private static void enableCheckpointing(final ExecutionGraph executionGraph) { + final List jobVertices = new 
ArrayList<>(executionGraph.getAllVertices().values()); + final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration( + Long.MAX_VALUE, + Long.MAX_VALUE, + 0, + 1, + CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION, + true, + false, + 0); + + executionGraph.enableCheckpointing( + checkpointCoordinatorConfiguration, + jobVertices, + jobVertices, + jobVertices, + Collections.emptyList(), + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + new MemoryStateBackend(), + new CheckpointStatsTracker( + 0, + jobVertices, + checkpointCoordinatorConfiguration, + new UnregisteredMetricsGroup())); + } + + private static void assertNoPendingCheckpoints(final CheckpointCoordinator checkpointCoordinator) { + assertThat(checkpointCoordinator.getPendingCheckpoints().entrySet(), is(empty())); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java new file mode 100644 index 0000000000000..20fb083792e87 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionStrategyNGFailoverTest.TestAdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link 
AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. + */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + private static final JobID TEST_JOB_ID = new JobID(); + + private static final int DEFAULT_PARALLELISM = 2; + + private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor; + + private ComponentMainThreadExecutor componentMainThreadExecutor; + + private TestRestartStrategy manuallyTriggeredRestartStrategy; + + @Before + public void setUp() { + manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor(); + componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(manualMainThreadExecutor, Thread.currentThread()); + manuallyTriggeredRestartStrategy = TestRestartStrategy.manuallyTriggered(); + } + + /** + * Tests that 2 concurrent region failovers can lead to a proper vertex state. + *
+	 *     (v11) -+-> (v21)
+	 *            x
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *       (blocking)
+	 * 
+ */ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final ExecutionGraph eg = createExecutionGraph(); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + failoverStrategy.setBlockerFuture(new CompletableFuture<>()); + + final Iterator vertexIterator = eg.getAllExecutionVertices().iterator(); + final ExecutionVertex ev11 = vertexIterator.next(); + final ExecutionVertex ev12 = vertexIterator.next(); + final ExecutionVertex ev21 = vertexIterator.next(); + final ExecutionVertex ev22 = vertexIterator.next(); + + // start job scheduling + eg.scheduleForExecution(); + manualMainThreadExecutor.triggerAll(); + + // fail ev11 to trigger region failover of {ev11}, {ev21}, {ev22} + ev11.getCurrentExecutionAttempt().fail(new Exception("task failure 1")); + manualMainThreadExecutor.triggerAll(); + assertEquals(ExecutionState.FAILED, ev11.getExecutionState()); + assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState()); + assertEquals(ExecutionState.CANCELED, ev21.getExecutionState()); + assertEquals(ExecutionState.CANCELED, ev22.getExecutionState()); + + // fail ev12 to trigger region failover of {ev12}, {ev21}, {ev22} + ev12.getCurrentExecutionAttempt().fail(new Exception("task failure 2")); + manualMainThreadExecutor.triggerAll(); + assertEquals(ExecutionState.FAILED, ev11.getExecutionState()); + assertEquals(ExecutionState.FAILED, ev12.getExecutionState()); + assertEquals(ExecutionState.CANCELED, ev21.getExecutionState()); + assertEquals(ExecutionState.CANCELED, ev22.getExecutionState()); + + // complete region failover 
blocker to trigger region failover recovery + failoverStrategy.getBlockerFuture().complete(null); + manualMainThreadExecutor.triggerAll(); + + // verify that all tasks are recovered and no task is restarted more than once + assertEquals(ExecutionState.DEPLOYING, ev11.getExecutionState()); + assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState()); + assertEquals(ExecutionState.CREATED, ev21.getExecutionState()); + assertEquals(ExecutionState.CREATED, ev22.getExecutionState()); + assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(1, ev12.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber()); + } + + /** + * Tests that a global failover will take precedence over local failovers. + *
+	 *     (v11) -+-> (v21)
+	 *            x
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *       (blocking)
+	 * 
+ */ + @Test + public void testRegionFailoverInterruptedByGlobalFailover() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause a task failure and delay the local recovery action via the manual executor + // - cause a global failure + // - resume in local recovery action + // - validate that the local recovery does not restart tasks + + final ExecutionGraph eg = createExecutionGraph(); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + failoverStrategy.setBlockerFuture(new CompletableFuture<>()); + + final Iterator vertexIterator = eg.getAllExecutionVertices().iterator(); + final ExecutionVertex ev11 = vertexIterator.next(); + final ExecutionVertex ev12 = vertexIterator.next(); + final ExecutionVertex ev21 = vertexIterator.next(); + final ExecutionVertex ev22 = vertexIterator.next(); + + // start job scheduling + eg.scheduleForExecution(); + manualMainThreadExecutor.triggerAll(); + + // fail ev11 to trigger region failover of {ev11}, {ev21}, {ev22} + ev11.getCurrentExecutionAttempt().fail(new Exception("task failure")); + manualMainThreadExecutor.triggerAll(); + assertEquals(JobStatus.RUNNING, eg.getState()); + assertEquals(ExecutionState.FAILED, ev11.getExecutionState()); + assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState()); + assertEquals(ExecutionState.CANCELED, ev21.getExecutionState()); + assertEquals(ExecutionState.CANCELED, ev22.getExecutionState()); + + // trigger global failover cancelling and immediately recovery + eg.failGlobal(new Exception("Test global failure")); + ev12.getCurrentExecutionAttempt().completeCancelling(); + manuallyTriggeredRestartStrategy.triggerNextAction(); + manualMainThreadExecutor.triggerAll(); + + // verify the job state and vertex attempt number + assertEquals(2, eg.getGlobalModVersion()); + assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber()); + 
assertEquals(1, ev12.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber()); + + // complete region failover blocker to trigger region failover + failoverStrategy.getBlockerFuture().complete(null); + manualMainThreadExecutor.triggerAll(); + + // verify that no task is restarted by region failover + assertEquals(ExecutionState.DEPLOYING, ev11.getExecutionState()); + assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState()); + assertEquals(ExecutionState.CREATED, ev21.getExecutionState()); + assertEquals(ExecutionState.CREATED, ev22.getExecutionState()); + assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(1, ev12.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber()); + } + + @Test + public void testSkipFailoverIfExecutionStateIsNotRunning() throws Exception { + final ExecutionGraph executionGraph = createExecutionGraph(); + + final Iterator vertexIterator = executionGraph.getAllExecutionVertices().iterator(); + final ExecutionVertex firstVertex = vertexIterator.next(); + + executionGraph.cancel(); + + final FailoverStrategy failoverStrategy = executionGraph.getFailoverStrategy(); + failoverStrategy.onTaskFailure(firstVertex.getCurrentExecutionAttempt(), new Exception("Test Exception")); + manualMainThreadExecutor.triggerAll(); + + assertEquals(ExecutionState.CANCELED, firstVertex.getExecutionState()); + } + + // ------------------------------------------------------------------------ + // utilities + // ------------------------------------------------------------------------ + + /** + * Creating a sample ExecutionGraph for testing with topology as below. + *
+	 *     (v11) -+-> (v21)
+	 *            x
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *       (blocking)
+	 * 
+ * 4 regions. Each consists of one individual execution vertex. + */ + private ExecutionGraph createExecutionGraph() throws Exception { + + final JobInformation jobInformation = new DummyJobInformation(TEST_JOB_ID, "test job"); + final SimpleSlotProvider slotProvider = new SimpleSlotProvider(TEST_JOB_ID, DEFAULT_PARALLELISM); + + final Time timeout = Time.seconds(10L); + final ExecutionGraph graph = new ExecutionGraph( + jobInformation, + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + timeout, + manuallyTriggeredRestartStrategy, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + slotProvider, + getClass().getClassLoader(), + VoidBlobWriter.getInstance(), + timeout); + + JobVertex v1 = new JobVertex("vertex1"); + v1.setInvokableClass(NoOpInvokable.class); + v1.setParallelism(DEFAULT_PARALLELISM); + + JobVertex v2 = new JobVertex("vertex2"); + v2.setInvokableClass(NoOpInvokable.class); + v2.setParallelism(DEFAULT_PARALLELISM); + + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + JobGraph jg = new JobGraph(TEST_JOB_ID, "testjob", v1, v2); + graph.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources()); + + graph.start(componentMainThreadExecutor); + + return graph; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java new file mode 100644 index 0000000000000..c4c1c32077412 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java @@ -0,0 +1,459 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import 
org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger { + + private static final JobID TEST_JOB_ID = new JobID(); + + private ComponentMainThreadExecutor componentMainThreadExecutor; + + private FailingSlotProviderDecorator slotProvider; + + private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor; + + @Before + public void setUp() { + manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor(); + componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(manualMainThreadExecutor, Thread.currentThread()); + slotProvider = new FailingSlotProviderDecorator(new SimpleSlotProvider(TEST_JOB_ID, 14)); + } + + /** + * Tests for region failover for job in EAGER mode. + * This applies to streaming job, with no BLOCKING edge. + *
+	 *     (v11) ---> (v21)
+	 *
+	 *     (v12) ---> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *       (pipelined)
+	 * 
+ */ + @Test + public void testRegionFailoverInEagerMode() throws Exception { + // create a streaming job graph with EAGER schedule mode + final JobGraph jobGraph = createStreamingJobGraph(); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + final Iterator vertexIterator = eg.getAllExecutionVertices().iterator(); + final ExecutionVertex ev11 = vertexIterator.next(); + final ExecutionVertex ev12 = vertexIterator.next(); + final ExecutionVertex ev21 = vertexIterator.next(); + final ExecutionVertex ev22 = vertexIterator.next(); + + // trigger task failure of ev11 + // vertices { ev11, ev21 } should be affected + ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); + manualMainThreadExecutor.triggerAll(); + + // verify vertex states and complete cancellation + assertVertexInState(ExecutionState.FAILED, ev11); + assertVertexInState(ExecutionState.DEPLOYING, ev12); + assertVertexInState(ExecutionState.CANCELING, ev21); + assertVertexInState(ExecutionState.DEPLOYING, ev22); + ev21.getCurrentExecutionAttempt().completeCancelling(); + manualMainThreadExecutor.triggerAll(); + + // verify vertex states + // in eager mode, all affected vertices should be scheduled in failover + assertVertexInState(ExecutionState.DEPLOYING, ev11); + assertVertexInState(ExecutionState.DEPLOYING, ev12); + assertVertexInState(ExecutionState.DEPLOYING, ev21); + assertVertexInState(ExecutionState.DEPLOYING, ev22); + + // verify attempt number + assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(0, ev12.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(0, ev22.getCurrentExecutionAttempt().getAttemptNumber()); + } + + /** + * Tests for scenario where a task fails for its own error, in which case the + * region containing the failed task and its consumer regions should be restarted. + *
+	 *     (v11) -+-> (v21)
+	 *            x
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *        (blocking)
+	 * 
+ */ + @Test + public void testRegionFailoverForRegionInternalErrorsInLazyMode() throws Exception { + final JobGraph jobGraph = createBatchJobGraph(); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + final Iterator vertexIterator = eg.getAllExecutionVertices().iterator(); + final ExecutionVertex ev11 = vertexIterator.next(); + final ExecutionVertex ev12 = vertexIterator.next(); + final ExecutionVertex ev21 = vertexIterator.next(); + final ExecutionVertex ev22 = vertexIterator.next(); + + // trigger task failure of ev11 + // regions {ev11}, {ev21}, {ev22} should be affected + ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); + manualMainThreadExecutor.triggerAll(); + + // verify vertex states + // only vertices with consumable inputs can be scheduled + assertVertexInState(ExecutionState.DEPLOYING, ev11); + assertVertexInState(ExecutionState.DEPLOYING, ev12); + assertVertexInState(ExecutionState.CREATED, ev21); + assertVertexInState(ExecutionState.CREATED, ev22); + + // verify attempt number + assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(0, ev12.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber()); + assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber()); + } + + /** + * Tests that the failure is properly propagated to underlying strategy + * to calculate tasks to restart. + *
+	 *     (v11) -+-> (v21)
+	 *            x
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *        (blocking)
+	 * 
+ */ + @Test + public void testFailurePropagationToUnderlyingStrategy() throws Exception { + final JobGraph jobGraph = createBatchJobGraph(); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + + final Iterator vertexIterator = eg.getAllExecutionVertices().iterator(); + final ExecutionVertex ev11 = vertexIterator.next(); + final ExecutionVertex ev12 = vertexIterator.next(); + final ExecutionVertex ev21 = vertexIterator.next(); + final ExecutionVertex ev22 = vertexIterator.next(); + + // finish upstream regions to trigger scheduling of downstream regions + ev11.getCurrentExecutionAttempt().markFinished(); + ev12.getCurrentExecutionAttempt().markFinished(); + + // trigger task failure of ev21 on consuming data from ev11 + Exception taskFailureCause = new PartitionConnectionException( + new ResultPartitionID( + ev11.getProducedPartitions().keySet().iterator().next(), + ev11.getCurrentExecutionAttempt().getAttemptId()), + new Exception("Test failure")); + ev21.getCurrentExecutionAttempt().fail(taskFailureCause); + manualMainThreadExecutor.triggerAll(); + + assertThat(failoverStrategy.getLastTasksToCancel(), + containsInAnyOrder(ev11.getID(), ev21.getID(), ev22.getID())); + } + + /** + * Tests that when a task fail, and restart strategy doesn't support restarting, the job will go to failed. 
+ */ + @Test + public void testNoRestart() throws Exception { + final JobGraph jobGraph = createBatchJobGraph(); + final NoRestartStrategy restartStrategy = new NoRestartStrategy(); + final ExecutionGraph eg = createExecutionGraph(jobGraph, restartStrategy); + + final ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); + + ev.fail(new Exception("Test Exception")); + + for (ExecutionVertex evs : eg.getAllExecutionVertices()) { + evs.getCurrentExecutionAttempt().completeCancelling(); + } + + manualMainThreadExecutor.triggerAll(); + + assertEquals(JobStatus.FAILED, eg.getState()); + } + + @Test + public void testFailGlobalIfErrorOnRestartingTasks() throws Exception { + final JobGraph jobGraph = createStreamingJobGraph(); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + final Iterator vertexIterator = eg.getAllExecutionVertices().iterator(); + final ExecutionVertex ev11 = vertexIterator.next(); + final ExecutionVertex ev12 = vertexIterator.next(); + final ExecutionVertex ev21 = vertexIterator.next(); + final ExecutionVertex ev22 = vertexIterator.next(); + + final long globalModVersionBeforeFailure = eg.getGlobalModVersion(); + + slotProvider.setFailSlotAllocation(true); + ev11.fail(new Exception("Test Exception")); + completeCancelling(ev11, ev12, ev21, ev22); + + manualMainThreadExecutor.triggerAll(); + + final long globalModVersionAfterFailure = eg.getGlobalModVersion(); + + assertNotEquals(globalModVersionBeforeFailure, globalModVersionAfterFailure); + } + + // ------------------------------- Test Utils ----------------------------------------- + + /** + * Creating job graph as below (execution view). + * It's a representative of streaming job. + *
+	 *     (v11) -+-> (v21)
+	 *
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *       (pipelined)
+	 * 
+ * 2 regions. Each has 2 pipelined connected vertices. + */ + private JobGraph createStreamingJobGraph() { + final JobVertex v1 = new JobVertex("vertex1"); + final JobVertex v2 = new JobVertex("vertex2"); + + v1.setParallelism(2); + v2.setParallelism(2); + + v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); + + v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + + final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Testjob", v1, v2); + jobGraph.setScheduleMode(ScheduleMode.EAGER); + + return jobGraph; + } + + /** + * Creating job graph as below (execution view). + * It's a representative of batch job. + *
+	 *     (v11) -+-> (v21)
+	 *            x
+	 *     (v12) -+-> (v22)
+	 *
+	 *            ^
+	 *            |
+	 *        (blocking)
+	 * 
+ * 4 regions. Each consists of one individual vertex. + */ + private JobGraph createBatchJobGraph() { + final JobVertex v1 = new JobVertex("vertex1"); + final JobVertex v2 = new JobVertex("vertex2"); + + v1.setParallelism(2); + v2.setParallelism(2); + + v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); + + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + final JobGraph jobGraph = new JobGraph(v1, v2); + jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES); + + return jobGraph; + } + + private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception { + return createExecutionGraph(jobGraph, new InfiniteDelayRestartStrategy(10)); + } + + private ExecutionGraph createExecutionGraph( + final JobGraph jobGraph, + final RestartStrategy restartStrategy) throws Exception { + + final ExecutionGraph eg = new ExecutionGraph( + new DummyJobInformation( + jobGraph.getJobID(), + jobGraph.getName()), + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + AkkaUtils.getDefaultTimeout(), + restartStrategy, + TestAdaptedRestartPipelinedRegionStrategyNG::new, + slotProvider); + eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); + + eg.setScheduleMode(jobGraph.getScheduleMode()); + + eg.start(componentMainThreadExecutor); + eg.scheduleForExecution(); + manualMainThreadExecutor.triggerAll(); + + return eg; + } + + private static void assertVertexInState(final ExecutionState state, final ExecutionVertex vertex) { + assertEquals(state, vertex.getExecutionState()); + } + + private static void completeCancelling(ExecutionVertex... 
executionVertices) { + for (final ExecutionVertex executionVertex : executionVertices) { + executionVertex.getCurrentExecutionAttempt().completeCancelling(); + } + } + + /** + * Test implementation of the {@link AdaptedRestartPipelinedRegionStrategyNG} that makes it possible + * to control when the failover action is performed via {@link CompletableFuture}. + * It also exposes some internal state of {@link AdaptedRestartPipelinedRegionStrategyNG}. + */ + static class TestAdaptedRestartPipelinedRegionStrategyNG extends AdaptedRestartPipelinedRegionStrategyNG { + + private CompletableFuture blockerFuture; + + private Set lastTasksToRestart; + + TestAdaptedRestartPipelinedRegionStrategyNG(ExecutionGraph executionGraph) { + super(executionGraph); + this.blockerFuture = CompletableFuture.completedFuture(null); + } + + void setBlockerFuture(CompletableFuture blockerFuture) { + this.blockerFuture = blockerFuture; + } + + @Override + protected void restartTasks(final Set verticesToRestart) { + this.lastTasksToRestart = verticesToRestart; + super.restartTasks(verticesToRestart); + } + + @Override + protected CompletableFuture cancelTasks(final Set vertices) { + final List> terminationAndBlocker = Arrays.asList( + super.cancelTasks(vertices), + blockerFuture); + return FutureUtils.waitForAll(terminationAndBlocker); + } + + CompletableFuture getBlockerFuture() { + return blockerFuture; + } + + Set getLastTasksToCancel() { + return lastTasksToRestart; + } + } + + private static class FailingSlotProviderDecorator implements SlotProvider { + + private final SlotProvider delegate; + + private boolean failSlotAllocation = false; + + FailingSlotProviderDecorator(final SlotProvider delegate) { + this.delegate = checkNotNull(delegate); + } + + @Override + public CompletableFuture allocateSlot( + final SlotRequestId slotRequestId, + final ScheduledUnit scheduledUnit, + final SlotProfile slotProfile, + final boolean allowQueuedScheduling, + final Time allocationTimeout) { + if 
(failSlotAllocation) { + return FutureUtils.completedExceptionally(new TimeoutException("Expected")); + } + return delegate.allocateSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout); + } + + @Override + public void cancelSlotRequest( + final SlotRequestId slotRequestId, + @Nullable final SlotSharingGroupId slotSharingGroupId, + final Throwable cause) { + delegate.cancelSlotRequest(slotRequestId, slotSharingGroupId, cause); + } + + public void setFailSlotAllocation(final boolean failSlotAllocation) { + this.failSlotAllocation = failSlotAllocation; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 1e6be72f2bb9f..e78fa6f5e30d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -28,6 +28,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; @@ -68,6 +70,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -264,6 +267,30 @@ public void testFailWhileCanceling() throws Exception { } + @Test + public void 
testTaskFailingWhileGlobalFailing() throws Exception { + try (SlotPool slotPool = new SlotPoolImpl(TEST_JOB_ID)) { + final ExecutionGraph graph = TestingExecutionGraphBuilder.newBuilder() + .setRestartStrategy(new InfiniteDelayRestartStrategy()) + .setFailoverStrategyFactory(new TestFailoverStrategy.Factory()) + .buildAndScheduleForExecution(slotPool); + final TestFailoverStrategy failoverStrategy = (TestFailoverStrategy) graph.getFailoverStrategy(); + + // switch all tasks to running + for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices()) { + vertex.getCurrentExecutionAttempt().switchToRunning(); + } + + graph.failGlobal(new Exception("test")); + + graph.getAllExecutionVertices().iterator().next().fail(new Exception("Test task failure")); + + // no local failover should happen when in global failover cancelling + assertEquals(0, failoverStrategy.getLocalFailoverCount()); + } + + } + private void switchAllTasksToRunning(ExecutionGraph graph) { executeOperationForAllExecutions(graph, Execution::switchToRunning); } @@ -659,6 +686,7 @@ public void testFailureWhileRestarting() throws Exception { private static class TestingExecutionGraphBuilder { private RestartStrategy restartStrategy = new NoRestartStrategy(); + private FailoverStrategy.Factory failoverStrategyFactory = new RestartAllStrategy.Factory(); private JobGraph jobGraph = createJobGraph(); private int tasksNum = NUM_TASKS; private TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); @@ -668,6 +696,11 @@ private TestingExecutionGraphBuilder setRestartStrategy(RestartStrategy restartS return this; } + private TestingExecutionGraphBuilder setFailoverStrategyFactory(FailoverStrategy.Factory failoverStrategyFactory) { + this.failoverStrategyFactory = failoverStrategyFactory; + return this; + } + private TestingExecutionGraphBuilder setJobGraph(JobGraph jobGraph) { this.jobGraph = jobGraph; return this; @@ -689,7 +722,11 @@ private static 
TestingExecutionGraphBuilder newBuilder() { private ExecutionGraph buildAndScheduleForExecution(SlotPool slotPool) throws Exception { final Scheduler scheduler = createSchedulerWithSlots(tasksNum, slotPool, taskManagerLocation); - final ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, scheduler, jobGraph); + final ExecutionGraph eg = createSimpleExecutionGraph( + restartStrategy, + failoverStrategyFactory, + scheduler, + jobGraph); assertEquals(JobStatus.CREATED, eg.getState()); @@ -744,18 +781,32 @@ private static JobGraph createJobGraphToCancel() throws IOException { } private static ExecutionGraph createSimpleExecutionGraph( - RestartStrategy restartStrategy, SlotProvider slotProvider, JobGraph jobGraph) - throws IOException, JobException { + final RestartStrategy restartStrategy, + final SlotProvider slotProvider, + final JobGraph jobGraph) throws IOException, JobException { - ExecutionGraph executionGraph = new ExecutionGraph( + return createSimpleExecutionGraph(restartStrategy, new RestartAllStrategy.Factory(), slotProvider, jobGraph); + } + + private static ExecutionGraph createSimpleExecutionGraph( + final RestartStrategy restartStrategy, + final FailoverStrategy.Factory failoverStrategyFactory, + final SlotProvider slotProvider, + final JobGraph jobGraph) throws IOException, JobException { + + final ExecutionGraph executionGraph = new ExecutionGraph( + new JobInformation( + TEST_JOB_ID, + "Test job", + new SerializedValue<>(new ExecutionConfig()), + new Configuration(), + Collections.emptyList(), + Collections.emptyList()), TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), - TEST_JOB_ID, - "Test job", - new Configuration(), - new SerializedValue<>(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), restartStrategy, + failoverStrategyFactory, slotProvider); executionGraph.start(mainThreadExecutor); @@ -779,4 +830,45 @@ private void restartAfterFailure(ExecutionGraph eg) { finishAllVertices(eg); 
assertEquals(JobStatus.FINISHED, eg.getState()); } + + /** + * Test failover strategy which records local failover count. + */ + static class TestFailoverStrategy extends FailoverStrategy { + + private int localFailoverCount = 0; + + @Override + public void onTaskFailure(Execution taskExecution, Throwable cause) { + localFailoverCount++; + } + + @Override + public void notifyNewVertices(List newJobVerticesTopological) { + } + + @Override + public String getStrategyName() { + return "Test Failover Strategy"; + } + + int getLocalFailoverCount() { + return localFailoverCount; + } + + // ------------------------------------------------------------------------ + // factory + // ------------------------------------------------------------------------ + + /** + * Factory that instantiates the TestFailoverStrategy. + */ + public static class Factory implements FailoverStrategy.Factory { + + @Override + public FailoverStrategy create(ExecutionGraph executionGraph) { + return new TestFailoverStrategy(); + } + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java index b9c132227fd66..f17dddcfb0614 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java @@ -627,7 +627,7 @@ private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws JobExcepti final Configuration jobManagerConfig = new Configuration(); jobManagerConfig.setString( JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, - FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME); + FailoverStrategyLoader.LEGACY_PIPELINED_REGION_RESTART_STRATEGY_NAME); final Time timeout = Time.seconds(10L); return 
ExecutionGraphBuilder.buildGraph( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java index 81f5e38197d5d..106322222ad96 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategyBuildingTest.java @@ -524,7 +524,7 @@ public void testPipelinedOneToOneTopologyWithCoLocation() throws Exception { // utilities // ------------------------------------------------------------------------ - private static void assertSameRegion(FailoverRegion ...regions) { + public static void assertSameRegion(FailoverRegion ...regions) { checkNotNull(regions); for (int i = 0; i < regions.length; i++) { for (int j = i + 1; i < regions.length; i++) { @@ -533,7 +533,7 @@ private static void assertSameRegion(FailoverRegion ...regions) { } } - private static void assertDistinctRegions(FailoverRegion ...regions) { + public static void assertDistinctRegions(FailoverRegion ...regions) { checkNotNull(regions); for (int i = 0; i < regions.length; i++) { for (int j = i + 1; j < regions.length; j++) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersionerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersionerTest.java new file mode 100644 index 0000000000000..9ba26eaf8e4da --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersionerTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for {@link ExecutionVertexVersioner}. 
+ */ +public class ExecutionVertexVersionerTest extends TestLogger { + + private static final ExecutionVertexID TEST_EXECUTION_VERTEX_ID1 = new ExecutionVertexID(new JobVertexID(), 0); + private static final ExecutionVertexID TEST_EXECUTION_VERTEX_ID2 = new ExecutionVertexID(new JobVertexID(), 0); + private static final Collection TEST_ALL_EXECUTION_VERTEX_IDS = Arrays.asList( + TEST_EXECUTION_VERTEX_ID1, + TEST_EXECUTION_VERTEX_ID2); + + private ExecutionVertexVersioner executionVertexVersioner; + + @Before + public void setUp() { + executionVertexVersioner = new ExecutionVertexVersioner(); + } + + @Test + public void isModifiedReturnsFalseIfVertexUnmodified() { + final ExecutionVertexVersion executionVertexVersion = + executionVertexVersioner.recordModification(TEST_EXECUTION_VERTEX_ID1); + assertFalse(executionVertexVersioner.isModified(executionVertexVersion)); + } + + @Test + public void isModifiedReturnsTrueIfVertexIsModified() { + final ExecutionVertexVersion executionVertexVersion = + executionVertexVersioner.recordModification(TEST_EXECUTION_VERTEX_ID1); + executionVertexVersioner.recordModification(TEST_EXECUTION_VERTEX_ID1); + assertTrue(executionVertexVersioner.isModified(executionVertexVersion)); + } + + @Test + public void throwsExceptionIfVertexWasNeverModified() { + try { + executionVertexVersioner.isModified(new ExecutionVertexVersion(TEST_EXECUTION_VERTEX_ID1, 0)); + fail("Expected exception not thrown"); + } catch (final IllegalStateException e) { + assertThat(e.getMessage(), containsString("Execution vertex " + + TEST_EXECUTION_VERTEX_ID1 + " does not have a recorded version")); + } + } + + @Test + public void getUnmodifiedVerticesAllVerticesModified() { + final Set executionVertexVersions = new HashSet<>( + executionVertexVersioner.recordVertexModifications(TEST_ALL_EXECUTION_VERTEX_IDS).values()); + executionVertexVersioner.recordVertexModifications(TEST_ALL_EXECUTION_VERTEX_IDS); + + final Set unmodifiedExecutionVertices = + 
executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions); + + assertThat(unmodifiedExecutionVertices, is(empty())); + } + + @Test + public void getUnmodifiedVerticesNoVertexModified() { + final Set executionVertexVersions = new HashSet<>( + executionVertexVersioner.recordVertexModifications(TEST_ALL_EXECUTION_VERTEX_IDS).values()); + + final Set unmodifiedExecutionVertices = + executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions); + + assertThat(unmodifiedExecutionVertices, containsInAnyOrder(TEST_EXECUTION_VERTEX_ID1, TEST_EXECUTION_VERTEX_ID2)); + } + + @Test + public void getUnmodifiedVerticesPartOfVerticesModified() { + final Set executionVertexVersions = new HashSet<>( + executionVertexVersioner.recordVertexModifications(TEST_ALL_EXECUTION_VERTEX_IDS).values()); + executionVertexVersioner.recordModification(TEST_EXECUTION_VERTEX_ID1); + + final Set unmodifiedExecutionVertices = + executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions); + + assertThat(unmodifiedExecutionVertices, containsInAnyOrder(TEST_EXECUTION_VERTEX_ID2)); + } +}