From 19120c31e93cbd9be3cc466feea425187b079457 Mon Sep 17 00:00:00 2001 From: "phil.hardwick" Date: Sat, 21 Aug 2021 11:02:09 +0100 Subject: [PATCH 1/4] MINOR: Fix null pointer when getting metric value in MetricsReporter The alive stream threads metric relies on the threads field as a monitor object for its synchronized block. When the alive stream threads metric is registered it isn't initialised so any call to get the metric value before it is initialised will result in a null pointer exception. --- .../apache/kafka/streams/KafkaStreams.java | 2 +- .../MetricsReporterIntegrationTest.java | 127 ++++++++++++++++++ 2 files changed, 128 insertions(+), 1 deletion(-) create mode 100644 streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 291a8c5925d54..eee55fc091104 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -883,6 +883,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata, ClientMetrics.addApplicationIdMetric(streamsMetrics, config.getString(StreamsConfig.APPLICATION_ID_CONFIG)); ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, (metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString()); ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state); + threads = Collections.synchronizedList(new LinkedList<>()); ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> getNumLiveStreamThreads()); streamsMetadataState = new StreamsMetadataState( @@ -915,7 +916,6 @@ private KafkaStreams(final TopologyMetadata topologyMetadata, globalThreadState = globalStreamThread.state(); } - threads = Collections.synchronizedList(new LinkedList<>()); threadState = new HashMap<>(numStreamThreads); streamStateListener = new StreamStateListener(threadState, globalThreadState); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java new file mode 100644 index 0000000000000..a8dcd152b0136 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.test.IntegrationTest; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.function.Supplier; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +@Category({IntegrationTest.class}) +public class MetricsReporterIntegrationTest { + + private static final int NUM_BROKERS = 1; + + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + + // topic names + private static final String STREAM_INPUT = "STREAM_INPUT"; + private static final String STREAM_OUTPUT = "STREAM_OUTPUT"; + + private StreamsBuilder builder; + private Properties streamsConfiguration; + + @Rule + public TestName testName = new TestName(); + + @Before + public void before() throws InterruptedException { + builder = new StreamsBuilder(); + + final String safeTestName = safeUniqueTestName(getClass(), testName); + final String appId = "app-" + safeTestName; + + streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsConfiguration.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricReporterImpl.class.getName()); + } + + final static Map metricNameToInitialValue = new HashMap<>(); + final static Map> metricNameToValueGetter = new HashMap<>(); + + public static class MetricReporterImpl implements MetricsReporter { + + + @Override + public void configure(Map configs) { + } + + @Override + public void init(List metrics) { + } + + @Override + public void metricChange(KafkaMetric metric) { + // get value of metric, e.g. if you wanted checking the type of the value + metricNameToInitialValue.put(metric.metricName().name(), metric.metricValue()); + metricNameToValueGetter.put(metric.metricName().name(), metric::metricValue); + } + + @Override + public void metricRemoval(KafkaMetric metric) { + } + + @Override + public void close() { + } + + } + + @Test + public void shouldBeAbleToProvideInitialMetricValueToMetricsReporter() { + // no need to create the topics, because we don't start the stream - just need to create the KafkaStreams object + // to check all initial values from the metrics are not null + builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String())) + .to(STREAM_OUTPUT, Produced.with(Serdes.Integer(), Serdes.String())); + final Topology topology = builder.build(); + final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration); + + kafkaStreams.metrics().keySet().forEach(metricName -> { + final Object initialMetricValue = metricNameToInitialValue.get(metricName.name()); + assertThat(initialMetricValue, notNullValue()); + }); + } + +} \ No newline at end of file From 9109ceb84d63f059df48dfabe2d79288faf73667 Mon Sep 17 00:00:00 2001 From: "phil.hardwick" Date: Mon, 23 Aug 2021 08:17:48 +0100 Subject: [PATCH 2/4] fix checkstyle --- .../MetricsReporterIntegrationTest.java | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java index a8dcd152b0136..11e7f4a9db0bc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.function.Supplier; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.notNullValue; @@ -78,29 +77,27 @@ public void before() throws InterruptedException { streamsConfiguration.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MetricReporterImpl.class.getName()); } - final static Map metricNameToInitialValue = new HashMap<>(); - final static Map> metricNameToValueGetter = new HashMap<>(); + final static Map METRIC_NAME_TO_INITIAL_VALUE = new HashMap<>(); public static class MetricReporterImpl implements MetricsReporter { @Override - public void configure(Map configs) { + public void configure(final Map configs) { } @Override - public void init(List metrics) { + public void init(final List metrics) { } @Override - public void metricChange(KafkaMetric metric) { + public void metricChange(final KafkaMetric metric) { // get value of metric, e.g. if you wanted checking the type of the value - metricNameToInitialValue.put(metric.metricName().name(), metric.metricValue()); - metricNameToValueGetter.put(metric.metricName().name(), metric::metricValue); + METRIC_NAME_TO_INITIAL_VALUE.put(metric.metricName().name(), metric.metricValue()); } @Override - public void metricRemoval(KafkaMetric metric) { + public void metricRemoval(final KafkaMetric metric) { } @Override @@ -119,7 +116,7 @@ public void shouldBeAbleToProvideInitialMetricValueToMetricsReporter() { final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration); kafkaStreams.metrics().keySet().forEach(metricName -> { - final Object initialMetricValue = metricNameToInitialValue.get(metricName.name()); + final Object initialMetricValue = METRIC_NAME_TO_INITIAL_VALUE.get(metricName.name()); assertThat(initialMetricValue, notNullValue()); }); } From 122bc373ca924e64770de7cbdf9d0f4d35d8bdbf Mon Sep 17 00:00:00 2001 From: "phil.hardwick" Date: Mon, 23 Aug 2021 11:10:49 +0100 Subject: [PATCH 3/4] Fix class rule in test --- .../MetricsReporterIntegrationTest.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java index 11e7f4a9db0bc..e24358711dfe1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java @@ -28,13 +28,11 @@ import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.test.IntegrationTest; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; +import org.junit.*; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -49,7 +47,6 @@ public class MetricsReporterIntegrationTest { private static final int NUM_BROKERS = 1; - @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); // topic names @@ -62,6 +59,16 @@ public class MetricsReporterIntegrationTest { @Rule public TestName testName = new TestName(); + @BeforeClass + public static void startCluster() throws IOException { + CLUSTER.start(); + } + + @AfterClass + public static void closeCluster() { + CLUSTER.stop(); + } + @Before public void before() throws InterruptedException { builder = new StreamsBuilder(); From 5c574f39b4e9d4fa05a141a008f8d6e10b473664 Mon Sep 17 00:00:00 2001 From: "phil.hardwick" Date: Mon, 23 Aug 2021 14:39:49 +0100 Subject: [PATCH 4/4] Fix checkstyle --- .../streams/integration/MetricsReporterIntegrationTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java index e24358711dfe1..a7c925ad5f5a7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java @@ -28,7 +28,11 @@ import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.test.IntegrationTest; -import org.junit.*; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName;