From 7c3c51ffcb65b50c344ab05733874964e1d1064c Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 16 Jun 2020 15:13:33 -0500 Subject: [PATCH 1/2] KAFKA-10165: Remove Percentiles from e2e metrics --- .../internals/ProcessorContextImpl.java | 2 +- .../processor/internals/StandbyTask.java | 11 +- .../processor/internals/StreamTask.java | 24 ++-- .../processor/internals/TaskManager.java | 6 - .../metrics/ProcessorNodeMetrics.java | 34 +---- .../internals/metrics/StreamsMetricsImpl.java | 43 +----- .../internals/metrics/TaskMetrics.java | 28 ++++ .../integration/MetricsIntegrationTest.java | 4 - .../processor/internals/StandbyTaskTest.java | 42 ++++++ .../processor/internals/StreamTaskTest.java | 122 +++++++++--------- .../metrics/ProcessorNodeMetricsTest.java | 46 +------ .../metrics/StreamsMetricsImplTest.java | 8 +- 12 files changed, 168 insertions(+), 202 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index a58a8627ce7bf..b220fa56a589f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -235,7 +235,7 @@ private void forward(final ProcessorNode child, setCurrentNode(child); child.process(key, value); if (child.isTerminalNode()) { - streamTask.maybeRecordE2ELatency(timestamp(), child.name()); + streamTask.maybeRecordE2ELatency(timestamp(), currentSystemTimeMs(), child.name()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 5df59f6e37c3a..b334bc15a65ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -44,6 +44,7 @@ public class StandbyTask extends AbstractTask implements Task { private final Sensor closeTaskSensor; private final boolean eosEnabled; private final InternalProcessorContext processorContext; + private final StreamsMetricsImpl streamsMetrics; private Map offsetSnapshotSinceLastCommit; @@ -52,7 +53,7 @@ public class StandbyTask extends AbstractTask implements Task { * @param partitions input topic partitions, used for thread metadata only * @param topology the instance of {@link ProcessorTopology} * @param config the {@link StreamsConfig} specified by the user - * @param metrics the {@link StreamsMetrics} created by the thread + * @param streamsMetrics the {@link StreamsMetrics} created by the thread * @param stateMgr the {@link ProcessorStateManager} for this task * @param stateDirectory the {@link StateDirectory} created by the thread */ @@ -60,13 +61,14 @@ public class StandbyTask extends AbstractTask implements Task { final Set partitions, final ProcessorTopology topology, final StreamsConfig config, - final StreamsMetricsImpl metrics, + final StreamsMetricsImpl streamsMetrics, final ProcessorStateManager stateMgr, final StateDirectory stateDirectory, final ThreadCache cache, final InternalProcessorContext processorContext) { super(id, topology, stateDirectory, stateMgr, partitions); this.processorContext = processorContext; + this.streamsMetrics = streamsMetrics; processorContext.transitionToStandby(cache); final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName()); @@ -74,7 +76,7 @@ public class StandbyTask extends AbstractTask implements Task { final LogContext logContext = new LogContext(logPrefix); log = logContext.logger(getClass()); - closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), metrics); + closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics); eosEnabled = StreamThread.eosEnabled(config); } @@ -174,18 +176,21 @@ public void postCommit() { @Override public void closeClean() { + streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString()); close(true); log.info("Closed clean"); } @Override public void closeDirty() { + streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString()); close(false); log.info("Closed dirty"); } @Override public void closeAndRecycleState() { + streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString()); if (state() == State.SUSPENDED) { stateMgr.recycle(); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index fa8b94ba854a0..eb1bf4b16512e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -37,7 +37,6 @@ import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TimestampExtractor; -import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; @@ -89,6 +88,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, private final Map consumedOffsets; private final PunctuationQueue streamTimePunctuationQueue; private final PunctuationQueue systemTimePunctuationQueue; + private final StreamsMetricsImpl streamsMetrics; private long processTimeMs = 0L; @@ -135,6 +135,7 @@ public StreamTask(final TaskId id, eosEnabled = StreamThread.eosEnabled(config); final String threadId = Thread.currentThread().getName(); + this.streamsMetrics = streamsMetrics; closeTaskSensor = ThreadMetrics.closeTaskSensor(threadId, streamsMetrics); final String taskId = id.toString(); if (streamsMetrics.version() == Version.FROM_0100_TO_24) { @@ -148,18 +149,18 @@ public StreamTask(final TaskId id, punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics); bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics); - for (final String terminalNode : topology.terminalNodes()) { + for (final String terminalNodeName : topology.terminalNodes()) { e2eLatencySensors.put( - terminalNode, - ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, terminalNode, RecordingLevel.INFO, streamsMetrics) + terminalNodeName, + TaskMetrics.e2ELatencySensor(threadId, taskId, terminalNodeName, RecordingLevel.INFO, streamsMetrics) ); } for (final ProcessorNode sourceNode : topology.sources()) { - final String processorId = sourceNode.name(); + final String sourceNodeName = sourceNode.name(); e2eLatencySensors.put( - processorId, - ProcessorNodeMetrics.recordE2ELatencySensor(threadId, taskId, processorId, RecordingLevel.INFO, streamsMetrics) + sourceNodeName, + TaskMetrics.e2ELatencySensor(threadId, taskId, sourceNodeName, RecordingLevel.INFO, streamsMetrics) ); } @@ -462,12 +463,14 @@ private Map extractPartitionTimes() { @Override public void closeClean() { + streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString()); close(true); log.info("Closed clean"); } @Override public void closeDirty() { + streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), id.toString()); close(false); log.info("Closed dirty"); } @@ -480,6 +483,7 @@ public void update(final Set topicPartitions, final Map tagMap = streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId); - addMinAndMaxAndP99AndP90ToSensor( - sensor, - PROCESSOR_NODE_LEVEL_GROUP, - tagMap, - RECORD_E2E_LATENCY, - RECORD_E2E_LATENCY_MIN_DESCRIPTION, - RECORD_E2E_LATENCY_MAX_DESCRIPTION, - RECORD_E2E_LATENCY_P99_DESCRIPTION, - RECORD_E2E_LATENCY_P90_DESCRIPTION - ); - return sensor; - } - private static Sensor throughputAndLatencySensorWithParent(final String threadId, final String taskId, final String processorNodeId, @@ -337,7 +307,7 @@ private static Sensor throughputAndLatencySensorWithParent(final String threadId descriptionOfCount, descriptionOfAvgLatency, descriptionOfMaxLatency, - RecordingLevel.DEBUG, + recordingLevel, streamsMetrics ); return throughputAndLatencySensor( @@ -349,7 +319,7 @@ private static Sensor throughputAndLatencySensorWithParent(final String threadId descriptionOfCount, descriptionOfAvgLatency, descriptionOfMaxLatency, - RecordingLevel.DEBUG, + recordingLevel, streamsMetrics, parentSensor ); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index fe0c94c9341cf..215dfc51085fa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -28,9 +28,6 @@ import org.apache.kafka.common.metrics.stats.CumulativeSum; import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Min; -import org.apache.kafka.common.metrics.stats.Percentile; -import org.apache.kafka.common.metrics.stats.Percentiles; -import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.metrics.stats.WindowedCount; @@ -47,9 +44,9 @@ import java.util.LinkedList; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import java.util.Optional; public class StreamsMetricsImpl implements StreamsMetrics { @@ -154,9 +151,6 @@ public int hashCode() { public static final String RATE_DESCRIPTION_PREFIX = "The average number of "; public static final String RATE_DESCRIPTION_SUFFIX = " per second"; - private static final int PERCENTILES_SIZE_IN_BYTES = 100 * 1000; // 100 kB - private static double MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000d; // maximum latency is 10 days; values above that will be pinned - public StreamsMetricsImpl(final Metrics metrics, final String clientId, final String builtInMetricsVersion) { Objects.requireNonNull(metrics, "Metrics cannot be null"); Objects.requireNonNull(builtInMetricsVersion, "Built-in metrics version cannot be null"); @@ -650,14 +644,12 @@ public static void addAvgAndMaxToSensor(final Sensor sensor, ); } - public static void addMinAndMaxAndP99AndP90ToSensor(final Sensor sensor, - final String group, - final Map tags, - final String operation, - final String descriptionOfMin, - final String descriptionOfMax, - final String descriptionOfP99, - final String descriptionOfP90) { + public static void addMinAndMaxToSensor(final Sensor sensor, + final String group, + final Map tags, + final String operation, + final String descriptionOfMin, + final String descriptionOfMax) { sensor.add( new MetricName( operation + MIN_SUFFIX, @@ -675,27 +667,6 @@ public static void addMinAndMaxAndP99AndP90ToSensor(final Sensor sensor, tags), new Max() ); - - sensor.add( - new Percentiles( - PERCENTILES_SIZE_IN_BYTES, - MAXIMUM_E2E_LATENCY, - BucketSizing.LINEAR, - new Percentile( - new MetricName( - operation + P99_SUFFIX, - group, - descriptionOfP99, - tags), - 99), - new Percentile( - new MetricName( - operation + P90_SUFFIX, - group, - descriptionOfP90, - tags), - 90)) - ); } public static void addAvgAndMaxLatencyToSensor(final Sensor sensor, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java index 8fe2e3a11e435..534dc02f162bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java @@ -24,11 +24,13 @@ import java.util.Map; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_SUFFIX; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addMinAndMaxToSensor; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addValueMetricToSensor; public class TaskMetrics { @@ -86,6 +88,13 @@ private TaskMetrics() {} private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " + "from consumer and not yet processed for this active task"; + private static final String RECORD_E2E_LATENCY = "record-e2e-latency"; + private static final String RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX = + "end-to-end latency of a record, measuring by comparing the record timestamp with the " + + "system time when it has been fully processed by the node"; + private static final String RECORD_E2E_LATENCY_MIN_DESCRIPTION = "The minimum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX; + private static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION = "The maximum " + RECORD_E2E_LATENCY_DESCRIPTION_SUFFIX; + public static Sensor processLatencySensor(final String threadId, final String taskId, final StreamsMetricsImpl streamsMetrics) { @@ -133,6 +142,25 @@ public static Sensor activeBufferedRecordsSensor(final String threadId, return sensor; } + public static Sensor e2ELatencySensor(final String threadId, + final String taskId, + final String processorNodeId, + final RecordingLevel recordingLevel, + final StreamsMetricsImpl streamsMetrics) { + final String sensorName = processorNodeId + "-" + RECORD_E2E_LATENCY; + final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, sensorName, recordingLevel); + final Map tagMap = streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId); + addMinAndMaxToSensor( + sensor, + PROCESSOR_NODE_LEVEL_GROUP, + tagMap, + RECORD_E2E_LATENCY, + RECORD_E2E_LATENCY_MIN_DESCRIPTION, + RECORD_E2E_LATENCY_MAX_DESCRIPTION + ); + return sensor; + } + public static Sensor punctuateSensor(final String threadId, final String taskId, final StreamsMetricsImpl streamsMetrics) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java index f06057f484966..b9f8f9ef2cd36 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java @@ -206,8 +206,6 @@ public class MetricsIntegrationTest { private static final String EXPIRED_WINDOW_RECORD_DROP_TOTAL = "expired-window-record-drop-total"; private static final String E2E_LATENCY_MIN = "record-e2e-latency-min"; private static final String E2E_LATENCY_MAX = "record-e2e-latency-max"; - private static final String E2E_LATENCY_P99 = "record-e2e-latency-p99"; - private static final String E2E_LATENCY_P90 = "record-e2e-latency-p90"; // stores name private static final String TIME_WINDOWED_AGGREGATED_STREAM_STORE = "time-windowed-aggregated-stream-store"; @@ -608,8 +606,6 @@ private void checkProcessorNodeLevelMetrics(final String builtInMetricsVersion) checkMetricByName(listMetricProcessor, FORWARD_RATE, numberOfModifiedForwardMetrics); checkMetricByName(listMetricProcessor, E2E_LATENCY_MIN, numberOfSourceNodes + numberOfTerminalNodes); checkMetricByName(listMetricProcessor, E2E_LATENCY_MAX, numberOfSourceNodes + numberOfTerminalNodes); - checkMetricByName(listMetricProcessor, E2E_LATENCY_P99, numberOfSourceNodes + numberOfTerminalNodes); - checkMetricByName(listMetricProcessor, E2E_LATENCY_P90, numberOfSourceNodes + numberOfTerminalNodes); } private void checkKeyValueStoreMetrics(final String group0100To24, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 3f4b410358c07..82f33c4697b9a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -52,6 +52,8 @@ import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkEntry; @@ -62,6 +64,7 @@ import static org.apache.kafka.streams.processor.internals.Task.State.SUSPENDED; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; @@ -380,6 +383,37 @@ public void shouldThrowOnCloseCleanCheckpointError() { EasyMock.replay(stateManager); } + @Test + public void shouldUnregisterMetricsInCloseClean() { + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); + EasyMock.replay(stateManager); + + task = createStandbyTask(); + task.initializeIfNeeded(); + + task.suspend(); + task.closeClean(); + // Currently, there are no metrics registered for standby tasks. + // This is a regression test so that, if we add some, we will be sure to deregister them. + assertThat(getTaskMetrics(), empty()); + } + + @Test + public void shouldUnregisterMetricsInCloseDirty() { + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); + EasyMock.replay(stateManager); + + task = createStandbyTask(); + task.initializeIfNeeded(); + + task.suspend(); + task.closeDirty(); + + // Currently, there are no metrics registered for standby tasks. + // This is a regression test so that, if we add some, we will be sure to deregister them. + assertThat(getTaskMetrics(), empty()); + } + @Test public void shouldCloseStateManagerOnTaskCreated() { stateManager.close(); @@ -476,6 +510,10 @@ public void shouldRecycleTask() { task.suspend(); task.closeAndRecycleState(); // SUSPENDED + // Currently, there are no metrics registered for standby tasks. + // This is a regression test so that, if we add some, we will be sure to deregister them. + assertThat(getTaskMetrics(), empty()); + EasyMock.verify(stateManager); } @@ -538,4 +576,8 @@ private void verifyCloseTaskMetric(final double expected, final StreamsMetricsIm final double totalCloses = metric.measurable().measure(metric.config(), System.currentTimeMillis()); assertThat(totalCloses, equalTo(expected)); } + + private List getTaskMetrics() { + return streamsMetrics.metrics().keySet().stream().filter(m -> m.tags().containsKey("task-id")).collect(Collectors.toList()); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 7a2cf7ad49579..0b9a35fd0c8d3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.HashSet; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; @@ -74,6 +73,7 @@ import java.util.Arrays; import java.util.Base64; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -97,7 +97,9 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -404,7 +406,7 @@ public void shouldRecordBufferedRecords() { final KafkaMetric metric = getMetric("active-buffer", "%s-count", task.id().toString(), StreamsConfig.METRICS_LATEST); - assertThat(metric.metricValue(), equalTo(0.0d)); + assertThat(metric.metricValue(), equalTo(0.0)); task.addRecords(partition1, asList( getConsumerRecord(partition1, 10), @@ -412,12 +414,12 @@ public void shouldRecordBufferedRecords() { )); task.recordProcessTimeRatioAndBufferSize(100L, time.milliseconds()); - assertThat(metric.metricValue(), equalTo(2.0d)); + assertThat(metric.metricValue(), equalTo(2.0)); task.process(0L); task.recordProcessTimeRatioAndBufferSize(100L, time.milliseconds()); - assertThat(metric.metricValue(), equalTo(1.0d)); + assertThat(metric.metricValue(), equalTo(1.0)); } @Test @@ -426,22 +428,22 @@ public void shouldRecordProcessRatio() { final KafkaMetric metric = getMetric("active-process", "%s-ratio", task.id().toString(), StreamsConfig.METRICS_LATEST); - assertThat(metric.metricValue(), equalTo(0.0d)); + assertThat(metric.metricValue(), equalTo(0.0)); task.recordProcessBatchTime(10L); task.recordProcessBatchTime(15L); task.recordProcessTimeRatioAndBufferSize(100L, time.milliseconds()); - assertThat(metric.metricValue(), equalTo(0.25d)); + assertThat(metric.metricValue(), equalTo(0.25)); task.recordProcessBatchTime(10L); - assertThat(metric.metricValue(), equalTo(0.25d)); + assertThat(metric.metricValue(), equalTo(0.25)); task.recordProcessBatchTime(10L); task.recordProcessTimeRatioAndBufferSize(20L, time.milliseconds()); - assertThat(metric.metricValue(), equalTo(1.0d)); + assertThat(metric.metricValue(), equalTo(1.0)); } @Test @@ -458,24 +460,7 @@ public void shouldRecordE2ELatencyOnProcessForSourceNodes() { task.addRecords(partition1, singletonList(getConsumerRecord(partition1, 0L))); task.process(100L); - assertThat(maxMetric.metricValue(), equalTo(100d)); - } - - @Test - public void shouldRecordE2ELatencyOnProcessForTerminalNodes() { - time = new MockTime(0L, 0L, 0L); - metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time); - task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); - - final String terminalNode = processorStreamTime.name(); - - final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), terminalNode, StreamsConfig.METRICS_LATEST); - - // e2e latency = 100 - time.setCurrentTimeMs(100L); - task.maybeRecordE2ELatency(0L, terminalNode); - - assertThat(maxMetric.metricValue(), equalTo(100d)); + assertThat(maxMetric.metricValue(), equalTo(100.0)); } @Test @@ -493,50 +478,24 @@ public void shouldRecordE2ELatencyMinAndMax() { assertThat(maxMetric.metricValue(), equalTo(Double.NaN)); // e2e latency = 10 - time.setCurrentTimeMs(10L); - task.maybeRecordE2ELatency(0L, sourceNode); - assertThat(minMetric.metricValue(), equalTo(10d)); - assertThat(maxMetric.metricValue(), equalTo(10d)); + task.maybeRecordE2ELatency(0L, 10L, sourceNode); + assertThat(minMetric.metricValue(), equalTo(10.0)); + assertThat(maxMetric.metricValue(), equalTo(10.0)); // e2e latency = 15 - time.setCurrentTimeMs(25L); - task.maybeRecordE2ELatency(10L, sourceNode); - assertThat(minMetric.metricValue(), equalTo(10d)); - assertThat(maxMetric.metricValue(), equalTo(15d)); + task.maybeRecordE2ELatency(10L, 25L, sourceNode); + assertThat(minMetric.metricValue(), equalTo(10.0)); + assertThat(maxMetric.metricValue(), equalTo(15.0)); // e2e latency = 25 - time.setCurrentTimeMs(30L); - task.maybeRecordE2ELatency(5L, sourceNode); - assertThat(minMetric.metricValue(), equalTo(10d)); - assertThat(maxMetric.metricValue(), equalTo(25d)); + task.maybeRecordE2ELatency(5L, 30L, sourceNode); + assertThat(minMetric.metricValue(), equalTo(10.0)); + assertThat(maxMetric.metricValue(), equalTo(25.0)); // e2e latency = 20 - time.setCurrentTimeMs(40L); - task.maybeRecordE2ELatency(35L, sourceNode); - assertThat(minMetric.metricValue(), equalTo(5d)); - assertThat(maxMetric.metricValue(), equalTo(25d)); - } - - @Test - public void shouldRecordE2ELatencyPercentiles() { - time = new MockTime(0L, 0L, 0L); - metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time); - task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); - - final String sourceNode = source1.name(); - - final Metric p99Metric = getProcessorMetric("record-e2e-latency", "%s-p99", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST); - final Metric p90Metric = getProcessorMetric("record-e2e-latency", "%s-p90", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST); - - for (int i = 0; i < 100; i++) { - time.setCurrentTimeMs(i); - task.maybeRecordE2ELatency(0L, sourceNode); - } - - final double expectedAccuracy = 0.25d; // Make sure it's accurate to within 25% of the expected value - - assertEquals((double) p99Metric.metricValue(), 99d, 99 * expectedAccuracy); - assertEquals((double) p90Metric.metricValue(), 90d, 90 * expectedAccuracy); + task.maybeRecordE2ELatency(35L, 40L, sourceNode); + assertThat(minMetric.metricValue(), equalTo(5.0)); + assertThat(maxMetric.metricValue(), equalTo(25.0)); } @Test @@ -1764,6 +1723,34 @@ public void shouldNotThrowFromStateManagerCloseInCloseDirty() { EasyMock.verify(stateManager); } + @Test + public void shouldUnregisterMetricsInCloseClean() { + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); + EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes(); + EasyMock.replay(stateManager, recordCollector); + + task = createOptimizedStatefulTask(createConfig(false, "100"), consumer); + + task.suspend(); + assertThat(getTaskMetrics(), not(empty())); + task.closeClean(); + assertThat(getTaskMetrics(), empty()); + } + + @Test + public void shouldUnregisterMetricsInCloseDirty() { + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); + EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes(); + EasyMock.replay(stateManager, recordCollector); + + task = createOptimizedStatefulTask(createConfig(false, "100"), consumer); + + task.suspend(); + assertThat(getTaskMetrics(), not(empty())); + task.closeDirty(); + assertThat(getTaskMetrics(), empty()); + } + @Test public void closeShouldBeIdempotent() { EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); @@ -1813,7 +1800,10 @@ public void shouldOnlyRecycleSuspendedTasks() { assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING task.suspend(); + + assertThat(getTaskMetrics(), not(empty())); task.closeAndRecycleState(); // SUSPENDED + assertThat(getTaskMetrics(), empty()); EasyMock.verify(stateManager, recordCollector); } @@ -1848,6 +1838,10 @@ public void shouldAlwaysSuspendRunningTasks() { assertThat(task.state(), equalTo(SUSPENDED)); } + private List getTaskMetrics() { + return metrics.metrics().keySet().stream().filter(m -> m.tags().containsKey("task-id")).collect(Collectors.toList()); + } + private StreamTask createOptimizedStatefulTask(final StreamsConfig config, final Consumer consumer) { final StateStore stateStore = new MockKeyValueStore(storeName, true); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java index 14f370da80721..08131e6309ca6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java @@ -87,7 +87,7 @@ public void shouldGetSuppressionEmitSensor() { expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, - StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP, + PROCESSOR_NODE_LEVEL_GROUP, tagMap, metricNamePrefix, descriptionOfRate, @@ -108,7 +108,7 @@ public void shouldGetIdempotentUpdateSkipSensor() { expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, - StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP, + PROCESSOR_NODE_LEVEL_GROUP, tagMap, metricNamePrefix, descriptionOfRate, @@ -261,38 +261,6 @@ public void shouldGetProcessAtSourceSensorOrForwardSensor() { } } - @Test - public void shouldGetRecordE2ELatencySensor() { - final String operation = "record-e2e-latency"; - final String recordE2ELatencyMinDescription = - "The minimum end-to-end latency of a record, measuring by comparing the record timestamp with the " - + "system time when it has been fully processed by the node"; - final String recordE2ELatencyMaxDescription = - "The maximum end-to-end latency of a record, measuring by comparing the record timestamp with the " - + "system time when it has been fully processed by the node"; - final String recordE2ELatencyP99Description = - "The 99th percentile end-to-end latency of a record, measuring by comparing the record timestamp with the " - + "system time when it has been fully processed by the node"; - final String recordE2ELatencyP90Description = - "The 90th percentile end-to-end latency of a record, measuring by comparing the record timestamp with the " - + "system time when it has been fully processed by the node"; - expect(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, operation, RecordingLevel.INFO)) - .andReturn(expectedSensor); - expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap); - StreamsMetricsImpl.addMinAndMaxAndP99AndP90ToSensor( - expectedSensor, - PROCESSOR_NODE_LEVEL_GROUP, - tagMap, - operation, - recordE2ELatencyMinDescription, - recordE2ELatencyMaxDescription, - recordE2ELatencyP99Description, - recordE2ELatencyP90Description - ); - - verifySensor(() -> ProcessorNodeMetrics.recordE2ELatencySensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, RecordingLevel.INFO, streamsMetrics)); - } - private void shouldGetThroughputAndLatencySensorWithParentOrEmptySensor(final String metricNamePrefix, final String descriptionOfRate, final String descriptionOfCount, @@ -353,7 +321,7 @@ private void setUpThroughputAndLatencyParentSensor(final String metricNamePrefix .andReturn(parentTagMap); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedParentSensor, - StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP, + PROCESSOR_NODE_LEVEL_GROUP, parentTagMap, metricNamePrefix, descriptionOfRate, @@ -361,7 +329,7 @@ private void setUpThroughputAndLatencyParentSensor(final String metricNamePrefix ); StreamsMetricsImpl.addAvgAndMaxToSensor( expectedParentSensor, - StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP, + PROCESSOR_NODE_LEVEL_GROUP, parentTagMap, metricNamePrefix + StreamsMetricsImpl.LATENCY_SUFFIX, descriptionOfAvg, @@ -378,7 +346,7 @@ private void setUpThroughputParentSensor(final String metricNamePrefix, .andReturn(parentTagMap); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedParentSensor, - StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP, + PROCESSOR_NODE_LEVEL_GROUP, parentTagMap, metricNamePrefix, descriptionOfRate, @@ -395,7 +363,7 @@ private void setUpThroughputAndLatencySensor(final String metricNamePrefix, setUpThroughputSensor(metricNamePrefix, descriptionOfRate, descriptionOfCount, RecordingLevel.DEBUG, parentSensors); StreamsMetricsImpl.addAvgAndMaxToSensor( expectedSensor, - StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP, + PROCESSOR_NODE_LEVEL_GROUP, tagMap, metricNamePrefix + StreamsMetricsImpl.LATENCY_SUFFIX, descriptionOfAvgLatency, @@ -419,7 +387,7 @@ private void setUpThroughputSensor(final String metricNamePrefix, expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap); StreamsMetricsImpl.addInvocationRateAndCountToSensor( expectedSensor, - StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP, + PROCESSOR_NODE_LEVEL_GROUP, tagMap, metricNamePrefix, descriptionOfRate, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index 47fec02970079..e0ee428da356f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -999,17 +999,15 @@ public void shouldAddAvgAndMinAndMaxMetricsToSensor() { } @Test - public void shouldAddMinAndMaxAndP99AndP90MetricsToSensor() { + public void shouldAddMinAndMaxMetricsToSensor() { StreamsMetricsImpl - .addMinAndMaxAndP99AndP90ToSensor(sensor, group, tags, metricNamePrefix, description1, description2, description3, description4); + .addMinAndMaxToSensor(sensor, group, tags, metricNamePrefix, description1, description2); final double valueToRecord1 = 18.0; final double valueToRecord2 = 42.0; verifyMetric(metricNamePrefix + "-min", description1, valueToRecord1, valueToRecord2, valueToRecord1); verifyMetric(metricNamePrefix + "-max", description2, valueToRecord1, valueToRecord2, valueToRecord2); - verifyMetricWithinError(metricNamePrefix + "-p99", description3, valueToRecord1, valueToRecord2, valueToRecord2, 1.0); - verifyMetricWithinError(metricNamePrefix + "-p90", description4, valueToRecord1, valueToRecord2, valueToRecord2, 1.0); - assertThat(metrics.metrics().size(), equalTo(4 + 1)); // one metric is added automatically in the constructor of Metrics + assertThat(metrics.metrics().size(), equalTo(2 + 1)); // one metric is added automatically in the constructor of Metrics } @Test From ae1a3908bcaad82e30ad35a0e4462d7a4b7978b5 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 16 Jun 2020 22:42:47 -0500 Subject: [PATCH 2/2] separate test case for closeAndRecycle metrics --- .../processor/internals/StreamTaskTest.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 0b9a35fd0c8d3..59e96d40d4c1c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -1751,6 +1751,20 @@ public void shouldUnregisterMetricsInCloseDirty() { assertThat(getTaskMetrics(), empty()); } + @Test + public void shouldUnregisterMetricsInCloseAndRecycle() { + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); + EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap()).anyTimes(); + EasyMock.replay(stateManager, recordCollector); + + task = createOptimizedStatefulTask(createConfig(false, "100"), consumer); + + task.suspend(); + assertThat(getTaskMetrics(), not(empty())); + task.closeAndRecycleState(); + assertThat(getTaskMetrics(), empty()); + } + @Test public void closeShouldBeIdempotent() { EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); @@ -1800,10 +1814,7 @@ public void shouldOnlyRecycleSuspendedTasks() { assertThrows(IllegalStateException.class, () -> task.closeAndRecycleState()); // RUNNING task.suspend(); - - assertThat(getTaskMetrics(), not(empty())); task.closeAndRecycleState(); // SUSPENDED - assertThat(getTaskMetrics(), empty()); EasyMock.verify(stateManager, recordCollector); }