diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala index 40ffee2bfa..9751f688c8 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala @@ -50,8 +50,10 @@ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with } def newGauge[T](group: String, gauge: Gauge[T]) = { - debug("Adding new gauge %s %s %s." format (group, gauge.getName, gauge)) - putAndGetGroup(group).putIfAbsent(gauge.getName, gauge) + if (putAndGetGroup(group).containsKey(gauge.getName)) { + debug("Updating existing gauge %s %s %s" format (group, gauge.getName, gauge)) + } + putAndGetGroup(group).put(gauge.getName, gauge) val realGauge = metrics.get(group).get(gauge.getName).asInstanceOf[Gauge[T]] listeners.foreach(_.onGauge(group, realGauge)) realGauge diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala index ca9c023ad8..2a4f44eafc 100644 --- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala @@ -25,7 +25,7 @@ import java.util import org.apache.samza.SamzaException import org.apache.samza.config.MapConfig -import org.apache.samza.metrics.{Gauge, MetricsRegistryMap} +import org.apache.samza.metrics.{Counter, Gauge, MetricsRegistryMap, MetricsVisitor, Timer} import org.apache.samza.util.ExponentialSleepStrategy import org.junit.{Assert, Test} import org.rocksdb.{FlushOptions, Options, RocksDB, RocksIterator} @@ -210,4 +210,27 @@ class TestRocksDbKeyValueStore rocksDB.close() } + + @Test + def testRocksDBMetricsWithBulkLoadRWRecreate(): Unit = { + val registry = new MetricsRegistryMap("registrymap") + val metrics = new KeyValueStoreMetrics("dbstore", registry) + + // Sample metric values for estimate-num-keys metrics + val bulkloadStoreMetricValue = "100" + val readWriteStoreMetricValue = "10" + + // Metric during bulk-load/bootstrap + metrics.newGauge("estimate-num-keys", () => bulkloadStoreMetricValue) + + assert(registry.getGroup("org.apache.samza.storage.kv.KeyValueStoreMetrics"). + get("dbstore-estimate-num-keys").asInstanceOf[Gauge[String]].getValue.eq("100")) + + // Bulk-load complete, new store in read-write mode + metrics.newGauge("estimate-num-keys", () => readWriteStoreMetricValue.toString) + + assert(registry.getGroup("org.apache.samza.storage.kv.KeyValueStoreMetrics"). + get("dbstore-estimate-num-keys").asInstanceOf[Gauge[String]].getValue.eq("10")) + } + }