From c867790abad182bcbffa9a7a88de76a608b6d039 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 21 Jul 2016 20:18:05 -0700 Subject: [PATCH 1/3] Fix MapAggregatorValues use of toStringHelper --- .../main/java/org/apache/beam/sdk/util/MapAggregatorValues.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java index fe02666dee49..3d949ec6dc80 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java @@ -43,7 +43,7 @@ public Map getValuesAtSteps() { @Override public String toString() { - return MoreObjects.toStringHelper(MapAggregatorValues.class) + return MoreObjects.toStringHelper(this) .add("stepValues", stepValues) .toString(); } From 9da4bbcdaf3c19ee5f78836b7cffaab947861a58 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 21 Jul 2016 20:24:17 -0700 Subject: [PATCH 2/3] Inline MapAggregatorValues to remove dependencies This class is trivial. Adding it to the public API of the SDK is not desirable, since it is just for runners. Adding it to runners-core would be OK but is really overkill for a glorified Map. --- .../beam/runners/direct/DirectRunner.java | 18 +++++-- .../runners/dataflow/DataflowPipelineJob.java | 17 ++++++- .../beam/sdk/util/MapAggregatorValues.java | 50 ------------------- 3 files changed, 30 insertions(+), 55 deletions(-) delete mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 743c565517bf..a9c8ecbe4c61 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; -import org.apache.beam.sdk.util.MapAggregatorValues; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; @@ -47,6 +46,7 @@ import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; +import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -321,7 +321,7 @@ public AggregatorValues getAggregatorValues(Aggregator aggregator) throws AggregatorRetrievalException { AggregatorContainer aggregators = evaluationContext.getAggregatorContainer(); Collection> steps = aggregatorSteps.get(aggregator); - Map stepValues = new HashMap<>(); + final Map stepValues = new HashMap<>(); for (AppliedPTransform transform : evaluationContext.getSteps()) { if (steps.contains(transform.getTransform())) { T aggregate = aggregators.getAggregate( @@ -331,7 +331,19 @@ public AggregatorValues getAggregatorValues(Aggregator aggregator) } } } - return new MapAggregatorValues<>(stepValues); + return new AggregatorValues() { + @Override + public Map getValuesAtSteps() { + return stepValues; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("stepValues", stepValues) + .toString(); + } + }; } /** diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 3194f7ca8a16..a6baa4f41794 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -29,7 +29,6 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.util.AttemptAndTimeBoundedExponentialBackOff; import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; -import org.apache.beam.sdk.util.MapAggregatorValues; import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.util.BackOff; @@ -41,6 +40,7 @@ import com.google.api.services.dataflow.model.JobMetrics; import com.google.api.services.dataflow.model.MetricUpdate; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; import org.joda.time.Duration; import org.slf4j.Logger; @@ -369,7 +369,20 @@ private boolean nextBackOff(Sleeper sleeper, BackOff backoff) { public AggregatorValues getAggregatorValues(Aggregator aggregator) throws AggregatorRetrievalException { try { - return new MapAggregatorValues<>(fromMetricUpdates(aggregator)); + final Map stepValues = fromMetricUpdates(aggregator); + return new AggregatorValues() { + @Override + public Map getValuesAtSteps() { + return stepValues; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("stepValues", stepValues) + .toString(); + } + }; } catch (IOException e) { throw new AggregatorRetrievalException( "IOException when retrieving Aggregator values for Aggregator " + aggregator, e); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java deleted file mode 100644 index 3d949ec6dc80..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MapAggregatorValues.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.runners.AggregatorValues; -import org.apache.beam.sdk.transforms.Aggregator; - -import com.google.common.base.MoreObjects; - -import java.util.Map; - -/** - * An {@link AggregatorValues} implementation that is backed by an in-memory map. - * - * @param the output type of the {@link Aggregator} - */ -public class MapAggregatorValues extends AggregatorValues { - private final Map stepValues; - - public MapAggregatorValues(Map stepValues) { - this.stepValues = stepValues; - } - - @Override - public Map getValuesAtSteps() { - return stepValues; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("stepValues", stepValues) - .toString(); - } -} From adec254d5fdb409e786a1fc2bcee38f8a7a04408 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Fri, 1 Jul 2016 14:56:20 -0700 Subject: [PATCH 3/3] Move aggregator support classes out of runners namespace, make private --- .../org/apache/beam/runners/direct/DirectRunner.java | 7 +++---- .../apache/beam/runners/flink/FlinkRunnerResult.java | 4 ++-- .../beam/runners/dataflow/DataflowPipelineJob.java | 4 ++-- .../apache/beam/runners/dataflow/DataflowRunner.java | 4 +--- .../beam/runners/dataflow/DataflowPipelineJobTest.java | 4 ++-- .../runners/spark/translation/EvaluationContext.java | 4 ++-- .../runners/spark/translation/SparkRuntimeContext.java | 2 +- .../spark/translation/MultiOutputWordCountTest.java | 2 +- .../sdk/{runners => }/AggregatorPipelineExtractor.java | 6 +++--- .../{runners => }/AggregatorRetrievalException.java | 2 +- .../beam/sdk/{runners => }/AggregatorValues.java | 2 +- .../src/main/java/org/apache/beam/sdk/Pipeline.java | 10 ++++++++++ .../main/java/org/apache/beam/sdk/PipelineResult.java | 2 -- .../{runners => }/AggregatorPipelineExtractorTest.java | 4 ++-- .../java/org/apache/beam/sdk/transforms/DoFnTest.java | 1 + .../org/apache/beam/sdk/transforms/OldDoFnTest.java | 3 ++- 16 files changed, 34 insertions(+), 27 deletions(-) rename sdks/java/core/src/main/java/org/apache/beam/sdk/{runners => }/AggregatorPipelineExtractor.java (96%) rename sdks/java/core/src/main/java/org/apache/beam/sdk/{runners => }/AggregatorRetrievalException.java (97%) rename sdks/java/core/src/main/java/org/apache/beam/sdk/{runners => }/AggregatorValues.java (98%) rename sdks/java/core/src/test/java/org/apache/beam/sdk/{runners => }/AggregatorPipelineExtractorTest.java (99%) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index a9c8ecbe4c61..f2b781ec5e35 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -20,15 +20,14 @@ import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory; +import org.apache.beam.sdk.AggregatorRetrievalException; +import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.AggregatorPipelineExtractor; -import org.apache.beam.sdk.runners.AggregatorRetrievalException; -import org.apache.beam.sdk.runners.AggregatorValues; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -244,7 +243,7 @@ public DirectPipelineResult run(Pipeline pipeline) { executor.start(consumerTrackingVisitor.getRootTransforms()); Map, Collection>> aggregatorSteps = - new AggregatorPipelineExtractor(pipeline).getAggregatorSteps(); + pipeline.getAggregatorSteps(); DirectPipelineResult result = new DirectPipelineResult(executor, context, aggregatorSteps); if (options.isBlockOnRun()) { diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java index cae0b2aab8c5..923d54c56631 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.flink; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.runners.AggregatorRetrievalException; -import org.apache.beam.sdk.runners.AggregatorValues; +import org.apache.beam.sdk.AggregatorRetrievalException; +import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.transforms.Aggregator; import org.joda.time.Duration; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index a6baa4f41794..e043e23ad162 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -23,9 +23,9 @@ import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.MonitoringUtil; +import org.apache.beam.sdk.AggregatorRetrievalException; +import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.runners.AggregatorRetrievalException; -import org.apache.beam.sdk.runners.AggregatorValues; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.util.AttemptAndTimeBoundedExponentialBackOff; import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index fadd9c78d888..3b68e92bc5cb 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -71,7 +71,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsValidator; import org.apache.beam.sdk.options.StreamingOptions; -import org.apache.beam.sdk.runners.AggregatorPipelineExtractor; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.Aggregator; @@ -596,9 +595,8 @@ public DataflowPipelineJob run(Pipeline pipeline) { // Obtain all of the extractors from the PTransforms used in the pipeline so the // DataflowPipelineJob has access to them. - AggregatorPipelineExtractor aggregatorExtractor = new AggregatorPipelineExtractor(pipeline); Map, Collection>> aggregatorSteps = - aggregatorExtractor.getAggregatorSteps(); + pipeline.getAggregatorSteps(); DataflowAggregatorTransforms aggregatorTransforms = new DataflowAggregatorTransforms(aggregatorSteps, jobSpecification.getStepNames()); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 343d53858bde..e6277d9c98c3 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -35,10 +35,10 @@ import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms; import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.MonitoringUtil; +import org.apache.beam.sdk.AggregatorRetrievalException; +import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.AggregatorRetrievalException; -import org.apache.beam.sdk.runners.AggregatorValues; import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index 169c2afd6236..4ccac0e1a0b2 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -22,10 +22,10 @@ import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.coders.CoderHelpers; +import org.apache.beam.sdk.AggregatorRetrievalException; +import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.runners.AggregatorRetrievalException; -import org.apache.beam.sdk.runners.AggregatorValues; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java index 46f5b33f5eb3..c2edd023c7e1 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java @@ -20,12 +20,12 @@ import org.apache.beam.runners.spark.aggregators.AggAccumParam; import org.apache.beam.runners.spark.aggregators.NamedAggregators; +import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.AggregatorValues; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Max; diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java index 291f7b266e55..0d0c0b41efd8 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java @@ -20,11 +20,11 @@ import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkRunner; +import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.AggregatorValues; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.ApproximateUnique; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java similarity index 96% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractor.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java index 146ddfa42313..ac215c9ed1f0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java @@ -15,10 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.runners; +package org.apache.beam.sdk; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AggregatorRetriever; import org.apache.beam.sdk.transforms.PTransform; @@ -36,7 +36,7 @@ * Retrieves {@link Aggregator Aggregators} at each {@link ParDo} and returns a {@link Map} of * {@link Aggregator} to the {@link PTransform PTransforms} in which it is present. */ -public class AggregatorPipelineExtractor { +class AggregatorPipelineExtractor { private final Pipeline pipeline; /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorRetrievalException.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java similarity index 97% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorRetrievalException.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java index a0973c3b52ec..30408152c3a7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorRetrievalException.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorRetrievalException.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.runners; +package org.apache.beam.sdk; import org.apache.beam.sdk.transforms.Aggregator; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java similarity index 98% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java index 6f6836e9a477..efaad85b67b5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorValues.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.runners; +package org.apache.beam.sdk; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index e4f3e4a83f66..1bbc56f1fad2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.runners.TransformTreeNode; +import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.PTransform; @@ -47,6 +48,7 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -517,6 +519,14 @@ private String uniquifyInternal(String namePrefix, String origName) { } } + /** + * Returns a {@link Map} from each {@link Aggregator} in the {@link Pipeline} to the {@link + * PTransform PTransforms} in which it is used. + */ + public Map, Collection>> getAggregatorSteps() { + return new AggregatorPipelineExtractor(this).getAggregatorSteps(); + } + /** * Builds a name from a "/"-delimited prefix and a name. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java index 993962cabf9a..edfc9248f527 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk; -import org.apache.beam.sdk.runners.AggregatorRetrievalException; -import org.apache.beam.sdk.runners.AggregatorValues; import org.apache.beam.sdk.transforms.Aggregator; import org.joda.time.Duration; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java similarity index 99% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java rename to sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java index 13476e29878a..930fbe781b05 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.runners; +package org.apache.beam.sdk; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -23,8 +23,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; +import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Max; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java index 710e4cecf578..3fb3193f4d5a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java @@ -128,6 +128,7 @@ public void testCreateAggregatorsWithDifferentNamesSucceeds() { DoFn doFn = new NoOpDoFn(); Aggregator aggregatorOne = + doFn.createAggregator(nameOne, combiner); Aggregator aggregatorTwo = doFn.createAggregator(nameTwo, combiner); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java index 9d144b3b729a..5946d9a0dff9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java @@ -24,10 +24,10 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThat; +import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.runners.AggregatorValues; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Combine.CombineFn; @@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import com.google.common.collect.ImmutableMap; + import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category;