diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java index 9f74417193a04..113d745c3b185 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java @@ -22,7 +22,7 @@ public class Min extends SampledStat { public Min() { - super(Double.MIN_VALUE); + super(Double.MAX_VALUE); } @Override @@ -32,10 +32,10 @@ protected void update(Sample sample, MetricConfig config, double value, long now @Override public double combine(List samples, MetricConfig config, long now) { - double max = Double.MAX_VALUE; + double min = Double.MAX_VALUE; for (int i = 0; i < samples.size(); i++) - max = Math.min(max, samples.get(i).value); - return max; + min = Math.min(min, samples.get(i).value); + return min; } } diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index f5b49ba55ca90..52f0cd7e04bec 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -316,6 +316,38 @@ public void testOldDataHasNoEffect() { assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.milliseconds()), EPS); } + @Test + public void testSampledStatInitialValue() { + // initialValue from each SampledStat is set as the initialValue on its Sample. + // The only way to test the initialValue is to infer it by having a SampledStat + // with expired Stats, because their values are reset to the initial values. + // Most implementations of combine on SampledStat end up returning the default + // value, so we can use this. This doesn't work for Percentiles though. + // This test looks a lot like testOldDataHasNoEffect because it's the same + // flow that leads to this state. + Max max = new Max(); + Min min = new Min(); + Avg avg = new Avg(); + Count count = new Count(); + Rate.SampledTotal sampledTotal = new Rate.SampledTotal(); + + long windowMs = 100; + int samples = 2; + MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples); + max.record(config, 50, time.milliseconds()); + min.record(config, 50, time.milliseconds()); + avg.record(config, 50, time.milliseconds()); + count.record(config, 50, time.milliseconds()); + sampledTotal.record(config, 50, time.milliseconds()); + time.sleep(samples * windowMs); + + assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.milliseconds()), EPS); + assertEquals(Double.MAX_VALUE, min.measure(config, time.milliseconds()), EPS); + assertEquals(0.0, avg.measure(config, time.milliseconds()), EPS); + assertEquals(0, count.measure(config, time.milliseconds()), EPS); + assertEquals(0.0, sampledTotal.measure(config, time.milliseconds()), EPS); + } + @Test(expected = IllegalArgumentException.class) public void testDuplicateMetricName() { metrics.sensor("test").add(metrics.metricName("test", "grp1"), new Avg());