Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,331 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.direct;

import static java.util.Arrays.asList;

import com.google.auto.value.AutoValue;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.metrics.DistributionData;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricUpdates;
import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.MetricsMap;

/**
* Implementation of {@link MetricResults} for the Direct Runner.
*/
class DirectMetrics extends MetricResults {

// TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the DirectRunner.
private static final ExecutorService COUNTER_COMMITTER = Executors.newCachedThreadPool();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably have some shared ThreadPool provided by the runner for internal maintenance.

Filed https://issues.apache.org/jira/browse/BEAM-723. Please add a comment linking to that JIRA.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


private interface MetricAggregation<UpdateT, ResultT> {
UpdateT zero();
UpdateT combine(Iterable<UpdateT> updates);
ResultT extract(UpdateT data);
}

/**
* Implementation of a metric in the direct runner.
*
* @param <UpdateT> The type of raw data received and aggregated across updates.
* @param <ResultT> The type of result extracted from the data.
*/
private static class DirectMetric<UpdateT, ResultT> {
private final MetricAggregation<UpdateT, ResultT> aggregation;

private final AtomicReference<UpdateT> finishedCommitted;

private final Object attemptedLock = new Object();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

document what the lock is protecting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@GuardedBy("attemptedLock")
private volatile UpdateT finishedAttempted;
@GuardedBy("attemptedLock")
private final ConcurrentMap<CommittedBundle<?>, UpdateT> inflightAttempted =
new ConcurrentHashMap<>();

public DirectMetric(MetricAggregation<UpdateT, ResultT> aggregation) {
this.aggregation = aggregation;
finishedCommitted = new AtomicReference<>(aggregation.zero());
finishedAttempted = aggregation.zero();
}

/**
* Add the given {@code tentativeCumulative} update to the physical aggregate.
*
* @param bundle The bundle receiving an update.
* @param tentativeCumulative The new cumulative value for the given bundle.
*/
public void updatePhysical(CommittedBundle<?> bundle, UpdateT tentativeCumulative) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love some javadoc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. But also private class.

// Add (or update) the cumulatiev value for the given bundle.
inflightAttempted.put(bundle, tentativeCumulative);
}

/**
* Commit a physical value for the given {@code bundle}.
*
* @param bundle The bundle being committed.
* @param finalCumulative The final cumulative value for the given bundle.
*/
public void commitPhysical(final CommittedBundle<?> bundle, final UpdateT finalCumulative) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. But also private class.

// To prevent a query from blocking the commit, we perform the commit in two steps.
// 1. We perform a non-blocking write to the uncommitted table to make the new vaule
// available immediately.
// 2. We submit a runnable that will commit the update and remove the tentative value in
// a synchronized block.
inflightAttempted.put(bundle, finalCumulative);
COUNTER_COMMITTER.submit(new Runnable() {
@Override
public void run() {
synchronized (attemptedLock) {
finishedAttempted = aggregation.combine(asList(finishedAttempted, finalCumulative));
inflightAttempted.remove(bundle);
}
}
});
}

/** Extract the latest values from all attempted and in-progress bundles. */
public ResultT extractLatestAttempted() {
ArrayList<UpdateT> updates = new ArrayList<>(inflightAttempted.size() + 1);
// Within this block we know that will be consistent. Specifically, the only change that can
// happen concurrently is the addition of new (larger) values to inflightAttempted.
synchronized (attemptedLock) {
updates.add(finishedAttempted);
updates.addAll(inflightAttempted.values());
}
return aggregation.extract(aggregation.combine(updates));
}

/**
* Commit a logical value for the given {@code bundle}.
*
* @param bundle The bundle being committed.
* @param finalCumulative The final cumulative value for the given bundle.
*/
public void commitLogical(final CommittedBundle<?> bundle, final UpdateT finalCumulative) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

UpdateT current;
do {
current = finishedCommitted.get();
} while (!finishedCommitted.compareAndSet(current,
aggregation.combine(asList(current, finalCumulative))));
}

/** Extract the value from all successfully committed bundles. */
public ResultT extractCommitted() {
return aggregation.extract(finishedCommitted.get());
}
}

private static final MetricAggregation<Long, Long> COUNTER =
new MetricAggregation<Long, Long>() {
@Override
public Long zero() {
return 0L;
}

@Override
public Long combine(Iterable<Long> updates) {
long value = 0;
for (long update : updates) {
value += update;
}
return value;
}

@Override
public Long extract(Long data) {
return data;
}
};

private static final MetricAggregation<DistributionData, DistributionResult> DISTRIBUTION =
new MetricAggregation<DistributionData, DistributionResult>() {
@Override
public DistributionData zero() {
return DistributionData.EMPTY;
}

@Override
public DistributionData combine(Iterable<DistributionData> updates) {
DistributionData result = DistributionData.EMPTY;
for (DistributionData update : updates) {
result = result.combine(update);
}
return result;
}

@Override
public DistributionResult extract(DistributionData data) {
return data.extractResult();
}
};

/** The current values of counters in memory. */
private MetricsMap<MetricKey, DirectMetric<Long, Long>> counters =
new MetricsMap<>(new MetricsMap.Factory<MetricKey, DirectMetric<Long, Long>>() {
@Override
public DirectMetric<Long, Long> createInstance(MetricKey unusedKey) {
return new DirectMetric<>(COUNTER);
}
});
private MetricsMap<MetricKey, DirectMetric<DistributionData, DistributionResult>> distributions =
new MetricsMap<>(
new MetricsMap.Factory<MetricKey, DirectMetric<DistributionData, DistributionResult>>() {
@Override
public DirectMetric<DistributionData, DistributionResult> createInstance(
MetricKey unusedKey) {
return new DirectMetric<>(DISTRIBUTION);
}
});

@AutoValue
abstract static class DirectMetricQueryResults implements MetricQueryResults {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems generic enough for other runners. Could MetricQueryResults be a concrete class instead of an interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe -- MetricQueryResults (and many of these other POJO-like classes/interfaces) will likely move towards being backed by some underlying JSON or Proto or whatever-protocol is being used. Specifically, I would normally expect these results objects to be implemented on top of whatever kind of response we've gotten.

public static MetricQueryResults create(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions) {
return new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, distributions);
}
}

@AutoValue
abstract static class DirectMetricResult<T> implements MetricResult<T> {
public static <T> MetricResult<T> create(MetricName name, String scope,
T committed, T attempted) {
return new AutoValue_DirectMetrics_DirectMetricResult<T>(
name, scope, committed, attempted);
}
}

@Override
public MetricQueryResults queryMetrics(MetricsFilter filter) {
ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder();
for (Entry<MetricKey, DirectMetric<Long, Long>> counter : counters.entries()) {
maybeExtractResult(filter, counterResults, counter);
}
ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults =
ImmutableList.builder();
for (Entry<MetricKey, DirectMetric<DistributionData, DistributionResult>> distribution
: distributions.entries()) {
maybeExtractResult(filter, distributionResults, distribution);
}

return DirectMetricQueryResults.create(counterResults.build(), distributionResults.build());
}

private <ResultT> void maybeExtractResult(
MetricsFilter filter,
ImmutableList.Builder<MetricResult<ResultT>> resultsBuilder,
Map.Entry<MetricKey, ? extends DirectMetric<?, ResultT>> entry) {
if (matches(filter, entry.getKey())) {
resultsBuilder.add(DirectMetricResult.create(
entry.getKey().metricName(),
entry.getKey().stepName(),
entry.getValue().extractCommitted(),
entry.getValue().extractLatestAttempted()));
}
}

// Matching logic is implemented here rather than in MetricsFilter because we would like
// MetricsFilter to act as a "dumb" value-object, with the possibility of replacing it with
// a Proto/JSON/etc. schema object.
private boolean matches(MetricsFilter filter, MetricKey key) {
return matchesName(key.metricName(), filter.names())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like the inverted responsibility for what I would expect MetricsFilter to have (especially when compared to the Filter#by() PTransform.)

MetricsFilter provides the names and steps to allow a runner to do better filtering without executing the user code, right? A reasoning comment to why it's just a value object rather than something with real behavior might be valuable here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added:

  // Matching logic is implemented here rather than in MetricsFilter because we would like
  // MetricsFilter to act as a "dumb" value-object, with the possibility of replacing it with
  // a Proto/JSON/etc. schema object.

&& matchesScope(key.stepName(), filter.steps());
}

private boolean matchesScope(String actualScope, Set<String> scopes) {
if (scopes.isEmpty() || scopes.contains(actualScope)) {
return true;
}

for (String scope : scopes) {
if (actualScope.startsWith(scope)) {
return true;
}
}

return false;
}

private boolean matchesName(MetricName metricName, Set<MetricNameFilter> nameFilters) {
if (nameFilters.isEmpty()) {
return true;
}

for (MetricNameFilter nameFilter : nameFilters) {
if ((nameFilter.getName() == null || nameFilter.getName().equals(metricName.name()))
&& Objects.equal(metricName.namespace(), nameFilter.getNamespace())) {
return true;
}
}

return false;
}

/** Apply metric updates that represent physical counter deltas to the current metric values. */
public void updatePhysical(CommittedBundle<?> bundle, MetricUpdates updates) {
for (MetricUpdate<Long> counter : updates.counterUpdates()) {
counters.get(counter.getKey()).updatePhysical(bundle, counter.getUpdate());
}
for (MetricUpdate<DistributionData> distribution : updates.distributionUpdates()) {
distributions.get(distribution.getKey())
.updatePhysical(bundle, distribution.getUpdate());
}
}

public void commitPhysical(CommittedBundle<?> bundle, MetricUpdates updates) {
for (MetricUpdate<Long> counter : updates.counterUpdates()) {
counters.get(counter.getKey()).commitPhysical(bundle, counter.getUpdate());
}
for (MetricUpdate<DistributionData> distribution : updates.distributionUpdates()) {
distributions.get(distribution.getKey())
.commitPhysical(bundle, distribution.getUpdate());
}
}

/** Apply metric updates that represent new logical values from a bundle being committed. */
public void commitLogical(CommittedBundle<?> bundle, MetricUpdates updates) {
for (MetricUpdate<Long> counter : updates.counterUpdates()) {
counters.get(counter.getKey()).commitLogical(bundle, counter.getUpdate());
}
for (MetricUpdate<DistributionData> distribution : updates.distributionUpdates()) {
distributions.get(distribution.getKey())
.commitLogical(bundle, distribution.getUpdate());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.TestStream;
Expand Down Expand Up @@ -225,6 +227,7 @@ public <OutputT extends POutput, InputT extends PInput> OutputT apply(

@Override
public DirectPipelineResult run(Pipeline pipeline) {
MetricsEnvironment.setMetricsSupported(true);
ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor();
pipeline.traverseTopologically(consumerTrackingVisitor);
for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) {
Expand Down Expand Up @@ -267,8 +270,7 @@ public DirectPipelineResult run(Pipeline pipeline) {

Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
pipeline.getAggregatorSteps();
DirectPipelineResult result =
new DirectPipelineResult(executor, context, aggregatorSteps);
DirectPipelineResult result = new DirectPipelineResult(executor, context, aggregatorSteps);
if (options.isBlockOnRun()) {
try {
result.awaitCompletion();
Expand Down Expand Up @@ -380,6 +382,11 @@ public String toString() {
};
}

@Override
public MetricResults metrics() {
return evaluationContext.getMetrics();
}

/**
* Blocks until the {@link Pipeline} execution represented by this
* {@link DirectPipelineResult} is complete, returning the terminal state.
Expand Down
Loading