Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public class KafkaStreams {
private GlobalStreamThread globalStreamThread;

private final StreamThread[] threads;
private final Map<Long, StreamThread.State> threadState;
private final Metrics metrics;
private final QueryableStoreProvider queryableStoreProvider;

Expand Down Expand Up @@ -253,6 +252,13 @@ public synchronized State state() {
}

private final class StreamStateListener implements StreamThread.StateListener {

private final Map<Long, StreamThread.State> threadState;

StreamStateListener(Map<Long, StreamThread.State> threadState) {
this.threadState = threadState;
}

@Override
public synchronized void onChange(final StreamThread thread,
final StreamThread.State newState,
Expand Down Expand Up @@ -333,7 +339,7 @@ public KafkaStreams(final TopologyBuilder builder,
metrics = new Metrics(metricConfig, reporters, time);

threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
threadState = new HashMap<>(threads.length);
final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
streamsMetadataState = new StreamsMetadataState(builder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));

Expand All @@ -358,6 +364,7 @@ public KafkaStreams(final TopologyBuilder builder,
globalThreadId);
}

final StreamStateListener streamStateListener = new StreamStateListener(threadState);
for (int i = 0; i < threads.length; i++) {
threads[i] = new StreamThread(builder,
config,
Expand All @@ -369,7 +376,7 @@ public KafkaStreams(final TopologyBuilder builder,
time,
streamsMetadataState,
cacheSizeBytes);
threads[i].setStateListener(new StreamStateListener());
threads[i].setStateListener(streamStateListener);
threadState.put(threads[i].getId(), threads[i].state());
storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
}
Expand Down