Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ public StringSetData getCumulative() {
return setValue.get();
}

// Used by Streaming metric container to extract deltas since streaming metrics are
// reported as deltas rather than cumulative as in batch.
// For delta we take the current value then reset the cell to empty so the next call only see
// delta/updates from last call.
public StringSetData getAndReset() {
return setValue.getAndUpdate(unused -> StringSetData.empty());
}

@Override
public MetricName getName() {
return name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ public Iterable<CounterUpdate> extractMetricUpdates(boolean isFinalUpdate) {
.transform(
update ->
MetricsToCounterUpdateConverter.fromStringSet(
update.getKey(), update.getUpdate())));
update.getKey(), true, update.getUpdate())));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,16 @@ public static CounterUpdate fromGauge(
.setIntegerGauge(integerGaugeProto);
}

public static CounterUpdate fromStringSet(MetricKey key, StringSetData stringSetData) {
public static CounterUpdate fromStringSet(
MetricKey key, boolean isCumulative, StringSetData stringSetData) {
CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET);

StringList stringList = new StringList();
stringList.setElements(new ArrayList<>(stringSetData.stringSet()));

return new CounterUpdate()
.setStructuredNameAndMetadata(name)
.setCumulative(false)
.setCumulative(isCumulative)
.setStringList(stringList);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.beam.runners.core.metrics.GaugeCell;
import org.apache.beam.runners.core.metrics.MetricsMap;
import org.apache.beam.runners.core.metrics.StringSetCell;
import org.apache.beam.runners.core.metrics.StringSetData;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
Expand Down Expand Up @@ -73,7 +74,7 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
private final ConcurrentHashMap<MetricName, GaugeCell> perWorkerGauges =
new ConcurrentHashMap<>();

private MetricsMap<MetricName, StringSetCell> stringSet = new MetricsMap<>(StringSetCell::new);
private MetricsMap<MetricName, StringSetCell> stringSets = new MetricsMap<>(StringSetCell::new);

private MetricsMap<MetricName, DeltaDistributionCell> distributions =
new MetricsMap<>(DeltaDistributionCell::new);
Expand Down Expand Up @@ -182,7 +183,7 @@ public Gauge getPerWorkerGauge(MetricName metricName) {

@Override
public StringSet getStringSet(MetricName metricName) {
return stringSet.get(metricName);
return stringSets.get(metricName);
}

@Override
Expand All @@ -202,9 +203,11 @@ public Histogram getPerWorkerHistogram(
}

public Iterable<CounterUpdate> extractUpdates() {
// Streaming metrics are updated as delta and not cumulative.
return counterUpdates()
.append(distributionUpdates())
.append(gaugeUpdates().append(stringSetUpdates()));
.append(gaugeUpdates())
.append(stringSetUpdates());
}

private FluentIterable<CounterUpdate> counterUpdates() {
Expand Down Expand Up @@ -247,14 +250,18 @@ private FluentIterable<CounterUpdate> gaugeUpdates() {
}

private FluentIterable<CounterUpdate> stringSetUpdates() {
return FluentIterable.from(stringSet.entries())
return FluentIterable.from(stringSets.entries())
.transform(
new Function<Entry<MetricName, StringSetCell>, CounterUpdate>() {
@Override
public @Nullable CounterUpdate apply(
@Nonnull Map.Entry<MetricName, StringSetCell> entry) {
StringSetData value = entry.getValue().getAndReset();
if (value.stringSet().isEmpty()) {
return null;
}
return MetricsToCounterUpdateConverter.fromStringSet(
MetricKey.create(stepName, entry.getKey()), entry.getValue().getCumulative());
MetricKey.create(stepName, entry.getKey()), false, value);
}
})
.filter(Predicates.notNull());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,14 @@ public void testStringSetUpdateExtraction() {
.setStringList(new StringList().setElements(Arrays.asList("ij", "kl", "mn")));

updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update, name2Update));
assertThat(updates, containsInAnyOrder(name2Update));

// test deltas
c1.getStringSet(name1).add("op");
name1Update.setStringList(
new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh", "op")));
name1Update.setStringList(new StringList().setElements(Arrays.asList("op")));

updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
assertThat(updates, containsInAnyOrder(name1Update, name2Update));
assertThat(updates, containsInAnyOrder(name1Update));
}

@Test
Expand Down
Loading