diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java index fc8dcb49894f..6bf34306e272 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java @@ -72,6 +72,14 @@ public StringSetData getCumulative() { return setValue.get(); } + // Used by Streaming metric container to extract deltas since streaming metrics are + // reported as deltas rather than cumulative as in batch. + // For delta we take the current value then reset the cell to empty so the next call only see + // delta/updates from last call. + public StringSetData getAndReset() { + return setValue.getAndUpdate(unused -> StringSetData.empty()); + } + @Override public MetricName getName() { return name; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java index aeef7784c2c3..cf07e5f72401 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java @@ -519,7 +519,7 @@ public Iterable extractMetricUpdates(boolean isFinalUpdate) { .transform( update -> MetricsToCounterUpdateConverter.fromStringSet( - update.getKey(), update.getUpdate()))); + update.getKey(), true, update.getUpdate()))); }); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java index dbedc51528a5..4866d2011222 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java @@ -98,7 +98,8 @@ public static CounterUpdate fromGauge( .setIntegerGauge(integerGaugeProto); } - public static CounterUpdate fromStringSet(MetricKey key, StringSetData stringSetData) { + public static CounterUpdate fromStringSet( + MetricKey key, boolean isCumulative, StringSetData stringSetData) { CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET); StringList stringList = new StringList(); @@ -106,7 +107,7 @@ public static CounterUpdate fromStringSet(MetricKey key, StringSetData stringSet return new CounterUpdate() .setStructuredNameAndMetadata(name) - .setCumulative(false) + .setCumulative(isCumulative) .setStringList(stringList); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index 03ba501941ae..dd7db5ae9d1b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -34,6 +34,7 @@ import org.apache.beam.runners.core.metrics.GaugeCell; import org.apache.beam.runners.core.metrics.MetricsMap; import org.apache.beam.runners.core.metrics.StringSetCell; +import org.apache.beam.runners.core.metrics.StringSetData; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Gauge; @@ -73,7 +74,7 @@ public class StreamingStepMetricsContainer implements MetricsContainer { private final ConcurrentHashMap perWorkerGauges = new ConcurrentHashMap<>(); - private MetricsMap stringSet = new MetricsMap<>(StringSetCell::new); + private MetricsMap stringSets = new MetricsMap<>(StringSetCell::new); private MetricsMap distributions = new MetricsMap<>(DeltaDistributionCell::new); @@ -182,7 +183,7 @@ public Gauge getPerWorkerGauge(MetricName metricName) { @Override public StringSet getStringSet(MetricName metricName) { - return stringSet.get(metricName); + return stringSets.get(metricName); } @Override @@ -202,9 +203,11 @@ public Histogram getPerWorkerHistogram( } public Iterable extractUpdates() { + // Streaming metrics are updated as delta and not cumulative. return counterUpdates() .append(distributionUpdates()) - .append(gaugeUpdates().append(stringSetUpdates())); + .append(gaugeUpdates()) + .append(stringSetUpdates()); } private FluentIterable counterUpdates() { @@ -247,14 +250,18 @@ private FluentIterable gaugeUpdates() { } private FluentIterable stringSetUpdates() { - return FluentIterable.from(stringSet.entries()) + return FluentIterable.from(stringSets.entries()) .transform( new Function, CounterUpdate>() { @Override public @Nullable CounterUpdate apply( @Nonnull Map.Entry entry) { + StringSetData value = entry.getValue().getAndReset(); + if (value.stringSet().isEmpty()) { + return null; + } return MetricsToCounterUpdateConverter.fromStringSet( - MetricKey.create(stepName, entry.getKey()), entry.getValue().getCumulative()); + MetricKey.create(stepName, entry.getKey()), false, value); } }) .filter(Predicates.notNull()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 37c5ad261280..4b758aa6cd45 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -315,14 +315,14 @@ public void testStringSetUpdateExtraction() { .setStringList(new StringList().setElements(Arrays.asList("ij", "kl", "mn"))); updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); - assertThat(updates, containsInAnyOrder(name1Update, name2Update)); + assertThat(updates, containsInAnyOrder(name2Update)); + // test deltas c1.getStringSet(name1).add("op"); - name1Update.setStringList( - new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh", "op"))); + name1Update.setStringList(new StringList().setElements(Arrays.asList("op"))); updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); - assertThat(updates, containsInAnyOrder(name1Update, name2Update)); + assertThat(updates, containsInAnyOrder(name1Update)); } @Test