diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index 0983674b9831..8c93e97e7549 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -63,7 +63,7 @@ class DataflowMetrics extends MetricResults { * After the job has finished running, Metrics no longer will change, so their results are * cached here. */ - private MetricQueryResults cachedMetricResults = null; + private JobMetrics cachedMetricResults = null; /** * Constructor for the DataflowMetrics class. @@ -88,14 +88,15 @@ private MetricQueryResults populateMetricQueryResults( .build(); } - private MetricQueryResults queryServiceForMetrics(MetricsFilter filter) { + @Override + public MetricQueryResults queryMetrics(MetricsFilter filter) { List metricUpdates; ImmutableList> counters = ImmutableList.of(); ImmutableList> distributions = ImmutableList.of(); ImmutableList> gauges = ImmutableList.of(); JobMetrics jobMetrics; try { - jobMetrics = dataflowClient.getJobMetrics(dataflowPipelineJob.jobId); + jobMetrics = getJobMetrics(); } catch (IOException e) { LOG.warn("Unable to query job metrics.\n"); return DataflowMetricQueryResults.create(counters, distributions, gauges); @@ -106,17 +107,12 @@ private MetricQueryResults queryServiceForMetrics(MetricsFilter filter) { return populateMetricQueryResults(metricUpdates, filter); } - public MetricQueryResults queryMetrics() { - return queryMetrics(null); - } - - @Override - public MetricQueryResults queryMetrics(MetricsFilter filter) { + private JobMetrics getJobMetrics() throws IOException { if (cachedMetricResults != null) { // Metric results have been cached after the job ran. return cachedMetricResults; } - MetricQueryResults result = queryServiceForMetrics(filter); + JobMetrics result = dataflowClient.getJobMetrics(dataflowPipelineJob.jobId); if (dataflowPipelineJob.getState().isTerminal()) { // Add current query result to the cache. cachedMetricResults = result; diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java index baf02114179a..571c2481a455 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java @@ -109,7 +109,7 @@ public void testEmptyMetricUpdates() throws IOException { when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics); DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient); - MetricQueryResults result = dataflowMetrics.queryMetrics(); + MetricQueryResults result = dataflowMetrics.queryMetrics(null); assertThat(ImmutableList.copyOf(result.counters()), is(empty())); assertThat(ImmutableList.copyOf(result.distributions()), is(empty())); }