diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java index 6bf34306e272..fc8dcb49894f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java @@ -72,14 +72,6 @@ public StringSetData getCumulative() { return setValue.get(); } - // Used by Streaming metric container to extract deltas since streaming metrics are - // reported as deltas rather than cumulative as in batch. - // For delta we take the current value then reset the cell to empty so the next call only see - // delta/updates from last call. - public StringSetData getAndReset() { - return setValue.getAndUpdate(unused -> StringSetData.empty()); - } - @Override public MetricName getName() { return name; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java index cf07e5f72401..aeef7784c2c3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java @@ -519,7 +519,7 @@ public Iterable extractMetricUpdates(boolean isFinalUpdate) { .transform( update -> MetricsToCounterUpdateConverter.fromStringSet( - update.getKey(), true, update.getUpdate()))); + update.getKey(), update.getUpdate()))); }); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java index 4866d2011222..dbedc51528a5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java @@ -98,8 +98,7 @@ public static CounterUpdate fromGauge( .setIntegerGauge(integerGaugeProto); } - public static CounterUpdate fromStringSet( - MetricKey key, boolean isCumulative, StringSetData stringSetData) { + public static CounterUpdate fromStringSet(MetricKey key, StringSetData stringSetData) { CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET); StringList stringList = new StringList(); @@ -107,7 +106,7 @@ public static CounterUpdate fromStringSet( return new CounterUpdate() .setStructuredNameAndMetadata(name) - .setCumulative(isCumulative) + .setCumulative(false) .setStringList(stringList); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index dd7db5ae9d1b..03ba501941ae 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -34,7 +34,6 @@ import org.apache.beam.runners.core.metrics.GaugeCell; import org.apache.beam.runners.core.metrics.MetricsMap; import org.apache.beam.runners.core.metrics.StringSetCell; -import org.apache.beam.runners.core.metrics.StringSetData; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Gauge; @@ -74,7 +73,7 @@ public class StreamingStepMetricsContainer implements MetricsContainer { private final ConcurrentHashMap perWorkerGauges = new ConcurrentHashMap<>(); - private MetricsMap stringSets = new MetricsMap<>(StringSetCell::new); + private MetricsMap stringSet = new MetricsMap<>(StringSetCell::new); private MetricsMap distributions = new MetricsMap<>(DeltaDistributionCell::new); @@ -183,7 +182,7 @@ public Gauge getPerWorkerGauge(MetricName metricName) { @Override public StringSet getStringSet(MetricName metricName) { - return stringSets.get(metricName); + return stringSet.get(metricName); } @Override @@ -203,11 +202,9 @@ public Histogram getPerWorkerHistogram( } public Iterable extractUpdates() { - // Streaming metrics are updated as delta and not cumulative. return counterUpdates() .append(distributionUpdates()) - .append(gaugeUpdates()) - .append(stringSetUpdates()); + .append(gaugeUpdates().append(stringSetUpdates())); } private FluentIterable counterUpdates() { @@ -250,18 +247,14 @@ private FluentIterable gaugeUpdates() { } private FluentIterable stringSetUpdates() { - return FluentIterable.from(stringSets.entries()) + return FluentIterable.from(stringSet.entries()) .transform( new Function, CounterUpdate>() { @Override public @Nullable CounterUpdate apply( @Nonnull Map.Entry entry) { - StringSetData value = entry.getValue().getAndReset(); - if (value.stringSet().isEmpty()) { - return null; - } return MetricsToCounterUpdateConverter.fromStringSet( - MetricKey.create(stepName, entry.getKey()), false, value); + MetricKey.create(stepName, entry.getKey()), entry.getValue().getCumulative()); } }) .filter(Predicates.notNull()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 4b758aa6cd45..37c5ad261280 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -315,14 +315,14 @@ public void testStringSetUpdateExtraction() { .setStringList(new StringList().setElements(Arrays.asList("ij", "kl", "mn"))); updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); - assertThat(updates, containsInAnyOrder(name2Update)); + assertThat(updates, containsInAnyOrder(name1Update, name2Update)); - // test deltas c1.getStringSet(name1).add("op"); - name1Update.setStringList(new StringList().setElements(Arrays.asList("op"))); + name1Update.setStringList( + new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh", "op"))); updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); - assertThat(updates, containsInAnyOrder(name1Update)); + assertThat(updates, containsInAnyOrder(name1Update, name2Update)); } @Test