Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20250427-2.0.0", // [bomupgrader] sets version
google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240310-2.0.0", // [bomupgrader] sets version
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20250106-$google_clients_version",
google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20250519-$google_clients_version",
google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version",
google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20250424-2.0.0", // [bomupgrader] sets version
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public final String toString() {
* intended to be used directly outside of {@link BoundedTrieData} with multiple threads.
*/
@VisibleForTesting
static class BoundedTrieNode implements Serializable {
public static class BoundedTrieNode implements Serializable {

public static final String TRUNCATED_TRUE = String.valueOf(true);
public static final String TRUNCATED_FALSE = String.valueOf(false);
Expand All @@ -334,7 +334,7 @@ static class BoundedTrieNode implements Serializable {
private int size;

/** Constructs an empty `BoundedTrieNode` with size 1 and not truncated. */
BoundedTrieNode() {
public BoundedTrieNode() {
this(new HashMap<>(), false, 1);
}

Expand All @@ -345,7 +345,8 @@ static class BoundedTrieNode implements Serializable {
* @param truncated Whether this node is truncated.
* @param size The size of the subtree rooted at this node.
*/
BoundedTrieNode(@Nonnull Map<String, BoundedTrieNode> children, boolean truncated, int size) {
public BoundedTrieNode(
@Nonnull Map<String, BoundedTrieNode> children, boolean truncated, int size) {
this.children = children;
this.size = size;
this.truncated = truncated;
Expand Down Expand Up @@ -561,7 +562,7 @@ boolean contains(List<String> segments) {
*
* @return The size of the subtree.
*/
int getSize() {
public int getSize() {
return size;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.BoundedTrieData;
Expand All @@ -44,9 +45,11 @@
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -133,7 +136,8 @@ private JobMetrics getJobMetrics() throws IOException {
return result;
}

private static class DataflowMetricResultExtractor {
@VisibleForTesting
static class DataflowMetricResultExtractor {
private final ImmutableList.Builder<MetricResult<Long>> counterResults;
private final ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults;
private final ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults;
Expand Down Expand Up @@ -175,7 +179,7 @@ public void addMetricResult(
// stringset metric
StringSetResult value = getStringSetValue(committed);
stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, value));
} else if (committed.getTrie() != null && attempted.getTrie() != null) {
} else if (committed.getBoundedTrie() != null && attempted.getBoundedTrie() != null) {
BoundedTrieResult value = getBoundedTrieValue(committed);
boundedTrieResults.add(MetricResult.create(metricKey, !isStreamingJob, value));
} else {
Expand Down Expand Up @@ -206,12 +210,20 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) {
}

private BoundedTrieResult getBoundedTrieValue(MetricUpdate metricUpdate) {
if (metricUpdate.getTrie() == null) {
BoundedTrieData trieData = null;
Object trieFromResponse = metricUpdate.getBoundedTrie();
// Fail-safely cast Trie returned by dataflow API to BoundedTrieResult
if (trieFromResponse instanceof BoundedTrie) {
trieData = BoundedTrieData.fromProto((BoundedTrie) trieFromResponse);
} else if (trieFromResponse instanceof ArrayMap) {
trieData = trieFromArrayMap((ArrayMap) trieFromResponse);
}

if (trieData != null) {
return BoundedTrieResult.create(trieData.extractResult().getResult());
} else {
return BoundedTrieResult.empty();
}
BoundedTrie bTrie = (BoundedTrie) metricUpdate.getTrie();
BoundedTrieData trieData = BoundedTrieData.fromProto(bTrie);
return BoundedTrieResult.create(trieData.extractResult().getResult());
}

private DistributionResult getDistributionValue(MetricUpdate metricUpdate) {
Expand All @@ -226,6 +238,62 @@ private DistributionResult getDistributionValue(MetricUpdate metricUpdate) {
return DistributionResult.create(sum, count, min, max);
}

/** Translate ArrayMap returned by Dataflow API client to BoundedTrieData. */
@VisibleForTesting
static BoundedTrieData trieFromArrayMap(ArrayMap fieldsMap) {
int bound = 0;
List<String> singleton = null;
Object maybeBound = fieldsMap.get("bound");
if (maybeBound instanceof Number) {
bound = ((Number) maybeBound).intValue();
}
Object maybeSingleton = fieldsMap.get("singleton");
if (maybeSingleton instanceof List) {
List valueList = (List) maybeSingleton;
ImmutableList.Builder<String> builder = ImmutableList.builder();
for (Object stringValue : valueList) {
builder.add((String) stringValue);
}
singleton = builder.build();
}
Object maybeRoot = fieldsMap.get("root");
BoundedTrieData.BoundedTrieNode root = null;
if (maybeRoot instanceof Map) {
root = trieNodeFromMap((Map) maybeRoot);
}
return new BoundedTrieData(singleton, root, bound);
}

/** Translate Map returned by Dataflow API client to BoundedTrieData.BoundedTrieNode. */
private static BoundedTrieData.BoundedTrieNode trieNodeFromMap(Map fieldsMap) {
boolean truncated = false;
Object mayTruncated = fieldsMap.get("truncated");
if (mayTruncated instanceof Boolean) {
truncated = (boolean) mayTruncated;
}
int childrenSize = 0;
ImmutableMap.Builder<String, BoundedTrieData.BoundedTrieNode> builder =
ImmutableMap.builder();
Object maybeChildren = fieldsMap.get("children");
if (maybeChildren instanceof Map) {
Map allChildren = (Map) maybeChildren;
for (Object maybeChildValue : allChildren.entrySet()) {
Map.Entry childValue = (Map.Entry) maybeChildValue;
Object maybeChild = childValue.getValue();
if (maybeChild instanceof Map) {
BoundedTrieData.BoundedTrieNode child = trieNodeFromMap((Map) maybeChild);
Object maybeKey = childValue.getKey();
if (maybeKey instanceof String) {
builder.put((String) maybeKey, child);
}
childrenSize += child.getSize();
}
}
}
Map<String, BoundedTrieData.BoundedTrieNode> children = builder.build();
return new BoundedTrieData.BoundedTrieNode(children, truncated, Math.max(1, childrenSize));
}

public Iterable<MetricResult<DistributionResult>> getDistributionResults() {
return distributionResults.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -202,7 +203,7 @@ private MetricUpdate makeStringSetMetricUpdate(
private MetricUpdate makeBoundedTrieMetricUpdate(
String name, String namespace, String step, BoundedTrie data, boolean tentative) {
MetricUpdate update = new MetricUpdate();
update.setTrie(data);
update.setBoundedTrie(data);
return setStructuredName(update, name, namespace, step, tentative);
}

Expand Down Expand Up @@ -296,6 +297,40 @@ public void testSingleStringSetUpdates() throws IOException {
StringSetResult.create(ImmutableSet.of("ab", "cd")))));
}

@Test
public void testParseBoundedTrieWithSingleton() {
ArrayMap arrayMap = ArrayMap.create();
arrayMap.put("bound", 100);
arrayMap.put(
"singleton", ImmutableList.of("pubsub:", "topic:", "`google.com:abc`.", "some-topic"));

BoundedTrieData result =
DataflowMetrics.DataflowMetricResultExtractor.trieFromArrayMap(arrayMap);
assertEquals(
"BoundedTrieData({'pubsub:topic:`google.com:abc`.some-topicfalse'})", result.toString());
}

@Test
@SuppressWarnings("unchecked") // assemble ArrayMap from scratch for testing
public void testParseBoundedTrieWithRoot() {
ArrayMap arrayMap = ArrayMap.create();
arrayMap.put("bound", 100);
ArrayMap root = ArrayMap.create();
root.put("truncated", false);
ArrayMap children = ArrayMap.create();
ArrayMap leaf = ArrayMap.create();
leaf.put("1", ArrayMap.of("truncated", false));
leaf.put("2", ArrayMap.of("truncated", false));
leaf.put("3", ArrayMap.of("truncated", false));
children.put("gcs:some-bucket.some-folder/", leaf);
root.put("children", children);
arrayMap.put("root", root);

BoundedTrieData result =
DataflowMetrics.DataflowMetricResultExtractor.trieFromArrayMap(arrayMap);
assertEquals("BoundedTrieData({'gcs:some-bucket.some-folder/false'})", result.toString());
}

@Test
public void testSingleBoundedTrieUpdates() throws IOException {
AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);
Expand Down
Loading