From c1a45165cd1cccd806e37dfcfa4ad1c5f9cb8b22 Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Thu, 13 Mar 2025 22:02:50 -0700 Subject: [PATCH 1/2] Revert Skip BoundedTrie on Dataflow till service is have BoundedTrie #33921 --- .../google-cloud-dataflow-java/build.gradle | 2 -- .../worker/BatchModeExecutionContext.java | 19 ++++--------------- .../worker/StreamingStepMetricsContainer.java | 6 +----- .../StreamingStepMetricsContainerTest.java | 3 --- 4 files changed, 5 insertions(+), 25 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 11a2d333cfa0..415f11455d62 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -193,7 +193,6 @@ def commonLegacyExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner 'org.apache.beam.sdk.testing.UsesMetricsPusher', 'org.apache.beam.sdk.testing.UsesBundleFinalizer', - 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', ] def commonRunnerV2ExcludeCategories = [ @@ -207,7 +206,6 @@ def commonRunnerV2ExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesTestStream', 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime', 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput', - 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', ] // For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java index 2d8638b3330f..1bfaaceb8253 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java @@ -19,7 +19,6 @@ import com.google.api.services.dataflow.model.CounterUpdate; import com.google.api.services.dataflow.model.SideInputInfo; -import java.util.Collections; import java.util.Objects; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.core.InMemoryStateInternals; @@ -78,9 +77,6 @@ public class BatchModeExecutionContext protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE = "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl"; - // TODO(BEAM-33720): Remove once Dataflow legacy runner supports BoundedTries. - private final boolean populateBoundedTrieMetrics; - private BatchModeExecutionContext( CounterFactory counterFactory, Cache> dataCache, @@ -88,8 +84,7 @@ private BatchModeExecutionContext( ReaderFactory readerFactory, PipelineOptions options, DataflowExecutionStateTracker executionStateTracker, - DataflowExecutionStateRegistry executionStateRegistry, - boolean populateBoundedTrieMetrics) { + DataflowExecutionStateRegistry executionStateRegistry) { super( counterFactory, createMetricsContainerRegistry(), @@ -102,7 +97,6 @@ private BatchModeExecutionContext( this.dataCache = dataCache; this.containerRegistry = (MetricsContainerRegistry) getMetricsContainerRegistry(); - this.populateBoundedTrieMetrics = populateBoundedTrieMetrics; } private static MetricsContainerRegistry createMetricsContainerRegistry() { @@ -138,8 +132,7 @@ public static BatchModeExecutionContext forTesting( counterFactory, options, "test-work-item-id"), - stateRegistry, - true); + stateRegistry); } public static BatchModeExecutionContext forTesting(PipelineOptions options, String stageName) { @@ -252,8 +245,7 @@ public static BatchModeExecutionContext create( counterFactory, options, workItemId), - executionStateRegistry, - false); + executionStateRegistry); } /** Create a new {@link StepContext}. */ @@ -528,10 +520,7 @@ public Iterable extractMetricUpdates(boolean isFinalUpdate) { update -> MetricsToCounterUpdateConverter.fromStringSet( update.getKey(), true, update.getUpdate())), - FluentIterable.from( - populateBoundedTrieMetrics - ? updates.boundedTrieUpdates() - : Collections.emptyList()) + FluentIterable.from(updates.boundedTrieUpdates()) .transform( update -> MetricsToCounterUpdateConverter.fromBoundedTrie( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index ca7df40762f4..1a3594a973f3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -23,7 +23,6 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; @@ -104,9 +103,6 @@ public class StreamingStepMetricsContainer implements MetricsContainer { private final Clock clock; - // TODO(BEAM-33720): Remove once Dataflow legacy runner supports BoundedTries. - @VisibleForTesting boolean populateBoundedTrieMetrics; - private StreamingStepMetricsContainer(String stepName) { this.stepName = stepName; this.perWorkerCountersByFirstStaleTime = new ConcurrentHashMap<>(); @@ -217,7 +213,7 @@ public Iterable extractUpdates() { .append(distributionUpdates()) .append(gaugeUpdates()) .append(stringSetUpdates()) - .append(populateBoundedTrieMetrics ? boundedTrieUpdates() : Collections.emptyList()); + .append(boundedTrieUpdates()); } private FluentIterable counterUpdates() { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 48d8d2dbf4a1..02ce0b2f8706 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -371,7 +371,6 @@ public void testBoundedTrieUpdateExtraction() { .setBoundedTrie( MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto())); - ((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true; Iterable updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update)); @@ -400,7 +399,6 @@ public void testBoundedTrieUpdateExtraction() { .setBoundedTrie( MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2.toProto())); - ((StreamingStepMetricsContainer) c2).populateBoundedTrieMetrics = true; updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name2Update)); @@ -412,7 +410,6 @@ public void testBoundedTrieUpdateExtraction() { name1Update.setBoundedTrie( MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto())); - ((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true; updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update)); } From 7adf9a2ab305a3328a3431542df9deb43a9e76a7 Mon Sep 17 00:00:00 2001 From: Rohit Sinha Date: Tue, 18 Mar 2025 22:11:40 -0700 Subject: [PATCH 2/2] Do not runner UsesBoundedTrie test on Dataflow runner as the test verify metric result --- .../beam_PostCommit_Java_ValidatesRunner_Dataflow.json | 3 ++- runners/google-cloud-dataflow-java/build.gradle | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json index e3d6056a5de9..8aad4bae0703 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json @@ -1,4 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 + "modification": 2, + "https://github.com/apache/beam/pull/34294": "noting that PR #34294 should run this test" } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 415f11455d62..4c25737fe2f7 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -193,6 +193,7 @@ def commonLegacyExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner 'org.apache.beam.sdk.testing.UsesMetricsPusher', 'org.apache.beam.sdk.testing.UsesBundleFinalizer', + 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', // Dataflow QM as of now does not support returning back BoundedTrie in metric result. ] def commonRunnerV2ExcludeCategories = [ @@ -206,6 +207,7 @@ def commonRunnerV2ExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesTestStream', 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime', 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput', + 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', // Dataflow QM as of now does not support returning back BoundedTrie in metric result. ] // For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to