diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json index 96e098eb7f97..dbf131c70637 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json @@ -2,5 +2,6 @@ "comment": "Modify this file in a trivial way to cause this test suite to run", "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", "https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test", - "https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test" + "https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test", + "https://github.com/apache/beam/pull/33921": "noting that PR #33921 should run this test" } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 72f094af9b3c..f9f00a1d0732 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -191,12 +191,12 @@ def commonLegacyExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner 'org.apache.beam.sdk.testing.UsesMetricsPusher', 'org.apache.beam.sdk.testing.UsesBundleFinalizer', + 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', ] def commonRunnerV2ExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesExternalService', 'org.apache.beam.sdk.testing.UsesGaugeMetrics', - 'org.apache.beam.sdk.testing.UsesStringSetMetrics', 'org.apache.beam.sdk.testing.UsesSetState', 'org.apache.beam.sdk.testing.UsesMapState', 'org.apache.beam.sdk.testing.UsesMultimapState', @@ -205,6 +205,7 @@ def commonRunnerV2ExcludeCategories = [ 'org.apache.beam.sdk.testing.UsesTestStream', 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime', 'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput', + 'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', ] // For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java index 1bfaaceb8253..2d8638b3330f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java @@ -19,6 +19,7 @@ import com.google.api.services.dataflow.model.CounterUpdate; import com.google.api.services.dataflow.model.SideInputInfo; +import java.util.Collections; import java.util.Objects; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.core.InMemoryStateInternals; @@ -77,6 +78,9 @@ public class BatchModeExecutionContext protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE = "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl"; + // TODO(BEAM-33720): Remove once Dataflow legacy runner supports BoundedTries. + private final boolean populateBoundedTrieMetrics; + private BatchModeExecutionContext( CounterFactory counterFactory, Cache> dataCache, @@ -84,7 +88,8 @@ private BatchModeExecutionContext( ReaderFactory readerFactory, PipelineOptions options, DataflowExecutionStateTracker executionStateTracker, - DataflowExecutionStateRegistry executionStateRegistry) { + DataflowExecutionStateRegistry executionStateRegistry, + boolean populateBoundedTrieMetrics) { super( counterFactory, createMetricsContainerRegistry(), @@ -97,6 +102,7 @@ private BatchModeExecutionContext( this.dataCache = dataCache; this.containerRegistry = (MetricsContainerRegistry) getMetricsContainerRegistry(); + this.populateBoundedTrieMetrics = populateBoundedTrieMetrics; } private static MetricsContainerRegistry createMetricsContainerRegistry() { @@ -132,7 +138,8 @@ public static BatchModeExecutionContext forTesting( counterFactory, options, "test-work-item-id"), - stateRegistry); + stateRegistry, + true); } public static BatchModeExecutionContext forTesting(PipelineOptions options, String stageName) { @@ -245,7 +252,8 @@ public static BatchModeExecutionContext create( counterFactory, options, workItemId), - executionStateRegistry); + executionStateRegistry, + false); } /** Create a new {@link StepContext}. */ @@ -520,7 +528,10 @@ public Iterable extractMetricUpdates(boolean isFinalUpdate) { update -> MetricsToCounterUpdateConverter.fromStringSet( update.getKey(), true, update.getUpdate())), - FluentIterable.from(updates.boundedTrieUpdates()) + FluentIterable.from( + populateBoundedTrieMetrics + ? updates.boundedTrieUpdates() + : Collections.emptyList()) .transform( update -> MetricsToCounterUpdateConverter.fromBoundedTrie( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index f9efa2c56751..ffc81f9f2380 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -23,6 +23,7 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; @@ -99,6 +100,9 @@ public class StreamingStepMetricsContainer implements MetricsContainer { private final Clock clock; + // TODO(BEAM-33720): Remove once Dataflow legacy runner supports BoundedTries. + @VisibleForTesting boolean populateBoundedTrieMetrics; + private StreamingStepMetricsContainer(String stepName) { this.stepName = stepName; this.perWorkerCountersByFirstStaleTime = new ConcurrentHashMap<>(); @@ -219,7 +223,7 @@ public Iterable extractUpdates() { .append(distributionUpdates()) .append(gaugeUpdates()) .append(stringSetUpdates()) - .append(boundedTrieUpdates()); + .append(populateBoundedTrieMetrics ? boundedTrieUpdates() : Collections.emptyList()); } private FluentIterable counterUpdates() { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 3bc16bf2d38c..236bbb63f80d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -357,6 +357,7 @@ public void testBoundedTrieUpdateExtraction() { .setBoundedTrie( MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto())); + ((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true; Iterable updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update)); @@ -385,6 +386,7 @@ public void testBoundedTrieUpdateExtraction() { .setBoundedTrie( MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2.toProto())); + ((StreamingStepMetricsContainer) c2).populateBoundedTrieMetrics = true; updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name2Update)); @@ -396,6 +398,7 @@ public void testBoundedTrieUpdateExtraction() { name1Update.setBoundedTrie( MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto())); + ((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true; updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); assertThat(updates, containsInAnyOrder(name1Update)); }