From 304d018d7738e10d2a61c1447b69c1dbd27d55c8 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Fri, 23 Apr 2021 17:17:01 +0200 Subject: [PATCH] [FLINK-22433][tests] Make CoordinatorEventsExactlyOnceITCase work with Adaptive Scheduler. The test previously relied on an implicit contract that instances of OperatorCoordinators are never recreated on the same JobManager. That implicit contract is no longer true with the Adaptive Scheduler. This change adjusts the test to no longer make that assumption. --- .../CoordinatorEventsExactlyOnceITCase.java | 112 ++++++++++++++---- 1 file changed, 86 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java index edc9edeed4f36..2337115564c36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java @@ -64,11 +64,13 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -169,6 +171,9 @@ public static void shutdownMiniCluster() throws Exception { @Test public void test() throws Exception { + // this captures variables communicated across instances, recoveries, etc. + TestScript.reset(); + final int numEvents1 = 200; final int numEvents2 = 5; final int delay1 = 1; @@ -296,19 +301,23 @@ private static final class EventSendingCoordinator implements OperatorCoordinato private final int delay; private final int maxNumber; + private final int failAtMessage; private int nextNumber; private CompletableFuture requestedCheckpoint; private CompletableFuture nextToComplete; - private final int failAtMessage; - private boolean failedBefore; - - private final ArrayDeque recoveredTaskRunning = new ArrayDeque<>(); - private SubtaskGateway subtaskGateway; private boolean workLoopRunning; + /** + * This contains all variables that are necessary to track the progress of the test, and + * which need to be tracked across instances of this coordinator (some scheduler + * implementations may re-instantiate the ExecutionGraph and the coordinators around global + * failures). + */ + private final TestScript testScript; + private EventSendingCoordinator(Context context, String name, int numEvents, int delay) { checkArgument(delay > 0); checkArgument(numEvents >= 3); @@ -316,6 +325,9 @@ private EventSendingCoordinator(Context context, String name, int numEvents, int this.context = context; this.maxNumber = numEvents; this.delay = delay; + + this.testScript = TestScript.getForOperator(name); + this.mailboxExecutor = Executors.newSingleThreadExecutor( new DispatcherThreadFactory( @@ -349,17 +361,12 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exc String.format("Don't recognize event '%s' from task %d.", event, subtask)); } - // We complete all events that were enqueued. We may need to complete - // multiple ones here, because it can happen that after a failure no real recovery - // happens that results in an event being sent (and this method being called), but that - // immediately another failure comes, triggered by the other operator coordinator (or - // its task). - synchronized (recoveredTaskRunning) { - for (CountDownLatch latch : recoveredTaskRunning) { - latch.countDown(); - } - recoveredTaskRunning.clear(); - } + // this unblocks all the delayed actions that where kicked off while the previous + // task was still running (if there was a previous task). this is part of simulating + // the extreme race where the coordinator thread stalls for so long that a new + // task execution attempt gets deployed before the last events targeted at the old task + // where sent. + testScript.signalRecoveredTaskReady(); // first, we hand this over to the mailbox thread, so we preserve order on operations, // even if the action is only to do a thread safe scheduling into the scheduledExecutor @@ -375,13 +382,13 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exc @Override public void subtaskFailed(int subtask, @Nullable Throwable reason) { - // we need to create and enqueue this outside the mailbox, so that the - // enqueuing is strictly ordered with the completion (which also happens outside - // the mail box executor). + // we need to create and register this outside the mailbox so that the + // registration is not affected by the artificial stall on the mailbox, but happens + // strictly before the tasks are restored and the operator events are received (to + // trigger the latches) which also happens outside the mailbox. + final CountDownLatch successorIsRunning = new CountDownLatch(1); - synchronized (recoveredTaskRunning) { - recoveredTaskRunning.addLast(successorIsRunning); - } + testScript.registerHookToNotifyAfterTaskRecovered(successorIsRunning); // simulate a heavy thread race here: the mailbox has a last enqueued action before the // cancellation is processed. But through a race, the mailbox freezes for a while and in @@ -483,7 +490,12 @@ private void executeSingleAction() { System.exit(-1); } - // schedule the next step + // schedule the next step. we do this here, after the previous step concluded, rather + // than scheduling a periodic action. Otherwise, the periodic task would enqueue many + // actions while the mailbox stalls and process them all instantaneously after the + // un-stalling. That wouldn't break the test, but it voids the differences in event + // sending delays between the different coordinators, which are part of provoking the + // situation that requires checkpoint alignment between the coordinators' event streams. scheduleSingleAction(); } @@ -515,8 +527,8 @@ private void sendNextEvent() { } private void checkWhetherToTriggerFailure() { - if (nextNumber >= failAtMessage && !failedBefore) { - failedBefore = true; + if (nextNumber >= failAtMessage && !testScript.hasAlreadyFailed()) { + testScript.recordHasFailed(); context.failJob(new Exception("test failure")); } } @@ -622,6 +634,54 @@ private void restoreState(List target) throws Exception { } } + // ------------------------------------------------------------------------ + // dedicated class to hold the "test script" + // ------------------------------------------------------------------------ + + private static final class TestScript { + + private static final Map MAP_FOR_OPERATOR = new HashMap<>(); + + static TestScript getForOperator(String operatorName) { + return MAP_FOR_OPERATOR.computeIfAbsent(operatorName, (key) -> new TestScript()); + } + + static void reset() { + MAP_FOR_OPERATOR.clear(); + } + + private final Collection recoveredTaskRunning = new ArrayList<>(); + private boolean failedBefore; + + void recordHasFailed() { + this.failedBefore = true; + } + + boolean hasAlreadyFailed() { + return failedBefore; + } + + void registerHookToNotifyAfterTaskRecovered(CountDownLatch latch) { + synchronized (recoveredTaskRunning) { + recoveredTaskRunning.add(latch); + } + } + + void signalRecoveredTaskReady() { + // We complete all latches that were registered. We may need to complete + // multiple ones here, because it can happen that after a previous failure, the next + // executions fails immediately again, before even registering at the coordinator. + // in that case, we have multiple latches from multiple failure notifications waiting + // to be completed. + synchronized (recoveredTaskRunning) { + for (CountDownLatch latch : recoveredTaskRunning) { + latch.countDown(); + } + recoveredTaskRunning.clear(); + } + } + } + // ------------------------------------------------------------------------ // serialization shenannigans // ------------------------------------------------------------------------