Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertFalse;

@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
Expand Down Expand Up @@ -135,8 +135,13 @@ public static Collection<Boolean[]> data() {
private final static String CONSUMER_GROUP_ID = "readCommitted";
private final static String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
private final static String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
private final static String APP_DIR_1 = "appDir1";
private final static String APP_DIR_2 = "appDir2";
private final static String UNEXPECTED_EXCEPTION_MSG = "Fail the test since we got an unexpected exception, or " +
"there are too many exceptions thrown, please check standard error log for more info.";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing space between or (line above) and there

private final String storeName = "store";


private final StableAssignmentListener assignmentListener = new StableAssignmentListener();

private final AtomicBoolean errorInjectedClient1 = new AtomicBoolean(false);
Expand All @@ -147,9 +152,15 @@ public static Collection<Boolean[]> data() {
private final AtomicInteger commitCounterClient2 = new AtomicInteger(-1);
private final AtomicInteger commitRequested = new AtomicInteger(0);

private Throwable uncaughtException;

private int testNumber = 0;
private Map<String, Integer> exceptionCounts = new HashMap<String, Integer>() {
{
put(APP_DIR_1, 0);
put(APP_DIR_2, 0);
}
};

private volatile boolean hasUnexpectedError = false;

@Before
public void createTopics() throws Exception {
Expand Down Expand Up @@ -235,7 +246,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {

try {
// phase 1: start both clients
streams1Alpha = getKafkaStreams("appDir1", StreamsConfig.EXACTLY_ONCE);
streams1Alpha = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE);
streams1Alpha.setStateListener(
(newState, oldState) -> stateTransitions1.add(KeyValue.pair(oldState, newState))
);
Expand All @@ -246,7 +257,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
waitForRunning(stateTransitions1);

streams2Alpha = getKafkaStreams("appDir2", StreamsConfig.EXACTLY_ONCE);
streams2Alpha = getKafkaStreams(APP_DIR_2, StreamsConfig.EXACTLY_ONCE);
streams2Alpha.setStateListener(
(newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState))
);
Expand Down Expand Up @@ -389,6 +400,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
stateTransitions1.clear();
streams1Alpha.close();
waitForStateTransition(stateTransitions1, CLOSE_CRASHED);
assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError);
}

// phase 5: (restart first client)
Expand All @@ -411,7 +423,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
commitRequested.set(0);
stateTransitions1.clear();
stateTransitions2.clear();
streams1Beta = getKafkaStreams("appDir1", StreamsConfig.EXACTLY_ONCE_BETA);
streams1Beta = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE_BETA);
streams1Beta.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair(oldState, newState)));
assignmentListener.prepareForRebalance();
streams1Beta.start();
Expand Down Expand Up @@ -527,6 +539,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
stateTransitions2.clear();
streams2Alpha.close();
waitForStateTransition(stateTransitions2, CLOSE_CRASHED);
assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError);

final List<KeyValue<Long, Long>> expectedCommittedResultAfterFailure =
computeExpectedResult(uncommittedInputDataAfterFirstUpgrade, committedState);
Expand Down Expand Up @@ -555,7 +568,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
commitCounterClient2.set(-1);
stateTransitions1.clear();
stateTransitions2.clear();
streams2AlphaTwo = getKafkaStreams("appDir2", StreamsConfig.EXACTLY_ONCE);
streams2AlphaTwo = getKafkaStreams(APP_DIR_2, StreamsConfig.EXACTLY_ONCE);
streams2AlphaTwo.setStateListener(
(newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState))
);
Expand Down Expand Up @@ -617,6 +630,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
stateTransitions1.clear();
streams1Beta.close();
waitForStateTransition(stateTransitions1, CLOSE_CRASHED);
assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError);

final List<KeyValue<Long, Long>> expectedCommittedResultAfterFailure =
computeExpectedResult(uncommittedInputDataBetweenUpgrade, committedState);
Expand All @@ -626,7 +640,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
// 7c. restart the first client in eos-beta mode and wait until rebalance stabilizes
stateTransitions1.clear();
stateTransitions2.clear();
streams1BetaTwo = getKafkaStreams("appDir1", StreamsConfig.EXACTLY_ONCE_BETA);
streams1BetaTwo = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE_BETA);
streams1BetaTwo.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair(oldState, newState)));
assignmentListener.prepareForRebalance();
streams1BetaTwo.start();
Expand Down Expand Up @@ -738,6 +752,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
stateTransitions2.clear();
streams2AlphaTwo.close();
waitForStateTransition(stateTransitions2, CLOSE_CRASHED);
assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError);
}

// phase 10: (restart second client)
Expand All @@ -759,7 +774,7 @@ public void shouldUpgradeFromEosAlphaToEosBeta() throws Exception {
commitRequested.set(0);
stateTransitions1.clear();
stateTransitions2.clear();
streams2Beta = getKafkaStreams("appDir1", StreamsConfig.EXACTLY_ONCE_BETA);
streams2Beta = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE_BETA);
streams2Beta.setStateListener(
(newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState))
);
Expand Down Expand Up @@ -869,7 +884,7 @@ public void init(final ProcessorContext context) {
this.context = context;
state = (KeyValueStore<Long, Long>) context.getStateStore(storeName);
final String clientId = context.appConfigs().get(StreamsConfig.CLIENT_ID_CONFIG).toString();
if ("appDir1".equals(clientId)) {
if (APP_DIR_1.equals(clientId)) {
crash = errorInjectedClient1;
sharedCommit = commitCounterClient1;
} else {
Expand Down Expand Up @@ -938,11 +953,22 @@ public void close() {}

final KafkaStreams streams = new KafkaStreams(builder.build(), config, new TestKafkaClientSupplier());
streams.setUncaughtExceptionHandler((t, e) -> {
if (uncaughtException != null) {
if (!injectError) {
// we don't expect any exception thrown in stop case
e.printStackTrace(System.err);
fail("Should only get one uncaught exception from Streams.");
hasUnexpectedError = true;
} else {
int exceptionCount = (int) exceptionCounts.get(appDir);
// should only have our injected exception or commit exception, and 2 exceptions for each stream
if (++exceptionCount > 2 || !(e instanceof RuntimeException) ||
!(e.getMessage().contains("test exception"))) {
// The exception won't cause the test fail since we actually "expected" exception thrown and failed the stream.
// So, log to stderr for debugging when the exception is not what we expected, and fail in the main thread
e.printStackTrace(System.err);
hasUnexpectedError = true;
}
exceptionCounts.put(appDir, exceptionCount);
}
uncaughtException = e;
});

return streams;
Expand All @@ -963,7 +989,7 @@ private void waitForStateTransition(final List<KeyValue<KafkaStreams.State, Kafk
waitForCondition(
() -> observed.equals(expected),
MAX_WAIT_TIME_MS,
() -> "Client did not startup on time. Observers transitions: " + observed
() -> "Client did not have the expected state transition on time. Observers transitions: " + observed
);
}

Expand Down Expand Up @@ -1143,7 +1169,7 @@ private class ErrorInjector extends KafkaProducer<byte[], byte[]> {
public ErrorInjector(final Map<String, Object> configs) {
super(configs, new ByteArraySerializer(), new ByteArraySerializer());
final String clientId = configs.get(ProducerConfig.CLIENT_ID_CONFIG).toString();
if (clientId.contains("appDir1")) {
if (clientId.contains(APP_DIR_1)) {
crash = commitErrorInjectedClient1;
} else {
crash = commitErrorInjectedClient2;
Expand All @@ -1156,7 +1182,7 @@ public void commitTransaction() {
if (!crash.compareAndSet(true, false)) {
super.commitTransaction();
} else {
throw new RuntimeException("Injected producer commit exception.");
throw new RuntimeException("Injected producer commit test exception.");
}
}
}
Expand Down