From df7a5fb9359336467dab2e3455207f6ede21b4db Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Fri, 7 Feb 2025 16:34:56 -0800 Subject: [PATCH 1/4] Temporarily stop publishing BoundedTrie metrics till it is supported in Dataflow. --- .../worker/BatchModeExecutionContext.java | 19 +++++++++++++++---- .../worker/StreamingStepMetricsContainer.java | 6 +++++- .../StreamingStepMetricsContainerTest.java | 2 ++ 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java index 1bfaaceb8253..2d8638b3330f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java @@ -19,6 +19,7 @@ import com.google.api.services.dataflow.model.CounterUpdate; import com.google.api.services.dataflow.model.SideInputInfo; +import java.util.Collections; import java.util.Objects; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.core.InMemoryStateInternals; @@ -77,6 +78,9 @@ public class BatchModeExecutionContext protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE = "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl"; + // TODO(BEAM-33720): Remove once Dataflow legacy runner supports BoundedTries. + private final boolean populateBoundedTrieMetrics; + private BatchModeExecutionContext( CounterFactory counterFactory, Cache> dataCache, @@ -84,7 +88,8 @@ private BatchModeExecutionContext( ReaderFactory readerFactory, PipelineOptions options, DataflowExecutionStateTracker executionStateTracker, - DataflowExecutionStateRegistry executionStateRegistry) { + DataflowExecutionStateRegistry executionStateRegistry, + boolean populateBoundedTrieMetrics) { super( counterFactory, createMetricsContainerRegistry(), @@ -97,6 +102,7 @@ private BatchModeExecutionContext( this.dataCache = dataCache; this.containerRegistry = (MetricsContainerRegistry) getMetricsContainerRegistry(); + this.populateBoundedTrieMetrics = populateBoundedTrieMetrics; } private static MetricsContainerRegistry createMetricsContainerRegistry() { @@ -132,7 +138,8 @@ public static BatchModeExecutionContext forTesting( counterFactory, options, "test-work-item-id"), - stateRegistry); + stateRegistry, + true); } public static BatchModeExecutionContext forTesting(PipelineOptions options, String stageName) { @@ -245,7 +252,8 @@ public static BatchModeExecutionContext create( counterFactory, options, workItemId), - executionStateRegistry); + executionStateRegistry, + false); } /** Create a new {@link StepContext}. */ @@ -520,7 +528,10 @@ public Iterable extractMetricUpdates(boolean isFinalUpdate) { update -> MetricsToCounterUpdateConverter.fromStringSet( update.getKey(), true, update.getUpdate())), - FluentIterable.from(updates.boundedTrieUpdates()) + FluentIterable.from( + populateBoundedTrieMetrics + ? updates.boundedTrieUpdates() + : Collections.emptyList()) .transform( update -> MetricsToCounterUpdateConverter.fromBoundedTrie( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index f9efa2c56751..ffc81f9f2380 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -23,6 +23,7 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; @@ -99,6 +100,9 @@ public class StreamingStepMetricsContainer implements MetricsContainer { private final Clock clock; + // TODO(BEAM-33720): Remove once Dataflow legacy runner supports BoundedTries. + @VisibleForTesting boolean populateBoundedTrieMetrics; + private StreamingStepMetricsContainer(String stepName) { this.stepName = stepName; this.perWorkerCountersByFirstStaleTime = new ConcurrentHashMap<>(); @@ -219,7 +223,7 @@ public Iterable extractUpdates() { .append(distributionUpdates()) .append(gaugeUpdates()) .append(stringSetUpdates()) - .append(boundedTrieUpdates()); + .append(populateBoundedTrieMetrics ? boundedTrieUpdates() : Collections.emptyList()); } private FluentIterable counterUpdates() { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 3bc16bf2d38c..ed61f662fc59 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -357,6 +357,7 @@ public void testBoundedTrieUpdateExtraction() { .setBoundedTrie( MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto())); + ((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true; Iterable updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update)); @@ -385,6 +386,7 @@ public void testBoundedTrieUpdateExtraction() { .setBoundedTrie( MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2.toProto())); + ((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true; updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name2Update)); From 56fd786b7c0fbac0b68539d22a13dfab1f4f644d Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Fri, 7 Feb 2025 16:56:41 -0800 Subject: [PATCH 2/4] Exclude UsesBoundedTrieMetrics from Dataflow runner --- runners/google-cloud-dataflow-java/build.gradle | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 1eeef835907f..fc6d2320adda 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -191,12 +191,12 @@ def commonLegacyExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner 'org.apache.beam.sdk.testing.UsesMetricsPusher', 'org.apache.beam.sdk.testing.UsesBundleFinalizer', + 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', ] def commonRunnerV2ExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesExternalService', 'org.apache.beam.sdk.testing.UsesGaugeMetrics', - 'org.apache.beam.sdk.testing.UsesStringSetMetrics', 'org.apache.beam.sdk.testing.UsesSetState', 'org.apache.beam.sdk.testing.UsesMapState', 'org.apache.beam.sdk.testing.UsesMultimapState', @@ -205,6 +205,7 @@ def commonRunnerV2ExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesTestStream', 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime', 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput', + 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', ] // For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to From 8a27665179d9d3fc11b6b0aca3030588df1f0758 Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Fri, 7 Feb 2025 18:58:28 -0800 Subject: [PATCH 3/4] Fix failing test --- .../dataflow/worker/StreamingStepMetricsContainerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index ed61f662fc59..236bbb63f80d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -386,7 +386,7 @@ public void testBoundedTrieUpdateExtraction() { .setBoundedTrie( MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2.toProto())); - ((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true; + ((StreamingStepMetricsContainer) c2).populateBoundedTrieMetrics = true; updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name2Update)); @@ -398,6 +398,7 @@ public void testBoundedTrieUpdateExtraction() { name1Update.setBoundedTrie( MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto())); + ((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true; updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update)); } From 7b4a86b9f23ce6855cd38a717a8329ffa3866a41 Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Mon, 10 Feb 2025 08:44:52 -0800 Subject: [PATCH 4/4] Trivial change to postcommit to trigger it --- .../beam_PostCommit_Java_ValidatesRunner_Dataflow.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json index 96e098eb7f97..dbf131c70637 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json @@ -2,5 +2,6 @@ "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", "https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test", - "https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test" + "https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test", + "https://github.com/apache/beam/pull/33921": "noting that PR #33921 should run this test" }