diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java b/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java new file mode 100644 index 0000000000..545fd45420 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.metrics; + +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; +import org.apache.samza.util.TimestampedValue; + + +/** + * A {@link ListGauge} is a {@link Metric} that buffers multiple instances of a type T in a list. + * {@link ListGauge}s are useful for maintaining, recording, or collecting values over time. + * For example, a set of specific logging-events (e.g., errors). + * + * Eviction is controlled by parameters (maxNumberOfItems and maxStaleness), which are set during instantiation. + * Eviction happens during element addition or during reads of the ListGauge (getValues). + * + * All public methods are thread-safe. + * + */ +public class ListGauge implements Metric { + private final String name; + private final Queue> elements; + + private final int maxNumberOfItems; + private final Duration maxStaleness; + private final static int DEFAULT_MAX_NITEMS = 1000; + private final static Duration DEFAULT_MAX_STALENESS = Duration.ofMinutes(60); + + /** + * Create a new {@link ListGauge} that auto evicts based on the given maxNumberOfItems, maxStaleness, and period parameters. + * + * @param name Name to be assigned + * @param maxNumberOfItems The max number of items that can remain in the listgauge + * @param maxStaleness The max staleness of items permitted in the listgauge + */ + public ListGauge(String name, int maxNumberOfItems, Duration maxStaleness) { + this.name = name; + this.elements = new ConcurrentLinkedQueue>(); + this.maxNumberOfItems = maxNumberOfItems; + this.maxStaleness = maxStaleness; + } + + /** + * Create a new {@link ListGauge} that auto evicts upto a max of 100 items and a max-staleness of 60 minutes. + * @param name Name to be assigned + */ + public ListGauge(String name) { + this(name, DEFAULT_MAX_NITEMS, DEFAULT_MAX_STALENESS); + } + + /** + * Get the name assigned to this {@link ListGauge} + * @return the assigned name + */ + public String getName() { + return this.name; + } + + /** + * Get the Collection of values currently in the list. + * @return the collection of values + */ + public Collection getValues() { + this.evict(); + return Collections.unmodifiableList(this.elements.stream().map(x -> x.getValue()).collect(Collectors.toList())); + } + + /** + * Add a value to the list. + * (Timestamp assigned to this value is the current timestamp.) + * @param value The Gauge value to be added + */ + public void add(T value) { + this.elements.add(new TimestampedValue(value, Instant.now().toEpochMilli())); + + // perform any evictions that may be needed. + this.evict(); + } + + /** + * {@inheritDoc} + */ + @Override + public void visit(MetricsVisitor visitor) { + visitor.listGauge(this); + } + + /** + * Evicts entries from the elements list, based on the given item-size and durationThreshold. + */ + private void evict() { + this.evictBasedOnSize(); + this.evictBasedOnTimestamp(); + } + + /** + * Evicts entries from elements in FIFO order until it has maxNumberOfItems + */ + private void evictBasedOnSize() { + int numToEvict = this.elements.size() - this.maxNumberOfItems; + while (numToEvict > 0) { + this.elements.poll(); // remove head + numToEvict--; + } + } + + /** + * Removes entries from elements to ensure no element has a timestamp more than maxStaleness before current timestamp. + */ + private void evictBasedOnTimestamp() { + Instant currentTimestamp = Instant.now(); + TimestampedValue valueInfo = this.elements.peek(); + + // continue remove-head if currenttimestamp - head-element's timestamp > durationThreshold + while (valueInfo != null + && currentTimestamp.toEpochMilli() - valueInfo.getTimestamp() > this.maxStaleness.toMillis()) { + this.elements.poll(); + valueInfo = this.elements.peek(); + } + } +} diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java index 5a00d01868..fa0fd3984a 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java @@ -64,6 +64,15 @@ public interface MetricsRegistry { */ Gauge newGauge(String group, Gauge value); + /** + * Register a {@link org.apache.samza.metrics.ListGauge} + * @param group Group for this ListGauge + * @param listGauge the ListGauge to register + * @param Type of the ListGauge + * @return ListGauge registered + */ + ListGauge newListGauge(String group, ListGauge listGauge); + /** * Create and Register a new {@link org.apache.samza.metrics.Timer} * @param group Group for this Timer diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java index 75abfe7078..49a0929515 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java @@ -31,8 +31,13 @@ public abstract class MetricsVisitor { public abstract void timer(Timer timer); + public abstract void listGauge(ListGauge listGauge); + public void visit(Metric metric) { - if (metric instanceof Counter) { + // Cast for metrics of type ListGauge + if (metric instanceof ListGauge) { + listGauge((ListGauge) metric); + } else if (metric instanceof Counter) { counter((Counter) metric); } else if (metric instanceof Gauge) { gauge((Gauge) metric); diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java index 739d68f8ef..ba5b182828 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java @@ -24,5 +24,7 @@ public interface ReadableMetricsRegistryListener { void onGauge(String group, Gauge gauge); + void onListGauge(String group, ListGauge listGauge); + void onTimer(String group, Timer timer); } diff --git a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java index 3df855c2f7..76b82164dc 100644 --- a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java +++ b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java @@ -21,9 +21,11 @@ import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Gauge; +import org.apache.samza.metrics.ListGauge; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; + /** * {@link org.apache.samza.metrics.MetricsRegistry} implementation for when no actual metrics need to be * recorded but a registry is still required. @@ -49,6 +51,11 @@ public Gauge newGauge(String group, Gauge gauge) { return gauge; } + @Override + public ListGauge newListGauge(String group, ListGauge listGauge) { + return listGauge; + } + @Override public Timer newTimer(String group, String name) { return new Timer(name); diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java b/samza-api/src/main/java/org/apache/samza/util/TimestampedValue.java similarity index 97% rename from samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java rename to samza-api/src/main/java/org/apache/samza/util/TimestampedValue.java index 5e451485f3..e767918eb7 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValue.java +++ b/samza-api/src/main/java/org/apache/samza/util/TimestampedValue.java @@ -17,7 +17,7 @@ * under the License. * */ -package org.apache.samza.operators.impl.store; +package org.apache.samza.util; /** * An immutable pair of a value, and its corresponding timestamp. diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java b/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java new file mode 100644 index 0000000000..eb9101238c --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.metrics; + +import java.time.Duration; +import java.util.Collection; +import java.util.Iterator; +import org.junit.Assert; +import org.junit.Test; + + +/** + * Class to encapsulate test-cases for {@link org.apache.samza.metrics.ListGauge} + */ +public class TestListGauge { + + private final static Duration THREAD_TEST_TIMEOUT = Duration.ofSeconds(10); + + private ListGauge getListGaugeForTest() { + return new ListGauge("sampleListGauge", 10, Duration.ofSeconds(60)); + } + + @Test + public void basicTest() { + ListGauge listGauge = getListGaugeForTest(); + listGauge.add("sampleValue"); + Assert.assertEquals("Names should be the same", listGauge.getName(), "sampleListGauge"); + Assert.assertEquals("List sizes should match", listGauge.getValues().size(), 1); + Assert.assertEquals("ListGauge should contain sampleGauge", listGauge.getValues().contains("sampleValue"), true); + } + + @Test + public void testSizeEnforcement() { + ListGauge listGauge = getListGaugeForTest(); + for (int i = 15; i > 0; i--) { + listGauge.add("v" + i); + } + Assert.assertEquals("List sizes should be as configured at creation time", listGauge.getValues().size(), 10); + + int valueIndex = 10; + Collection currentList = listGauge.getValues(); + Iterator iterator = currentList.iterator(); + while (iterator.hasNext()) { + String gaugeValue = (String) iterator.next(); + Assert.assertTrue(gaugeValue.equals("v" + valueIndex)); + valueIndex--; + } + } + + @Test + public void testThreadSafety() throws InterruptedException { + ListGauge listGauge = getListGaugeForTest(); + + Thread thread1 = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 1; i <= 100; i++) { + listGauge.add(i); + } + } + }); + + Thread thread2 = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 1; i <= 100; i++) { + listGauge.add(i); + } + } + }); + + thread1.start(); + thread2.start(); + + thread1.join(THREAD_TEST_TIMEOUT.toMillis()); + thread2.join(THREAD_TEST_TIMEOUT.toMillis()); + + Assert.assertTrue("ListGauge should have the last 10 values", listGauge.getValues().size() == 10); + for (Integer gaugeValue : listGauge.getValues()) { + Assert.assertTrue("Values should have the last 10 range", gaugeValue <= 100 && gaugeValue > 90); + } + } +} diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java index 8076e02d61..c694d3ffc8 100644 --- a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java +++ b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java @@ -19,13 +19,12 @@ package org.apache.samza.metrics; -import static org.junit.Assert.*; - import java.util.Arrays; - import org.apache.samza.util.Clock; import org.junit.Test; +import static org.junit.Assert.*; + public class TestTimer { /* diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java index d29b975578..01b69edbc2 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java @@ -19,20 +19,21 @@ package org.apache.samza.system.eventhub; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import org.apache.commons.collections4.map.HashedMap; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Gauge; +import org.apache.samza.metrics.ListGauge; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - public class TestMetricsRegistry implements MetricsRegistry { private Map> counters = new HashedMap<>(); private Map>> gauges = new HashedMap<>(); + private Map> listGauges = new HashedMap<>(); public List getCounters(String groupName) { return counters.get(groupName); @@ -68,6 +69,13 @@ public Counter newCounter(String group, Counter counter) { return null; } + @Override + public ListGauge newListGauge(String group, ListGauge listGauge) { + listGauges.putIfAbsent(group, new ArrayList()); + listGauges.get(group).add(listGauge); + return listGauge; + } + @Override public Gauge newGauge(String group, Gauge value) { if (!gauges.containsKey(group)) { diff --git a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java index 53526d86c4..fc5784699f 100644 --- a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java +++ b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java @@ -44,6 +44,10 @@ public Counter newCounter(String name) { return registry.newCounter(groupName, (prefix + name).toLowerCase()); } + public ListGauge newListGauge(String name) { + return registry.newListGauge(groupName, new ListGauge(name)); + } + public Gauge newGauge(String name, T value) { return registry.newGauge(groupName, new Gauge((prefix + name).toLowerCase(), value)); } diff --git a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java index 5ede5e8f14..038abba9d0 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java +++ b/samza-core/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java @@ -18,7 +18,7 @@ */ package org.apache.samza.operators.functions; -import org.apache.samza.operators.impl.store.TimestampedValue; +import org.apache.samza.util.TimestampedValue; import org.apache.samza.storage.kv.KeyValueStore; /** diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index bbc878364b..d675822df0 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -29,7 +29,7 @@ import org.apache.samza.operators.TimerRegistry; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.PartialJoinFunction; -import org.apache.samza.operators.impl.store.TimestampedValue; +import org.apache.samza.util.TimestampedValue; import org.apache.samza.operators.spec.BroadcastOperatorSpec; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.JoinOperatorSpec; diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java index 90a71a0eff..0cdde490f6 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java @@ -21,7 +21,7 @@ import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.operators.functions.PartialJoinFunction; -import org.apache.samza.operators.impl.store.TimestampedValue; +import org.apache.samza.util.TimestampedValue; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.storage.kv.KeyValueStore; diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index 32406cbcf0..32b6e5dd49 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -26,7 +26,7 @@ import org.apache.samza.operators.impl.store.TimeSeriesKey; import org.apache.samza.operators.impl.store.TimeSeriesStore; import org.apache.samza.operators.impl.store.TimeSeriesStoreImpl; -import org.apache.samza.operators.impl.store.TimestampedValue; +import org.apache.samza.util.TimestampedValue; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; import org.apache.samza.operators.triggers.FiringType; diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java index f3d694824a..c9b694d37f 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStore.java @@ -21,6 +21,8 @@ package org.apache.samza.operators.impl.store; import org.apache.samza.storage.kv.ClosableIterator; +import org.apache.samza.util.TimestampedValue; + /** * A key-value store that allows entries to be queried and stored based on time ranges. diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java index f03d3969f9..10a59679bb 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java @@ -23,6 +23,7 @@ import org.apache.samza.storage.kv.Entry; import org.apache.samza.storage.kv.KeyValueIterator; import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.util.TimestampedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java index b14f8a4933..5b0cdac625 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimestampedValueSerde.java @@ -21,6 +21,7 @@ import org.apache.samza.serializers.Serde; import java.nio.ByteBuffer; +import org.apache.samza.util.TimestampedValue; public class TimestampedValueSerde implements Serde> { diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java index 9e058ff424..fe74479d91 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java @@ -23,7 +23,7 @@ import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; import org.apache.samza.operators.impl.store.TimestampedValueSerde; -import org.apache.samza.operators.impl.store.TimestampedValue; +import org.apache.samza.util.TimestampedValue; import org.apache.samza.serializers.Serde; import java.util.Arrays; diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 5380fc98dc..8d886669b3 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -28,6 +28,7 @@ import java.util.Base64 import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorService, TimeUnit} import com.google.common.util.concurrent.ThreadFactoryBuilder +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.MetricsConfig.Config2Metrics diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala index c1229565ad..a26e6669bf 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala @@ -48,6 +48,8 @@ class SamzaContainerMetrics( val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]() + val exceptions = newListGauge[String]("exceptions") + def addStoreRestorationGauge(taskName: TaskName, storeName: String) { taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L)) } diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala index 1520b0e62d..21ec7633f4 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala @@ -40,9 +40,11 @@ trait MetricsHelper { def newGauge[T](name: String, value: T) = metricGroup.newGauge[T](name,value) + def newListGauge[T](name: String) = metricGroup.newListGauge[T](name) + /** - * Specify a dynamic gauge that always returns the latest value when polled. - * The value closure must be thread safe, since metrics reporters may access + * Specify a dynamic gauge that always returns the latest value when polled. + * The value closure must be thread safe, since metrics reporters may access * it from another thread. */ def newGauge[T](name: String, value: () => T) = { diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala index 40ffee2bfa..75ed6aa9b4 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala @@ -75,6 +75,21 @@ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with newTimer(group, new Timer(name)) } + /** + * Register a {@link org.apache.samza.metrics.ListGauge} + * + * @param group Group for this ListGauge + * @param listGauge the ListGauge to register + * @tparam T the type of the list gauge + */ + def newListGauge[T](group: String, listGauge: ListGauge[T]) = { + debug("Adding new listgauge %s %s %s." format(group, listGauge.getName, listGauge)) + putAndGetGroup(group).putIfAbsent(listGauge.getName, listGauge) + val realListGauge = metrics.get(group).get(listGauge.getName).asInstanceOf[ListGauge[T]] + listeners.foreach(_.onListGauge(group, realListGauge)) + realListGauge + } + private def putAndGetGroup(group: String) = { metrics.putIfAbsent(group, new ConcurrentHashMap[String, Metric]) metrics.get(group) diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala index 7da8a9c52a..c601b29319 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala @@ -20,19 +20,15 @@ package org.apache.samza.metrics.reporter import java.lang.management.ManagementFactory + import org.apache.samza.util.Logging import javax.management.MBeanServer import javax.management.ObjectName + import org.apache.samza.config.Config -import org.apache.samza.metrics.Counter -import org.apache.samza.metrics.Gauge -import org.apache.samza.metrics.Timer -import org.apache.samza.metrics.MetricsReporter -import org.apache.samza.metrics.MetricsReporterFactory -import org.apache.samza.metrics.ReadableMetricsRegistry -import org.apache.samza.metrics.ReadableMetricsRegistryListener +import org.apache.samza.metrics._ + import scala.collection.JavaConverters._ -import org.apache.samza.metrics.MetricsVisitor import org.apache.samza.metrics.JmxUtil._ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging { @@ -52,6 +48,8 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging { def counter(counter: Counter) = registerBean(new JmxCounter(counter, getObjectName(group, name, sources(registry)))) def gauge[T](gauge: Gauge[T]) = registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, name, sources(registry)))) def timer(timer: Timer) = registerBean(new JmxTimer(timer, getObjectName(group, name, sources(registry)))) + def listGauge[T](listGauge: ListGauge[T]) = registerBean(new JmxListGauge(listGauge.asInstanceOf[ListGauge[Object]], getObjectName(group, name, sources(registry)))) + }) } }) @@ -65,14 +63,15 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging { def onCounter(group: String, counter: Counter) { registerBean(new JmxCounter(counter, getObjectName(group, counter.getName, source))) } - def onGauge(group: String, gauge: Gauge[_]) { registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, gauge.getName, source))) } - def onTimer(group: String, timer: Timer) { registerBean(new JmxTimer(timer, getObjectName(group, timer.getName, source))) } + def onListGauge(group: String, listGauge: ListGauge[_]) { + registerBean(new JmxListGauge(listGauge.asInstanceOf[ListGauge[Object]], getObjectName(group, listGauge.getName, source))) + } } } else { warn("Trying to re-register a registry for source %s. Ignoring." format source) @@ -110,6 +109,11 @@ class JmxGauge(g: org.apache.samza.metrics.Gauge[Object], on: ObjectName) extend def objectName = on } +class JmxListGauge(g: org.apache.samza.metrics.ListGauge[Object], on: ObjectName) extends JmxGaugeMBean { + def getValue = g.getValues + def objectName = on +} + trait JmxCounterMBean extends MetricMBean { def getCount(): Long } diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala index 65ca49c2f3..d300e90919 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala @@ -26,7 +26,6 @@ import org.apache.samza.system.OutgoingMessageEnvelope import org.apache.samza.system.SystemProducer import org.apache.samza.system.SystemStream import org.apache.samza.util.Logging - import java.util.HashMap import java.util.Map import java.util.concurrent.Executors @@ -83,15 +82,18 @@ class MetricsSnapshotReporter( } def stop = { - info("Stopping producer.") - producer.stop + // Scheduling an event with 0 delay to ensure flushing of metrics one last time before shutdown + executor.schedule(this,0, TimeUnit.SECONDS) info("Stopping reporter timer.") - + // Allow the scheduled task above to finish, and block for termination (for max 60 seconds) executor.shutdown executor.awaitTermination(60, TimeUnit.SECONDS) + info("Stopping producer.") + producer.stop + if (!executor.isTerminated) { warn("Unable to shutdown reporter timer.") } @@ -112,6 +114,8 @@ class MetricsSnapshotReporter( registry.getGroup(group).asScala.foreach { case (name, metric) => metric.visit(new MetricsVisitor { + // for listGauge the value is returned as a list, which gets serialized + def listGauge[T](listGauge: ListGauge[T]) = {groupMsg.put(name, listGauge.getValues) } def counter(counter: Counter) = groupMsg.put(name, counter.getCount: java.lang.Long) def gauge[T](gauge: Gauge[T]) = groupMsg.put(name, gauge.getValue.asInstanceOf[Object]) def timer(timer: Timer) = groupMsg.put(name, timer.getSnapshot().getAverage(): java.lang.Double) @@ -133,12 +137,18 @@ class MetricsSnapshotReporter( metricsSnapshot } - producer.send(source, new OutgoingMessageEnvelope(out, host, null, maybeSerialized)) + try { + + producer.send(source, new OutgoingMessageEnvelope(out, host, null, maybeSerialized)) - // Always flush, since we don't want metrics to get batched up. - producer.flush(source) + // Always flush, since we don't want metrics to get batched up. + producer.flush(source) + } catch { + case e: Exception => error("Exception when flushing metrics for source %s " format(source), e) + } } + debug("Finished flushing metrics.") } } diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java index 2d8d1eb57c..7ce75bc6d0 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java @@ -44,7 +44,7 @@ import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.MapFunction; -import org.apache.samza.operators.impl.store.TimestampedValue; +import org.apache.samza.util.TimestampedValue; import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.serializers.IntegerSerde; diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java index 0315a20212..94e171a8ca 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimeSeriesStoreImpl.java @@ -24,6 +24,7 @@ import org.apache.samza.serializers.StringSerde; import org.apache.samza.storage.kv.ClosableIterator; import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.util.TimestampedValue; import org.junit.Assert; import org.junit.Test; diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java index 40015ec6ee..1621e73df7 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestTimestampedValueSerde.java @@ -20,6 +20,7 @@ import org.apache.samza.serializers.ByteSerde; import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.util.TimestampedValue; import org.junit.Test; import java.nio.ByteBuffer; diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala index 122a1dfaee..f60e021a75 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala @@ -49,6 +49,9 @@ class ApplicationMasterRestServlet(samzaConfig: Config, samzaAppState: SamzaAppl registry.getGroup(group).asScala.foreach { case (name, metric) => metric.visit(new MetricsVisitor() { + def listGauge[T](listGauge: ListGauge[T]) = + groupMap.put(name, listGauge.getValues) + def counter(counter: Counter) = groupMap.put(counter.getName, counter.getCount: java.lang.Long)