From d3693f40fb7536822fd84d12a35b77e2562b93b3 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Fri, 11 Dec 2020 17:18:39 +0800 Subject: [PATCH 1/5] KAFKA-10017: fix 2 issues in EosBetaUpgradeIntegrationTest --- .../EosBetaUpgradeIntegrationTest.java | 53 +++++++++++++------ 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index 22c5cf00ba01d..9a559fb87c85e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -123,6 +123,13 @@ public static Collection data() { KeyValue.pair(KafkaStreams.State.PENDING_SHUTDOWN, KafkaStreams.State.NOT_RUNNING) ) ); + private static final List> REBALANCED_RUNNING = + Collections.unmodifiableList( + Arrays.asList( + KeyValue.pair(State.RUNNING, State.REBALANCING), + KeyValue.pair(State.REBALANCING, State.RUNNING) + ) + ); @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster( @@ -147,8 +154,6 @@ public static Collection data() { private final AtomicInteger commitCounterClient2 = new AtomicInteger(-1); private final AtomicInteger commitRequested = new AtomicInteger(0); - private Throwable uncaughtException; - private int testNumber = 0; @Before @@ -256,8 +261,8 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { streams2Alpha.cleanUp(); streams2Alpha.start(); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - waitForRunning(stateTransitions1); - waitForRunning(stateTransitions2); + waitForRebalancingRunning(stateTransitions1); + waitForRebalancingRunning(stateTransitions2); // in all phases, we write comments that assume that p-0/p-1 are assigned to the first client // and p-2/p-3 are assigned to the second client (in reality the assignment might be different though) @@ -416,8 +421,8 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { assignmentListener.prepareForRebalance(); streams1Beta.start(); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - waitForRunning(stateTransitions1); - waitForRunning(stateTransitions2); + waitForRebalancingRunning(stateTransitions1); + waitForRebalancingRunning(stateTransitions2); final Set newlyCommittedKeys; if (!injectError) { @@ -562,8 +567,8 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { assignmentListener.prepareForRebalance(); streams2AlphaTwo.start(); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - waitForRunning(stateTransitions1); - waitForRunning(stateTransitions2); + waitForRebalancingRunning(stateTransitions1); + waitForRebalancingRunning(stateTransitions2); // 7b. write third batch of input data final Set keysFirstClientBeta = keysFromInstance(streams1Beta); @@ -631,8 +636,8 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { assignmentListener.prepareForRebalance(); streams1BetaTwo.start(); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - waitForRunning(stateTransitions1); - waitForRunning(stateTransitions2); + waitForRebalancingRunning(stateTransitions1); + waitForRebalancingRunning(stateTransitions2); } // phase 8: (write partial last batch of data) @@ -766,8 +771,8 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { assignmentListener.prepareForRebalance(); streams2Beta.start(); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - waitForRunning(stateTransitions1); - waitForRunning(stateTransitions2); + waitForRebalancingRunning(stateTransitions1); + waitForRebalancingRunning(stateTransitions2); newlyCommittedKeys.clear(); if (!injectError) { @@ -938,11 +943,13 @@ public void close() {} final KafkaStreams streams = new KafkaStreams(builder.build(), config, new TestKafkaClientSupplier()); streams.setUncaughtExceptionHandler((t, e) -> { - if (uncaughtException != null) { + // should only have our injected exception or commit exception + if (!(e instanceof RuntimeException) && !(e.getMessage().contains("test exception"))) { + // The exception won't cause the test fail since we actually "expected" exception thrown and failed the stream. + // So, log to stderr for debugging when the exception is not what we expected e.printStackTrace(System.err); fail("Should only get one uncaught exception from Streams."); } - uncaughtException = e; }); return streams; @@ -956,6 +963,20 @@ private void waitForRunning(final List REBALANCING -> RUNNING because when new stream joined, we'll do 2 rebalancing: 1 for + // new member join, 1 for leader re-join group during Stable. So, if we only wait for Running, it might enter rebalancing soon + private void waitForRebalancingRunning(final List> observed) throws Exception { + waitForCondition( + () -> !observed.isEmpty() && observed.size() >= 2 && + Arrays.asList( + observed.get(observed.size() - 2), + observed.get(observed.size() - 1) + ).equals(REBALANCED_RUNNING), + MAX_WAIT_TIME_MS, + () -> "Client did not run from Rebalancing to Running on time. Observers transitions: " + observed + ); + } + private void waitForStateTransition(final List> observed, final List> expected) throws Exception { @@ -963,7 +984,7 @@ private void waitForStateTransition(final List observed.equals(expected), MAX_WAIT_TIME_MS, - () -> "Client did not startup on time. Observers transitions: " + observed + () -> "Client did not have the expected state transition on time. Observers transitions: " + observed ); } @@ -1156,7 +1177,7 @@ public void commitTransaction() { if (!crash.compareAndSet(true, false)) { super.commitTransaction(); } else { - throw new RuntimeException("Injected producer commit exception."); + throw new RuntimeException("Injected producer commit test exception."); } } } From ee037dd69633ce50ffdffd0fa77e47938458ff19 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Mon, 21 Dec 2020 15:45:59 +0800 Subject: [PATCH 2/5] KAFKA-10017: wait for specific number of rebalancing for new started stream --- .../EosBetaUpgradeIntegrationTest.java | 116 +++++++++++------- .../utils/IntegrationTestUtils.java | 14 ++- 2 files changed, 83 insertions(+), 47 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index 9a559fb87c85e..946f407861b12 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -82,7 +82,7 @@ import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.fail; +import static org.junit.Assert.assertFalse; @RunWith(Parameterized.class) @Category({IntegrationTest.class}) @@ -123,13 +123,8 @@ public static Collection data() { KeyValue.pair(KafkaStreams.State.PENDING_SHUTDOWN, KafkaStreams.State.NOT_RUNNING) ) ); - private static final List> REBALANCED_RUNNING = - Collections.unmodifiableList( - Arrays.asList( - KeyValue.pair(State.RUNNING, State.REBALANCING), - KeyValue.pair(State.REBALANCING, State.RUNNING) - ) - ); + private static final KeyValue REBALANCED_RUNNING = + KeyValue.pair(State.REBALANCING, State.RUNNING); @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster( @@ -142,8 +137,13 @@ public static Collection data() { private final static String CONSUMER_GROUP_ID = "readCommitted"; private final static String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic"; private final static String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic"; + private final static String APP_DIR_1 = "appDir1"; + private final static String APP_DIR_2 = "appDir2"; + private final static String UNEXPECTED_EXCEPTION_MSG = "Fail the test since we got an unexpected exception or" + + "there are too many exceptions thrown, please check standard error log for more info."; private final String storeName = "store"; + private final StableAssignmentListener assignmentListener = new StableAssignmentListener(); private final AtomicBoolean errorInjectedClient1 = new AtomicBoolean(false); @@ -155,6 +155,15 @@ public static Collection data() { private final AtomicInteger commitRequested = new AtomicInteger(0); private int testNumber = 0; + private boolean hasUnexpectedError = false; + private Map exceptionCounts = new HashMap() { + { + put(APP_DIR_1, 0); + put(APP_DIR_2, 0); + } + }; + private int prevNumAssignments = 0; + private int expectedNumAssignments = 0; @Before public void createTopics() throws Exception { @@ -240,7 +249,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { try { // phase 1: start both clients - streams1Alpha = getKafkaStreams("appDir1", StreamsConfig.EXACTLY_ONCE); + streams1Alpha = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE); streams1Alpha.setStateListener( (newState, oldState) -> stateTransitions1.add(KeyValue.pair(oldState, newState)) ); @@ -251,18 +260,19 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); waitForRunning(stateTransitions1); - streams2Alpha = getKafkaStreams("appDir2", StreamsConfig.EXACTLY_ONCE); + streams2Alpha = getKafkaStreams(APP_DIR_2, StreamsConfig.EXACTLY_ONCE); streams2Alpha.setStateListener( (newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState)) ); stateTransitions1.clear(); - assignmentListener.prepareForRebalance(); + prevNumAssignments = assignmentListener.prepareForRebalance(); streams2Alpha.cleanUp(); streams2Alpha.start(); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - waitForRebalancingRunning(stateTransitions1); - waitForRebalancingRunning(stateTransitions2); + expectedNumAssignments = assignmentListener.numTotalAssignments() - prevNumAssignments; + waitForNumRebalancingToRunning(stateTransitions2, expectedNumAssignments); + waitForRunning(stateTransitions1); // in all phases, we write comments that assume that p-0/p-1 are assigned to the first client // and p-2/p-3 are assigned to the second client (in reality the assignment might be different though) @@ -394,6 +404,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { stateTransitions1.clear(); streams1Alpha.close(); waitForStateTransition(stateTransitions1, CLOSE_CRASHED); + assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); } // phase 5: (restart first client) @@ -416,13 +427,14 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { commitRequested.set(0); stateTransitions1.clear(); stateTransitions2.clear(); - streams1Beta = getKafkaStreams("appDir1", StreamsConfig.EXACTLY_ONCE_BETA); + streams1Beta = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE_BETA); streams1Beta.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair(oldState, newState))); - assignmentListener.prepareForRebalance(); + prevNumAssignments = assignmentListener.prepareForRebalance(); streams1Beta.start(); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - waitForRebalancingRunning(stateTransitions1); - waitForRebalancingRunning(stateTransitions2); + expectedNumAssignments = assignmentListener.numTotalAssignments() - prevNumAssignments; + waitForNumRebalancingToRunning(stateTransitions1, expectedNumAssignments); + waitForRunning(stateTransitions2); final Set newlyCommittedKeys; if (!injectError) { @@ -532,6 +544,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { stateTransitions2.clear(); streams2Alpha.close(); waitForStateTransition(stateTransitions2, CLOSE_CRASHED); + assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); final List> expectedCommittedResultAfterFailure = computeExpectedResult(uncommittedInputDataAfterFirstUpgrade, committedState); @@ -560,15 +573,16 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { commitCounterClient2.set(-1); stateTransitions1.clear(); stateTransitions2.clear(); - streams2AlphaTwo = getKafkaStreams("appDir2", StreamsConfig.EXACTLY_ONCE); + streams2AlphaTwo = getKafkaStreams(APP_DIR_2, StreamsConfig.EXACTLY_ONCE); streams2AlphaTwo.setStateListener( (newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState)) ); - assignmentListener.prepareForRebalance(); + prevNumAssignments = assignmentListener.prepareForRebalance(); streams2AlphaTwo.start(); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - waitForRebalancingRunning(stateTransitions1); - waitForRebalancingRunning(stateTransitions2); + expectedNumAssignments = assignmentListener.numTotalAssignments() - prevNumAssignments; + waitForNumRebalancingToRunning(stateTransitions2, expectedNumAssignments); + waitForRunning(stateTransitions1); // 7b. write third batch of input data final Set keysFirstClientBeta = keysFromInstance(streams1Beta); @@ -622,6 +636,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { stateTransitions1.clear(); streams1Beta.close(); waitForStateTransition(stateTransitions1, CLOSE_CRASHED); + assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); final List> expectedCommittedResultAfterFailure = computeExpectedResult(uncommittedInputDataBetweenUpgrade, committedState); @@ -631,13 +646,14 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { // 7c. restart the first client in eos-beta mode and wait until rebalance stabilizes stateTransitions1.clear(); stateTransitions2.clear(); - streams1BetaTwo = getKafkaStreams("appDir1", StreamsConfig.EXACTLY_ONCE_BETA); + streams1BetaTwo = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE_BETA); streams1BetaTwo.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair(oldState, newState))); - assignmentListener.prepareForRebalance(); + prevNumAssignments = assignmentListener.prepareForRebalance(); streams1BetaTwo.start(); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - waitForRebalancingRunning(stateTransitions1); - waitForRebalancingRunning(stateTransitions2); + expectedNumAssignments = assignmentListener.numTotalAssignments() - prevNumAssignments; + waitForNumRebalancingToRunning(stateTransitions1, expectedNumAssignments); + waitForRunning(stateTransitions2); } // phase 8: (write partial last batch of data) @@ -743,6 +759,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { stateTransitions2.clear(); streams2AlphaTwo.close(); waitForStateTransition(stateTransitions2, CLOSE_CRASHED); + assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); } // phase 10: (restart second client) @@ -764,15 +781,16 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { commitRequested.set(0); stateTransitions1.clear(); stateTransitions2.clear(); - streams2Beta = getKafkaStreams("appDir1", StreamsConfig.EXACTLY_ONCE_BETA); + streams2Beta = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE_BETA); streams2Beta.setStateListener( (newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState)) ); - assignmentListener.prepareForRebalance(); + prevNumAssignments = assignmentListener.prepareForRebalance(); streams2Beta.start(); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - waitForRebalancingRunning(stateTransitions1); - waitForRebalancingRunning(stateTransitions2); + expectedNumAssignments = assignmentListener.numTotalAssignments() - prevNumAssignments; + waitForNumRebalancingToRunning(stateTransitions2, expectedNumAssignments); + waitForRunning(stateTransitions1); newlyCommittedKeys.clear(); if (!injectError) { @@ -874,7 +892,7 @@ public void init(final ProcessorContext context) { this.context = context; state = (KeyValueStore) context.getStateStore(storeName); final String clientId = context.appConfigs().get(StreamsConfig.CLIENT_ID_CONFIG).toString(); - if ("appDir1".equals(clientId)) { + if (APP_DIR_1.equals(clientId)) { crash = errorInjectedClient1; sharedCommit = commitCounterClient1; } else { @@ -943,12 +961,21 @@ public void close() {} final KafkaStreams streams = new KafkaStreams(builder.build(), config, new TestKafkaClientSupplier()); streams.setUncaughtExceptionHandler((t, e) -> { - // should only have our injected exception or commit exception - if (!(e instanceof RuntimeException) && !(e.getMessage().contains("test exception"))) { - // The exception won't cause the test fail since we actually "expected" exception thrown and failed the stream. - // So, log to stderr for debugging when the exception is not what we expected + if (!injectError) { + // we don't expect any exception thrown in stop case e.printStackTrace(System.err); - fail("Should only get one uncaught exception from Streams."); + hasUnexpectedError = true; + } else { + int exceptionCount = (int) exceptionCounts.get(appDir); + // should only have our injected exception or commit exception, and 2 exceptions for each stream + if (++exceptionCount > 2 || !(e instanceof RuntimeException) || + !(e.getMessage().contains("test exception"))) { + // The exception won't cause the test fail since we actually "expected" exception thrown and failed the stream. + // So, log to stderr for debugging when the exception is not what we expected, and fail in the main thread + e.printStackTrace(System.err); + hasUnexpectedError = true; + } + exceptionCounts.put(appDir, exceptionCount); } }); @@ -963,17 +990,14 @@ private void waitForRunning(final List REBALANCING -> RUNNING because when new stream joined, we'll do 2 rebalancing: 1 for - // new member join, 1 for leader re-join group during Stable. So, if we only wait for Running, it might enter rebalancing soon - private void waitForRebalancingRunning(final List> observed) throws Exception { - waitForCondition( - () -> !observed.isEmpty() && observed.size() >= 2 && - Arrays.asList( - observed.get(observed.size() - 2), - observed.get(observed.size() - 1) - ).equals(REBALANCED_RUNNING), + // Wait for the numRebalancing of RUNNING> state transition because when new stream joined, + // we'll do multiple rebalancing. So, if we only wait for Running, it might enter rebalancing soon + private void waitForNumRebalancingToRunning(final List> observed, + final int numRebalancing) throws Exception { + waitForCondition(() -> !observed.isEmpty() && + observed.stream().filter(kv -> kv.equals(REBALANCED_RUNNING)).count() == numRebalancing, MAX_WAIT_TIME_MS, - () -> "Client did not run from Rebalancing to Running on time. Observers transitions: " + observed + () -> "Client did not run " + numRebalancing + " of Rebalancing to Running on time. Observers transitions: " + observed ); } @@ -1164,7 +1188,7 @@ private class ErrorInjector extends KafkaProducer { public ErrorInjector(final Map configs) { super(configs, new ByteArraySerializer(), new ByteArraySerializer()); final String clientId = configs.get(ProducerConfig.CLIENT_ID_CONFIG).toString(); - if (clientId.contains("appDir1")) { + if (clientId.contains(APP_DIR_1)) { crash = commitErrorInjectedClient1; } else { crash = commitErrorInjectedClient2; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index db659cf41dedb..04abf07a98c0f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -1270,15 +1270,24 @@ public static S getStore(final long waitTime, public static class StableAssignmentListener implements AssignmentListener { final AtomicInteger numStableAssignments = new AtomicInteger(0); + final AtomicInteger totalAssignments = new AtomicInteger(0); int nextExpectedNumStableAssignments; @Override public void onAssignmentComplete(final boolean stable) { + totalAssignments.incrementAndGet(); if (stable) { numStableAssignments.incrementAndGet(); } } + /** + * get the total number of assignments (unstable + stable) + */ + public int numTotalAssignments() { + return totalAssignments.get(); + } + public int numStableAssignments() { return numStableAssignments.get(); } @@ -1287,9 +1296,12 @@ public int numStableAssignments() { * Saves the current number of stable rebalances so that we can tell when the next stable assignment has been * reached. This should be called once for every invocation of {@link #waitForNextStableAssignment(long)}, * before the rebalance-triggering event. + * + * @return the total number of assignments so far */ - public void prepareForRebalance() { + public int prepareForRebalance() { nextExpectedNumStableAssignments = numStableAssignments.get() + 1; + return totalAssignments.get(); } /** From 89eb7efaf2cf5ca1908309dc543d8ab60346861e Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Wed, 23 Dec 2020 11:24:03 +0800 Subject: [PATCH 3/5] KAFKA-10017: address reviewer's comment to use volatile for different threads --- .../streams/integration/EosBetaUpgradeIntegrationTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index 946f407861b12..a497c001a5986 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -139,7 +139,7 @@ public static Collection data() { private final static String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic"; private final static String APP_DIR_1 = "appDir1"; private final static String APP_DIR_2 = "appDir2"; - private final static String UNEXPECTED_EXCEPTION_MSG = "Fail the test since we got an unexpected exception or" + + private final static String UNEXPECTED_EXCEPTION_MSG = "Fail the test since we got an unexpected exception or " + "there are too many exceptions thrown, please check standard error log for more info."; private final String storeName = "store"; @@ -155,7 +155,6 @@ public static Collection data() { private final AtomicInteger commitRequested = new AtomicInteger(0); private int testNumber = 0; - private boolean hasUnexpectedError = false; private Map exceptionCounts = new HashMap() { { put(APP_DIR_1, 0); @@ -165,6 +164,8 @@ public static Collection data() { private int prevNumAssignments = 0; private int expectedNumAssignments = 0; + private volatile boolean hasUnexpectedError = false; + @Before public void createTopics() throws Exception { applicationId = "appId-" + ++testNumber; From f2f096758b4b02a7426c88b44978787e54c98c95 Mon Sep 17 00:00:00 2001 From: Luke Chen Date: Fri, 15 Jan 2021 10:26:38 +0800 Subject: [PATCH 4/5] KAFKA-10017: fix the exception handler issue --- .../EosBetaUpgradeIntegrationTest.java | 40 +++++-------------- .../utils/IntegrationTestUtils.java | 14 +------ 2 files changed, 11 insertions(+), 43 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index a497c001a5986..f7f38dd45ebe5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -123,8 +123,6 @@ public static Collection data() { KeyValue.pair(KafkaStreams.State.PENDING_SHUTDOWN, KafkaStreams.State.NOT_RUNNING) ) ); - private static final KeyValue REBALANCED_RUNNING = - KeyValue.pair(State.REBALANCING, State.RUNNING); @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster( @@ -161,8 +159,6 @@ public static Collection data() { put(APP_DIR_2, 0); } }; - private int prevNumAssignments = 0; - private int expectedNumAssignments = 0; private volatile boolean hasUnexpectedError = false; @@ -267,13 +263,12 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { ); stateTransitions1.clear(); - prevNumAssignments = assignmentListener.prepareForRebalance(); + assignmentListener.prepareForRebalance(); streams2Alpha.cleanUp(); streams2Alpha.start(); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - expectedNumAssignments = assignmentListener.numTotalAssignments() - prevNumAssignments; - waitForNumRebalancingToRunning(stateTransitions2, expectedNumAssignments); waitForRunning(stateTransitions1); + waitForRunning(stateTransitions2); // in all phases, we write comments that assume that p-0/p-1 are assigned to the first client // and p-2/p-3 are assigned to the second client (in reality the assignment might be different though) @@ -430,11 +425,10 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { stateTransitions2.clear(); streams1Beta = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE_BETA); streams1Beta.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair(oldState, newState))); - prevNumAssignments = assignmentListener.prepareForRebalance(); + assignmentListener.prepareForRebalance(); streams1Beta.start(); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - expectedNumAssignments = assignmentListener.numTotalAssignments() - prevNumAssignments; - waitForNumRebalancingToRunning(stateTransitions1, expectedNumAssignments); + waitForRunning(stateTransitions1); waitForRunning(stateTransitions2); final Set newlyCommittedKeys; @@ -578,12 +572,11 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { streams2AlphaTwo.setStateListener( (newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState)) ); - prevNumAssignments = assignmentListener.prepareForRebalance(); + assignmentListener.prepareForRebalance(); streams2AlphaTwo.start(); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - expectedNumAssignments = assignmentListener.numTotalAssignments() - prevNumAssignments; - waitForNumRebalancingToRunning(stateTransitions2, expectedNumAssignments); waitForRunning(stateTransitions1); + waitForRunning(stateTransitions2); // 7b. write third batch of input data final Set keysFirstClientBeta = keysFromInstance(streams1Beta); @@ -649,11 +642,10 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { stateTransitions2.clear(); streams1BetaTwo = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE_BETA); streams1BetaTwo.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair(oldState, newState))); - prevNumAssignments = assignmentListener.prepareForRebalance(); + assignmentListener.prepareForRebalance(); streams1BetaTwo.start(); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - expectedNumAssignments = assignmentListener.numTotalAssignments() - prevNumAssignments; - waitForNumRebalancingToRunning(stateTransitions1, expectedNumAssignments); + waitForRunning(stateTransitions1); waitForRunning(stateTransitions2); } @@ -786,12 +778,11 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { streams2Beta.setStateListener( (newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState)) ); - prevNumAssignments = assignmentListener.prepareForRebalance(); + assignmentListener.prepareForRebalance(); streams2Beta.start(); assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); - expectedNumAssignments = assignmentListener.numTotalAssignments() - prevNumAssignments; - waitForNumRebalancingToRunning(stateTransitions2, expectedNumAssignments); waitForRunning(stateTransitions1); + waitForRunning(stateTransitions2); newlyCommittedKeys.clear(); if (!injectError) { @@ -991,17 +982,6 @@ private void waitForRunning(final List RUNNING> state transition because when new stream joined, - // we'll do multiple rebalancing. So, if we only wait for Running, it might enter rebalancing soon - private void waitForNumRebalancingToRunning(final List> observed, - final int numRebalancing) throws Exception { - waitForCondition(() -> !observed.isEmpty() && - observed.stream().filter(kv -> kv.equals(REBALANCED_RUNNING)).count() == numRebalancing, - MAX_WAIT_TIME_MS, - () -> "Client did not run " + numRebalancing + " of Rebalancing to Running on time. Observers transitions: " + observed - ); - } - private void waitForStateTransition(final List> observed, final List> expected) throws Exception { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 04abf07a98c0f..db659cf41dedb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -1270,24 +1270,15 @@ public static S getStore(final long waitTime, public static class StableAssignmentListener implements AssignmentListener { final AtomicInteger numStableAssignments = new AtomicInteger(0); - final AtomicInteger totalAssignments = new AtomicInteger(0); int nextExpectedNumStableAssignments; @Override public void onAssignmentComplete(final boolean stable) { - totalAssignments.incrementAndGet(); if (stable) { numStableAssignments.incrementAndGet(); } } - /** - * get the total number of assignments (unstable + stable) - */ - public int numTotalAssignments() { - return totalAssignments.get(); - } - public int numStableAssignments() { return numStableAssignments.get(); } @@ -1296,12 +1287,9 @@ public int numStableAssignments() { * Saves the current number of stable rebalances so that we can tell when the next stable assignment has been * reached. This should be called once for every invocation of {@link #waitForNextStableAssignment(long)}, * before the rebalance-triggering event. - * - * @return the total number of assignments so far */ - public int prepareForRebalance() { + public void prepareForRebalance() { nextExpectedNumStableAssignments = numStableAssignments.get() + 1; - return totalAssignments.get(); } /** From 62cbfb648aed3469ba546b80f7de7060f0934583 Mon Sep 17 00:00:00 2001 From: Luke Chen <43372967+showuon@users.noreply.github.com> Date: Fri, 15 Jan 2021 19:35:25 +0800 Subject: [PATCH 5/5] trigger the jenkins test again --- .../streams/integration/EosBetaUpgradeIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index f7f38dd45ebe5..a3540412690a8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -137,7 +137,7 @@ public static Collection data() { private final static String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic"; private final static String APP_DIR_1 = "appDir1"; private final static String APP_DIR_2 = "appDir2"; - private final static String UNEXPECTED_EXCEPTION_MSG = "Fail the test since we got an unexpected exception or " + + private final static String UNEXPECTED_EXCEPTION_MSG = "Fail the test since we got an unexpected exception, or " + "there are too many exceptions thrown, please check standard error log for more info."; private final String storeName = "store";