diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java
index 34beaa6b63ed..0e33881d0e37 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieCell.java
@@ -74,6 +74,17 @@ public synchronized BoundedTrieData getCumulative() {
     return value.getCumulative();
   }
 
+  // Used by the streaming metric container to extract deltas, since streaming metrics are
+  // reported as deltas rather than as cumulative values as in batch.
+  // For a delta we take the current value and then reset the cell to empty, so the next call
+  // only sees the updates made since the last call.
+  public synchronized BoundedTrieData getAndReset() {
+    // Since we are resetting, there is no need for a deep copy; just hand over the reference.
+    BoundedTrieData shallowCopy = this.value;
+    this.value = new BoundedTrieData(); // create a new object; do not call reset on the existing one
+    return shallowCopy;
+  }
+
   @Override
   public MetricName getName() {
     return name;
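Note (illustration, not part of the patch): the delta contract above — take the current value and leave the cell empty — is what the streaming container relies on later in this change. Below is a minimal, hypothetical sketch of that usage. It assumes the existing public BoundedTrieCell(MetricName) constructor and the varargs BoundedTrie.add(String...) API; the reportTick/publish helpers are invented names for illustration only.

    import org.apache.beam.runners.core.metrics.BoundedTrieCell;
    import org.apache.beam.runners.core.metrics.BoundedTrieData;
    import org.apache.beam.sdk.metrics.MetricName;

    public class DeltaReportingSketch {
      // Hypothetical periodic reporter: each tick sends only what accumulated since the last tick.
      static void reportTick(BoundedTrieCell cell) {
        BoundedTrieData delta = cell.getAndReset(); // take the current value, leave the cell empty
        if (delta.isEmpty()) {
          return; // nothing new since the previous tick, so no update is emitted
        }
        publish(delta); // placeholder for "convert to a CounterUpdate and send it"
      }

      static void publish(BoundedTrieData delta) {
        System.out.println("delta paths: " + delta.extractResult().getResult());
      }

      public static void main(String[] args) {
        BoundedTrieCell cell = new BoundedTrieCell(MetricName.named("ns", "paths"));
        cell.add("gs:", "bucket", "obj1");
        reportTick(cell); // first tick reports the path added above
        reportTick(cell); // second tick reports nothing: the first tick reset the cell
      }
    }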
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java
index b1efa59fba33..63fb289d3eec 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/BoundedTrieData.java
@@ -180,14 +180,19 @@ public synchronized BoundedTrieResult extractResult() {
    */
   public synchronized void add(Iterable<String> segments) {
     List<String> segmentsParts = ImmutableList.copyOf(segments);
-    if (this.root == null) {
-      if (this.singleton == null || !this.singleton.equals(segmentsParts)) {
+    if (segmentsParts.isEmpty()) {
+      return;
+    }
+    if (this.singleton == null && this.root == null) {
+      // empty case
+      this.singleton = segmentsParts;
+    } else if (this.singleton != null && this.singleton.equals(segmentsParts)) {
+      // skip
+    } else {
+      if (this.root == null) {
         this.root = this.asTrie();
         this.singleton = null;
       }
-    }
-
-    if (this.root != null) {
       this.root.add(segmentsParts);
       if (this.root.getSize() > this.bound) {
         this.root.trim();
@@ -271,6 +276,11 @@ public synchronized boolean contains(@Nonnull List<String> value) {
     }
   }
 
+  /** @return true if this {@link BoundedTrieData} is empty, false otherwise. */
+  public boolean isEmpty() {
+    return (root == null || root.children.isEmpty()) && (singleton == null || singleton.isEmpty());
+  }
+
   @Override
   public final boolean equals(@Nullable Object other) {
     if (this == other) {
@@ -335,7 +345,7 @@ static class BoundedTrieNode implements Serializable {
      * @param truncated Whether this node is truncated.
      * @param size The size of the subtree rooted at this node.
      */
-    BoundedTrieNode(Map<String, BoundedTrieNode> children, boolean truncated, int size) {
+    BoundedTrieNode(@Nonnull Map<String, BoundedTrieNode> children, boolean truncated, int size) {
       this.children = children;
       this.size = size;
       this.truncated = truncated;
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java
index ca5421e6f77f..b51e14f6a23a 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/BoundedTrieNodeTest.java
@@ -882,12 +882,31 @@ public void testClear() {
     assertTrue(trie.extractResult().getResult().isEmpty());
   }
 
+  @Test
+  public void testIsEmpty() {
+    BoundedTrieData trie = new BoundedTrieData();
+    assertTrue(trie.isEmpty());
+
+    trie.add(Collections.emptyList());
+    assertTrue(trie.isEmpty());
+
+    trie.add(ImmutableList.of("a", "b"));
+    assertFalse(trie.isEmpty());
+
+    trie.add(ImmutableList.of("c", "d"));
+    assertFalse(trie.isEmpty());
+
+    trie.clear();
+    assertTrue(trie.isEmpty());
+  }
+
   @Test
   public void testBoundedTrieDataContains() {
     BoundedTrieData trie = new BoundedTrieData();
     trie.add(ImmutableList.of("a", "b"));
     assertTrue(trie.contains(ImmutableList.of("a", "b")));
-    assertTrue(trie.contains(ImmutableList.of("a")));
+    // the path "a/b" is not the same as the path "a"
+    assertFalse(trie.contains(ImmutableList.of("a")));
     assertFalse(trie.contains(ImmutableList.of("a", "c")));
   }
 
@@ -1000,18 +1019,11 @@ public void testGetCumulativeWithRoot() {
     assertFalse(cumulativeTrie.contains(ImmutableList.of("g", "h")));
   }
 
-  @Test
-  public void testAddEmptyPath() {
-    BoundedTrieData trie = new BoundedTrieData();
-    trie.add(Collections.emptyList());
-    assertEquals(1, trie.size());
-    assertTrue(trie.extractResult().getResult().contains(ImmutableList.of("false")));
-  }
-
   @Test
   public void testContainsEmptyPath() {
     BoundedTrieData trie = new BoundedTrieData();
     trie.add(Collections.emptyList());
-    assertTrue(trie.contains(Collections.emptyList()));
+    assertFalse(trie.contains(Collections.emptyList()));
+    assertTrue(trie.isEmpty());
   }
 }
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index ad03ac713289..1eeef835907f 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -94,6 +94,7 @@ dependencies {
   // io-kafka is only used in PTransform override so it is optional
   provided project(":sdks:java:io:kafka")
   implementation project(":sdks:java:io:google-cloud-platform")
+  implementation project(":runners:core-java")
   implementation library.java.avro
   implementation library.java.bigdataoss_util
   implementation library.java.commons_codec
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index c79897b5e5c3..5f9342edcfaf 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -29,7 +29,9 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.metrics.BoundedTrieData;
 import org.apache.beam.sdk.metrics.BoundedTrieResult;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.GaugeResult;
@@ -55,6 +57,7 @@
   "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
 })
 class DataflowMetrics extends MetricResults {
+
   private static final Logger LOG = LoggerFactory.getLogger(DataflowMetrics.class);
   /**
    * Client for the Dataflow service. This can be used to query the service for information about
@@ -104,13 +107,13 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) {
     ImmutableList<MetricResult<DistributionResult>> distributions = ImmutableList.of();
     ImmutableList<MetricResult<GaugeResult>> gauges = ImmutableList.of();
     ImmutableList<MetricResult<StringSetResult>> stringSets = ImmutableList.of();
-    ImmutableList<MetricResult<BoundedTrieResult>> boudedTries = ImmutableList.of();
+    ImmutableList<MetricResult<BoundedTrieResult>> boundedTries = ImmutableList.of();
     JobMetrics jobMetrics;
     try {
       jobMetrics = getJobMetrics();
     } catch (IOException e) {
       LOG.warn("Unable to query job metrics.\n");
-      return MetricQueryResults.create(counters, distributions, gauges, stringSets, boudedTries);
+      return MetricQueryResults.create(counters, distributions, gauges, stringSets, boundedTries);
     }
     metricUpdates = firstNonNull(jobMetrics.getMetrics(), Collections.emptyList());
     return populateMetricQueryResults(metricUpdates, filter);
@@ -134,6 +137,7 @@ private static class DataflowMetricResultExtractor {
     private final ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults;
     private final ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults;
     private final ImmutableList.Builder<MetricResult<StringSetResult>> stringSetResults;
+    private final ImmutableList.Builder<MetricResult<BoundedTrieResult>> boundedTrieResults;
     private final boolean isStreamingJob;
 
     DataflowMetricResultExtractor(boolean isStreamingJob) {
@@ -141,6 +145,7 @@ private static class DataflowMetricResultExtractor {
       distributionResults = ImmutableList.builder();
       gaugeResults = ImmutableList.builder();
       stringSetResults = ImmutableList.builder();
+      boundedTrieResults = ImmutableList.builder();
       /* In Dataflow streaming jobs, only ATTEMPTED metrics are available.
        * In Dataflow batch jobs, only COMMITTED metrics are available, but
        * we must provide ATTEMPTED, so we use COMMITTED as a good approximation.
@@ -169,6 +174,9 @@ public void addMetricResult(
         // stringset metric
         StringSetResult value = getStringSetValue(committed);
         stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, value));
+      } else if (committed.getTrie() != null && attempted.getTrie() != null) {
+        BoundedTrieResult value = getBoundedTrieValue(committed);
+        boundedTrieResults.add(MetricResult.create(metricKey, !isStreamingJob, value));
       } else {
         // This is exceptionally unexpected. We expect matching user metrics to only have the
         // value types provided by the Metrics API.
@@ -196,6 +204,15 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) {
       return StringSetResult.create(ImmutableSet.copyOf(((Collection<String>) metricUpdate.getSet())));
     }
 
+    private BoundedTrieResult getBoundedTrieValue(MetricUpdate metricUpdate) {
+      if (metricUpdate.getTrie() == null) {
+        return BoundedTrieResult.empty();
+      }
+      BoundedTrie bTrie = (BoundedTrie) metricUpdate.getTrie();
+      BoundedTrieData trieData = BoundedTrieData.fromProto(bTrie);
+      return BoundedTrieResult.create(trieData.extractResult().getResult());
+    }
+
     private DistributionResult getDistributionValue(MetricUpdate metricUpdate) {
       if (metricUpdate.getDistribution() == null) {
         return DistributionResult.IDENTITY_ELEMENT;
@@ -220,9 +237,13 @@ public Iterable<MetricResult<GaugeResult>> getGaugeResults() {
       return gaugeResults.build();
     }
 
-    public Iterable<MetricResult<StringSetResult>> geStringSetResults() {
+    public Iterable<MetricResult<StringSetResult>> getStringSetResults() {
       return stringSetResults.build();
     }
+
+    public Iterable<MetricResult<BoundedTrieResult>> getBoundedTrieResults() {
+      return boundedTrieResults.build();
+    }
   }
 
   private static class DataflowMetricQueryResultsFactory {
@@ -388,8 +409,8 @@ public MetricQueryResults build() {
           extractor.getCounterResults(),
           extractor.getDistributionResults(),
           extractor.getGaugeResults(),
-          extractor.geStringSetResults(),
-          ImmutableList.of());
+          extractor.getStringSetResults(),
+          extractor.getBoundedTrieResults());
     }
   }
 }
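Note (illustration, not part of the patch): with the extractor above populating boundedTrieResults, user code can read these values back from a finished Dataflow job through the regular metrics query path. A hedged sketch follows; it assumes the MetricQueryResults.getBoundedTries() accessor exercised by the test later in this diff, and printBoundedTries is an invented helper name.

    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.metrics.BoundedTrieResult;
    import org.apache.beam.sdk.metrics.MetricQueryResults;
    import org.apache.beam.sdk.metrics.MetricResult;
    import org.apache.beam.sdk.metrics.MetricsFilter;

    public class QueryBoundedTrieSketch {
      // Hypothetical helper: print bounded-trie values recorded by one step of a finished job.
      static void printBoundedTries(PipelineResult result, String stepName) {
        MetricQueryResults metrics =
            result.metrics().queryMetrics(MetricsFilter.builder().addStep(stepName).build());
        for (MetricResult<BoundedTrieResult> trie : metrics.getBoundedTries()) {
          // In batch, committed == attempted; in streaming only attempted values are populated.
          System.out.println(trie.getName() + " -> " + trie.getAttempted().getResult());
        }
      }
    }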
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
index 745b065ea841..30f1466d4a54 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
@@ -41,11 +41,14 @@
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Set;
+import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie;
+import org.apache.beam.runners.core.metrics.BoundedTrieData;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
+import org.apache.beam.sdk.metrics.BoundedTrieResult;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricsFilter;
@@ -196,6 +199,13 @@ private MetricUpdate makeStringSetMetricUpdate(
     return setStructuredName(update, name, namespace, step, tentative);
   }
 
+  private MetricUpdate makeBoundedTrieMetricUpdate(
+      String name, String namespace, String step, BoundedTrie data, boolean tentative) {
+    MetricUpdate update = new MetricUpdate();
+    update.setTrie(data);
+    return setStructuredName(update, name, namespace, step, tentative);
+  }
+
   @Test
   public void testSingleCounterUpdates() throws IOException {
     AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);
@@ -286,6 +296,64 @@ public void testSingleStringSetUpdates() throws IOException {
                 StringSetResult.create(ImmutableSet.of("ab", "cd")))));
   }
 
+  @Test
+  public void testSingleBoundedTrieUpdates() throws IOException {
+    AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);
+    when(myStep.getFullName()).thenReturn("myStepName");
+    BiMap<AppliedPTransform<?, ?, ?>, String> transformStepNames = HashBiMap.create();
+    transformStepNames.put(myStep, "s2");
+
+    JobMetrics jobMetrics = new JobMetrics();
+    DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+    DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+    when(options.isStreaming()).thenReturn(false);
+    when(job.getDataflowOptions()).thenReturn(options);
+    when(job.getState()).thenReturn(State.RUNNING);
+    when(job.getJobId()).thenReturn(JOB_ID);
+    when(job.getTransformStepNames()).thenReturn(transformStepNames);
+
+    // The parser relies on the fact that one tentative and one committed metric update exist in
+    // the job metrics results.
+    MetricUpdate mu1 =
+        makeBoundedTrieMetricUpdate(
+            "counterName",
+            "counterNamespace",
+            "s2",
+            new BoundedTrieData(ImmutableList.of("ab", "cd")).toProto(),
+            false);
+    MetricUpdate mu1Tentative =
+        makeBoundedTrieMetricUpdate(
+            "counterName",
+            "counterNamespace",
+            "s2",
+            new BoundedTrieData(ImmutableList.of("ab", "cd")).toProto(),
+            true);
+    jobMetrics.setMetrics(ImmutableList.of(mu1, mu1Tentative));
+    DataflowClient dataflowClient = mock(DataflowClient.class);
+    when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+
+    DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+    MetricQueryResults result = dataflowMetrics.allMetrics();
+    assertThat(
+        result.getBoundedTries(),
+        containsInAnyOrder(
+            attemptedMetricsResult(
+                "counterNamespace",
+                "counterName",
+                "myStepName",
+                BoundedTrieResult.create(
+                    ImmutableSet.of(ImmutableList.of("ab", "cd", String.valueOf(false)))))));
+    assertThat(
+        result.getBoundedTries(),
+        containsInAnyOrder(
+            committedMetricsResult(
+                "counterNamespace",
+                "counterName",
+                "myStepName",
+                BoundedTrieResult.create(
+                    ImmutableSet.of(ImmutableList.of("ab", "cd", String.valueOf(false)))))));
+  }
+
   @Test
   public void testIgnoreDistributionButGetCounterUpdates() throws IOException {
     AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
index cf07e5f72401..1bfaaceb8253 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
@@ -519,6 +519,11 @@ public Iterable<CounterUpdate> extractMetricUpdates(boolean isFinalUpdate) {
               .transform(
                   update ->
                       MetricsToCounterUpdateConverter.fromStringSet(
+                          update.getKey(), true, update.getUpdate())),
+              FluentIterable.from(updates.boundedTrieUpdates())
+                  .transform(
+                      update ->
+                          MetricsToCounterUpdateConverter.fromBoundedTrie(
                           update.getKey(), true, update.getUpdate())));
         });
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java
index 4866d2011222..6d37deb847cc 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java
@@ -19,6 +19,8 @@ import static
     org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt;
 
+import com.google.api.services.dataflow.model.BoundedTrie;
+import com.google.api.services.dataflow.model.BoundedTrieNode;
 import com.google.api.services.dataflow.model.CounterMetadata;
 import com.google.api.services.dataflow.model.CounterStructuredName;
 import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
@@ -27,10 +29,15 @@
 import com.google.api.services.dataflow.model.IntegerGauge;
 import com.google.api.services.dataflow.model.StringList;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.MetricsApi;
+import org.apache.beam.runners.core.metrics.BoundedTrieData;
 import org.apache.beam.runners.core.metrics.DistributionData;
 import org.apache.beam.runners.core.metrics.StringSetData;
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 
 /** Convertor from Metrics to {@link CounterUpdate} protos. */
 @SuppressWarnings({
@@ -111,6 +118,46 @@ public static CounterUpdate fromStringSet(
         .setStringList(stringList);
   }
 
+  public static CounterUpdate fromBoundedTrie(
+      MetricKey key, boolean isCumulative, BoundedTrieData boundedTrieData) {
+    // BoundedTrie uses the SET metric aggregation kind, which tracks unique strings as a trie.
+    CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET);
+    BoundedTrie counterUpdateTrie = getBoundedTrie(boundedTrieData.toProto());
+
+    return new CounterUpdate()
+        .setStructuredNameAndMetadata(name)
+        .setCumulative(isCumulative)
+        .setBoundedTrie(counterUpdateTrie);
+  }
+
+  /**
+   * Converts from org.apache.beam.model.pipeline.v1.BoundedTrie to
+   * com.google.api.services.dataflow.model.BoundedTrie. Even though the Dataflow CounterUpdate
+   * uses org.apache.beam.model.pipeline.v1.BoundedTrieNode in its definition, the package is
+   * renamed when the google-api client is generated.
+   *
+   * @param trie org.apache.beam.model.pipeline.v1.BoundedTrie to be converted
+   * @return converted com.google.api.services.dataflow.model.BoundedTrie.
+   */
+  @VisibleForTesting
+  static BoundedTrie getBoundedTrie(MetricsApi.BoundedTrie trie) {
+    BoundedTrie counterUpdateTrie = new BoundedTrie();
+    counterUpdateTrie.setBound(trie.getBound());
+    counterUpdateTrie.setSingleton(
+        trie.getSingletonList().isEmpty() ? null : trie.getSingletonList());
+    counterUpdateTrie.setRoot(trie.hasRoot() ? getBoundedTrieNode(trie.getRoot()) : null);
+    return counterUpdateTrie;
+  }
+
+  private static BoundedTrieNode getBoundedTrieNode(MetricsApi.BoundedTrieNode node) {
+    BoundedTrieNode boundedTrieNode = new BoundedTrieNode();
+    boundedTrieNode.setTruncated(node.getTruncated());
+    Map<String, BoundedTrieNode> children = new HashMap<>();
+    node.getChildrenMap().forEach((key, value) -> children.put(key, getBoundedTrieNode(value)));
+    boundedTrieNode.setChildren(children);
+    return boundedTrieNode;
+  }
+
   public static CounterUpdate fromDistribution(
       MetricKey key, boolean isCumulative, DistributionData update) {
     CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.DISTRIBUTION);
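Note (illustration, not part of the patch): the conversion above exists only because the generated google-api client re-packages the BoundedTrie/BoundedTrieNode messages. The sketch below shows roughly how fromBoundedTrie would be driven and what lands inside the CounterUpdate; it assumes the generated client model exposes getRoot()/getChildren() getters matching the setters used above, and the metric and step names are made up.

    import com.google.api.services.dataflow.model.BoundedTrie;
    import com.google.api.services.dataflow.model.CounterUpdate;
    import org.apache.beam.runners.core.metrics.BoundedTrieData;
    import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter;
    import org.apache.beam.sdk.metrics.MetricKey;
    import org.apache.beam.sdk.metrics.MetricName;
    import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

    public class TrieConversionSketch {
      public static void main(String[] args) {
        BoundedTrieData data = new BoundedTrieData();
        data.add(ImmutableList.of("gs:", "bucket", "a.txt"));
        data.add(ImmutableList.of("gs:", "bucket", "b.txt"));

        // fromBoundedTrie serializes the pipeline proto (BoundedTrieData.toProto()) into the
        // renamed google-api client model nested inside the CounterUpdate.
        CounterUpdate update =
            MetricsToCounterUpdateConverter.fromBoundedTrie(
                MetricKey.create("myStep", MetricName.named("ns", "filesRead")), true, data);

        BoundedTrie clientModelTrie = update.getBoundedTrie();
        // With two distinct paths the trie is no longer a singleton; both paths hang off root.
        System.out.println(clientModelTrie.getRoot().getChildren().keySet()); // [gs:]
      }
    }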
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
index 0135e91eab9f..f9efa2c56751 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nonnull;
 import org.apache.beam.runners.core.metrics.BoundedTrieCell;
+import org.apache.beam.runners.core.metrics.BoundedTrieData;
 import org.apache.beam.runners.core.metrics.DistributionData;
 import org.apache.beam.runners.core.metrics.GaugeCell;
 import org.apache.beam.runners.core.metrics.MetricsMap;
@@ -217,7 +218,8 @@ public Iterable<CounterUpdate> extractUpdates() {
     return counterUpdates()
         .append(distributionUpdates())
         .append(gaugeUpdates())
-        .append(stringSetUpdates());
+        .append(stringSetUpdates())
+        .append(boundedTrieUpdates());
   }
 
   private FluentIterable<CounterUpdate> counterUpdates() {
@@ -277,6 +279,24 @@ private FluentIterable<CounterUpdate> stringSetUpdates() {
         .filter(Predicates.notNull());
   }
 
+  private FluentIterable<CounterUpdate> boundedTrieUpdates() {
+    return FluentIterable.from(boundedTries.entries())
+        .transform(
+            new Function<Map.Entry<MetricName, BoundedTrieCell>, CounterUpdate>() {
+              @Override
+              public @Nullable CounterUpdate apply(
+                  @Nonnull Map.Entry<MetricName, BoundedTrieCell> entry) {
+                BoundedTrieData value = entry.getValue().getAndReset();
+                if (value.isEmpty()) {
+                  return null;
+                }
+                return MetricsToCounterUpdateConverter.fromBoundedTrie(
+                    MetricKey.create(stepName, entry.getKey()), false, value);
+              }
+            })
+        .filter(Predicates.notNull());
+  }
+
   private FluentIterable<CounterUpdate> distributionUpdates() {
     return FluentIterable.from(distributions.entries())
         .transform(
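Note (illustration, not part of the patch): a sketch contrasting the two reporting modes wired up in this change — streaming (boundedTrieUpdates() above: deltas with cumulative=false) versus batch (BatchModeExecutionContext: full values with cumulative=true). Illustrative only; the class and variable names are invented, and it assumes the existing public BoundedTrieCell(MetricName) constructor.

    import com.google.api.services.dataflow.model.CounterUpdate;
    import org.apache.beam.runners.core.metrics.BoundedTrieCell;
    import org.apache.beam.runners.core.metrics.BoundedTrieData;
    import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter;
    import org.apache.beam.sdk.metrics.MetricKey;
    import org.apache.beam.sdk.metrics.MetricName;

    public class CumulativeVsDeltaSketch {
      public static void main(String[] args) {
        MetricKey key = MetricKey.create("myStep", MetricName.named("ns", "paths"));
        BoundedTrieCell cell = new BoundedTrieCell(MetricName.named("ns", "paths"));
        cell.add("a", "b");

        // Streaming path (mirrors boundedTrieUpdates() above): take the delta and mark the
        // CounterUpdate as non-cumulative so the backend adds it to what was reported before.
        BoundedTrieData delta = cell.getAndReset();
        if (!delta.isEmpty()) {
          CounterUpdate streamingUpdate =
              MetricsToCounterUpdateConverter.fromBoundedTrie(key, false, delta);
          System.out.println(streamingUpdate.getCumulative()); // false
        }

        // Batch path (BatchModeExecutionContext): the full cumulative value is sent every time.
        cell.add("a", "b");
        cell.add("c");
        CounterUpdate batchUpdate =
            MetricsToCounterUpdateConverter.fromBoundedTrie(key, true, cell.getCumulative());
        System.out.println(batchUpdate.getCumulative()); // true
      }
    }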
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
index d3fa69e2c31c..b64856bce7fe 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
@@ -32,6 +32,7 @@
 import com.google.api.services.dataflow.model.DistributionUpdate;
 import com.google.api.services.dataflow.model.StringList;
 import java.util.Arrays;
+import org.apache.beam.runners.core.metrics.BoundedTrieData;
 import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
@@ -40,6 +41,7 @@
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
+import org.apache.beam.sdk.metrics.BoundedTrie;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.MetricName;
@@ -47,6 +49,7 @@
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.StringSet;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -193,6 +196,41 @@ public void extractMetricUpdatesStringSet() {
     assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected));
   }
 
+  @Test
+  public void extractMetricUpdatesBoundedTrie() {
+    BatchModeExecutionContext executionContext =
+        BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "testStage");
+    DataflowOperationContext operationContext =
+        executionContext.createOperationContext(NameContextsForTests.nameContextForTest());
+
+    BoundedTrie boundedTrie =
+        operationContext
+            .metricsContainer()
+            .getBoundedTrie(MetricName.named("namespace", "some-bounded-trie"));
+    boundedTrie.add("ab");
+    boundedTrie.add("cd");
+
+    BoundedTrieData trieData = new BoundedTrieData();
+    trieData.add(ImmutableList.of("ab"));
+    trieData.add(ImmutableList.of("cd"));
+
+    final CounterUpdate expected =
+        new CounterUpdate()
+            .setStructuredNameAndMetadata(
+                new CounterStructuredNameAndMetadata()
+                    .setName(
+                        new CounterStructuredName()
+                            .setOrigin("USER")
+                            .setOriginNamespace("namespace")
+                            .setName("some-bounded-trie")
+                            .setOriginalStepName("originalName"))
+                    .setMetadata(new CounterMetadata().setKind(Kind.SET.toString())))
+            .setCumulative(true) // batch counters are cumulative
+            .setBoundedTrie(MetricsToCounterUpdateConverter.getBoundedTrie(trieData.toProto()));
+
+    assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected));
+  }
+
   @Test
   public void extractMsecCounters() {
     BatchModeExecutionContext executionContext =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java
new file mode 100644
index 000000000000..3de9678425e3
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverterTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.dataflow.model.BoundedTrie;
+import com.google.api.services.dataflow.model.BoundedTrieNode;
+import com.google.api.services.dataflow.model.CounterUpdate;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import org.apache.beam.runners.core.metrics.BoundedTrieData;
+import org.apache.beam.runners.core.metrics.StringSetData;
+import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
+import org.apache.beam.sdk.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class MetricsToCounterUpdateConverterTest {
+
+  private static final String STEP_NAME = "testStep";
+  private static final String METRIC_NAME = "testMetric";
+  private static final String NAMESPACE = "testNamespace";
+
+  private MetricKey createMetricKey() {
+    return MetricKey.create(STEP_NAME, MetricName.named(NAMESPACE, METRIC_NAME));
+  }
+
+  @Test
+  public void testFromStringSet() {
+    MetricKey key = createMetricKey();
+    boolean isCumulative = false;
+    StringSetData stringSetData = StringSetData.create(ImmutableSet.of("a", "b", "c"));
+
+    CounterUpdate counterUpdate =
+        MetricsToCounterUpdateConverter.fromStringSet(key, isCumulative, stringSetData);
+
+    assertEquals(
+        Kind.SET.toString(), counterUpdate.getStructuredNameAndMetadata().getMetadata().getKind());
+    assertFalse(counterUpdate.getCumulative());
+    assertEquals(
+        stringSetData.stringSet(), new HashSet<>(counterUpdate.getStringList().getElements()));
+  }
+
+  @Test
+  public void testFromBoundedTrie() {
+    MetricKey key = createMetricKey();
+    BoundedTrieData boundedTrieData = new BoundedTrieData();
+    boundedTrieData.add(ImmutableList.of("ab"));
+    boundedTrieData.add(ImmutableList.of("cd"));
+
+    CounterUpdate counterUpdate =
+        MetricsToCounterUpdateConverter.fromBoundedTrie(key, true, boundedTrieData);
+
+    assertEquals(
+        Kind.SET.toString(), counterUpdate.getStructuredNameAndMetadata().getMetadata().getKind());
+    assertTrue(counterUpdate.getCumulative());
+    BoundedTrie trie = counterUpdate.getBoundedTrie();
+    assertEquals(100, (int) trie.getBound());
+    assertNull(trie.getSingleton());
+
+    BoundedTrieNode root = getMiddleNode(ImmutableList.of("ab", "cd"));
+    assertEquals(root, trie.getRoot());
+  }
+
+  @Test
+  public void testGetBoundedTrieNodeLevels() {
+    BoundedTrieData boundedTrieData = new BoundedTrieData();
+    boundedTrieData.add(ImmutableList.of("ab"));
+    boundedTrieData.add(ImmutableList.of("cd"));
+    boundedTrieData.add(ImmutableList.of("ef", "gh"));
+    boundedTrieData.add(ImmutableList.of("ef", "xy"));
"xy")); + + BoundedTrie actualTrie = + MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData.toProto()); + + BoundedTrie expectedTrie = new BoundedTrie(); + expectedTrie.setBound(100); + BoundedTrieNode root = new BoundedTrieNode(); + Map rootChildren = new HashMap<>(); + rootChildren.put("ab", getEmptyNode()); + rootChildren.put("cd", getEmptyNode()); + rootChildren.put("ef", getMiddleNode(ImmutableList.of("gh", "xy"))); + root.setChildren(rootChildren); + root.setTruncated(false); + expectedTrie.setRoot(root); + expectedTrie.setSingleton(null); + assertEquals(expectedTrie, actualTrie); + } + + @Test + public void testGetBoundedTrieNodeSingleton() { + BoundedTrieData boundedTrieData = new BoundedTrieData(); + boundedTrieData.add(ImmutableList.of("ab")); + BoundedTrie actualTrie = + MetricsToCounterUpdateConverter.getBoundedTrie(boundedTrieData.toProto()); + + BoundedTrie expectedTrie = new BoundedTrie(); + expectedTrie.setBound(100); + expectedTrie.setSingleton(ImmutableList.of("ab")); + expectedTrie.setRoot(null); + + assertEquals(expectedTrie, actualTrie); + } + + private static BoundedTrieNode getMiddleNode(ImmutableList elements) { + BoundedTrieNode middleNode = new BoundedTrieNode(); + Map children = new HashMap<>(); + elements.forEach(val -> children.put(val, getEmptyNode())); + middleNode.setChildren(children); + middleNode.setTruncated(false); + return middleNode; + } + + private static BoundedTrieNode getEmptyNode() { + BoundedTrieNode leafNode = new BoundedTrieNode(); + leafNode.setChildren(Collections.emptyMap()); + leafNode.setTruncated(false); + return leafNode; + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java index 4b758aa6cd45..3bc16bf2d38c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java @@ -50,8 +50,10 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import org.apache.beam.runners.core.metrics.BoundedTrieData; import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind; import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Origin; +import org.apache.beam.sdk.metrics.BoundedTrie; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Gauge; import org.apache.beam.sdk.metrics.LabeledMetricNameUtils; @@ -61,6 +63,7 @@ import org.apache.beam.sdk.metrics.NoOpHistogram; import org.apache.beam.sdk.metrics.StringSet; import org.apache.beam.sdk.util.HistogramData; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.hamcrest.collection.IsEmptyIterable; import org.hamcrest.collection.IsMapContaining; @@ -325,6 +328,78 @@ public void testStringSetUpdateExtraction() { assertThat(updates, containsInAnyOrder(name1Update)); } + @Test + public void testBoundedTrieUpdateExtraction() { + BoundedTrie boundedTrie = c1.getBoundedTrie(name1); + boundedTrie.add("ab"); + boundedTrie.add("cd", "ef"); + boundedTrie.add("gh"); + 
boundedTrie.add("gh"); + + BoundedTrieData expectedName1 = new BoundedTrieData(); + expectedName1.add(ImmutableList.of("ab")); + expectedName1.add(ImmutableList.of("cd", "ef")); + expectedName1.add(ImmutableList.of("gh")); + expectedName1.add(ImmutableList.of("gh")); + + CounterUpdate name1Update = + new CounterUpdate() + .setStructuredNameAndMetadata( + new CounterStructuredNameAndMetadata() + .setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + .setOriginNamespace("ns") + .setName("name1") + .setOriginalStepName("s1")) + .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) + .setCumulative(false) + .setBoundedTrie( + MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto())); + + Iterable updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); + assertThat(updates, containsInAnyOrder(name1Update)); + + boundedTrie = c2.getBoundedTrie(name2); + boundedTrie.add("ij"); + boundedTrie.add("kl", "mn"); + boundedTrie.add("mn"); + + BoundedTrieData expectedName2 = new BoundedTrieData(); + expectedName2.add(ImmutableList.of("ij")); + expectedName2.add(ImmutableList.of("kl", "mn")); + expectedName2.add(ImmutableList.of("mn")); + + CounterUpdate name2Update = + new CounterUpdate() + .setStructuredNameAndMetadata( + new CounterStructuredNameAndMetadata() + .setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + .setOriginNamespace("ns") + .setName("name2") + .setOriginalStepName("s2")) + .setMetadata(new CounterMetadata().setKind(Kind.SET.toString()))) + .setCumulative(false) + .setBoundedTrie( + MetricsToCounterUpdateConverter.getBoundedTrie(expectedName2.toProto())); + + updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); + assertThat(updates, containsInAnyOrder(name2Update)); + + // test delta + c1.getBoundedTrie(name1).add("op"); + + expectedName1.clear(); + expectedName1.add(ImmutableList.of("op")); + name1Update.setBoundedTrie( + MetricsToCounterUpdateConverter.getBoundedTrie(expectedName1.toProto())); + + updates = StreamingStepMetricsContainer.extractMetricUpdates(registry); + assertThat(updates, containsInAnyOrder(name1Update)); + } + @Test public void testPerWorkerMetrics() { StreamingStepMetricsContainer.setEnablePerWorkerMetrics(false); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java index 7b6a6195f327..7c4b8d492e10 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java @@ -30,7 +30,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; - import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; import org.apache.beam.fn.harness.Cache;