From f4119f489a6a7bf7f7347f6842800fc6a6dc5b10 Mon Sep 17 00:00:00 2001 From: bchambers Date: Wed, 12 Oct 2016 10:29:50 -0700 Subject: [PATCH 1/3] Initial Metrics API for Beam Java This includes a simple Counter metric and a Distribution metric that reports the SUM, COUNT, MIN, MAX and MEAN of the reported values. The API is labeled @Experimental since metrics will only be reported and queryable with the DirectRunner, and the API may change as it is implemented on other runners. --- .../beam/sdk/annotations/Experimental.java | 3 + .../org/apache/beam/sdk/metrics/Counter.java | 40 +++++ .../apache/beam/sdk/metrics/CounterCell.java | 76 +++++++++ .../apache/beam/sdk/metrics/DirtyState.java | 98 ++++++++++++ .../apache/beam/sdk/metrics/Distribution.java | 30 ++++ .../beam/sdk/metrics/DistributionCell.java | 58 +++++++ .../beam/sdk/metrics/DistributionData.java | 59 +++++++ .../beam/sdk/metrics/DistributionResult.java | 42 +++++ .../org/apache/beam/sdk/metrics/Metric.java | 24 +++ .../apache/beam/sdk/metrics/MetricCell.java | 47 ++++++ .../apache/beam/sdk/metrics/MetricKey.java | 40 +++++ .../apache/beam/sdk/metrics/MetricName.java | 46 ++++++ .../beam/sdk/metrics/MetricNameFilter.java | 60 +++++++ .../beam/sdk/metrics/MetricQueryResults.java | 33 ++++ .../apache/beam/sdk/metrics/MetricResult.java | 45 ++++++ .../beam/sdk/metrics/MetricResults.java | 34 ++++ .../beam/sdk/metrics/MetricUpdates.java | 72 +++++++++ .../org/apache/beam/sdk/metrics/Metrics.java | 110 +++++++++++++ .../beam/sdk/metrics/MetricsContainer.java | 150 ++++++++++++++++++ .../beam/sdk/metrics/MetricsEnvironment.java | 85 ++++++++++ .../beam/sdk/metrics/MetricsFilter.java | 86 ++++++++++ .../apache/beam/sdk/metrics/MetricsMap.java | 86 ++++++++++ .../apache/beam/sdk/metrics/package-info.java | 28 ++++ .../beam/sdk/metrics/CounterCellTest.java | 55 +++++++ .../beam/sdk/metrics/DirtyStateTest.java | 56 +++++++ .../sdk/metrics/DistributionCellTest.java | 53 +++++++ .../beam/sdk/metrics/MetricMatchers.java | 99 ++++++++++++ .../sdk/metrics/MetricsContainerTest.java | 129 +++++++++++++++ .../sdk/metrics/MetricsEnvironmentTest.java | 63 ++++++++ .../beam/sdk/metrics/MetricsMapTest.java | 103 ++++++++++++ .../apache/beam/sdk/metrics/MetricsTest.java | 98 ++++++++++++ 31 files changed, 2008 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java index 789f4b2e5828..14d2358eccf3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java @@ -83,5 +83,8 @@ public enum Kind { * Do not use: API is unstable and runner support is incomplete. */ SPLITTABLE_DO_FN, + + /** Metrics-related experimental APIs. */ + METRICS } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java new file mode 100644 index 000000000000..9f480168e812 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * A metric that reports a single long value and can be incremented or decremented. + */ +@Experimental(Kind.METRICS) +public interface Counter extends Metric { + + /** Increment the counter. */ + void inc(); + + /** Increment the counter by the given amount. */ + void inc(long n); + + /* Decrement the counter. */ + void dec(); + + /* Decrement the counter by the given amount. */ + void dec(long n); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java new file mode 100644 index 000000000000..bb6583329a80 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import java.util.concurrent.atomic.AtomicLong; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Tracks the current value (and delta) for a Counter metric for a specific context and bundle. + * + *

This class generally shouldn't be used directly. The only exception is within a runner where + * a counter is being reported for a specific step (rather than the counter in the current context). + */ +@Experimental(Kind.METRICS) +class CounterCell implements MetricCell, Counter { + + private final DirtyState dirty = new DirtyState(); + private final AtomicLong value = new AtomicLong(); + + /** Increment the counter by the given amount. */ + private void add(long n) { + value.addAndGet(n); + dirty.afterModification(); + } + + @Override + public DirtyState getDirty() { + return dirty; + } + + @Override + public Long getCumulative() { + return value.get(); + } + + @Override + public Counter getInterface() { + return this; + } + + @Override + public void inc() { + add(1); + } + + @Override + public void inc(long n) { + add(n); + } + + @Override + public void dec() { + add(-1); + } + + @Override + public void dec(long n) { + add(-n); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java new file mode 100644 index 000000000000..6706be8ef9ca --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Atomically tracks the dirty-state of a metric. + * + *

Reporting an update is split into two parts such that only changes made before the call to + * {@link #beforeCommit()} are committed when {@link #afterCommit()} is invoked. This allows for + * a two-step commit process of gathering all the dirty updates (calling {#link beforeCommit()}) + * followed by committing and calling {#link afterCommit()}. + * + *

The tracking of dirty states is done conservatively -- sometimes {@link #beforeCommit()} + * will return true (indicating a dirty metric) even if there have been no changes since the last + * commit. + * + *

There is also a possible race when the underlying metric is modified but the call to + * {@link #afterModification()} hasn't happened before the call to {@link #beforeCommit()}. In this + * case the next round of metric updating will see the changes. If this was for the final commit, + * then the metric updates shouldn't be extracted until all possible user modifications have + * completed. + */ +@Experimental(Kind.METRICS) +class DirtyState { + private enum State { + /** Indicates that there have been changes to the MetricCell since last commit. */ + DIRTY, + /** Indicates that there have been no changes to the MetricCell since last commit. */ + CLEAN, + /** Indicates that a commit of the current value is in progress. */ + COMMITTING + } + + private final AtomicReference dirty = new AtomicReference<>(State.DIRTY); + + /** + * Indicate that changes have been made to the metric being tracked by this {@link DirtyState}. + * + *

Should be called after modification of the value. + */ + public void afterModification() { + dirty.set(State.DIRTY); + } + + /** + * Check the dirty state and mark the metric as committing. + * + *

If the state was {@code CLEAN}, this returns {@code false}. If the state was {@code DIRTY} + * or {@code COMMITTING} this returns {@code true} and sets the state to {@code COMMITTING}. + * + * @return {@code false} if the state is clean and {@code true} otherwise. + */ + public boolean beforeCommit() { + // After this loop, we want the state to be either CLEAN or COMMITTING. + // If the state was CLEAN, we don't need to do anything (and exit the loop early) + // If the state was DIRTY, we will attempt to do a CAS(DIRTY, COMMITTING). This will only + // fail if another thread is getting updates which generally shouldn't be the case. + // If the state was COMMITTING, we will attempt to do a CAS(COMMITTING, COMMITTING). This will + // fail if another thread commits updates (which shouldn't be the case) or if the user code + // updates the metric, in which case it will transition to DIRTY and the next iteration will + // successfully update it. + State state; + do { + state = dirty.get(); + } while (state != State.CLEAN && !dirty.compareAndSet(state, State.COMMITTING)); + + return state != State.CLEAN; + } + + /** + * Mark any changes up to the most recently call to {@link #beforeCommit()}} as committed. + * The next call to {@link #beforeCommit()} will return {@code false} unless there have + * been changes made since the previous call to {@link #beforeCommit()}. + */ + public void afterCommit() { + dirty.compareAndSet(State.COMMITTING, State.CLEAN); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java new file mode 100644 index 000000000000..b789020a5674 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * A metric that reports information about the distribution of reported values. + */ +@Experimental(Kind.METRICS) +public interface Distribution extends Metric { + /** Add an observation to this distribution. */ + void update(long value); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java new file mode 100644 index 000000000000..f0074a943701 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Tracks the current value (and delta) for a Distribution metric. + */ +@Experimental(Kind.METRICS) +class DistributionCell implements MetricCell, Distribution { + + private final DirtyState dirty = new DirtyState(); + private final AtomicReference value = + new AtomicReference(DistributionData.EMPTY); + + /** Increment the counter by the given amount. */ + @Override + public void update(long n) { + DistributionData original; + do { + original = value.get(); + } while (!value.compareAndSet(original, original.combine(DistributionData.singleton(n)))); + dirty.afterModification(); + } + + @Override + public DirtyState getDirty() { + return dirty; + } + + @Override + public DistributionData getCumulative() { + return value.get(); + } + + @Override + public Distribution getInterface() { + return this; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java new file mode 100644 index 000000000000..59c7fbd7e096 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.auto.value.AutoValue; + +/** + * Data describing the the distribution. This should retain enough detail that it can be combined + * with other {@link DistributionData}. + * + *

This is kept distinct from {@link DistributionResult} since this may be extended to include + * data necessary to approximate quantiles, etc. while {@link DistributionResult} would just include + * the approximate value of those quantiles. + */ +@AutoValue +public abstract class DistributionData { + + public abstract long sum(); + public abstract long count(); + public abstract long min(); + public abstract long max(); + + public static final DistributionData EMPTY = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE); + + public static DistributionData create(long sum, long count, long min, long max) { + return new AutoValue_DistributionData(sum, count, min, max); + } + + public static DistributionData singleton(long value) { + return create(value, 1, value, value); + } + + public DistributionData combine(DistributionData value) { + return create( + sum() + value.sum(), + count() + value.count(), + Math.min(value.min(), min()), + Math.max(value.max(), max())); + } + + public DistributionResult extractResult() { + return DistributionResult.create(sum(), count(), min(), max()); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java new file mode 100644 index 000000000000..27c242c09912 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.auto.value.AutoValue; + +/** + * The result of a {@link Distribution} metric. + */ +@AutoValue +public abstract class DistributionResult { + + public abstract long sum(); + public abstract long count(); + public abstract long min(); + public abstract long max(); + + public double mean() { + return (1.0 * sum()) / count(); + } + + public static final DistributionResult ZERO = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE); + + public static DistributionResult create(long sum, long count, long min, long max) { + return new AutoValue_DistributionResult(sum, count, min, max); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java new file mode 100644 index 000000000000..37a5f65d2394 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +/** + * Marker interface for all user-facing metrics. + */ +public interface Metric { } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java new file mode 100644 index 000000000000..211b2dd691ca --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * A {@link MetricCell} is used for accumulating in-memory changes to a metric. It represents a + * specific metric name in a single context. + * + * @param The type of the user interface for reporting changes to this cell. + * @param The type of metric data stored (and extracted) from this cell. + */ +@Experimental(Kind.METRICS) +interface MetricCell { + + /** + * Return the {@link DirtyState} tracking whether this metric cell contains uncommitted changes. + */ + DirtyState getDirty(); + + /** + * Return the cumulative value of this metric. + */ + DataT getCumulative(); + + /** + * Return the user-facing mutator for this cell. + */ + UserT getInterface(); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java new file mode 100644 index 000000000000..bfa4df5e4111 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Metrics are keyed by the step name they are associated with and the name of the metric. + */ +@Experimental(Kind.METRICS) +@AutoValue +public abstract class MetricKey { + + /** The step name that is associated with this metric. */ + public abstract String stepName(); + + /** The name of the metric. */ + public abstract MetricName metricName(); + + public static MetricKey create(String stepName, MetricName metricName) { + return new AutoValue_MetricKey(stepName, metricName); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java new file mode 100644 index 000000000000..843a88575012 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * The name of a metric consists of a {@link #namespace} and a {@link #name}. The {@link #namespace} + * allows grouping related metrics together and also prevents collisions between multiple metrics + * with the same name. + */ +@Experimental(Kind.METRICS) +@AutoValue +public abstract class MetricName { + + /** The namespace associated with this metric. */ + public abstract String namespace(); + + /** The name of this metric. */ + public abstract String name(); + + public static MetricName named(String namespace, String name) { + return new AutoValue_MetricName(namespace, name); + } + + public static MetricName named(Class namespace, String name) { + return new AutoValue_MetricName(namespace.getName(), name); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java new file mode 100644 index 000000000000..a2c379822dde --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * The name of a metric. + */ +@Experimental(Kind.METRICS) +@AutoValue +public abstract class MetricNameFilter { + + /** The inNamespace that a metric must be in to match this {@link MetricNameFilter}. */ + public abstract String getNamespace(); + + /** If set, the metric must have this name to match this {@link MetricNameFilter}. */ + @Nullable + public abstract String getName(); + + public static MetricNameFilter inNamespace(String namespace) { + return new AutoValue_MetricNameFilter(namespace, null); + } + + public static MetricNameFilter inNamespace(Class namespace) { + return new AutoValue_MetricNameFilter(namespace.getName(), null); + } + + public static MetricNameFilter named(String namespace, String name) { + checkNotNull(name, "Must specify a name"); + return new AutoValue_MetricNameFilter(namespace, name); + } + + public static MetricNameFilter named(Class namespace, String name) { + checkNotNull(namespace, "Must specify a inNamespace"); + checkNotNull(name, "Must specify a name"); + return new AutoValue_MetricNameFilter(namespace.getSimpleName(), name); + } +} + diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java new file mode 100644 index 000000000000..2241ba8a0a51 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * The results of a query for metrics. Allows accessing all of the metrics that matched the filter. + */ +@Experimental(Kind.METRICS) +public interface MetricQueryResults { + /** Return the metric results for the counters that matched the filter. */ + Iterable> counters(); + + /** Return the metric results for the distributions that matched the filter. */ + Iterable> distributions(); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java new file mode 100644 index 000000000000..9a3971a931f1 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * The results of a single current metric. + */ +@Experimental(Kind.METRICS) +public interface MetricResult { + /** Return the name of the metric. */ + MetricName name(); + /** Return the step context to which this metric result applies. */ + String step(); + + /** + * Return the value of this metric across all successfully completed parts of the pipeline. + * + *

Not all runners will support committed metrics. If they are not supported, the runner will + * throw an {@link UnsupportedOperationException}. + */ + T committed(); + + /** + * Return the value of this metric across all attempts of executing all parts of the pipeline. + */ + T attempted(); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java new file mode 100644 index 000000000000..dab65eaca4e3 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Methods for interacting with the metrics of a pipeline that has been executed. Accessed via + * {@link PipelineResult#metrics()}. + */ +@Experimental(Kind.METRICS) +public abstract class MetricResults { + /** + * Query for all metrics that match the filter. + */ + public abstract MetricQueryResults queryMetrics(MetricsFilter filter); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java new file mode 100644 index 000000000000..e84dc6635fc1 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.Iterables; +import java.util.Collections; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Representation of multiple metric updates. + */ +@Experimental(Kind.METRICS) +@AutoValue +public abstract class MetricUpdates { + + public static final MetricUpdates EMPTY = MetricUpdates.create( + Collections.>emptyList(), + Collections.>emptyList()); + + /** + * Representation of a single metric update. + * @param The type of value representing the update. + */ + @AutoValue + public abstract static class MetricUpdate { + + /** The key being updated. */ + public abstract MetricKey getKey(); + /** The value of the update. */ + public abstract T getUpdate(); + + public static MetricUpdate create(MetricKey key, T update) { + return new AutoValue_MetricUpdates_MetricUpdate(key, update); + } + } + + /** Returns true if there are no updates in this MetricUpdates object. */ + public boolean isEmpty() { + return Iterables.isEmpty(counterUpdates()) + && Iterables.isEmpty(distributionUpdates()); + } + + /** All of the counter updates. */ + public abstract Iterable> counterUpdates(); + + /** All of the distribution updates. */ + public abstract Iterable> distributionUpdates(); + + /** Create a new {@link MetricUpdates} bundle. */ + public static MetricUpdates create( + Iterable> counterUpdates, + Iterable> distributionUpdates) { + return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java new file mode 100644 index 000000000000..b72a0b244a17 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * The Metrics is a utility class for producing various kinds of metrics for + * reporting properties of an executing pipeline. + */ +@Experimental(Kind.METRICS) +public class Metrics { + + private Metrics() {} + + /** + * Create a metric that can be incremented and decremented, and is aggregated by taking the sum. + */ + public static Counter counter(String namespace, String name) { + return new DelegatingCounter(MetricName.named(namespace, name)); + } + + /** + * Create a metric that can be incremented and decremented, and is aggregated by taking the sum. + */ + public static Counter counter(Class namespace, String name) { + return new DelegatingCounter(MetricName.named(namespace, name)); + } + + /** + * Create a metric that records various statistics about the distribution of reported values. + */ + public static Distribution distribution(String namespace, String name) { + return new DelegatingDistribution(MetricName.named(namespace, name)); + } + + /** + * Create a metric that records various statistics about the distribution of reported values. + */ + public static Distribution distribution(Class namespace, String name) { + return new DelegatingDistribution(MetricName.named(namespace, name)); + } + + /** Implementation of {@link Counter} that delegates to the instance for the current context. */ + private static class DelegatingCounter implements Counter { + private final MetricName name; + + private DelegatingCounter(MetricName name) { + this.name = name; + } + + /** Increment the counter. */ + @Override public void inc() { + inc(1); + } + + /** Increment the counter by the given amount. */ + @Override public void inc(long n) { + MetricsContainer container = MetricsEnvironment.getCurrentContainer(); + if (container != null) { + container.getCounter(name).inc(n); + } + } + + /* Decrement the counter. */ + @Override public void dec() { + inc(-1); + } + + /* Decrement the counter by the given amount. */ + @Override public void dec(long n) { + inc(-1 * n); + } + } + + /** + * Implementation of {@link Distribution} that delegates to the instance for the current context. + */ + private static class DelegatingDistribution implements Distribution { + private final MetricName name; + + private DelegatingDistribution(MetricName name) { + this.name = name; + } + + @Override + public void update(long value) { + MetricsContainer container = MetricsEnvironment.getCurrentContainer(); + if (container != null) { + container.getDistribution(name).update(value); + } + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java new file mode 100644 index 000000000000..10032a29d1b4 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.ImmutableList; +import java.util.Map; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; + +/** + * Holds the metrics for a single step and unit-of-commit (bundle). + * + *

This class is thread-safe. It is intended to be used with 1 (or more) threads are updating + * metrics and at-most 1 thread is extracting updates by calling {@link #getUpdates} and + * {@link #commitUpdates}. Outside of this it is still safe. Although races in the update extraction + * may cause updates that don't actually have any changes, it will never lose an update. + * + *

For consistency, all threads that update metrics should finish before getting the final + * cumulative values/updates. + */ +@Experimental(Kind.METRICS) +public class MetricsContainer { + + private final String stepName; + + private MetricsMap counters = + new MetricsMap<>(new MetricsMap.Factory() { + @Override + public CounterCell createInstance(MetricName unusedKey) { + return new CounterCell(); + } + }); + + private MetricsMap distributions = + new MetricsMap<>(new MetricsMap.Factory() { + @Override + public DistributionCell createInstance(MetricName unusedKey) { + return new DistributionCell(); + } + }); + + /** + * Create a new {@link MetricsContainer} associated with the given {@code stepName}. + */ + public MetricsContainer(String stepName) { + this.stepName = stepName; + } + + /** + * Return the {@link CounterCell} that should be used for implementing the given + * {@code metricName} in this container. + */ + public CounterCell getCounter(MetricName metricName) { + return counters.get(metricName); + } + + public DistributionCell getDistribution(MetricName metricName) { + return distributions.get(metricName); + } + + private > + ImmutableList> extractUpdates( + MetricsMap cells) { + ImmutableList.Builder> updates = ImmutableList.builder(); + for (Map.Entry cell : cells.entries()) { + if (cell.getValue().getDirty().beforeCommit()) { + updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), + cell.getValue().getCumulative())); + } + } + return updates.build(); + } + + /** + * Return the cumulative values for any metrics that have changed since the last time updates were + * committed. + */ + public MetricUpdates getUpdates() { + return MetricUpdates.create( + extractUpdates(counters), + extractUpdates(distributions)); + } + + private void commitUpdates(MetricsMap> cells) { + for (MetricCell cell : cells.values()) { + cell.getDirty().afterCommit(); + } + } + + /** + * Mark all of the updates that were retrieved with the latest call to {@link #getUpdates()} as + * committed. + */ + public void commitUpdates() { + commitUpdates(counters); + commitUpdates(distributions); + } + + private > + ImmutableList> extractCumulatives( + MetricsMap cells) { + ImmutableList.Builder> updates = ImmutableList.builder(); + for (Map.Entry cell : cells.entries()) { + UpdateT update = checkNotNull(cell.getValue().getCumulative()); + updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), update)); + } + return updates.build(); + } + + /** + * Return the {@link MetricUpdates} representing the cumulative values of all metrics in this + * container. + */ + public MetricUpdates getCumulative() { + ImmutableList.Builder> counterUpdates = ImmutableList.builder(); + for (Map.Entry counter : counters.entries()) { + counterUpdates.add(MetricUpdate.create( + MetricKey.create(stepName, counter.getKey()), counter.getValue().getCumulative())); + } + + ImmutableList.Builder> distributionUpdates = + ImmutableList.builder(); + for (Map.Entry distribution : distributions.entries()) { + distributionUpdates.add(MetricUpdate.create( + MetricKey.create(stepName, distribution.getKey()), + distribution.getValue().getCumulative())); + } + return MetricUpdates.create( + extractCumulatives(counters), + extractCumulatives(distributions)); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java new file mode 100644 index 000000000000..ef2660a83994 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages and provides the metrics container associated with each thread. + * + *

Users should not interact directly with this class. Instead, use {@link Metrics} and the + * returned objects to create and modify metrics. + * + *

The runner should create {@link MetricsContainer} for each context in which metrics are + * reported (by step and name) and call {@link #setMetricsContainer} before invoking any code that + * may update metrics within that step. + * + *

The runner should call {@link #unsetMetricsContainer} (or {@link #setMetricsContainer} back to + * the previous value) when exiting code that set the metrics container. + */ +public class MetricsEnvironment { + + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsContainer.class); + + private static final AtomicBoolean METRICS_SUPPORTED = new AtomicBoolean(false); + private static final AtomicBoolean REPORTED_MISSING_CONTAINER = new AtomicBoolean(false); + + private static final ThreadLocal CONTAINER_FOR_THREAD = + new ThreadLocal(); + + /** Set the {@link MetricsContainer} for the current thread. */ + public static void setMetricsContainer(MetricsContainer container) { + CONTAINER_FOR_THREAD.set(container); + } + + + /** Clear the {@link MetricsContainer} for the current thread. */ + public static void unsetMetricsContainer() { + CONTAINER_FOR_THREAD.remove(); + } + + /** Called by the run to indicate whether metrics reporting is supported. */ + public static void setMetricsSupported(boolean supported) { + METRICS_SUPPORTED.set(supported); + } + + /** + * Return the {@link MetricsContainer} for the current thread. + * + *

May return null if metrics are not supported by the current runner or if the current thread + * is not a work-execution thread. The first time this happens in a given thread it will log a + * diagnostic message. + */ + @Nullable + public static MetricsContainer getCurrentContainer() { + MetricsContainer container = CONTAINER_FOR_THREAD.get(); + if (container == null && REPORTED_MISSING_CONTAINER.compareAndSet(false, true)) { + if (METRICS_SUPPORTED.get()) { + LOGGER.error( + "Unable to update metrics on the current thread. " + + "Most likely caused by using metrics outside the managed work-execution thread."); + } else { + LOGGER.warn("Reporting metrics are not supported in the current execution environment."); + } + } + return container; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java new file mode 100644 index 000000000000..ec812513210d --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableSet; +import java.util.Set; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Simple POJO representing a filter for querying metrics. + */ +@Experimental(Kind.METRICS) +@AutoValue +public abstract class MetricsFilter { + + public Set steps() { + return immutableSteps(); + } + + public Set names() { + return immutableNames(); + } + + protected abstract ImmutableSet immutableSteps(); + protected abstract ImmutableSet immutableNames(); + + public static Builder builder() { + return new AutoValue_MetricsFilter.Builder(); + } + + /** + * Builder for creating a {@link MetricsFilter}. + */ + @AutoValue.Builder + public abstract static class Builder { + + protected abstract ImmutableSet.Builder immutableNamesBuilder(); + protected abstract ImmutableSet.Builder immutableStepsBuilder(); + + /** + * Add a {@link MetricNameFilter}. + * + *

If no name filters are specified then all metric names will be inculded. + * + * + *

If one or more name filters are specified, then only metrics that match one or more of the + * filters will be included. + */ + public Builder addNameFilter(MetricNameFilter nameFilter) { + immutableNamesBuilder().add(nameFilter); + return this; + } + + /** + * Add a step filter. + * + *

If no steps are specified then metrics will be included for all steps. + * + *

If one or more steps are specified, then metrics will be included if they are part of + * any of the specified steps. + */ + public Builder addStep(String step) { + immutableStepsBuilder().add(step); + return this; + } + + public abstract MetricsFilter build(); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java new file mode 100644 index 000000000000..5a02106f160b --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.Iterables; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * A map from {@code K} to {@code T} that supports getting or creating values associated with a key + * in a thread-safe manner. + */ +@Experimental(Kind.METRICS) +public class MetricsMap { + + /** Interface for creating instances to populate the {@link MetricsMap}. */ + public interface Factory { + /** + * Create an instance of {@code T} to use with the given {@code key}. + * + *

It must be safe to call this from multiple threads. + */ + T createInstance(K key); + } + + private final Factory factory; + private final ConcurrentMap metrics = new ConcurrentHashMap<>(); + + public MetricsMap(Factory factory) { + this.factory = factory; + } + + /** + * Get or create the value associated with the given key. + */ + public T get(K key) { + T metric = metrics.get(key); + if (metric == null) { + metric = factory.createInstance(key); + metric = MoreObjects.firstNonNull(metrics.putIfAbsent(key, metric), metric); + } + return metric; + } + + /** + * Get the value associated with the given key, if it exists. + */ + @Nullable + public T tryGet(K key) { + return metrics.get(key); + } + + /** + * Return an iterable over the entries in the current {@link MetricsMap}. + */ + public Iterable> entries() { + return Iterables.unmodifiableIterable(metrics.entrySet()); + } + + /** + * Return an iterable over the values in the current {@link MetricsMap}. + */ + public Iterable values() { + return Iterables.unmodifiableIterable(metrics.values()); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java new file mode 100644 index 000000000000..f71dc7a75136 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Metrics allow exporting information about the execution of a pipeline. + * They are intended to be used for monitoring and understanding the + * execution. + * + *

Metrics may also be queried from the {@link org.apache.beam.sdk.PipelineResult} object. + * + *

Runners should look at {@link org.apache.beam.sdk.metrics.MetricsContainer} for details on + * how to support metrics. + */ +package org.apache.beam.sdk.metrics; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java new file mode 100644 index 000000000000..408f14544aeb --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link CounterCell}. + */ +@RunWith(JUnit4.class) +public class CounterCellTest { + + private CounterCell cell = new CounterCell(); + + @Test + public void testDeltaAndCumulative() { + cell.inc(5); + cell.inc(7); + assertThat(cell.getCumulative(), equalTo(12L)); + assertThat("getCumulative is idempotent", cell.getCumulative(), equalTo(12L)); + + assertThat(cell.getDirty().beforeCommit(), equalTo(true)); + cell.getDirty().afterCommit(); + assertThat(cell.getDirty().beforeCommit(), equalTo(false)); + assertThat(cell.getCumulative(), equalTo(12L)); + + cell.inc(30); + assertThat(cell.getCumulative(), equalTo(42L)); + + assertThat(cell.getDirty().beforeCommit(), equalTo(true)); + cell.getDirty().afterCommit(); + assertThat(cell.getDirty().beforeCommit(), equalTo(false)); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java new file mode 100644 index 000000000000..d00f8cd8c435 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link DirtyStateTest}. + */ +@RunWith(JUnit4.class) +public class DirtyStateTest { + + private final DirtyState dirty = new DirtyState(); + + @Test + public void basicPath() { + assertThat("Should start dirty", dirty.beforeCommit(), is(true)); + dirty.afterCommit(); + assertThat("Should be clean after commit", dirty.beforeCommit(), is(false)); + + dirty.afterModification(); + assertThat("Should be dirty after change", dirty.beforeCommit(), is(true)); + dirty.afterCommit(); + assertThat("Should be clean after commit", dirty.beforeCommit(), is(false)); + } + + @Test + public void changeAfterBeforeCommit() { + assertThat("Should start dirty", dirty.beforeCommit(), is(true)); + dirty.afterModification(); + dirty.afterCommit(); + assertThat("Changes after beforeCommit should be dirty after afterCommit", + dirty.beforeCommit(), is(true)); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java new file mode 100644 index 000000000000..07e0b266e0d9 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link DistributionCell}. + */ +@RunWith(JUnit4.class) +public class DistributionCellTest { + private DistributionCell cell = new DistributionCell(); + + @Test + public void testDeltaAndCumulative() { + cell.update(5); + cell.update(7); + assertThat(cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7))); + assertThat("getCumulative is idempotent", + cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7))); + + assertThat(cell.getDirty().beforeCommit(), equalTo(true)); + cell.getDirty().afterCommit(); + assertThat(cell.getDirty().beforeCommit(), equalTo(false)); + + cell.update(30); + assertThat(cell.getCumulative(), equalTo(DistributionData.create(42, 3, 5, 30))); + + assertThat("Adding a new value made the cell dirty", + cell.getDirty().beforeCommit(), equalTo(true)); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java new file mode 100644 index 000000000000..bdcb94f5eadc --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import java.util.Objects; +import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** + * Matchers for metrics. + */ +public class MetricMatchers { + + public static Matcher> metricUpdate(final String name, final T update) { + return new TypeSafeMatcher>() { + @Override + protected boolean matchesSafely(MetricUpdate item) { + return Objects.equals(name, item.getKey().metricName().name()) + && Objects.equals(update, item.getUpdate()); + } + + @Override + public void describeTo(Description description) { + description + .appendText("MetricUpdate{name=").appendValue(name) + .appendText(", update=").appendValue(update) + .appendText("}"); + } + }; + } + + public static Matcher> metricUpdate( + final String namespace, final String name, final String step, final T update) { + return new TypeSafeMatcher>() { + @Override + protected boolean matchesSafely(MetricUpdate item) { + return Objects.equals(namespace, item.getKey().metricName().namespace()) + && Objects.equals(name, item.getKey().metricName().name()) + && Objects.equals(step, item.getKey().stepName()) + && Objects.equals(update, item.getUpdate()); + } + + @Override + public void describeTo(Description description) { + description + .appendText("MetricUpdate{inNamespace=").appendValue(namespace) + .appendText(", name=").appendValue(name) + .appendText(", step=").appendValue(step) + .appendText(", update=").appendValue(update) + .appendText("}"); + } + }; + } + + public static Matcher> metricResult( + final String namespace, final String name, final String step, + final T logical, final T physical) { + return new TypeSafeMatcher>() { + @Override + protected boolean matchesSafely(MetricResult item) { + return Objects.equals(namespace, item.name().namespace()) + && Objects.equals(name, item.name().name()) + && Objects.equals(step, item.step()) + && Objects.equals(logical, item.committed()) + && Objects.equals(physical, item.attempted()); + } + + @Override + public void describeTo(Description description) { + description + .appendText("MetricResult{inNamespace=").appendValue(namespace) + .appendText(", name=").appendValue(name) + .appendText(", step=").appendValue(step) + .appendText(", logical=").appendValue(logical) + .appendText(", physical=").appendValue(physical) + .appendText("}"); + } + }; + } + +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java new file mode 100644 index 000000000000..58797ce2c505 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import static org.apache.beam.sdk.metrics.MetricMatchers.metricUpdate; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link MetricsContainer}. + */ +@RunWith(JUnit4.class) +public class MetricsContainerTest { + + @Test + public void testCounterDeltas() { + MetricsContainer container = new MetricsContainer("step1"); + CounterCell c1 = container.getCounter(MetricName.named("ns", "name1")); + CounterCell c2 = container.getCounter(MetricName.named("ns", "name2")); + assertThat("All counters should start out dirty", + container.getUpdates().counterUpdates(), containsInAnyOrder( + metricUpdate("name1", 0L), + metricUpdate("name2", 0L))); + container.commitUpdates(); + assertThat("After commit no counters should be dirty", + container.getUpdates().counterUpdates(), emptyIterable()); + + c1.inc(5L); + c2.inc(4L); + + assertThat(container.getUpdates().counterUpdates(), containsInAnyOrder( + metricUpdate("name1", 5L), + metricUpdate("name2", 4L))); + + assertThat("Since we haven't committed, updates are still included", + container.getUpdates().counterUpdates(), containsInAnyOrder( + metricUpdate("name1", 5L), + metricUpdate("name2", 4L))); + + container.commitUpdates(); + assertThat("After commit there are no updates", + container.getUpdates().counterUpdates(), emptyIterable()); + + c1.inc(8L); + assertThat(container.getUpdates().counterUpdates(), contains( + metricUpdate("name1", 13L))); + } + + @Test + public void testCounterCumulatives() { + MetricsContainer container = new MetricsContainer("step1"); + CounterCell c1 = container.getCounter(MetricName.named("ns", "name1")); + CounterCell c2 = container.getCounter(MetricName.named("ns", "name2")); + c1.inc(2L); + c2.inc(4L); + c1.inc(3L); + + container.getUpdates(); + container.commitUpdates(); + assertThat("Committing updates shouldn't affect cumulative counter values", + container.getCumulative().counterUpdates(), containsInAnyOrder( + metricUpdate("name1", 5L), + metricUpdate("name2", 4L))); + + c1.inc(8L); + assertThat(container.getCumulative().counterUpdates(), containsInAnyOrder( + metricUpdate("name1", 13L), + metricUpdate("name2", 4L))); + } + + @Test + public void testDistributionDeltas() { + MetricsContainer container = new MetricsContainer("step1"); + DistributionCell c1 = container.getDistribution(MetricName.named("ns", "name1")); + DistributionCell c2 = container.getDistribution(MetricName.named("ns", "name2")); + + assertThat("Initial update includes initial zero-values", + container.getUpdates().distributionUpdates(), containsInAnyOrder( + metricUpdate("name1", DistributionData.EMPTY), + metricUpdate("name2", DistributionData.EMPTY))); + + container.commitUpdates(); + assertThat("No updates after commit", + container.getUpdates().distributionUpdates(), emptyIterable()); + + c1.update(5L); + c2.update(4L); + + assertThat(container.getUpdates().distributionUpdates(), containsInAnyOrder( + metricUpdate("name1", DistributionData.create(5, 1, 5, 5)), + metricUpdate("name2", DistributionData.create(4, 1, 4, 4)))); + assertThat("Updates stay the same without commit", + container.getUpdates().distributionUpdates(), containsInAnyOrder( + metricUpdate("name1", DistributionData.create(5, 1, 5, 5)), + metricUpdate("name2", DistributionData.create(4, 1, 4, 4)))); + + container.commitUpdates(); + assertThat("No updatess after commit", + container.getUpdates().distributionUpdates(), emptyIterable()); + + c1.update(8L); + c1.update(4L); + assertThat(container.getUpdates().distributionUpdates(), contains( + metricUpdate("name1", DistributionData.create(17, 3, 4, 8)))); + container.commitUpdates(); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java new file mode 100644 index 000000000000..4200a200fd38 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import static org.apache.beam.sdk.metrics.MetricMatchers.metricUpdate; +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; + +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link MetricsEnvironment}. + */ +@RunWith(JUnit4.class) +public class MetricsEnvironmentTest { + @After + public void teardown() { + MetricsEnvironment.unsetMetricsContainer(); + } + + @Test + public void testUsesAppropriateMetricsContainer() { + Counter counter = Metrics.counter("ns", "name"); + MetricsContainer c1 = new MetricsContainer("step1"); + MetricsContainer c2 = new MetricsContainer("step2"); + + MetricsEnvironment.setMetricsContainer(c1); + counter.inc(); + MetricsEnvironment.setMetricsContainer(c2); + counter.dec(); + MetricsEnvironment.unsetMetricsContainer(); + + MetricUpdates updates1 = c1.getUpdates(); + MetricUpdates updates2 = c2.getUpdates(); + assertThat(updates1.counterUpdates(), contains(metricUpdate("ns", "name", "step1", 1L))); + assertThat(updates2.counterUpdates(), contains(metricUpdate("ns", "name", "step2", -1L))); + } + + @Test + public void testBehavesWithoutMetricsContainer() { + assertNull(MetricsEnvironment.getCurrentContainer()); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java new file mode 100644 index 000000000000..4104f8ddab62 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsMapTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertThat; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +/** + * Tests for {@link MetricsMap}. + */ +@RunWith(JUnit4.class) +public class MetricsMapTest { + + public MetricsMap metricsMap = + new MetricsMap<>(new MetricsMap.Factory() { + @Override + public AtomicLong createInstance(String unusedKey) { + return new AtomicLong(); + } + }); + + @Test + public void testCreateSeparateInstances() { + AtomicLong foo = metricsMap.get("foo"); + AtomicLong bar = metricsMap.get("bar"); + + assertThat(foo, not(sameInstance(bar))); + } + + @Test + public void testReuseInstances() { + AtomicLong foo1 = metricsMap.get("foo"); + AtomicLong foo2 = metricsMap.get("foo"); + + assertThat(foo1, sameInstance(foo2)); + } + + @Test + public void testGet() { + assertThat(metricsMap.tryGet("foo"), nullValue(AtomicLong.class)); + + AtomicLong foo = metricsMap.get("foo"); + assertThat(metricsMap.tryGet("foo"), sameInstance(foo)); + } + + @Test + public void testGetEntries() { + AtomicLong foo = metricsMap.get("foo"); + AtomicLong bar = metricsMap.get("bar"); + assertThat(metricsMap.entries(), containsInAnyOrder( + hasEntry("foo", foo), + hasEntry("bar", bar))); + } + + private static Matcher> hasEntry( + final String key, final AtomicLong value) { + return new TypeSafeMatcher>() { + + @Override + public void describeTo(Description description) { + description + .appendText("Map.Entry{key=").appendValue(key) + .appendText(", value=").appendValue(value) + .appendText("}"); + } + + @Override + protected boolean matchesSafely(Entry item) { + return Objects.equals(key, item.getKey()) + && Objects.equals(value, item.getValue()); + } + }; + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java new file mode 100644 index 000000000000..d11b44ddb19f --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; + +import org.hamcrest.CoreMatchers; +import org.junit.After; +import org.junit.Test; + +/** + * Tests for {@link Metrics}. + */ +public class MetricsTest { + + private static final String NS = "test"; + private static final String NAME = "name"; + private static final MetricName METRIC_NAME = MetricName.named(NS, NAME); + + @After + public void tearDown() { + MetricsEnvironment.unsetMetricsContainer(); + } + + @Test + public void distributionWithoutContainer() { + assertNull(MetricsEnvironment.getCurrentContainer()); + // Should not fail even though there is no metrics container. + Metrics.distribution(NS, NAME).update(5L); + } + + @Test + public void counterWithoutContainer() { + assertNull(MetricsEnvironment.getCurrentContainer()); + // Should not fail even though there is no metrics container. + Counter counter = Metrics.counter(NS, NAME); + counter.inc(); + counter.inc(5L); + counter.dec(); + counter.dec(5L); + } + + @Test + public void distributionToCell() { + MetricsContainer container = new MetricsContainer("step"); + MetricsEnvironment.setMetricsContainer(container); + + Distribution distribution = Metrics.distribution(NS, NAME); + + distribution.update(5L); + + DistributionCell cell = container.getDistribution(METRIC_NAME); + assertThat(cell.getCumulative(), equalTo(DistributionData.create(5, 1, 5, 5))); + + distribution.update(36L); + assertThat(cell.getCumulative(), equalTo(DistributionData.create(41, 2, 5, 36))); + + distribution.update(1L); + assertThat(cell.getCumulative(), equalTo(DistributionData.create(42, 3, 1, 36))); + } + + @Test + public void counterToCell() { + MetricsContainer container = new MetricsContainer("step"); + MetricsEnvironment.setMetricsContainer(container); + Counter counter = Metrics.counter(NS, NAME); + CounterCell cell = container.getCounter(METRIC_NAME); + counter.inc(); + assertThat(cell.getCumulative(), CoreMatchers.equalTo(1L)); + + counter.inc(47L); + assertThat(cell.getCumulative(), CoreMatchers.equalTo(48L)); + + counter.dec(5L); + assertThat(cell.getCumulative(), CoreMatchers.equalTo(43L)); + + counter.dec(); + assertThat(cell.getCumulative(), CoreMatchers.equalTo(42L)); + } +} From 366eff9f4e68f12340726755db0f73823ab4d0f4 Mon Sep 17 00:00:00 2001 From: bchambers Date: Wed, 12 Oct 2016 10:55:05 -0700 Subject: [PATCH 2/3] Add the ability to query metrics on PipelineResult All runners currently implement this by throwing an UnsupportedOperationException. --- .../org/apache/beam/runners/direct/DirectRunner.java | 7 +++++++ .../apache/beam/runners/flink/FlinkRunnerResult.java | 6 ++++++ .../beam/runners/dataflow/DataflowPipelineJob.java | 7 +++++++ .../runners/spark/translation/EvaluationContext.java | 6 ++++++ .../main/java/org/apache/beam/sdk/PipelineResult.java | 11 +++++++++++ 5 files changed, 37 insertions(+) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index a72f7ae10024..e13046d24acf 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Write; +import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.TestStream; @@ -380,6 +381,12 @@ public String toString() { }; } + @Override + public MetricResults metrics() { + throw new UnsupportedOperationException( + "The DirectRunner does not currently support metrics."); + } + /** * Blocks until the {@link Pipeline} execution represented by this * {@link DirectPipelineResult} is complete, returning the terminal state. diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java index 90bb64d31046..6b1548520374 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.transforms.Aggregator; import org.joda.time.Duration; @@ -86,4 +87,9 @@ public State waitUntilFinish() { public State waitUntilFinish(Duration duration) { throw new UnsupportedOperationException("FlinkRunnerResult does not support waitUntilFinish."); } + + @Override + public MetricResults metrics() { + throw new UnsupportedOperationException("The FlinkRunner does not currently support metrics."); + } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 269b824f6db0..bbcf11fcb55e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.util.FluentBackoff; import org.joda.time.Duration; @@ -426,6 +427,12 @@ public String toString() { } } + @Override + public MetricResults metrics() { + throw new UnsupportedOperationException( + "The DataflowRunner does not currently support metrics."); + } + private Map fromMetricUpdates(Aggregator aggregator) throws IOException { if (aggregatorTransforms.contains(aggregator)) { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index 2397276ceceb..1944b6bb0fdb 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; @@ -262,6 +263,11 @@ public AggregatorValues getAggregatorValues(Aggregator aggregator) return runtime.getAggregatorValues(AccumulatorSingleton.getInstance(jsc), aggregator); } + @Override + public MetricResults metrics() { + throw new UnsupportedOperationException("The SparkRunner does not currently support metrics."); + } + @Override public Iterable get(PCollection pcollection) { @SuppressWarnings("unchecked") diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java index d9cdc16a3da4..d7774bbbac2c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java @@ -18,6 +18,9 @@ package org.apache.beam.sdk; import java.io.IOException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.transforms.Aggregator; import org.joda.time.Duration; @@ -127,4 +130,12 @@ public final boolean hasReplacementJob() { return hasReplacement; } } + + /** + * Return the object to access metrics from the pipeline. + * + * @throws UnsupportedOperationException if the runner doesn't support retrieving metrics. + */ + @Experimental(Kind.METRICS) + MetricResults metrics(); } From 369bd8866f864febc3b394289b7aa3203a545258 Mon Sep 17 00:00:00 2001 From: bchambers Date: Wed, 12 Oct 2016 10:55:53 -0700 Subject: [PATCH 3/3] Implement Metrics in the DirectRunner --- .../beam/runners/direct/DirectMetrics.java | 331 ++++++++++++++++++ .../beam/runners/direct/DirectRunner.java | 8 +- .../runners/direct/EvaluationContext.java | 10 + .../ExecutorServiceParallelExecutor.java | 1 + .../direct/ImmutableListBundleFactory.java | 10 + .../runners/direct/StepTransformResult.java | 49 ++- .../runners/direct/TransformExecutor.java | 35 +- .../beam/runners/direct/TransformResult.java | 12 + .../runners/direct/DirectMetricsTest.java | 133 +++++++ .../beam/runners/direct/DirectRunnerTest.java | 36 ++ .../runners/direct/TransformExecutorTest.java | 12 + 11 files changed, 602 insertions(+), 35 deletions(-) create mode 100644 runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java create mode 100644 runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java new file mode 100644 index 000000000000..a749a7615af7 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static java.util.Arrays.asList; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.concurrent.GuardedBy; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.sdk.metrics.DistributionData; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.MetricKey; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricUpdates; +import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.metrics.MetricsMap; + +/** + * Implementation of {@link MetricResults} for the Direct Runner. + */ +class DirectMetrics extends MetricResults { + + // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the DirectRunner. + private static final ExecutorService COUNTER_COMMITTER = Executors.newCachedThreadPool(); + + private interface MetricAggregation { + UpdateT zero(); + UpdateT combine(Iterable updates); + ResultT extract(UpdateT data); + } + + /** + * Implementation of a metric in the direct runner. + * + * @param The type of raw data received and aggregated across updates. + * @param The type of result extracted from the data. + */ + private static class DirectMetric { + private final MetricAggregation aggregation; + + private final AtomicReference finishedCommitted; + + private final Object attemptedLock = new Object(); + @GuardedBy("attemptedLock") + private volatile UpdateT finishedAttempted; + @GuardedBy("attemptedLock") + private final ConcurrentMap, UpdateT> inflightAttempted = + new ConcurrentHashMap<>(); + + public DirectMetric(MetricAggregation aggregation) { + this.aggregation = aggregation; + finishedCommitted = new AtomicReference<>(aggregation.zero()); + finishedAttempted = aggregation.zero(); + } + + /** + * Add the given {@code tentativeCumulative} update to the physical aggregate. + * + * @param bundle The bundle receiving an update. + * @param tentativeCumulative The new cumulative value for the given bundle. + */ + public void updatePhysical(CommittedBundle bundle, UpdateT tentativeCumulative) { + // Add (or update) the cumulatiev value for the given bundle. + inflightAttempted.put(bundle, tentativeCumulative); + } + + /** + * Commit a physical value for the given {@code bundle}. + * + * @param bundle The bundle being committed. + * @param finalCumulative The final cumulative value for the given bundle. + */ + public void commitPhysical(final CommittedBundle bundle, final UpdateT finalCumulative) { + // To prevent a query from blocking the commit, we perform the commit in two steps. + // 1. We perform a non-blocking write to the uncommitted table to make the new vaule + // available immediately. + // 2. We submit a runnable that will commit the update and remove the tentative value in + // a synchronized block. + inflightAttempted.put(bundle, finalCumulative); + COUNTER_COMMITTER.submit(new Runnable() { + @Override + public void run() { + synchronized (attemptedLock) { + finishedAttempted = aggregation.combine(asList(finishedAttempted, finalCumulative)); + inflightAttempted.remove(bundle); + } + } + }); + } + + /** Extract the latest values from all attempted and in-progress bundles. */ + public ResultT extractLatestAttempted() { + ArrayList updates = new ArrayList<>(inflightAttempted.size() + 1); + // Within this block we know that will be consistent. Specifically, the only change that can + // happen concurrently is the addition of new (larger) values to inflightAttempted. + synchronized (attemptedLock) { + updates.add(finishedAttempted); + updates.addAll(inflightAttempted.values()); + } + return aggregation.extract(aggregation.combine(updates)); + } + + /** + * Commit a logical value for the given {@code bundle}. + * + * @param bundle The bundle being committed. + * @param finalCumulative The final cumulative value for the given bundle. + */ + public void commitLogical(final CommittedBundle bundle, final UpdateT finalCumulative) { + UpdateT current; + do { + current = finishedCommitted.get(); + } while (!finishedCommitted.compareAndSet(current, + aggregation.combine(asList(current, finalCumulative)))); + } + + /** Extract the value from all successfully committed bundles. */ + public ResultT extractCommitted() { + return aggregation.extract(finishedCommitted.get()); + } + } + + private static final MetricAggregation COUNTER = + new MetricAggregation() { + @Override + public Long zero() { + return 0L; + } + + @Override + public Long combine(Iterable updates) { + long value = 0; + for (long update : updates) { + value += update; + } + return value; + } + + @Override + public Long extract(Long data) { + return data; + } + }; + + private static final MetricAggregation DISTRIBUTION = + new MetricAggregation() { + @Override + public DistributionData zero() { + return DistributionData.EMPTY; + } + + @Override + public DistributionData combine(Iterable updates) { + DistributionData result = DistributionData.EMPTY; + for (DistributionData update : updates) { + result = result.combine(update); + } + return result; + } + + @Override + public DistributionResult extract(DistributionData data) { + return data.extractResult(); + } + }; + + /** The current values of counters in memory. */ + private MetricsMap> counters = + new MetricsMap<>(new MetricsMap.Factory>() { + @Override + public DirectMetric createInstance(MetricKey unusedKey) { + return new DirectMetric<>(COUNTER); + } + }); + private MetricsMap> distributions = + new MetricsMap<>( + new MetricsMap.Factory>() { + @Override + public DirectMetric createInstance( + MetricKey unusedKey) { + return new DirectMetric<>(DISTRIBUTION); + } + }); + + @AutoValue + abstract static class DirectMetricQueryResults implements MetricQueryResults { + public static MetricQueryResults create( + Iterable> counters, + Iterable> distributions) { + return new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, distributions); + } + } + + @AutoValue + abstract static class DirectMetricResult implements MetricResult { + public static MetricResult create(MetricName name, String scope, + T committed, T attempted) { + return new AutoValue_DirectMetrics_DirectMetricResult( + name, scope, committed, attempted); + } + } + + @Override + public MetricQueryResults queryMetrics(MetricsFilter filter) { + ImmutableList.Builder> counterResults = ImmutableList.builder(); + for (Entry> counter : counters.entries()) { + maybeExtractResult(filter, counterResults, counter); + } + ImmutableList.Builder> distributionResults = + ImmutableList.builder(); + for (Entry> distribution + : distributions.entries()) { + maybeExtractResult(filter, distributionResults, distribution); + } + + return DirectMetricQueryResults.create(counterResults.build(), distributionResults.build()); + } + + private void maybeExtractResult( + MetricsFilter filter, + ImmutableList.Builder> resultsBuilder, + Map.Entry> entry) { + if (matches(filter, entry.getKey())) { + resultsBuilder.add(DirectMetricResult.create( + entry.getKey().metricName(), + entry.getKey().stepName(), + entry.getValue().extractCommitted(), + entry.getValue().extractLatestAttempted())); + } + } + + // Matching logic is implemented here rather than in MetricsFilter because we would like + // MetricsFilter to act as a "dumb" value-object, with the possibility of replacing it with + // a Proto/JSON/etc. schema object. + private boolean matches(MetricsFilter filter, MetricKey key) { + return matchesName(key.metricName(), filter.names()) + && matchesScope(key.stepName(), filter.steps()); + } + + private boolean matchesScope(String actualScope, Set scopes) { + if (scopes.isEmpty() || scopes.contains(actualScope)) { + return true; + } + + for (String scope : scopes) { + if (actualScope.startsWith(scope)) { + return true; + } + } + + return false; + } + + private boolean matchesName(MetricName metricName, Set nameFilters) { + if (nameFilters.isEmpty()) { + return true; + } + + for (MetricNameFilter nameFilter : nameFilters) { + if ((nameFilter.getName() == null || nameFilter.getName().equals(metricName.name())) + && Objects.equal(metricName.namespace(), nameFilter.getNamespace())) { + return true; + } + } + + return false; + } + + /** Apply metric updates that represent physical counter deltas to the current metric values. */ + public void updatePhysical(CommittedBundle bundle, MetricUpdates updates) { + for (MetricUpdate counter : updates.counterUpdates()) { + counters.get(counter.getKey()).updatePhysical(bundle, counter.getUpdate()); + } + for (MetricUpdate distribution : updates.distributionUpdates()) { + distributions.get(distribution.getKey()) + .updatePhysical(bundle, distribution.getUpdate()); + } + } + + public void commitPhysical(CommittedBundle bundle, MetricUpdates updates) { + for (MetricUpdate counter : updates.counterUpdates()) { + counters.get(counter.getKey()).commitPhysical(bundle, counter.getUpdate()); + } + for (MetricUpdate distribution : updates.distributionUpdates()) { + distributions.get(distribution.getKey()) + .commitPhysical(bundle, distribution.getUpdate()); + } + } + + /** Apply metric updates that represent new logical values from a bundle being committed. */ + public void commitLogical(CommittedBundle bundle, MetricUpdates updates) { + for (MetricUpdate counter : updates.counterUpdates()) { + counters.get(counter.getKey()).commitLogical(bundle, counter.getUpdate()); + } + for (MetricUpdate distribution : updates.distributionUpdates()) { + distributions.get(distribution.getKey()) + .commitLogical(bundle, distribution.getUpdate()); + } + } +} diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index e13046d24acf..894109323fd5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.TestStream; @@ -226,6 +227,7 @@ public OutputT apply( @Override public DirectPipelineResult run(Pipeline pipeline) { + MetricsEnvironment.setMetricsSupported(true); ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor(); pipeline.traverseTopologically(consumerTrackingVisitor); for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) { @@ -268,8 +270,7 @@ public DirectPipelineResult run(Pipeline pipeline) { Map, Collection>> aggregatorSteps = pipeline.getAggregatorSteps(); - DirectPipelineResult result = - new DirectPipelineResult(executor, context, aggregatorSteps); + DirectPipelineResult result = new DirectPipelineResult(executor, context, aggregatorSteps); if (options.isBlockOnRun()) { try { result.awaitCompletion(); @@ -383,8 +384,7 @@ public String toString() { @Override public MetricResults metrics() { - throw new UnsupportedOperationException( - "The DirectRunner does not currently support metrics."); + return evaluationContext.getMetrics(); } /** diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index 290125442fa4..e5a30d4f1e1e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -96,6 +96,8 @@ class EvaluationContext { private final AggregatorContainer mergedAggregators; + private final DirectMetrics metrics; + public static EvaluationContext create( DirectOptions options, Clock clock, @@ -130,6 +132,7 @@ private EvaluationContext( this.applicationStateInternals = new ConcurrentHashMap<>(); this.mergedAggregators = AggregatorContainer.create(); + this.metrics = new DirectMetrics(); this.callbackExecutor = WatermarkCallbackExecutor.create(MoreExecutors.directExecutor()); @@ -161,6 +164,8 @@ public CommittedResult handleResult( TransformResult result) { Iterable> committedBundles = commitBundles(result.getOutputBundles()); + metrics.commitLogical(completedBundle, result.getLogicalMetricUpdates()); + // Update watermarks and timers EnumSet outputTypes = EnumSet.copyOf(result.getOutputTypes()); if (Iterables.isEmpty(committedBundles)) { @@ -367,6 +372,11 @@ public AggregatorContainer getAggregatorContainer() { return mergedAggregators; } + /** Returns the metrics container for this pipeline. */ + public DirectMetrics getMetrics() { + return metrics; + } + @VisibleForTesting void forceRefresh() { watermarkManager.refreshAll(); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index fab6a338ae79..3761574195c4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -212,6 +212,7 @@ private void evaluateBundle( TransformExecutor callable = TransformExecutor.create( + evaluationContext, registry, enforcements, bundle, diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java index 497234007576..db92542a2fbb 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java @@ -123,5 +123,15 @@ public CommittedBundle withElements(Iterable> elements) { ImmutableList.copyOf(elements), getSynchronizedProcessingOutputWatermark()); } + + @Override + public int hashCode() { + return System.identityHashCode(this); + } + + @Override + public boolean equals(Object obj) { + return this == obj; + } } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java index 1829e4aea92d..989109f8eac3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java @@ -22,10 +22,10 @@ import java.util.Collection; import java.util.EnumSet; import java.util.Set; -import javax.annotation.Nullable; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; +import org.apache.beam.sdk.metrics.MetricUpdates; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; @@ -37,31 +37,6 @@ */ @AutoValue public abstract class StepTransformResult implements TransformResult { - @Override - public abstract AppliedPTransform getTransform(); - - @Override - public abstract Iterable> getOutputBundles(); - - @Override - public abstract Iterable> getUnprocessedElements(); - - @Override - @Nullable - public abstract AggregatorContainer.Mutator getAggregatorChanges(); - - @Override - public abstract Instant getWatermarkHold(); - - @Nullable - @Override - public abstract CopyOnAccessInMemoryStateInternals getState(); - - @Override - public abstract TimerUpdate getTimerUpdate(); - - @Override - public abstract Set getOutputTypes(); public static Builder withHold(AppliedPTransform transform, Instant watermarkHold) { return new Builder(transform, watermarkHold); @@ -71,6 +46,20 @@ public static Builder withoutHold(AppliedPTransform transform) { return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE); } + @Override + public TransformResult withLogicalMetricUpdates(MetricUpdates metricUpdates) { + return new AutoValue_StepTransformResult( + getTransform(), + getOutputBundles(), + getUnprocessedElements(), + getAggregatorChanges(), + metricUpdates, + getWatermarkHold(), + getState(), + getTimerUpdate(), + getOutputTypes()); + } + /** * A builder for creating instances of {@link StepTransformResult}. */ @@ -78,6 +67,7 @@ public static class Builder { private final AppliedPTransform transform; private final ImmutableList.Builder> bundlesBuilder; private final ImmutableList.Builder> unprocessedElementsBuilder; + private MetricUpdates metricUpdates; private CopyOnAccessInMemoryStateInternals state; private TimerUpdate timerUpdate; private AggregatorContainer.Mutator aggregatorChanges; @@ -91,6 +81,7 @@ private Builder(AppliedPTransform transform, Instant watermarkHold) { this.producedOutputs = EnumSet.noneOf(OutputType.class); this.unprocessedElementsBuilder = ImmutableList.builder(); this.timerUpdate = TimerUpdate.builder(null).build(); + this.metricUpdates = MetricUpdates.EMPTY; } public StepTransformResult build() { @@ -99,6 +90,7 @@ public StepTransformResult build() { bundlesBuilder.build(), unprocessedElementsBuilder.build(), aggregatorChanges, + metricUpdates, watermarkHold, state, timerUpdate, @@ -110,6 +102,11 @@ public Builder withAggregatorChanges(AggregatorContainer.Mutator aggregatorChang return this; } + public Builder withMetricUpdates(MetricUpdates metricUpdates) { + this.metricUpdates = metricUpdates; + return this; + } + public Builder withState(CopyOnAccessInMemoryStateInternals state) { this.state = state; return this; diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index aaee9a5c9e91..03f615bc258c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -25,6 +25,9 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.sdk.metrics.MetricUpdates; +import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.util.WindowedValue; @@ -38,6 +41,7 @@ */ class TransformExecutor implements Runnable { public static TransformExecutor create( + EvaluationContext context, TransformEvaluatorFactory factory, Iterable modelEnforcements, CommittedBundle inputBundle, @@ -45,6 +49,7 @@ public static TransformExecutor create( CompletionCallback completionCallback, TransformExecutorService transformEvaluationState) { return new TransformExecutor<>( + context, factory, modelEnforcements, inputBundle, @@ -63,10 +68,12 @@ public static TransformExecutor create( private final CompletionCallback onComplete; private final TransformExecutorService transformEvaluationState; + private final EvaluationContext context; private final AtomicReference thread; private TransformExecutor( + EvaluationContext context, TransformEvaluatorFactory factory, Iterable modelEnforcements, CommittedBundle inputBundle, @@ -82,11 +89,14 @@ private TransformExecutor( this.onComplete = completionCallback; this.transformEvaluationState = transformEvaluationState; + this.context = context; this.thread = new AtomicReference<>(); } @Override public void run() { + MetricsContainer metricsContainer = new MetricsContainer(transform.getFullName()); + MetricsEnvironment.setMetricsContainer(metricsContainer); checkState( thread.compareAndSet(null, Thread.currentThread()), "Tried to execute %s for %s on thread %s, but is already executing on thread %s", @@ -108,9 +118,9 @@ public void run() { return; } - processElements(evaluator, enforcements); + processElements(evaluator, metricsContainer, enforcements); - finishBundle(evaluator, enforcements); + finishBundle(evaluator, metricsContainer, enforcements); } catch (Throwable t) { onComplete.handleThrowable(inputBundle, t); if (t instanceof RuntimeException) { @@ -118,6 +128,10 @@ public void run() { } throw new RuntimeException(t); } finally { + // Report the physical metrics from the end of this step. + context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative()); + + MetricsEnvironment.unsetMetricsContainer(); transformEvaluationState.complete(this); } } @@ -127,7 +141,9 @@ public void run() { * necessary {@link ModelEnforcement ModelEnforcements}. */ private void processElements( - TransformEvaluator evaluator, Collection> enforcements) + TransformEvaluator evaluator, + MetricsContainer metricsContainer, + Collection> enforcements) throws Exception { if (inputBundle != null) { for (WindowedValue value : inputBundle.getElements()) { @@ -137,6 +153,13 @@ private void processElements( evaluator.processElement(value); + // Report the physical metrics after each element + MetricUpdates deltas = metricsContainer.getUpdates(); + if (deltas != null) { + context.getMetrics().updatePhysical(inputBundle, deltas); + metricsContainer.commitUpdates(); + } + for (ModelEnforcement enforcement : enforcements) { enforcement.afterElement(value); } @@ -152,9 +175,11 @@ private void processElements( * {@link TransformEvaluator#finishBundle()} */ private TransformResult finishBundle( - TransformEvaluator evaluator, Collection> enforcements) + TransformEvaluator evaluator, MetricsContainer metricsContainer, + Collection> enforcements) throws Exception { - TransformResult result = evaluator.finishBundle(); + TransformResult result = evaluator.finishBundle() + .withLogicalMetricUpdates(metricsContainer.getCumulative()); CommittedResult outputs = onComplete.handleResult(inputBundle, result); for (ModelEnforcement enforcement : enforcements) { enforcement.afterFinish(inputBundle, result, outputs.getOutputs()); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java index ba2d48e76669..ac1e39568bc1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java @@ -22,6 +22,7 @@ import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; +import org.apache.beam.sdk.metrics.MetricUpdates; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -56,6 +57,11 @@ public interface TransformResult { */ @Nullable AggregatorContainer.Mutator getAggregatorChanges(); + /** + * Returns the logical metric updates. + */ + MetricUpdates getLogicalMetricUpdates(); + /** * Returns the Watermark Hold for the transform at the time this result was produced. * @@ -86,4 +92,10 @@ public interface TransformResult { * {@link OutputType#BUNDLE}, as empty bundles may be dropped when the transform is committed. */ Set getOutputTypes(); + + /** + * Returns a new TransformResult based on this one but overwriting any existing logical metric + * updates with {@code metricUpdates}. + */ + TransformResult withLogicalMetricUpdates(MetricUpdates metricUpdates); } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java new file mode 100644 index 000000000000..df012446f672 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct; + +import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult; +import static org.apache.beam.sdk.metrics.MetricNameFilter.inNamespace; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.sdk.metrics.DistributionData; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.MetricKey; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricUpdates; +import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link DirectMetrics}. + */ +@RunWith(JUnit4.class) +public class DirectMetricsTest { + + @Mock + private CommittedBundle bundle1; + @Mock + private CommittedBundle bundle2; + + private static final MetricName NAME1 = MetricName.named("ns1", "name1"); + private static final MetricName NAME2 = MetricName.named("ns1", "name2"); + private static final MetricName NAME3 = MetricName.named("ns2", "name1"); + + private DirectMetrics metrics = new DirectMetrics(); + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testApplyLogicalQueryNoFilter() { + metrics.commitLogical(bundle1, MetricUpdates.create( + ImmutableList.of( + MetricUpdate.create(MetricKey.create("step1", NAME1), 5L), + MetricUpdate.create(MetricKey.create("step1", NAME2), 8L)), + ImmutableList.of( + MetricUpdate.create(MetricKey.create("step1", NAME1), + DistributionData.create(8, 2, 3, 5))))); + metrics.commitLogical(bundle1, MetricUpdates.create( + ImmutableList.of( + MetricUpdate.create(MetricKey.create("step2", NAME1), 7L), + MetricUpdate.create(MetricKey.create("step1", NAME2), 4L)), + ImmutableList.of( + MetricUpdate.create(MetricKey.create("step1", NAME1), + DistributionData.create(4, 1, 4, 4))))); + + MetricQueryResults results = metrics.queryMetrics(MetricsFilter.builder().build()); + assertThat(results.counters(), containsInAnyOrder( + metricResult("ns1", "name1", "step1", 5L, 0L), + metricResult("ns1", "name2", "step1", 12L, 0L), + metricResult("ns1", "name1", "step2", 7L, 0L))); + assertThat(results.distributions(), contains( + metricResult("ns1", "name1", "step1", + DistributionResult.create(12, 3, 3, 5), + DistributionResult.ZERO))); + } + + @Test + public void testApplyPhysicalCountersQueryOneNamespace() { + metrics.updatePhysical(bundle1, MetricUpdates.create( + ImmutableList.of( + MetricUpdate.create(MetricKey.create("step1", NAME1), 5L), + MetricUpdate.create(MetricKey.create("step1", NAME3), 8L)), + ImmutableList.>of())); + metrics.updatePhysical(bundle1, MetricUpdates.create( + ImmutableList.of( + MetricUpdate.create(MetricKey.create("step2", NAME1), 7L), + MetricUpdate.create(MetricKey.create("step1", NAME3), 4L)), + ImmutableList.>of())); + + assertThat(metrics.queryMetrics( + MetricsFilter.builder().addNameFilter(inNamespace("ns1")).build()).counters(), + containsInAnyOrder( + metricResult("ns1", "name1", "step1", 0L, 5L), + metricResult("ns1", "name1", "step2", 0L, 7L))); + } + + @Test + public void testApplyPhysicalQueryCompositeScope() { + metrics.updatePhysical(bundle1, MetricUpdates.create( + ImmutableList.of( + MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L), + MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)), + ImmutableList.>of())); + metrics.updatePhysical(bundle1, MetricUpdates.create( + ImmutableList.of( + MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 12L), + MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)), + ImmutableList.>of())); + + assertThat(metrics.queryMetrics( + MetricsFilter.builder().addStep("Outer1").build()).counters(), + containsInAnyOrder( + metricResult("ns1", "name1", "Outer1/Inner1", 0L, 12L), + metricResult("ns1", "name1", "Outer1/Inner2", 0L, 8L))); + } +} diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 4768fb030fdb..d93dd7a1d306 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.direct; +import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isA; @@ -35,12 +37,20 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.PipelineRunner; @@ -444,4 +454,30 @@ public Long decode(InputStream inStream, Context context) throws IOException { throw new CoderException("Cannot decode a long"); } } + + public void testMetrics() throws Exception { + Pipeline pipeline = getPipeline(); + pipeline + .apply(Create.of(5, 8, 13)) + .apply("MyStep", ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + Counter count = Metrics.counter(DirectRunnerTest.class, "count"); + Distribution values = Metrics.distribution(DirectRunnerTest.class, "input"); + + count.inc(); + values.update(c.element()); + } + })); + PipelineResult result = pipeline.run(); + MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.inNamespace(DirectRunnerTest.class)) + .build()); + assertThat(metrics.counters(), contains( + metricResult(DirectRunnerTest.class.getName(), "count", "MyStep", 3L, 3L))); + assertThat(metrics.distributions(), contains( + metricResult(DirectRunnerTest.class.getName(), "input", "MyStep", + DistributionResult.create(26L, 3L, 5L, 13L), + DistributionResult.create(26L, 3L, 5L, 13L)))); + } } diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index c63e9bd30275..5015e5a1d763 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -72,6 +72,7 @@ public class TransformExecutorTest { private RegisteringCompletionCallback completionCallback; private TransformExecutorService transformEvaluationState; private BundleFactory bundleFactory; + @Mock private DirectMetrics metrics; @Mock private EvaluationContext evaluationContext; @Mock private TransformEvaluatorRegistry registry; @@ -90,6 +91,8 @@ public void setup() { TestPipeline p = TestPipeline.create(); created = p.apply(Create.of("foo", "spam", "third")); downstream = created.apply(WithKeys.of(3)); + + when(evaluationContext.getMetrics()).thenReturn(metrics); } @Test @@ -116,6 +119,7 @@ public TransformResult finishBundle() throws Exception { TransformExecutor executor = TransformExecutor.create( + evaluationContext, registry, Collections.emptyList(), null, @@ -135,6 +139,7 @@ public void nullTransformEvaluatorTerminates() throws Exception { TransformExecutor executor = TransformExecutor.create( + evaluationContext, registry, Collections.emptyList(), null, @@ -177,6 +182,7 @@ public TransformResult finishBundle() throws Exception { TransformExecutor executor = TransformExecutor.create( + evaluationContext, registry, Collections.emptyList(), inputBundle, @@ -219,6 +225,7 @@ public TransformResult finishBundle() throws Exception { TransformExecutor executor = TransformExecutor.create( + evaluationContext, registry, Collections.emptyList(), inputBundle, @@ -254,6 +261,7 @@ public TransformResult finishBundle() throws Exception { TransformExecutor executor = TransformExecutor.create( + evaluationContext, registry, Collections.emptyList(), inputBundle, @@ -294,6 +302,7 @@ public TransformResult finishBundle() throws Exception { TransformExecutor executor = TransformExecutor.create( + evaluationContext, registry, Collections.emptyList(), null, @@ -335,6 +344,7 @@ public TransformResult finishBundle() throws Exception { TestEnforcementFactory enforcement = new TestEnforcementFactory(); TransformExecutor executor = TransformExecutor.create( + evaluationContext, registry, Collections.singleton(enforcement), inputBundle, @@ -392,6 +402,7 @@ public TransformResult finishBundle() throws Exception { TransformExecutor executor = TransformExecutor.create( + evaluationContext, registry, Collections.singleton(ImmutabilityEnforcementFactory.create()), inputBundle, @@ -448,6 +459,7 @@ public TransformResult finishBundle() throws Exception { TransformExecutor executor = TransformExecutor.create( + evaluationContext, registry, Collections.singleton(ImmutabilityEnforcementFactory.create()), inputBundle,