diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 7867c4895c4e..042b95e1314d 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -739,7 +739,7 @@ class BeamModulePlugin implements Plugin { google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20250427-2.0.0", // [bomupgrader] sets version google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240310-2.0.0", // [bomupgrader] sets version - google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20250519-$google_clients_version", + google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20250106-$google_clients_version", google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version", google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version", google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20250424-2.0.0", // [bomupgrader] sets version diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java index c80e24d95072..63fb289d3eec 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java @@ -311,7 +311,7 @@ public final String toString() { * intended to be used directly outside of {@link BoundedTrieData} with multiple threads. */ @VisibleForTesting - public static class BoundedTrieNode implements Serializable { + static class BoundedTrieNode implements Serializable { public static final String TRUNCATED_TRUE = String.valueOf(true); public static final String TRUNCATED_FALSE = String.valueOf(false); @@ -334,7 +334,7 @@ public static class BoundedTrieNode implements Serializable { private int size; /** Constructs an empty `BoundedTrieNode` with size 1 and not truncated. */ - public BoundedTrieNode() { + BoundedTrieNode() { this(new HashMap<>(), false, 1); } @@ -345,8 +345,7 @@ public BoundedTrieNode() { * @param truncated Whether this node is truncated. * @param size The size of the subtree rooted at this node. */ - public BoundedTrieNode( - @Nonnull Map children, boolean truncated, int size) { + BoundedTrieNode(@Nonnull Map children, boolean truncated, int size) { this.children = children; this.size = size; this.truncated = truncated; @@ -562,7 +561,7 @@ boolean contains(List segments) { * * @return The size of the subtree. */ - public int getSize() { + int getSize() { return size; } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index 06435793b56f..e8e2f6c44c73 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -29,7 +29,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.metrics.BoundedTrieData; @@ -45,11 +44,9 @@ import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.metrics.StringSetResult; import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; @@ -136,8 +133,7 @@ private JobMetrics getJobMetrics() throws IOException { return result; } - @VisibleForTesting - static class DataflowMetricResultExtractor { + private static class DataflowMetricResultExtractor { private final ImmutableList.Builder> counterResults; private final ImmutableList.Builder> distributionResults; private final ImmutableList.Builder> gaugeResults; @@ -179,7 +175,7 @@ public void addMetricResult( // stringset metric StringSetResult value = getStringSetValue(committed); stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); - } else if (committed.getBoundedTrie() != null && attempted.getBoundedTrie() != null) { + } else if (committed.getTrie() != null && attempted.getTrie() != null) { BoundedTrieResult value = getBoundedTrieValue(committed); boundedTrieResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); } else { @@ -210,20 +206,12 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) { } private BoundedTrieResult getBoundedTrieValue(MetricUpdate metricUpdate) { - BoundedTrieData trieData = null; - Object trieFromResponse = metricUpdate.getBoundedTrie(); - // Fail-safely cast Trie returned by dataflow API to BoundedTrieResult - if (trieFromResponse instanceof BoundedTrie) { - trieData = BoundedTrieData.fromProto((BoundedTrie) trieFromResponse); - } else if (trieFromResponse instanceof ArrayMap) { - trieData = trieFromArrayMap((ArrayMap) trieFromResponse); - } - - if (trieData != null) { - return BoundedTrieResult.create(trieData.extractResult().getResult()); - } else { + if (metricUpdate.getTrie() == null) { return BoundedTrieResult.empty(); } + BoundedTrie bTrie = (BoundedTrie) metricUpdate.getTrie(); + BoundedTrieData trieData = BoundedTrieData.fromProto(bTrie); + return BoundedTrieResult.create(trieData.extractResult().getResult()); } private DistributionResult getDistributionValue(MetricUpdate metricUpdate) { @@ -238,62 +226,6 @@ private DistributionResult getDistributionValue(MetricUpdate metricUpdate) { return DistributionResult.create(sum, count, min, max); } - /** Translate ArrayMap returned by Dataflow API client to BoundedTrieData. */ - @VisibleForTesting - static BoundedTrieData trieFromArrayMap(ArrayMap fieldsMap) { - int bound = 0; - List singleton = null; - Object maybeBound = fieldsMap.get("bound"); - if (maybeBound instanceof Number) { - bound = ((Number) maybeBound).intValue(); - } - Object maybeSingleton = fieldsMap.get("singleton"); - if (maybeSingleton instanceof List) { - List valueList = (List) maybeSingleton; - ImmutableList.Builder builder = ImmutableList.builder(); - for (Object stringValue : valueList) { - builder.add((String) stringValue); - } - singleton = builder.build(); - } - Object maybeRoot = fieldsMap.get("root"); - BoundedTrieData.BoundedTrieNode root = null; - if (maybeRoot instanceof Map) { - root = trieNodeFromMap((Map) maybeRoot); - } - return new BoundedTrieData(singleton, root, bound); - } - - /** Translate Map returned by Dataflow API client to BoundedTrieData.BoundedTrieNode. */ - private static BoundedTrieData.BoundedTrieNode trieNodeFromMap(Map fieldsMap) { - boolean truncated = false; - Object mayTruncated = fieldsMap.get("truncated"); - if (mayTruncated instanceof Boolean) { - truncated = (boolean) mayTruncated; - } - int childrenSize = 0; - ImmutableMap.Builder builder = - ImmutableMap.builder(); - Object maybeChildren = fieldsMap.get("children"); - if (maybeChildren instanceof Map) { - Map allChildren = (Map) maybeChildren; - for (Object maybeChildValue : allChildren.entrySet()) { - Map.Entry childValue = (Map.Entry) maybeChildValue; - Object maybeChild = childValue.getValue(); - if (maybeChild instanceof Map) { - BoundedTrieData.BoundedTrieNode child = trieNodeFromMap((Map) maybeChild); - Object maybeKey = childValue.getKey(); - if (maybeKey instanceof String) { - builder.put((String) maybeKey, child); - } - childrenSize += child.getSize(); - } - } - } - Map children = builder.build(); - return new BoundedTrieData.BoundedTrieNode(children, truncated, Math.max(1, childrenSize)); - } - public Iterable> getDistributionResults() { return distributionResults.build(); } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index 98e3d4bd4050..30f1466d4a54 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -25,7 +25,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -203,7 +202,7 @@ private MetricUpdate makeStringSetMetricUpdate( private MetricUpdate makeBoundedTrieMetricUpdate( String name, String namespace, String step, BoundedTrie data, boolean tentative) { MetricUpdate update = new MetricUpdate(); - update.setBoundedTrie(data); + update.setTrie(data); return setStructuredName(update, name, namespace, step, tentative); } @@ -297,40 +296,6 @@ public void testSingleStringSetUpdates() throws IOException { StringSetResult.create(ImmutableSet.of("ab", "cd"))))); } - @Test - public void testParseBoundedTrieWithSingleton() { - ArrayMap arrayMap = ArrayMap.create(); - arrayMap.put("bound", 100); - arrayMap.put( - "singleton", ImmutableList.of("pubsub:", "topic:", "`google.com:abc`.", "some-topic")); - - BoundedTrieData result = - DataflowMetrics.DataflowMetricResultExtractor.trieFromArrayMap(arrayMap); - assertEquals( - "BoundedTrieData({'pubsub:topic:`google.com:abc`.some-topicfalse'})", result.toString()); - } - - @Test - @SuppressWarnings("unchecked") // assemble ArrayMap from scratch for testing - public void testParseBoundedTrieWithRoot() { - ArrayMap arrayMap = ArrayMap.create(); - arrayMap.put("bound", 100); - ArrayMap root = ArrayMap.create(); - root.put("truncated", false); - ArrayMap children = ArrayMap.create(); - ArrayMap leaf = ArrayMap.create(); - leaf.put("1", ArrayMap.of("truncated", false)); - leaf.put("2", ArrayMap.of("truncated", false)); - leaf.put("3", ArrayMap.of("truncated", false)); - children.put("gcs:some-bucket.some-folder/", leaf); - root.put("children", children); - arrayMap.put("root", root); - - BoundedTrieData result = - DataflowMetrics.DataflowMetricResultExtractor.trieFromArrayMap(arrayMap); - assertEquals("BoundedTrieData({'gcs:some-bucket.some-folder/false'})", result.toString()); - } - @Test public void testSingleBoundedTrieUpdates() throws IOException { AppliedPTransform myStep = mock(AppliedPTransform.class);