Merged
78 changes: 40 additions & 38 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -202,32 +202,33 @@ public class KafkaStreams implements AutoCloseable {
* | | |
* | v |
* | +------+-------+ +----+-------+
* +-----> | Pending |<--- | Error (5) |
* | Shutdown (3) | +------------+
* +------+-------+
* |
* v
* +------+-------+
* | Not |
* | Running (4) |
* +-----> | Pending | | Pending |
* | Shutdown (3) | | Error (5) |
* +------+-------+ +-----+------+
* | |
* v v
* +------+-------+ +-----+--------+
* | Not | | Error (6) |
* | Running (4) | +--------------+
* +--------------+
*
*
* </pre>
* Note the following:
* - RUNNING state will transit to REBALANCING if any of its threads is in PARTITIONS_REVOKED or PARTITIONS_ASSIGNED state
* - REBALANCING state will transit to RUNNING if all of its threads are in RUNNING state
* - Any state except NOT_RUNNING can go to PENDING_SHUTDOWN (whenever close is called)
* - Any state except NOT_RUNNING, PENDING_ERROR or ERROR can go to PENDING_SHUTDOWN (whenever close is called)
* - Of special importance: If the global stream thread dies, or all stream threads die (or both) then
* the instance will be in the ERROR state. The user will need to close it.
* the instance will be in the ERROR state. The user will not need to close it.
Member

I thought we always need to call close? If an error happens, we call the handler, and if the handler returns shutdown, we transition to PENDING_ERROR. On close(), do we then transition from PENDING_ERROR to ERROR?

Or do I have some misconception?

Contributor Author

The handler will call close, but the user will not need to. The PENDING_ERROR state indicates that resources are being closed before the transition to ERROR, after which no more work will be done. We made it so the user can still call close in PENDING_ERROR or ERROR, but it will only log a warning.

Member

Thanks for clarifying!
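
The transitions discussed in this thread can be sketched as a small standalone state machine. This is a simplified illustration rather than the actual KafkaStreams code: the class name `ClientStateSketch` and the method `canTransitionTo` are hypothetical, and the transition table is copied from the new `State` enum in this diff.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the client state machine shown in this diff.
public class ClientStateSketch {
    enum State {
        CREATED(1, 3),          // 0
        REBALANCING(2, 3, 5),   // 1
        RUNNING(1, 2, 3, 5),    // 2
        PENDING_SHUTDOWN(4),    // 3
        NOT_RUNNING,            // 4
        PENDING_ERROR(6),       // 5
        ERROR;                  // 6

        // Ordinals of the states this state may move to.
        private final Set<Integer> validTransitions = new HashSet<>();

        State(final Integer... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        // Hypothetical helper name; returns true if the transition is in the table.
        boolean canTransitionTo(final State next) {
            return validTransitions.contains(next.ordinal());
        }
    }

    public static void main(final String[] args) {
        // An error while running first moves the client to PENDING_ERROR ...
        System.out.println(State.RUNNING.canTransitionTo(State.PENDING_ERROR));
        // ... and only then to the terminal ERROR state.
        System.out.println(State.PENDING_ERROR.canTransitionTo(State.ERROR));
        // Neither error state can go to PENDING_SHUTDOWN.
        System.out.println(State.PENDING_ERROR.canTransitionTo(State.PENDING_SHUTDOWN));
        System.out.println(State.ERROR.canTransitionTo(State.PENDING_SHUTDOWN));
    }
}
```

In this model ERROR has no outgoing transitions, which matches the description of it as a terminal state in the comment above.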

*/
public enum State {
CREATED(1, 3), // 0
REBALANCING(2, 3, 5), // 1
RUNNING(1, 2, 3, 5), // 2
PENDING_SHUTDOWN(4), // 3
NOT_RUNNING, // 4
ERROR(3); // 5
PENDING_ERROR(6), // 5
ERROR; // 6

private final Set<Integer> validTransitions = new HashSet<>();

@@ -290,8 +291,12 @@ private boolean setState(final State newState) {
} else if (state == State.REBALANCING && newState == State.REBALANCING) {
// when the state is already in REBALANCING, it should not transit to REBALANCING again
return false;
} else if (state == State.ERROR && newState == State.ERROR) {
// when the state is already in ERROR, it should not transit to ERROR again
} else if (state == State.ERROR && (newState == State.PENDING_ERROR || newState == State.ERROR)) {
// when the state is already in ERROR, a transition to PENDING_ERROR or ERROR (due to consecutive close calls) is ignored
return false;
} else if (state == State.PENDING_ERROR && newState != State.ERROR) {
// when the state is already in PENDING_ERROR, all other transitions than ERROR (due to thread dying) will be
// refused but we do not throw exception here, to allow appropriate error handling
return false;
} else if (!state.isValidTransition(newState)) {
throw new IllegalStateException("Stream-client " + clientId + ": Unexpected state transition from " + oldState + " to " + newState);
@@ -439,7 +444,7 @@ private void replaceStreamThread(final Throwable throwable) {
log.warn("The global thread cannot be replaced. Reverting to shutting down the client.");
log.error("Encountered the following exception during processing " +
" The streams client is going to shut down now. ", throwable);
close(Duration.ZERO);
closeToError();
}
final StreamThread deadThread = (StreamThread) Thread.currentThread();
threads.remove(deadThread);
@@ -469,7 +474,7 @@ private void handleStreamsUncaughtException(final Throwable throwable,
log.error("Encountered the following exception during processing " +
"and the registered exception handler opted to " + action + "." +
" The streams client is going to shut down now. ", throwable);
close(Duration.ZERO);
closeToError();
break;
case SHUTDOWN_APPLICATION:
if (throwable instanceof Error) {
@@ -482,7 +487,7 @@ private void handleStreamsUncaughtException(final Throwable throwable,
log.error("Exception in global thread caused the application to attempt to shutdown." +
" This action will succeed only if there is at least one StreamThread running on this client." +
" Currently there are no running threads so will now close the client.");
close(Duration.ZERO);
closeToError();
} else {
processStreamThread(thread -> thread.sendShutdownRequest(AssignorError.SHUTDOWN_REQUESTED));
log.error("Encountered the following exception during processing " +
@@ -552,22 +557,6 @@ final class StreamStateListener implements StreamThread.StateListener {
this.threadStatesLock = new Object();
}

/**
* If all threads are dead set to ERROR
*/
private void maybeSetError() {
Contributor Author

This will not be needed with the new error definition.

// check if we have at least one thread running
for (final StreamThread.State state : threadState.values()) {
if (state != StreamThread.State.DEAD) {
return;
}
}

if (setState(State.ERROR)) {
log.error("All stream threads have died. The instance will be in error state and should be closed.");
}
}

/**
* If all threads are up, including the global thread, set to RUNNING
*/
@@ -603,8 +592,6 @@ public synchronized void onChange(final Thread thread,
setState(State.REBALANCING);
} else if (newState == StreamThread.State.RUNNING) {
maybeSetRunning();
} else if (newState == StreamThread.State.DEAD) {
maybeSetError();
}
} else if (thread instanceof GlobalStreamThread) {
// global stream thread has different invariants
@@ -614,9 +601,8 @@ public synchronized void onChange(final Thread thread,
if (newState == GlobalStreamThread.State.RUNNING) {
maybeSetRunning();
} else if (newState == GlobalStreamThread.State.DEAD) {
if (setState(State.ERROR)) {
Contributor Author

We still do the same thing here, but for now we also close the client. Replacing the global thread may be added later.

log.error("Global thread has died. The instance will be in error state and should be closed.");
}
log.error("Global thread has died. The streams application or client will now close to ERROR.");
closeToError();
}
}
}
@@ -1204,11 +1190,27 @@ private Thread shutdownHelper(final boolean error) {
metrics.close();
if (!error) {
setState(State.NOT_RUNNING);
} else {
setState(State.ERROR);
}
}, "kafka-streams-close-thread");
}

private boolean close(final long timeoutMs) {
if (state == State.ERROR) {
Contributor Author

We want close() to be idempotent: it should not throw an exception, only log a warning. Since this applies only to close(), these log statements are not in the setState() method.
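
The idempotence described here can be sketched in isolation. This is a minimal, dependency-free illustration and not the KafkaStreams implementation: the class `IdempotentCloseSketch` and its `messages` list are hypothetical stand-ins (the list replaces a real logger), and the PENDING_ERROR branch simply reports "not closed yet" instead of waiting with a timeout as the real method does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of an idempotent close(); names are illustrative only.
public class IdempotentCloseSketch {
    enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    private State state;
    // Stand-in for a logger so the sketch needs no logging dependency.
    final List<String> messages = new ArrayList<>();

    IdempotentCloseSketch(final State initial) {
        this.state = initial;
    }

    synchronized State state() {
        return state;
    }

    synchronized boolean close() {
        if (state == State.ERROR) {
            // Terminal state: nothing left to do, so report success without throwing.
            messages.add("already in terminal state ERROR");
            return true;
        }
        if (state == State.PENDING_ERROR) {
            // Assumption: the sketch does not model the shutdown thread, so it
            // returns false here rather than waiting for ERROR.
            messages.add("in PENDING_ERROR, resources are being closed");
            return false;
        }
        // Normal shutdown path, collapsed to a single step for the sketch.
        state = State.NOT_RUNNING;
        return true;
    }

    public static void main(final String[] args) {
        final IdempotentCloseSketch client = new IdempotentCloseSketch(State.ERROR);
        // Calling close() repeatedly leaves the state unchanged and never throws.
        System.out.println(client.close() + " " + client.close() + " " + client.state());
    }
}
```

Keeping the messages in close() rather than in the state setter means other callers that attempt the same transition stay silent, which is the design choice the comment above describes.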

log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
return true;
}
if (state == State.PENDING_ERROR) {
log.info("Streams client is in PENDING_ERROR, all resources are being closed and the client will be stopped.");
if (waitOnState(State.ERROR, timeoutMs)) {
log.info("Streams client stopped to ERROR completely");
return true;
} else {
log.info("Streams client cannot transition to ERROR completely within the timeout");
return false;
}
}
if (!setState(State.PENDING_SHUTDOWN)) {
// if transition failed, it means it was either in PENDING_SHUTDOWN
// or NOT_RUNNING already; just check that all threads have been stopped
@@ -1230,7 +1232,7 @@ private boolean close(final long timeoutMs) {
}

private void closeToError() {
if (!setState(State.ERROR)) {
if (!setState(State.PENDING_ERROR)) {
log.info("Skipping shutdown since we are already in " + state());
} else {
final Thread shutdownThread = shutdownHelper(true);
@@ -295,6 +295,7 @@ public boolean isRunning() {
private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
private Runnable shutdownErrorHook;
private AtomicInteger assignmentErrorCode;
private final ProcessingMode processingMode;

public static StreamThread create(final InternalTopologyBuilder builder,
final StreamsConfig config,
@@ -482,7 +483,7 @@ public StreamThread(final Time time,
this.shutdownErrorHook = shutdownErrorHook;
this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
this.cacheResizer = cacheResizer;

this.processingMode = processingMode(config);

// The following sensors are created here but their references are not stored in this object, since within
// this object they are not recorded. The sensors are created here so that the stream threads starts with all
@@ -539,8 +540,7 @@ public void run() {
}
boolean cleanRun = false;
try {
runLoop();
cleanRun = true;
cleanRun = runLoop();
} finally {
completeShutdown(cleanRun);
}
@@ -552,7 +552,7 @@ public void run() {
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
void runLoop() {
boolean runLoop() {
Contributor Author

This lets Streams shut down uncleanly when in EOS mode.
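
A rough sketch of the idea, under stated assumptions: the loop reports whether it finished cleanly, and in an exactly-once mode it stops with `false` as soon as the handler has seen an exception. The class `RunLoopSketch`, the list of `Runnable` steps, and the `AT_LEAST_ONCE` constant are hypothetical simplifications; the real `runLoop()` polls and processes records rather than iterating over a list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of a run loop whose return value signals a clean run.
public class RunLoopSketch {
    enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE_ALPHA, EXACTLY_ONCE_BETA }

    static boolean runLoop(final ProcessingMode mode,
                           final List<Runnable> steps,
                           final Consumer<Throwable> handler) {
        for (final Runnable step : steps) {
            try {
                step.run();
            } catch (final Throwable e) {
                handler.accept(e);
                // Under exactly-once, stop right away and report an unclean run.
                if (mode == ProcessingMode.EXACTLY_ONCE_ALPHA
                        || mode == ProcessingMode.EXACTLY_ONCE_BETA) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        final List<Runnable> steps = Arrays.<Runnable>asList(
            () -> { throw new IllegalStateException("boom"); },
            () -> { }
        );
        final Consumer<Throwable> handler = e -> System.out.println("handled: " + e.getMessage());

        // The caller can pass the result on as the "clean run" flag for shutdown.
        System.out.println(runLoop(ProcessingMode.AT_LEAST_ONCE, steps, handler));
        System.out.println(runLoop(ProcessingMode.EXACTLY_ONCE_BETA, steps, handler));
    }
}
```

Returning the flag, instead of setting `cleanRun = true` after the call, is what allows the caller to hand `false` to the shutdown step when the loop ended because of an error.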

subscribeConsumer();

// if the thread is still in the middle of a rebalance, we should keep polling
@@ -587,11 +587,18 @@ void runLoop() {
}
failedStreamThreadSensor.record();
this.streamsUncaughtExceptionHandler.accept(e);
if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA || processingMode == ProcessingMode.EXACTLY_ONCE_BETA) {
return false;
}
} catch (final Throwable e) {
failedStreamThreadSensor.record();
this.streamsUncaughtExceptionHandler.accept(e);
if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA || processingMode == ProcessingMode.EXACTLY_ONCE_BETA) {
return false;
}
}
}
return true;
}

/**
@@ -319,6 +319,17 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
StreamThread.State.PARTITIONS_ASSIGNED);
return null;
}).anyTimes();
EasyMock.expect(thread.threadMetadata()).andReturn(new ThreadMetadata(
"newThead",
"DEAD",
"",
"",
Collections.emptySet(),
"",
Collections.emptySet(),
Collections.emptySet()
)
).anyTimes();
EasyMock.expect(thread.threadMetadata()).andStubReturn(threadMetadata);
thread.waitOnThreadState(StreamThread.State.DEAD);
EasyMock.expectLastCall().anyTimes();
@@ -428,70 +439,6 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws In
Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
}

@Test
Contributor Author

This test covers the functionality we are removing.

public void stateShouldTransitToErrorIfAllThreadsDead() throws InterruptedException {
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.setStateListener(streamsStateListener);

Assert.assertEquals(0, streamsStateListener.numChanges);
Assert.assertEquals(KafkaStreams.State.CREATED, streams.state());

streams.start();

TestUtils.waitForCondition(
() -> streamsStateListener.numChanges == 2,
"Streams never started.");
Assert.assertEquals(KafkaStreams.State.RUNNING, streams.state());

for (final StreamThread thread : streams.threads) {
threadStatelistenerCapture.getValue().onChange(
thread,
StreamThread.State.PARTITIONS_REVOKED,
StreamThread.State.RUNNING);
}

Assert.assertEquals(3, streamsStateListener.numChanges);
Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

threadStatelistenerCapture.getValue().onChange(
streams.threads.get(NUM_THREADS - 1),
StreamThread.State.PENDING_SHUTDOWN,
StreamThread.State.PARTITIONS_REVOKED);

threadStatelistenerCapture.getValue().onChange(
streams.threads.get(NUM_THREADS - 1),
StreamThread.State.DEAD,
StreamThread.State.PENDING_SHUTDOWN);

Assert.assertEquals(3, streamsStateListener.numChanges);
Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

for (final StreamThread thread : streams.threads) {
if (thread != streams.threads.get(NUM_THREADS - 1)) {
threadStatelistenerCapture.getValue().onChange(
thread,
StreamThread.State.PENDING_SHUTDOWN,
StreamThread.State.PARTITIONS_REVOKED);

threadStatelistenerCapture.getValue().onChange(
thread,
StreamThread.State.DEAD,
StreamThread.State.PENDING_SHUTDOWN);
}
}

Assert.assertEquals(4, streamsStateListener.numChanges);
Assert.assertEquals(KafkaStreams.State.ERROR, streams.state());

streams.close();

// the state should not stuck with ERROR, but transit to NOT_RUNNING in the end
TestUtils.waitForCondition(
() -> streamsStateListener.numChanges == 6,
"Streams never closed.");
Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
}

@Test
public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
final StreamsBuilder builder = getBuilderWithSource();
Expand Down Expand Up @@ -535,8 +482,9 @@ public void testStateThreadClose() throws Exception {
streams.threads.get(i).join();
}
TestUtils.waitForCondition(
() -> streams.state() == KafkaStreams.State.ERROR,
"Streams never stopped.");
() -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")),
"Streams never stopped"
);
} finally {
streams.close();
}
@@ -567,12 +515,18 @@ public void testStateGlobalThreadClose() throws Exception {
() -> globalStreamThread.state() == GlobalStreamThread.State.DEAD,
"Thread never stopped.");
globalStreamThread.join();
assertEquals(streams.state(), KafkaStreams.State.ERROR);
TestUtils.waitForCondition(
() -> streams.state() == KafkaStreams.State.PENDING_ERROR,
"Thread never stopped."
);
} finally {
streams.close();
}

assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
TestUtils.waitForCondition(
() -> streams.state() == KafkaStreams.State.ERROR,
"Thread never stopped."
);
}

@Test
@@ -635,8 +589,7 @@ public void shouldNotAddThreadWhenError() {
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
final int oldSize = streams.threads.size();
streams.start();
streamThreadOne.shutdown();
streamThreadTwo.shutdown();
globalStreamThread.shutdown();
Contributor Author

Removing stream threads no longer puts the client in ERROR. Shutting down the global thread does.

assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
assertThat(streams.threads.size(), equalTo(oldSize));
}
@@ -99,7 +99,6 @@ public static void setupConfigsAndUtils() {
}

@Test
@SuppressWarnings("deprecation")
Contributor Author

Remove the deprecation suppression.

Member (@mjsax, Dec 22, 2020)

Why did we even set an exception handler that does nothing? 🤔

Contributor Author

It kept the new default handler from being used until we updated the ERROR transition, as we are doing in this PR. :)

Member

Ah. Thanks for clarifying.

public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException {
final String appId = "shouldWorkWithUncleanShutdownWipeOutStateStore";
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
@@ -135,7 +134,6 @@ public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedE
));
final KafkaStreams driver = new KafkaStreams(builder.build(), STREAMS_CONFIG);
driver.cleanUp();
driver.setUncaughtExceptionHandler((t, e) -> { });
driver.start();

final File stateDir = new File(String.join("/", TEST_FOLDER.getRoot().getPath(), appId, "0_0"));