From 1e7e80747ccc1cfc452f5c26380b6574496195f7 Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Tue, 21 Jan 2025 00:43:03 -0800 Subject: [PATCH 1/2] Report delta StringSet counters from for streaming --- .../apache/beam/runners/core/metrics/StringSetCell.java | 8 ++++++++ .../dataflow/worker/BatchModeExecutionContext.java | 2 +- .../dataflow/worker/MetricsToCounterUpdateConverter.java | 5 +++-- .../dataflow/worker/StreamingStepMetricsContainer.java | 4 +++- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java index fc8dcb49894f..6bf34306e272 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java @@ -72,6 +72,14 @@ public StringSetData getCumulative() { return setValue.get(); } + // Used by Streaming metric container to extract deltas since streaming metrics are + // reported as deltas rather than cumulative as in batch. + // For delta we take the current value then reset the cell to empty so the next call only see + // delta/updates from last call. + public StringSetData getAndReset() { + return setValue.getAndUpdate(unused -> StringSetData.empty()); + } + @Override public MetricName getName() { return name; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java index aeef7784c2c3..cf07e5f72401 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java @@ -519,7 +519,7 @@ public Iterable extractMetricUpdates(boolean isFinalUpdate) { .transform( update -> MetricsToCounterUpdateConverter.fromStringSet( - update.getKey(), update.getUpdate()))); + update.getKey(), true, update.getUpdate()))); }); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java index dbedc51528a5..4866d2011222 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java @@ -98,7 +98,8 @@ public static CounterUpdate fromGauge( .setIntegerGauge(integerGaugeProto); } - public static CounterUpdate fromStringSet(MetricKey key, StringSetData stringSetData) { + public static CounterUpdate fromStringSet( + MetricKey key, boolean isCumulative, StringSetData stringSetData) { CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET); StringList stringList = new StringList(); @@ -106,7 +107,7 @@ public static CounterUpdate fromStringSet(MetricKey key, StringSetData stringSet return new CounterUpdate() .setStructuredNameAndMetadata(name) - .setCumulative(false) + .setCumulative(isCumulative) .setStringList(stringList); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index 03ba501941ae..59ca17b2835d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -202,6 +202,7 @@ public Histogram getPerWorkerHistogram( } public Iterable extractUpdates() { + // Streaming metrics are updated as delta and not cumulative. return counterUpdates() .append(distributionUpdates()) .append(gaugeUpdates().append(stringSetUpdates())); @@ -254,7 +255,8 @@ private FluentIterable stringSetUpdates() { public @Nullable CounterUpdate apply( @Nonnull Map.Entry entry) { return MetricsToCounterUpdateConverter.fromStringSet( - MetricKey.create(stepName, entry.getKey()), entry.getValue().getCumulative()); + MetricKey.create(stepName, entry.getKey()), + false, entry.getValue().getAndReset()); } }) .filter(Predicates.notNull()); From 51465fbc9158ec6704756d87d32f20b4cd34e3cc Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Wed, 22 Jan 2025 22:21:17 -0800 Subject: [PATCH 2/2] Update unit test --- .../worker/StreamingStepMetricsContainer.java | 17 +++++++++++------ .../StreamingStepMetricsContainerTest.java | 8 ++++---- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index 59ca17b2835d..dd7db5ae9d1b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -34,6 +34,7 @@ import org.apache.beam.runners.core.metrics.GaugeCell; import org.apache.beam.runners.core.metrics.MetricsMap; import org.apache.beam.runners.core.metrics.StringSetCell; +import org.apache.beam.runners.core.metrics.StringSetData; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Gauge; @@ -73,7 +74,7 @@ public class StreamingStepMetricsContainer implements MetricsContainer { private final ConcurrentHashMap perWorkerGauges = new ConcurrentHashMap<>(); - private MetricsMap stringSet = new MetricsMap<>(StringSetCell::new); + private MetricsMap stringSets = new MetricsMap<>(StringSetCell::new); private MetricsMap distributions = new MetricsMap<>(DeltaDistributionCell::new); @@ -182,7 +183,7 @@ public Gauge getPerWorkerGauge(MetricName metricName) { @Override public StringSet getStringSet(MetricName metricName) { - return stringSet.get(metricName); + return stringSets.get(metricName); } @Override @@ -205,7 +206,8 @@ public Iterable extractUpdates() { // Streaming metrics are updated as delta and not cumulative. return counterUpdates() .append(distributionUpdates()) - .append(gaugeUpdates().append(stringSetUpdates())); + .append(gaugeUpdates()) + .append(stringSetUpdates()); } private FluentIterable counterUpdates() { @@ -248,15 +250,18 @@ private FluentIterable gaugeUpdates() { } private FluentIterable stringSetUpdates() { - return FluentIterable.from(stringSet.entries()) + return FluentIterable.from(stringSets.entries()) .transform( new Function, CounterUpdate>() { @Override public @Nullable CounterUpdate apply( @Nonnull Map.Entry entry) { + StringSetData value = entry.getValue().getAndReset(); + if (value.stringSet().isEmpty()) { + return null; + } return MetricsToCounterUpdateConverter.fromStringSet( - MetricKey.create(stepName, entry.getKey()), - false, entry.getValue().getAndReset()); + MetricKey.create(stepName, entry.getKey()), false, value); } }) .filter(Predicates.notNull()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 37c5ad261280..4b758aa6cd45 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -315,14 +315,14 @@ public void testStringSetUpdateExtraction() { .setStringList(new StringList().setElements(Arrays.asList("ij", "kl", "mn"))); updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); - assertThat(updates, containsInAnyOrder(name1Update, name2Update)); + assertThat(updates, containsInAnyOrder(name2Update)); + // test deltas c1.getStringSet(name1).add("op"); - name1Update.setStringList( - new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh", "op"))); + name1Update.setStringList(new StringList().setElements(Arrays.asList("op"))); updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); - assertThat(updates, containsInAnyOrder(name1Update, name2Update)); + assertThat(updates, containsInAnyOrder(name1Update)); } @Test