diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index 22c5cf00ba01d..a3540412690a8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -82,7 +82,7 @@ import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.fail; +import static org.junit.Assert.assertFalse; @RunWith(Parameterized.class) @Category({IntegrationTest.class}) @@ -135,8 +135,13 @@ public static Collection data() { private final static String CONSUMER_GROUP_ID = "readCommitted"; private final static String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic"; private final static String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic"; + private final static String APP_DIR_1 = "appDir1"; + private final static String APP_DIR_2 = "appDir2"; + private final static String UNEXPECTED_EXCEPTION_MSG = "Fail the test since we got an unexpected exception, or " + + "there are too many exceptions thrown, please check standard error log for more info."; private final String storeName = "store"; + private final StableAssignmentListener assignmentListener = new StableAssignmentListener(); private final AtomicBoolean errorInjectedClient1 = new AtomicBoolean(false); @@ -147,9 +152,15 @@ public static Collection data() { private final AtomicInteger commitCounterClient2 = new AtomicInteger(-1); private final AtomicInteger commitRequested = new AtomicInteger(0); - private Throwable uncaughtException; - private int testNumber = 0; + private Map exceptionCounts = new HashMap() { + { + put(APP_DIR_1, 0); + put(APP_DIR_2, 0); + } + }; + + private volatile boolean hasUnexpectedError = false; @Before public void createTopics() throws Exception { @@ -235,7 +246,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { try { // phase 1: start both clients - streams1Alpha = getKafkaStreams("appDir1", StreamsConfig.EXACTLY_ONCE); + streams1Alpha = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE); streams1Alpha.setStateListener( (newState, oldState) -> stateTransitions1.add(KeyValue.pair(oldState, newState)) ); @@ -246,7 +257,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS); waitForRunning(stateTransitions1); - streams2Alpha = getKafkaStreams("appDir2", StreamsConfig.EXACTLY_ONCE); + streams2Alpha = getKafkaStreams(APP_DIR_2, StreamsConfig.EXACTLY_ONCE); streams2Alpha.setStateListener( (newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState)) ); @@ -389,6 +400,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { stateTransitions1.clear(); streams1Alpha.close(); waitForStateTransition(stateTransitions1, CLOSE_CRASHED); + assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); } // phase 5: (restart first client) @@ -411,7 +423,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { commitRequested.set(0); stateTransitions1.clear(); stateTransitions2.clear(); - streams1Beta = getKafkaStreams("appDir1", StreamsConfig.EXACTLY_ONCE_BETA); + streams1Beta = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE_BETA); streams1Beta.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair(oldState, newState))); assignmentListener.prepareForRebalance(); streams1Beta.start(); @@ -527,6 +539,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { stateTransitions2.clear(); streams2Alpha.close(); waitForStateTransition(stateTransitions2, CLOSE_CRASHED); + assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); final List> expectedCommittedResultAfterFailure = computeExpectedResult(uncommittedInputDataAfterFirstUpgrade, committedState); @@ -555,7 +568,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { commitCounterClient2.set(-1); stateTransitions1.clear(); stateTransitions2.clear(); - streams2AlphaTwo = getKafkaStreams("appDir2", StreamsConfig.EXACTLY_ONCE); + streams2AlphaTwo = getKafkaStreams(APP_DIR_2, StreamsConfig.EXACTLY_ONCE); streams2AlphaTwo.setStateListener( (newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState)) ); @@ -617,6 +630,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { stateTransitions1.clear(); streams1Beta.close(); waitForStateTransition(stateTransitions1, CLOSE_CRASHED); + assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); final List> expectedCommittedResultAfterFailure = computeExpectedResult(uncommittedInputDataBetweenUpgrade, committedState); @@ -626,7 +640,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { // 7c. restart the first client in eos-beta mode and wait until rebalance stabilizes stateTransitions1.clear(); stateTransitions2.clear(); - streams1BetaTwo = getKafkaStreams("appDir1", StreamsConfig.EXACTLY_ONCE_BETA); + streams1BetaTwo = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE_BETA); streams1BetaTwo.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair(oldState, newState))); assignmentListener.prepareForRebalance(); streams1BetaTwo.start(); @@ -738,6 +752,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { stateTransitions2.clear(); streams2AlphaTwo.close(); waitForStateTransition(stateTransitions2, CLOSE_CRASHED); + assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError); } // phase 10: (restart second client) @@ -759,7 +774,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception { commitRequested.set(0); stateTransitions1.clear(); stateTransitions2.clear(); - streams2Beta = getKafkaStreams("appDir1", StreamsConfig.EXACTLY_ONCE_BETA); + streams2Beta = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE_BETA); streams2Beta.setStateListener( (newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState)) ); @@ -869,7 +884,7 @@ public void init(final ProcessorContext context) { this.context = context; state = (KeyValueStore) context.getStateStore(storeName); final String clientId = context.appConfigs().get(StreamsConfig.CLIENT_ID_CONFIG).toString(); - if ("appDir1".equals(clientId)) { + if (APP_DIR_1.equals(clientId)) { crash = errorInjectedClient1; sharedCommit = commitCounterClient1; } else { @@ -938,11 +953,22 @@ public void close() {} final KafkaStreams streams = new KafkaStreams(builder.build(), config, new TestKafkaClientSupplier()); streams.setUncaughtExceptionHandler((t, e) -> { - if (uncaughtException != null) { + if (!injectError) { + // we don't expect any exception thrown in stop case e.printStackTrace(System.err); - fail("Should only get one uncaught exception from Streams."); + hasUnexpectedError = true; + } else { + int exceptionCount = (int) exceptionCounts.get(appDir); + // should only have our injected exception or commit exception, and 2 exceptions for each stream + if (++exceptionCount > 2 || !(e instanceof RuntimeException) || + !(e.getMessage().contains("test exception"))) { + // The exception won't cause the test fail since we actually "expected" exception thrown and failed the stream. + // So, log to stderr for debugging when the exception is not what we expected, and fail in the main thread + e.printStackTrace(System.err); + hasUnexpectedError = true; + } + exceptionCounts.put(appDir, exceptionCount); } - uncaughtException = e; }); return streams; @@ -963,7 +989,7 @@ private void waitForStateTransition(final List observed.equals(expected), MAX_WAIT_TIME_MS, - () -> "Client did not startup on time. Observers transitions: " + observed + () -> "Client did not have the expected state transition on time. Observers transitions: " + observed ); } @@ -1143,7 +1169,7 @@ private class ErrorInjector extends KafkaProducer { public ErrorInjector(final Map configs) { super(configs, new ByteArraySerializer(), new ByteArraySerializer()); final String clientId = configs.get(ProducerConfig.CLIENT_ID_CONFIG).toString(); - if (clientId.contains("appDir1")) { + if (clientId.contains(APP_DIR_1)) { crash = commitErrorInjectedClient1; } else { crash = commitErrorInjectedClient2; @@ -1156,7 +1182,7 @@ public void commitTransaction() { if (!crash.compareAndSet(true, false)) { super.commitTransaction(); } else { - throw new RuntimeException("Injected producer commit exception."); + throw new RuntimeException("Injected producer commit test exception."); } } }