Skip to content

Conversation

@bjchambers
Copy link
Contributor

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • [*] Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • [*] Make sure tests pass via mvn clean verify. (Even better, enable
    Travis-CI on your fork and ensure the whole test matrix passes).
  • [*] Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • [*] If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

@bjchambers
Copy link
Contributor Author

R: @tgroh

@bjchambers
Copy link
Contributor Author

Implementing the Distribution metric revealed a lot of problems with some of the internal mechanism in the first commit. Happy to squash those together prior to review if it makes things simpler -- just let me know.

@sumitchawla
Copy link

+1 for this feature

@iemejia
Copy link
Member

iemejia commented Sep 29, 2016

Such an useful idea, great. Question, is there already a sort of BIP or other document about the proposed Metrics API ?

@jbonofre
Copy link
Member

It looks good. Thanks ! Need to take a deeper look.

@swegner
Copy link
Contributor

swegner commented Sep 29, 2016

+R: @swegner

Copy link
Contributor

@swegner swegner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving the DirectRunner review for @tgroh

T metric = metrics.get(key);
if (metric == null) {
metric = createInstance();
if (metrics.putIfAbsent(key, metric) == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConcurrentMap.computeIfAbsent would also work here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't use Java 8 methods in the SDK.

return Iterables.unmodifiableIterable(metrics.entrySet());
}

protected abstract T createInstance();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type of polymorphic factory method has been problematic in the past, for example in the old StateSampler and ExecutionContext, because the abstract class defines the spec for object creation. In the ExecutionContext case, we had an abstract getOrCreateStepContext, except one runner needed an extra [StateSampler] param.

Perhaps a better design would be to initialize the MetricsContainer or MetricsMap with an abstract factory for constructing metric instances.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* {@code metricName} in this container.
*/
public CounterCell getOrCreateCounter(MetricName metricName) {
return counters.getOrCreate(metricName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If these getOrCreateFoo() methods had the create logic embedded, you could use a regular ConcurrentMap and get rid of MetricsMap.java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not completely sure what you mean -- I suspect you're proposing either inlining the logic from MetricsMap#getOrCreate into each of these methods or passing a function to MetricsMap#getOrCreate to do that.

I actually prefer leaving it like it is, although I'm open to renaming the class. There are several different places where we need the functionality of a "create-on-demand, concurrent map" and it is useful to have that in one place so we can make changes in that one place.

}

public static MetricName named(Class<?> namespace, String name) {
return new AutoValue_MetricName(namespace.getSimpleName(), name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkNotNull(namespace). Might also want to check the class isn't anonymous since getSimpleName() will fail.

AutoValue will handle the rest of the nullability checks for you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done (in local changes)

public abstract String getNamespace();

/**
* The name of this metric. Should only be null in {@link MetricFilter MetricFilters} being used
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding nullability so we can re-use this type for filtering semantics is gross. Can we update the MetricFilter to encapsulate this logic instead? For example, have separate filters for name and namespace, rather than a partial MetricName?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about that, but what do we do if they are both set? Do we intersect them so that only metrics that are listed in both the set of namespace/name pairs you've requested AND the list of namespaces you've requested show up? Or do we treat it as OR (even though other filters are treated as AND).

Maybe the right solution would be to have a separate type for "MetricNameFilter" that makes the name optional, and use that in filters?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One pattern for search queries is to specify each dimension filter separately. Each dimension can define its own semantics, and the overall filter takes the intersection of all dimensions.

In this case, you might have a MetricNameFilter class with is(MetricName) and hasNamespace(String) factory methods. And a MetricsQuery which takes filters on different dimensions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done in an earlier revision.

};

@AutoValue
abstract static class DirectMetricQueryResults implements MetricQueryResults {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems generic enough for other runners. Could MetricQueryResults be a concrete class instead of an interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe -- MetricQueryResults (and many of these other POJO-like classes/interfaces) will likely move towards being backed by some underlying JSON or Proto or whatever-protocol is being used. Specifically, I would normally expect these results objects to be implemented on top of whatever kind of response we've gotten.

}
}

private boolean matches(MetricFilter filter, MetricKey key) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this logic go in MetricFilter ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See responses elsewhere about trying to keep the "protocol"-like classes simple, so they can be swapped out using an actual protocol.


@Override
public MetricResults metrics() {
throw new UnsupportedOperationException("FlinkRunnerResult does not support metrics.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use consistent wording across runners. "does not support metrics" vs. "not supported by [...] runner yet"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

private final PipelineExecutor executor;
private final EvaluationContext evaluationContext;
private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps;
private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps;;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

*/
@Nullable AggregatorContainer.Mutator getAggregatorChanges();

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logical

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@bjchambers
Copy link
Contributor Author

@iemejia : We have some thoughts on the general direction but no formal document prepared -- I'm working on writing one up that I'll share on the mailing list. In the meantime, I'm happy to answer any questions you may have on the mailing list!

@swegner: Updated to reflect your comments. Right now, the direct-runner only commits physical counters at the end since they're now cumulative. I'll look at a mechanism to manage tentative physical values as an addition.

@swegner
Copy link
Contributor

swegner commented Oct 3, 2016

Addition tests? There's a number of public classes which don't have unit tests. It'd also be nice to see a RunnableOnService test which demonstrates usage in a pipeline.

if (container == null && REPORTED_MISSING_CONTAINER.compareAndSet(false, true)) {
LOGGER.error("Unable to get {} for the current thread.\n"
+ "Most likely caused by using a runner that doesn't support metrics.\n"
+ "May also be caused by reporting metrics from outside the work-execution therad",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

therad -> thread

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.

@Nullable
public T getUpdateIfDirty() {
DirtyState state = dirty.get();
if (state != DirtyState.CLEAN) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible race condition for non-atomic check-and-set. However, I believe the race condition is ok because it will simply result in extra updates. Perhaps add a comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a loop and a comment explaining why it shouldn't loop under normal circumstances.

// When a delta is extracting, they transition to the COMMITTING state.
// When a delta is committed, it transitions to the CLEAN state only if it is in the COMMITTING
// state. This ensures that counters that were modified after the delta was extracted but before
// it was committed are not falsely marked as CLEAN.delta was being committed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove delta was being committed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

*/
@Nullable
public T getUpdateIfDirty() {
DirtyState state = dirty.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: Invert conditional and reduce nesting for readability:

if (dirty.get() == DirtyState.CLEAN) {
    // If the metric was clean, we know no changes have been made since the last call to
    // getUpdateIfDirty, so we can return null.
    return null;
}
dirty.set(DirtyState.COMMITTING);
return getCumulative();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually prefer to leave the nesting in cases like this, since it makes it clearer we're looking at multiple cases. I reserve reducing nesting for cases where one case is an error path or the where there is significant code in the nested path.


/** Increment the counter by the given amount. */
public void add(long n) {
markDirty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe there's a race condition here (threads T1 and T2):

  • T1: call .add()
  • T1: add() -> markDirty() (state => DIRTY)
  • [context switch]
  • T2: call getUpdateIfDirty() (state => COMMITTING)
  • T2: call 'commitUpdate()(state =>CLEAN`)
  • [context switch]
  • T1: add() -> value.addAndGet(n)
  • [context switch]
  • T2: call getUpdateIfDirty() (no update)

In this interleaving, an unread update can exist in the CLEAN state. If no other values are written, this can cause the last update to be lost.

Moving the markDirty() call to the end of the method would prevent this case, however it could cause a new race.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this might need a lock or a concurrent data structure that holds both bits (value and dirty flag). ReaderWriterLock would do the trick.

AtomicMarkableReference also looks interesting. With some refactoring I believe it could replace the AtomicReference and also track the dirty state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know that we need to go that far. I think the right behavior is to call markDirty() after the add(), as you initially hinted at. The race then becomes we may have reported a value that included the added data and then mark dirty and then report it again with no change. But, the dirtiness is just an optimization intended to allow us to report less often than we need, so this isn't a problem.

}

/**
* Add a step filter.
Copy link
Contributor

@swegner swegner Oct 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make these to be more descriptive of how they will contribute to the query filter:

Filter to metrics for the specified step.

If no steps are specified then metrics will be included for all steps. If multiple steps are specified, then metrics will be included for each.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* <p>If no name filters are specified then metrics will be returned regardless of what name
* they have.
*/
public Builder addNameFilter(MetricNameFilter nameFilter) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about a filter on metric type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet implemented, we can add one in a future iteration of this API.

public abstract class MetricResults {

/** Retrieve the current counter value. */
public MetricResult<Long> getCounter(MetricNameFilter name, @Nullable String step) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For ease of use, perhaps have overloads with/without the step parameter, rather than forcing users to pass a null step.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually wonder if the step should be required. Otherwise, it is very likely that we'll get multiple metrics. Similarly, maybe the name should be a MetricName so that there must be a name specified. Any thoughts?

Alternatively, this could return the Iterable which would remove the need for their to be a single match...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same functionality can be accomplished with the more general query API, but this single-metric version is a nice lightweight syntax for the single-metric case. So looking at it from the usage perspective:

Pipeline p = Pipeline.create(..);
MetricName fooSize = MetricName.create(MyPipeline.class, "fooSize");
p.apply(..)
  .apply(MapElements.via(foo => {
    fooSize.add(foo.size());
    // ..
  })
  .apply(..);

PipelineResult result = p.run();
while (true) {
  Thread.sleep(1000);
  LOG.info("Mean foo size: {}", result.metrics().getDistribution(fooSize).committed().mean());
}
  • I like the ability to not specify a step name, otherwise using metrics also requires naming step PTransform application.
  • Providing a MetricName instead of a MetricNameFilter seems reasonable, since we really only want equality matching on a MetricName and not any other semantics MetricNameFilter may provide.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing these APIs for now. These are easy layers to add on top of the existing query functionality, so we should see what is useful/necessary later.

}
}

@Experimental(Kind.METRICS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Move the annotation after the javadoc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

public static MetricsContainer getCurrentContainer() {
MetricsContainer container = CONTAINER_FOR_THREAD.get();
if (container == null && REPORTED_MISSING_CONTAINER.compareAndSet(false, true)) {
LOGGER.error("Unable to get {} for the current thread.\n"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming metrics get baked into standard transforms, this log message will be displayed every time a pipeline is executed on a runner that doesn't yet support metrics. Perhaps make it a bit less scary? (warning instead of error, remove reference to internal MetricsContainer, reword message to assume no runner support, ...)

Or provide a more explicit signal to differentiate between runners which don't support metrics or using metrics on the wrong thread. For example, you could keep a bit that tracks whether MetricsContainer.setMetricsContainer() has ever been called.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setMetricsContainer needs to be kept lean in case it is run between every element (which is possible). I'll add a static setMetricsSupported that a runner which supports metrics can use to report the less-scary message.

@AutoValue
public abstract class MetricsFilter {

public Set<String> steps() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making this a StepFilter will give room to more easily add additional step filter conditions in the future. For example: all steps for a specified PTransform type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... I kind of like that but I also worry about verbosity. I would think that if we had a StepFilter, we would still like to support addStep(step) as a short-hand for addStepFilter(StepFilter.forStep(step)), in which case we don't need to make that change now, since it would be an implementation change not an API change.

Copy link
Member

@tgroh tgroh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all the way through yet, but I have some initial comments

}

/** Increment the counter by the given amount. */
public void inc(long n) {
Copy link
Member

@tgroh tgroh Oct 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add? Relate to AtomicLong.incrementAndGet() vs AtomicLong.addAndGet(long)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went with these names to mimic Dropwizard: http://metrics.dropwizard.io/3.1.0/apidocs/com/codahale/metrics/Counter.html.

Although by that logic we should consider renaming Distribution to http://metrics.dropwizard.io/3.1.0/apidocs/com/codahale/metrics/Histogram.html

How strongly do you think it should be add?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm kind of conflicted - we're naming in-line with a relatively standard library each way, so it's up to you.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm gonna vote for dropwizard here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, I think Dropwizard is more closely aligned with what we're building. It may merit renaming Distribution back to Histogram for consistency, in fact.


/** Increment the counter by the given amount. */
public void add(long n) {
markDirty();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line seems as though it should occur after the actual mutation; otherwise cleaning may not see the updated value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See latest commits -- it has been moved afterwards, and is now called "markDirtyAfterUpdate" to emphasize it should come at the end.

}

public static MetricName named(Class<?> namespace, String name) {
return new AutoValue_MetricName(namespace.getSimpleName(), name);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we use the fully qualified namespace?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable -- we still need to nail down how the namespace should be setup in Python, so I don't want to get too hung up on this. It seems like the full path is reasonable though, switched to .getName().

try {
return Iterables.getOnlyElement(metrics.counters());
} catch (IllegalArgumentException e) {
throw new RuntimeException("Expected one matching counter", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably still be an IllegalArgumentException

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cut this API for now.

import org.apache.beam.sdk.annotations.Experimental.Kind;

/**
* Simple POJO representing a filter for querying metrics.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AutoValue-able?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. It's a bit painful because we want to use ImmutableSet without adding that to the API surface. Scott had asked the same thing, and I pointed out it doesn't work for that reason. Although I may have made it work by using AutoValue as protected methods and just casting up for the public methdos... we'll see if that passes the API surface tests.

if (metric == null) {
metric = factory.createInstance(key);
if (metrics.putIfAbsent(key, metric) == null) {
metric = metrics.get(key);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if statement is backwards. If you get null back, you won and your current value is in the map.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, yep, makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

T metric = metrics.get(key);
if (metric == null) {
  metric = factory.createInstance(key);
  metric = Objects.firstNonNull(metrics.putIfAbsent(key, metric), metric);
}

Look better?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a much cleaner way then I usually do it. Nice!

(Though MoreObjects)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@Test
public void testIncrementsCell() {
MetricsContainer container = Mockito.mock(MetricsContainer.class);
CounterCell cell = Mockito.mock(CounterCell.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't you use a real cell?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to keep the test more narrowly scoped, but I suppose we could... Done.

@bjchambers bjchambers force-pushed the metrics branch 2 times, most recently from da98508 to e26859c Compare October 5, 2016 00:42

private final AggregatorContainer mergedAggregators;

private final DirectMetrics metrics = new DirectMetrics();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initialize in the constructor, like we do with mergedAggregators, applicationStateInternals and others.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

@Override
public MetricResults metrics() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Sprak/Spark

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

/**
* Return the object to access metrics from the pipeline.
*
* <p>Runners that don't support metrics will throw an {@link UnsupportedOperationException}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can make this an @throws

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

/** Increment the counter by the given amount. */
public void inc(long n) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm kind of conflicted - we're naming in-line with a relatively standard library each way, so it's up to you.

});
}

public ResultT extractLogical() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This returns the tentative logical value rather than the committed, which doesn't seem correct.

Additionally, we never call updateLogical, so this really should just be an atomic compareAndSet

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

*/
private void processElements(
TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
private void processElements(TransformEvaluator<T> evaluator, MetricsContainer metricsContainer,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


/**
* Returns the logical metric updates.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you provide an empty MetricUpdates instead of a null one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@bjchambers
Copy link
Contributor Author

PTAL, most of these are addressed. Note I have a follow-up commit that switches StepTransformResult.Builder to use AutoValue.Builder which would have made things a bit cleaner.

committedPhysical = combine(Arrays.asList(committedPhysical, finalCumulative));
uncommittedPhysical.remove(bundle);
synchronized (attemptedLock) {
finishedAttempted = combine(Arrays.asList(finishedAttempted, finalCumulative));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not your schmancy new overload?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed when I added it. Done.

*/
private void processElements(TransformEvaluator<T> evaluator, MetricsContainer metricsContainer,
private void processElements(
TransformEvaluator<T> evaluator, MetricsContainer metricsContainer,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This formatting is still inconsistent

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

commitBundles(result.getOutputBundles());
if (result.getLogicalMetricUpdates() != null) {
if (!result.getLogicalMetricUpdates().isEmpty()) {
metrics.commitLogical(completedBundle, result.getLogicalMetricUpdates());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on why we still need/want the if statement. My understanding is that we want it because otherwise we'd spin for no reason if we have contention, but we'd still be correct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got rid of it. Not necessary here (we only lock on a per-metric update, which don't exist if its empty). The motivation was to be closer to what other runners will do -- eg., not bother sending updates if they're empty. But, that isn't necessary within the direct runner, especially not once we get heere.

@bjchambers
Copy link
Contributor Author

Pushed update

Copy link
Member

@tgroh tgroh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, less a few documentation/comment/messaging things

*/
class DirectMetrics extends MetricResults {

private static final ExecutorService COUNTER_COMMITTER = Executors.newCachedThreadPool();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably have some shared ThreadPool provided by the runner for internal maintenance.

Filed https://issues.apache.org/jira/browse/BEAM-723. Please add a comment linking to that JIRA.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

private boolean matches(MetricsFilter filter, MetricKey key) {
return matchesName(key.metricName(), filter.names())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like the inverted responsibility for what I would expect MetricsFilter to have (especially when compared to the Filter#by() PTransform.)

MetricsFilter provides the names and steps to allow a runner to do better filtering without executing the user code, right? A reasoning comment to why it's just a value object rather than something with real behavior might be valuable here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added:

  // Matching logic is implemented here rather than in MetricsFilter because we would like
  // MetricsFilter to act as a "dumb" value-object, with the possibility of replacing it with
  // a Proto/JSON/etc. schema object.

StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()
.withLogicalMetricUpdates(
MetricUpdates.create(
Copy link
Member

@tgroh tgroh Oct 6, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use the empty update? Here and below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, don't even need to set the metric updates anymore. Done.


@Override
public MetricResults metrics() {
throw new UnsupportedOperationException("The Flink Runner does not yet support metrics.");
Copy link
Member

@tgroh tgroh Oct 6, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make your wording consistent.

I like "The %sRunner does not currently support metrics", but it's up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* <p>The runner should call {@link #unsetMetricsContainer} (or {@link #setMetricsContainer} back to
* the previous value) when exiting code that set the metrics container.
*/
public class MetricsEnvironment {
Copy link
Member

@tgroh tgroh Oct 6, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should include a comment that users should never interact with this directly. Potentially this could be in a similar kind of state as util, where we don't generate any documentation for the runner-based utilities that have to exist in the SDK for visibility reasons.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@bjchambers
Copy link
Contributor Author

It looks like the travis failure is spurious. I'm going to rebase and squash to kick off a new run.

@bjchambers
Copy link
Contributor Author

R: @kennknowles

@kennknowles
Copy link
Member

Still reviewing. I also want to wait and give @aljoscha a chance to comment on the API proposal thread.

Copy link
Member

@kennknowles kennknowles left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some initial comments. My biggest worry is really that we have yet another way for the worker/evaluation context to pass capabilities to e.g. a DoFn, this time not via params but via a dynamically-scoped binding that appears to the user essentially as a global variable. I do like that you've separated the MetricsContainer so it is first class, so the global vars in MetricsEnvironment track only the binding, but still this sort of thing can come back to bite.


protected abstract ResultT extract(UpdateT data);

public void updatePhysical(CommittedBundle<?> bundle, UpdateT tentativeCumulative) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love some javadoc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. But also private class.

inflightAttempted.put(bundle, tentativeCumulative);
}

public void commitPhysical(final CommittedBundle<?> bundle, final UpdateT finalCumulative) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. But also private class.

return extract(combine(updates));
}

public void commitLogical(final CommittedBundle<?> bundle, final UpdateT finalCumulative) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

} while (!finishedCommitted.compareAndSet(current, combine(current, finalCumulative)));
}

/** Extract the from all successfully committed bundles. */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Extract the from all"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

public abstract static class DirectMetric<UpdateT, ResultT> {
private AtomicReference<UpdateT> finishedCommitted;

private final Object attemptedLock = new Object();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

document what the lock is protecting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

new ThreadLocal<MetricsContainer>();

/** Set the {@link MetricsContainer} for the current thread. */
public static void setMetricsContainer(MetricsContainer container) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entering and exiting dynamically-scoped contexts like this should be done via an explicit environment creation/destruction in a try-with-resources block.

}

/** Increment the counter by the given amount. */
public void inc(long n) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm gonna vote for dropwizard here.

return counters.getOrCreate(metricName);
}

public DistributionCell getOrCreateDistribution(MetricName metricName) {
Copy link
Member

@kennknowles kennknowles Oct 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's phase out getOrCreate as a phrase; it was a mistake that we introduced it elsewhere. Just call it get; if something has to be lazily initialized, that should be behind the abstraction boundary. In this particular case, the distribution can be platonically said to "exist" whether or not there is an object backing it, yea?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* Return the {@link CounterCell} that should be used for implementing the given
* {@code metricName} in this container.
*/
public CounterCell getOrCreateCounter(MetricName metricName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CounterCell and DistributionCell are package-private. So this is a public method of a public class returning a type that cannot be accessed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made them public. I think there are cases where they'll be used directly.

import org.apache.beam.sdk.annotations.Experimental.Kind;

/**
* Interface for reporting metric updates of type {@code T} from inside worker harness.
Copy link
Member

@kennknowles kennknowles Oct 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So MetricCell is what the worker harness uses to update its internal state - it just needs getUpdateIfDirty and getCumulative, since it can never write to the metric. It would be nice to explicitly factor this out.

Then, on the MetricsContainer what is actually exposed is the DistributionCell types, etc, where the useful methods are actually the additional stuff not contained here.

Can the ownership of the dirty bit be more clearly separately from the ownership of the value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can easily do that -- updating the value and the dirty bit needs to done atomically (or in a disciplined manner).

@kennknowles
Copy link
Member

LGTM. I'm ready to get this in and move forwards with it. I'm sure we'll learn more as we go.

bchambers added 3 commits October 13, 2016 15:27
This includes a simple Counter metric and a Distribution metric that
reports the SUM, COUNT, MIN, MAX and MEAN of the reported values.

The API is labeled @experimental since metrics will only be reported
and queryable with the DirectRunner, and the API may change as it is
implemented on other runners.
All runners currently implement this by throwing an
UnsupportedOperationException.
@asfgit asfgit closed this in 3c73170 Oct 13, 2016
/**
* Return the {@link MetricUpdates} representing the cumulative values of all metrics in this
* container.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bjchambers I'm confused. It seems like counterUpdates is never used at all after being created and populated, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... It looks like this is dead code. It was refactored into the extractCumulatives(counters) and extractCumulatives(distributions).

@bjchambers bjchambers deleted the metrics branch November 21, 2016 21:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants