From 79404ff07ecf2b6f40be13a96a61374f5c7e50bb Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Thu, 24 Apr 2025 17:18:15 -0400 Subject: [PATCH 1/4] Parse struct returned from Dataflow API to BoundedTrieData fix checkstyle Use getBoundedTrie add debug log adapt to ArrayMap --- .../beam/gradle/BeamModulePlugin.groovy | 2 +- .../runners/core/metrics/BoundedTrieData.java | 9 +- .../google-cloud-dataflow-java/build.gradle | 1 + .../runners/dataflow/DataflowMetrics.java | 88 +++++++++++- .../runners/dataflow/DataflowMetricsTest.java | 125 ++++++++++++++++++ .../beam/checkstyle/suppressions.xml | 2 + 6 files changed, 216 insertions(+), 11 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 88023feb7eac..25efa002a36f 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -739,7 +739,7 @@ class BeamModulePlugin implements Plugin { google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20250427-2.0.0", // [bomupgrader] sets version google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240310-2.0.0", // [bomupgrader] sets version - google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20250106-$google_clients_version", + google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20250519-$google_clients_version", google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version", google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version", google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20250424-2.0.0", // [bomupgrader] sets version diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java index 63fb289d3eec..c80e24d95072 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java @@ -311,7 +311,7 @@ public final String toString() { * intended to be used directly outside of {@link BoundedTrieData} with multiple threads. */ @VisibleForTesting - static class BoundedTrieNode implements Serializable { + public static class BoundedTrieNode implements Serializable { public static final String TRUNCATED_TRUE = String.valueOf(true); public static final String TRUNCATED_FALSE = String.valueOf(false); @@ -334,7 +334,7 @@ static class BoundedTrieNode implements Serializable { private int size; /** Constructs an empty `BoundedTrieNode` with size 1 and not truncated. */ - BoundedTrieNode() { + public BoundedTrieNode() { this(new HashMap<>(), false, 1); } @@ -345,7 +345,8 @@ static class BoundedTrieNode implements Serializable { * @param truncated Whether this node is truncated. * @param size The size of the subtree rooted at this node. */ - BoundedTrieNode(@Nonnull Map children, boolean truncated, int size) { + public BoundedTrieNode( + @Nonnull Map children, boolean truncated, int size) { this.children = children; this.size = size; this.truncated = truncated; @@ -561,7 +562,7 @@ boolean contains(List segments) { * * @return The size of the subtree. */ - int getSize() { + public int getSize() { return size; } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 90d388b8bb68..9a5ad9953b87 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -111,6 +111,7 @@ dependencies { implementation library.java.google_auth_library_oauth2_http implementation library.java.google_http_client implementation library.java.google_http_client_gson + implementation library.java.protobuf_java permitUnusedDeclared library.java.google_http_client_gson // BEAM-11761 implementation library.java.hamcrest implementation library.java.jackson_annotations diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index e8e2f6c44c73..77e7ae44c8fd 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.core.metrics.BoundedTrieData; @@ -44,9 +45,11 @@ import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.metrics.StringSetResult; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; @@ -133,7 +136,8 @@ private JobMetrics getJobMetrics() throws IOException { return result; } - private static class DataflowMetricResultExtractor { + @VisibleForTesting + static class DataflowMetricResultExtractor { private final ImmutableList.Builder> counterResults; private final ImmutableList.Builder> distributionResults; private final ImmutableList.Builder> gaugeResults; @@ -175,7 +179,8 @@ public void addMetricResult( // stringset metric StringSetResult value = getStringSetValue(committed); stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); - } else if (committed.getTrie() != null && attempted.getTrie() != null) { + } else if (committed.getBoundedTrie() != null && attempted.getBoundedTrie() != null) { + LOG.info("DO NOT MERGE: get bounded trie: {}", committed.getBoundedTrie()); BoundedTrieResult value = getBoundedTrieValue(committed); boundedTrieResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); } else { @@ -206,12 +211,24 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) { } private BoundedTrieResult getBoundedTrieValue(MetricUpdate metricUpdate) { - if (metricUpdate.getTrie() == null) { + BoundedTrieData trieData = null; + Object trieFromResponse = metricUpdate.getBoundedTrie(); + // Fail-safely cast Trie returned by dataflow API to BoundedTrieResult + if (trieFromResponse instanceof BoundedTrie) { + BoundedTrie bTrie = (BoundedTrie) metricUpdate.getBoundedTrie(); + trieData = BoundedTrieData.fromProto(bTrie); + } else if (trieFromResponse instanceof ArrayMap) { + LOG.info("DO NOT MERGE try to resolve ArrayMap"); + trieData = trieFromArrayMap((ArrayMap) trieFromResponse); + } else { + LOG.info("DO NOT MERGE bounded trie is of type {}", trieFromResponse.getClass().getName()); + } + + if (trieData != null) { + return BoundedTrieResult.create(trieData.extractResult().getResult()); + } else { return BoundedTrieResult.empty(); } - BoundedTrie bTrie = (BoundedTrie) metricUpdate.getTrie(); - BoundedTrieData trieData = BoundedTrieData.fromProto(bTrie); - return BoundedTrieResult.create(trieData.extractResult().getResult()); } private DistributionResult getDistributionValue(MetricUpdate metricUpdate) { @@ -226,6 +243,65 @@ private DistributionResult getDistributionValue(MetricUpdate metricUpdate) { return DistributionResult.create(sum, count, min, max); } + /** Translate ArrayMap returned by Dataflow API client to BoundedTrieData. */ + @VisibleForTesting + static BoundedTrieData trieFromArrayMap(ArrayMap fieldsMap) { + // {root={truncated=false, children={gcs:={children={yathu_test.={truncated=false, + // children={temp/2={truncated=false}, temp/1={truncated=false}, temp/3={truncated=false}}}}, + // truncated=false}}}, bound=100} + int bound = 0; + List singleton = null; + Object maybeBound = fieldsMap.get("bound"); + if (maybeBound instanceof Number) { + bound = ((Number) maybeBound).intValue(); + } + Object maybeSingleton = fieldsMap.get("singleton"); + if (maybeSingleton instanceof List) { + List valueList = (List) maybeSingleton; + ImmutableList.Builder builder = ImmutableList.builder(); + for (Object stringValue : valueList) { + builder.add((String) stringValue); + } + singleton = builder.build(); + } + Object maybeRoot = fieldsMap.get("root"); + BoundedTrieData.BoundedTrieNode root = null; + if (maybeRoot instanceof Map) { + root = trieNodeFromMap((Map) maybeRoot); + } + return new BoundedTrieData(singleton, root, bound); + } + + /** Translate Map returned by Dataflow API client to BoundedTrieData.BoundedTrieNode. */ + private static BoundedTrieData.BoundedTrieNode trieNodeFromMap(Map fieldsMap) { + boolean truncated = false; + Object mayTruncated = fieldsMap.get("truncated"); + if (mayTruncated instanceof Boolean) { + truncated = (boolean) mayTruncated; + } + int childrenSize = 0; + ImmutableMap.Builder builder = + ImmutableMap.builder(); + Object maybeChildren = fieldsMap.get("children"); + if (maybeChildren instanceof Map) { + Map allChildren = (Map) maybeChildren; + for (Object maybeChildValue : allChildren.entrySet()) { + Map.Entry childValue = (Map.Entry) maybeChildValue; + Object maybeChild = childValue.getValue(); + if (maybeChild instanceof Map) { + BoundedTrieData.BoundedTrieNode child = trieNodeFromMap((Map) maybeChild); + Object maybeKey = childValue.getKey(); + if (maybeKey instanceof String) { + builder.put((String) maybeKey, child); + } + childrenSize += child.getSize(); + } + } + } + Map children = builder.build(); + return new BoundedTrieData.BoundedTrieNode(children, truncated, Math.max(1, childrenSize)); + } + public Iterable> getDistributionResults() { return distributionResults.build(); } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index 30f1466d4a54..e1d4d22dfea7 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -296,6 +296,131 @@ public void testSingleStringSetUpdates() throws IOException { StringSetResult.create(ImmutableSet.of("ab", "cd"))))); } + /* + @Test + public void testSingletonBoundedTrieFromMessage() throws ParseException { + String textProto = + " fields {\n" + + " key: \"bound\"\n" + + " value {\n" + + " number_value: 100\n" + + " }\n" + + " }\n" + + " fields {\n" + + " key: \"singleton\"\n" + + " value {\n" + + " list_value {\n" + + " values {\n" + + " string_value: \"pubsub:\"\n" + + " }\n" + + " values {\n" + + " string_value: \"topic:\"\n" + + " }\n" + + " values {\n" + + " string_value: \"`google.com:abc`.\"\n" + + " }\n" + + " values {\n" + + " string_value: \"some-topic\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }"; + com.google.protobuf.Struct response = + TextFormat.parse(textProto, com.google.protobuf.Struct.class); + BoundedTrieData result = DataflowMetrics.DataflowMetricResultExtractor.trieFromStruct(response); + assertEquals( + "BoundedTrieData({'pubsub:topic:`google.com:abc`.some-topicfalse'})", result.toString()); + } + + @Test + public void testNestedBoundedTrieFromMessage() throws ParseException { + String textProto = + "fields {\n" + + " key: \"bound\"\n" + + " value {\n" + + " number_value: 100\n" + + " }\n" + + "}\n" + + "fields {\n" + + " key: \"root\"\n" + + " value {\n" + + " struct_value {\n" + + " fields {\n" + + " key: \"children\"\n" + + " value {\n" + + " struct_value {\n" + + " fields {\n" + + " key: \"gcs:\"\n" + + " value {\n" + + " struct_value {\n" + + " fields {\n" + + " key: \"children\"\n" + + " value {\n" + + " struct_value {\n" + + " fields {\n" + + " key: \"some-bucket.\"\n" + + " value {\n" + + " struct_value {\n" + + " fields {\n" + + " key: \"children\"\n" + + " value {\n" + + " struct_value {\n" + + " fields {\n" + + " key: \"some-folder/\"\n" + + " value {\n" + + " struct_value {\n" + + " fields {\n" + + " key: \"truncated\"\n" + + " value {\n" + + " bool_value: true\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " fields {\n" + + " key: \"truncated\"\n" + + " value {\n" + + " bool_value: false\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " fields {\n" + + " key: \"truncated\"\n" + + " value {\n" + + " bool_value: false\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " fields {\n" + + " key: \"truncated\"\n" + + " value {\n" + + " bool_value: false\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + com.google.protobuf.Struct response = + TextFormat.parse(textProto, com.google.protobuf.Struct.class); + BoundedTrieData result = DataflowMetrics.DataflowMetricResultExtractor.trieFromStruct(response); + assertEquals("BoundedTrieData({'gcs:some-bucket.some-folder/true'})", result.toString()); + } + */ + @Test public void testSingleBoundedTrieUpdates() throws IOException { AppliedPTransform myStep = mock(AppliedPTransform.class); diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index af384ff19c09..620331942fd4 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -90,6 +90,8 @@ + + From e3506e782b3e85d7ff2a2aa76ee280deed8ee1c7 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 6 Jun 2025 17:23:03 -0400 Subject: [PATCH 2/4] Add test, clean up --- .../google-cloud-dataflow-java/build.gradle | 1 - .../runners/dataflow/DataflowMetrics.java | 7 - .../runners/dataflow/DataflowMetricsTest.java | 144 ++++-------------- .../beam/checkstyle/suppressions.xml | 2 - 4 files changed, 27 insertions(+), 127 deletions(-) diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index 9a5ad9953b87..90d388b8bb68 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -111,7 +111,6 @@ dependencies { implementation library.java.google_auth_library_oauth2_http implementation library.java.google_http_client implementation library.java.google_http_client_gson - implementation library.java.protobuf_java permitUnusedDeclared library.java.google_http_client_gson // BEAM-11761 implementation library.java.hamcrest implementation library.java.jackson_annotations diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index 77e7ae44c8fd..2338f63c44ff 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -180,7 +180,6 @@ public void addMetricResult( StringSetResult value = getStringSetValue(committed); stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); } else if (committed.getBoundedTrie() != null && attempted.getBoundedTrie() != null) { - LOG.info("DO NOT MERGE: get bounded trie: {}", committed.getBoundedTrie()); BoundedTrieResult value = getBoundedTrieValue(committed); boundedTrieResults.add(MetricResult.create(metricKey, !isStreamingJob, value)); } else { @@ -218,10 +217,7 @@ private BoundedTrieResult getBoundedTrieValue(MetricUpdate metricUpdate) { BoundedTrie bTrie = (BoundedTrie) metricUpdate.getBoundedTrie(); trieData = BoundedTrieData.fromProto(bTrie); } else if (trieFromResponse instanceof ArrayMap) { - LOG.info("DO NOT MERGE try to resolve ArrayMap"); trieData = trieFromArrayMap((ArrayMap) trieFromResponse); - } else { - LOG.info("DO NOT MERGE bounded trie is of type {}", trieFromResponse.getClass().getName()); } if (trieData != null) { @@ -246,9 +242,6 @@ private DistributionResult getDistributionValue(MetricUpdate metricUpdate) { /** Translate ArrayMap returned by Dataflow API client to BoundedTrieData. */ @VisibleForTesting static BoundedTrieData trieFromArrayMap(ArrayMap fieldsMap) { - // {root={truncated=false, children={gcs:={children={yathu_test.={truncated=false, - // children={temp/2={truncated=false}, temp/1={truncated=false}, temp/3={truncated=false}}}}, - // truncated=false}}}, bound=100} int bound = 0; List singleton = null; Object maybeBound = fieldsMap.get("bound"); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index e1d4d22dfea7..425ca8cecbd9 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -25,6 +25,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -296,130 +297,39 @@ public void testSingleStringSetUpdates() throws IOException { StringSetResult.create(ImmutableSet.of("ab", "cd"))))); } - /* @Test - public void testSingletonBoundedTrieFromMessage() throws ParseException { - String textProto = - " fields {\n" - + " key: \"bound\"\n" - + " value {\n" - + " number_value: 100\n" - + " }\n" - + " }\n" - + " fields {\n" - + " key: \"singleton\"\n" - + " value {\n" - + " list_value {\n" - + " values {\n" - + " string_value: \"pubsub:\"\n" - + " }\n" - + " values {\n" - + " string_value: \"topic:\"\n" - + " }\n" - + " values {\n" - + " string_value: \"`google.com:abc`.\"\n" - + " }\n" - + " values {\n" - + " string_value: \"some-topic\"\n" - + " }\n" - + " }\n" - + " }\n" - + " }"; - com.google.protobuf.Struct response = - TextFormat.parse(textProto, com.google.protobuf.Struct.class); - BoundedTrieData result = DataflowMetrics.DataflowMetricResultExtractor.trieFromStruct(response); + public void testParseBoundedTrieWithSingleton() { + ArrayMap arrayMap = ArrayMap.create(); + arrayMap.put("bound", 100); + arrayMap.put( + "singleton", ImmutableList.of("pubsub:", "topic:", "`google.com:abc`.", "some-topic")); + + BoundedTrieData result = + DataflowMetrics.DataflowMetricResultExtractor.trieFromArrayMap(arrayMap); assertEquals( "BoundedTrieData({'pubsub:topic:`google.com:abc`.some-topicfalse'})", result.toString()); } @Test - public void testNestedBoundedTrieFromMessage() throws ParseException { - String textProto = - "fields {\n" - + " key: \"bound\"\n" - + " value {\n" - + " number_value: 100\n" - + " }\n" - + "}\n" - + "fields {\n" - + " key: \"root\"\n" - + " value {\n" - + " struct_value {\n" - + " fields {\n" - + " key: \"children\"\n" - + " value {\n" - + " struct_value {\n" - + " fields {\n" - + " key: \"gcs:\"\n" - + " value {\n" - + " struct_value {\n" - + " fields {\n" - + " key: \"children\"\n" - + " value {\n" - + " struct_value {\n" - + " fields {\n" - + " key: \"some-bucket.\"\n" - + " value {\n" - + " struct_value {\n" - + " fields {\n" - + " key: \"children\"\n" - + " value {\n" - + " struct_value {\n" - + " fields {\n" - + " key: \"some-folder/\"\n" - + " value {\n" - + " struct_value {\n" - + " fields {\n" - + " key: \"truncated\"\n" - + " value {\n" - + " bool_value: true\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " fields {\n" - + " key: \"truncated\"\n" - + " value {\n" - + " bool_value: false\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " fields {\n" - + " key: \"truncated\"\n" - + " value {\n" - + " bool_value: false\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + " fields {\n" - + " key: \"truncated\"\n" - + " value {\n" - + " bool_value: false\n" - + " }\n" - + " }\n" - + " }\n" - + " }\n" - + "}"; - com.google.protobuf.Struct response = - TextFormat.parse(textProto, com.google.protobuf.Struct.class); - BoundedTrieData result = DataflowMetrics.DataflowMetricResultExtractor.trieFromStruct(response); - assertEquals("BoundedTrieData({'gcs:some-bucket.some-folder/true'})", result.toString()); + @SuppressWarnings("unchecked") // assemble ArrayMap from scratch for testing + public void testParseBoundedTrieWithRoot() { + ArrayMap arrayMap = ArrayMap.create(); + arrayMap.put("bound", 100); + ArrayMap root = ArrayMap.create(); + root.put("truncated", false); + ArrayMap children = ArrayMap.create(); + ArrayMap leaf = ArrayMap.create(); + leaf.put("1", ArrayMap.of("truncated", false)); + leaf.put("2", ArrayMap.of("truncated", false)); + leaf.put("3", ArrayMap.of("truncated", false)); + children.put("gcs:some-bucket.some-folder/", leaf); + root.put("children", children); + arrayMap.put("root", root); + + BoundedTrieData result = + DataflowMetrics.DataflowMetricResultExtractor.trieFromArrayMap(arrayMap); + assertEquals("BoundedTrieData({'gcs:some-bucket.some-folder/false'})", result.toString()); } - */ @Test public void testSingleBoundedTrieUpdates() throws IOException { diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index 620331942fd4..af384ff19c09 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -90,8 +90,6 @@ - - From 16846125a4fbb0dd91a8bb2c2b48cfa6b1c71d9f Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 6 Jun 2025 18:42:54 -0400 Subject: [PATCH 3/4] Fix test: setTrie -> setBoundedTrie --- .../org/apache/beam/runners/dataflow/DataflowMetricsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index 425ca8cecbd9..98e3d4bd4050 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -203,7 +203,7 @@ private MetricUpdate makeStringSetMetricUpdate( private MetricUpdate makeBoundedTrieMetricUpdate( String name, String namespace, String step, BoundedTrie data, boolean tentative) { MetricUpdate update = new MetricUpdate(); - update.setTrie(data); + update.setBoundedTrie(data); return setStructuredName(update, name, namespace, step, tentative); } From 3b0c220d2b39ce263689c5810ffe2affb588fb84 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Mon, 9 Jun 2025 20:52:58 -0400 Subject: [PATCH 4/4] Fix comments --- .../java/org/apache/beam/runners/dataflow/DataflowMetrics.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index 2338f63c44ff..06435793b56f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -214,8 +214,7 @@ private BoundedTrieResult getBoundedTrieValue(MetricUpdate metricUpdate) { Object trieFromResponse = metricUpdate.getBoundedTrie(); // Fail-safely cast Trie returned by dataflow API to BoundedTrieResult if (trieFromResponse instanceof BoundedTrie) { - BoundedTrie bTrie = (BoundedTrie) metricUpdate.getBoundedTrie(); - trieData = BoundedTrieData.fromProto(bTrie); + trieData = BoundedTrieData.fromProto((BoundedTrie) trieFromResponse); } else if (trieFromResponse instanceof ArrayMap) { trieData = trieFromArrayMap((ArrayMap) trieFromResponse); }