diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java b/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java new file mode 100644 index 0000000000..744acbdfc7 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.metrics; + +import java.util.Collection; +import java.util.LinkedList; + + +/** + * A ListGauge is a collection of {@link org.apache.samza.metrics.Gauge}. ListGauges are useful for maintaining, recording, + * or collecting specific Gauge values over time. It is implemented as a {@link org.apache.samza.metrics.Metric}. + * For example, the set of recent errors that have occurred. + * + * This current implementation uses a size-bound-policy and holds the N most-recent Gauge objects added to the list. + * This bound N is configurable at instantiation time. + * TODO: Support a time-based and size-and-time-based hybrid policy. + * TODO: Add a derived class to do compaction for errors using hash-based errorIDs and adding timestamp for errors to dedup + * on the read path. + * + * All public methods are thread-safe. + * + */ +public class ListGauge implements Metric { + private final String name; + private final Collection metricList; + private int nItems; + + /** + * Create a new ListGauge. + * @param name Name to be assigned + * @param nItems The number of items to hold in the list + */ + public ListGauge(String name, int nItems) { + this.name = name; + this.metricList = new LinkedList<>(); + this.nItems = nItems; + } + + /** + * Get the name assigned to the ListGauge + * @return the assigned name + */ + public String getName() { + return this.name; + } + + /** + * Add a gauge to the list + * @param value The Gauge value to be added + */ + public synchronized void add(Gauge value) { + if (this.metricList.size() == nItems) { + ((LinkedList) this.metricList).removeFirst(); + } + + this.metricList.add(value); + } + + /** + * Get the Collection of Gauge values currently in the list + * @return the collection of gauge values + */ + public synchronized Collection getValue() { + return this.metricList; + } + + /** + * {@inheritDoc} + */ + @Override + public synchronized void visit(MetricsVisitor visitor) { + visitor.listGauge(this); + } +} diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java index 5a00d01868..8f2cfb4870 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsRegistry.java @@ -64,6 +64,15 @@ public interface MetricsRegistry { */ Gauge newGauge(String group, Gauge value); + /** + * Register a {@link org.apache.samza.metrics.ListGauge} + * @param group Group for this ListGauge + * @param listGauge the ListGauge to register + * @param Type of the ListGauge + * @return ListGauge registered + */ + ListGauge newListGauge(String group, ListGauge listGauge); + /** * Create and Register a new {@link org.apache.samza.metrics.Timer} * @param group Group for this Timer diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java index 75abfe7078..354a9ae54d 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsVisitor.java @@ -31,8 +31,13 @@ public abstract class MetricsVisitor { public abstract void timer(Timer timer); + public abstract void listGauge(ListGauge listGauge); + public void visit(Metric metric) { - if (metric instanceof Counter) { + // Cast for metrics of type ListGauge + if (metric instanceof ListGauge) { + listGauge((ListGauge) metric); + } else if (metric instanceof Counter) { counter((Counter) metric); } else if (metric instanceof Gauge) { gauge((Gauge) metric); diff --git a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java index 739d68f8ef..84225f5baf 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/ReadableMetricsRegistryListener.java @@ -24,5 +24,7 @@ public interface ReadableMetricsRegistryListener { void onGauge(String group, Gauge gauge); + void onListGauge(String group, ListGauge listGauge); + void onTimer(String group, Timer timer); } diff --git a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java index 3df855c2f7..73653ccfca 100644 --- a/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java +++ b/samza-api/src/main/java/org/apache/samza/util/NoOpMetricsRegistry.java @@ -21,9 +21,11 @@ import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Gauge; +import org.apache.samza.metrics.ListGauge; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; + /** * {@link org.apache.samza.metrics.MetricsRegistry} implementation for when no actual metrics need to be * recorded but a registry is still required. @@ -49,6 +51,11 @@ public Gauge newGauge(String group, Gauge gauge) { return gauge; } + @Override + public ListGauge newListGauge(String group, ListGauge listGauge) { + return listGauge; + } + @Override public Timer newTimer(String group, String name) { return new Timer(name); diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java b/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java new file mode 100644 index 0000000000..1747006f47 --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/metrics/TestListGauge.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.metrics; + +import java.time.Duration; +import java.util.Collection; +import java.util.Iterator; +import org.junit.Assert; +import org.junit.Test; + + +/** + * Class to encapsulate test-cases for {@link org.apache.samza.metrics.ListGauge} + */ +public class TestListGauge { + + private final static Duration THREAD_TEST_TIMEOUT = Duration.ofSeconds(10); + + @Test + public void basicTest() { + ListGauge listGauge = new ListGauge("listGauge", 10); + Gauge sampleGauge = new Gauge("key", "value"); + listGauge.add(sampleGauge); + Assert.assertEquals("Names should be the same", listGauge.getName(), "listGauge"); + Assert.assertEquals("List sizes should match", listGauge.getValue().size(), 1); + Assert.assertEquals("ListGauge should contain sampleGauge", listGauge.getValue().contains(sampleGauge), true); + } + + @Test + public void testSizeEnforcement() { + ListGauge listGauge = new ListGauge("listGauge", 10); + for (int i = 15; i > 0; i--) { + Gauge sampleGauge = new Gauge("key", "v" + i); + listGauge.add(sampleGauge); + } + Assert.assertEquals("List sizes should be as configured at creation time", listGauge.getValue().size(), 10); + + int valueIndex = 10; + Collection currentList = listGauge.getValue(); + Iterator iterator = currentList.iterator(); + while (iterator.hasNext()) { + Gauge gauge = (Gauge) iterator.next(); + Assert.assertTrue(gauge.getName().equals("key")); + Assert.assertTrue(gauge.getValue().equals("v" + valueIndex)); + valueIndex--; + } + } + + @Test + public void testThreadSafety() throws InterruptedException { + ListGauge listGauge = new ListGauge("listGauge", 20); + + Thread thread1 = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 1; i <= 100; i++) { + listGauge.add(new Gauge("thread1", i)); + } + } + }); + + Thread thread2 = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 1; i <= 100; i++) { + listGauge.add(new Gauge("key", i)); + } + } + }); + + thread1.start(); + thread2.start(); + + thread1.join(THREAD_TEST_TIMEOUT.toMillis()); + thread2.join(THREAD_TEST_TIMEOUT.toMillis()); + + Assert.assertTrue("ListGauge should have the last 20 values", listGauge.getValue().size() == 20); + for (Gauge gauge : listGauge.getValue()) { + Assert.assertTrue("Values should have the last 20 range", + ((Integer) gauge.getValue()) <= 100 && ((Integer) gauge.getValue()) > 80); + } + } +} diff --git a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java index 8076e02d61..b945636137 100644 --- a/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java +++ b/samza-api/src/test/java/org/apache/samza/metrics/TestTimer.java @@ -62,7 +62,7 @@ public void testTimerWithDifferentWindowSize() { assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L, 3L))); assertEquals(3, snapshot.getValues().size()); - // The time is 500 for update(4L) because getSnapshot calls clock once + 3 + // The time is 500 for update(4L) because getValue calls clock once + 3 // updates that call clock 3 times timer.update(4L); Snapshot snapshot2 = timer.getSnapshot(); diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java index d29b975578..110b58212c 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java @@ -1,27 +1,28 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.samza.system.eventhub; import org.apache.commons.collections4.map.HashedMap; import org.apache.samza.metrics.Counter; import org.apache.samza.metrics.Gauge; +import org.apache.samza.metrics.ListGauge; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; @@ -29,10 +30,12 @@ import java.util.List; import java.util.Map; + public class TestMetricsRegistry implements MetricsRegistry { private Map> counters = new HashedMap<>(); private Map>> gauges = new HashedMap<>(); + private Map> listGauges = new HashedMap<>(); public List getCounters(String groupName) { return counters.get(groupName); @@ -78,6 +81,14 @@ public Gauge newGauge(String group, Gauge value) { return value; } + @Override + public ListGauge newListGauge(String group, ListGauge listGauge) { + listGauges.putIfAbsent(group, new ArrayList()); + ListGauge value = new ListGauge(group, 1000); + listGauges.get(group).add(value); + return value; + } + @Override public Timer newTimer(String group, String name) { return null; diff --git a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java index 53526d86c4..43d901f5b9 100644 --- a/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java +++ b/samza-core/src/main/java/org/apache/samza/metrics/MetricGroup.java @@ -1,21 +1,21 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.samza.metrics; @@ -48,6 +48,10 @@ public Gauge newGauge(String name, T value) { return registry.newGauge(groupName, new Gauge((prefix + name).toLowerCase(), value)); } + public ListGauge newListGauge(String name, int nItems) { + return registry.newListGauge(groupName, new ListGauge(name, nItems)); + } + /* * Specify a dynamic gauge that always returns the latest value when polled. * The value closure/object must be thread safe, since metrics reporters may access diff --git a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala index e9b6b7658c..63ae3bc779 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala @@ -20,7 +20,7 @@ package org.apache.samza.config -import org.apache.samza.util.HighResolutionClock +import org.apache.samza.system.SystemStream object MetricsConfig { @@ -28,8 +28,11 @@ object MetricsConfig { val METRICS_REPORTERS = "metrics.reporters" val METRICS_REPORTER_FACTORY = "metrics.reporter.%s.class" val METRICS_SNAPSHOT_REPORTER_STREAM = "metrics.reporter.%s.stream" - val METRICS_SNAPSHOT_REPORTER_INTERVAL= "metrics.reporter.%s.interval" - val METRICS_TIMER_ENABLED= "metrics.timer.enabled" + val METRICS_SNAPSHOT_REPORTER_INTERVAL = "metrics.reporter.%s.interval" + val METRICS_TIMER_ENABLED = "metrics.timer.enabled" + val METRICS_SNAPSHOT_REPORTER_PARTITION_KEY = "metrics.reporter.%s.%s.partitionkey" + val METRICS_SNAPSHOT_REPORTER_PARTITION_KEY_JOBNAME = "jobname" + implicit def Config2Metrics(config: Config) = new MetricsConfig(config) } @@ -44,9 +47,36 @@ class MetricsConfig(config: Config) extends ScalaMapConfig(config) { def getMetricsReporterInterval(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_INTERVAL format name) /** - * Returns a list of all metrics names from the config file. Useful for - * getting individual metrics. - */ + * Parses the metric key name specified in config. + * Customers can specify + * hostname (for partitioning by hostname -- default) or jobname or jobid + * + * @param systemStream The systemStream for the MetricsSnapshotStream + * @return the metric key name (if specified), else None + */ + def getMetricsReporterStreamPartitionKeyName(systemStream: SystemStream): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_PARTITION_KEY format(systemStream.getSystem, systemStream.getStream)) + + /** + * Returns the actual value of the partition key, based on the keyName. + * If specified as "jobname" returns the jobname. + * If none is specified or is specified as "hostname", returns the hostname, + * else returns the actual value specified. + * + * @param keyName the desired key name (hostname, jobname, none, or another custom name) + * @param jobName the current job name + * @param hostname the current host name + */ + def getMetricsReporterStreamPartitionKeyValue(keyName: Option[String], jobName: String, hostname: String) = keyName match { + case Some(MetricsConfig.METRICS_SNAPSHOT_REPORTER_PARTITION_KEY_JOBNAME) => jobName + case None => hostname + case _ => keyName.get + } + + + /** + * Returns a list of all metrics names from the config file. Useful for + * getting individual metrics. + */ def getMetricReporterNames() = { getMetricsReporters match { case Some(mr) => if (!"".equals(mr)) { @@ -59,8 +89,9 @@ class MetricsConfig(config: Config) extends ScalaMapConfig(config) { } /** - * Returns the flag to turn on/off the timer metrics. - * @return Boolean flag to enable the timer metrics - */ + * Returns the flag to turn on/off the timer metrics. + * + * @return Boolean flag to enable the timer metrics + */ def getMetricsTimerEnabled: Boolean = getBoolean(MetricsConfig.METRICS_TIMER_ENABLED, true) } \ No newline at end of file diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 5380fc98dc..87a59176bf 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -28,6 +28,7 @@ import java.util.Base64 import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorService, TimeUnit} import com.google.common.util.concurrent.ThreadFactoryBuilder +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.MetricsConfig.Config2Metrics @@ -42,7 +43,7 @@ import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor} import org.apache.samza.coordinator.stream.{CoordinatorStreamManager, CoordinatorStreamSystemProducer} import org.apache.samza.job.model.JobModel -import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter} +import org.apache.samza.metrics._ import org.apache.samza.serializers._ import org.apache.samza.serializers.model.SamzaObjectMapper import org.apache.samza.storage.{StorageEngineFactory, TaskStorageManager} @@ -751,6 +752,10 @@ class SamzaContainer( } status = SamzaContainerStatus.FAILED exceptionSeen = e + + // Adding a shutdownException to the exception ListGauge in SamzaContainerMetrics + metrics.exception.add(new Gauge[String]("shutdownException", ExceptionUtils.getStackTrace(e))) + debug("Updated value of exceptionAtShutdown to %s" format ExceptionUtils.getStackTrace(e)) } try { diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala index c1229565ad..d6cefb9162 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala @@ -48,6 +48,9 @@ class SamzaContainerMetrics( val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]() + // A string-gauge metric to capture the last 1000 exceptions at this container + val exception = newListGauge[String]("exception", 1000) + def addStoreRestorationGauge(taskName: TaskName, storeName: String) { taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L)) } diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala index 1520b0e62d..4342cd9eb1 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsHelper.scala @@ -40,9 +40,11 @@ trait MetricsHelper { def newGauge[T](name: String, value: T) = metricGroup.newGauge[T](name,value) + def newListGauge[T](name: String, listSize: Int) = metricGroup.newListGauge[T](name, listSize) + /** - * Specify a dynamic gauge that always returns the latest value when polled. - * The value closure must be thread safe, since metrics reporters may access + * Specify a dynamic gauge that always returns the latest value when polled. + * The value closure must be thread safe, since metrics reporters may access * it from another thread. */ def newGauge[T](name: String, value: () => T) = { diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala index 40ffee2bfa..aec0898ee9 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala @@ -23,9 +23,9 @@ import org.apache.samza.util.Logging import java.util.concurrent.ConcurrentHashMap /** - * A class that holds all metrics registered with it. It can be registered - * with one or more MetricReporters to flush metrics. - */ + * A class that holds all metrics registered with it. It can be registered + * with one or more MetricReporters to flush metrics. + */ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with Logging { var listeners = Set[ReadableMetricsRegistryListener]() @@ -37,7 +37,7 @@ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with def this() = this("unknown") def newCounter(group: String, counter: Counter) = { - debug("Add new counter %s %s %s." format (group, counter.getName, counter)) + debug("Add new counter %s %s %s." format(group, counter.getName, counter)) putAndGetGroup(group).putIfAbsent(counter.getName, counter) val realCounter = metrics.get(group).get(counter.getName).asInstanceOf[Counter] listeners.foreach(_.onCounter(group, realCounter)) @@ -45,25 +45,33 @@ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with } def newCounter(group: String, name: String) = { - debug("Creating new counter %s %s." format (group, name)) + debug("Creating new counter %s %s." format(group, name)) newCounter(group, new Counter(name)) } def newGauge[T](group: String, gauge: Gauge[T]) = { - debug("Adding new gauge %s %s %s." format (group, gauge.getName, gauge)) + debug("Adding new gauge %s %s %s." format(group, gauge.getName, gauge)) putAndGetGroup(group).putIfAbsent(gauge.getName, gauge) val realGauge = metrics.get(group).get(gauge.getName).asInstanceOf[Gauge[T]] listeners.foreach(_.onGauge(group, realGauge)) realGauge } + def newListGauge[T](group: String, listGauge: ListGauge) = { + debug("Adding new listgauge %s %s %s." format(group, listGauge.getName, listGauge)) + putAndGetGroup(group).putIfAbsent(listGauge.getName, listGauge) + val realListGauge = metrics.get(group).get(listGauge.getName).asInstanceOf[ListGauge] + listeners.foreach(_.onListGauge(group, realListGauge)) + realListGauge + } + def newGauge[T](group: String, name: String, value: T) = { - debug("Creating new gauge %s %s %s." format (group, name, value)) + debug("Creating new gauge %s %s %s." format(group, name, value)) newGauge(group, new Gauge[T](name, value)) } def newTimer(group: String, timer: Timer) = { - debug("Add new timer %s %s %s." format (group, timer.getName, timer)) + debug("Add new timer %s %s %s." format(group, timer.getName, timer)) putAndGetGroup(group).putIfAbsent(timer.getName, timer) val realTimer = metrics.get(group).get(timer.getName).asInstanceOf[Timer] listeners.foreach(_.onTimer(group, realTimer)) @@ -71,7 +79,7 @@ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with } def newTimer(group: String, name: String) = { - debug("Creating new timer %s %s." format (group, name)) + debug("Creating new timer %s %s." format(group, name)) newTimer(group, new Timer(name)) } @@ -95,4 +103,5 @@ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with def unlisten(listener: ReadableMetricsRegistryListener) { listeners -= listener } + } diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala index 7da8a9c52a..797d5c5f71 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala @@ -20,19 +20,15 @@ package org.apache.samza.metrics.reporter import java.lang.management.ManagementFactory + import org.apache.samza.util.Logging import javax.management.MBeanServer import javax.management.ObjectName + import org.apache.samza.config.Config -import org.apache.samza.metrics.Counter -import org.apache.samza.metrics.Gauge -import org.apache.samza.metrics.Timer -import org.apache.samza.metrics.MetricsReporter -import org.apache.samza.metrics.MetricsReporterFactory -import org.apache.samza.metrics.ReadableMetricsRegistry -import org.apache.samza.metrics.ReadableMetricsRegistryListener +import org.apache.samza.metrics._ + import scala.collection.JavaConverters._ -import org.apache.samza.metrics.MetricsVisitor import org.apache.samza.metrics.JmxUtil._ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging { @@ -49,9 +45,14 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging { registry.getGroup(group).asScala.foreach { case (name, metric) => metric.visit(new MetricsVisitor { + def counter(counter: Counter) = registerBean(new JmxCounter(counter, getObjectName(group, name, sources(registry)))) + def gauge[T](gauge: Gauge[T]) = registerBean(new JmxGauge(gauge.asInstanceOf[Gauge[Object]], getObjectName(group, name, sources(registry)))) + def timer(timer: Timer) = registerBean(new JmxTimer(timer, getObjectName(group, name, sources(registry)))) + + def listGauge[T](listGauge: ListGauge) = registerBean(new JmxListGauge(listGauge.asInstanceOf[ListGauge], getObjectName(group, listGauge.getName, sources(registry)))) }) } }) @@ -73,6 +74,10 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging { def onTimer(group: String, timer: Timer) { registerBean(new JmxTimer(timer, getObjectName(group, timer.getName, source))) } + + def onListGauge(group: String, listGauge: ListGauge) { + registerBean(new JmxListGauge(listGauge, getObjectName(group, listGauge.getName, source))) + } } } else { warn("Trying to re-register a registry for source %s. Ignoring." format source) @@ -107,6 +112,13 @@ trait JmxGaugeMBean extends MetricMBean { class JmxGauge(g: org.apache.samza.metrics.Gauge[Object], on: ObjectName) extends JmxGaugeMBean { def getValue = g.getValue + + def objectName = on +} + +class JmxListGauge(g: org.apache.samza.metrics.ListGauge, on: ObjectName) extends JmxGaugeMBean { + def getValue = g.getValue + def objectName = on } @@ -116,6 +128,7 @@ trait JmxCounterMBean extends MetricMBean { class JmxCounter(c: org.apache.samza.metrics.Counter, on: ObjectName) extends JmxCounterMBean { def getCount() = c.getCount() + def objectName = on } @@ -125,6 +138,7 @@ trait JmxTimerMBean extends MetricMBean { class JmxTimer(t: org.apache.samza.metrics.Timer, on: ObjectName) extends JmxTimerMBean { def getAverageTime() = t.getSnapshot().getAverage() + def objectName = on } diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala index 65ca49c2f3..bc58f1119c 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala @@ -19,6 +19,8 @@ package org.apache.samza.metrics.reporter +import java.util + import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.samza.metrics._ import org.apache.samza.serializers.Serializer @@ -26,7 +28,6 @@ import org.apache.samza.system.OutgoingMessageEnvelope import org.apache.samza.system.SystemProducer import org.apache.samza.system.SystemStream import org.apache.samza.util.Logging - import java.util.HashMap import java.util.Map import java.util.concurrent.Executors @@ -54,6 +55,7 @@ class MetricsSnapshotReporter( samzaVersion: String, host: String, serializer: Serializer[MetricsSnapshot] = null, + partitionKey: Object, clock: () => Long = () => { System.currentTimeMillis }) extends MetricsReporter with Runnable with Logging { val executor = Executors.newSingleThreadScheduledExecutor( @@ -83,15 +85,18 @@ class MetricsSnapshotReporter( } def stop = { - info("Stopping producer.") - producer.stop + // Scheduling an event with 0 delay to ensure flushing of metrics one last time before shutdown + executor.schedule(this,0, TimeUnit.SECONDS) info("Stopping reporter timer.") - + // Allow the scheduled task above to finish, and block for termination (for max 60 seconds) executor.shutdown executor.awaitTermination(60, TimeUnit.SECONDS) + info("Stopping producer.") + producer.stop + if (!executor.isTerminated) { warn("Unable to shutdown reporter timer.") } @@ -112,6 +117,8 @@ class MetricsSnapshotReporter( registry.getGroup(group).asScala.foreach { case (name, metric) => metric.visit(new MetricsVisitor { + // for listGauge the value is returned as a list, which gets serialized + def listGauge[T](listGauge: ListGauge) = {groupMsg.put(name, listGauge.getValue) } def counter(counter: Counter) = groupMsg.put(name, counter.getCount: java.lang.Long) def gauge[T](gauge: Gauge[T]) = groupMsg.put(name, gauge.getValue.asInstanceOf[Object]) def timer(timer: Timer) = groupMsg.put(name, timer.getSnapshot().getAverage(): java.lang.Double) @@ -133,12 +140,18 @@ class MetricsSnapshotReporter( metricsSnapshot } - producer.send(source, new OutgoingMessageEnvelope(out, host, null, maybeSerialized)) + try { + + producer.send(source, new OutgoingMessageEnvelope(out, partitionKey, null, maybeSerialized)) - // Always flush, since we don't want metrics to get batched up. - producer.flush(source) + // Always flush, since we don't want metrics to get batched up. + producer.flush(source) + } catch { + case e: Exception => error("Exception when flushing metrics for source %s " format(source), e) + } } + debug("Finished flushing metrics.") } } diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala index 33802a17b0..2bd98e8eff 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala @@ -103,6 +103,11 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging info("Got serde %s." format serde) + + val partitionKeyName = config.getMetricsReporterStreamPartitionKeyName(systemStream) + val partitionKeyValue = config.getMetricsReporterStreamPartitionKeyValue(partitionKeyName, jobName, Util.getLocalHost.getHostName) + info("Using PartitionKey %s = %s" format (partitionKeyName, partitionKeyValue)) + val pollingInterval: Int = config .getMetricsReporterInterval(name) .getOrElse("60").toInt @@ -118,7 +123,8 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging version, samzaVersion, Util.getLocalHost.getHostName, - serde) + serde, + partitionKeyValue) reporter.register(this.getClass.getSimpleName.toString, registry) diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala index 122a1dfaee..c4e72f1cdb 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala @@ -49,6 +49,9 @@ class ApplicationMasterRestServlet(samzaConfig: Config, samzaAppState: SamzaAppl registry.getGroup(group).asScala.foreach { case (name, metric) => metric.visit(new MetricsVisitor() { + def listGauge[T](listGauge: ListGauge) = + groupMap.put(name, listGauge.getValue) + def counter(counter: Counter) = groupMap.put(counter.getName, counter.getCount: java.lang.Long)