From 595b9641fc0c279eecd87a86a33b95629c699a90 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 4 Apr 2017 09:41:53 +0200 Subject: [PATCH 1/2] MINOR: Fix multiple org.apache.kafka.streams.KafkaStreams.StreamStateListener being instantiated --- .../java/org/apache/kafka/streams/KafkaStreams.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 6ddf2a1840015..d719fbf8a5dd6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -122,7 +122,6 @@ public class KafkaStreams { private GlobalStreamThread globalStreamThread; private final StreamThread[] threads; - private final Map threadState; private final Metrics metrics; private final QueryableStoreProvider queryableStoreProvider; @@ -253,6 +252,13 @@ public synchronized State state() { } private final class StreamStateListener implements StreamThread.StateListener { + + private final Map threadState; + + StreamStateListener(Map threadState) { + this.threadState = threadState; + } + @Override public synchronized void onChange(final StreamThread thread, final StreamThread.State newState, @@ -333,7 +339,7 @@ public KafkaStreams(final TopologyBuilder builder, metrics = new Metrics(metricConfig, reporters, time); threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)]; - threadState = new HashMap<>(threads.length); + final Map threadState = new HashMap<>(threads.length); final ArrayList storeProviders = new ArrayList<>(); streamsMetadataState = new StreamsMetadataState(builder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG))); @@ -358,6 +364,7 @@ public KafkaStreams(final TopologyBuilder builder, globalThreadId); } + StreamStateListener streamStateListener = new StreamStateListener(threadState); for (int i = 0; i < threads.length; i++) { threads[i] = new StreamThread(builder, config, @@ -369,7 +376,7 @@ public KafkaStreams(final TopologyBuilder builder, time, streamsMetadataState, cacheSizeBytes); - threads[i].setStateListener(new StreamStateListener()); + threads[i].setStateListener(streamStateListener); threadState.put(threads[i].getId(), threads[i].state()); storeProviders.add(new StreamThreadStateStoreProvider(threads[i])); } From 412e6d0a11c0a111b134a44b11fa2cb5288dd28a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 4 Apr 2017 10:48:13 +0200 Subject: [PATCH 2/2] MINOR: Fix missing final --- .../src/main/java/org/apache/kafka/streams/KafkaStreams.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index d719fbf8a5dd6..bc2a4333d2040 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -364,7 +364,7 @@ public KafkaStreams(final TopologyBuilder builder, globalThreadId); } - StreamStateListener streamStateListener = new StreamStateListener(threadState); + final StreamStateListener streamStateListener = new StreamStateListener(threadState); for (int i = 0; i < threads.length; i++) { threads[i] = new StreamThread(builder, config,