diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 6ddf2a1840015..bc2a4333d2040 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -122,7 +122,6 @@ public class KafkaStreams { private GlobalStreamThread globalStreamThread; private final StreamThread[] threads; - private final Map threadState; private final Metrics metrics; private final QueryableStoreProvider queryableStoreProvider; @@ -253,6 +252,13 @@ public synchronized State state() { } private final class StreamStateListener implements StreamThread.StateListener { + + private final Map threadState; + + StreamStateListener(Map threadState) { + this.threadState = threadState; + } + @Override public synchronized void onChange(final StreamThread thread, final StreamThread.State newState, @@ -333,7 +339,7 @@ public KafkaStreams(final TopologyBuilder builder, metrics = new Metrics(metricConfig, reporters, time); threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)]; - threadState = new HashMap<>(threads.length); + final Map threadState = new HashMap<>(threads.length); final ArrayList storeProviders = new ArrayList<>(); streamsMetadataState = new StreamsMetadataState(builder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG))); @@ -358,6 +364,7 @@ public KafkaStreams(final TopologyBuilder builder, globalThreadId); } + final StreamStateListener streamStateListener = new StreamStateListener(threadState); for (int i = 0; i < threads.length; i++) { threads[i] = new StreamThread(builder, config, @@ -369,7 +376,7 @@ public KafkaStreams(final TopologyBuilder builder, time, streamsMetadataState, cacheSizeBytes); - threads[i].setStateListener(new StreamStateListener()); + threads[i].setStateListener(streamStateListener); threadState.put(threads[i].getId(), threads[i].state()); storeProviders.add(new StreamThreadStateStoreProvider(threads[i])); }