diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/package-info.java new file mode 100644 index 0000000000..b5bcf18eda --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/protobuf/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** + * Defines a {@link com.google.cloud.dataflow.sdk.coders.Coder} + * for Protocol Buffers messages, {@code ProtoCoder}. + * + * @see com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder + */ +package com.google.cloud.dataflow.sdk.coders.protobuf; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java index 7ecccf14a6..7d59b09c8d 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java @@ -72,7 +72,7 @@ *

<h3>Reading from Cloud Bigtable</h3>
 *
 * <p>The Bigtable source returns a set of rows from a single table, returning a
- * {@code PCollection<Row>}.
+ * {@code PCollection}.
 *
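 * <p>For example, a minimal read sketch (the project, cluster, zone, and table
 * identifiers below are placeholders):
 * <pre>{@code
 * BigtableOptions.Builder optionsBuilder =
 *     new BigtableOptions.Builder()
 *         .setProjectId("project")
 *         .setClusterId("cluster")
 *         .setZoneId("zone");
 *
 * PCollection<Row> btRows = pipeline.apply(
 *     BigtableIO.read()
 *         .withBigtableOptions(optionsBuilder)
 *         .withTableId("table"));
 * }</pre>
 *
 * <p>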

To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions} * or builder configured with the project and other information necessary to identify the diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java new file mode 100644 index 0000000000..112a954d71 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/package-info.java @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** + * Defines transforms for reading and writing from Google Cloud Bigtable. + * + * @see com.google.cloud.dataflow.sdk.io.bigtable.BigtableIO + */ +package com.google.cloud.dataflow.sdk.io.bigtable; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java index d0cc4e53d5..0feae957f8 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java @@ -952,6 +952,9 @@ private void groupByKeyHelper( context.addInput( PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(windowingStrategy))); + context.addInput( + PropertyNames.IS_MERGING_WINDOW_FN, + !windowingStrategy.getWindowFn().isNonMerging()); } }); diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java index 1c0279897a..2a164c3518 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java @@ -18,7 +18,6 @@ import com.google.cloud.dataflow.sdk.io.Read.Bounded; import com.google.cloud.dataflow.sdk.io.Source.Reader; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.PTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java index 745f8f2718..307bc5cdb5 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EvaluatorKey.java @@ -15,7 +15,6 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; -import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import java.util.Objects; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java index 14428888e2..bde1df45e9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactory.java @@ -16,7 +16,6 @@ package com.google.cloud.dataflow.sdk.runners.inprocess; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.Flatten; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java index 0347281749..ec63be84c9 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java @@ -22,7 +22,6 @@ import com.google.cloud.dataflow.sdk.coders.IterableCoder; import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult.Builder; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java index e280e22d2b..7cf53aafe6 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java @@ -1209,8 +1209,11 @@ public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) { * and deletedTimers. */ public TimerUpdate build() { - return new TimerUpdate(key, ImmutableSet.copyOf(completedTimers), - ImmutableSet.copyOf(setTimers), ImmutableSet.copyOf(deletedTimers)); + return new TimerUpdate( + key, + ImmutableSet.copyOf(completedTimers), + ImmutableSet.copyOf(setTimers), + ImmutableSet.copyOf(deletedTimers)); } } @@ -1245,6 +1248,13 @@ Iterable getDeletedTimers() { return deletedTimers; } + /** + * Returns a {@link TimerUpdate} that is like this one, but with the specified completed timers. 
+ */ + public TimerUpdate withCompletedTimers(Iterable completedTimers) { + return new TimerUpdate(this.key, completedTimers, setTimers, deletedTimers); + } + @Override public int hashCode() { return Objects.hash(key, completedTimers, setTimers, deletedTimers); diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java index cc20161097..112ba17d14 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundle.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015 Google Inc. + * Copyright (C) 2016 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -22,7 +22,6 @@ import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.base.MoreObjects; -import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.collect.ImmutableList; import org.joda.time.Instant; @@ -64,6 +63,11 @@ private InProcessBundle(PCollection pcollection, boolean keyed, Object key) { this.elements = ImmutableList.builder(); } + @Override + public PCollection getPCollection() { + return pcollection; + } + @Override public InProcessBundle add(WindowedValue element) { checkState(!committed, "Can't add element %s to committed bundle %s", element, this); @@ -105,12 +109,12 @@ public Instant getSynchronizedProcessingOutputWatermark() { @Override public String toString() { - ToStringHelper toStringHelper = - MoreObjects.toStringHelper(this).add("pcollection", pcollection); - if (keyed) { - toStringHelper = toStringHelper.add("key", key); - } - return toStringHelper.add("elements", elements).toString(); + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("pcollection", pcollection) + .add("key", key) + .add("elements", committedElements) + .toString(); } }; } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java new file mode 100644 index 0000000000..757e9e11d9 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -0,0 +1,364 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly; +import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; +import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger; +import com.google.cloud.dataflow.sdk.util.ExecutionContext; +import com.google.cloud.dataflow.sdk.util.SideInputReader; +import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.util.common.CounterSet; +import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.PValue; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.annotation.Nullable; + +/** + * The evaluation context for a specific pipeline being executed by the + * {@link InProcessPipelineRunner}. Contains state shared within the execution across all + * transforms. + * + *

{@link InProcessEvaluationContext} contains shared state for an execution of the + * {@link InProcessPipelineRunner} that can be used while evaluating a {@link PTransform}. This + * consists of views into underlying state and watermark implementations, access to read and write + * {@link PCollectionView PCollectionViews}, and constructing {@link CounterSet CounterSets} and + * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when + * state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and + * known to be empty). + * + *

{@link InProcessEvaluationContext} also handles results by committing finalizing bundles based + * on the current global state and updating the global state appropriately. This includes updating + * the per-{@link StepAndKey} state, updating global watermarks, and executing any callbacks that + * can be executed. + */ +class InProcessEvaluationContext { + /** The step name for each {@link AppliedPTransform} in the {@link Pipeline}. */ + private final Map, String> stepNames; + + /** The options that were used to create this {@link Pipeline}. */ + private final InProcessPipelineOptions options; + + /** The current processing time and event time watermarks and timers. */ + private final InMemoryWatermarkManager watermarkManager; + + /** Executes callbacks based on the progression of the watermark. */ + private final WatermarkCallbackExecutor callbackExecutor; + + /** The stateInternals of the world, by applied PTransform and key. */ + private final ConcurrentMap> + applicationStateInternals; + + private final InProcessSideInputContainer sideInputContainer; + + private final CounterSet mergedCounters; + + public static InProcessEvaluationContext create( + InProcessPipelineOptions options, + Collection> rootTransforms, + Map>> valueToConsumers, + Map, String> stepNames, + Collection> views) { + return new InProcessEvaluationContext( + options, rootTransforms, valueToConsumers, stepNames, views); + } + + private InProcessEvaluationContext( + InProcessPipelineOptions options, + Collection> rootTransforms, + Map>> valueToConsumers, + Map, String> stepNames, + Collection> views) { + this.options = checkNotNull(options); + checkNotNull(rootTransforms); + checkNotNull(valueToConsumers); + checkNotNull(stepNames); + checkNotNull(views); + this.stepNames = stepNames; + + this.watermarkManager = + InMemoryWatermarkManager.create( + NanosOffsetClock.create(), rootTransforms, valueToConsumers); + this.sideInputContainer = InProcessSideInputContainer.create(this, views); + + this.applicationStateInternals = new ConcurrentHashMap<>(); + this.mergedCounters = new CounterSet(); + + this.callbackExecutor = WatermarkCallbackExecutor.create(); + } + + /** + * Handle the provided {@link InProcessTransformResult}, produced after evaluating the provided + * {@link CommittedBundle} (potentially null, if the result of a root {@link PTransform}). + * + *

The result is the output of running the transform contained in the + * {@link InProcessTransformResult} on the contents of the provided bundle. + * + * @param completedBundle the bundle that was processed to produce the result. Potentially + * {@code null} if the transform that produced the result is a root + * transform + * @param completedTimers the timers that were delivered to produce the {@code completedBundle}, + * or an empty iterable if no timers were delivered + * @param result the result of evaluating the input bundle + * @return the committed bundles contained within the handled {@code result} + */ + public synchronized Iterable> handleResult( + @Nullable CommittedBundle completedBundle, + Iterable completedTimers, + InProcessTransformResult result) { + Iterable> committedBundles = + commitBundles(result.getOutputBundles()); + // Update watermarks and timers + watermarkManager.updateWatermarks( + completedBundle, + result.getTransform(), + result.getTimerUpdate().withCompletedTimers(completedTimers), + committedBundles, + result.getWatermarkHold()); + fireAllAvailableCallbacks(); + // Update counters + if (result.getCounters() != null) { + mergedCounters.merge(result.getCounters()); + } + // Update state internals + CopyOnAccessInMemoryStateInternals theirState = result.getState(); + if (theirState != null) { + CopyOnAccessInMemoryStateInternals committedState = theirState.commit(); + StepAndKey stepAndKey = + StepAndKey.of( + result.getTransform(), completedBundle == null ? null : completedBundle.getKey()); + if (!committedState.isEmpty()) { + applicationStateInternals.put(stepAndKey, committedState); + } else { + applicationStateInternals.remove(stepAndKey); + } + } + return committedBundles; + } + + private Iterable> commitBundles( + Iterable> bundles) { + ImmutableList.Builder> completed = ImmutableList.builder(); + for (UncommittedBundle inProgress : bundles) { + AppliedPTransform producing = + inProgress.getPCollection().getProducingTransformInternal(); + TransformWatermarks watermarks = watermarkManager.getWatermarks(producing); + CommittedBundle committed = + inProgress.commit(watermarks.getSynchronizedProcessingOutputTime()); + // Empty bundles don't impact watermarks and shouldn't trigger downstream execution, so + // filter them out + if (!Iterables.isEmpty(committed.getElements())) { + completed.add(committed); + } + } + return completed.build(); + } + + private void fireAllAvailableCallbacks() { + for (AppliedPTransform transform : stepNames.keySet()) { + fireAvailableCallbacks(transform); + } + } + + private void fireAvailableCallbacks(AppliedPTransform producingTransform) { + TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform); + callbackExecutor.fireForWatermark(producingTransform, watermarks.getOutputWatermark()); + } + + /** + * Create a {@link UncommittedBundle} for use by a source. + */ + public UncommittedBundle createRootBundle(PCollection output) { + return InProcessBundle.unkeyed(output); + } + + /** + * Create a {@link UncommittedBundle} whose elements belong to the specified {@link + * PCollection}. + */ + public UncommittedBundle createBundle(CommittedBundle input, PCollection output) { + return input.isKeyed() + ? InProcessBundle.keyed(output, input.getKey()) + : InProcessBundle.unkeyed(output); + } + + /** + * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by + * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. 
+ */ + public UncommittedBundle createKeyedBundle( + CommittedBundle input, Object key, PCollection output) { + return InProcessBundle.keyed(output, key); + } + + /** + * Create a {@link PCollectionViewWriter}, whose elements will be used in the provided + * {@link PCollectionView}. + */ + public PCollectionViewWriter createPCollectionViewWriter( + PCollection> input, final PCollectionView output) { + return new PCollectionViewWriter() { + @Override + public void add(Iterable> values) { + sideInputContainer.write(output, values); + } + }; + } + + /** + * Schedule a callback to be executed after output would be produced for the given window + * if there had been input. + * + *

Output would be produced when the watermark for a {@link PValue} passes the point at + * which the trigger for the specified window (with the specified windowing strategy) must have + * fired from the perspective of that {@link PValue}, as specified by the value of + * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the + * {@link WindowingStrategy}. When the callback has fired, either values will have been produced + * for a key in that window, the window is empty, or all elements in the window are late. The + * callback will be executed regardless of whether values have been produced. + */ + public void scheduleAfterOutputWouldBeProduced( + PValue value, + BoundedWindow window, + WindowingStrategy windowingStrategy, + Runnable runnable) { + AppliedPTransform producing = getProducing(value); + callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable); + + fireAvailableCallbacks(lookupProducing(value)); + } + + private AppliedPTransform getProducing(PValue value) { + if (value.getProducingTransformInternal() != null) { + return value.getProducingTransformInternal(); + } + return lookupProducing(value); + } + + private AppliedPTransform lookupProducing(PValue value) { + for (AppliedPTransform transform : stepNames.keySet()) { + if (transform.getOutput().equals(value) || transform.getOutput().expand().contains(value)) { + return transform; + } + } + return null; + } + + /** + * Get the options used by this {@link Pipeline}. + */ + public InProcessPipelineOptions getPipelineOptions() { + return options; + } + + /** + * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key. + */ + public InProcessExecutionContext getExecutionContext( + AppliedPTransform application, Object key) { + StepAndKey stepAndKey = StepAndKey.of(application, key); + return new InProcessExecutionContext( + options.getClock(), + key, + (CopyOnAccessInMemoryStateInternals) applicationStateInternals.get(stepAndKey), + watermarkManager.getWatermarks(application)); + } + + /** + * Get all of the steps used in this {@link Pipeline}. + */ + public Collection> getSteps() { + return stepNames.keySet(); + } + + /** + * Get the Step Name for the provided application. + */ + public String getStepName(AppliedPTransform application) { + return stepNames.get(application); + } + + /** + * Returns a {@link SideInputReader} capable of reading the provided + * {@link PCollectionView PCollectionViews}. + * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to + * read + * @return a {@link SideInputReader} that can read all of the provided + * {@link PCollectionView PCollectionViews} + */ + public SideInputReader createSideInputReader(final List> sideInputs) { + return sideInputContainer.createReaderForViews(sideInputs); + } + + /** + * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent + * of all other {@link CounterSet CounterSets} created by this call. + * + * The {@link InProcessEvaluationContext} is responsible for unifying the counters present in + * all created {@link CounterSet CounterSets} when the transforms that call this method + * complete. + */ + public CounterSet createCounterSet() { + return new CounterSet(); + } + + /** + * Returns all of the counters that have been merged into this context via calls to + * {@link CounterSet#merge(CounterSet)}. 
+ */ + public CounterSet getCounters() { + return mergedCounters; + } + + /** + * Extracts all timers that have been fired and have not already been extracted. + * + *

This is a destructive operation. Timers will only appear in the result of this method once + * for each time they are set. + */ + public Map, Map> extractFiredTimers() { + return watermarkManager.extractFiredTimers(); + } + + /** + * Returns true if all steps are done. + */ + public boolean isDone() { + return watermarkManager.isDone(); + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java index d659d962f0..60c8543a2f 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java @@ -15,10 +15,15 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; +import com.google.cloud.dataflow.sdk.options.Default; import com.google.cloud.dataflow.sdk.options.PipelineOptions; /** * Options that can be used to configure the {@link InProcessPipelineRunner}. */ -public interface InProcessPipelineOptions extends PipelineOptions {} +public interface InProcessPipelineOptions extends PipelineOptions { + @Default.InstanceFactory(NanosOffsetClock.Factory.class) + Clock getClock(); + void setClock(Clock clock); +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java index 124de46b94..7a268ee5fa 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -17,31 +17,22 @@ import static com.google.common.base.Preconditions.checkArgument; -import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.annotations.Experimental; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKey; import com.google.cloud.dataflow.sdk.runners.inprocess.ViewEvaluatorFactory.InProcessCreatePCollectionView; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger; -import com.google.cloud.dataflow.sdk.util.ExecutionContext; -import com.google.cloud.dataflow.sdk.util.SideInputReader; import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.util.WindowingStrategy; -import com.google.cloud.dataflow.sdk.util.common.CounterSet; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollectionView; -import com.google.cloud.dataflow.sdk.values.PValue; import com.google.common.collect.ImmutableMap; import org.joda.time.Instant; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -82,6 +73,11 @@ public class InProcessPipelineRunner { * @param the type of elements that can be added to this bundle */ 
public static interface UncommittedBundle { + /** + * Returns the PCollection that the elements of this bundle belong to. + */ + PCollection getPCollection(); + /** * Outputs an element to this bundle. * @@ -110,7 +106,7 @@ public static interface UncommittedBundle { public static interface CommittedBundle { /** - * @return the PCollection that the elements of this bundle belong to + * Returns the PCollection that the elements of this bundle belong to. */ PCollection getPCollection(); @@ -154,84 +150,22 @@ public static interface PCollectionViewWriter { void add(Iterable> values); } - /** - * The evaluation context for the {@link InProcessPipelineRunner}. Contains state shared within - * the current evaluation. - */ - public static interface InProcessEvaluationContext { - /** - * Create a {@link UncommittedBundle} for use by a source. - */ - UncommittedBundle createRootBundle(PCollection output); - - /** - * Create a {@link UncommittedBundle} whose elements belong to the specified {@link - * PCollection}. - */ - UncommittedBundle createBundle(CommittedBundle input, PCollection output); - - /** - * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by - * {@link GroupByKeyOnly} {@link PTransform PTransforms}. - */ - UncommittedBundle createKeyedBundle( - CommittedBundle input, Object key, PCollection output); - - /** - * Create a bundle whose elements will be used in a PCollectionView. - */ - PCollectionViewWriter createPCollectionViewWriter( - PCollection> input, PCollectionView output); - - /** - * Get the options used by this {@link Pipeline}. - */ - InProcessPipelineOptions getPipelineOptions(); - - /** - * Get an {@link ExecutionContext} for the provided application. - */ - InProcessExecutionContext getExecutionContext( - AppliedPTransform application, @Nullable Object key); - - /** - * Get the Step Name for the provided application. - */ - String getStepName(AppliedPTransform application); - - /** - * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to - * read - * @return a {@link SideInputReader} that can read all of the provided - * {@link PCollectionView PCollectionViews} - */ - SideInputReader createSideInputReader(List> sideInputs); + //////////////////////////////////////////////////////////////////////////////////////////////// + private final InProcessPipelineOptions options; - /** - * Schedules a callback after the watermark for a {@link PValue} after the trigger for the - * specified window (with the specified windowing strategy) must have fired from the perspective - * of that {@link PValue}, as specified by the value of - * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the - * {@link WindowingStrategy}. - */ - void callAfterOutputMustHaveBeenProduced(PValue value, BoundedWindow window, - WindowingStrategy windowingStrategy, Runnable runnable); + public static InProcessPipelineRunner fromOptions(PipelineOptions options) { + return new InProcessPipelineRunner(options.as(InProcessPipelineOptions.class)); + } - /** - * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent - * of all other {@link CounterSet CounterSets} created by this call. - * - * The {@link InProcessEvaluationContext} is responsible for unifying the counters present in - * all created {@link CounterSet CounterSets} when the transforms that call this method - * complete. 
- */ - CounterSet createCounterSet(); + private InProcessPipelineRunner(InProcessPipelineOptions options) { + this.options = options; + } - /** - * Returns all of the counters that have been merged into this context via calls to - * {@link CounterSet#merge(CounterSet)}. - */ - CounterSet getCounters(); + /** + * Returns the {@link PipelineOptions} used to create this {@link InProcessPipelineRunner}. + */ + public InProcessPipelineOptions getPipelineOptions() { + return options; } /** diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java index bf9a2e1c53..37c9fcfa65 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainer.java @@ -17,7 +17,6 @@ import static com.google.common.base.Preconditions.checkArgument; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; import com.google.cloud.dataflow.sdk.util.PCollectionViewWindow; @@ -26,6 +25,7 @@ import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.values.PCollectionView; import com.google.common.base.MoreObjects; +import com.google.common.base.Throwables; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -89,7 +89,7 @@ private InProcessSideInputContainer(InProcessEvaluationContext context, * the provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable without * casting, but will change as this {@link InProcessSideInputContainer} is modified. */ - public SideInputReader withViews(Collection> newContainedViews) { + public SideInputReader createReaderForViews(Collection> newContainedViews) { if (!containedViews.containsAll(newContainedViews)) { Set> currentlyContained = ImmutableSet.copyOf(containedViews); Set> newRequested = ImmutableSet.copyOf(newContainedViews); @@ -108,8 +108,20 @@ public SideInputReader withViews(Collection> newContainedView * *

The provided iterable is expected to contain only a single window and pane. */ - public void write(PCollectionView view, Iterable> values) - throws ExecutionException { + public void write(PCollectionView view, Iterable> values) { + Map>> valuesPerWindow = + indexValuesByWindow(values); + for (Map.Entry>> windowValues : + valuesPerWindow.entrySet()) { + updatePCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue()); + } + } + + /** + * Index the provided values by all {@link BoundedWindow windows} in which they appear. + */ + private Map>> indexValuesByWindow( + Iterable> values) { Map>> valuesPerWindow = new HashMap<>(); for (WindowedValue value : values) { for (BoundedWindow window : value.getWindows()) { @@ -121,29 +133,40 @@ public void write(PCollectionView view, Iterable> windowValues.add(value); } } - for (Map.Entry>> windowValues : - valuesPerWindow.entrySet()) { - PCollectionViewWindow windowedView = PCollectionViewWindow.of(view, windowValues.getKey()); - SettableFuture>> future = viewByWindows.get(windowedView); + return valuesPerWindow; + } + + /** + * Set the value of the {@link PCollectionView} in the {@link BoundedWindow} to be based on the + * specified values, if the values are part of a later pane than currently exist within the + * {@link PCollectionViewWindow}. + */ + private void updatePCollectionViewWindowValues( + PCollectionView view, BoundedWindow window, Collection> windowValues) { + PCollectionViewWindow windowedView = PCollectionViewWindow.of(view, window); + SettableFuture>> future = null; + try { + future = viewByWindows.get(windowedView); if (future.isDone()) { - try { - Iterator> existingValues = future.get().iterator(); - PaneInfo newPane = windowValues.getValue().iterator().next().getPane(); - // The current value may have no elements, if no elements were produced for the window, - // but we are recieving late data. - if (!existingValues.hasNext() - || newPane.getIndex() > existingValues.next().getPane().getIndex()) { - viewByWindows.invalidate(windowedView); - viewByWindows.get(windowedView).set(windowValues.getValue()); - } - } catch (InterruptedException e) { - // TODO: Handle meaningfully. This should never really happen when the result remains - // useful, but the result could be available and the thread can still be interrupted. - Thread.currentThread().interrupt(); + Iterator> existingValues = future.get().iterator(); + PaneInfo newPane = windowValues.iterator().next().getPane(); + // The current value may have no elements, if no elements were produced for the window, + // but we are recieving late data. 
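+          // (Pane indices within a window are assigned in increasing order, so a
+          // strictly greater index on the incoming pane means strictly newer contents
+          // for this view.)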
+ if (!existingValues.hasNext() + || newPane.getIndex() > existingValues.next().getPane().getIndex()) { + viewByWindows.invalidate(windowedView); + viewByWindows.get(windowedView).set(windowValues); } } else { - future.set(windowValues.getValue()); + future.set(windowValues); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (future != null && !future.isDone()) { + future.set(Collections.>emptyList()); } + } catch (ExecutionException e) { + Throwables.propagate(e.getCause()); } } @@ -165,7 +188,7 @@ public T get(final PCollectionView view, final BoundedWindow window) { viewByWindows.get(windowedView); WindowingStrategy windowingStrategy = view.getWindowingStrategyInternal(); - evaluationContext.callAfterOutputMustHaveBeenProduced( + evaluationContext.scheduleAfterOutputWouldBeProduced( view, window, windowingStrategy, new Runnable() { @Override public void run() { diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java index e3ae1a028c..24142c2151 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java @@ -17,7 +17,6 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java index cd79c219bd..af5914bab0 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java @@ -17,7 +17,6 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java new file mode 100644 index 0000000000..15955724eb --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/StepAndKey.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.common.base.MoreObjects; + +import java.util.Objects; + +/** + * A (Step, Key) pair. This is useful as a map key or cache key for things that are available + * per-step in a keyed manner (e.g. State). + */ +final class StepAndKey { + private final AppliedPTransform step; + private final Object key; + + /** + * Create a new {@link StepAndKey} with the provided step and key. + */ + public static StepAndKey of(AppliedPTransform step, Object key) { + return new StepAndKey(step, key); + } + + private StepAndKey(AppliedPTransform step, Object key) { + this.step = step; + this.key = key; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(StepAndKey.class) + .add("step", step.getFullName()) + .add("key", key) + .toString(); + } + + @Override + public int hashCode() { + return Objects.hash(step, key); + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } else if (!(other instanceof StepAndKey)) { + return false; + } else { + StepAndKey that = (StepAndKey) other; + return Objects.equals(this.step, that.step) + && Objects.equals(this.key, that.key); + } + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java index 3b672e0def..860ddfe48f 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorFactory.java @@ -16,7 +16,6 @@ package com.google.cloud.dataflow.sdk.runners.inprocess; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.PTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java new file mode 100644 index 0000000000..0c8cb7e80a --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.Flatten.FlattenPCollectionList; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +import javax.annotation.Nullable; + +/** + * A {@link TransformEvaluatorFactory} that delegates to primitive {@link TransformEvaluatorFactory} + * implementations based on the type of {@link PTransform} of the application. + */ +class TransformEvaluatorRegistry implements TransformEvaluatorFactory { + public static TransformEvaluatorRegistry defaultRegistry() { + @SuppressWarnings("rawtypes") + ImmutableMap, TransformEvaluatorFactory> primitives = + ImmutableMap., TransformEvaluatorFactory>builder() + .put(Read.Bounded.class, new BoundedReadEvaluatorFactory()) + .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory()) + .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory()) + .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory()) + .put( + GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class, + new GroupByKeyEvaluatorFactory()) + .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory()) + .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory()) + .build(); + return new TransformEvaluatorRegistry(primitives); + } + + // the TransformEvaluatorFactories can construct instances of all generic types of transform, + // so all instances of a primitive can be handled with the same evaluator factory. 
+ @SuppressWarnings("rawtypes") + private final Map, TransformEvaluatorFactory> factories; + + private TransformEvaluatorRegistry( + @SuppressWarnings("rawtypes") + Map, TransformEvaluatorFactory> factories) { + this.factories = factories; + } + + @Override + public TransformEvaluator forApplication( + AppliedPTransform application, + @Nullable CommittedBundle inputBundle, + InProcessEvaluationContext evaluationContext) + throws Exception { + TransformEvaluatorFactory factory = factories.get(application.getTransform().getClass()); + return factory.forApplication(application, inputBundle, evaluationContext); + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java index 4beac337d6..97f0e25d38 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java @@ -21,7 +21,6 @@ import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.PTransform; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java index f47cd1de98..314d81f6aa 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java @@ -17,7 +17,6 @@ import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.coders.VoidCoder; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java new file mode 100644 index 0000000000..27d59b9a64 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutor.java @@ -0,0 +1,143 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Ordering; + +import org.joda.time.Instant; + +import java.util.PriorityQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Executes callbacks that occur based on the progression of the watermark per-step. + * + *

Callbacks are registered by calls to + * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)}, + * and are executed after a call to {@link #fireForWatermark(AppliedPTransform, Instant)} with the + * same {@link AppliedPTransform} and a watermark sufficient to ensure that the trigger for the + * windowing strategy would have been produced. + * + *
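+ * <p>A sketch of the intended interaction (names abbreviated; illustrative only):
+ * <pre>{@code
+ * executor.callOnGuaranteedFiring(step, window, windowingStrategy, callback);
+ * // ... later, once the step's output watermark passes the window's
+ * // guaranteed-firing point ...
+ * executor.fireForWatermark(step, currentOutputWatermark);  // callback runs
+ * }</pre>
+ *
+ * <p>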

NOTE: {@link WatermarkCallbackExecutor} does not track the latest observed watermark for any + * {@link AppliedPTransform} - any call to + * {@link #callOnGuaranteedFiring(AppliedPTransform, BoundedWindow, WindowingStrategy, Runnable)} + * that could have potentially already fired should be followed by a call to + * {@link #fireForWatermark(AppliedPTransform, Instant)} for the same transform with the current + * value of the watermark. + */ +class WatermarkCallbackExecutor { + /** + * Create a new {@link WatermarkCallbackExecutor}. + */ + public static WatermarkCallbackExecutor create() { + return new WatermarkCallbackExecutor(); + } + + private final ConcurrentMap, PriorityQueue> + callbacks; + private final ExecutorService executor; + + private WatermarkCallbackExecutor() { + this.callbacks = new ConcurrentHashMap<>(); + this.executor = Executors.newSingleThreadExecutor(); + } + + /** + * Execute the provided {@link Runnable} after the next call to + * {@link #fireForWatermark(AppliedPTransform, Instant)} where the window is guaranteed to have + * produced output. + */ + public void callOnGuaranteedFiring( + AppliedPTransform step, + BoundedWindow window, + WindowingStrategy windowingStrategy, + Runnable runnable) { + WatermarkCallback callback = + WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable); + + PriorityQueue callbackQueue = callbacks.get(step); + if (callbackQueue == null) { + callbackQueue = new PriorityQueue<>(11, new CallbackOrdering()); + if (callbacks.putIfAbsent(step, callbackQueue) != null) { + callbackQueue = callbacks.get(step); + } + } + + synchronized (callbackQueue) { + callbackQueue.offer(callback); + } + } + + /** + * Schedule all pending callbacks that must have produced output by the time of the provided + * watermark. 
+ */ + public void fireForWatermark(AppliedPTransform step, Instant watermark) { + PriorityQueue callbackQueue = callbacks.get(step); + if (callbackQueue == null) { + return; + } + synchronized (callbackQueue) { + while (!callbackQueue.isEmpty() && callbackQueue.peek().shouldFire(watermark)) { + executor.submit(callbackQueue.poll().getCallback()); + } + } + } + + private static class WatermarkCallback { + public static WatermarkCallback onGuaranteedFiring( + BoundedWindow window, WindowingStrategy strategy, Runnable callback) { + @SuppressWarnings("unchecked") + Instant firingAfter = + strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window); + return new WatermarkCallback(firingAfter, callback); + } + + private final Instant fireAfter; + private final Runnable callback; + + private WatermarkCallback(Instant fireAfter, Runnable callback) { + this.fireAfter = fireAfter; + this.callback = callback; + } + + public boolean shouldFire(Instant currentWatermark) { + return currentWatermark.isAfter(fireAfter) + || currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE); + } + + public Runnable getCallback() { + return callback; + } + } + + private static class CallbackOrdering extends Ordering { + @Override + public int compare(WatermarkCallback left, WatermarkCallback right) { + return ComparisonChain.start() + .compare(left.fireAfter, right.fireAfter) + .compare(left.callback, right.callback, Ordering.arbitrary()) + .result(); + } + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java index cc0347a124..b8d20e303f 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java @@ -1690,21 +1690,9 @@ public List> getSideInputs() { @Override public PCollection> apply(PCollection> input) { - if (fn instanceof RequiresContextInternal) { - return input - .apply(GroupByKey.create(fewKeys)) - .apply(ParDo.of(new DoFn>, KV>>() { - @Override - public void processElement(ProcessContext c) throws Exception { - c.output(c.element()); - } - })) - .apply(Combine.groupedValues(fn).withSideInputs(sideInputs)); - } else { - return input - .apply(GroupByKey.create(fewKeys)) - .apply(Combine.groupedValues(fn).withSideInputs(sideInputs)); - } + return input + .apply(GroupByKey.create(fewKeys)) + .apply(Combine.groupedValues(fn).withSideInputs(sideInputs)); } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java new file mode 100644 index 0000000000..656c010d91 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/CombineFns.java @@ -0,0 +1,1100 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.dataflow.sdk.transforms; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.CoderException; +import com.google.cloud.dataflow.sdk.coders.CoderRegistry; +import com.google.cloud.dataflow.sdk.coders.StandardCoder; +import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn; +import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn; +import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.GlobalCombineFn; +import com.google.cloud.dataflow.sdk.transforms.CombineFnBase.PerKeyCombineFn; +import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.CombineFnWithContext; +import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.Context; +import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import com.google.cloud.dataflow.sdk.util.PropertyNames; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * Static utility methods that create combine function instances. + */ +public class CombineFns { + + /** + * Returns a {@link ComposeKeyedCombineFnBuilder} to construct a composed + * {@link PerKeyCombineFn}. + * + *

<p>The same {@link TupleTag} cannot be used in a composition multiple times.
+   *
+   * <p>Example:
+   * <pre>{@code
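+   * // Computes, per key, both the max and the mean of the same Integer input: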
+   * PCollection<KV<K, Integer>> latencies = ...;
+   *
+   * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
+   * TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
+   *
+   * SimpleFunction<Integer, Integer> identityFn =
+   *     new SimpleFunction<Integer, Integer>() {
+   *       @Override
+   *       public Integer apply(Integer input) {
+   *           return input;
+   *       }};
+   * PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
+   *     Combine.perKey(
+   *         CombineFns.composeKeyed()
+   *            .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
+   *            .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
+   *
+   * PCollection<T> finalResultCollection = maxAndMean
+   *     .apply(ParDo.of(
+   *         new DoFn<KV<K, CoCombineResult>, T>() {
+   *           @Override
+   *           public void processElement(ProcessContext c) throws Exception {
+   *             KV<K, CoCombineResult> e = c.element();
+   *             Integer maxLatency = e.getValue().get(maxLatencyTag);
+   *             Double meanLatency = e.getValue().get(meanLatencyTag);
+   *             .... Do Something ....
+   *             c.output(...some T...);
+   *           }
+   *         }));
+   * }</pre>
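+   *
+   * <p>A {@link TupleTag} may appear at most once in a composition. As an illustrative
+   * sketch only (reusing {@code identityFn} and {@code maxLatencyTag} from the example
+   * above), the following would be rejected with an {@link IllegalArgumentException}:
+   * <pre>{@code
+   * CombineFns.composeKeyed()
+   *     .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
+   *     .with(identityFn, new MaxIntegerFn(), maxLatencyTag);  // duplicate tag
+   * }</pre>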
+ */ + public static ComposeKeyedCombineFnBuilder composeKeyed() { + return new ComposeKeyedCombineFnBuilder(); + } + + /** + * Returns a {@link ComposeCombineFnBuilder} to construct a composed + * {@link GlobalCombineFn}. + * + *

<p>The same {@link TupleTag} cannot be used in a composition multiple times.
+   *
+   * <p>Example:
+   * <pre>{@code
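+   * // Computes, globally, both the max and the mean of the same Integer input: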
+   * PCollection<Integer> globalLatencies = ...;
+   *
+   * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
+   * TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
+   *
+   * SimpleFunction<Integer, Integer> identityFn =
+   *     new SimpleFunction<Integer, Integer>() {
+   *       @Override
+   *       public Integer apply(Integer input) {
+   *           return input;
+   *       }};
+   * PCollection<CoCombineResult> maxAndMean = globalLatencies.apply(
+   *     Combine.globally(
+   *         CombineFns.compose()
+   *            .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
+   *            .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
+   *
+   * PCollection<T> finalResultCollection = maxAndMean
+   *     .apply(ParDo.of(
+   *         new DoFn<CoCombineResult, T>() {
+   *           @Override
+   *           public void processElement(ProcessContext c) throws Exception {
+   *             CoCombineResult e = c.element();
+   *             Integer maxLatency = e.get(maxLatencyTag);
+   *             Double meanLatency = e.get(meanLatencyTag);
+   *             .... Do Something ....
+   *             c.output(...some T...);
+   *           }
+   *         }));
+   * }</pre>
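+   *
+   * <p>The extract functions need not be identities. A minimal sketch, assuming a
+   * hypothetical {@code Quote} type with latency and price fields, that combines two
+   * different projections of the same input in a single pass:
+   * <pre>{@code
+   * SimpleFunction<Quote, Integer> getLatency = ...;  // hypothetical projection
+   * SimpleFunction<Quote, Double> getPrice = ...;     // hypothetical projection
+   * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
+   * TupleTag<Double> meanPriceTag = new TupleTag<Double>();
+   *
+   * PCollection<CoCombineResult> stats = quotes.apply(
+   *     Combine.globally(
+   *         CombineFns.compose()
+   *             .with(getLatency, new MaxIntegerFn(), maxLatencyTag)
+   *             .with(getPrice, new MeanFn<Double>(), meanPriceTag)));
+   * }</pre>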
+ */ + public static ComposeCombineFnBuilder compose() { + return new ComposeCombineFnBuilder(); + } + + ///////////////////////////////////////////////////////////////////////////// + + /** + * A builder class to construct a composed {@link PerKeyCombineFn}. + */ + public static class ComposeKeyedCombineFnBuilder { + /** + * Returns a {@link ComposedKeyedCombineFn} that can take additional + * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function. + * + *

<p>The {@link ComposedKeyedCombineFn} extracts inputs from {@code DataT} with
+     * the {@code extractInputFn} and combines them with the {@code keyedCombineFn},
+     * and then it outputs each combined value with a {@link TupleTag} to a
+     * {@link CoCombineResult}.
+     */
+    public <DataT, K, InputT, AccumT, OutputT> ComposedKeyedCombineFn<DataT, K> with(
+        SimpleFunction<DataT, InputT> extractInputFn,
+        KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn,
+        TupleTag<OutputT> outputTag) {
+      return new ComposedKeyedCombineFn<DataT, K>()
+          .with(extractInputFn, keyedCombineFn, outputTag);
+    }
+
+    /**
+     * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional
+     * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function.
+     *

The {@link ComposedKeyedCombineFnWithContext} extracts inputs from {@code DataT} with + * the {@code extractInputFn} and combines them with the {@code keyedCombineFnWithContext}, + * and then it outputs each combined value with a {@link TupleTag} to a + * {@link CoCombineResult}. + */ + public ComposedKeyedCombineFnWithContext with( + SimpleFunction extractInputFn, + KeyedCombineFnWithContext keyedCombineFnWithContext, + TupleTag outputTag) { + return new ComposedKeyedCombineFnWithContext() + .with(extractInputFn, keyedCombineFnWithContext, outputTag); + } + + /** + * Returns a {@link ComposedKeyedCombineFn} that can take additional + * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function. + */ + public ComposedKeyedCombineFn with( + SimpleFunction extractInputFn, + CombineFn combineFn, + TupleTag outputTag) { + return with(extractInputFn, combineFn.asKeyedFn(), outputTag); + } + + /** + * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional + * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function. + */ + public ComposedKeyedCombineFnWithContext with( + SimpleFunction extractInputFn, + CombineFnWithContext combineFnWithContext, + TupleTag outputTag) { + return with(extractInputFn, combineFnWithContext.asKeyedFn(), outputTag); + } + } + + /** + * A builder class to construct a composed {@link GlobalCombineFn}. + */ + public static class ComposeCombineFnBuilder { + /** + * Returns a {@link ComposedCombineFn} that can take additional + * {@link GlobalCombineFn GlobalCombineFns} and apply them as a single combine function. + * + *

<p>The {@link ComposedCombineFn} extracts inputs from {@code DataT} with
+     * the {@code extractInputFn} and combines them with the {@code combineFn},
+     * and then it outputs each combined value with a {@link TupleTag} to a
+     * {@link CoCombineResult}.
+     */
+    public <DataT, InputT, AccumT, OutputT> ComposedCombineFn<DataT> with(
+        SimpleFunction<DataT, InputT> extractInputFn,
+        CombineFn<InputT, AccumT, OutputT> combineFn,
+        TupleTag<OutputT> outputTag) {
+      return new ComposedCombineFn<DataT>()
+          .with(extractInputFn, combineFn, outputTag);
+    }
+
+    /**
+     * Returns a {@link ComposedCombineFnWithContext} that can take additional
+     * {@link GlobalCombineFn GlobalCombineFns} and apply them as a single combine function.
+     *

<p>The {@link ComposedCombineFnWithContext} extracts inputs from {@code DataT} with
+     * the {@code extractInputFn} and combines them with the {@code combineFnWithContext},
+     * and then it outputs each combined value with a {@link TupleTag} to a
+     * {@link CoCombineResult}.
+     */
+    public <DataT, InputT, AccumT, OutputT> ComposedCombineFnWithContext<DataT> with(
+        SimpleFunction<DataT, InputT> extractInputFn,
+        CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext,
+        TupleTag<OutputT> outputTag) {
+      return new ComposedCombineFnWithContext<DataT>()
+          .with(extractInputFn, combineFnWithContext, outputTag);
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * A tuple of outputs produced by a composed combine function.
+   *

<p>See {@link #compose()} or {@link #composeKeyed()} for details.
+   */
+  public static class CoCombineResult implements Serializable {
+
+    private enum NullValue {
+      INSTANCE;
+    }
+
+    private final Map<TupleTag<?>, Object> valuesMap;
+
+    /**
+     * The constructor of {@link CoCombineResult}.
+     *
+     * <p>Null values should have been filtered out from the {@code valuesMap}.
+     * {@link TupleTag TupleTags} that are associated with null values do not exist in the
+     * key set of {@code valuesMap}.
+     *
+     * @throws NullPointerException if any key or value in {@code valuesMap} is null
+     */
+    CoCombineResult(Map<TupleTag<?>, Object> valuesMap) {
+      ImmutableMap.Builder<TupleTag<?>, Object> builder = ImmutableMap.builder();
+      for (Entry<TupleTag<?>, Object> entry : valuesMap.entrySet()) {
+        if (entry.getValue() != null) {
+          builder.put(entry);
+        } else {
+          builder.put(entry.getKey(), NullValue.INSTANCE);
+        }
+      }
+      this.valuesMap = builder.build();
+    }
+
+    /**
+     * Returns the value represented by the given {@link TupleTag}.
+     *

<p>It is an error to request a nonexistent tuple tag from the {@link CoCombineResult}.
+     */
+    @SuppressWarnings("unchecked")
+    public <V> V get(TupleTag<V> tag) {
+      checkArgument(
+          valuesMap.keySet().contains(tag), "TupleTag " + tag + " is not in the CoCombineResult");
+      Object value = valuesMap.get(tag);
+      if (value == NullValue.INSTANCE) {
+        return null;
+      } else {
+        return (V) value;
+      }
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * A composed {@link CombineFn} that applies multiple {@link CombineFn CombineFns}.
+   *

For each {@link CombineFn} it extracts inputs from {@code DataT} with + * the {@code extractInputFn} and combines them, + * and then it outputs each combined value with a {@link TupleTag} to a + * {@link CoCombineResult}. + */ + public static class ComposedCombineFn extends CombineFn { + + private final List> combineFns; + private final List> extractInputFns; + private final List> outputTags; + private final int combineFnCount; + + private ComposedCombineFn() { + this.extractInputFns = ImmutableList.of(); + this.combineFns = ImmutableList.of(); + this.outputTags = ImmutableList.of(); + this.combineFnCount = 0; + } + + private ComposedCombineFn( + ImmutableList> extractInputFns, + ImmutableList> combineFns, + ImmutableList> outputTags) { + @SuppressWarnings({"unchecked", "rawtypes"}) + List> castedExtractInputFns = (List) extractInputFns; + this.extractInputFns = castedExtractInputFns; + + @SuppressWarnings({"unchecked", "rawtypes"}) + List> castedCombineFns = (List) combineFns; + this.combineFns = castedCombineFns; + + this.outputTags = outputTags; + this.combineFnCount = this.combineFns.size(); + } + + /** + * Returns a {@link ComposedCombineFn} with an additional {@link CombineFn}. + */ + public ComposedCombineFn with( + SimpleFunction extractInputFn, + CombineFn combineFn, + TupleTag outputTag) { + checkUniqueness(outputTags, outputTag); + return new ComposedCombineFn<>( + ImmutableList.>builder() + .addAll(extractInputFns) + .add(extractInputFn) + .build(), + ImmutableList.>builder() + .addAll(combineFns) + .add(combineFn) + .build(), + ImmutableList.>builder() + .addAll(outputTags) + .add(outputTag) + .build()); + } + + /** + * Returns a {@link ComposedCombineFnWithContext} with an additional + * {@link CombineFnWithContext}. + */ + public ComposedCombineFnWithContext with( + SimpleFunction extractInputFn, + CombineFnWithContext combineFn, + TupleTag outputTag) { + checkUniqueness(outputTags, outputTag); + List> fnsWithContext = Lists.newArrayList(); + for (CombineFn fn : combineFns) { + fnsWithContext.add(toFnWithContext(fn)); + } + return new ComposedCombineFnWithContext<>( + ImmutableList.>builder() + .addAll(extractInputFns) + .add(extractInputFn) + .build(), + ImmutableList.>builder() + .addAll(fnsWithContext) + .add(combineFn) + .build(), + ImmutableList.>builder() + .addAll(outputTags) + .add(outputTag) + .build()); + } + + @Override + public Object[] createAccumulator() { + Object[] accumsArray = new Object[combineFnCount]; + for (int i = 0; i < combineFnCount; ++i) { + accumsArray[i] = combineFns.get(i).createAccumulator(); + } + return accumsArray; + } + + @Override + public Object[] addInput(Object[] accumulator, DataT value) { + for (int i = 0; i < combineFnCount; ++i) { + Object input = extractInputFns.get(i).apply(value); + accumulator[i] = combineFns.get(i).addInput(accumulator[i], input); + } + return accumulator; + } + + @Override + public Object[] mergeAccumulators(Iterable accumulators) { + Iterator iter = accumulators.iterator(); + if (!iter.hasNext()) { + return createAccumulator(); + } else { + // Reuses the first accumulator, and overwrites its values. + // It is safe because {@code accum[i]} only depends on + // the i-th component of each accumulator. 
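+        // ProjectionIterable exposes column i of every accumulator, so each
+        // component CombineFn merges only its own slice of the array.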
+ Object[] accum = iter.next(); + for (int i = 0; i < combineFnCount; ++i) { + accum[i] = combineFns.get(i).mergeAccumulators(new ProjectionIterable(accumulators, i)); + } + return accum; + } + } + + @Override + public CoCombineResult extractOutput(Object[] accumulator) { + Map, Object> valuesMap = Maps.newHashMap(); + for (int i = 0; i < combineFnCount; ++i) { + valuesMap.put( + outputTags.get(i), + combineFns.get(i).extractOutput(accumulator[i])); + } + return new CoCombineResult(valuesMap); + } + + @Override + public Object[] compact(Object[] accumulator) { + for (int i = 0; i < combineFnCount; ++i) { + accumulator[i] = combineFns.get(i).compact(accumulator[i]); + } + return accumulator; + } + + @Override + public Coder getAccumulatorCoder(CoderRegistry registry, Coder dataCoder) + throws CannotProvideCoderException { + List> coders = Lists.newArrayList(); + for (int i = 0; i < combineFnCount; ++i) { + Coder inputCoder = + registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder); + coders.add(combineFns.get(i).getAccumulatorCoder(registry, inputCoder)); + } + return new ComposedAccumulatorCoder(coders); + } + } + + /** + * A composed {@link CombineFnWithContext} that applies multiple + * {@link CombineFnWithContext CombineFnWithContexts}. + * + *

For each {@link CombineFnWithContext} it extracts inputs from {@code DataT} with + * the {@code extractInputFn} and combines them, + * and then it outputs each combined value with a {@link TupleTag} to a + * {@link CoCombineResult}. + */ + public static class ComposedCombineFnWithContext + extends CombineFnWithContext { + + private final List> extractInputFns; + private final List> combineFnWithContexts; + private final List> outputTags; + private final int combineFnCount; + + private ComposedCombineFnWithContext() { + this.extractInputFns = ImmutableList.of(); + this.combineFnWithContexts = ImmutableList.of(); + this.outputTags = ImmutableList.of(); + this.combineFnCount = 0; + } + + private ComposedCombineFnWithContext( + ImmutableList> extractInputFns, + ImmutableList> combineFnWithContexts, + ImmutableList> outputTags) { + @SuppressWarnings({"unchecked", "rawtypes"}) + List> castedExtractInputFns = + (List) extractInputFns; + this.extractInputFns = castedExtractInputFns; + + @SuppressWarnings({"rawtypes", "unchecked"}) + List> castedCombineFnWithContexts + = (List) combineFnWithContexts; + this.combineFnWithContexts = castedCombineFnWithContexts; + + this.outputTags = outputTags; + this.combineFnCount = this.combineFnWithContexts.size(); + } + + /** + * Returns a {@link ComposedCombineFnWithContext} with an additional {@link GlobalCombineFn}. + */ + public ComposedCombineFnWithContext with( + SimpleFunction extractInputFn, + GlobalCombineFn globalCombineFn, + TupleTag outputTag) { + checkUniqueness(outputTags, outputTag); + return new ComposedCombineFnWithContext<>( + ImmutableList.>builder() + .addAll(extractInputFns) + .add(extractInputFn) + .build(), + ImmutableList.>builder() + .addAll(combineFnWithContexts) + .add(toFnWithContext(globalCombineFn)) + .build(), + ImmutableList.>builder() + .addAll(outputTags) + .add(outputTag) + .build()); + } + + @Override + public Object[] createAccumulator(Context c) { + Object[] accumsArray = new Object[combineFnCount]; + for (int i = 0; i < combineFnCount; ++i) { + accumsArray[i] = combineFnWithContexts.get(i).createAccumulator(c); + } + return accumsArray; + } + + @Override + public Object[] addInput(Object[] accumulator, DataT value, Context c) { + for (int i = 0; i < combineFnCount; ++i) { + Object input = extractInputFns.get(i).apply(value); + accumulator[i] = combineFnWithContexts.get(i).addInput(accumulator[i], input, c); + } + return accumulator; + } + + @Override + public Object[] mergeAccumulators(Iterable accumulators, Context c) { + Iterator iter = accumulators.iterator(); + if (!iter.hasNext()) { + return createAccumulator(c); + } else { + // Reuses the first accumulator, and overwrites its values. + // It is safe because {@code accum[i]} only depends on + // the i-th component of each accumulator. 
+ Object[] accum = iter.next(); + for (int i = 0; i < combineFnCount; ++i) { + accum[i] = combineFnWithContexts.get(i).mergeAccumulators( + new ProjectionIterable(accumulators, i), c); + } + return accum; + } + } + + @Override + public CoCombineResult extractOutput(Object[] accumulator, Context c) { + Map, Object> valuesMap = Maps.newHashMap(); + for (int i = 0; i < combineFnCount; ++i) { + valuesMap.put( + outputTags.get(i), + combineFnWithContexts.get(i).extractOutput(accumulator[i], c)); + } + return new CoCombineResult(valuesMap); + } + + @Override + public Object[] compact(Object[] accumulator, Context c) { + for (int i = 0; i < combineFnCount; ++i) { + accumulator[i] = combineFnWithContexts.get(i).compact(accumulator[i], c); + } + return accumulator; + } + + @Override + public Coder getAccumulatorCoder(CoderRegistry registry, Coder dataCoder) + throws CannotProvideCoderException { + List> coders = Lists.newArrayList(); + for (int i = 0; i < combineFnCount; ++i) { + Coder inputCoder = + registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder); + coders.add(combineFnWithContexts.get(i).getAccumulatorCoder(registry, inputCoder)); + } + return new ComposedAccumulatorCoder(coders); + } + } + + /** + * A composed {@link KeyedCombineFn} that applies multiple {@link KeyedCombineFn KeyedCombineFns}. + * + *

For each {@link KeyedCombineFn} it extracts inputs from {@code DataT} with + * the {@code extractInputFn} and combines them, + * and then it outputs each combined value with a {@link TupleTag} to a + * {@link CoCombineResult}. + */ + public static class ComposedKeyedCombineFn + extends KeyedCombineFn { + + private final List> extractInputFns; + private final List> keyedCombineFns; + private final List> outputTags; + private final int combineFnCount; + + private ComposedKeyedCombineFn() { + this.extractInputFns = ImmutableList.of(); + this.keyedCombineFns = ImmutableList.of(); + this.outputTags = ImmutableList.of(); + this.combineFnCount = 0; + } + + private ComposedKeyedCombineFn( + ImmutableList> extractInputFns, + ImmutableList> keyedCombineFns, + ImmutableList> outputTags) { + @SuppressWarnings({"unchecked", "rawtypes"}) + List> castedExtractInputFns = (List) extractInputFns; + this.extractInputFns = castedExtractInputFns; + + @SuppressWarnings({"unchecked", "rawtypes"}) + List> castedKeyedCombineFns = + (List) keyedCombineFns; + this.keyedCombineFns = castedKeyedCombineFns; + this.outputTags = outputTags; + this.combineFnCount = this.keyedCombineFns.size(); + } + + /** + * Returns a {@link ComposedKeyedCombineFn} with an additional {@link KeyedCombineFn}. + */ + public ComposedKeyedCombineFn with( + SimpleFunction extractInputFn, + KeyedCombineFn keyedCombineFn, + TupleTag outputTag) { + checkUniqueness(outputTags, outputTag); + return new ComposedKeyedCombineFn<>( + ImmutableList.>builder() + .addAll(extractInputFns) + .add(extractInputFn) + .build(), + ImmutableList.>builder() + .addAll(keyedCombineFns) + .add(keyedCombineFn) + .build(), + ImmutableList.>builder() + .addAll(outputTags) + .add(outputTag) + .build()); + } + + /** + * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional + * {@link KeyedCombineFnWithContext}. + */ + public ComposedKeyedCombineFnWithContext with( + SimpleFunction extractInputFn, + KeyedCombineFnWithContext keyedCombineFn, + TupleTag outputTag) { + checkUniqueness(outputTags, outputTag); + List> fnsWithContext = + Lists.newArrayList(); + for (KeyedCombineFn fn : keyedCombineFns) { + fnsWithContext.add(toFnWithContext(fn)); + } + return new ComposedKeyedCombineFnWithContext<>( + ImmutableList.>builder() + .addAll(extractInputFns) + .add(extractInputFn) + .build(), + ImmutableList.>builder() + .addAll(fnsWithContext) + .add(keyedCombineFn) + .build(), + ImmutableList.>builder() + .addAll(outputTags) + .add(outputTag) + .build()); + } + + /** + * Returns a {@link ComposedKeyedCombineFn} with an additional {@link CombineFn}. + */ + public ComposedKeyedCombineFn with( + SimpleFunction extractInputFn, + CombineFn keyedCombineFn, + TupleTag outputTag) { + return with(extractInputFn, keyedCombineFn.asKeyedFn(), outputTag); + } + + /** + * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional + * {@link CombineFnWithContext}. 
+ */ + public ComposedKeyedCombineFnWithContext with( + SimpleFunction extractInputFn, + CombineFnWithContext keyedCombineFn, + TupleTag outputTag) { + return with(extractInputFn, keyedCombineFn.asKeyedFn(), outputTag); + } + + @Override + public Object[] createAccumulator(K key) { + Object[] accumsArray = new Object[combineFnCount]; + for (int i = 0; i < combineFnCount; ++i) { + accumsArray[i] = keyedCombineFns.get(i).createAccumulator(key); + } + return accumsArray; + } + + @Override + public Object[] addInput(K key, Object[] accumulator, DataT value) { + for (int i = 0; i < combineFnCount; ++i) { + Object input = extractInputFns.get(i).apply(value); + accumulator[i] = keyedCombineFns.get(i).addInput(key, accumulator[i], input); + } + return accumulator; + } + + @Override + public Object[] mergeAccumulators(K key, final Iterable accumulators) { + Iterator iter = accumulators.iterator(); + if (!iter.hasNext()) { + return createAccumulator(key); + } else { + // Reuses the first accumulator, and overwrites its values. + // It is safe because {@code accum[i]} only depends on + // the i-th component of each accumulator. + Object[] accum = iter.next(); + for (int i = 0; i < combineFnCount; ++i) { + accum[i] = keyedCombineFns.get(i).mergeAccumulators( + key, new ProjectionIterable(accumulators, i)); + } + return accum; + } + } + + @Override + public CoCombineResult extractOutput(K key, Object[] accumulator) { + Map, Object> valuesMap = Maps.newHashMap(); + for (int i = 0; i < combineFnCount; ++i) { + valuesMap.put( + outputTags.get(i), + keyedCombineFns.get(i).extractOutput(key, accumulator[i])); + } + return new CoCombineResult(valuesMap); + } + + @Override + public Object[] compact(K key, Object[] accumulator) { + for (int i = 0; i < combineFnCount; ++i) { + accumulator[i] = keyedCombineFns.get(i).compact(key, accumulator[i]); + } + return accumulator; + } + + @Override + public Coder getAccumulatorCoder( + CoderRegistry registry, Coder keyCoder, Coder dataCoder) + throws CannotProvideCoderException { + List> coders = Lists.newArrayList(); + for (int i = 0; i < combineFnCount; ++i) { + Coder inputCoder = + registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder); + coders.add(keyedCombineFns.get(i).getAccumulatorCoder(registry, keyCoder, inputCoder)); + } + return new ComposedAccumulatorCoder(coders); + } + } + + /** + * A composed {@link KeyedCombineFnWithContext} that applies multiple + * {@link KeyedCombineFnWithContext KeyedCombineFnWithContexts}. + * + *

For each {@link KeyedCombineFnWithContext} it extracts inputs from {@code DataT} with + * the {@code extractInputFn} and combines them, + * and then it outputs each combined value with a {@link TupleTag} to a + * {@link CoCombineResult}. + */ + public static class ComposedKeyedCombineFnWithContext + extends KeyedCombineFnWithContext { + + private final List> extractInputFns; + private final List> keyedCombineFns; + private final List> outputTags; + private final int combineFnCount; + + private ComposedKeyedCombineFnWithContext() { + this.extractInputFns = ImmutableList.of(); + this.keyedCombineFns = ImmutableList.of(); + this.outputTags = ImmutableList.of(); + this.combineFnCount = 0; + } + + private ComposedKeyedCombineFnWithContext( + ImmutableList> extractInputFns, + ImmutableList> keyedCombineFns, + ImmutableList> outputTags) { + @SuppressWarnings({"unchecked", "rawtypes"}) + List> castedExtractInputFns = + (List) extractInputFns; + this.extractInputFns = castedExtractInputFns; + + @SuppressWarnings({"unchecked", "rawtypes"}) + List> castedKeyedCombineFns = + (List) keyedCombineFns; + this.keyedCombineFns = castedKeyedCombineFns; + this.outputTags = outputTags; + this.combineFnCount = this.keyedCombineFns.size(); + } + + /** + * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional + * {@link PerKeyCombineFn}. + */ + public ComposedKeyedCombineFnWithContext with( + SimpleFunction extractInputFn, + PerKeyCombineFn perKeyCombineFn, + TupleTag outputTag) { + checkUniqueness(outputTags, outputTag); + return new ComposedKeyedCombineFnWithContext<>( + ImmutableList.>builder() + .addAll(extractInputFns) + .add(extractInputFn) + .build(), + ImmutableList.>builder() + .addAll(keyedCombineFns) + .add(toFnWithContext(perKeyCombineFn)) + .build(), + ImmutableList.>builder() + .addAll(outputTags) + .add(outputTag) + .build()); + } + + /** + * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional + * {@link GlobalCombineFn}. + */ + public ComposedKeyedCombineFnWithContext with( + SimpleFunction extractInputFn, + GlobalCombineFn perKeyCombineFn, + TupleTag outputTag) { + return with(extractInputFn, perKeyCombineFn.asKeyedFn(), outputTag); + } + + @Override + public Object[] createAccumulator(K key, Context c) { + Object[] accumsArray = new Object[combineFnCount]; + for (int i = 0; i < combineFnCount; ++i) { + accumsArray[i] = keyedCombineFns.get(i).createAccumulator(key, c); + } + return accumsArray; + } + + @Override + public Object[] addInput(K key, Object[] accumulator, DataT value, Context c) { + for (int i = 0; i < combineFnCount; ++i) { + Object input = extractInputFns.get(i).apply(value); + accumulator[i] = keyedCombineFns.get(i).addInput(key, accumulator[i], input, c); + } + return accumulator; + } + + @Override + public Object[] mergeAccumulators(K key, Iterable accumulators, Context c) { + Iterator iter = accumulators.iterator(); + if (!iter.hasNext()) { + return createAccumulator(key, c); + } else { + // Reuses the first accumulator, and overwrites its values. + // It is safe because {@code accum[i]} only depends on + // the i-th component of each accumulator. 
+ Object[] accum = iter.next(); + for (int i = 0; i < combineFnCount; ++i) { + accum[i] = keyedCombineFns.get(i).mergeAccumulators( + key, new ProjectionIterable(accumulators, i), c); + } + return accum; + } + } + + @Override + public CoCombineResult extractOutput(K key, Object[] accumulator, Context c) { + Map, Object> valuesMap = Maps.newHashMap(); + for (int i = 0; i < combineFnCount; ++i) { + valuesMap.put( + outputTags.get(i), + keyedCombineFns.get(i).extractOutput(key, accumulator[i], c)); + } + return new CoCombineResult(valuesMap); + } + + @Override + public Object[] compact(K key, Object[] accumulator, Context c) { + for (int i = 0; i < combineFnCount; ++i) { + accumulator[i] = keyedCombineFns.get(i).compact(key, accumulator[i], c); + } + return accumulator; + } + + @Override + public Coder getAccumulatorCoder( + CoderRegistry registry, Coder keyCoder, Coder dataCoder) + throws CannotProvideCoderException { + List> coders = Lists.newArrayList(); + for (int i = 0; i < combineFnCount; ++i) { + Coder inputCoder = + registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder); + coders.add(keyedCombineFns.get(i).getAccumulatorCoder( + registry, keyCoder, inputCoder)); + } + return new ComposedAccumulatorCoder(coders); + } + } + + ///////////////////////////////////////////////////////////////////////////// + + private static class ProjectionIterable implements Iterable { + private final Iterable iterable; + private final int column; + + private ProjectionIterable(Iterable iterable, int column) { + this.iterable = iterable; + this.column = column; + } + + @Override + public Iterator iterator() { + final Iterator iter = iterable.iterator(); + return new Iterator() { + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public Object next() { + return iter.next()[column]; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + } + + private static class ComposedAccumulatorCoder extends StandardCoder { + private List> coders; + private int codersCount; + + public ComposedAccumulatorCoder(List> coders) { + this.coders = ImmutableList.copyOf(coders); + this.codersCount = coders.size(); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + @JsonCreator + public static ComposedAccumulatorCoder of( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) + List> components) { + return new ComposedAccumulatorCoder((List) components); + } + + @Override + public void encode(Object[] value, OutputStream outStream, Context context) + throws CoderException, IOException { + checkArgument(value.length == codersCount); + Context nestedContext = context.nested(); + for (int i = 0; i < codersCount; ++i) { + coders.get(i).encode(value[i], outStream, nestedContext); + } + } + + @Override + public Object[] decode(InputStream inStream, Context context) + throws CoderException, IOException { + Object[] ret = new Object[codersCount]; + Context nestedContext = context.nested(); + for (int i = 0; i < codersCount; ++i) { + ret[i] = coders.get(i).decode(inStream, nestedContext); + } + return ret; + } + + @Override + public List> getCoderArguments() { + return coders; + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + for (int i = 0; i < codersCount; ++i) { + coders.get(i).verifyDeterministic(); + } + } + } + + @SuppressWarnings("unchecked") + private static CombineFnWithContext + toFnWithContext(GlobalCombineFn globalCombineFn) { + if (globalCombineFn instanceof CombineFnWithContext) { + return 
(CombineFnWithContext) globalCombineFn; + } else { + final CombineFn combineFn = + (CombineFn) globalCombineFn; + return new CombineFnWithContext() { + @Override + public AccumT createAccumulator(Context c) { + return combineFn.createAccumulator(); + } + @Override + public AccumT addInput(AccumT accumulator, InputT input, Context c) { + return combineFn.addInput(accumulator, input); + } + @Override + public AccumT mergeAccumulators(Iterable accumulators, Context c) { + return combineFn.mergeAccumulators(accumulators); + } + @Override + public OutputT extractOutput(AccumT accumulator, Context c) { + return combineFn.extractOutput(accumulator); + } + @Override + public AccumT compact(AccumT accumulator, Context c) { + return combineFn.compact(accumulator); + } + @Override + public OutputT defaultValue() { + return combineFn.defaultValue(); + } + @Override + public Coder getAccumulatorCoder(CoderRegistry registry, Coder inputCoder) + throws CannotProvideCoderException { + return combineFn.getAccumulatorCoder(registry, inputCoder); + } + @Override + public Coder getDefaultOutputCoder( + CoderRegistry registry, Coder inputCoder) throws CannotProvideCoderException { + return combineFn.getDefaultOutputCoder(registry, inputCoder); + } + }; + } + } + + private static KeyedCombineFnWithContext + toFnWithContext(PerKeyCombineFn perKeyCombineFn) { + if (perKeyCombineFn instanceof KeyedCombineFnWithContext) { + @SuppressWarnings("unchecked") + KeyedCombineFnWithContext keyedCombineFnWithContext = + (KeyedCombineFnWithContext) perKeyCombineFn; + return keyedCombineFnWithContext; + } else { + @SuppressWarnings("unchecked") + final KeyedCombineFn keyedCombineFn = + (KeyedCombineFn) perKeyCombineFn; + return new KeyedCombineFnWithContext() { + @Override + public AccumT createAccumulator(K key, Context c) { + return keyedCombineFn.createAccumulator(key); + } + @Override + public AccumT addInput(K key, AccumT accumulator, InputT value, Context c) { + return keyedCombineFn.addInput(key, accumulator, value); + } + @Override + public AccumT mergeAccumulators(K key, Iterable accumulators, Context c) { + return keyedCombineFn.mergeAccumulators(key, accumulators); + } + @Override + public OutputT extractOutput(K key, AccumT accumulator, Context c) { + return keyedCombineFn.extractOutput(key, accumulator); + } + @Override + public AccumT compact(K key, AccumT accumulator, Context c) { + return keyedCombineFn.compact(key, accumulator); + } + @Override + public Coder getAccumulatorCoder(CoderRegistry registry, Coder keyCoder, + Coder inputCoder) throws CannotProvideCoderException { + return keyedCombineFn.getAccumulatorCoder(registry, keyCoder, inputCoder); + } + @Override + public Coder getDefaultOutputCoder(CoderRegistry registry, Coder keyCoder, + Coder inputCoder) throws CannotProvideCoderException { + return keyedCombineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder); + } + }; + } + } + + private static void checkUniqueness( + List> registeredTags, TupleTag outputTag) { + checkArgument( + !registeredTags.contains(outputTag), + "Cannot compose with tuple tag %s because it is already present in the composition.", + outputTag); + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java index da16db99c6..fac2c2841b 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java +++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java @@ -80,7 +80,7 @@ public static FromEndOfWindow pastEndOfWindow() { public interface AfterWatermarkEarly extends TriggerBuilder { /** * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever - * the given {@code Trigger} fires before the watermark has passed the end of the window. + * the given {@code Trigger} fires after the watermark has passed the end of the window. */ TriggerBuilder withLateFirings(OnceTrigger lateTrigger); } @@ -91,7 +91,7 @@ public interface AfterWatermarkEarly extends TriggerBui public interface AfterWatermarkLate extends TriggerBuilder { /** * Creates a new {@code Trigger} like the this, except that it fires repeatedly whenever - * the given {@code Trigger} fires after the watermark has passed the end of the window. + * the given {@code Trigger} fires before the watermark has passed the end of the window. */ TriggerBuilder withEarlyFirings(OnceTrigger earlyTrigger); } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java index 5611fabe28..ec6518976b 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PropertyNames.java @@ -65,6 +65,7 @@ public class PropertyNames { public static final String INPUTS = "inputs"; public static final String INPUT_CODER = "input_coder"; public static final String IS_GENERATED = "is_generated"; + public static final String IS_MERGING_WINDOW_FN = "is_merging_window_fn"; public static final String IS_PAIR_LIKE = "is_pair_like"; public static final String IS_STREAM_LIKE = "is_stream_like"; public static final String IS_WRAPPER = "is_wrapper"; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java index 9f22fbbe9e..43955149ee 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java @@ -25,7 +25,7 @@ import com.google.cloud.dataflow.sdk.io.BoundedSource; import com.google.cloud.dataflow.sdk.io.CountingSource; import com.google.cloud.dataflow.sdk.io.Read; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; +import com.google.cloud.dataflow.sdk.io.Read.Bounded; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java index bf25970aff..0120b9880d 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.when; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java index 5c9e824afe..4ced82f8c7 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java @@ -23,7 +23,6 @@ import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java index 24251522d8..52398cf73a 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java @@ -1047,6 +1047,18 @@ public void timerUpdateBuilderWithCompletedAfterBuildNotAddedToBuilt() { assertThat(built.getCompletedTimers(), emptyIterable()); } + @Test + public void timerUpdateWithCompletedTimersNotAddedToExisting() { + TimerUpdateBuilder builder = TimerUpdate.builder(null); + TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME); + + TimerUpdate built = builder.build(); + assertThat(built.getCompletedTimers(), emptyIterable()); + assertThat( + built.withCompletedTimers(ImmutableList.of(timer)).getCompletedTimers(), contains(timer)); + assertThat(built.getCompletedTimers(), emptyIterable()); + } + private static Matcher earlierThan(final Instant laterInstant) { return new BaseMatcher() { @Override diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java new file mode 100644 index 0000000000..149096040a --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -0,0 +1,436 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +import com.google.cloud.dataflow.sdk.coders.VarIntCoder; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; +import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.transforms.WithKeys; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo.Timing; +import com.google.cloud.dataflow.sdk.util.SideInputReader; +import com.google.cloud.dataflow.sdk.util.TimeDomain; +import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.util.common.Counter; +import com.google.cloud.dataflow.sdk.util.common.Counter.AggregationKind; +import com.google.cloud.dataflow.sdk.util.common.CounterSet; +import com.google.cloud.dataflow.sdk.util.state.BagState; +import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals; +import com.google.cloud.dataflow.sdk.util.state.StateNamespaces; +import com.google.cloud.dataflow.sdk.util.state.StateTag; +import com.google.cloud.dataflow.sdk.util.state.StateTags; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.PValue; +import com.google.common.collect.ImmutableList; + +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Tests for {@link InProcessEvaluationContext}. 
+ */ +@RunWith(JUnit4.class) +public class InProcessEvaluationContextTest { + private TestPipeline p; + private InProcessEvaluationContext context; + private PCollection created; + private PCollection> downstream; + private PCollectionView> view; + + @Before + public void setup() { + InProcessPipelineRunner runner = + InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create()); + p = TestPipeline.create(); + created = p.apply(Create.of(1, 2, 3)); + downstream = created.apply(WithKeys.of("foo")); + view = created.apply(View.asIterable()); + Collection> rootTransforms = + ImmutableList.>of(created.getProducingTransformInternal()); + Map>> valueToConsumers = new HashMap<>(); + valueToConsumers.put( + created, + ImmutableList.>of( + downstream.getProducingTransformInternal(), view.getProducingTransformInternal())); + valueToConsumers.put(downstream, ImmutableList.>of()); + valueToConsumers.put(view, ImmutableList.>of()); + + Map, String> stepNames = new HashMap<>(); + stepNames.put(created.getProducingTransformInternal(), "s1"); + stepNames.put(downstream.getProducingTransformInternal(), "s2"); + stepNames.put(view.getProducingTransformInternal(), "s3"); + + Collection> views = ImmutableList.>of(view); + context = InProcessEvaluationContext.create( + runner.getPipelineOptions(), + rootTransforms, + valueToConsumers, + stepNames, + views); + } + + @Test + public void writeToViewWriterThenReadReads() { + PCollectionViewWriter> viewWriter = + context.createPCollectionViewWriter( + PCollection.>createPrimitiveOutputInternal( + p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED), + view); + BoundedWindow window = new TestBoundedWindow(new Instant(1024L)); + BoundedWindow second = new TestBoundedWindow(new Instant(899999L)); + WindowedValue firstValue = + WindowedValue.of(1, new Instant(1222), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); + WindowedValue secondValue = + WindowedValue.of( + 2, new Instant(8766L), second, PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)); + Iterable> values = ImmutableList.of(firstValue, secondValue); + viewWriter.add(values); + + SideInputReader reader = + context.createSideInputReader(ImmutableList.>of(view)); + assertThat(reader.get(view, window), containsInAnyOrder(1)); + assertThat(reader.get(view, second), containsInAnyOrder(2)); + + WindowedValue overrittenSecondValue = + WindowedValue.of( + 4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1)); + viewWriter.add(Collections.singleton(overrittenSecondValue)); + assertThat(reader.get(view, second), containsInAnyOrder(4444)); + } + + @Test + public void getExecutionContextSameStepSameKeyState() { + InProcessExecutionContext fooContext = + context.getExecutionContext(created.getProducingTransformInternal(), "foo"); + + StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1", null); + stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1); + + context.handleResult( + InProcessBundle.keyed(created, "foo").commit(Instant.now()), + ImmutableList.of(), + StepTransformResult.withoutHold(created.getProducingTransformInternal()) + .withState(stepContext.commitState()) + .build()); + + InProcessExecutionContext secondFooContext = + context.getExecutionContext(created.getProducingTransformInternal(), "foo"); + assertThat( + secondFooContext + .getOrCreateStepContext("s1", "s1", null) + .stateInternals() + .state(StateNamespaces.global(), intBag) + .read(), + 
contains(1)); + } + + + @Test + public void getExecutionContextDifferentKeysIndependentState() { + InProcessExecutionContext fooContext = + context.getExecutionContext(created.getProducingTransformInternal(), "foo"); + + StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + fooContext + .getOrCreateStepContext("s1", "s1", null) + .stateInternals() + .state(StateNamespaces.global(), intBag) + .add(1); + + InProcessExecutionContext barContext = + context.getExecutionContext(created.getProducingTransformInternal(), "bar"); + assertThat(barContext, not(equalTo(fooContext))); + assertThat( + barContext + .getOrCreateStepContext("s1", "s1", null) + .stateInternals() + .state(StateNamespaces.global(), intBag) + .read(), + emptyIterable()); + } + + @Test + public void getExecutionContextDifferentStepsIndependentState() { + String myKey = "foo"; + InProcessExecutionContext fooContext = + context.getExecutionContext(created.getProducingTransformInternal(), myKey); + + StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + fooContext + .getOrCreateStepContext("s1", "s1", null) + .stateInternals() + .state(StateNamespaces.global(), intBag) + .add(1); + + InProcessExecutionContext barContext = + context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + assertThat( + barContext + .getOrCreateStepContext("s1", "s1", null) + .stateInternals() + .state(StateNamespaces.global(), intBag) + .read(), + emptyIterable()); + } + + @Test + public void handleResultMergesCounters() { + CounterSet counters = context.createCounterSet(); + Counter myCounter = Counter.longs("foo", AggregationKind.SUM); + counters.addCounter(myCounter); + + myCounter.addValue(4L); + InProcessTransformResult result = + StepTransformResult.withoutHold(created.getProducingTransformInternal()) + .withCounters(counters) + .build(); + context.handleResult(null, ImmutableList.of(), result); + assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(4L)); + + CounterSet againCounters = context.createCounterSet(); + Counter myLongCounterAgain = Counter.longs("foo", AggregationKind.SUM); + againCounters.add(myLongCounterAgain); + myLongCounterAgain.addValue(8L); + + InProcessTransformResult secondResult = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + .withCounters(againCounters) + .build(); + context.handleResult( + InProcessBundle.unkeyed(created).commit(Instant.now()), + ImmutableList.of(), + secondResult); + assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L)); + } + + @Test + public void handleResultStoresState() { + String myKey = "foo"; + InProcessExecutionContext fooContext = + context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + + StateTag> intBag = StateTags.bag("myBag", VarIntCoder.of()); + + CopyOnAccessInMemoryStateInternals state = + fooContext.getOrCreateStepContext("s1", "s1", null).stateInternals(); + BagState bag = state.state(StateNamespaces.global(), intBag); + bag.add(1); + bag.add(2); + bag.add(4); + + InProcessTransformResult stateResult = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + .withState(state) + .build(); + + context.handleResult( + InProcessBundle.keyed(created, myKey).commit(Instant.now()), + ImmutableList.of(), + stateResult); + + InProcessExecutionContext afterResultContext = + context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + + CopyOnAccessInMemoryStateInternals 
afterResultState = + afterResultContext.getOrCreateStepContext("s1", "s1", null).stateInternals(); + assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4)); + } + + @Test + public void callAfterOutputMustHaveBeenProducedAfterEndOfWatermarkCallsback() throws Exception { + final CountDownLatch callLatch = new CountDownLatch(1); + Runnable callback = + new Runnable() { + @Override + public void run() { + callLatch.countDown(); + } + }; + + // Should call back after the end of the global window + context.scheduleAfterOutputWouldBeProduced( + downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); + + InProcessTransformResult result = + StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) + .build(); + + context.handleResult(null, ImmutableList.of(), result); + + // Difficult to demonstrate that we took no action in a multithreaded world; poll for a bit + // will likely be flaky if this logic is broken + assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false)); + + InProcessTransformResult finishedResult = + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + context.handleResult(null, ImmutableList.of(), finishedResult); + // Obtain the value via blocking call + assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true)); + } + + @Test + public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception { + InProcessTransformResult finishedResult = + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + context.handleResult(null, ImmutableList.of(), finishedResult); + + final CountDownLatch callLatch = new CountDownLatch(1); + Runnable callback = + new Runnable() { + @Override + public void run() { + callLatch.countDown(); + } + }; + context.scheduleAfterOutputWouldBeProduced( + downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); + assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true)); + } + + @Test + public void extractFiredTimersExtractsTimers() { + InProcessTransformResult holdResult = + StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) + .build(); + context.handleResult(null, ImmutableList.of(), holdResult); + + String key = "foo"; + TimerData toFire = + TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME); + InProcessTransformResult timerResult = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null)) + .withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build()) + .build(); + + // haven't added any timers, must be empty + assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + context.handleResult( + InProcessBundle.keyed(created, key).commit(Instant.now()), + ImmutableList.of(), + timerResult); + + // timer hasn't fired + assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + + InProcessTransformResult advanceResult = + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + // Should cause the downstream timer to fire + context.handleResult(null, ImmutableList.of(), advanceResult); + + Map, Map> fired = context.extractFiredTimers(); + assertThat( + fired, + Matchers.>hasKey(downstream.getProducingTransformInternal())); + Map downstreamFired = + fired.get(downstream.getProducingTransformInternal()); + assertThat(downstreamFired, 
Matchers.hasKey(key)); + + FiredTimers firedForKey = downstreamFired.get(key); + assertThat(firedForKey.getTimers(TimeDomain.PROCESSING_TIME), emptyIterable()); + assertThat(firedForKey.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), emptyIterable()); + assertThat(firedForKey.getTimers(TimeDomain.EVENT_TIME), contains(toFire)); + + // Don't reextract timers + assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + } + + @Test + public void createBundleUnkeyedResultUnkeyed() { + CommittedBundle> newBundle = + context + .createBundle(InProcessBundle.unkeyed(created).commit(Instant.now()), downstream) + .commit(Instant.now()); + assertThat(newBundle.isKeyed(), is(false)); + } + + @Test + public void createBundleKeyedResultPropagatesKey() { + CommittedBundle> newBundle = + context + .createBundle(InProcessBundle.keyed(created, "foo").commit(Instant.now()), downstream) + .commit(Instant.now()); + assertThat(newBundle.isKeyed(), is(true)); + assertThat(newBundle.getKey(), Matchers.equalTo("foo")); + } + + @Test + public void createRootBundleUnkeyed() { + assertThat(context.createRootBundle(created).commit(Instant.now()).isKeyed(), is(false)); + } + + @Test + public void createKeyedBundleKeyed() { + CommittedBundle> keyedBundle = + context + .createKeyedBundle( + InProcessBundle.unkeyed(created).commit(Instant.now()), "foo", downstream) + .commit(Instant.now()); + assertThat(keyedBundle.isKeyed(), is(true)); + assertThat(keyedBundle.getKey(), Matchers.equalTo("foo")); + } + + private static class TestBoundedWindow extends BoundedWindow { + private final Instant ts; + + public TestBoundedWindow(Instant ts) { + this.ts = ts; + } + + @Override + public Instant maxTimestamp() { + return ts; + } + } +} diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java index 4cfe782936..16b4eb7d07 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java @@ -24,7 +24,6 @@ import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.Mean; @@ -137,7 +136,7 @@ public void getAfterWriteReturnsPaneInWindow() throws Exception { container.write(mapView, ImmutableList.>of(one, two)); Map viewContents = - container.withViews(ImmutableList.>of(mapView)) + container.createReaderForViews(ImmutableList.>of(mapView)) .get(mapView, firstWindow); assertThat(viewContents, hasEntry("one", 1)); assertThat(viewContents, hasEntry("two", 2)); @@ -153,7 +152,7 @@ public void getReturnsLatestPaneInWindow() throws Exception { container.write(mapView, ImmutableList.>of(one, two)); Map viewContents = - container.withViews(ImmutableList.>of(mapView)) + container.createReaderForViews(ImmutableList.>of(mapView)) .get(mapView, secondWindow); assertThat(viewContents, hasEntry("one", 1)); assertThat(viewContents, hasEntry("two", 2)); @@ -164,7 +163,7 @@ public void getReturnsLatestPaneInWindow() throws Exception { container.write(mapView, ImmutableList.>of(three)); Map 
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java
index 4cfe782936..16b4eb7d07 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java
@@ -24,7 +24,6 @@
 import com.google.cloud.dataflow.sdk.coders.KvCoder;
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.Mean;
@@ -137,7 +136,7 @@ public void getAfterWriteReturnsPaneInWindow() throws Exception {
     container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
 
     Map<String, Integer> viewContents =
-        container.withViews(ImmutableList.<PCollectionView<?>>of(mapView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
             .get(mapView, firstWindow);
     assertThat(viewContents, hasEntry("one", 1));
     assertThat(viewContents, hasEntry("two", 2));
@@ -153,7 +152,7 @@ public void getReturnsLatestPaneInWindow() throws Exception {
     container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
 
     Map<String, Integer> viewContents =
-        container.withViews(ImmutableList.<PCollectionView<?>>of(mapView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
             .get(mapView, secondWindow);
     assertThat(viewContents, hasEntry("one", 1));
     assertThat(viewContents, hasEntry("two", 2));
@@ -164,7 +163,7 @@
     container.write(mapView, ImmutableList.<WindowedValue<?>>of(three));
 
     Map<String, Integer> overwrittenViewContents =
-        container.withViews(ImmutableList.<PCollectionView<?>>of(mapView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
            .get(mapView, secondWindow);
     assertThat(overwrittenViewContents, hasEntry("three", 3));
     assertThat(overwrittenViewContents.size(), is(1));
@@ -176,15 +175,18 @@ public void getReturnsLatestPaneInWindow() throws Exception {
    */
   @Test
   public void getBlocksUntilPaneAvailable() throws Exception {
-    BoundedWindow window = new BoundedWindow() {
-      @Override
-      public Instant maxTimestamp() {
-        return new Instant(1024L);
-      }
-    };
+    BoundedWindow window =
+        new BoundedWindow() {
+          @Override
+          public Instant maxTimestamp() {
+            return new Instant(1024L);
+          }
+        };
     Future<Double> singletonFuture =
-        getFutureOfView(container.withViews(ImmutableList.<PCollectionView<?>>of(singletonView)),
-            singletonView, window);
+        getFutureOfView(
+            container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)),
+            singletonView,
+            window);
 
     WindowedValue<Double> singletonValue =
         WindowedValue.of(4.75, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
@@ -203,7 +205,7 @@ public Instant maxTimestamp() {
       }
     };
     SideInputReader newReader =
-        container.withViews(ImmutableList.<PCollectionView<?>>of(singletonView));
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView));
     Future<Double> singletonFuture = getFutureOfView(newReader, singletonView, window);
 
     WindowedValue<Double> singletonValue =
@@ -216,25 +218,31 @@
   @Test
   public void withPCollectionViewsErrorsForContainsNotInViews() {
-    PCollectionView<Map<String, Iterable<String>>> newView = PCollectionViews.multimapView(pipeline,
-        WindowingStrategy.globalDefault(), KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+    PCollectionView<Map<String, Iterable<String>>> newView =
+        PCollectionViews.multimapView(
+            pipeline,
+            WindowingStrategy.globalDefault(),
+            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("with unknown views " + ImmutableList.of(newView).toString());
 
-    container.withViews(ImmutableList.<PCollectionView<?>>of(newView));
+    container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView));
   }
 
   @Test
   public void withViewsForViewNotInContainerFails() {
-    PCollectionView<Map<String, Iterable<String>>> newView = PCollectionViews.multimapView(pipeline,
-        WindowingStrategy.globalDefault(), KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+    PCollectionView<Map<String, Iterable<String>>> newView =
+        PCollectionViews.multimapView(
+            pipeline,
+            WindowingStrategy.globalDefault(),
+            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("unknown views");
     thrown.expectMessage(newView.toString());
 
-    container.withViews(ImmutableList.<PCollectionView<?>>of(newView));
+    container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView));
   }
 
   @Test
@@ -242,7 +250,7 @@ public void getOnReaderForViewNotInReaderFails() {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("unknown view: " + iterableView.toString());
 
-    container.withViews(ImmutableList.<PCollectionView<?>>of(mapView))
+    container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
         .get(iterableView, GlobalWindow.INSTANCE);
   }
 
@@ -255,11 +263,11 @@ public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exceptio
         PaneInfo.ON_TIME_AND_ONLY_FIRING);
     container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue));
     assertThat(
-        container.withViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
             .get(singletonView, firstWindow),
         equalTo(2.875));
     assertThat(
-        container.withViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
             .get(singletonView, secondWindow),
         equalTo(4.125));
   }
@@ -274,7 +282,7 @@ public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Excep
     container.write(iterableView, ImmutableList.of(firstValue, secondValue));
 
     assertThat(
-        container.withViews(ImmutableList.<PCollectionView<?>>of(iterableView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(iterableView))
             .get(iterableView, firstWindow),
         contains(44, 44));
   }
@@ -286,11 +294,11 @@ public void writeForElementInMultipleWindowsSucceeds() throws Exception {
         ImmutableList.of(firstWindow, secondWindow), PaneInfo.ON_TIME_AND_ONLY_FIRING);
     container.write(singletonView, ImmutableList.of(multiWindowedValue));
     assertThat(
-        container.withViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
             .get(singletonView, firstWindow),
         equalTo(2.875));
     assertThat(
-        container.withViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
             .get(singletonView, secondWindow),
         equalTo(2.875));
   }
@@ -306,7 +314,7 @@ public void finishDoesNotOverwriteWrittenElements() throws Exception {
     immediatelyInvokeCallback(mapView, secondWindow);
 
     Map<String, Integer> viewContents =
-        container.withViews(ImmutableList.<PCollectionView<?>>of(mapView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
             .get(mapView, secondWindow);
 
     assertThat(viewContents, hasEntry("one", 1));
@@ -317,8 +325,11 @@
   @Test
   public void finishOnPendingViewsSetsEmptyElements() throws Exception {
     immediatelyInvokeCallback(mapView, secondWindow);
-    Future<Map<String, Integer>> mapFuture = getFutureOfView(
-        container.withViews(ImmutableList.<PCollectionView<?>>of(mapView)), mapView, secondWindow);
+    Future<Map<String, Integer>> mapFuture =
+        getFutureOfView(
+            container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)),
+            mapView,
+            secondWindow);
 
     assertThat(mapFuture.get().isEmpty(), is(true));
   }
@@ -329,18 +340,21 @@ public void finishOnPendingViewsSetsEmptyElements() throws Exception {
    */
   private void immediatelyInvokeCallback(PCollectionView<?> view, BoundedWindow window) {
     doAnswer(
-        new Answer<Void>() {
-          @Override
-          public Void answer(InvocationOnMock invocation) throws Throwable {
-            Object callback = invocation.getArguments()[3];
-            Runnable callbackRunnable = (Runnable) callback;
-            callbackRunnable.run();
-            return null;
-          }
-        })
+        new Answer<Void>() {
+          @Override
+          public Void answer(InvocationOnMock invocation) throws Throwable {
+            Object callback = invocation.getArguments()[3];
+            Runnable callbackRunnable = (Runnable) callback;
+            callbackRunnable.run();
+            return null;
+          }
+        })
         .when(context)
-        .callAfterOutputMustHaveBeenProduced(Mockito.eq(view), Mockito.eq(window),
-            Mockito.eq(view.getWindowingStrategyInternal()), Mockito.any(Runnable.class));
+        .scheduleAfterOutputWouldBeProduced(
+            Mockito.eq(view),
+            Mockito.eq(window),
+            Mockito.eq(view.getWindowingStrategyInternal()),
+            Mockito.any(Runnable.class));
   }
 
   private <ValueT> Future<ValueT> getFutureOfView(final SideInputReader myReader,
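The mechanical withViews to createReaderForViews rename threads through every test above; the new name makes clearer that each call manufactures a fresh SideInputReader scoped to exactly the views passed in. The behavior the tests rely on, in miniature (fixture names as in the test class; illustrative only, not part of the patch):

    SideInputReader reader =
        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView));

    // Reads of a view the reader was created with succeed, blocking until a pane exists...
    Map<String, Integer> contents = reader.get(mapView, firstWindow);

    // ...while views outside the reader's scope fail fast.
    reader.get(iterableView, firstWindow);  // IllegalArgumentException: "unknown view"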
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
index 033f9de204..66430b6a7f 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
@@ -26,7 +26,6 @@
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
index ae599bab62..3b928b9077 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
@@ -26,7 +26,6 @@
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
index f139c5648e..a9bbcc8cc5 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
@@ -25,7 +25,6 @@
 import com.google.cloud.dataflow.sdk.io.CountingSource;
 import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
index 2f5cd0fb88..2f5bdde6ad 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
@@ -25,7 +25,6 @@
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
 import com.google.cloud.dataflow.sdk.coders.VoidCoder;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java
new file mode 100644
index 0000000000..be3e062fbd
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link WatermarkCallbackExecutor}.
+ */
+@RunWith(JUnit4.class)
+public class WatermarkCallbackExecutorTest {
+  private WatermarkCallbackExecutor executor = WatermarkCallbackExecutor.create();
+  private AppliedPTransform<?, ?, ?> create;
+  private AppliedPTransform<?, ?, ?> sum;
+
+  @Before
+  public void setup() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
+    create = created.getProducingTransformInternal();
+    sum = created.apply(Sum.integersGlobally()).getProducingTransformInternal();
+  }
+
+  @Test
+  public void onGuaranteedFiringFiresAfterTrigger() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    executor.callOnGuaranteedFiring(
+        create,
+        GlobalWindow.INSTANCE,
+        WindowingStrategy.globalDefault(),
+        new CountDownLatchCallback(latch));
+
+    executor.fireForWatermark(create, BoundedWindow.TIMESTAMP_MAX_VALUE);
+    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true));
+  }
+
+  @Test
+  public void multipleCallbacksShouldFireFires() throws Exception {
+    CountDownLatch latch = new CountDownLatch(2);
+    WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(Duration.standardMinutes(10));
+    IntervalWindow window =
+        new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10)));
+    executor.callOnGuaranteedFiring(
+        create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
+    executor.callOnGuaranteedFiring(
+        create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
+
+    executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(10)));
+    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true));
+  }
+
+  @Test
+  public void noCallbacksShouldFire() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(Duration.standardMinutes(10));
+    IntervalWindow window =
+        new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10)));
+    executor.callOnGuaranteedFiring(
+        create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
+
+    executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(5)));
+    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false));
+  }
+
+  @Test
+  public void unrelatedStepShouldNotFire() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(Duration.standardMinutes(10));
+    IntervalWindow window =
+        new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10)));
+    executor.callOnGuaranteedFiring(
+        sum, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
+
+    executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(20)));
+    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false));
+  }
+
+  private static class CountDownLatchCallback implements Runnable {
+    private final CountDownLatch latch;
+
+    public CountDownLatchCallback(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    public void run() {
+      latch.countDown();
+    }
+  }
+}
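WatermarkCallbackExecutor's contract, as these tests fix it: a callback registered via callOnGuaranteedFiring for a (step, window, strategy) triple runs when fireForWatermark is invoked for that same step with a watermark at or beyond the window's maximum timestamp, and never for another step. A minimal sketch; step, otherStep, and callback stand for any two distinct AppliedPTransforms and a Runnable, and are not from the patch:

    WatermarkCallbackExecutor executor = WatermarkCallbackExecutor.create();
    IntervalWindow window =
        new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10)));
    WindowingStrategy<Object, IntervalWindow> strategy =
        WindowingStrategy.of(FixedWindows.of(Duration.standardMinutes(10)));

    executor.callOnGuaranteedFiring(step, window, strategy, callback);

    executor.fireForWatermark(step, new Instant(0L).plus(Duration.standardMinutes(5)));
    // no-op: watermark is still before window.maxTimestamp()
    executor.fireForWatermark(otherStep, BoundedWindow.TIMESTAMP_MAX_VALUE);
    // no-op: callbacks are keyed by step
    executor.fireForWatermark(step, new Instant(0L).plus(Duration.standardMinutes(10)));
    // runs the callback exactly once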
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/CombineFnsTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/CombineFnsTest.java
new file mode 100644
index 0000000000..ad37708677
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/CombineFnsTest.java
@@ -0,0 +1,413 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.transforms;
+
+import static org.junit.Assert.assertThat;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.NullableCoder;
+import com.google.cloud.dataflow.sdk.coders.StandardCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Combine.BinaryCombineFn;
+import com.google.cloud.dataflow.sdk.transforms.CombineFns.CoCombineResult;
+import com.google.cloud.dataflow.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import com.google.cloud.dataflow.sdk.transforms.Max.MaxIntegerFn;
+import com.google.cloud.dataflow.sdk.transforms.Min.MinIntegerFn;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Unit tests for {@link CombineFns}.
+ */
+@RunWith(JUnit4.class)
+public class CombineFnsTest {
+  @Rule public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testDuplicatedTags() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("it is already present in the composition");
+
+    TupleTag<Integer> tag = new TupleTag<Integer>();
+    CombineFns.compose()
+        .with(new GetIntegerFunction(), new MaxIntegerFn(), tag)
+        .with(new GetIntegerFunction(), new MinIntegerFn(), tag);
+  }
+
+  @Test
+  public void testDuplicatedTagsKeyed() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("it is already present in the composition");
+
+    TupleTag<Integer> tag = new TupleTag<Integer>();
+    CombineFns.composeKeyed()
+        .with(new GetIntegerFunction(), new MaxIntegerFn(), tag)
+        .with(new GetIntegerFunction(), new MinIntegerFn(), tag);
+  }
+
+  @Test
+  public void testDuplicatedTagsWithContext() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("it is already present in the composition");
+
+    TupleTag<UserString> tag = new TupleTag<UserString>();
+    CombineFns.compose()
+        .with(
+            new GetUserStringFunction(),
+            new ConcatStringWithContext(null /* view */).forKey("G", StringUtf8Coder.of()),
+            tag)
+        .with(
+            new GetUserStringFunction(),
+            new ConcatStringWithContext(null /* view */).forKey("G", StringUtf8Coder.of()),
+            tag);
+  }
+
+  @Test
+  public void testDuplicatedTagsWithContextKeyed() {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("it is already present in the composition");
+
+    TupleTag<UserString> tag = new TupleTag<UserString>();
+    CombineFns.composeKeyed()
+        .with(
+            new GetUserStringFunction(),
+            new ConcatStringWithContext(null /* view */),
+            tag)
+        .with(
+            new GetUserStringFunction(),
+            new ConcatStringWithContext(null /* view */),
+            tag);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testComposedCombine() {
+    Pipeline p = TestPipeline.create();
+    p.getCoderRegistry().registerCoder(UserString.class, UserStringCoder.of());
+
+    PCollection<KV<String, KV<Integer, UserString>>> perKeyInput = p.apply(
+        Create.timestamped(
+            Arrays.asList(
+                KV.of("a", KV.of(1, UserString.of("1"))),
+                KV.of("a", KV.of(1, UserString.of("1"))),
+                KV.of("a", KV.of(4, UserString.of("4"))),
+                KV.of("b", KV.of(1, UserString.of("1"))),
+                KV.of("b", KV.of(13, UserString.of("13")))),
+            Arrays.asList(0L, 4L, 7L, 10L, 16L))
+        .withCoder(KvCoder.of(
+            StringUtf8Coder.of(),
+            KvCoder.of(BigEndianIntegerCoder.of(), UserStringCoder.of()))));
+
+    TupleTag<Integer> maxIntTag = new TupleTag<Integer>();
+    TupleTag<UserString> concatStringTag = new TupleTag<UserString>();
+    PCollection<KV<String, KV<Integer, String>>> combineGlobally = perKeyInput
+        .apply(Values.<KV<Integer, UserString>>create())
+        .apply(Combine.globally(CombineFns.compose()
+            .with(
+                new GetIntegerFunction(),
+                new MaxIntegerFn(),
+                maxIntTag)
+            .with(
+                new GetUserStringFunction(),
+                new ConcatString(),
+                concatStringTag)))
+        .apply(WithKeys.<String, CoCombineResult>of("global"))
+        .apply(
+            "ExtractGloballyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
+
+    PCollection<KV<String, KV<Integer, String>>> combinePerKey = perKeyInput
+        .apply(Combine.perKey(CombineFns.composeKeyed()
+            .with(
+                new GetIntegerFunction(),
+                new MaxIntegerFn().asKeyedFn(),
+                maxIntTag)
+            .with(
+                new GetUserStringFunction(),
+                new ConcatString().asKeyedFn(),
+                concatStringTag)))
+        .apply("ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
+    DataflowAssert.that(combineGlobally).containsInAnyOrder(
+        KV.of("global", KV.of(13, "111134")));
+    DataflowAssert.that(combinePerKey).containsInAnyOrder(
+        KV.of("a", KV.of(4, "114")),
+        KV.of("b", KV.of(13, "113")));
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testComposedCombineWithContext() {
+    Pipeline p = TestPipeline.create();
+    p.getCoderRegistry().registerCoder(UserString.class, UserStringCoder.of());
+
+    PCollectionView<String> view = p
+        .apply(Create.of("I"))
+        .apply(View.<String>asSingleton());
+
+    PCollection<KV<String, KV<Integer, UserString>>> perKeyInput = p.apply(
+        Create.timestamped(
+            Arrays.asList(
+                KV.of("a", KV.of(1, UserString.of("1"))),
+                KV.of("a", KV.of(1, UserString.of("1"))),
+                KV.of("a", KV.of(4, UserString.of("4"))),
+                KV.of("b", KV.of(1, UserString.of("1"))),
+                KV.of("b", KV.of(13, UserString.of("13")))),
+            Arrays.asList(0L, 4L, 7L, 10L, 16L))
+        .withCoder(KvCoder.of(
+            StringUtf8Coder.of(),
+            KvCoder.of(BigEndianIntegerCoder.of(), UserStringCoder.of()))));
+
+    TupleTag<Integer> maxIntTag = new TupleTag<Integer>();
+    TupleTag<UserString> concatStringTag = new TupleTag<UserString>();
+    PCollection<KV<String, KV<Integer, String>>> combineGlobally = perKeyInput
+        .apply(Values.<KV<Integer, UserString>>create())
+        .apply(Combine.globally(CombineFns.compose()
+            .with(
+                new GetIntegerFunction(),
+                new MaxIntegerFn(),
+                maxIntTag)
+            .with(
+                new GetUserStringFunction(),
+                new ConcatStringWithContext(view).forKey("G", StringUtf8Coder.of()),
+                concatStringTag))
+        .withoutDefaults()
+        .withSideInputs(ImmutableList.of(view)))
+        .apply(WithKeys.<String, CoCombineResult>of("global"))
+        .apply(
+            "ExtractGloballyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
+
+    PCollection<KV<String, KV<Integer, String>>> combinePerKey = perKeyInput
+        .apply(Combine.perKey(CombineFns.composeKeyed()
+            .with(
+                new GetIntegerFunction(),
+                new MaxIntegerFn().asKeyedFn(),
+                maxIntTag)
+            .with(
+                new GetUserStringFunction(),
+                new ConcatStringWithContext(view),
+                concatStringTag))
+        .withSideInputs(ImmutableList.of(view)))
+        .apply("ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
+    DataflowAssert.that(combineGlobally).containsInAnyOrder(
+        KV.of("global", KV.of(13, "111134GI")));
+    DataflowAssert.that(combinePerKey).containsInAnyOrder(
+        KV.of("a", KV.of(4, "114Ia")),
+        KV.of("b", KV.of(13, "113Ib")));
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testComposedCombineNullValues() {
+    Pipeline p = TestPipeline.create();
+    p.getCoderRegistry().registerCoder(UserString.class, NullableCoder.of(UserStringCoder.of()));
+    p.getCoderRegistry().registerCoder(String.class, NullableCoder.of(StringUtf8Coder.of()));
+
+    PCollection<KV<String, KV<Integer, UserString>>> perKeyInput = p.apply(
+        Create.timestamped(
+            Arrays.asList(
+                KV.of("a", KV.of(1, UserString.of("1"))),
+                KV.of("a", KV.of(1, UserString.of("1"))),
+                KV.of("a", KV.of(4, UserString.of("4"))),
+                KV.of("b", KV.of(1, UserString.of("1"))),
+                KV.of("b", KV.of(13, UserString.of("13")))),
+            Arrays.asList(0L, 4L, 7L, 10L, 16L))
+        .withCoder(KvCoder.of(
+            StringUtf8Coder.of(),
+            KvCoder.of(
+                BigEndianIntegerCoder.of(), NullableCoder.of(UserStringCoder.of())))));
+
+    TupleTag<Integer> maxIntTag = new TupleTag<Integer>();
+    TupleTag<UserString> concatStringTag = new TupleTag<UserString>();
+
+    PCollection<KV<String, KV<Integer, String>>> combinePerKey = perKeyInput
+        .apply(Combine.perKey(CombineFns.composeKeyed()
+            .with(
+                new GetIntegerFunction(),
+                new MaxIntegerFn().asKeyedFn(),
+                maxIntTag)
+            .with(
+                new GetUserStringFunction(),
+                new OutputNullString().asKeyedFn(),
+                concatStringTag)))
+        .apply("ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag)));
+    DataflowAssert.that(combinePerKey).containsInAnyOrder(
+        KV.of("a", KV.of(4, (String) null)),
+        KV.of("b", KV.of(13, (String) null)));
+    p.run();
+  }
+
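+  // The shape all three pipeline tests above exercise, in miniature: compose()
+  // fans one input out to several CombineFns, and each branch's output is read
+  // back from the CoCombineResult by the TupleTag it was registered with. A
+  // hedged sketch using this file's helpers; `pairs` stands for any
+  // PCollection<KV<Integer, UserString>> and is illustrative only:
+  //
+  //   TupleTag<Integer> maxTag = new TupleTag<Integer>();
+  //   TupleTag<UserString> concatTag = new TupleTag<UserString>();
+  //   PCollection<CoCombineResult> combined = pairs.apply(
+  //       Combine.globally(CombineFns.compose()
+  //           .with(new GetIntegerFunction(), new MaxIntegerFn(), maxTag)
+  //           .with(new GetUserStringFunction(), new ConcatString(), concatTag)));
+  //   // downstream: result.get(maxTag) and result.get(concatTag)
+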
+  private static class UserString implements Serializable {
+    private String strValue;
+
+    static UserString of(String strValue) {
+      UserString ret = new UserString();
+      ret.strValue = strValue;
+      return ret;
+    }
+  }
+
+  private static class UserStringCoder extends StandardCoder<UserString> {
+    public static UserStringCoder of() {
+      return INSTANCE;
+    }
+
+    private static final UserStringCoder INSTANCE = new UserStringCoder();
+
+    @Override
+    public void encode(UserString value, OutputStream outStream, Context context)
+        throws CoderException, IOException {
+      StringUtf8Coder.of().encode(value.strValue, outStream, context);
+    }
+
+    @Override
+    public UserString decode(InputStream inStream, Context context)
+        throws CoderException, IOException {
+      return UserString.of(StringUtf8Coder.of().decode(inStream, context));
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return null;
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {}
+  }
+
+  private static class GetIntegerFunction
+      extends SimpleFunction<KV<Integer, UserString>, Integer> {
+    @Override
+    public Integer apply(KV<Integer, UserString> input) {
+      return input.getKey();
+    }
+  }
+
+  private static class GetUserStringFunction
+      extends SimpleFunction<KV<Integer, UserString>, UserString> {
+    @Override
+    public UserString apply(KV<Integer, UserString> input) {
+      return input.getValue();
+    }
+  }
+
+  private static class ConcatString extends BinaryCombineFn<UserString> {
+    @Override
+    public UserString apply(UserString left, UserString right) {
+      String retStr = left.strValue + right.strValue;
+      char[] chars = retStr.toCharArray();
+      Arrays.sort(chars);
+      return UserString.of(new String(chars));
+    }
+  }
+
+  private static class OutputNullString extends BinaryCombineFn<UserString> {
+    @Override
+    public UserString apply(UserString left, UserString right) {
+      return null;
+    }
+  }
+
+  private static class ConcatStringWithContext
+      extends KeyedCombineFnWithContext<String, UserString, UserString, UserString> {
+    private final PCollectionView<String> view;
+
+    private ConcatStringWithContext(PCollectionView<String> view) {
+      this.view = view;
+    }
+
+    @Override
+    public UserString createAccumulator(String key, CombineWithContext.Context c) {
+      return UserString.of(key + c.sideInput(view));
+    }
+
+    @Override
+    public UserString addInput(
+        String key, UserString accumulator, UserString input, CombineWithContext.Context c) {
+      assertThat(accumulator.strValue, Matchers.startsWith(key + c.sideInput(view)));
+      accumulator.strValue += input.strValue;
+      return accumulator;
+    }
+
+    @Override
+    public UserString mergeAccumulators(
+        String key, Iterable<UserString> accumulators, CombineWithContext.Context c) {
+      String keyPrefix = key + c.sideInput(view);
+      String all = keyPrefix;
+      for (UserString accumulator : accumulators) {
+        assertThat(accumulator.strValue, Matchers.startsWith(keyPrefix));
+        all += accumulator.strValue.substring(keyPrefix.length());
+        accumulator.strValue = "cleared in mergeAccumulators";
+      }
+      return UserString.of(all);
+    }
+
+    @Override
+    public UserString extractOutput(
+        String key, UserString accumulator, CombineWithContext.Context c) {
+      assertThat(accumulator.strValue, Matchers.startsWith(key + c.sideInput(view)));
+      char[] chars = accumulator.strValue.toCharArray();
+      Arrays.sort(chars);
+      return UserString.of(new String(chars));
+    }
+  }
+
+  private static class ExtractResultDoFn
+      extends DoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>> {
+
+    private final TupleTag<Integer> maxIntTag;
+    private final TupleTag<UserString> concatStringTag;
+
+    ExtractResultDoFn(TupleTag<Integer> maxIntTag, TupleTag<UserString> concatStringTag) {
+      this.maxIntTag = maxIntTag;
+      this.concatStringTag = concatStringTag;
+    }
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      UserString userString = c.element().getValue().get(concatStringTag);
+      KV<Integer, String> value = KV.of(
+          c.element().getValue().get(maxIntTag),
+          userString == null ? null : userString.strValue);
+      c.output(KV.of(c.element().getKey(), value));
+    }
+  }
+}
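Taken together, CombineFnsTest pins down the CoCombineResult contract: each .with(extractor, fn, tag) branch sees only its extracted slice of the input, and downstream consumers recover each branch's output by tag, exactly as ExtractResultDoFn does. A minimal sketch of that per-key flow, reusing the helper classes from this file; perKeyInput is as in the tests, and the tags must be the same objects used to build the composed fn:

    // Compose max and concat over one KV<Integer, UserString> stream per key,
    // then unpack the CoCombineResult by tag.
    final TupleTag<Integer> maxIntTag = new TupleTag<Integer>();
    final TupleTag<UserString> concatStringTag = new TupleTag<UserString>();

    PCollection<KV<String, CoCombineResult>> combined = perKeyInput.apply(
        Combine.perKey(CombineFns.composeKeyed()
            .with(new GetIntegerFunction(), new MaxIntegerFn().asKeyedFn(), maxIntTag)
            .with(new GetUserStringFunction(), new ConcatString().asKeyedFn(), concatStringTag)));

    PCollection<KV<String, KV<Integer, String>>> unpacked = combined.apply(
        ParDo.of(new DoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>>() {
          @Override
          public void processElement(ProcessContext c) {
            CoCombineResult r = c.element().getValue();
            // Each branch's output is retrieved by the tag it was registered with.
            c.output(KV.of(
                c.element().getKey(),
                KV.of(r.get(maxIntTag), r.get(concatStringTag).strValue)));
          }
        }));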