From 2fdc37e1d9bb80e550b70c233811b2343ccf554f Mon Sep 17 00:00:00 2001 From: "high.lee" Date: Thu, 9 Jan 2020 10:54:35 +0900 Subject: [PATCH] KAFKA-9152; Improve Sensor Retrieval --- .../internals/metrics/StreamsMetricsImpl.java | 66 ++++++++++--------- 1 file changed, 36 insertions(+), 30 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index d5b594a217506..309b361d35d6a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -46,6 +46,7 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import java.util.Optional; public class StreamsMetricsImpl implements StreamsMetrics { @@ -210,10 +211,14 @@ public final Sensor threadLevelSensor(final String threadId, final Sensor... parents) { final String key = threadSensorPrefix(threadId); synchronized (threadLevelSensors) { - threadLevelSensors.putIfAbsent(key, new LinkedList<>()); final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName; - final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); - threadLevelSensors.get(key).push(fullSensorName); + final Sensor sensor = Optional.ofNullable(metrics.getSensor(fullSensorName)) + .orElseGet(() -> { + threadLevelSensors.putIfAbsent(key, new LinkedList<>()); + final Sensor newSensor = metrics.sensor(fullSensorName, recordingLevel, parents); + threadLevelSensors.get(key).push(fullSensorName); + return newSensor; + }); return sensor; } } @@ -290,12 +295,14 @@ public final Sensor taskLevelSensor(final String threadId, final Sensor... parents) { final String key = taskSensorPrefix(threadId, taskId); synchronized (taskLevelSensors) { - if (!taskLevelSensors.containsKey(key)) { - taskLevelSensors.put(key, new LinkedList<>()); - } final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName; - final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); - taskLevelSensors.get(key).push(fullSensorName); + final Sensor sensor = Optional.ofNullable(metrics.getSensor(fullSensorName)) + .orElseGet(() -> { + taskLevelSensors.putIfAbsent(key, new LinkedList<>()); + final Sensor newSensor = metrics.sensor(fullSensorName, recordingLevel, parents); + taskLevelSensors.get(key).push(fullSensorName); + return newSensor; + }); return sensor; } } @@ -323,15 +330,14 @@ public Sensor nodeLevelSensor(final String threadId, final Sensor... parents) { final String key = nodeSensorPrefix(threadId, taskId, processorNodeName); synchronized (nodeLevelSensors) { - if (!nodeLevelSensors.containsKey(key)) { - nodeLevelSensors.put(key, new LinkedList<>()); - } - final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName; - - final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); - - nodeLevelSensors.get(key).push(fullSensorName); + final Sensor sensor = Optional.ofNullable(metrics.getSensor(fullSensorName)) + .orElseGet(() -> { + nodeLevelSensors.putIfAbsent(key, new LinkedList<>()); + final Sensor newSensor = metrics.sensor(fullSensorName, recordingLevel, parents); + nodeLevelSensors.get(key).push(fullSensorName); + return newSensor; + }); return sensor; } @@ -362,15 +368,14 @@ public Sensor cacheLevelSensor(final String threadId, final Sensor... parents) { final String key = cacheSensorPrefix(threadId, taskName, storeName); synchronized (cacheLevelSensors) { - if (!cacheLevelSensors.containsKey(key)) { - cacheLevelSensors.put(key, new LinkedList<>()); - } - final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName; - - final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); - - cacheLevelSensors.get(key).push(fullSensorName); + final Sensor sensor = Optional.ofNullable(metrics.getSensor(fullSensorName)) + .orElseGet(() -> { + cacheLevelSensors.putIfAbsent(key, new LinkedList<>()); + final Sensor newSensor = metrics.sensor(fullSensorName, recordingLevel, parents); + cacheLevelSensors.get(key).push(fullSensorName); + return newSensor; + }); return sensor; } @@ -413,13 +418,14 @@ public final Sensor storeLevelSensor(final String threadId, final Sensor... parents) { final String key = storeSensorPrefix(threadId, taskId, storeName); synchronized (storeLevelSensors) { - if (!storeLevelSensors.containsKey(key)) { - storeLevelSensors.put(key, new LinkedList<>()); - } final String fullSensorName = key + SENSOR_NAME_DELIMITER + sensorName; - final Sensor sensor = metrics.sensor(fullSensorName, recordingLevel, parents); - - storeLevelSensors.get(key).push(fullSensorName); + final Sensor sensor = Optional.ofNullable(metrics.getSensor(fullSensorName)) + .orElseGet(() -> { + storeLevelSensors.putIfAbsent(key, new LinkedList<>()); + final Sensor newSensor = metrics.sensor(fullSensorName, recordingLevel, parents); + storeLevelSensors.get(key).push(fullSensorName); + return newSensor; + }); return sensor; }