Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,21 @@
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AggregatorPipelineExtractor;
import org.apache.beam.sdk.runners.AggregatorRetrievalException;
import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.util.MapAggregatorValues;
import org.apache.beam.sdk.util.TimerInternals.TimerData;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
Expand All @@ -47,6 +45,7 @@
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;

import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -244,7 +243,7 @@ public DirectPipelineResult run(Pipeline pipeline) {
executor.start(consumerTrackingVisitor.getRootTransforms());

Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
new AggregatorPipelineExtractor(pipeline).getAggregatorSteps();
pipeline.getAggregatorSteps();
DirectPipelineResult result =
new DirectPipelineResult(executor, context, aggregatorSteps);
if (options.isBlockOnRun()) {
Expand Down Expand Up @@ -321,7 +320,7 @@ public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
throws AggregatorRetrievalException {
AggregatorContainer aggregators = evaluationContext.getAggregatorContainer();
Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
Map<String, T> stepValues = new HashMap<>();
final Map<String, T> stepValues = new HashMap<>();
for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
if (steps.contains(transform.getTransform())) {
T aggregate = aggregators.getAggregate(
Expand All @@ -331,7 +330,19 @@ public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
}
}
}
return new MapAggregatorValues<>(stepValues);
return new AggregatorValues<T>() {
@Override
public Map<String, T> getValuesAtSteps() {
return stepValues;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("stepValues", stepValues)
.toString();
}
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.beam.runners.flink;

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.runners.AggregatorRetrievalException;
import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.transforms.Aggregator;

import org.joda.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@
import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.runners.AggregatorRetrievalException;
import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.util.AttemptAndTimeBoundedExponentialBackOff;
import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
import org.apache.beam.sdk.util.MapAggregatorValues;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.util.BackOff;
Expand All @@ -41,6 +40,7 @@
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;

import org.joda.time.Duration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -369,7 +369,20 @@ private boolean nextBackOff(Sleeper sleeper, BackOff backoff) {
public <OutputT> AggregatorValues<OutputT> getAggregatorValues(Aggregator<?, OutputT> aggregator)
throws AggregatorRetrievalException {
try {
return new MapAggregatorValues<>(fromMetricUpdates(aggregator));
final Map<String, OutputT> stepValues = fromMetricUpdates(aggregator);
return new AggregatorValues<OutputT>() {
@Override
public Map<String, OutputT> getValuesAtSteps() {
return stepValues;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("stepValues", stepValues)
.toString();
}
};
} catch (IOException e) {
throw new AggregatorRetrievalException(
"IOException when retrieving Aggregator values for Aggregator " + aggregator, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.AggregatorPipelineExtractor;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.Aggregator;
Expand Down Expand Up @@ -596,9 +595,8 @@ public DataflowPipelineJob run(Pipeline pipeline) {

// Obtain all of the extractors from the PTransforms used in the pipeline so the
// DataflowPipelineJob has access to them.
AggregatorPipelineExtractor aggregatorExtractor = new AggregatorPipelineExtractor(pipeline);
Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
aggregatorExtractor.getAggregatorSteps();
pipeline.getAggregatorSteps();

DataflowAggregatorTransforms aggregatorTransforms =
new DataflowAggregatorTransforms(aggregatorSteps, jobSpecification.getStepNames());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AggregatorRetrievalException;
import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.AggregatorRetrievalException;
import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@

import org.apache.beam.runners.spark.aggregators.AggAccumParam;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Max;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@

import org.apache.beam.runners.spark.EvaluationResult;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.ApproximateUnique;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.runners;
package org.apache.beam.sdk;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AggregatorRetriever;
import org.apache.beam.sdk.transforms.PTransform;
Expand All @@ -36,7 +36,7 @@
* Retrieves {@link Aggregator Aggregators} at each {@link ParDo} and returns a {@link Map} of
* {@link Aggregator} to the {@link PTransform PTransforms} in which it is present.
*/
public class AggregatorPipelineExtractor {
class AggregatorPipelineExtractor {
private final Pipeline pipeline;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.runners;
package org.apache.beam.sdk;

import org.apache.beam.sdk.transforms.Aggregator;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.runners;
package org.apache.beam.sdk;

import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
Expand Down
10 changes: 10 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
Expand All @@ -47,6 +48,7 @@
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -517,6 +519,14 @@ private String uniquifyInternal(String namePrefix, String origName) {
}
}

/**
* Returns a {@link Map} from each {@link Aggregator} in the {@link Pipeline} to the {@link
* PTransform PTransforms} in which it is used.
*/
public Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> getAggregatorSteps() {
return new AggregatorPipelineExtractor(this).getAggregatorSteps();
}

/**
* Builds a name from a "/"-delimited prefix and a name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk;

import org.apache.beam.sdk.runners.AggregatorRetrievalException;
import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.transforms.Aggregator;

import org.joda.time.Duration;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.runners;
package org.apache.beam.sdk;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Max;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public void testCreateAggregatorsWithDifferentNamesSucceeds() {
DoFn<Void, Void> doFn = new NoOpDoFn();

Aggregator<Double, Double> aggregatorOne =

doFn.createAggregator(nameOne, combiner);
Aggregator<Double, Double> aggregatorTwo =
doFn.createAggregator(nameTwo, combiner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;

import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.runners.AggregatorValues;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
Expand All @@ -36,6 +36,7 @@
import org.apache.beam.sdk.transforms.display.DisplayData;

import com.google.common.collect.ImmutableMap;

import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down