diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 88023feb7eac..25efa002a36f 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -739,7 +739,7 @@ class BeamModulePlugin implements Plugin { google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20250427-2.0.0", // [bomupgrader] sets version google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240310-2.0.0", // [bomupgrader] sets version - google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20250106-$google_clients_version", + google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20250519-$google_clients_version", google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version", google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version", google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20250424-2.0.0", // [bomupgrader] sets version diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java index 63fb289d3eec..c80e24d95072 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java @@ -311,7 +311,7 @@ public final String toString() { * intended to be used directly outside of {@link BoundedTrieData} with multiple threads. 
*/ @VisibleForTesting - static class BoundedTrieNode implements Serializable { + public static class BoundedTrieNode implements Serializable { public static final String TRUNCATED_TRUE = String.valueOf(true); public static final String TRUNCATED_FALSE = String.valueOf(false); @@ -334,7 +334,7 @@ static class BoundedTrieNode implements Serializable { private int size; /** Constructs an empty `BoundedTrieNode` with size 1 and not truncated. */ - BoundedTrieNode() { + public BoundedTrieNode() { this(new HashMap<>(), false, 1); } @@ -345,7 +345,8 @@ static class BoundedTrieNode implements Serializable { * @param truncated Whether this node is truncated. * @param size The size of the subtree rooted at this node. */ - BoundedTrieNode(@Nonnull Map children, boolean truncated, int size) { + public BoundedTrieNode( + @Nonnull Map children, boolean truncated, int size) { this.children = children; this.size = size; this.truncated = truncated; @@ -561,7 +562,7 @@ boolean contains(List segments) { * * @return The size of the subtree. 
*/ - int getSize() { + public int getSize() { return size; } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index e8e2f6c44c73..06435793b56f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.metrics.BoundedTrieData; @@ -44,9 +45,11 @@ import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.metrics.StringSetResult; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; @@ -133,7 +136,8 @@ private JobMetrics getJobMetrics() throws IOException { return result; } - private static class DataflowMetricResultExtractor { + @VisibleForTesting + static class DataflowMetricResultExtractor { private final ImmutableList.Builder> counterResults; private final ImmutableList.Builder> distributionResults; private final ImmutableList.Builder> gaugeResults; @@ 
-175,7 +179,7 @@ public void addMetricResult( // stringset metric StringSetResult value = getStringSetValue(committed); stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); - } else if (committed.getTrie() != null && attempted.getTrie() != null) { + } else if (committed.getBoundedTrie() != null && attempted.getBoundedTrie() != null) { BoundedTrieResult value = getBoundedTrieValue(committed); boundedTrieResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); } else { @@ -206,12 +210,20 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) { } private BoundedTrieResult getBoundedTrieValue(MetricUpdate metricUpdate) { - if (metricUpdate.getTrie() == null) { + BoundedTrieData trieData = null; + Object trieFromResponse = metricUpdate.getBoundedTrie(); + // Fail-safely convert the trie returned by the Dataflow API to BoundedTrieData + if (trieFromResponse instanceof BoundedTrie) { + trieData = BoundedTrieData.fromProto((BoundedTrie) trieFromResponse); + } else if (trieFromResponse instanceof ArrayMap) { + trieData = trieFromArrayMap((ArrayMap) trieFromResponse); + } + + if (trieData != null) { + return BoundedTrieResult.create(trieData.extractResult().getResult()); + } else { return BoundedTrieResult.empty(); } - BoundedTrie bTrie = (BoundedTrie) metricUpdate.getTrie(); - BoundedTrieData trieData = BoundedTrieData.fromProto(bTrie); - return BoundedTrieResult.create(trieData.extractResult().getResult()); } private DistributionResult getDistributionValue(MetricUpdate metricUpdate) { @@ -226,6 +238,62 @@ private DistributionResult getDistributionValue(MetricUpdate metricUpdate) { return DistributionResult.create(sum, count, min, max); } + /** Translate ArrayMap returned by Dataflow API client to BoundedTrieData. 
*/ + @VisibleForTesting + static BoundedTrieData trieFromArrayMap(ArrayMap fieldsMap) { + int bound = 0; + List singleton = null; + Object maybeBound = fieldsMap.get("bound"); + if (maybeBound instanceof Number) { + bound = ((Number) maybeBound).intValue(); + } + Object maybeSingleton = fieldsMap.get("singleton"); + if (maybeSingleton instanceof List) { + List valueList = (List) maybeSingleton; + ImmutableList.Builder builder = ImmutableList.builder(); + for (Object stringValue : valueList) { + builder.add((String) stringValue); + } + singleton = builder.build(); + } + Object maybeRoot = fieldsMap.get("root"); + BoundedTrieData.BoundedTrieNode root = null; + if (maybeRoot instanceof Map) { + root = trieNodeFromMap((Map) maybeRoot); + } + return new BoundedTrieData(singleton, root, bound); + } + + /** Translate Map returned by Dataflow API client to BoundedTrieData.BoundedTrieNode. */ + private static BoundedTrieData.BoundedTrieNode trieNodeFromMap(Map fieldsMap) { + boolean truncated = false; + Object mayTruncated = fieldsMap.get("truncated"); + if (mayTruncated instanceof Boolean) { + truncated = (boolean) mayTruncated; + } + int childrenSize = 0; + ImmutableMap.Builder builder = + ImmutableMap.builder(); + Object maybeChildren = fieldsMap.get("children"); + if (maybeChildren instanceof Map) { + Map allChildren = (Map) maybeChildren; + for (Object maybeChildValue : allChildren.entrySet()) { + Map.Entry childValue = (Map.Entry) maybeChildValue; + Object maybeChild = childValue.getValue(); + if (maybeChild instanceof Map) { + BoundedTrieData.BoundedTrieNode child = trieNodeFromMap((Map) maybeChild); + Object maybeKey = childValue.getKey(); + if (maybeKey instanceof String) { + builder.put((String) maybeKey, child); + } + childrenSize += child.getSize(); + } + } + } + Map children = builder.build(); + return new BoundedTrieData.BoundedTrieNode(children, truncated, Math.max(1, childrenSize)); + } + public Iterable> getDistributionResults() { return 
distributionResults.build(); } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index 30f1466d4a54..98e3d4bd4050 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -202,7 +203,7 @@ private MetricUpdate makeStringSetMetricUpdate( private MetricUpdate makeBoundedTrieMetricUpdate( String name, String namespace, String step, BoundedTrie data, boolean tentative) { MetricUpdate update = new MetricUpdate(); - update.setTrie(data); + update.setBoundedTrie(data); return setStructuredName(update, name, namespace, step, tentative); } @@ -296,6 +297,40 @@ public void testSingleStringSetUpdates() throws IOException { StringSetResult.create(ImmutableSet.of("ab", "cd"))))); } + @Test + public void testParseBoundedTrieWithSingleton() { + ArrayMap arrayMap = ArrayMap.create(); + arrayMap.put("bound", 100); + arrayMap.put( + "singleton", ImmutableList.of("pubsub:", "topic:", "`google.com:abc`.", "some-topic")); + + BoundedTrieData result = + DataflowMetrics.DataflowMetricResultExtractor.trieFromArrayMap(arrayMap); + assertEquals( + "BoundedTrieData({'pubsub:topic:`google.com:abc`.some-topicfalse'})", result.toString()); + } + + @Test + @SuppressWarnings("unchecked") // assemble ArrayMap from scratch for testing + public void testParseBoundedTrieWithRoot() { + ArrayMap arrayMap = ArrayMap.create(); + 
arrayMap.put("bound", 100); + ArrayMap root = ArrayMap.create(); + root.put("truncated", false); + ArrayMap children = ArrayMap.create(); + ArrayMap leaf = ArrayMap.create(); + leaf.put("1", ArrayMap.of("truncated", false)); + leaf.put("2", ArrayMap.of("truncated", false)); + leaf.put("3", ArrayMap.of("truncated", false)); + children.put("gcs:some-bucket.some-folder/", leaf); + root.put("children", children); + arrayMap.put("root", root); + + BoundedTrieData result = + DataflowMetrics.DataflowMetricResultExtractor.trieFromArrayMap(arrayMap); + assertEquals("BoundedTrieData({'gcs:some-bucket.some-folder/false'})", result.toString()); + } + @Test public void testSingleBoundedTrieUpdates() throws IOException { AppliedPTransform myStep = mock(AppliedPTransform.class);