Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test"
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test",
"https://github.com/apache/beam/pull/33921": "noting that PR #33921 should run this test"
}
3 changes: 2 additions & 1 deletion runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,12 @@ def commonLegacyExcludeCategories = [
'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner
'org.apache.beam.sdk.testing.UsesMetricsPusher',
'org.apache.beam.sdk.testing.UsesBundleFinalizer',
'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics',
]

def commonRunnerV2ExcludeCategories = [
'org.apache.beam.sdk.testing.UsesExternalService',
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
'org.apache.beam.sdk.testing.UsesStringSetMetrics',
'org.apache.beam.sdk.testing.UsesSetState',
'org.apache.beam.sdk.testing.UsesMapState',
'org.apache.beam.sdk.testing.UsesMultimapState',
Expand All @@ -205,6 +205,7 @@ def commonRunnerV2ExcludeCategories = [
'org.apache.beam.sdk.testing.UsesTestStream',
'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime',
'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput',
'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics',
]

// For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.SideInputInfo;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.InMemoryStateInternals;
Expand Down Expand Up @@ -77,14 +78,18 @@ public class BatchModeExecutionContext
protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE =
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl";

// TODO(BEAM-33720): Remove once Dataflow legacy runner supports BoundedTries.
private final boolean populateBoundedTrieMetrics;

private BatchModeExecutionContext(
CounterFactory counterFactory,
Cache<?, WeightedValue<?>> dataCache,
Cache<?, ?> logicalReferenceCache,
ReaderFactory readerFactory,
PipelineOptions options,
DataflowExecutionStateTracker executionStateTracker,
DataflowExecutionStateRegistry executionStateRegistry) {
DataflowExecutionStateRegistry executionStateRegistry,
boolean populateBoundedTrieMetrics) {
super(
counterFactory,
createMetricsContainerRegistry(),
Expand All @@ -97,6 +102,7 @@ private BatchModeExecutionContext(
this.dataCache = dataCache;
this.containerRegistry =
(MetricsContainerRegistry<MetricsContainerImpl>) getMetricsContainerRegistry();
this.populateBoundedTrieMetrics = populateBoundedTrieMetrics;
}

private static MetricsContainerRegistry<MetricsContainerImpl> createMetricsContainerRegistry() {
Expand Down Expand Up @@ -132,7 +138,8 @@ public static BatchModeExecutionContext forTesting(
counterFactory,
options,
"test-work-item-id"),
stateRegistry);
stateRegistry,
true);
}

public static BatchModeExecutionContext forTesting(PipelineOptions options, String stageName) {
Expand Down Expand Up @@ -245,7 +252,8 @@ public static BatchModeExecutionContext create(
counterFactory,
options,
workItemId),
executionStateRegistry);
executionStateRegistry,
false);
}

/** Create a new {@link StepContext}. */
Expand Down Expand Up @@ -520,7 +528,10 @@ public Iterable<CounterUpdate> extractMetricUpdates(boolean isFinalUpdate) {
update ->
MetricsToCounterUpdateConverter.fromStringSet(
update.getKey(), true, update.getUpdate())),
FluentIterable.from(updates.boundedTrieUpdates())
FluentIterable.from(
populateBoundedTrieMetrics
? updates.boundedTrieUpdates()
: Collections.emptyList())
.transform(
update ->
MetricsToCounterUpdateConverter.fromBoundedTrie(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -99,6 +100,9 @@ public class StreamingStepMetricsContainer implements MetricsContainer {

private final Clock clock;

// TODO(BEAM-33720): Remove once Dataflow legacy runner supports BoundedTries.
@VisibleForTesting boolean populateBoundedTrieMetrics;

private StreamingStepMetricsContainer(String stepName) {
this.stepName = stepName;
this.perWorkerCountersByFirstStaleTime = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -219,7 +223,7 @@ public Iterable<CounterUpdate> extractUpdates() {
.append(distributionUpdates())
.append(gaugeUpdates())
.append(stringSetUpdates())
.append(boundedTrieUpdates());
.append(populateBoundedTrieMetrics ? boundedTrieUpdates() : Collections.emptyList());
}

private FluentIterable<CounterUpdate> counterUpdates() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ public void testBoundedTrieUpdateExtraction() {
.setBoundedTrie(
MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto()));

((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true;
Iterable<CounterUpdate> updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update));

Expand Down Expand Up @@ -385,6 +386,7 @@ public void testBoundedTrieUpdateExtraction() {
.setBoundedTrie(
MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2.toProto()));

((StreamingStepMetricsContainer) c2).populateBoundedTrieMetrics = true;
updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name2Update));

Expand All @@ -396,6 +398,7 @@ public void testBoundedTrieUpdateExtraction() {
name1Update.setBoundedTrie(
MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto()));

((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true;
updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update));
}
Expand Down
Loading