Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
"modification": 2,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this is only needed to force the testing infra to think the file is changed when you have no other changes to make.

"https://github.com/apache/beam/pull/34294": "noting that PR #34294 should run this test"
}
4 changes: 2 additions & 2 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def commonLegacyExcludeCategories = [
'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner
'org.apache.beam.sdk.testing.UsesMetricsPusher',
'org.apache.beam.sdk.testing.UsesBundleFinalizer',
'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics',
'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', // Dataflow QM as of now does not support returning back BoundedTrie in metric result.
]

def commonRunnerV2ExcludeCategories = [
Expand All @@ -207,7 +207,7 @@ def commonRunnerV2ExcludeCategories = [
'org.apache.beam.sdk.testing.UsesTestStream',
'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime',
'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput',
'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics',
'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', // Dataflow QM as of now does not support returning back BoundedTrie in metric result.
]

// For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.SideInputInfo;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.InMemoryStateInternals;
Expand Down Expand Up @@ -78,18 +77,14 @@ public class BatchModeExecutionContext
protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE =
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl";

// TODO(BEAM-33720): Remove once Dataflow legacy runner supports BoundedTries.
private final boolean populateBoundedTrieMetrics;

private BatchModeExecutionContext(
CounterFactory counterFactory,
Cache<?, WeightedValue<?>> dataCache,
Cache<?, ?> logicalReferenceCache,
ReaderFactory readerFactory,
PipelineOptions options,
DataflowExecutionStateTracker executionStateTracker,
DataflowExecutionStateRegistry executionStateRegistry,
boolean populateBoundedTrieMetrics) {
DataflowExecutionStateRegistry executionStateRegistry) {
super(
counterFactory,
createMetricsContainerRegistry(),
Expand All @@ -102,7 +97,6 @@ private BatchModeExecutionContext(
this.dataCache = dataCache;
this.containerRegistry =
(MetricsContainerRegistry<MetricsContainerImpl>) getMetricsContainerRegistry();
this.populateBoundedTrieMetrics = populateBoundedTrieMetrics;
}

private static MetricsContainerRegistry<MetricsContainerImpl> createMetricsContainerRegistry() {
Expand Down Expand Up @@ -138,8 +132,7 @@ public static BatchModeExecutionContext forTesting(
counterFactory,
options,
"test-work-item-id"),
stateRegistry,
true);
stateRegistry);
}

public static BatchModeExecutionContext forTesting(PipelineOptions options, String stageName) {
Expand Down Expand Up @@ -252,8 +245,7 @@ public static BatchModeExecutionContext create(
counterFactory,
options,
workItemId),
executionStateRegistry,
false);
executionStateRegistry);
}

/** Create a new {@link StepContext}. */
Expand Down Expand Up @@ -528,10 +520,7 @@ public Iterable<CounterUpdate> extractMetricUpdates(boolean isFinalUpdate) {
update ->
MetricsToCounterUpdateConverter.fromStringSet(
update.getKey(), true, update.getUpdate())),
FluentIterable.from(
populateBoundedTrieMetrics
? updates.boundedTrieUpdates()
: Collections.emptyList())
FluentIterable.from(updates.boundedTrieUpdates())
.transform(
update ->
MetricsToCounterUpdateConverter.fromBoundedTrie(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -104,9 +103,6 @@ public class StreamingStepMetricsContainer implements MetricsContainer {

private final Clock clock;

// TODO(BEAM-33720): Remove once Dataflow legacy runner supports BoundedTries.
@VisibleForTesting boolean populateBoundedTrieMetrics;

private StreamingStepMetricsContainer(String stepName) {
this.stepName = stepName;
this.perWorkerCountersByFirstStaleTime = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -217,7 +213,7 @@ public Iterable<CounterUpdate> extractUpdates() {
.append(distributionUpdates())
.append(gaugeUpdates())
.append(stringSetUpdates())
.append(populateBoundedTrieMetrics ? boundedTrieUpdates() : Collections.emptyList());
.append(boundedTrieUpdates());
}

private FluentIterable<CounterUpdate> counterUpdates() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,6 @@ public void testBoundedTrieUpdateExtraction() {
.setBoundedTrie(
MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto()));

((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true;
Iterable<CounterUpdate> updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update));

Expand Down Expand Up @@ -400,7 +399,6 @@ public void testBoundedTrieUpdateExtraction() {
.setBoundedTrie(
MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2.toProto()));

((StreamingStepMetricsContainer) c2).populateBoundedTrieMetrics = true;
updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name2Update));

Expand All @@ -412,7 +410,6 @@ public void testBoundedTrieUpdateExtraction() {
name1Update.setBoundedTrie(
MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto()));

((StreamingStepMetricsContainer) c1).populateBoundedTrieMetrics = true;
updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update));
}
Expand Down
Loading