diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index 0983674b9831..376befa77f4e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -153,9 +153,10 @@ public void addMetricResult( metricKey.metricName(), metricKey.stepName(), isStreamingJob ? null : value, // Committed - isStreamingJob ? value : null)); // Attempted + value)); // Attempted /* In Dataflow streaming jobs, only ATTEMPTED metrics are available. - * In Dataflow batch jobs, only COMMITTED metrics are available. + * In Dataflow batch jobs, only COMMITTED metrics are available, but + * we must provide ATTEMPTED, so we use COMMITTED as a good approximation. * Reporting the appropriate metric depending on whether it's a batch/streaming job. */ } else if (committed.getScalar() != null && attempted.getScalar() != null) { @@ -166,9 +167,10 @@ public void addMetricResult( metricKey.metricName(), metricKey.stepName(), isStreamingJob ? null : value, // Committed - isStreamingJob ? value : null)); // Attempted + value)); // Attempted /* In Dataflow streaming jobs, only ATTEMPTED metrics are available. - * In Dataflow batch jobs, only COMMITTED metrics are available. + * In Dataflow batch jobs, only COMMITTED metrics are available, but + * we must provide ATTEMPTED, so we use COMMITTED as a good approximation. * Reporting the appropriate metric depending on whether it's a batch/streaming job. */ } else { @@ -350,10 +352,18 @@ abstract static class DataflowMetricResult implements MetricResult { public abstract MetricName name(); public abstract String step(); @Nullable - public abstract T committed(); - @Nullable + protected abstract T committedInternal(); public abstract T attempted(); + public T committed() { + T committed = committedInternal(); + if (committed == null) { + throw new UnsupportedOperationException("This runner does not currently support committed" + + " metrics results. Please use 'attempted' instead."); + } + return committed; + } + public static MetricResult create(MetricName name, String scope, T committed, T attempted) { return new AutoValue_DataflowMetrics_DataflowMetricResult<>( diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index baf02114179a..7aa291065aae 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -22,8 +22,10 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -208,7 +210,7 @@ public void testSingleCounterUpdates() throws IOException { DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); MetricQueryResults result = dataflowMetrics.queryMetrics(null); assertThat(result.counters(), containsInAnyOrder( - attemptedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null))); + attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L))); assertThat(result.counters(), containsInAnyOrder( committedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L))); } @@ -241,7 +243,7 @@ public void testIgnoreDistributionButGetCounterUpdates() throws IOException { DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); MetricQueryResults result = dataflowMetrics.queryMetrics(null); assertThat(result.counters(), containsInAnyOrder( - attemptedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null))); + attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L))); assertThat(result.counters(), containsInAnyOrder( committedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L))); } @@ -275,7 +277,7 @@ public void testDistributionUpdates() throws IOException { MetricQueryResults result = dataflowMetrics.queryMetrics(null); assertThat(result.distributions(), contains( attemptedMetricsResult("distributionNamespace", "distributionName", "myStepName", - (DistributionResult) null))); + DistributionResult.create(18, 2, 2, 16)))); assertThat(result.distributions(), contains( committedMetricsResult("distributionNamespace", "distributionName", "myStepName", DistributionResult.create(18, 2, 2, 16)))); @@ -308,9 +310,14 @@ public void testDistributionUpdatesStreaming() throws IOException { DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); MetricQueryResults result = dataflowMetrics.queryMetrics(null); - assertThat(result.distributions(), contains( - committedMetricsResult("distributionNamespace", "distributionName", "myStepName", - (DistributionResult) null))); + try { + result.distributions().iterator().next().committed(); + fail("Expected UnsupportedOperationException"); + } catch (UnsupportedOperationException expected) { + assertThat(expected.getMessage(), + containsString("This runner does not currently support committed" + + " metrics results. Please use 'attempted' instead.")); + } assertThat(result.distributions(), contains( attemptedMetricsResult("distributionNamespace", "distributionName", "myStepName", DistributionResult.create(18, 2, 2, 16)))); @@ -356,9 +363,9 @@ public void testMultipleCounterUpdates() throws IOException { DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); MetricQueryResults result = dataflowMetrics.queryMetrics(null); assertThat(result.counters(), containsInAnyOrder( - attemptedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null), - attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", (Long) null), - attemptedMetricsResult("otherNamespace", "counterName", "myStepName4", (Long) null))); + attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L), + attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L), + attemptedMetricsResult("otherNamespace", "counterName", "myStepName4", 1200L))); assertThat(result.counters(), containsInAnyOrder( committedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L), committedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L), @@ -400,10 +407,14 @@ public void testMultipleCounterUpdatesStreaming() throws IOException { DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); MetricQueryResults result = dataflowMetrics.queryMetrics(null); - assertThat(result.counters(), containsInAnyOrder( - committedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null), - committedMetricsResult("otherNamespace", "otherCounter", "myStepName3", (Long) null), - committedMetricsResult("otherNamespace", "counterName", "myStepName4", (Long) null))); + try { + result.counters().iterator().next().committed(); + fail("Expected UnsupportedOperationException"); + } catch (UnsupportedOperationException expected) { + assertThat(expected.getMessage(), + containsString("This runner does not currently support committed" + + " metrics results. Please use 'attempted' instead.")); + } assertThat(result.counters(), containsInAnyOrder( attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L), attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L),