Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ public void addMetricResult(
metricKey.metricName(),
metricKey.stepName(),
isStreamingJob ? null : value, // Committed
isStreamingJob ? value : null)); // Attempted
value)); // Attempted
/* In Dataflow streaming jobs, only ATTEMPTED metrics are available.
* In Dataflow batch jobs, only COMMITTED metrics are available.
* In Dataflow batch jobs, only COMMITTED metrics are available, but
* we must provide ATTEMPTED, so we use COMMITTED as a good approximation.
* Reporting the appropriate metric depending on whether it's a batch/streaming job.
*/
} else if (committed.getScalar() != null && attempted.getScalar() != null) {
Expand All @@ -166,9 +167,10 @@ public void addMetricResult(
metricKey.metricName(),
metricKey.stepName(),
isStreamingJob ? null : value, // Committed
isStreamingJob ? value : null)); // Attempted
value)); // Attempted
/* In Dataflow streaming jobs, only ATTEMPTED metrics are available.
* In Dataflow batch jobs, only COMMITTED metrics are available.
* In Dataflow batch jobs, only COMMITTED metrics are available, but
* we must provide ATTEMPTED, so we use COMMITTED as a good approximation.
* Reporting the appropriate metric depending on whether it's a batch/streaming job.
*/
} else {
Expand Down Expand Up @@ -350,10 +352,18 @@ abstract static class DataflowMetricResult<T> implements MetricResult<T> {
public abstract MetricName name();
public abstract String step();
@Nullable
public abstract T committed();
@Nullable
protected abstract T committedInternal();
public abstract T attempted();

public T committed() {
T committed = committedInternal();
if (committed == null) {
throw new UnsupportedOperationException("This runner does not currently support committed"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not so keen on the use of exception instead of null. If something is explicitly optional @Nullable seems reasonable (insert standard rant about how Optional is a better choice but Java screwed it up and Guava is shaded).

I would imagine that changes to this class would just remove @Nullable from attempted, then an additional static factory method like createCommitted(<just committed value>) that populates both fields with that value, createAttempted(<just attempted value>) that leaves committed null and create(<both not nullable>). Incidentally the existing create method seems broken as it fails to mark them as @Nullable. When we plug in the checker framework that should be caught.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see - the javadoc tells you to do this. That sucks. Want to fix the javadoc to have this better spec?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK let's just make Dataflow match the spec and later change the spec to be good.

+ " metrics results. Please use 'attempted' instead.");
}
return committed;
}

public static <T> MetricResult<T> create(MetricName name, String scope,
T committed, T attempted) {
return new AutoValue_DataflowMetrics_DataflowMetricResult<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -208,7 +210,7 @@ public void testSingleCounterUpdates() throws IOException {
DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
MetricQueryResults result = dataflowMetrics.queryMetrics(null);
assertThat(result.counters(), containsInAnyOrder(
attemptedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null)));
attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L)));
assertThat(result.counters(), containsInAnyOrder(
committedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L)));
}
Expand Down Expand Up @@ -241,7 +243,7 @@ public void testIgnoreDistributionButGetCounterUpdates() throws IOException {
DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
MetricQueryResults result = dataflowMetrics.queryMetrics(null);
assertThat(result.counters(), containsInAnyOrder(
attemptedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null)));
attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L)));
assertThat(result.counters(), containsInAnyOrder(
committedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L)));
}
Expand Down Expand Up @@ -275,7 +277,7 @@ public void testDistributionUpdates() throws IOException {
MetricQueryResults result = dataflowMetrics.queryMetrics(null);
assertThat(result.distributions(), contains(
attemptedMetricsResult("distributionNamespace", "distributionName", "myStepName",
(DistributionResult) null)));
DistributionResult.create(18, 2, 2, 16))));
assertThat(result.distributions(), contains(
committedMetricsResult("distributionNamespace", "distributionName", "myStepName",
DistributionResult.create(18, 2, 2, 16))));
Expand Down Expand Up @@ -308,9 +310,14 @@ public void testDistributionUpdatesStreaming() throws IOException {

DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
MetricQueryResults result = dataflowMetrics.queryMetrics(null);
assertThat(result.distributions(), contains(
committedMetricsResult("distributionNamespace", "distributionName", "myStepName",
(DistributionResult) null)));
try {
result.distributions().iterator().next().committed();
fail("Expected UnsupportedOperationException");
} catch (UnsupportedOperationException expected) {
assertThat(expected.getMessage(),
containsString("This runner does not currently support committed"
+ " metrics results. Please use 'attempted' instead."));
}
assertThat(result.distributions(), contains(
attemptedMetricsResult("distributionNamespace", "distributionName", "myStepName",
DistributionResult.create(18, 2, 2, 16))));
Expand Down Expand Up @@ -356,9 +363,9 @@ public void testMultipleCounterUpdates() throws IOException {
DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
MetricQueryResults result = dataflowMetrics.queryMetrics(null);
assertThat(result.counters(), containsInAnyOrder(
attemptedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null),
attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", (Long) null),
attemptedMetricsResult("otherNamespace", "counterName", "myStepName4", (Long) null)));
attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L),
attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L),
attemptedMetricsResult("otherNamespace", "counterName", "myStepName4", 1200L)));
assertThat(result.counters(), containsInAnyOrder(
committedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L),
committedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L),
Expand Down Expand Up @@ -400,10 +407,14 @@ public void testMultipleCounterUpdatesStreaming() throws IOException {

DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
MetricQueryResults result = dataflowMetrics.queryMetrics(null);
assertThat(result.counters(), containsInAnyOrder(
committedMetricsResult("counterNamespace", "counterName", "myStepName", (Long) null),
committedMetricsResult("otherNamespace", "otherCounter", "myStepName3", (Long) null),
committedMetricsResult("otherNamespace", "counterName", "myStepName4", (Long) null)));
try {
result.counters().iterator().next().committed();
fail("Expected UnsupportedOperationException");
} catch (UnsupportedOperationException expected) {
assertThat(expected.getMessage(),
containsString("This runner does not currently support committed"
+ " metrics results. Please use 'attempted' instead."));
}
assertThat(result.counters(), containsInAnyOrder(
attemptedMetricsResult("counterNamespace", "counterName", "myStepName", 1233L),
attemptedMetricsResult("otherNamespace", "otherCounter", "myStepName3", 12L),
Expand Down