From 05086be167f7cc02a862a689a86a863802762b85 Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Wed, 1 Jan 2025 16:24:57 -0800 Subject: [PATCH 1/5] Revert "Drop support DataflowMetrics and MetricsToCounterUpdateConverter until Dataflow java client support is ready" This reverts commit a2cb7080d5abb4fe3f9dbf16fe68ad3d059d72b8. --- .../google-cloud-dataflow-java/build.gradle | 1 + .../runners/dataflow/DataflowMetrics.java | 36 ++++++++-- .../runners/dataflow/DataflowMetricsTest.java | 67 ++++++++++++++++++ .../worker/BatchModeExecutionContext.java | 7 +- .../MetricsToCounterUpdateConverter.java | 11 +++ .../worker/StreamingStepMetricsContainer.java | 17 ++++- .../worker/BatchModeExecutionContextTest.java | 40 +++++++++++ .../StreamingStepMetricsContainerTest.java | 69 +++++++++++++++++++ 8 files changed, 241 insertions(+), 7 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index ad03ac713289..1eeef835907f 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -94,6 +94,7 @@ dependencies { // io-kafka is only used in PTransform override so it is optional provided project(":sdks:java:io:kafka") implementation project(":sdks:java:io:google-cloud-platform") + implementation project(":runners:core-java") implementation library.java.avro implementation library.java.bigdataoss_util implementation library.java.commons_codec diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index c79897b5e5c3..94d0c902e84b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -29,7 +29,9 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie; import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.sdk.metrics.BoundedTrieResult; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeResult; @@ -55,7 +57,11 @@ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) }) class DataflowMetrics extends MetricResults { + private static final Logger LOG = LoggerFactory.getLogger(DataflowMetrics.class); + // TODO (rosinha): Remove this once bounded_trie is available in metrics proto Dataflow + // java client. + public static final String BOUNDED_TRIE = "bounded_trie"; /** * Client for the Dataflow service. This can be used to query the service for information about * the job. @@ -104,13 +110,13 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) { ImmutableList> distributions = ImmutableList.of(); ImmutableList> gauges = ImmutableList.of(); ImmutableList> stringSets = ImmutableList.of(); - ImmutableList> boudedTries = ImmutableList.of(); + ImmutableList> boundedTries = ImmutableList.of(); JobMetrics jobMetrics; try { jobMetrics = getJobMetrics(); } catch (IOException e) { LOG.warn("Unable to query job metrics.\n"); - return MetricQueryResults.create(counters, distributions, gauges, stringSets, boudedTries); + return MetricQueryResults.create(counters, distributions, gauges, stringSets, boundedTries); } metricUpdates = firstNonNull(jobMetrics.getMetrics(), Collections.emptyList()); return populateMetricQueryResults(metricUpdates, filter); @@ -134,6 +140,7 @@ private static class DataflowMetricResultExtractor { private final ImmutableList.Builder> distributionResults; private final ImmutableList.Builder> gaugeResults; private final ImmutableList.Builder> stringSetResults; + private final ImmutableList.Builder> boundedTrieResults; private final boolean isStreamingJob; DataflowMetricResultExtractor(boolean isStreamingJob) { @@ -141,6 +148,7 @@ private static class DataflowMetricResultExtractor { distributionResults = ImmutableList.builder(); gaugeResults = ImmutableList.builder(); stringSetResults = ImmutableList.builder(); + boundedTrieResults = ImmutableList.builder(); /* In Dataflow streaming jobs, only ATTEMPTED metrics are available. * In Dataflow batch jobs, only COMMITTED metrics are available, but * we must provide ATTEMPTED, so we use COMMITTED as a good approximation. @@ -169,6 +177,11 @@ public void addMetricResult( // stringset metric StringSetResult value = getStringSetValue(committed); stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); + } else if (committed.get(BOUNDED_TRIE) != null && attempted.get(BOUNDED_TRIE) != null) { + // TODO (rosinha): This is dummy code. Once Dataflow MetricUpdate + // google client api is updated. Update this. + BoundedTrieResult value = getBoundedTrieValue(committed); + boundedTrieResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); } else { // This is exceptionally unexpected. We expect matching user metrics to only have the // value types provided by the Metrics API. @@ -196,6 +209,15 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) { return StringSetResult.create(ImmutableSet.copyOf(((Collection) metricUpdate.getSet()))); } + private BoundedTrieResult getBoundedTrieValue(MetricUpdate metricUpdate) { + if (metricUpdate.get(BOUNDED_TRIE) == null) { + return BoundedTrieResult.empty(); + } + BoundedTrie bTrie = (BoundedTrie) metricUpdate.get(BOUNDED_TRIE); + BoundedTrieData trieData = BoundedTrieData.fromProto(bTrie); + return BoundedTrieResult.create(trieData.extractResult().getResult()); + } + private DistributionResult getDistributionValue(MetricUpdate metricUpdate) { if (metricUpdate.getDistribution() == null) { return DistributionResult.IDENTITY_ELEMENT; @@ -220,9 +242,13 @@ public Iterable> getGaugeResults() { return gaugeResults.build(); } - public Iterable> geStringSetResults() { + public Iterable> getStringSetResults() { return stringSetResults.build(); } + + public Iterable> getBoundedTrieResults() { + return boundedTrieResults.build(); + } } private static class DataflowMetricQueryResultsFactory { @@ -388,8 +414,8 @@ public MetricQueryResults build() { extractor.getCounterResults(), extractor.getDistributionResults(), extractor.getGaugeResults(), - extractor.geStringSetResults(), - ImmutableList.of()); + extractor.getStringSetResults(), + extractor.getBoundedTrieResults()); } } } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index 745b065ea841..90a63554ef34 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -41,11 +41,13 @@ import java.io.IOException; import java.math.BigDecimal; import java.util.Set; +import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.DataflowTemplateJob; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator; +import org.apache.beam.sdk.metrics.BoundedTrieResult; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricsFilter; @@ -196,6 +198,13 @@ private MetricUpdate makeStringSetMetricUpdate( return setStructuredName(update, name, namespace, step, tentative); } + private MetricUpdate makeBoundedTrieMetricUpdate( + String name, String namespace, String step, BoundedTrieData data, boolean tentative) { + MetricUpdate update = new MetricUpdate(); + update.set(DataflowMetrics.BOUNDED_TRIE, data.toProto()); + return setStructuredName(update, name, namespace, step, tentative); + } + @Test public void testSingleCounterUpdates() throws IOException { AppliedPTransform myStep = mock(AppliedPTransform.class); @@ -286,6 +295,64 @@ public void testSingleStringSetUpdates() throws IOException { StringSetResult.create(ImmutableSet.of("ab", "cd"))))); } + @Test + public void testSingleBoundedTrieUpdates() throws IOException { + AppliedPTransform myStep = mock(AppliedPTransform.class); + when(myStep.getFullName()).thenReturn("myStepName"); + BiMap, String> transformStepNames = HashBiMap.create(); + transformStepNames.put(myStep, "s2"); + + JobMetrics jobMetrics = new JobMetrics(); + DataflowPipelineJob job = mock(DataflowPipelineJob.class); + DataflowPipelineOptions options = mock(DataflowPipelineOptions.class); + when(options.isStreaming()).thenReturn(false); + when(job.getDataflowOptions()).thenReturn(options); + when(job.getState()).thenReturn(State.RUNNING); + when(job.getJobId()).thenReturn(JOB_ID); + when(job.getTransformStepNames()).thenReturn(transformStepNames); + + // The parser relies on the fact that one tentative and one committed metric update exist in + // the job metrics results. + MetricUpdate mu1 = + makeBoundedTrieMetricUpdate( + "counterName", + "counterNamespace", + "s2", + new BoundedTrieData(ImmutableList.of("ab", "cd")), + false); + MetricUpdate mu1Tentative = + makeBoundedTrieMetricUpdate( + "counterName", + "counterNamespace", + "s2", + new BoundedTrieData(ImmutableList.of("ab", "cd")), + true); + jobMetrics.setMetrics(ImmutableList.of(mu1, mu1Tentative)); + DataflowClient dataflowClient = mock(DataflowClient.class); + when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); + + DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); + MetricQueryResults result = dataflowMetrics.allMetrics(); + assertThat( + result.getBoundedTries(), + containsInAnyOrder( + attemptedMetricsResult( + "counterNamespace", + "counterName", + "myStepName", + BoundedTrieResult.create( + ImmutableSet.of(ImmutableList.of("ab", "cd", String.valueOf(false))))))); + assertThat( + result.getBoundedTries(), + containsInAnyOrder( + committedMetricsResult( + "counterNamespace", + "counterName", + "myStepName", + BoundedTrieResult.create( + ImmutableSet.of(ImmutableList.of("ab", "cd", String.valueOf(false))))))); + } + @Test public void testIgnoreDistributionButGetCounterUpdates() throws IOException { AppliedPTransform myStep = mock(AppliedPTransform.class); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java index cf07e5f72401..d62198e8321d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java @@ -519,7 +519,12 @@ public Iterable extractMetricUpdates(boolean isFinalUpdate) { .transform( update -> MetricsToCounterUpdateConverter.fromStringSet( - update.getKey(), true, update.getUpdate()))); + update.getKey(), true, update.getUpdate())), + FluentIterable.from(updates.boundedTrieUpdates()) + .transform( + update -> + MetricsToCounterUpdateConverter.fromBoundedTrie( + update.getKey(), update.getUpdate()))); }); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java index 4866d2011222..42cc4ac8bf8f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java @@ -27,6 +27,7 @@ import com.google.api.services.dataflow.model.IntegerGauge; import com.google.api.services.dataflow.model.StringList; import java.util.ArrayList; +import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.runners.core.metrics.DistributionData; import org.apache.beam.runners.core.metrics.StringSetData; import org.apache.beam.sdk.metrics.MetricKey; @@ -111,6 +112,16 @@ public static CounterUpdate fromStringSet( .setStringList(stringList); } + public static CounterUpdate fromBoundedTrie(MetricKey key, BoundedTrieData boundedTrieData) { + // BoundedTrie uses SET kind metric aggregation which tracks unique strings. + CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET); + // TODO (rosinha): Once the CounterUpdate API is updated in dataflow client update this. + return new CounterUpdate() + .setStructuredNameAndMetadata(name) + .setCumulative(false) + .set("bounded_trie", boundedTrieData.toProto()); + } + public static CounterUpdate fromDistribution( MetricKey key, boolean isCumulative, DistributionData update) { CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.DISTRIBUTION); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index 0135e91eab9f..8ade66061ba5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -217,7 +217,8 @@ public Iterable extractUpdates() { return counterUpdates() .append(distributionUpdates()) .append(gaugeUpdates()) - .append(stringSetUpdates()); + .append(stringSetUpdates()) + .append(boundedTrieUpdates()); } private FluentIterable counterUpdates() { @@ -277,6 +278,20 @@ private FluentIterable stringSetUpdates() { .filter(Predicates.notNull()); } + private FluentIterable boundedTrieUpdates() { + return FluentIterable.from(boundedTries.entries()) + .transform( + new Function, CounterUpdate>() { + @Override + public @Nullable CounterUpdate apply( + @Nonnull Map.Entry entry) { + return MetricsToCounterUpdateConverter.fromBoundedTrie( + MetricKey.create(stepName, entry.getKey()), entry.getValue().getCumulative()); + } + }) + .filter(Predicates.notNull()); + } + private FluentIterable distributionUpdates() { return FluentIterable.from(distributions.entries()) .transform( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java index d3fa69e2c31c..051f164fa616 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java @@ -32,6 +32,8 @@ import com.google.api.services.dataflow.model.DistributionUpdate; import com.google.api.services.dataflow.model.StringList; import java.util.Arrays; +import org.apache.beam.model.pipeline.v1.MetricsApi; +import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; @@ -40,6 +42,7 @@ import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; +import org.apache.beam.sdk.metrics.BoundedTrie; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.MetricName; @@ -47,6 +50,7 @@ import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.StringSet; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.hamcrest.Matchers; import org.junit.Test; import org.junit.runner.RunWith; @@ -193,6 +197,42 @@ public void extractMetricUpdatesStringSet() { assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected)); } + @Test + public void extractMetricUpdatesBoundedTrie() { + BatchModeExecutionContext executionContext = + BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "testStage"); + DataflowOperationContext operationContext = + executionContext.createOperationContext(NameContextsForTests.nameContextForTest()); + + BoundedTrie boundedTrie = + operationContext + .metricsContainer() + .getBoundedTrie(MetricName.named("namespace", "some-bounded-trie")); + boundedTrie.add("ab"); + boundedTrie.add("cd"); + + BoundedTrieData trieData = new BoundedTrieData(); + trieData.add(ImmutableList.of("ab")); + trieData.add(ImmutableList.of("cd")); + MetricsApi.BoundedTrie expectedTrie = trieData.toProto(); + + final CounterUpdate expected = + new CounterUpdate() + .setStructuredNameAndMetadata( + new CounterStructuredNameAndMetadata() + .setName( + new CounterStructuredName() + .setOrigin("USER") + .setOriginNamespace("namespace") + .setName("some-bounded-trie") + .setOriginalStepName("originalName")) + .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) + .setCumulative(false) + .set("bounded_trie", expectedTrie); + + assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected)); + } + @Test public void extractMsecCounters() { BatchModeExecutionContext executionContext = diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 4b758aa6cd45..3073bf907f96 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -50,8 +50,10 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind; import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Origin; +import org.apache.beam.sdk.metrics.BoundedTrie; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.LabeledMetricNameUtils; @@ -61,6 +63,7 @@ import org.apache.beam.sdk.metrics.NoOpHistogram; import org.apache.beam.sdk.metrics.StringSet; import org.apache.beam.sdk.util.HistogramData; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.hamcrest.collection.IsEmptyIterable; import org.hamcrest.collection.IsMapContaining; @@ -325,6 +328,72 @@ public void testStringSetUpdateExtraction() { assertThat(updates, containsInAnyOrder(name1Update)); } + @Test + public void testBoundedTrieUpdateExtraction() { + BoundedTrie boundedTrie = c1.getBoundedTrie(name1); + boundedTrie.add("ab"); + boundedTrie.add("cd", "ef"); + boundedTrie.add("gh"); + boundedTrie.add("gh"); + + BoundedTrieData expectedName1 = new BoundedTrieData(); + expectedName1.add(ImmutableList.of("ab")); + expectedName1.add(ImmutableList.of("cd", "ef")); + expectedName1.add(ImmutableList.of("gh")); + expectedName1.add(ImmutableList.of("gh")); + + CounterUpdate name1Update = + new CounterUpdate() + .setStructuredNameAndMetadata( + new CounterStructuredNameAndMetadata() + .setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + .setOriginNamespace("ns") + .setName("name1") + .setOriginalStepName("s1")) + .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) + .setCumulative(false) + .set("bounded_trie", expectedName1.toProto()); + + Iterable updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); + assertThat(updates, containsInAnyOrder(name1Update)); + + boundedTrie = c2.getBoundedTrie(name2); + boundedTrie.add("ij"); + boundedTrie.add("kl", "mn"); + boundedTrie.add("mn"); + + BoundedTrieData expectedName2 = new BoundedTrieData(); + expectedName2.add(ImmutableList.of("ij")); + expectedName2.add(ImmutableList.of("kl", "mn")); + expectedName2.add(ImmutableList.of("mn")); + + CounterUpdate name2Update = + new CounterUpdate() + .setStructuredNameAndMetadata( + new CounterStructuredNameAndMetadata() + .setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + .setOriginNamespace("ns") + .setName("name2") + .setOriginalStepName("s2")) + .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) + .setCumulative(false) + .set("bounded_trie", expectedName2.toProto()); + + updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); + assertThat(updates, containsInAnyOrder(name1Update, name2Update)); + + c1.getBoundedTrie(name1).add("op"); + expectedName1.add(ImmutableList.of("op")); + name1Update.set("bounded_trie", expectedName1.toProto()); + + updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); + assertThat(updates, containsInAnyOrder(name1Update, name2Update)); + } + @Test public void testPerWorkerMetrics() { StreamingStepMetricsContainer.setEnablePerWorkerMetrics(false); From 6ad7c0d4c8d52f0e2d4ed5d5f8d3cb13d1fd7c13 Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Tue, 4 Feb 2025 17:02:59 -0800 Subject: [PATCH 2/5] Updated to use trie from CounterUpdate and Metric Dataflow APIs --- .../runners/core/metrics/BoundedTrieData.java | 12 +- .../runners/dataflow/DataflowMetrics.java | 11 +- .../runners/dataflow/DataflowMetricsTest.java | 9 +- .../MetricsToCounterUpdateConverter.java | 42 +++++- .../worker/BatchModeExecutionContextTest.java | 4 +- .../MetricsToCounterUpdateConverterTest.java | 142 ++++++++++++++++++ .../StreamingStepMetricsContainerTest.java | 7 +- 7 files changed, 201 insertions(+), 26 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java index b1efa59fba33..942470535043 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java @@ -180,14 +180,16 @@ public synchronized BoundedTrieResult extractResult() { */ public synchronized void add(Iterable segments) { List segmentsParts = ImmutableList.copyOf(segments); - if (this.root == null) { - if (this.singleton == null || !this.singleton.equals(segmentsParts)) { + if (this.singleton == null && this.root == null) { + // empty case + this.singleton = segmentsParts; + } else if (this.singleton != null && this.singleton.equals(segmentsParts)) { + // skip + } else { + if (this.root == null) { this.root = this.asTrie(); this.singleton = null; } - } - - if (this.root != null) { this.root.add(segmentsParts); if (this.root.getSize() > this.bound) { this.root.trim(); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index 94d0c902e84b..5f9342edcfaf 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -59,9 +59,6 @@ class DataflowMetrics extends MetricResults { private static final Logger LOG = LoggerFactory.getLogger(DataflowMetrics.class); - // TODO (rosinha): Remove this once bounded_trie is available in metrics proto Dataflow - // java client. - public static final String BOUNDED_TRIE = "bounded_trie"; /** * Client for the Dataflow service. This can be used to query the service for information about * the job. @@ -177,9 +174,7 @@ public void addMetricResult( // stringset metric StringSetResult value = getStringSetValue(committed); stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); - } else if (committed.get(BOUNDED_TRIE) != null && attempted.get(BOUNDED_TRIE) != null) { - // TODO (rosinha): This is dummy code. Once Dataflow MetricUpdate - // google client api is updated. Update this. + } else if (committed.getTrie() != null && attempted.getTrie() != null) { BoundedTrieResult value = getBoundedTrieValue(committed); boundedTrieResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); } else { @@ -210,10 +205,10 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) { } private BoundedTrieResult getBoundedTrieValue(MetricUpdate metricUpdate) { - if (metricUpdate.get(BOUNDED_TRIE) == null) { + if (metricUpdate.getTrie() == null) { return BoundedTrieResult.empty(); } - BoundedTrie bTrie = (BoundedTrie) metricUpdate.get(BOUNDED_TRIE); + BoundedTrie bTrie = (BoundedTrie) metricUpdate.getTrie(); BoundedTrieData trieData = BoundedTrieData.fromProto(bTrie); return BoundedTrieResult.create(trieData.extractResult().getResult()); } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index 90a63554ef34..30f1466d4a54 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.util.Set; +import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie; import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.DataflowTemplateJob; @@ -199,9 +200,9 @@ private MetricUpdate makeStringSetMetricUpdate( } private MetricUpdate makeBoundedTrieMetricUpdate( - String name, String namespace, String step, BoundedTrieData data, boolean tentative) { + String name, String namespace, String step, BoundedTrie data, boolean tentative) { MetricUpdate update = new MetricUpdate(); - update.set(DataflowMetrics.BOUNDED_TRIE, data.toProto()); + update.setTrie(data); return setStructuredName(update, name, namespace, step, tentative); } @@ -318,14 +319,14 @@ public void testSingleBoundedTrieUpdates() throws IOException { "counterName", "counterNamespace", "s2", - new BoundedTrieData(ImmutableList.of("ab", "cd")), + new BoundedTrieData(ImmutableList.of("ab", "cd")).toProto(), false); MetricUpdate mu1Tentative = makeBoundedTrieMetricUpdate( "counterName", "counterNamespace", "s2", - new BoundedTrieData(ImmutableList.of("ab", "cd")), + new BoundedTrieData(ImmutableList.of("ab", "cd")).toProto(), true); jobMetrics.setMetrics(ImmutableList.of(mu1, mu1Tentative)); DataflowClient dataflowClient = mock(DataflowClient.class); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java index 42cc4ac8bf8f..cfb5e498e9ad 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java @@ -19,6 +19,8 @@ import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt; +import com.google.api.services.dataflow.model.BoundedTrie; +import com.google.api.services.dataflow.model.BoundedTrieNode; import com.google.api.services.dataflow.model.CounterMetadata; import com.google.api.services.dataflow.model.CounterStructuredName; import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata; @@ -26,7 +28,11 @@ import com.google.api.services.dataflow.model.DistributionUpdate; import com.google.api.services.dataflow.model.IntegerGauge; import com.google.api.services.dataflow.model.StringList; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.model.pipeline.v1.MetricsApi; import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.runners.core.metrics.DistributionData; import org.apache.beam.runners.core.metrics.StringSetData; @@ -113,13 +119,43 @@ public static CounterUpdate fromStringSet( } public static CounterUpdate fromBoundedTrie(MetricKey key, BoundedTrieData boundedTrieData) { - // BoundedTrie uses SET kind metric aggregation which tracks unique strings. + // BoundedTrie uses SET kind metric aggregation which tracks unique strings as a trie. CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET); - // TODO (rosinha): Once the CounterUpdate API is updated in dataflow client update this. + BoundedTrie counterUpdateTrie = getBoundedTrie(boundedTrieData); + return new CounterUpdate() .setStructuredNameAndMetadata(name) .setCumulative(false) - .set("bounded_trie", boundedTrieData.toProto()); + .setBoundedTrie(counterUpdateTrie); + } + + @VisibleForTesting + static BoundedTrie getBoundedTrie(BoundedTrieData boundedTrieData) { + BoundedTrie counterUpdateTrie = new BoundedTrie(); + MetricsApi.BoundedTrie trie = boundedTrieData.toProto(); + counterUpdateTrie.setBound(trie.getBound()); + counterUpdateTrie.setSingleton( + trie.getSingletonList().isEmpty() ? null : trie.getSingletonList()); + counterUpdateTrie.setRoot(getBoundedTrieNode(trie.getRoot())); + return counterUpdateTrie; + } + + /** + * Converts from org.apache.beam.model.pipeline.v1.BoundedTrieNode to + * com.google.api.services.dataflow.model.BoundedTrieNode. This is because even though Dataflow + * CounterUpdate uses org.apache.beam.model.pipeline.v1.BoundedTrieNode in it's definition when + * the google-api client is generated the package is renamed. + * + * @param node org.apache.beam.model.pipeline.v1.BoundedTrieNode to be converted + * @return converted org.apache.beam.model.pipeline.v1.BoundedTrieNode. + */ + private static BoundedTrieNode getBoundedTrieNode(MetricsApi.BoundedTrieNode node) { + BoundedTrieNode boundedTrieNode = new BoundedTrieNode(); + boundedTrieNode.setTruncated(node.getTruncated()); + Map children = new HashMap<>(); + node.getChildrenMap().forEach((key, value) -> children.put(key, getBoundedTrieNode(value))); + boundedTrieNode.setChildren(children); + return boundedTrieNode; } public static CounterUpdate fromDistribution( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java index 051f164fa616..ef968972a892 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java @@ -32,7 +32,6 @@ import com.google.api.services.dataflow.model.DistributionUpdate; import com.google.api.services.dataflow.model.StringList; import java.util.Arrays; -import org.apache.beam.model.pipeline.v1.MetricsApi; import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; @@ -214,7 +213,6 @@ public void extractMetricUpdatesBoundedTrie() { BoundedTrieData trieData = new BoundedTrieData(); trieData.add(ImmutableList.of("ab")); trieData.add(ImmutableList.of("cd")); - MetricsApi.BoundedTrie expectedTrie = trieData.toProto(); final CounterUpdate expected = new CounterUpdate() @@ -228,7 +226,7 @@ public void extractMetricUpdatesBoundedTrie() { .setOriginalStepName("originalName")) .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) .setCumulative(false) - .set("bounded_trie", expectedTrie); + .setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(trieData)); assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected)); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java new file mode 100644 index 000000000000..eb716f052ec0 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import com.google.api.services.dataflow.model.BoundedTrie; +import com.google.api.services.dataflow.model.BoundedTrieNode; +import com.google.api.services.dataflow.model.CounterUpdate; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import org.apache.beam.runners.core.metrics.BoundedTrieData; +import org.apache.beam.runners.core.metrics.StringSetData; +import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind; +import org.apache.beam.sdk.metrics.MetricKey; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class MetricsToCounterUpdateConverterTest { + + private static final String STEP_NAME = "testStep"; + private static final String METRIC_NAME = "testMetric"; + private static final String NAMESPACE = "testNamespace"; + + private MetricKey createMetricKey() { + return MetricKey.create(STEP_NAME, MetricName.named(NAMESPACE, METRIC_NAME)); + } + + @Test + public void testFromStringSet() { + MetricKey key = createMetricKey(); + boolean isCumulative = false; + StringSetData stringSetData = StringSetData.create(ImmutableSet.of("a", "b", "c")); + + CounterUpdate counterUpdate = + MetricsToCounterUpdateConverter.fromStringSet(key, isCumulative, stringSetData); + + assertEquals( + Kind.SET.toString(), counterUpdate.getStructuredNameAndMetadata().getMetadata().getKind()); + assertFalse(counterUpdate.getCumulative()); + assertEquals( + stringSetData.stringSet(), new HashSet<>(counterUpdate.getStringList().getElements())); + } + + @Test + public void testFromBoundedTrie() { + MetricKey key = createMetricKey(); + BoundedTrieData boundedTrieData = new BoundedTrieData(); + boundedTrieData.add(ImmutableList.of("ab")); + boundedTrieData.add(ImmutableList.of("cd")); + + CounterUpdate counterUpdate = + MetricsToCounterUpdateConverter.fromBoundedTrie(key, boundedTrieData); + + assertEquals( + Kind.SET.toString(), counterUpdate.getStructuredNameAndMetadata().getMetadata().getKind()); + assertFalse(counterUpdate.getCumulative()); + BoundedTrie trie = counterUpdate.getBoundedTrie(); + assertEquals(100, (int) trie.getBound()); + assertEquals(Collections.emptyList(), trie.getSingleton()); + + BoundedTrieNode root = getMiddleNode(ImmutableList.of("ab", "cd")); + assertEquals(root, trie.getRoot()); + } + + @Test + public void testGetBoundedTrieNodeLevels() { + BoundedTrieData boundedTrieData = new BoundedTrieData(); + boundedTrieData.add(ImmutableList.of("ab")); + boundedTrieData.add(ImmutableList.of("cd")); + boundedTrieData.add(ImmutableList.of("ef", "gh")); + boundedTrieData.add(ImmutableList.of("ef", "xy")); + + BoundedTrie actualTrie = MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData); + + BoundedTrie expectedTrie = new BoundedTrie(); + expectedTrie.setBound(100); + BoundedTrieNode root = new BoundedTrieNode(); + Map rootChildren = new HashMap<>(); + rootChildren.put("ab", getEmptyNode()); + rootChildren.put("cd", getEmptyNode()); + rootChildren.put("ef", getMiddleNode(ImmutableList.of("gh", "xy"))); + root.setChildren(rootChildren); + root.setTruncated(false); + expectedTrie.setRoot(root); + expectedTrie.setSingleton(null); + assertEquals(expectedTrie, actualTrie); + } + + @Test + public void testGetBoundedTrieNodeSingleton() { + BoundedTrieData boundedTrieData = new BoundedTrieData(); + boundedTrieData.add(ImmutableList.of("ab")); + BoundedTrie actualTrie = MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData); + + BoundedTrie expectedTrie = new BoundedTrie(); + expectedTrie.setBound(100); + expectedTrie.setSingleton(ImmutableList.of("ab")); + expectedTrie.setRoot(getEmptyNode()); + + assertEquals(expectedTrie, actualTrie); + } + + private static BoundedTrieNode getMiddleNode(ImmutableList elements) { + BoundedTrieNode middleNode = new BoundedTrieNode(); + Map children = new HashMap<>(); + elements.forEach(val -> children.put(val, getEmptyNode())); + middleNode.setChildren(children); + middleNode.setTruncated(false); + return middleNode; + } + + private static BoundedTrieNode getEmptyNode() { + BoundedTrieNode leafNode = new BoundedTrieNode(); + leafNode.setChildren(Collections.emptyMap()); + leafNode.setTruncated(false); + return leafNode; + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 3073bf907f96..0a0d413a9460 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -354,7 +354,7 @@ public void testBoundedTrieUpdateExtraction() { .setOriginalStepName("s1")) .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) .setCumulative(false) - .set("bounded_trie", expectedName1.toProto()); + .setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1)); Iterable updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update)); @@ -381,14 +381,15 @@ public void testBoundedTrieUpdateExtraction() { .setOriginalStepName("s2")) .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) .setCumulative(false) - .set("bounded_trie", expectedName2.toProto()); + .setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2)); updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update, name2Update)); c1.getBoundedTrie(name1).add("op"); + expectedName1.add(ImmutableList.of("op")); - name1Update.set("bounded_trie", expectedName1.toProto()); + name1Update.setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1)); updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update, name2Update)); From 8bb06594c24bcc9e02d16805f1af1b8f27dd1c17 Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Tue, 4 Feb 2025 17:39:16 -0800 Subject: [PATCH 3/5] Support reporting deltas in bounded trie for streaming --- .../beam/runners/core/metrics/BoundedTrieCell.java | 11 +++++++++++ .../beam/runners/core/metrics/BoundedTrieData.java | 5 +++++ .../dataflow/worker/BatchModeExecutionContext.java | 2 +- .../worker/MetricsToCounterUpdateConverter.java | 5 +++-- .../worker/StreamingStepMetricsContainer.java | 7 ++++++- .../worker/MetricsToCounterUpdateConverterTest.java | 8 +++++--- .../worker/StreamingStepMetricsContainerTest.java | 6 ++++-- 7 files changed, 35 insertions(+), 9 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java index 34beaa6b63ed..06160a3b8c11 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java @@ -74,6 +74,17 @@ public synchronized BoundedTrieData getCumulative() { return value.getCumulative(); } + // Used by Streaming metric container to extract deltas since streaming metrics are + // reported as deltas rather than cumulative as in batch. + // For delta we take the current value then reset the cell to empty so the next call only see + // delta/updates from last call. + public synchronized BoundedTrieData getAndReset() { + // since we are resetting no need to deep a copy just change the reference + BoundedTrieData shallowCopy = this.value; + this.value = new BoundedTrieData(); // create now object should not call reset on existing + return shallowCopy; + } + @Override public MetricName getName() { return name; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java index 942470535043..0cd2185d76fc 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java @@ -273,6 +273,11 @@ public synchronized boolean contains(@Nonnull List value) { } } + /** @return true if this {@link BoundedTrieData} is empty else false. */ + public boolean isEmpty() { + return (root == null || root.children.isEmpty()) && (singleton == null || singleton.isEmpty()); + } + @Override public final boolean equals(@Nullable Object other) { if (this == other) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java index d62198e8321d..1bfaaceb8253 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java @@ -524,7 +524,7 @@ public Iterable extractMetricUpdates(boolean isFinalUpdate) { .transform( update -> MetricsToCounterUpdateConverter.fromBoundedTrie( - update.getKey(), update.getUpdate()))); + update.getKey(), true, update.getUpdate()))); }); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java index cfb5e498e9ad..13fe051e0e3f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java @@ -118,14 +118,15 @@ public static CounterUpdate fromStringSet( .setStringList(stringList); } - public static CounterUpdate fromBoundedTrie(MetricKey key, BoundedTrieData boundedTrieData) { + public static CounterUpdate fromBoundedTrie( + MetricKey key, boolean isCumulative, BoundedTrieData boundedTrieData) { // BoundedTrie uses SET kind metric aggregation which tracks unique strings as a trie. CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET); BoundedTrie counterUpdateTrie = getBoundedTrie(boundedTrieData); return new CounterUpdate() .setStructuredNameAndMetadata(name) - .setCumulative(false) + .setCumulative(isCumulative) .setBoundedTrie(counterUpdateTrie); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index 8ade66061ba5..f9efa2c56751 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nonnull; import org.apache.beam.runners.core.metrics.BoundedTrieCell; +import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.runners.core.metrics.DistributionData; import org.apache.beam.runners.core.metrics.GaugeCell; import org.apache.beam.runners.core.metrics.MetricsMap; @@ -285,8 +286,12 @@ private FluentIterable boundedTrieUpdates() { @Override public @Nullable CounterUpdate apply( @Nonnull Map.Entry entry) { + BoundedTrieData value = entry.getValue().getAndReset(); + if (value.isEmpty()) { + return null; + } return MetricsToCounterUpdateConverter.fromBoundedTrie( - MetricKey.create(stepName, entry.getKey()), entry.getValue().getCumulative()); + MetricKey.create(stepName, entry.getKey()), false, value); } }) .filter(Predicates.notNull()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java index eb716f052ec0..069cfe7f01f2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java @@ -19,6 +19,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import com.google.api.services.dataflow.model.BoundedTrie; import com.google.api.services.dataflow.model.BoundedTrieNode; @@ -73,14 +75,14 @@ public void testFromBoundedTrie() { boundedTrieData.add(ImmutableList.of("cd")); CounterUpdate counterUpdate = - MetricsToCounterUpdateConverter.fromBoundedTrie(key, boundedTrieData); + MetricsToCounterUpdateConverter.fromBoundedTrie(key, true, boundedTrieData); assertEquals( Kind.SET.toString(), counterUpdate.getStructuredNameAndMetadata().getMetadata().getKind()); - assertFalse(counterUpdate.getCumulative()); + assertTrue(counterUpdate.getCumulative()); BoundedTrie trie = counterUpdate.getBoundedTrie(); assertEquals(100, (int) trie.getBound()); - assertEquals(Collections.emptyList(), trie.getSingleton()); + assertNull(trie.getSingleton()); BoundedTrieNode root = getMiddleNode(ImmutableList.of("ab", "cd")); assertEquals(root, trie.getRoot()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 0a0d413a9460..340d94761b6e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -384,15 +384,17 @@ public void testBoundedTrieUpdateExtraction() { .setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2)); updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); - assertThat(updates, containsInAnyOrder(name1Update, name2Update)); + assertThat(updates, containsInAnyOrder(name2Update)); + // test delta c1.getBoundedTrie(name1).add("op"); + expectedName1.clear(); expectedName1.add(ImmutableList.of("op")); name1Update.setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1)); updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); - assertThat(updates, containsInAnyOrder(name1Update, name2Update)); + assertThat(updates, containsInAnyOrder(name1Update)); } @Test From c7dcc7dd7da9b13895b531350f872d6d63ceee04 Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Tue, 4 Feb 2025 21:47:27 -0800 Subject: [PATCH 4/5] Other clean up changes --- .../runners/core/metrics/BoundedTrieCell.java | 4 +-- .../runners/core/metrics/BoundedTrieData.java | 5 ++- .../core/metrics/BoundedTrieNodeTest.java | 32 +++++++++++++------ .../MetricsToCounterUpdateConverter.java | 27 ++++++++-------- .../worker/BatchModeExecutionContextTest.java | 2 +- .../MetricsToCounterUpdateConverterTest.java | 8 +++-- .../StreamingStepMetricsContainerTest.java | 9 ++++-- .../fn/harness/state/StateBackedIterable.java | 1 - 8 files changed, 53 insertions(+), 35 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java index 06160a3b8c11..0e33881d0e37 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java @@ -79,9 +79,9 @@ public synchronized BoundedTrieData getCumulative() { // For delta we take the current value then reset the cell to empty so the next call only see // delta/updates from last call. public synchronized BoundedTrieData getAndReset() { - // since we are resetting no need to deep a copy just change the reference + // since we are resetting no need to do a deep copy, just change the reference BoundedTrieData shallowCopy = this.value; - this.value = new BoundedTrieData(); // create now object should not call reset on existing + this.value = new BoundedTrieData(); // create new object, should not call reset on existing return shallowCopy; } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java index 0cd2185d76fc..63fb289d3eec 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java @@ -180,6 +180,9 @@ public synchronized BoundedTrieResult extractResult() { */ public synchronized void add(Iterable segments) { List segmentsParts = ImmutableList.copyOf(segments); + if (segmentsParts.isEmpty()) { + return; + } if (this.singleton == null && this.root == null) { // empty case this.singleton = segmentsParts; @@ -342,7 +345,7 @@ static class BoundedTrieNode implements Serializable { * @param truncated Whether this node is truncated. * @param size The size of the subtree rooted at this node. */ - BoundedTrieNode(Map children, boolean truncated, int size) { + BoundedTrieNode(@Nonnull Map children, boolean truncated, int size) { this.children = children; this.size = size; this.truncated = truncated; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java index ca5421e6f77f..b51e14f6a23a 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java @@ -882,12 +882,31 @@ public void testClear() { assertTrue(trie.extractResult().getResult().isEmpty()); } + @Test + public void testIsEmpty() { + BoundedTrieData trie = new BoundedTrieData(); + assertTrue(trie.isEmpty()); + + trie.add(Collections.emptyList()); + assertTrue(trie.isEmpty()); + + trie.add(ImmutableList.of("a", "b")); + assertFalse(trie.isEmpty()); + + trie.add(ImmutableList.of("c", "d")); + assertFalse(trie.isEmpty()); + + trie.clear(); + assertTrue(trie.isEmpty()); + } + @Test public void testBoundedTrieDataContains() { BoundedTrieData trie = new BoundedTrieData(); trie.add(ImmutableList.of("a", "b")); assertTrue(trie.contains(ImmutableList.of("a", "b"))); - assertTrue(trie.contains(ImmutableList.of("a"))); + // path ab is not same as path a + assertFalse(trie.contains(ImmutableList.of("a"))); assertFalse(trie.contains(ImmutableList.of("a", "c"))); } @@ -1000,18 +1019,11 @@ public void testGetCumulativeWithRoot() { assertFalse(cumulativeTrie.contains(ImmutableList.of("g", "h"))); } - @Test - public void testAddEmptyPath() { - BoundedTrieData trie = new BoundedTrieData(); - trie.add(Collections.emptyList()); - assertEquals(1, trie.size()); - assertTrue(trie.extractResult().getResult().contains(ImmutableList.of("false"))); - } - @Test public void testContainsEmptyPath() { BoundedTrieData trie = new BoundedTrieData(); trie.add(Collections.emptyList()); - assertTrue(trie.contains(Collections.emptyList())); + assertFalse(trie.contains(Collections.emptyList())); + assertTrue(trie.isEmpty()); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java index 13fe051e0e3f..0f393b5c604d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java @@ -28,7 +28,7 @@ import com.google.api.services.dataflow.model.DistributionUpdate; import com.google.api.services.dataflow.model.IntegerGauge; import com.google.api.services.dataflow.model.StringList; -import com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -122,7 +122,7 @@ public static CounterUpdate fromBoundedTrie( MetricKey key, boolean isCumulative, BoundedTrieData boundedTrieData) { // BoundedTrie uses SET kind metric aggregation which tracks unique strings as a trie. CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET); - BoundedTrie counterUpdateTrie = getBoundedTrie(boundedTrieData); + BoundedTrie counterUpdateTrie = getBoundedTrie(boundedTrieData.toProto()); return new CounterUpdate() .setStructuredNameAndMetadata(name) @@ -130,26 +130,25 @@ public static CounterUpdate fromBoundedTrie( .setBoundedTrie(counterUpdateTrie); } + /** + * Converts from org.apache.beam.model.pipeline.v1.BoundedTrie to + * com.google.api.services.dataflow.model.BoundedTrie. This is because even though Dataflow + * CounterUpdate uses org.apache.beam.model.pipeline.v1.BoundedTrieNode in it's definition when + * the google-api client is generated the package is renamed. + * + * @param trie org.apache.beam.model.pipeline.v1.BoundedTrie to be converted + * @return converted com.google.api.services.dataflow.model.BoundedTrie. + */ @VisibleForTesting - static BoundedTrie getBoundedTrie(BoundedTrieData boundedTrieData) { + static BoundedTrie getBoundedTrie(MetricsApi.BoundedTrie trie) { BoundedTrie counterUpdateTrie = new BoundedTrie(); - MetricsApi.BoundedTrie trie = boundedTrieData.toProto(); counterUpdateTrie.setBound(trie.getBound()); counterUpdateTrie.setSingleton( trie.getSingletonList().isEmpty() ? null : trie.getSingletonList()); - counterUpdateTrie.setRoot(getBoundedTrieNode(trie.getRoot())); + counterUpdateTrie.setRoot(trie.hasRoot() ? getBoundedTrieNode(trie.getRoot()) : null); return counterUpdateTrie; } - /** - * Converts from org.apache.beam.model.pipeline.v1.BoundedTrieNode to - * com.google.api.services.dataflow.model.BoundedTrieNode. This is because even though Dataflow - * CounterUpdate uses org.apache.beam.model.pipeline.v1.BoundedTrieNode in it's definition when - * the google-api client is generated the package is renamed. - * - * @param node org.apache.beam.model.pipeline.v1.BoundedTrieNode to be converted - * @return converted org.apache.beam.model.pipeline.v1.BoundedTrieNode. - */ private static BoundedTrieNode getBoundedTrieNode(MetricsApi.BoundedTrieNode node) { BoundedTrieNode boundedTrieNode = new BoundedTrieNode(); boundedTrieNode.setTruncated(node.getTruncated()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java index ef968972a892..1d12295795d3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java @@ -226,7 +226,7 @@ public void extractMetricUpdatesBoundedTrie() { .setOriginalStepName("originalName")) .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) .setCumulative(false) - .setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(trieData)); + .setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(trieData.toProto())); assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected)); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java index 069cfe7f01f2..3de9678425e3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java @@ -96,7 +96,8 @@ public void testGetBoundedTrieNodeLevels() { boundedTrieData.add(ImmutableList.of("ef", "gh")); boundedTrieData.add(ImmutableList.of("ef", "xy")); - BoundedTrie actualTrie = MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData); + BoundedTrie actualTrie = + MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData.toProto()); BoundedTrie expectedTrie = new BoundedTrie(); expectedTrie.setBound(100); @@ -116,12 +117,13 @@ public void testGetBoundedTrieNodeLevels() { public void testGetBoundedTrieNodeSingleton() { BoundedTrieData boundedTrieData = new BoundedTrieData(); boundedTrieData.add(ImmutableList.of("ab")); - BoundedTrie actualTrie = MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData); + BoundedTrie actualTrie = + MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData.toProto()); BoundedTrie expectedTrie = new BoundedTrie(); expectedTrie.setBound(100); expectedTrie.setSingleton(ImmutableList.of("ab")); - expectedTrie.setRoot(getEmptyNode()); + expectedTrie.setRoot(null); assertEquals(expectedTrie, actualTrie); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 340d94761b6e..3bc16bf2d38c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -354,7 +354,8 @@ public void testBoundedTrieUpdateExtraction() { .setOriginalStepName("s1")) .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) .setCumulative(false) - .setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1)); + .setBoundedTrie( + MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto())); Iterable updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update)); @@ -381,7 +382,8 @@ public void testBoundedTrieUpdateExtraction() { .setOriginalStepName("s2")) .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) .setCumulative(false) - .setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2)); + .setBoundedTrie( + MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2.toProto())); updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name2Update)); @@ -391,7 +393,8 @@ public void testBoundedTrieUpdateExtraction() { expectedName1.clear(); expectedName1.add(ImmutableList.of("op")); - name1Update.setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1)); + name1Update.setBoundedTrie( + MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto())); updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update)); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java index 7b6a6195f327..7c4b8d492e10 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java @@ -30,7 +30,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; - import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; import org.apache.beam.fn.harness.Cache; From e6c55be0ce7b12871f35c26646860b75c44b8541 Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Wed, 5 Feb 2025 13:38:47 -0800 Subject: [PATCH 5/5] Fix failing test --- .../dataflow/worker/MetricsToCounterUpdateConverter.java | 2 +- .../runners/dataflow/worker/BatchModeExecutionContextTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java index 0f393b5c604d..6d37deb847cc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java @@ -28,7 +28,6 @@ import com.google.api.services.dataflow.model.DistributionUpdate; import com.google.api.services.dataflow.model.IntegerGauge; import com.google.api.services.dataflow.model.StringList; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -38,6 +37,7 @@ import org.apache.beam.runners.core.metrics.StringSetData; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; /** Convertor from Metrics to {@link CounterUpdate} protos. */ @SuppressWarnings({ diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java index 1d12295795d3..b64856bce7fe 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java @@ -225,7 +225,7 @@ public void extractMetricUpdatesBoundedTrie() { .setName("some-bounded-trie") .setOriginalStepName("originalName")) .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) - .setCumulative(false) + .setCumulative(true) // batch counters are cumulative .setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(trieData.toProto())); assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected));